18 Commits

Author SHA1 Message Date
Sven Sager
b4f817b477 feat(examples): Add revpi-config CLI tool for RevPi configuration
Introduced a new Python-based command-line tool `revpi-config` to manage
Revolution Pi features via D-Bus. The tool supports enabling,
disabling, checking status, and availability for various features.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-06-04 08:10:52 +02:00
Sven Sager
66046b6a9d feat(dbus): Update systemd unit state check to include 'activating'
Previously, the check only considered 'active' as a valid ActiveState.
This update ensures the system also accounts for units in the
'activating' state, improving compatibility with transitional states in
systemd services.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:54:44 +02:00
Sven Sager
9ec1e04d2b feat(dbus): Update service key for systemd-timesyncd to 'ntp'
Renamed the key 'systemd-timesyncd' to 'ntp' in the services mapping.
This improves naming consistency and aligns it with the expected service
identifier. No functional changes were introduced.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:54:14 +02:00
Sven Sager
1cb1062138 feat(dbus): Update feature key from 'dphys-swapfile' to 'swapfile'
Renamed the feature key for swapfile configuration to ensure consistency
and simplify naming.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:28:07 +02:00
Sven Sager
8d01eee66d feat(dbus): Update status change notification in feature toggle methods
Added calls to `StatusChanged` in `Disable` and `Enable` methods to
notify when a feature's state is updated. This ensures proper tracking
and external communication of feature status changes.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:28:07 +02:00
Sven Sager
40059d6068 feat(dbus): Add InterfaceSoftwareServices to bus provider initialization
This change integrates the InterfaceSoftwareServices class into the list
of interfaces initialized by the bus provider. It ensures the new
interface is properly registered and available for use, improving
system functionality and extensibility.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:18:37 +02:00
Sven Sager
50789853e9 feat(dbus): Add D-Bus interface for managing software services
Introduce `InterfaceSoftwareServices` to handle service enable/disable
actions, status, and availability via D-Bus. Consolidate `avahi` service
configuration into the new interface by removing redundant logic from
`revpi_config.py`.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:18:37 +02:00
Sven Sager
d2e2f1c7f3 refactor(dbus): Move simple_systemd to systemd_helper
Moved the `simple_systemd` function to a dedicated `systemd_helper`
module to improve code organization and reusability. Updated imports
accordingly in affected files.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:18:37 +02:00
Sven Sager
32574e0815 refactor: DBus interface includes bus instance
Updated `InterfaceRevpiConfig` to require a bus parameter, ensuring
proper initialization. Introduced a constructor in `DbusInterface` to
store the bus instance.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 12:37:15 +02:00
Sven Sager
a29979ad78 feat(dbus): Add AvailabilityChanged signal to .RevpiConfig
This signal provides a mechanism for notifying changes in availability
status. It will complement the existing `StatusChanged` signal and
enhance the system's event-handling capabilities.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 07:58:40 +02:00
Sven Sager
431f9308de feat(dbus): Add StatusChanged DBus signal to interface configuration
Introduced the `StatusChanged` signal in `.RevpiConfig` interface to
notify changes in feature status. Emitting this signal in `Enable` and
`Disable` methods ensures real-time updates for feature state changes.
This enhances communication and monitoring within the DBus middleware.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-26 15:21:08 +02:00
Sven Sager
dc2d1ab3a0 ci: Test in a bookworm container
In order to be able to provide all functions of the DBus during testing,
the CI uses the Python 3.11 version in a Bookworm container for
testing. This is the version that is pre-installed in Debian Bookworm.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:54:30 +02:00
Sven Sager
983c6cefea ci: Start dbus session bus for testing
A dbus session bus must be started for testing. The command
`dbus-run-session` creates a session for the following command. As a
result, `pytest` runs with its own session bus for the tests.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:54:30 +02:00
Sven Sager
a977884e17 ci: Set machine-id for dbus
Systemd does not exist in the docker file. So we just set an machine-id,
which the dbus-daemon can use to start a session.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:54:30 +02:00
Sven Sager
20fb85a3f0 ci: Install system dependencies for testing
Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:52:43 +02:00
Sven Sager
f4efb1daae fix: Add missing reuse license information
Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:36:41 +02:00
Sven Sager
fb181f8810 ci: Add Revolution Pi pipelines
Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-22 13:14:22 +02:00
Sven Sager
3dee7784e2 ci: Add GitLab python pipelines
Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-22 13:14:22 +02:00
16 changed files with 537 additions and 124 deletions

