diff --git a/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py b/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py index d07f305..d5989ce 100644 --- a/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py +++ b/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py @@ -16,7 +16,6 @@ from .revpi_config import ( configure_external_antenna, configure_gui, configure_wlan, - simple_systemd, ) from ..dbus_helper import DbusInterface diff --git a/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py b/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py index ec01aef..5cdbc3b 100644 --- a/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py +++ b/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py @@ -10,12 +10,12 @@ from glob import glob from logging import getLogger from os import X_OK, access from os.path import exists, join -from threading import Thread from typing import List, Optional from pydbus import SystemBus from ..dbus_helper import grep +from ..systemd_helper import simple_systemd, ServiceActions log = getLogger(__name__) @@ -42,6 +42,7 @@ class ComputeModuleTypes(IntEnum): CM4S (int): Represents a Compute Module 4S. CM5 (int): Represents a Compute Module 5. """ + UNKNOWN = 0 CM1 = 6 CM3 = 10 @@ -58,6 +59,7 @@ class ConfigActions(Enum): actions. It can be used to ensure consistency when working with or defining such actions in a system. """ + ENABLE = "enable" DISABLE = "disable" STATUS = "status" @@ -232,6 +234,7 @@ class ConfigTxt: _config_txt_lines (list[str]): Contains all lines of the configuration file as a list of strings, where each string represents a line. """ + re_name_value = re.compile(r"^\s*(?!#)(?P[^=\s].+?)\s*=\s*(?P\S+)\s*$") def __init__(self): @@ -509,7 +512,17 @@ def configure_con_can(action: ConfigActions): def configure_dphys_swapfile(action: ConfigActions): - return_value = simple_systemd(action, "dphys-swapfile.service") + # Translate config action to systemd action + if action is ConfigActions.ENABLE: + systemd_action = ServiceActions.ENABLE + elif action is ConfigActions.DISABLE: + systemd_action = ServiceActions.DISABLE + elif action is ConfigActions.STATUS: + systemd_action = ServiceActions.STATUS + else: + systemd_action = ServiceActions.AVAILABLE + + return_value = simple_systemd(systemd_action, "dphys-swapfile.service") # Post actions for dphys-swapfile if action is ConfigActions.DISABLE: @@ -634,98 +647,6 @@ def get_rfkill_index(device_class_path: str) -> Optional[int]: return None -def simple_systemd(action: ConfigActions, unit: str): - """ - Performs specified actions on systemd units. - - This function allows interaction with systemd units for various operations - such as enabling, disabling, checking the status, and verifying availability. - It communicates with the systemd manager via the SystemBus and handles units - based on the action specified. - - Parameters: - action (ConfigActions): Specifies the action to be performed on the - systemd unit. Supported actions include ENABLE, - DISABLE, STATUS, and AVAILABLE. - unit (str): The name of the systemd unit on which the action is to be - performed. - - Returns: - bool: For STATUS and AVAILABLE actions, returns True if the corresponding - criteria are met (e.g., enabled and active for STATUS, or not found - for AVAILABLE). Otherwise, returns False. - - Raises: - ValueError: If the specified action is not supported. - """ - bus = SystemBus() - systemd = bus.get( - "org.freedesktop.systemd1", - "/org/freedesktop/systemd1", - ) - systemd_manager = systemd["org.freedesktop.systemd1.Manager"] - - if action is ConfigActions.ENABLE: - - def thread_unit_config(): - """Change configuration asynchronously.""" - # Dbus call: UnmaskUnitFiles(in as files, in b runtime, out a(sss) changes - lst_change_unmask = systemd_manager.UnmaskUnitFiles([unit], False) - - # Dbus call: EnableUnitFiles(in as files, in b runtime, in b force, - # out b carries_install_info, out a(sss) changes - lst_change_enable = systemd_manager.EnableUnitFiles([unit], False, False) - if lst_change_unmask or lst_change_enable: - # Reload systemd after modified unit property - systemd_manager.Reload() - - Thread(target=thread_unit_config, daemon=True).start() - - # Dbus call: StartUnit(in s name, in s mode, out o job - systemd_manager.StartUnit(unit, "replace") - - elif action is ConfigActions.DISABLE: - - def thread_unit_config(): - """Change configuration asynchronously.""" - # Dbus call: DisableUnitFiles (in as files, in b runtime, out a(sss) changes) - change = systemd_manager.DisableUnitFiles([unit], False) - if change: - # Reload systemd after modified unit property - systemd_manager.Reload() - - Thread(target=thread_unit_config, daemon=True).start() - - # Dbus call: StopUnit(in s name,in s mode, out o job - systemd_manager.StopUnit(unit, "replace") - - - elif action is ConfigActions.STATUS: - try: - unit_path = systemd_manager.LoadUnit(unit) - properties = bus.get("org.freedesktop.systemd1", unit_path) - except Exception: - log.warning(f"could not get systemd unit {unit}") - return False - - return properties.UnitFileState == "enabled" and properties.ActiveState == "active" - - elif action is ConfigActions.AVAILABLE: - try: - unit_path = systemd_manager.LoadUnit(unit) - properties = bus.get("org.freedesktop.systemd1", unit_path) - except Exception: - log.warning(f"could not get systemd unit {unit}") - return False - - return properties.LoadState != "not-found" - - else: - raise ValueError(f"action {action} not supported") - - return None - - if __name__ == "__main__": rc = RevPiConfig() print("Model:", rc.model) diff --git a/src/revpi_middleware/dbus_middleware1/systemd_helper.py b/src/revpi_middleware/dbus_middleware1/systemd_helper.py new file mode 100644 index 0000000..bec26b7 --- /dev/null +++ b/src/revpi_middleware/dbus_middleware1/systemd_helper.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from enum import Enum +from logging import getLogger +from threading import Thread +from typing import Optional + +from pydbus import SystemBus + +log = getLogger(__name__) + + +class ServiceActions(Enum): + """ + Enumeration class for defining configuration actions. + + This enumeration provides predefined constants for common configuration + actions. It can be used to ensure consistency when working with or defining + such actions in a system. + """ + + ENABLE = "enable" + DISABLE = "disable" + STATUS = "status" + AVAILABLE = "available" + + +def simple_systemd(action: ServiceActions, unit: str, unmask: bool = False) -> Optional[bool]: + """ + Perform systemd service actions such as enable, disable, check status, or availability. + + This function interacts with the systemd D-Bus API to manage and query the + state of services on a system. The supported actions include enabling a systemd + unit, disabling it, starting/stopping a unit, and checking its status or + availability. The function supports asynchronous configuration changes through + threads where applicable. + + Parameters: + action (ServiceActions): The action to perform on the systemd service. + Supported actions are ENABLE, DISABLE, STATUS, and AVAILABLE. + unit (str): The name of the systemd unit to operate on (e.g., "example.service"). + unmask (bool): When enabling a unit, if True, any masked unit file will + first be unmasked before proceeding with the operation. Defaults to False. + + Returns: + Optional[bool]: The return value depends on the action. For STATUS or + AVAILABLE actions, it returns True if the unit satisfies the condition + (e.g., enabled and active, or available and loaded), False otherwise. + For other actions, it returns None. + """ + bus = SystemBus() + systemd = bus.get( + "org.freedesktop.systemd1", + "/org/freedesktop/systemd1", + ) + systemd_manager = systemd["org.freedesktop.systemd1.Manager"] + + if action is ServiceActions.ENABLE: + + def thread_unit_config(): + """Change configuration asynchronously.""" + lst_change_unmask = [] + if unmask: + # Dbus call: UnmaskUnitFiles(in as files, in b runtime, out a(sss) changes + lst_change_unmask = systemd_manager.UnmaskUnitFiles([unit], False) + + # Dbus call: EnableUnitFiles(in as files, in b runtime, in b force, + # out b carries_install_info, out a(sss) changes + lst_change_enable = systemd_manager.EnableUnitFiles([unit], False, False) + if lst_change_unmask or lst_change_enable: + # Reload systemd after modified unit property + systemd_manager.Reload() + + Thread(target=thread_unit_config, daemon=True).start() + + # Dbus call: StartUnit(in s name, in s mode, out o job + systemd_manager.StartUnit(unit, "replace") + + elif action is ServiceActions.DISABLE: + + def thread_unit_config(): + """Change configuration asynchronously.""" + # Dbus call: DisableUnitFiles (in as files, in b runtime, out a(sss) changes) + change = systemd_manager.DisableUnitFiles([unit], False) + if change: + # Reload systemd after modified unit property + systemd_manager.Reload() + + Thread(target=thread_unit_config, daemon=True).start() + + # Dbus call: StopUnit(in s name,in s mode, out o job + systemd_manager.StopUnit(unit, "replace") + + elif action is ServiceActions.STATUS: + try: + unit_path = systemd_manager.LoadUnit(unit) + properties = bus.get("org.freedesktop.systemd1", unit_path) + except Exception: + log.warning(f"could not get systemd unit {unit}") + return False + + return properties.UnitFileState == "enabled" and properties.ActiveState == "active" + + elif action is ServiceActions.AVAILABLE: + try: + unit_path = systemd_manager.LoadUnit(unit) + properties = bus.get("org.freedesktop.systemd1", unit_path) + except Exception: + log.warning(f"could not get systemd unit {unit}") + return False + + return properties.LoadState != "not-found" + + else: + raise ValueError(f"action {action} not supported") + + return None