diff --git a/src/revpi_middleware/dbus_middleware1/system_config/__init__.py b/src/revpi_middleware/dbus_middleware1/system_config/__init__.py new file mode 100644 index 0000000..8f6ef5d --- /dev/null +++ b/src/revpi_middleware/dbus_middleware1/system_config/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +"""D-Bus interfaces for system configuration.""" +from .interface_config import InterfaceRevpiConfig diff --git a/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py b/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py new file mode 100644 index 0000000..03aaca2 --- /dev/null +++ b/src/revpi_middleware/dbus_middleware1/system_config/interface_config.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +"""D-Bus interfaces for hardware configuration.""" +from collections import namedtuple +from enum import Enum +from logging import getLogger + +from pydbus import SystemBus + +from ..dbus_helper import DbusInterface + +log = getLogger(__name__) + +FeatureFunction = namedtuple("FeatureFunction", ["function", "args"]) + + +class ConfigActions(Enum): + ENABLE = "enable" + DISABLE = "disable" + STATUS = "status" + AVAILABLE = "available" + + +class InterfaceRevpiConfig(DbusInterface): + """ + + + + + + + + + + + + + + + + + + + + """ + + def Disable(self, feature: str) -> None: + """Disable the feature.""" + feature_function = get_feature(feature) + feature_function.function(ConfigActions.DISABLE, *feature_function.args) + + def Enable(self, feature: str) -> None: + """Enable the feature.""" + feature_function = get_feature(feature) + feature_function.function(ConfigActions.ENABLE, *feature_function.args) + + def GetStatus(self, feature: str) -> bool: + """Get feature status.""" + feature_function = get_feature(feature) + return feature_function.function(ConfigActions.STATUS, *feature_function.args) + + def GetAvailability(self, feature: str) -> bool: + """Get feature availability on the RevPi.""" + feature_function = get_feature(feature) + return feature_function.function(ConfigActions.AVAILABLE, *feature_function.args) + + @property + def available_features(self) -> list[str]: + return list(AVAILABLE_FEATURES.keys()) + + +def get_feature(feature: str) -> FeatureFunction: + if feature not in AVAILABLE_FEATURES: + raise ValueError(f"feature {feature} does not exist") + feature_function = AVAILABLE_FEATURES[feature] + if not feature_function: + raise NotImplementedError(f"feature {feature} is not implemented") + return feature_function + + +def simple_systemd(action: ConfigActions, unit: str): + bus = SystemBus() + systemd_manager = bus.get(".systemd1") + + if action is ConfigActions.ENABLE: + systemd_manager.UnmaskUnitFiles([unit], False) + systemd_manager.EnableUnitFiles([unit], False, False) + systemd_manager.StartUnit(unit, "replace") + + elif action is ConfigActions.DISABLE: + systemd_manager.StopUnit(unit, "replace") + systemd_manager.DisableUnitFiles([unit], False) + + elif action is ConfigActions.STATUS: + try: + unit_path = systemd_manager.LoadUnit(unit) + properties = bus.get(".systemd1", unit_path) + except Exception: + log.warning(f"could not get systemd unit {unit}") + return False + + return properties.UnitFileState == "enabled" and properties.ActiveState == "active" + + elif action is ConfigActions.AVAILABLE: + try: + unit_path = systemd_manager.LoadUnit(unit) + properties = bus.get(".systemd1", unit_path) + except Exception: + log.warning(f"could not get systemd unit {unit}") + return False + + return properties.LoadState != "not-found" + + else: + raise ValueError(f"action {action} not supported") + + +AVAILABLE_FEATURES = { + "gui": False, + "revpi-con-can": False, + "var-log.mount": False, + "dphys-swapfile": False, + "pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]), + "pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]), + "systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]), + "ssh": FeatureFunction(simple_systemd, ["ssh.service"]), + "nodered": FeatureFunction(simple_systemd, ["nodered.service"]), + "noderedrevpinodes-server": FeatureFunction( + simple_systemd, ["noderedrevpinodes-server.service"] + ), + "revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]), + "bluetooth": False, + "ieee80211": False, + "avahi": False, + "external-antenna": False, +}