4
.gitignore vendored
View File

@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

40
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,40 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
default:
tags:
- self-hosted
- host-arm64
- high-perf
include:
- project: "revolutionpi/infrastructure/ci-templates"
file: "base.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "check-commit/lint-commit.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "reuse-lint.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "package-devel.yml"
- local: debian/gitlab-ci.yml
rules:
- exists:
- debian/gitlab-ci.yml
run_tests:
stage: test
image: python:3.11-bookworm
script:
- apt-get update
- apt-get -y install dbus libgirepository1.0-dev
- dbus-uuidgen --ensure=/etc/machine-id
- pip install -r requirements.txt
- PYTHONPATH=src dbus-run-session -- pytest -v --junitxml=report.xml --cov=src --cov-report term --cov-report xml:coverage.xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
junit: ${CI_PROJECT_DIR}/report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml

View File

@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
recursive-include .reuse *
recursive-include data *
recursive-include LICENSES *

View File

@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
SHELL := bash
MAKEFLAGS = --no-print-directory --no-builtin-rules
.DEFAULT_GOAL = all

View File

@@ -1,3 +1,9 @@
<!--
SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
SPDX-License-Identifier: GPL-2.0-or-later
-->
# Middleware for Revolution Pi
This middleware will support D-Bus as IPC interface.

100
examples/revpi-config Executable file
View File

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from argparse import ArgumentParser, SUPPRESS
from typing import NamedTuple
from pydbus import SessionBus, SystemBus
FeatureMapping = NamedTuple("FeatureMapping", [("dbus_interface", str), ("name", str)])
REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
IFACE_SOFTWARE_SERVICES = "com.revolutionpi.middleware1.SoftwareServices"
IFACE_REVPI_CONFIG = "com.revolutionpi.middleware1.RevpiConfig"
REVPI_FEATURE_MAPPINGS = {
"gui": FeatureMapping(IFACE_REVPI_CONFIG, "gui"),
"revpi-con-can": FeatureMapping(IFACE_REVPI_CONFIG, "revpi-con-can"),
"dphys-swapfile": FeatureMapping(IFACE_REVPI_CONFIG, "swapfile"),
"pimodbus-master": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-master"),
"pimodbus-slave": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-slave"),
"systemd-timesyncd": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ntp"),
"ssh": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ssh"),
"nodered": FeatureMapping(IFACE_SOFTWARE_SERVICES, "nodered"),
"noderedrevpinodes-server": FeatureMapping(IFACE_SOFTWARE_SERVICES, "noderedrevpinodes-server"),
"revpipyload": FeatureMapping(IFACE_SOFTWARE_SERVICES, "revpipyload"),
"bluetooth": FeatureMapping(IFACE_REVPI_CONFIG, "bluetooth"),
"ieee80211": FeatureMapping(IFACE_REVPI_CONFIG, "wlan"),
"avahi": FeatureMapping(IFACE_SOFTWARE_SERVICES, "avahi"),
"external-antenna": FeatureMapping(IFACE_REVPI_CONFIG, "external-antenna"),
}
# Generate command arguments
parser = ArgumentParser(
prog="revpi-config",
description="Configuration tool for Revolution Pi.",
)
parser.add_argument(
"--use-session-bus",
dest="use_session_bus",
action="store_true",
default=False,
help=SUPPRESS,
)
parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "availstat"],
help="Action to be executed: enable, disable, status or available.",
)
parser.add_argument(
"feature",
nargs="*",
help="Name of the feature to configure.",
)
args = parser.parse_args()
# Init dbus
bus = SessionBus() if args.use_session_bus else SystemBus()
revpi_middleware = bus.get(REVPI_DBUS_NAME, REVPI_DBUS_BASE_PATH)
lst_results = []
for feature in args.feature:
# Get the mappings
feature_mapping = REVPI_FEATURE_MAPPINGS.get(feature, None)
if feature_mapping is None:
if args.action in ("enable", "disable"):
# Missing feature with action enable/disable will return 5
lst_results.append(5)
elif args.action == "availstat":
# Missing feature with action availstat will return 2
lst_results.append(2)
else:
# Missing feature with action status/available will return 0
lst_results.append(0)
continue
dbus_interface = revpi_middleware[feature_mapping.dbus_interface]
if args.action == "enable":
dbus_interface.Enable(feature_mapping.name)
lst_results.append(0)
elif args.action == "disable":
dbus_interface.Disable(feature_mapping.name)
lst_results.append(0)
elif args.action in ("status", "availstat"):
status = dbus_interface.GetStatus(feature_mapping.name)
lst_results.append(int(status))
elif args.action == "available":
availability = dbus_interface.GetAvailability(feature_mapping.name)
lst_results.append(int(availability))
if lst_results:
print(" ".join(map(str, lst_results)))

