4 Commits

Author SHA1 Message Date
acdb814007 doc(revpiconfig): Docstrings for get_rfkill_index and simple_systemd
The new docstrings provide detailed explanations of the purpose,
parameters, and return values for both functions, improving code
readability and maintainability. This ensures better understanding for
future contributors and reduces ambiguity.
2025-04-21 14:44:50 +02:00
8451b5401b doc(revpiconfig): Add docstrings to enums in revpi_config.py
This update introduces detailed docstrings for the `ComputeModuleTypes`
and `ConfigActions` enumeration classes. The docstrings provide
descriptions for each class and their attributes, improving code
readability and maintainability.
2025-04-21 14:44:04 +02:00
0fe8be6515 doc(revpiconfig): Add docstrings to RevPiConfig class and methods
Enhance documentation for the `RevPiConfig` class, its methods, and
properties to improve code readability and ease of use. The added
docstrings provide clear explanations of the class's purpose,
attributes, and functionality for developers and users. This update
supports better maintainability and understanding of the codebase.
2025-04-21 14:42:14 +02:00
5d376acf49 doc(revpiconfig): Add detailed docstrings to ConfigTxt methods
Enhance the `ConfigTxt` class with comprehensive docstrings for all
methods, providing clear explanations of their functionality,
parameters, and return values. This improves code readability and
facilitates easier maintenance and understanding for future developers.
2025-04-21 14:38:32 +02:00
18 changed files with 107 additions and 728 deletions

4
.gitignore vendored
View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]

View File

@@ -1,40 +0,0 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
default:
tags:
- self-hosted
- host-arm64
- high-perf
include:
- project: "revolutionpi/infrastructure/ci-templates"
file: "base.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "check-commit/lint-commit.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "reuse-lint.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "package-devel.yml"
- local: debian/gitlab-ci.yml
rules:
- exists:
- debian/gitlab-ci.yml
run_tests:
stage: test
image: python:3.11-bookworm
script:
- apt-get update
- apt-get -y install dbus libgirepository1.0-dev
- dbus-uuidgen --ensure=/etc/machine-id
- pip install -r requirements.txt
- PYTHONPATH=src dbus-run-session -- pytest -v --junitxml=report.xml --cov=src --cov-report term --cov-report xml:coverage.xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
junit: ${CI_PROJECT_DIR}/report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml

View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
recursive-include .reuse * recursive-include .reuse *
recursive-include data * recursive-include data *
recursive-include LICENSES * recursive-include LICENSES *

View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
SHELL := bash SHELL := bash
MAKEFLAGS = --no-print-directory --no-builtin-rules MAKEFLAGS = --no-print-directory --no-builtin-rules
.DEFAULT_GOAL = all .DEFAULT_GOAL = all

View File

@@ -1,9 +1,3 @@
<!--
SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
SPDX-License-Identifier: GPL-2.0-or-later
-->
# Middleware for Revolution Pi # Middleware for Revolution Pi
This middleware will support D-Bus as IPC interface. This middleware will support D-Bus as IPC interface.

View File

