feat(dbus): Add D-Bus interface for system configuration in middleware
Introduced `InterfaceRevpiConfig` to manage feature actions like enable/disable, status, and availability using D-Bus. Includes support for predefined features with systemd integration and exception handling for unsupported features.
This commit is contained in:
@@ -0,0 +1,137 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
|
||||
# SPDX-License-Identifier: GPL-2.0-or-later
|
||||
"""D-Bus interfaces for hardware configuration."""
|
||||
from collections import namedtuple
|
||||
from enum import Enum
|
||||
from logging import getLogger
|
||||
|
||||
from pydbus import SystemBus
|
||||
|
||||
from ..dbus_helper import DbusInterface
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
||||
FeatureFunction = namedtuple("FeatureFunction", ["function", "args"])
|
||||
|
||||
|
||||
class ConfigActions(Enum):
|
||||
ENABLE = "enable"
|
||||
DISABLE = "disable"
|
||||
STATUS = "status"
|
||||
AVAILABLE = "available"
|
||||
|
||||
|
||||
class InterfaceRevpiConfig(DbusInterface):
|
||||
"""
|
||||
<node>
|
||||
<interface name="com.revolutionpi.middleware1.RevpiConfig">
|
||||
<method name="Disable">
|
||||
<arg name="feature" type="s" direction="in"/>
|
||||
</method>
|
||||
<method name="Enable">
|
||||
<arg name="feature" type="s" direction="in"/>
|
||||
</method>
|
||||
<method name="GetStatus">
|
||||
<arg name="feature" type="s" direction="in"/>
|
||||
<arg name="status" type="b" direction="out"/>
|
||||
</method>
|
||||
<method name="GetAvailability">
|
||||
<arg name="feature" type="s" direction="in"/>
|
||||
<arg name="available" type="b" direction="out"/>
|
||||
</method>
|
||||
<property name="available_features" type="as" access="read"/>
|
||||
</interface>
|
||||
</node>
|
||||
"""
|
||||
|
||||
def Disable(self, feature: str) -> None:
|
||||
"""Disable the feature."""
|
||||
feature_function = get_feature(feature)
|
||||
feature_function.function(ConfigActions.DISABLE, *feature_function.args)
|
||||
|
||||
def Enable(self, feature: str) -> None:
|
||||
"""Enable the feature."""
|
||||
feature_function = get_feature(feature)
|
||||
feature_function.function(ConfigActions.ENABLE, *feature_function.args)
|
||||
|
||||
def GetStatus(self, feature: str) -> bool:
|
||||
"""Get feature status."""
|
||||
feature_function = get_feature(feature)
|
||||
return feature_function.function(ConfigActions.STATUS, *feature_function.args)
|
||||
|
||||
def GetAvailability(self, feature: str) -> bool:
|
||||
"""Get feature availability on the RevPi."""
|
||||
feature_function = get_feature(feature)
|
||||
return feature_function.function(ConfigActions.AVAILABLE, *feature_function.args)
|
||||
|
||||
@property
|
||||
def available_features(self) -> list[str]:
|
||||
return list(AVAILABLE_FEATURES.keys())
|
||||
|
||||
|
||||
def get_feature(feature: str) -> FeatureFunction:
|
||||
if feature not in AVAILABLE_FEATURES:
|
||||
raise ValueError(f"feature {feature} does not exist")
|
||||
feature_function = AVAILABLE_FEATURES[feature]
|
||||
if not feature_function:
|
||||
raise NotImplementedError(f"feature {feature} is not implemented")
|
||||
return feature_function
|
||||
|
||||
|
||||
def simple_systemd(action: ConfigActions, unit: str):
|
||||
bus = SystemBus()
|
||||
systemd_manager = bus.get(".systemd1")
|
||||
|
||||
if action is ConfigActions.ENABLE:
|
||||
systemd_manager.UnmaskUnitFiles([unit], False)
|
||||
systemd_manager.EnableUnitFiles([unit], False, False)
|
||||
systemd_manager.StartUnit(unit, "replace")
|
||||
|
||||
elif action is ConfigActions.DISABLE:
|
||||
systemd_manager.StopUnit(unit, "replace")
|
||||
systemd_manager.DisableUnitFiles([unit], False)
|
||||
|
||||
elif action is ConfigActions.STATUS:
|
||||
try:
|
||||
unit_path = systemd_manager.LoadUnit(unit)
|
||||
properties = bus.get(".systemd1", unit_path)
|
||||
except Exception:
|
||||
log.warning(f"could not get systemd unit {unit}")
|
||||
return False
|
||||
|
||||
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
|
||||
|
||||
elif action is ConfigActions.AVAILABLE:
|
||||
try:
|
||||
unit_path = systemd_manager.LoadUnit(unit)
|
||||
properties = bus.get(".systemd1", unit_path)
|
||||
except Exception:
|
||||
log.warning(f"could not get systemd unit {unit}")
|
||||
return False
|
||||
|
||||
return properties.LoadState != "not-found"
|
||||
|
||||
else:
|
||||
raise ValueError(f"action {action} not supported")
|
||||
|
||||
|
||||
AVAILABLE_FEATURES = {
|
||||
"gui": False,
|
||||
"revpi-con-can": False,
|
||||
"var-log.mount": False,
|
||||
"dphys-swapfile": False,
|
||||
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
|
||||
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
|
||||
"systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]),
|
||||
"ssh": FeatureFunction(simple_systemd, ["ssh.service"]),
|
||||
"nodered": FeatureFunction(simple_systemd, ["nodered.service"]),
|
||||
"noderedrevpinodes-server": FeatureFunction(
|
||||
simple_systemd, ["noderedrevpinodes-server.service"]
|
||||
),
|
||||
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
|
||||
"bluetooth": False,
|
||||
"ieee80211": False,
|
||||
"avahi": False,
|
||||
"external-antenna": False,
|
||||
}
|
||||
Reference in New Issue
Block a user