View File

@@ -1,2 +1,6 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
[tool.black]
line-length = 100

View File

@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Build dependencies
pip-licenses
Pyinstaller

View File

@@ -10,7 +10,7 @@ from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl
from .system_config import InterfaceRevpiConfig
from .system_config import InterfaceRevpiConfig, InterfaceSoftwareServices
log = getLogger(__name__)
@@ -41,8 +41,9 @@ class BusProvider(Thread):
# ("Subdir2", Example()),
# ("Subdir2/Whatever", Example())
lst_interfaces = [
InterfacePiControl(self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(),
InterfacePiControl(self._bus, self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(self._bus),
InterfaceSoftwareServices(self._bus),
]
try:

View File

@@ -2,8 +2,10 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Helper for dbus."""
from logging import getLogger
from typing import Union
from pydbus import SessionBus, SystemBus
log = getLogger(__name__)
@@ -13,6 +15,9 @@ REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
class DbusInterface:
def __init__(self, bus: Union[SessionBus, SystemBus]):
self.bus = bus
def cleanup(self):
"""
Represents a method responsible for performing cleanup operations. This method is executed to properly

View File

@@ -26,7 +26,9 @@ class InterfacePiControl(DbusInterface):
NotifyDriverReset = signal()
def __init__(self, picontrol_device: str, config_rsc: str):
def __init__(self, bus, picontrol_device: str, config_rsc: str):
super().__init__(bus)
self.picontrol_device = picontrol_device
self.config_rsc = config_rsc

View File

@@ -3,3 +3,4 @@
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig
from .interface_services import InterfaceSoftwareServices

View File

@@ -5,16 +5,16 @@
from collections import namedtuple
from logging import getLogger
from pydbus.generic import signal
from .revpi_config import (
ConfigActions,
configure_avahi_daemon,
configure_bluetooth,
configure_con_can,
configure_dphys_swapfile,
configure_external_antenna,
configure_gui,
configure_wlan,
simple_systemd,
)
from ..dbus_helper import DbusInterface
@@ -42,19 +42,32 @@ class InterfaceRevpiConfig(DbusInterface):
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface>
</node>
"""
AvailabilityChanged = signal()
StatusChanged = signal()
def Disable(self, feature: str) -> None:
"""Disable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.DISABLE, *feature_function.args)
self.StatusChanged(feature, False)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.ENABLE, *feature_function.args)
self.StatusChanged(feature, True)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
@@ -83,18 +96,8 @@ def get_feature(feature: str) -> FeatureFunction:
AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []),
"revpi-con-can": FeatureFunction(configure_con_can, []),
"dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
"systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]),
"ssh": FeatureFunction(simple_systemd, ["ssh.service"]),
"nodered": FeatureFunction(simple_systemd, ["nodered.service"]),
"noderedrevpinodes-server": FeatureFunction(
simple_systemd, ["noderedrevpinodes-server.service"]
),
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"swapfile": FeatureFunction(configure_dphys_swapfile, []),
"bluetooth": FeatureFunction(configure_bluetooth, []),
"wlan": FeatureFunction(configure_wlan, []),
"avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": FeatureFunction(configure_external_antenna, []),
}