@@ -1,4 +1,4 @@
<!-- /usr/share/dbus-1/system.d/com.revolutionpi.middleware1.conf --> <!-- /etc/dbus-1/system.d/revpi-middleware.conf -->
<busconfig> <busconfig>
<!-- Allow full access to root as the bus owner --> <!-- Allow full access to root as the bus owner -->
<policy user="root"> <policy user="root">
@@ -12,7 +12,7 @@
<allow send_destination="com.revolutionpi.middleware1" <allow send_destination="com.revolutionpi.middleware1"
send_interface="org.freedesktop.DBus.Introspectable"/> send_interface="org.freedesktop.DBus.Introspectable"/>
<allow send_destination="com.revolutionpi.middleware1" <allow send_destination="com.revolutionpi.middleware1"
send_interface="com.revolutionpi.middleware1.PiControl"/> send_interface="com.revolutionpi.middleware1.picontrol"/>
</policy> </policy>
<!-- Standard-Policy --> <!-- Standard-Policy -->

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from argparse import ArgumentParser, SUPPRESS
from typing import NamedTuple
from pydbus import SessionBus, SystemBus
FeatureMapping = NamedTuple("FeatureMapping", [("dbus_interface", str), ("name", str)])
REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
IFACE_SOFTWARE_SERVICES = "com.revolutionpi.middleware1.SoftwareServices"
IFACE_REVPI_CONFIG = "com.revolutionpi.middleware1.RevpiConfig"
REVPI_FEATURE_MAPPINGS = {
"gui": FeatureMapping(IFACE_REVPI_CONFIG, "gui"),
"revpi-con-can": FeatureMapping(IFACE_REVPI_CONFIG, "revpi-con-can"),
"dphys-swapfile": FeatureMapping(IFACE_REVPI_CONFIG, "swapfile"),
"pimodbus-master": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-master"),
"pimodbus-slave": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-slave"),
"systemd-timesyncd": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ntp"),
"ssh": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ssh"),
"nodered": FeatureMapping(IFACE_SOFTWARE_SERVICES, "nodered"),
"noderedrevpinodes-server": FeatureMapping(IFACE_SOFTWARE_SERVICES, "noderedrevpinodes-server"),
"revpipyload": FeatureMapping(IFACE_SOFTWARE_SERVICES, "revpipyload"),
"bluetooth": FeatureMapping(IFACE_REVPI_CONFIG, "bluetooth"),
"ieee80211": FeatureMapping(IFACE_REVPI_CONFIG, "wlan"),
"avahi": FeatureMapping(IFACE_SOFTWARE_SERVICES, "avahi"),
"external-antenna": FeatureMapping(IFACE_REVPI_CONFIG, "external-antenna"),
}
# Generate command arguments
parser = ArgumentParser(
prog="revpi-config",
description="Configuration tool for Revolution Pi.",
)
parser.add_argument(
"--use-session-bus",
dest="use_session_bus",
action="store_true",
default=False,
help=SUPPRESS,
)
parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "availstat"],
help="Action to be executed: enable, disable, status or available.",
)
parser.add_argument(
"feature",
nargs="*",
help="Name of the feature to configure.",
)
args = parser.parse_args()
# Init dbus
bus = SessionBus() if args.use_session_bus else SystemBus()
revpi_middleware = bus.get(REVPI_DBUS_NAME, REVPI_DBUS_BASE_PATH)
lst_results = []
for feature in args.feature:
# Get the mappings
feature_mapping = REVPI_FEATURE_MAPPINGS.get(feature, None)
if feature_mapping is None:
if args.action in ("enable", "disable"):
# Missing feature with action enable/disable will return 5
lst_results.append(5)
elif args.action == "availstat":
# Missing feature with action availstat will return 2
lst_results.append(2)
else:
# Missing feature with action status/available will return 0
lst_results.append(0)
continue
dbus_interface = revpi_middleware[feature_mapping.dbus_interface]
if args.action == "enable":
dbus_interface.Enable(feature_mapping.name)
lst_results.append(0)
elif args.action == "disable":
dbus_interface.Disable(feature_mapping.name)
lst_results.append(0)
elif args.action in ("status", "availstat"):
status = dbus_interface.GetStatus(feature_mapping.name)
lst_results.append(int(status))
elif args.action == "available":
availability = dbus_interface.GetAvailability(feature_mapping.name)
lst_results.append(int(availability))
if lst_results:
print(" ".join(map(str, lst_results)))

View File

@@ -1,6 +1,2 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
[tool.black] [tool.black]
line-length = 100 line-length = 100

View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Build dependencies # Build dependencies
pip-licenses pip-licenses
Pyinstaller Pyinstaller

View File

