diff --git a/src/revpi_middleware/cli_commands/cli_base.py b/src/revpi_middleware/cli_commands/cli_base.py index c57f277..e0ff36f 100644 --- a/src/revpi_middleware/cli_commands/cli_base.py +++ b/src/revpi_middleware/cli_commands/cli_base.py @@ -5,12 +5,11 @@ This module provides the foundation for the RevPi middleware CLI commands and argument parsing setup. """ -from argparse import ArgumentParser from logging import getLogger -from revpi_middleware.cli_commands import cli_picontrol -from revpi_middleware.proginit import StdLogOutput +from . import cli_config, cli_picontrol from .. import proginit as pi +from ..proginit import StdLogOutput log = getLogger(__name__) @@ -30,6 +29,11 @@ def setup_command_line_arguments(): help="RevPi PiControl object", ) cli_picontrol.add_subparsers(obj_picontrol) + obj_config = rpictl_obj.add_parser( + "config", + help="RevPi configuration object (revpi-config)", + ) + cli_config.add_subparsers(obj_config) def main() -> int: @@ -40,6 +44,9 @@ def main() -> int: if obj == "picontrol": rc = cli_picontrol.main() + elif obj == "config": + rc = cli_config.main() + else: log.error(f"Unknown object: {obj}") rc = 1 diff --git a/src/revpi_middleware/cli_commands/cli_config.py b/src/revpi_middleware/cli_commands/cli_config.py new file mode 100644 index 0000000..d91b39c --- /dev/null +++ b/src/revpi_middleware/cli_commands/cli_config.py @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +"""Command-Line for the picontrol object of CLI.""" +from argparse import ArgumentParser +from logging import getLogger + +from .dbus_helper import BusType, get_properties, simple_call +from .. import proginit as pi +from ..dbus_middleware1 import extend_interface + +log = getLogger(__name__) + + +def add_subparsers(parent_parser: ArgumentParser): + parent_parser.add_argument( + "action", + choices=["enable", "disable", "status", "available", "list-features"], + help="Action to be executed: enable, disable, status or available. " + "To get all available features, use 'list-features'.", + ) + parent_parser.add_argument( + "feature", + nargs="?", + default="", + help="Name of the feature to configer. To list all features use 'list-features' as action.", + ) + + +def main() -> int: + action = pi.pargs.action + dbus_value = False + try: + + if action == "list-features": + dbus_value = get_properties( + "available_features", + interface=extend_interface("RevpiConfig"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) + for feature in dbus_value: + print(feature) + + return 0 + + # For the following actions, a feature name is required + if pi.pargs.feature == "": + raise Exception("Feature name is required") + + if action == "enable": + simple_call( + "Enable", + pi.pargs.feature, + interface=extend_interface("RevpiConfig"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) + + elif action == "disable": + simple_call( + "Disable", + pi.pargs.feature, + interface=extend_interface("RevpiConfig"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) + + elif action == "status": + dbus_value = simple_call( + "GetStatus", + pi.pargs.feature, + interface=extend_interface("RevpiConfig"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) + + elif action == "available": + dbus_value = simple_call( + "GetAvailability", + pi.pargs.feature, + interface=extend_interface("RevpiConfig"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) + + else: + raise Exception(f"Unknown action: {action}") + + except Exception as e: + log.error(f"Error: {e}") + return 1 + + log.debug( + f"D-Bus call of method {action} for feature {pi.pargs.feature} returned: {dbus_value}" + ) + print(int(dbus_value)) + + return 0 diff --git a/src/revpi_middleware/cli_commands/cli_picontrol.py b/src/revpi_middleware/cli_commands/cli_picontrol.py index 6eb355f..becac3e 100644 --- a/src/revpi_middleware/cli_commands/cli_picontrol.py +++ b/src/revpi_middleware/cli_commands/cli_picontrol.py @@ -38,7 +38,7 @@ def method_reset(): log.debug("D-Bus call of method ResetDriver") simple_call( "ResetDriver", - interface=extend_interface("picontrol"), + interface=extend_interface("PiControl"), bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, ) log.info("ResetDriver called via D-Bus") @@ -48,7 +48,7 @@ def method_await_reset(timeout: int = 0): detected_signal = await_signal( "NotifyDriverReset", timeout, - extend_interface("picontrol"), + extend_interface("PiControl"), bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, ) if detected_signal: diff --git a/src/revpi_middleware/cli_commands/dbus_helper.py b/src/revpi_middleware/cli_commands/dbus_helper.py index e14f072..45d6690 100644 --- a/src/revpi_middleware/cli_commands/dbus_helper.py +++ b/src/revpi_middleware/cli_commands/dbus_helper.py @@ -17,6 +17,18 @@ class BusType(Enum): SYSTEM = "system" +def get_properties( + property_name: str, + interface: str, + object_path=REVPI_DBUS_BASE_PATH, + bus_type=BusType.SYSTEM, +): + bus = SessionBus() if bus_type is BusType.SESSION else SystemBus() + revpi = bus.get(REVPI_DBUS_NAME, object_path) + iface = revpi[interface] + return getattr(iface, property_name) + + def simple_call( method: str, *args, diff --git a/src/revpi_middleware/dbus_middleware1/bus_provider.py b/src/revpi_middleware/dbus_middleware1/bus_provider.py index 6f91bce..dd504f1 100644 --- a/src/revpi_middleware/dbus_middleware1/bus_provider.py +++ b/src/revpi_middleware/dbus_middleware1/bus_provider.py @@ -10,6 +10,7 @@ from pydbus import SessionBus, SystemBus from . import REVPI_DBUS_NAME from .process_image import InterfacePiControl +from .system_config import InterfaceRevpiConfig log = getLogger(__name__) @@ -41,6 +42,7 @@ class BusProvider(Thread): # ("Subdir2/Whatever", Example()) lst_interfaces = [ InterfacePiControl(self.picontrol_device, self.config_rsc), + InterfaceRevpiConfig(), ] try: diff --git a/src/revpi_middleware/dbus_middleware1/dbus_helper.py b/src/revpi_middleware/dbus_middleware1/dbus_helper.py index 23bcd2e..62dd766 100644 --- a/src/revpi_middleware/dbus_middleware1/dbus_helper.py +++ b/src/revpi_middleware/dbus_middleware1/dbus_helper.py @@ -41,3 +41,38 @@ def extend_interface(*args) -> str: the provided segments. """ return ".".join([REVPI_DBUS_NAME, *args]) + + +def grep(pattern, filename): + """ + Searches for lines in a file that contain a given pattern and returns them as a list. + + The function reads lines from the specified file and checks whether each line + contains the provided pattern. It returns a list of lines that match the + pattern. If the file is not found, an empty list is returned. Any other + exceptions during the file reading process are caught and logged. + + Args: + pattern (str): The substring to search for within the file's lines. + filename (str): The path to the file that will be searched. + + Returns: + list[str]: A list containing lines that include the provided pattern, with + leading and trailing spaces removed. + + Raises: + FileNotFoundError: This error is caught if the file specified is not + found. + Exception: Any unforeseen exception during file operations is caught and + logged. + """ + try: + with open(filename, "r") as file: + # Gibt alle Zeilen zurück, die das Muster enthalten + matching_lines = [line.strip() for line in file if pattern in line] + return matching_lines + except FileNotFoundError: + return [] + except Exception as e: + log.error(f"Error reading file: {e}") + return [] diff --git a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py index f62a972..0e8ccb0 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py @@ -15,7 +15,7 @@ log = getLogger(__name__) class InterfacePiControl(DbusInterface): """ - + diff --git a/src/revpi_middleware/dbus_middleware1/system_config/__init__.py b/src/revpi_middleware/dbus_middleware1/system_config/__init__.py new file mode 100644 index 0000000..8f6ef5d --- /dev/null +++ b/src/revpi_middleware/dbus_middleware1/system_config/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +"""D-Bus interfaces for system configuration.""" +from .interface_config import InterfaceRevpiConfig diff --git a/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py b/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py new file mode 100644 index 0000000..380c913 --- /dev/null +++ b/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +"""D-Bus interfaces for hardware configuration.""" +from collections import namedtuple +from logging import getLogger + +from .revpi_config import ( + ConfigActions, + configure_avahi_daemon, + configure_bluetooth, + configure_con_can, + configure_dphys_swapfile, + configure_external_antenna, + configure_gui, + configure_wlan, + simple_systemd, +) +from ..dbus_helper import DbusInterface + +log = getLogger(__name__) + +FeatureFunction = namedtuple("FeatureFunction", ["function", "args"]) + + +class InterfaceRevpiConfig(DbusInterface): + """ + + + + + + + + + + + + + + + + + + + + """ + + def Disable(self, feature: str) -> None: + """Disable the feature.""" + feature_function = get_feature(feature) + feature_function.function(ConfigActions.DISABLE, *feature_function.args) + + def Enable(self, feature: str) -> None: + """Enable the feature.""" + feature_function = get_feature(feature) + feature_function.function(ConfigActions.ENABLE, *feature_function.args) + + def GetStatus(self, feature: str) -> bool: + """Get feature status.""" + feature_function = get_feature(feature) + return feature_function.function(ConfigActions.STATUS, *feature_function.args) + + def GetAvailability(self, feature: str) -> bool: + """Get feature availability on the RevPi.""" + feature_function = get_feature(feature) + return feature_function.function(ConfigActions.AVAILABLE, *feature_function.args) + + @property + def available_features(self) -> list[str]: + return list(AVAILABLE_FEATURES.keys()) + + +def get_feature(feature: str) -> FeatureFunction: + if feature not in AVAILABLE_FEATURES: + raise ValueError(f"feature {feature} does not exist") + feature_function = AVAILABLE_FEATURES[feature] + if not feature_function: + raise NotImplementedError(f"feature {feature} is not implemented") + return feature_function + + +AVAILABLE_FEATURES = { + "gui": FeatureFunction(configure_gui, []), + "revpi-con-can": FeatureFunction(configure_con_can, []), + "dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []), + "pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]), + "pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]), + "systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]), + "ssh": FeatureFunction(simple_systemd, ["ssh.service"]), + "nodered": FeatureFunction(simple_systemd, ["nodered.service"]), + "noderedrevpinodes-server": FeatureFunction( + simple_systemd, ["noderedrevpinodes-server.service"] + ), + "revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]), + "bluetooth": FeatureFunction(configure_bluetooth, []), + "wlan": FeatureFunction(configure_wlan, []), + "avahi": FeatureFunction(configure_avahi_daemon, []), + "external-antenna": FeatureFunction(configure_external_antenna, []), +} diff --git a/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py b/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py new file mode 100644 index 0000000..23d6378 --- /dev/null +++ b/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py @@ -0,0 +1,434 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +import re +import shutil +import subprocess +from collections import namedtuple +from enum import Enum, IntEnum +from glob import glob +from logging import getLogger +from os import X_OK, access +from os.path import exists, join +from typing import List, Optional + +from pydbus import SystemBus + +from ..dbus_helper import grep + +log = getLogger(__name__) + +ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"]) + +LINUX_BT_CLASS_PATH = "/sys/class/bluetooth" +LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211" +CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt") + + +class ComputeModuleTypes(IntEnum): + UNKNOWN = 0 + CM1 = 6 + CM3 = 10 + CM4 = 20 + CM4S = 21 + CM5 = 24 + + +class ConfigActions(Enum): + ENABLE = "enable" + DISABLE = "disable" + STATUS = "status" + AVAILABLE = "available" + + +class RevPiConfig: + + def __init__(self): + self._cm_type = ComputeModuleTypes.UNKNOWN + + self._revpi_with_con_bridge = False + self._wlan_class_path = "" + + self.serial = "" + self.model = "" + + self._init_device_info() + + def _init_device_info(self): + dc_cpuinfo = {} + + # Extract CPU information + with open("/proc/cpuinfo", "r") as f: + line = "\n" + while line: + line = f.readline() + if line.startswith(("Revision", "Serial", "Model")): + key, value = line.split(":", 1) + key = key.strip().lower() + value = value.strip() + dc_cpuinfo[key] = value + + self.model = dc_cpuinfo.get("model", "") + self.serial = dc_cpuinfo.get("serial", "") + + # Detect Compute Module type + revision = dc_cpuinfo.get("revision", "") + if revision: + revision = int(revision, 16) + mask = 4080 # 0xFF0 in dezimal + try: + self._cm_type = ComputeModuleTypes((revision & mask) >> 4) + except ValueError: + pass + + # Detect WLAN on CM module + could_have_wlan = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5) + if could_have_wlan: + wlan_interface = join(LINUX_WLAN_CLASS_PATH, "phy0") + if grep("DRIVER=brcmfmac", join(wlan_interface, "device", "uevent")): + self._wlan_class_path = wlan_interface + + # If no build in WLAN on the CM, detect third party WLAN on RevPi Flat + if not self._wlan_class_path and grep("revpi-flat", "/proc/device-tree/compatible"): + lst_wlan_interfaces = glob("/sys/class/ieee80211/*") + for wlan_interface in lst_wlan_interfaces: + if grep("DRIVER=mwifiex_sdio", join(wlan_interface, "device", "uevent")): + self._wlan_class_path = wlan_interface + + # Detect ConBridge + could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S) + if could_have_con_bridge: + lst_grep = grep("kunbus,revpi-connect", "/proc/device-tree/compatible") + self._revpi_with_con_bridge = len(lst_grep) > 0 + + @property + def class_path_wlan(self) -> str: + return self._wlan_class_path + + @property + def cm_type(self) -> ComputeModuleTypes: + return self._cm_type + + @property + def with_con_bridge(self) -> bool: + return self._revpi_with_con_bridge + + @property + def with_wlan(self) -> bool: + return bool(self._wlan_class_path) + + +class ConfigTxt: + re_name_value = re.compile(r"^\s*(?!#)(?P[^=\s].+?)\s*=\s*(?P\S+)\s*$") + + def __init__(self): + self._config_txt_path = "" + for path in CONFIG_TXT_LOCATIONS: + if exists(path): + self._config_txt_path = path + break + + if not self._config_txt_path: + raise FileNotFoundError("no config.txt found") + + self._config_txt_lines = [] + + def _clear_name_values(self, name: str, values: str or list) -> int: + counter = 0 + if type(values) is str: + values = [values] + + for config_var in self._get_all_name_values(): + if config_var.name == name and config_var.value in values: + self._config_txt_lines.pop(config_var.line_index) + counter += 1 + + return counter + + def _get_all_name_values(self) -> List[ConfigVariable]: + if not self._config_txt_lines: + self.reload_config() + + lst_return = [] + + for i in range(len(self._config_txt_lines)): + match = self.re_name_value.match(self._config_txt_lines[i]) + if match: + lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i)) + + return lst_return + + def reload_config(self): + with open(self._config_txt_path, "r") as f: + self._config_txt_lines = f.readlines() + + def save_config(self): + if not self._config_txt_lines: + return + + tmp_path = f"{self._config_txt_path}.tmp" + with open(tmp_path, "w") as f: + f.writelines(self._config_txt_lines) + shutil.move(tmp_path, self._config_txt_path) + + self._config_txt_lines.clear() + + def add_name_value(self, name: str, value: str): + # Check weather name and value already exists + for config_var in self._get_all_name_values(): + if config_var.name == name and config_var.value == value: + return + + self._config_txt_lines.append(f"{name}={value}\n") + + def clear_dtoverlays(self, dtoverlays: str or list) -> int: + return self._clear_name_values("dtoverlay", dtoverlays) + + def clear_dtparams(self, dtparams: str or list) -> int: + return self._clear_name_values("dtparam", dtparams) + + def get_values(self, var_name: str) -> list: + var_values = [] + + for config_var in self._get_all_name_values(): + if config_var.name == var_name: + var_values.append(config_var.value) + + return var_values + + @property + def config_txt_path(self) -> str: + return self._config_txt_path + + +def configure_avahi_daemon(action: ConfigActions): + return_value = simple_systemd(action, "avahi-daemon.service") + + # Post actions for avahi-daemon + if action in (ConfigActions.ENABLE, ConfigActions.DISABLE): + # Apply the enable/disable action to the avahi socket AFTER the service + # unit, because a connected socket could interrupt stop + simple_systemd(action, "avahi-daemon.socket") + + return return_value + + +def configure_bluetooth(action: ConfigActions): + hci_device = join(LINUX_BT_CLASS_PATH, "hci0") + bt_rfkill_index = get_rfkill_index(hci_device) + + # If the bluetooth device is not present, the device should have been + # brought up by revpi-bluetooth's udev rules or vendor magic (devices + # based on CM4 and newer). Nothing we can do here, so treat the interface + # as disabled. + + if action is ConfigActions.ENABLE: + if bt_rfkill_index is not None: + with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f: + f.write("0") + + elif action is ConfigActions.DISABLE: + if bt_rfkill_index is not None: + with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f: + f.write("1") + + elif action is ConfigActions.STATUS: + if bt_rfkill_index is None: + return False + + with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "r") as f: + buffer = f.read().strip() + return buffer == "0" + + elif action is ConfigActions.AVAILABLE: + return bt_rfkill_index is not None + + else: + raise ValueError(f"action {action} not supported") + + return None + + +def configure_con_can(action: ConfigActions): + revpi = RevPiConfig() + if action is ConfigActions.AVAILABLE: + return revpi.with_con_bridge + + dt_overlay = "revpi-con-can" + config_txt = ConfigTxt() + + if action is ConfigActions.ENABLE and revpi.with_con_bridge: + config_txt.clear_dtoverlays([dt_overlay]) + config_txt.add_name_value("dtoverlay", dt_overlay) + config_txt.save_config() + subprocess.call(["/usr/bin/dtoverlay", dt_overlay]) + + elif action is ConfigActions.DISABLE and revpi.with_con_bridge: + config_txt.clear_dtoverlays([dt_overlay]) + config_txt.save_config() + subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay]) + + elif action is ConfigActions.STATUS: + return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtparam") + + else: + raise ValueError(f"action {action} not supported") + + return None + + +def configure_dphys_swapfile(action: ConfigActions): + return_value = simple_systemd(action, "dphys-swapfile.service") + + # Post actions for dphys-swapfile + if action is ConfigActions.DISABLE: + # Remove swapfile afer disabling the service unit + subprocess.call( + ["/sbin/dphys-swapfile", "uninstall"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + return return_value + + +def configure_external_antenna(action: ConfigActions): + revpi = RevPiConfig() + if action is ConfigActions.AVAILABLE: + return revpi.with_wlan + + config_txt = ConfigTxt() + + if action is ConfigActions.ENABLE and revpi.with_wlan: + config_txt.clear_dtparams(["ant1", "ant2"]) + config_txt.add_name_value("dtparam", "ant2") + config_txt.save_config() + + elif action is ConfigActions.DISABLE and revpi.with_wlan: + config_txt.clear_dtparams(["ant1", "ant2"]) + config_txt.save_config() + + elif action is ConfigActions.STATUS: + return revpi.with_wlan and "ant2" in config_txt.get_values("dtparam") + + else: + raise ValueError(f"action {action} not supported") + + return None + + +def configure_gui(action: ConfigActions): + gui_available = access("/usr/bin/startx", X_OK) + + if action is ConfigActions.AVAILABLE: + return gui_available + + bus = SystemBus() + systemd_manager = bus.get(".systemd1") + + if action is ConfigActions.ENABLE: + systemd_manager.SetDefaultTarget("graphical.target", True) + + elif action is ConfigActions.DISABLE: + systemd_manager.SetDefaultTarget("multi-user.target", True) + + elif action is ConfigActions.STATUS: + return systemd_manager.GetDefaultTarget() == "graphical.target" + + else: + raise ValueError(f"action {action} not supported") + + +def configure_wlan(action: ConfigActions): + revpi = RevPiConfig() + + if action is ConfigActions.ENABLE: + if revpi.with_wlan: + wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan) + with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f: + f.write("0") + + elif action is ConfigActions.DISABLE: + if revpi.with_wlan: + wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan) + with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f: + f.write("1") + + elif action is ConfigActions.AVAILABLE: + return revpi.with_wlan + + elif action is ConfigActions.STATUS: + if not revpi.with_wlan: + return False + + wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan) + with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "r") as f: + buffer = f.read().strip() + return buffer == "0" + + else: + raise ValueError(f"action {action} not supported") + + return None + + +def get_rfkill_index(device_class_path: str) -> Optional[int]: + re_rfkill_index = re.compile(r"^/.+/rfkill(?P\d+)$") + for rfkill_path in glob(join(device_class_path, "rfkill*")): + match_index = re_rfkill_index.match(rfkill_path) + if match_index: + return int(match_index.group("index")) + + return None + + +def simple_systemd(action: ConfigActions, unit: str): + bus = SystemBus() + systemd_manager = bus.get(".systemd1") + + if action is ConfigActions.ENABLE: + systemd_manager.UnmaskUnitFiles([unit], False) + systemd_manager.EnableUnitFiles([unit], False, False) + systemd_manager.StartUnit(unit, "replace") + + elif action is ConfigActions.DISABLE: + systemd_manager.StopUnit(unit, "replace") + systemd_manager.DisableUnitFiles([unit], False) + + elif action is ConfigActions.STATUS: + try: + unit_path = systemd_manager.LoadUnit(unit) + properties = bus.get(".systemd1", unit_path) + except Exception: + log.warning(f"could not get systemd unit {unit}") + return False + + return properties.UnitFileState == "enabled" and properties.ActiveState == "active" + + elif action is ConfigActions.AVAILABLE: + try: + unit_path = systemd_manager.LoadUnit(unit) + properties = bus.get(".systemd1", unit_path) + except Exception: + log.warning(f"could not get systemd unit {unit}") + return False + + return properties.LoadState != "not-found" + + else: + raise ValueError(f"action {action} not supported") + + +if __name__ == "__main__": + rc = RevPiConfig() + print("Model:", rc.model) + print("Serial: ", rc.serial) + print("CM Type: ", rc.cm_type.name) + print("With WLAN: ", rc.with_wlan) + if rc.with_wlan: + print(" class path: ", rc.class_path_wlan) + print(" rfkill index: ", get_rfkill_index(rc.class_path_wlan)) + print("With con-bridge:", rc.with_con_bridge) + + config_txt = ConfigTxt() + print("Config file: ", config_txt.config_txt_path) diff --git a/tests/dbus_middleware1/process_image/test_interface_picontrol.py b/tests/dbus_middleware1/process_image/test_interface_picontrol.py index cd11a6e..ec26c9d 100644 --- a/tests/dbus_middleware1/process_image/test_interface_picontrol.py +++ b/tests/dbus_middleware1/process_image/test_interface_picontrol.py @@ -18,7 +18,7 @@ class TestObjectPicontrol(TestBusProvider): def test_reset_driver(self): simple_call( "ResetDriver", - interface=extend_interface("picontrol"), + interface=extend_interface("PiControl"), bus_type=BusType.SESSION, ) ioctl_call = IOCTL_QUEUE.get(timeout=2.0) @@ -37,7 +37,7 @@ class TestObjectPicontrol(TestBusProvider): result = await_signal( "NotifyDriverReset", timeout, - extend_interface("picontrol"), + extend_interface("PiControl"), bus_type=BusType.SESSION, ) self.assertTrue(result)