View File

@@ -0,0 +1,205 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for software services."""
from logging import getLogger
from typing import List
from pydbus.generic import signal
from ..dbus_helper import DbusInterface
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__)
class InterfaceSoftwareServices(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.SoftwareServices">
<method name="Disable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="Enable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="GetStatus">
<arg name="feature" type="s" direction="in"/>
<arg name="status" type="b" direction="out"/>
</method>
<method name="GetAvailability">
<arg name="feature" type="s" direction="in"/>
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface>
</node>
"""
AvailabilityChanged = signal()
StatusChanged = signal()
def __init__(self, bus):
super().__init__(bus)
self.mrk_available = {}
self.mrk_status = {}
self.services = {
"pimodbus-master": ["pimodbus-master.service"],
"pimodbus-slave": ["pimodbus-slave.service"],
"ntp": ["systemd-timesyncd.service"],
"ssh": ["ssh.service"],
"nodered": ["nodered.service"],
"noderedrevpinodes-server": ["noderedrevpinodes-server.service"],
"revpipyload": ["revpipyload.service"],
"avahi": ["avahi-daemon.service", "avahi-daemon.socket"],
}
# Create a systemd manager interface object
systemd = self.bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
# Load all unit paths and subscribe to properties changed signal
self.object_paths = {}
for feature in self.services:
# Get the status and availability of the feature
self.mrk_available[feature] = self.GetAvailability(feature)
self.mrk_status[feature] = self.GetStatus(feature)
# Subscribe to properties changed signal for each unit
for unit_name in self.services[feature]:
unit_path = systemd_manager.LoadUnit(unit_name)
self.object_paths[unit_path] = feature
self.bus.subscribe(
iface="org.freedesktop.DBus.Properties",
signal="PropertiesChanged",
object=unit_path,
signal_fired=self._callback_properties_changed,
)
# Subscribe to the reloading signal to update the availability of the feature
self.bus.subscribe(
iface="org.freedesktop.systemd1.Manager",
signal="Reloading",
object="/org/freedesktop/systemd1",
signal_fired=self._callback_reloading_signal,
)
def _callback_reloading_signal(self, sender, object_path, interface, signal, parameters):
"""
Handles the signal emitted for reloading and checks for changes in availability
and status for a set of services. If changes are identified, corresponding
update methods are triggered to reflect the new states.
Args:
sender: The entity sending the signal.
object_path: Path to the object emitting the signal.
interface: Interface through which the signal is sent.
signal: The signal being received.
parameters: A list of parameters associated with the signal.
Raises:
None
"""
if parameters[0]:
return
for feature in self.services:
availability = self.GetAvailability(feature)
if self.mrk_available[feature] != availability:
self.mrk_available[feature] = availability
self.AvailabilityChanged(feature, availability)
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _callback_properties_changed(self, sender, object_path, interface, signal, parameters):
"""
Handles the 'PropertiesChanged' signal callback by updating internal status tracking for given
features and invoking status change notifications if necessary.
Args:
sender (Any): Information about the signal sender.
object_path (str): The path of the object emitting the signal.
interface (str): The interface where the signal was emitted.
signal (str): The name of the emitted signal.
parameters (tuple): Signal parameters containing interface name, changed properties, and
invalidated properties.
Raises:
TypeError: If the 'parameters' argument does not unpack to three expected values.
"""
interface, changed_properties, invalidated_properties = parameters
if "ActiveState" not in changed_properties:
return
feature = self.object_paths[object_path]
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _get_unit_names(self, feature: str) -> List[str]:
if feature not in self.services:
raise ValueError(f"feature {feature} does not exist")
return self.services[feature]
def Disable(self, feature: str) -> None:
"""Disable the feature."""
action = ServiceActions.DISABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
action = ServiceActions.ENABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
unit_names = self._get_unit_names(feature)
rc_status = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.STATUS, unit_name):
rc_status = False
break
return rc_status
def GetAvailability(self, feature: str) -> bool:
"""Get feature availability on the RevPi."""
unit_names = self._get_unit_names(feature)
rc_available = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.AVAILABLE, unit_name):
rc_available = False
break
return rc_available
@property
def available_features(self) -> list[str]:
return list(self.services.keys())