@@ -10,7 +10,7 @@ from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl from .process_image import InterfacePiControl
from .system_config import InterfaceRevpiConfig, InterfaceSoftwareServices, InterfaceWlan from .system_config import InterfaceRevpiConfig
log = getLogger(__name__) log = getLogger(__name__)
@@ -41,10 +41,8 @@ class BusProvider(Thread):
# ("Subdir2", Example()), # ("Subdir2", Example()),
# ("Subdir2/Whatever", Example()) # ("Subdir2/Whatever", Example())
lst_interfaces = [ lst_interfaces = [
InterfacePiControl(self._bus, self.picontrol_device, self.config_rsc), InterfacePiControl(self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(self._bus), InterfaceRevpiConfig(),
InterfaceSoftwareServices(self._bus),
InterfaceWlan(self._bus),
] ]
try: try:

View File

@@ -2,10 +2,8 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Helper for dbus.""" """Helper for dbus."""
from logging import getLogger
from typing import Union
from pydbus import SessionBus, SystemBus from logging import getLogger
log = getLogger(__name__) log = getLogger(__name__)
@@ -15,9 +13,6 @@ REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
class DbusInterface: class DbusInterface:
def __init__(self, bus: Union[SessionBus, SystemBus]):
self.bus = bus
def cleanup(self): def cleanup(self):
""" """
Represents a method responsible for performing cleanup operations. This method is executed to properly Represents a method responsible for performing cleanup operations. This method is executed to properly

View File

@@ -26,9 +26,7 @@ class InterfacePiControl(DbusInterface):
NotifyDriverReset = signal() NotifyDriverReset = signal()
def __init__(self, bus, picontrol_device: str, config_rsc: str): def __init__(self, picontrol_device: str, config_rsc: str):
super().__init__(bus)
self.picontrol_device = picontrol_device self.picontrol_device = picontrol_device
self.config_rsc = config_rsc self.config_rsc = config_rsc

View File

@@ -3,5 +3,3 @@
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for system configuration.""" """D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig from .interface_config import InterfaceRevpiConfig
from .interface_services import InterfaceSoftwareServices
from .interface_wlan import InterfaceWlan

View File

@@ -5,15 +5,16 @@
from collections import namedtuple from collections import namedtuple
from logging import getLogger from logging import getLogger
from pydbus.generic import signal
from .revpi_config import ( from .revpi_config import (
ConfigActions, ConfigActions,
configure_avahi_daemon,
configure_bluetooth, configure_bluetooth,
configure_con_can, configure_con_can,
configure_dphys_swapfile, configure_dphys_swapfile,
configure_external_antenna, configure_external_antenna,
configure_gui, configure_gui,
configure_wlan,
simple_systemd,
) )
from ..dbus_helper import DbusInterface from ..dbus_helper import DbusInterface
@@ -41,32 +42,19 @@ class InterfaceRevpiConfig(DbusInterface):
<arg name="available" type="b" direction="out"/> <arg name="available" type="b" direction="out"/>
</method> </method>
<property name="available_features" type="as" access="read"/> <property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface> </interface>
</node> </node>
""" """
AvailabilityChanged = signal()
StatusChanged = signal()
def Disable(self, feature: str) -> None: def Disable(self, feature: str) -> None:
"""Disable the feature.""" """Disable the feature."""
feature_function = get_feature(feature) feature_function = get_feature(feature)
feature_function.function(ConfigActions.DISABLE, *feature_function.args) feature_function.function(ConfigActions.DISABLE, *feature_function.args)
self.StatusChanged(feature, False)
def Enable(self, feature: str) -> None: def Enable(self, feature: str) -> None:
"""Enable the feature.""" """Enable the feature."""
feature_function = get_feature(feature) feature_function = get_feature(feature)
feature_function.function(ConfigActions.ENABLE, *feature_function.args) feature_function.function(ConfigActions.ENABLE, *feature_function.args)
self.StatusChanged(feature, True)
def GetStatus(self, feature: str) -> bool: def GetStatus(self, feature: str) -> bool:
"""Get feature status.""" """Get feature status."""
@@ -95,7 +83,18 @@ def get_feature(feature: str) -> FeatureFunction:
AVAILABLE_FEATURES = { AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []), "gui": FeatureFunction(configure_gui, []),
"revpi-con-can": FeatureFunction(configure_con_can, []), "revpi-con-can": FeatureFunction(configure_con_can, []),
"swapfile": FeatureFunction(configure_dphys_swapfile, []), "dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
"systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]),
"ssh": FeatureFunction(simple_systemd, ["ssh.service"]),
"nodered": FeatureFunction(simple_systemd, ["nodered.service"]),
"noderedrevpinodes-server": FeatureFunction(
simple_systemd, ["noderedrevpinodes-server.service"]
),
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"bluetooth": FeatureFunction(configure_bluetooth, []), "bluetooth": FeatureFunction(configure_bluetooth, []),
"wlan": FeatureFunction(configure_wlan, []),
"avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": FeatureFunction(configure_external_antenna, []), "external-antenna": FeatureFunction(configure_external_antenna, []),
} }

View File

@@ -1,205 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for software services."""
from logging import getLogger
from typing import List
from pydbus.generic import signal
from ..dbus_helper import DbusInterface
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__)
class InterfaceSoftwareServices(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.SoftwareServices">
<method name="Disable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="Enable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="GetStatus">
<arg name="feature" type="s" direction="in"/>
<arg name="status" type="b" direction="out"/>
</method>
<method name="GetAvailability">
<arg name="feature" type="s" direction="in"/>
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface>
</node>
"""
AvailabilityChanged = signal()
StatusChanged = signal()
def __init__(self, bus):
super().__init__(bus)
self.mrk_available = {}
self.mrk_status = {}
self.services = {
"pimodbus-master": ["pimodbus-master.service"],
"pimodbus-slave": ["pimodbus-slave.service"],
"ntp": ["systemd-timesyncd.service"],
"ssh": ["ssh.service"],
"nodered": ["nodered.service"],
"noderedrevpinodes-server": ["noderedrevpinodes-server.service"],
"revpipyload": ["revpipyload.service"],
"avahi": ["avahi-daemon.service", "avahi-daemon.socket"],
}
# Create a systemd manager interface object
systemd = self.bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
# Load all unit paths and subscribe to properties changed signal
self.object_paths = {}
for feature in self.services:
# Get the status and availability of the feature
self.mrk_available[feature] = self.GetAvailability(feature)
self.mrk_status[feature] = self.GetStatus(feature)
# Subscribe to properties changed signal for each unit
for unit_name in self.services[feature]:
unit_path = systemd_manager.LoadUnit(unit_name)
self.object_paths[unit_path] = feature
self.bus.subscribe(
iface="org.freedesktop.DBus.Properties",
signal="PropertiesChanged",
object=unit_path,
signal_fired=self._callback_properties_changed,
)
# Subscribe to the reloading signal to update the availability of the feature
self.bus.subscribe(
iface="org.freedesktop.systemd1.Manager",
signal="Reloading",
object="/org/freedesktop/systemd1",
signal_fired=self._callback_reloading_signal,
)
def _callback_reloading_signal(self, sender, object_path, interface, signal, parameters):
"""
Handles the signal emitted for reloading and checks for changes in availability
and status for a set of services. If changes are identified, corresponding
update methods are triggered to reflect the new states.
Args:
sender: The entity sending the signal.
object_path: Path to the object emitting the signal.
interface: Interface through which the signal is sent.
signal: The signal being received.
parameters: A list of parameters associated with the signal.
Raises:
None
"""
if parameters[0]:
return
for feature in self.services:
availability = self.GetAvailability(feature)
if self.mrk_available[feature] != availability:
self.mrk_available[feature] = availability
self.AvailabilityChanged(feature, availability)
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _callback_properties_changed(self, sender, object_path, interface, signal, parameters):
"""
Handles the 'PropertiesChanged' signal callback by updating internal status tracking for given
features and invoking status change notifications if necessary.
Args:
sender (Any): Information about the signal sender.
object_path (str): The path of the object emitting the signal.
interface (str): The interface where the signal was emitted.
signal (str): The name of the emitted signal.
parameters (tuple): Signal parameters containing interface name, changed properties, and
invalidated properties.
Raises:
TypeError: If the 'parameters' argument does not unpack to three expected values.
"""
interface, changed_properties, invalidated_properties = parameters
if "ActiveState" not in changed_properties:
return
feature = self.object_paths[object_path]
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _get_unit_names(self, feature: str) -> List[str]:
if feature not in self.services:
raise ValueError(f"feature {feature} does not exist")
return self.services[feature]
def Disable(self, feature: str) -> None:
"""Disable the feature."""
action = ServiceActions.DISABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
action = ServiceActions.ENABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
unit_names = self._get_unit_names(feature)
rc_status = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.STATUS, unit_name):
rc_status = False
break
return rc_status
def GetAvailability(self, feature: str) -> bool:
"""Get feature availability on the RevPi."""
unit_names = self._get_unit_names(feature)
rc_available = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.AVAILABLE, unit_name):
rc_available = False
break
return rc_available
@property
def available_features(self) -> list[str]:
return list(self.services.keys())

View File

@@ -1,90 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for wlan configuration."""
from logging import getLogger
from threading import Event, Thread
from pydbus.generic import signal
from .revpi_config import configure_wlan, ConfigActions
from ..dbus_helper import DbusInterface
log = getLogger(__name__)
class InterfaceWlan(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.WlanConfiguration">
<method name="Enable">
<annotation name="org.freedesktop.DBus.Method.NoReply" value="true"/>
</method>
<method name="Disable">
<annotation name="org.freedesktop.DBus.Method.NoReply" value="true"/>
</method>
<property name="available" type="b" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="status" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="true"/>
</property>
</interface>
</node>
"""
PropertiesChanged = signal()
def __init__(self, bus):
super().__init__(bus)
self._status = ""
# NetworkManager-Objekt abrufen
self.bus_nm = self.bus.get(
"org.freedesktop.NetworkManager",
"/org/freedesktop/NetworkManager",
)
# Prepare status value and thread
self.evt_stop_threading = Event()
self._update_status(suppress_signal=True)
self.th_run_status_update = Thread(
target=self._run_status_update,
daemon=True,
).start()
def _run_status_update(self, *args, **kwargs):
while not self.evt_stop_threading.wait(1.0):
self._update_status()
def _update_status(self, suppress_signal=False) -> None:
str_status = "enabled" if configure_wlan(ConfigActions.STATUS) else "disabled"
if self._status != str_status:
self._status = str_status
if not suppress_signal:
self.PropertiesChanged(
"com.revolutionpi.middleware1.WlanConfiguration",
{"status": str_status},
[],
)
def cleanup(self):
self.evt_stop_threading.set()
def Disable(self) -> None:
"""Disable integrated WLAN hardware."""
configure_wlan(ConfigActions.DISABLE)
def Enable(self) -> None:
"""Enable integrated WLAN hardware."""
configure_wlan(ConfigActions.ENABLE)
@property
def available(self) -> bool:
"""Check if WLAN hardware is available."""
return configure_wlan(ConfigActions.AVAILABLE)
@property
def status(self) -> str:
return self._status

View File

@@ -10,13 +10,11 @@ from glob import glob
from logging import getLogger from logging import getLogger
from os import X_OK, access from os import X_OK, access
from os.path import exists, join from os.path import exists, join
from threading import Lock
from typing import List, Optional from typing import List, Optional
from pydbus import SystemBus from pydbus import SystemBus
from ..dbus_helper import grep from ..dbus_helper import grep
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__) log = getLogger(__name__)
@@ -25,8 +23,6 @@ ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth" LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211" LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt") CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
CMDLINE_TXT_LOCK = Lock()
CONFIG_TXT_LOCK = Lock()
class ComputeModuleTypes(IntEnum): class ComputeModuleTypes(IntEnum):
@@ -45,7 +41,6 @@ class ComputeModuleTypes(IntEnum):
CM4S (int): Represents a Compute Module 4S. CM4S (int): Represents a Compute Module 4S.
CM5 (int): Represents a Compute Module 5. CM5 (int): Represents a Compute Module 5.
""" """
UNKNOWN = 0 UNKNOWN = 0
CM1 = 6 CM1 = 6
CM3 = 10 CM3 = 10
@@ -62,7 +57,6 @@ class ConfigActions(Enum):
actions. It can be used to ensure consistency when working with or defining actions. It can be used to ensure consistency when working with or defining
such actions in a system. such actions in a system.
""" """
ENABLE = "enable" ENABLE = "enable"
DISABLE = "disable" DISABLE = "disable"
STATUS = "status" STATUS = "status"
@@ -222,87 +216,6 @@ class RevPiConfig:
return bool(self._wlan_class_path) return bool(self._wlan_class_path)
class CmdLineTxt:
"""
Represents operations on a `cmdline.txt` configuration file.
This class provides functionality to read, modify, and save the
`cmdline.txt` file commonly used for system configurations. It allows
setting key-value pairs, removing keys, and manages file locking to ensure
thread-safe modifications.
"""
# Value is optional, "?:=" non-capturing the "="
re_name_value = re.compile(r"(?P<key>[^\s=]+)(?:=(?P<value>\S+))?")
def __init__(self):
self._cmdline_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._cmdline_txt_path = path
break
if not self._cmdline_txt_path:
raise FileNotFoundError("no config.txt found")
def _get_cmdline_dict(self) -> dict:
with CMDLINE_TXT_LOCK:
with open(self._cmdline_txt_path, "r") as file:
cmdline = file.read()
return {
match.group("key"): match.group("value")
for match in self.re_name_value.finditer(cmdline)
}
def _write_cmdline_dict(self, cmdline_dict: dict) -> None:
with CMDLINE_TXT_LOCK:
tmp_path = f"{self._cmdline_txt_path}.tmp"
with open(tmp_path, "w") as file:
str_cmdline = ""
for key, value in cmdline_dict.items():
if value is None:
str_cmdline += f"{key} "
else:
str_cmdline += f"{key}={value} "
str_cmdline = str_cmdline.strip()
file.write(str_cmdline + "\n")
shutil.move(tmp_path, self._cmdline_txt_path)
def remove_key(self, key: str) -> None:
"""
Removes a specified key from the config.txt file.
Parameters:
key: str
The key to be removed from the config.txt file.
"""
dc_cmdline = self._get_cmdline_dict()
if key in dc_cmdline:
del dc_cmdline[key]
self._write_cmdline_dict(dc_cmdline)
def set_key_value(self, key: str, value: Optional[str] = None) -> None:
"""
Sets a given key-value pair in the config.txt file. If the key does not
exist or the value differs from the current one, the pair is updated.
If the value is None, just the key is set without a value.
Parameters:
key: str
The key to set in the config.txt file.
value: Optional[str], default = None
The value to associate with the key, defaulting to None.
"""
dc_cmdline = self._get_cmdline_dict()
if key not in dc_cmdline or dc_cmdline.get(key, value) != value:
dc_cmdline[key] = value
self._write_cmdline_dict(dc_cmdline)
class ConfigTxt: class ConfigTxt:
""" """
Configuration file handler for managing 'config.txt'. Configuration file handler for managing 'config.txt'.
@@ -312,8 +225,12 @@ class ConfigTxt:
to manipulate specific parameters within the configuration and supports to manipulate specific parameters within the configuration and supports
managing dtoverlay and dtparam entries. The primary aim of this class managing dtoverlay and dtparam entries. The primary aim of this class
is to abstract file operations and make modifications user-friendly. is to abstract file operations and make modifications user-friendly.
"""
Attributes:
_config_txt_path (str): The path to the configuration file `config.txt`.
_config_txt_lines (list[str]): Contains all lines of the configuration
file as a list of strings, where each string represents a line.
"""
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$") re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self): def __init__(self):
@@ -397,9 +314,8 @@ class ConfigTxt:
Returns: Returns:
None None
""" """
with CONFIG_TXT_LOCK: with open(self._config_txt_path, "r") as f:
with open(self._config_txt_path, "r") as f: self._config_txt_lines = f.readlines()
self._config_txt_lines = f.readlines()
def save_config(self): def save_config(self):
""" """
@@ -413,11 +329,10 @@ class ConfigTxt:
if not self._config_txt_lines: if not self._config_txt_lines:
return return
with CONFIG_TXT_LOCK: tmp_path = f"{self._config_txt_path}.tmp"
tmp_path = f"{self._config_txt_path}.tmp" with open(tmp_path, "w") as f:
with open(tmp_path, "w") as f: f.writelines(self._config_txt_lines)
f.writelines(self._config_txt_lines) shutil.move(tmp_path, self._config_txt_path)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear() self._config_txt_lines.clear()
@@ -516,6 +431,18 @@ class ConfigTxt:
return self._config_txt_path return self._config_txt_path
def configure_avahi_daemon(action: ConfigActions):
return_value = simple_systemd(action, "avahi-daemon.service")
# Post actions for avahi-daemon
if action in (ConfigActions.ENABLE, ConfigActions.DISABLE):
# Apply the enable/disable action to the avahi socket AFTER the service
# unit, because a connected socket could interrupt stop
simple_systemd(action, "avahi-daemon.socket")
return return_value
def configure_bluetooth(action: ConfigActions): def configure_bluetooth(action: ConfigActions):
hci_device = join(LINUX_BT_CLASS_PATH, "hci0") hci_device = join(LINUX_BT_CLASS_PATH, "hci0")
bt_rfkill_index = get_rfkill_index(hci_device) bt_rfkill_index = get_rfkill_index(hci_device)
@@ -581,17 +508,7 @@ def configure_con_can(action: ConfigActions):
def configure_dphys_swapfile(action: ConfigActions): def configure_dphys_swapfile(action: ConfigActions):
# Translate config action to systemd action return_value = simple_systemd(action, "dphys-swapfile.service")
if action is ConfigActions.ENABLE:
systemd_action = ServiceActions.ENABLE
elif action is ConfigActions.DISABLE:
systemd_action = ServiceActions.DISABLE
elif action is ConfigActions.STATUS:
systemd_action = ServiceActions.STATUS
else:
systemd_action = ServiceActions.AVAILABLE
return_value = simple_systemd(systemd_action, "dphys-swapfile.service")
# Post actions for dphys-swapfile # Post actions for dphys-swapfile
if action is ConfigActions.DISABLE: if action is ConfigActions.DISABLE:
@@ -637,11 +554,7 @@ def configure_gui(action: ConfigActions):
return gui_available return gui_available
bus = SystemBus() bus = SystemBus()
systemd = bus.get( systemd_manager = bus.get(".systemd1")
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ConfigActions.ENABLE: if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True) systemd_manager.SetDefaultTarget("graphical.target", True)
@@ -716,6 +629,66 @@ def get_rfkill_index(device_class_path: str) -> Optional[int]:
return None return None
def simple_systemd(action: ConfigActions, unit: str):
"""
Performs specified actions on systemd units.
This function allows interaction with systemd units for various operations
such as enabling, disabling, checking the status, and verifying availability.
It communicates with the systemd manager via the SystemBus and handles units
based on the action specified.
Parameters:
action (ConfigActions): Specifies the action to be performed on the
systemd unit. Supported actions include ENABLE,
DISABLE, STATUS, and AVAILABLE.
unit (str): The name of the systemd unit on which the action is to be
performed.
Returns:
bool: For STATUS and AVAILABLE actions, returns True if the corresponding
criteria are met (e.g., enabled and active for STATUS, or not found
for AVAILABLE). Otherwise, returns False.
Raises:
ValueError: If the specified action is not supported.
"""
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.UnmaskUnitFiles([unit], False)
systemd_manager.EnableUnitFiles([unit], False, False)
systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE:
systemd_manager.StopUnit(unit, "replace")
systemd_manager.DisableUnitFiles([unit], False)
elif action is ConfigActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
elif action is ConfigActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
if __name__ == "__main__": if __name__ == "__main__":
rc = RevPiConfig() rc = RevPiConfig()
print("Model:", rc.model) print("Model:", rc.model)

View File

@@ -1,121 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from enum import Enum
from logging import getLogger
from threading import Thread
from typing import Optional
from pydbus import SystemBus
log = getLogger(__name__)
class ServiceActions(Enum):
"""
Enumeration class for defining configuration actions.
This enumeration provides predefined constants for common configuration
actions. It can be used to ensure consistency when working with or defining
such actions in a system.
"""
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
def simple_systemd(action: ServiceActions, unit: str, unmask: bool = False) -> Optional[bool]:
"""
Perform systemd service actions such as enable, disable, check status, or availability.
This function interacts with the systemd D-Bus API to manage and query the
state of services on a system. The supported actions include enabling a systemd
unit, disabling it, starting/stopping a unit, and checking its status or
availability. The function supports asynchronous configuration changes through
threads where applicable.
Parameters:
action (ServiceActions): The action to perform on the systemd service.
Supported actions are ENABLE, DISABLE, STATUS, and AVAILABLE.
unit (str): The name of the systemd unit to operate on (e.g., "example.service").
unmask (bool): When enabling a unit, if True, any masked unit file will
first be unmasked before proceeding with the operation. Defaults to False.
Returns:
Optional[bool]: The return value depends on the action. For STATUS or
AVAILABLE actions, it returns True if the unit satisfies the condition
(e.g., enabled and active, or available and loaded), False otherwise.
For other actions, it returns None.
"""
bus = SystemBus()
systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ServiceActions.ENABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
lst_change_unmask = []
if unmask:
# Dbus call: UnmaskUnitFiles(in as files, in b runtime, out a(sss) changes
lst_change_unmask = systemd_manager.UnmaskUnitFiles([unit], False)
# Dbus call: EnableUnitFiles(in as files, in b runtime, in b force,
# out b carries_install_info, out a(sss) changes
lst_change_enable = systemd_manager.EnableUnitFiles([unit], False, False)
if lst_change_unmask or lst_change_enable:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StartUnit(in s name, in s mode, out o job
systemd_manager.StartUnit(unit, "replace")
elif action is ServiceActions.DISABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
# Dbus call: DisableUnitFiles (in as files, in b runtime, out a(sss) changes)
change = systemd_manager.DisableUnitFiles([unit], False)
if change:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StopUnit(in s name,in s mode, out o job
systemd_manager.StopUnit(unit, "replace")
elif action is ServiceActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState in (
"active",
"activating",
)
elif action is ServiceActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
return None