View File

@@ -10,12 +10,12 @@ from glob import glob
from logging import getLogger
from os import X_OK, access
from os.path import exists, join
from threading import Thread
from typing import List, Optional
from pydbus import SystemBus
from ..dbus_helper import grep
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__)
@@ -42,6 +42,7 @@ class ComputeModuleTypes(IntEnum):
CM4S (int): Represents a Compute Module 4S.
CM5 (int): Represents a Compute Module 5.
"""
UNKNOWN = 0
CM1 = 6
CM3 = 10
@@ -58,6 +59,7 @@ class ConfigActions(Enum):
actions. It can be used to ensure consistency when working with or defining
such actions in a system.
"""
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
@@ -232,6 +234,7 @@ class ConfigTxt:
_config_txt_lines (list[str]): Contains all lines of the configuration
file as a list of strings, where each string represents a line.
"""
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self):
@@ -432,18 +435,6 @@ class ConfigTxt:
return self._config_txt_path
def configure_avahi_daemon(action: ConfigActions):
return_value = simple_systemd(action, "avahi-daemon.service")
# Post actions for avahi-daemon
if action in (ConfigActions.ENABLE, ConfigActions.DISABLE):
# Apply the enable/disable action to the avahi socket AFTER the service
# unit, because a connected socket could interrupt stop
simple_systemd(action, "avahi-daemon.socket")
return return_value
def configure_bluetooth(action: ConfigActions):
hci_device = join(LINUX_BT_CLASS_PATH, "hci0")
bt_rfkill_index = get_rfkill_index(hci_device)
@@ -509,7 +500,17 @@ def configure_con_can(action: ConfigActions):
def configure_dphys_swapfile(action: ConfigActions):
return_value = simple_systemd(action, "dphys-swapfile.service")
# Translate config action to systemd action
if action is ConfigActions.ENABLE:
systemd_action = ServiceActions.ENABLE
elif action is ConfigActions.DISABLE:
systemd_action = ServiceActions.DISABLE
elif action is ConfigActions.STATUS:
systemd_action = ServiceActions.STATUS
else:
systemd_action = ServiceActions.AVAILABLE
return_value = simple_systemd(systemd_action, "dphys-swapfile.service")
# Post actions for dphys-swapfile
if action is ConfigActions.DISABLE:
@@ -634,98 +635,6 @@ def get_rfkill_index(device_class_path: str) -> Optional[int]:
return None
def simple_systemd(action: ConfigActions, unit: str):
"""
Performs specified actions on systemd units.
This function allows interaction with systemd units for various operations
such as enabling, disabling, checking the status, and verifying availability.
It communicates with the systemd manager via the SystemBus and handles units
based on the action specified.
Parameters:
action (ConfigActions): Specifies the action to be performed on the
systemd unit. Supported actions include ENABLE,
DISABLE, STATUS, and AVAILABLE.
unit (str): The name of the systemd unit on which the action is to be
performed.
Returns:
bool: For STATUS and AVAILABLE actions, returns True if the corresponding
criteria are met (e.g., enabled and active for STATUS, or not found
for AVAILABLE). Otherwise, returns False.
Raises:
ValueError: If the specified action is not supported.
"""
bus = SystemBus()
systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ConfigActions.ENABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
# Dbus call: UnmaskUnitFiles(in as files, in b runtime, out a(sss) changes
lst_change_unmask = systemd_manager.UnmaskUnitFiles([unit], False)
# Dbus call: EnableUnitFiles(in as files, in b runtime, in b force,
# out b carries_install_info, out a(sss) changes
lst_change_enable = systemd_manager.EnableUnitFiles([unit], False, False)
if lst_change_unmask or lst_change_enable:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StartUnit(in s name, in s mode, out o job
systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
# Dbus call: DisableUnitFiles (in as files, in b runtime, out a(sss) changes)
change = systemd_manager.DisableUnitFiles([unit], False)
if change:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StopUnit(in s name,in s mode, out o job
systemd_manager.StopUnit(unit, "replace")
elif action is ConfigActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
elif action is ConfigActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
return None
if __name__ == "__main__":
rc = RevPiConfig()
print("Model:", rc.model)

View File

@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from enum import Enum
from logging import getLogger
from threading import Thread
from typing import Optional
from pydbus import SystemBus
log = getLogger(__name__)
class ServiceActions(Enum):
"""
Enumeration class for defining configuration actions.
This enumeration provides predefined constants for common configuration
actions. It can be used to ensure consistency when working with or defining
such actions in a system.
"""
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
def simple_systemd(action: ServiceActions, unit: str, unmask: bool = False) -> Optional[bool]:
"""
Perform systemd service actions such as enable, disable, check status, or availability.
This function interacts with the systemd D-Bus API to manage and query the
state of services on a system. The supported actions include enabling a systemd
unit, disabling it, starting/stopping a unit, and checking its status or
availability. The function supports asynchronous configuration changes through
threads where applicable.
Parameters:
action (ServiceActions): The action to perform on the systemd service.
Supported actions are ENABLE, DISABLE, STATUS, and AVAILABLE.
unit (str): The name of the systemd unit to operate on (e.g., "example.service").
unmask (bool): When enabling a unit, if True, any masked unit file will
first be unmasked before proceeding with the operation. Defaults to False.
Returns:
Optional[bool]: The return value depends on the action. For STATUS or
AVAILABLE actions, it returns True if the unit satisfies the condition
(e.g., enabled and active, or available and loaded), False otherwise.
For other actions, it returns None.
"""
bus = SystemBus()
systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ServiceActions.ENABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
lst_change_unmask = []
if unmask:
# Dbus call: UnmaskUnitFiles(in as files, in b runtime, out a(sss) changes
lst_change_unmask = systemd_manager.UnmaskUnitFiles([unit], False)
# Dbus call: EnableUnitFiles(in as files, in b runtime, in b force,
# out b carries_install_info, out a(sss) changes
lst_change_enable = systemd_manager.EnableUnitFiles([unit], False, False)
if lst_change_unmask or lst_change_enable:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StartUnit(in s name, in s mode, out o job
systemd_manager.StartUnit(unit, "replace")
elif action is ServiceActions.DISABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
# Dbus call: DisableUnitFiles (in as files, in b runtime, out a(sss) changes)
change = systemd_manager.DisableUnitFiles([unit], False)
if change:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StopUnit(in s name,in s mode, out o job
systemd_manager.StopUnit(unit, "replace")
elif action is ServiceActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState in (
"active",
"activating",
)
elif action is ServiceActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
return None