5 Commits

Author SHA1 Message Date
8d8d3bbae4 feat(dbus): Add dphys-swapfile configuration functionality
Implemented `configure_dphys_swapfile` to manage the dphys-swapfile
service, including automatic swapfile removal when the service is
disabled. Updated feature registry to support dphys-swapfile
configuration.
2025-04-20 16:32:57 +02:00
1ae661a5e9 feat(dbus): Add avahi-daemon configuration to system services
Introduce a `configure_avahi_daemon` function to manage the avahi-daemon
service and socket with proper post actions. Update the interface
configuration to enable avahi management using the new function.
2025-04-20 16:16:51 +02:00
ffdb27049e feat(dbus): Remove 'var-log.mount' feature
The 'var-log.mount' feature was dropped and has been removed from
the AVAILABLE_FEATURES dictionary.
2025-04-20 16:02:39 +02:00
38f495e864 refactor(dbus): Move system configuration methods to revpi_config.py
Centralized `ConfigActions`, `configure_gui`, and `simple_systemd` into
`revpi_config.py` to eliminate duplication and improve maintainability.
Updated `interface_config.py` to import these methods, ensuring
consistent functionality across modules.
2025-04-20 15:43:09 +02:00
85dc6a7e56 feat(dbus): Add RevPiConfig class for device information handling
This commit introduces a new `RevPiConfig` class to manage RevPi device
configuration details, such as model, serial, compute module type, WiFi,
and ConBridge detection. It parses CPU info from `/proc/cpuinfo` and
leverages helper methods for hardware-specific checks, enhancing the
middleware's ability to identify and manage hardware features.
2025-04-20 15:39:02 +02:00
20 changed files with 65 additions and 1149 deletions

4
.gitignore vendored
View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

View File

@@ -1,40 +0,0 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
default:
tags:
- self-hosted
- host-arm64
- high-perf
include:
- project: "revolutionpi/infrastructure/ci-templates"
file: "base.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "check-commit/lint-commit.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "reuse-lint.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "package-devel.yml"
- local: debian/gitlab-ci.yml
rules:
- exists:
- debian/gitlab-ci.yml
run_tests:
stage: test
image: python:3.11-bookworm
script:
- apt-get update
- apt-get -y install dbus libgirepository1.0-dev
- dbus-uuidgen --ensure=/etc/machine-id
- pip install -r requirements.txt
- PYTHONPATH=src dbus-run-session -- pytest -v --junitxml=report.xml --cov=src --cov-report term --cov-report xml:coverage.xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
junit: ${CI_PROJECT_DIR}/report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml

View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
recursive-include .reuse *
recursive-include data *
recursive-include LICENSES *

View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
SHELL := bash
MAKEFLAGS = --no-print-directory --no-builtin-rules
.DEFAULT_GOAL = all

View File

@@ -1,9 +1,3 @@
<!--
SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
SPDX-License-Identifier: GPL-2.0-or-later
-->
# Middleware for Revolution Pi
This middleware will support D-Bus as IPC interface.

View File

@@ -1,4 +1,4 @@
<!-- /usr/share/dbus-1/system.d/com.revolutionpi.middleware1.conf -->
<!-- /etc/dbus-1/system.d/revpi-middleware.conf -->
<busconfig>
<!-- Allow full access to root as the bus owner -->
<policy user="root">
@@ -12,7 +12,7 @@
<allow send_destination="com.revolutionpi.middleware1"
send_interface="org.freedesktop.DBus.Introspectable"/>
<allow send_destination="com.revolutionpi.middleware1"
send_interface="com.revolutionpi.middleware1.PiControl"/>
send_interface="com.revolutionpi.middleware1.picontrol"/>
</policy>
<!-- Standard-Policy -->

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from argparse import ArgumentParser, SUPPRESS
from typing import NamedTuple
from pydbus import SessionBus, SystemBus
FeatureMapping = NamedTuple("FeatureMapping", [("dbus_interface", str), ("name", str)])
REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
IFACE_SOFTWARE_SERVICES = "com.revolutionpi.middleware1.SoftwareServices"
IFACE_REVPI_CONFIG = "com.revolutionpi.middleware1.RevpiConfig"
REVPI_FEATURE_MAPPINGS = {
"gui": FeatureMapping(IFACE_REVPI_CONFIG, "gui"),
"revpi-con-can": FeatureMapping(IFACE_REVPI_CONFIG, "revpi-con-can"),
"dphys-swapfile": FeatureMapping(IFACE_REVPI_CONFIG, "swapfile"),
"pimodbus-master": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-master"),
"pimodbus-slave": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-slave"),
"systemd-timesyncd": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ntp"),
"ssh": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ssh"),
"nodered": FeatureMapping(IFACE_SOFTWARE_SERVICES, "nodered"),
"noderedrevpinodes-server": FeatureMapping(IFACE_SOFTWARE_SERVICES, "noderedrevpinodes-server"),
"revpipyload": FeatureMapping(IFACE_SOFTWARE_SERVICES, "revpipyload"),
"bluetooth": FeatureMapping(IFACE_REVPI_CONFIG, "bluetooth"),
"ieee80211": FeatureMapping(IFACE_REVPI_CONFIG, "wlan"),
"avahi": FeatureMapping(IFACE_SOFTWARE_SERVICES, "avahi"),
"external-antenna": FeatureMapping(IFACE_REVPI_CONFIG, "external-antenna"),
}
# Generate command arguments
parser = ArgumentParser(
prog="revpi-config",
description="Configuration tool for Revolution Pi.",
)
parser.add_argument(
"--use-session-bus",
dest="use_session_bus",
action="store_true",
default=False,
help=SUPPRESS,
)
parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "availstat"],
help="Action to be executed: enable, disable, status or available.",
)
parser.add_argument(
"feature",
nargs="*",
help="Name of the feature to configure.",
)
args = parser.parse_args()
# Init dbus
bus = SessionBus() if args.use_session_bus else SystemBus()
revpi_middleware = bus.get(REVPI_DBUS_NAME, REVPI_DBUS_BASE_PATH)
lst_results = []
for feature in args.feature:
# Get the mappings
feature_mapping = REVPI_FEATURE_MAPPINGS.get(feature, None)
if feature_mapping is None:
if args.action in ("enable", "disable"):
# Missing feature with action enable/disable will return 5
lst_results.append(5)
elif args.action == "availstat":
# Missing feature with action availstat will return 2
lst_results.append(2)
else:
# Missing feature with action status/available will return 0
lst_results.append(0)
continue
dbus_interface = revpi_middleware[feature_mapping.dbus_interface]
if args.action == "enable":
dbus_interface.Enable(feature_mapping.name)
lst_results.append(0)
elif args.action == "disable":
dbus_interface.Disable(feature_mapping.name)
lst_results.append(0)
elif args.action in ("status", "availstat"):
status = dbus_interface.GetStatus(feature_mapping.name)
lst_results.append(int(status))
elif args.action == "available":
availability = dbus_interface.GetAvailability(feature_mapping.name)
lst_results.append(int(availability))
if lst_results:
print(" ".join(map(str, lst_results)))

View File

@@ -1,6 +1,2 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
[tool.black]
line-length = 100

View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Build dependencies
pip-licenses
Pyinstaller

View File

@@ -29,11 +29,6 @@ def setup_command_line_arguments():
help="RevPi PiControl object",
)
cli_picontrol.add_subparsers(obj_picontrol)
obj_config = rpictl_obj.add_parser(
"config",
help="RevPi configuration object (revpi-config)",
)
cli_config.add_subparsers(obj_config)
def main() -> int:
@@ -44,9 +39,6 @@ def main() -> int:
if obj == "picontrol":
rc = cli_picontrol.main()
elif obj == "config":
rc = cli_config.main()
else:
log.error(f"Unknown object: {obj}")
rc = 1

View File

@@ -1,93 +0,0 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser
from logging import getLogger
from .dbus_helper import BusType, get_properties, simple_call
from .. import proginit as pi
from ..dbus_middleware1 import extend_interface
log = getLogger(__name__)
def add_subparsers(parent_parser: ArgumentParser):
parent_parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "list-features"],
help="Action to be executed: enable, disable, status or available. "
"To get all available features, use 'list-features'.",
)
parent_parser.add_argument(
"feature",
nargs="?",
default="",
help="Name of the feature to configer. To list all features use 'list-features' as action.",
)
def main() -> int:
action = pi.pargs.action
dbus_value = False
try:
if action == "list-features":
dbus_value = get_properties(
"available_features",
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
for feature in dbus_value:
print(feature)
return 0
# For the following actions, a feature name is required
if pi.pargs.feature == "":
raise Exception("Feature name is required")
if action == "enable":
simple_call(
"Enable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "disable":
simple_call(
"Disable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "status":
dbus_value = simple_call(
"GetStatus",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "available":
dbus_value = simple_call(
"GetAvailability",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
else:
raise Exception(f"Unknown action: {action}")
except Exception as e:
log.error(f"Error: {e}")
return 1
log.debug(
f"D-Bus call of method {action} for feature {pi.pargs.feature} returned: {dbus_value}"
)
print(int(dbus_value))
return 0

View File

@@ -17,18 +17,6 @@ class BusType(Enum):
SYSTEM = "system"
def get_properties(
property_name: str,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface]
return getattr(iface, property_name)
def simple_call(
method: str,
*args,

View File

@@ -10,7 +10,6 @@ from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl
from .system_config import InterfaceRevpiConfig, InterfaceSoftwareServices
log = getLogger(__name__)
@@ -41,9 +40,7 @@ class BusProvider(Thread):
# ("Subdir2", Example()),
# ("Subdir2/Whatever", Example())
lst_interfaces = [
InterfacePiControl(self._bus, self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(self._bus),
InterfaceSoftwareServices(self._bus),
InterfacePiControl(self.picontrol_device, self.config_rsc),
]
try:

View File

@@ -2,10 +2,8 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Helper for dbus."""
from logging import getLogger
from typing import Union
from pydbus import SessionBus, SystemBus
from logging import getLogger
log = getLogger(__name__)
@@ -15,9 +13,6 @@ REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
class DbusInterface:
def __init__(self, bus: Union[SessionBus, SystemBus]):
self.bus = bus
def cleanup(self):
"""
Represents a method responsible for performing cleanup operations. This method is executed to properly

View File

@@ -26,9 +26,7 @@ class InterfacePiControl(DbusInterface):
NotifyDriverReset = signal()
def __init__(self, bus, picontrol_device: str, config_rsc: str):
super().__init__(bus)
def __init__(self, picontrol_device: str, config_rsc: str):
self.picontrol_device = picontrol_device
self.config_rsc = config_rsc

View File

@@ -3,4 +3,3 @@
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig
from .interface_services import InterfaceSoftwareServices

View File

@@ -5,16 +5,12 @@
from collections import namedtuple
from logging import getLogger
from pydbus.generic import signal
from .revpi_config import (
ConfigActions,
configure_bluetooth,
configure_con_can,
configure_avahi_daemon,
configure_dphys_swapfile,
configure_external_antenna,
configure_gui,
configure_wlan,
simple_systemd,
)
from ..dbus_helper import DbusInterface
@@ -42,32 +38,19 @@ class InterfaceRevpiConfig(DbusInterface):
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface>
</node>
"""
AvailabilityChanged = signal()
StatusChanged = signal()
def Disable(self, feature: str) -> None:
"""Disable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.DISABLE, *feature_function.args)
self.StatusChanged(feature, False)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.ENABLE, *feature_function.args)
self.StatusChanged(feature, True)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
@@ -95,9 +78,19 @@ def get_feature(feature: str) -> FeatureFunction:
AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []),
"revpi-con-can": FeatureFunction(configure_con_can, []),
"swapfile": FeatureFunction(configure_dphys_swapfile, []),
"bluetooth": FeatureFunction(configure_bluetooth, []),
"wlan": FeatureFunction(configure_wlan, []),
"external-antenna": FeatureFunction(configure_external_antenna, []),
"revpi-con-can": False,
"dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
"systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]),
"ssh": FeatureFunction(simple_systemd, ["ssh.service"]),
"nodered": FeatureFunction(simple_systemd, ["nodered.service"]),
"noderedrevpinodes-server": FeatureFunction(
simple_systemd, ["noderedrevpinodes-server.service"]
),
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"bluetooth": False,
"ieee80211": False,
"avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": False,
}

View File

@@ -1,205 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for software services."""
from logging import getLogger
from typing import List
from pydbus.generic import signal
from ..dbus_helper import DbusInterface
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__)
class InterfaceSoftwareServices(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.SoftwareServices">
<method name="Disable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="Enable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="GetStatus">
<arg name="feature" type="s" direction="in"/>
<arg name="status" type="b" direction="out"/>
</method>
<method name="GetAvailability">
<arg name="feature" type="s" direction="in"/>
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface>
</node>
"""
AvailabilityChanged = signal()
StatusChanged = signal()
def __init__(self, bus):
super().__init__(bus)
self.mrk_available = {}
self.mrk_status = {}
self.services = {
"pimodbus-master": ["pimodbus-master.service"],
"pimodbus-slave": ["pimodbus-slave.service"],
"ntp": ["systemd-timesyncd.service"],
"ssh": ["ssh.service"],
"nodered": ["nodered.service"],
"noderedrevpinodes-server": ["noderedrevpinodes-server.service"],
"revpipyload": ["revpipyload.service"],
"avahi": ["avahi-daemon.service", "avahi-daemon.socket"],
}
# Create a systemd manager interface object
systemd = self.bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
# Load all unit paths and subscribe to properties changed signal
self.object_paths = {}
for feature in self.services:
# Get the status and availability of the feature
self.mrk_available[feature] = self.GetAvailability(feature)
self.mrk_status[feature] = self.GetStatus(feature)
# Subscribe to properties changed signal for each unit
for unit_name in self.services[feature]:
unit_path = systemd_manager.LoadUnit(unit_name)
self.object_paths[unit_path] = feature
self.bus.subscribe(
iface="org.freedesktop.DBus.Properties",
signal="PropertiesChanged",
object=unit_path,
signal_fired=self._callback_properties_changed,
)
# Subscribe to the reloading signal to update the availability of the feature
self.bus.subscribe(
iface="org.freedesktop.systemd1.Manager",
signal="Reloading",
object="/org/freedesktop/systemd1",
signal_fired=self._callback_reloading_signal,
)
def _callback_reloading_signal(self, sender, object_path, interface, signal, parameters):
"""
Handles the signal emitted for reloading and checks for changes in availability
and status for a set of services. If changes are identified, corresponding
update methods are triggered to reflect the new states.
Args:
sender: The entity sending the signal.
object_path: Path to the object emitting the signal.
interface: Interface through which the signal is sent.
signal: The signal being received.
parameters: A list of parameters associated with the signal.
Raises:
None
"""
if parameters[0]:
return
for feature in self.services:
availability = self.GetAvailability(feature)
if self.mrk_available[feature] != availability:
self.mrk_available[feature] = availability
self.AvailabilityChanged(feature, availability)
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _callback_properties_changed(self, sender, object_path, interface, signal, parameters):
"""
Handles the 'PropertiesChanged' signal callback by updating internal status tracking for given
features and invoking status change notifications if necessary.
Args:
sender (Any): Information about the signal sender.
object_path (str): The path of the object emitting the signal.
interface (str): The interface where the signal was emitted.
signal (str): The name of the emitted signal.
parameters (tuple): Signal parameters containing interface name, changed properties, and
invalidated properties.
Raises:
TypeError: If the 'parameters' argument does not unpack to three expected values.
"""
interface, changed_properties, invalidated_properties = parameters
if "ActiveState" not in changed_properties:
return
feature = self.object_paths[object_path]
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _get_unit_names(self, feature: str) -> List[str]:
if feature not in self.services:
raise ValueError(f"feature {feature} does not exist")
return self.services[feature]
def Disable(self, feature: str) -> None:
"""Disable the feature."""
action = ServiceActions.DISABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
action = ServiceActions.ENABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
unit_names = self._get_unit_names(feature)
rc_status = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.STATUS, unit_name):
rc_status = False
break
return rc_status
def GetAvailability(self, feature: str) -> bool:
"""Get feature availability on the RevPi."""
unit_names = self._get_unit_names(feature)
rc_available = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.AVAILABLE, unit_name):
rc_available = False
break
return rc_available
@property
def available_features(self) -> list[str]:
return list(self.services.keys())

View File

@@ -1,48 +1,19 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
import re
import shutil
import subprocess
from collections import namedtuple
from enum import Enum, IntEnum
from glob import glob
from logging import getLogger
from os import X_OK, access
from os.path import exists, join
from typing import List, Optional
from pydbus import SystemBus
from ..dbus_helper import grep
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__)
ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
class ComputeModuleTypes(IntEnum):
"""
Enumeration class to represent compute module types.
This class is an enumeration that defines various types of compute
modules and assigns them associated integer values for identifying
different module types.
Attributes:
UNKNOWN (int): Represents an unknown or undefined compute module type.
CM1 (int): Represents a Compute Module 1.
CM3 (int): Represents a Compute Module 3.
CM4 (int): Represents a Compute Module 4.
CM4S (int): Represents a Compute Module 4S.
CM5 (int): Represents a Compute Module 5.
"""
UNKNOWN = 0
CM1 = 6
CM3 = 10
@@ -52,14 +23,6 @@ class ComputeModuleTypes(IntEnum):
class ConfigActions(Enum):
"""
Enumeration class for defining configuration actions.
This enumeration provides predefined constants for common configuration
actions. It can be used to ensure consistency when working with or defining
such actions in a system.
"""
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
@@ -67,26 +30,12 @@ class ConfigActions(Enum):
class RevPiConfig:
"""
Represents the configuration and hardware details of a Revolution Pi system.
This class provides methods and properties to initialize and fetch
information related to the Revolution Pi device, such as model, serial
number, compute module type, WLAN capability, and the presence of a
connection bridge. The class works by parsing system-level files (e.g.,
`/proc/cpuinfo`) and using this data to identify hardware characteristics
and features.
Attributes:
serial (str): The serial number of the Revolution Pi device.
model (str): The model name of the Revolution Pi device.
"""
def __init__(self):
self._cm_type = ComputeModuleTypes.UNKNOWN
self._cm_with_wifi = False
self._revpi_with_con_bridge = False
self._wlan_class_path = ""
self.serial = ""
self.model = ""
@@ -94,29 +43,6 @@ class RevPiConfig:
self._init_device_info()
def _init_device_info(self):
"""
Initialize and retrieve detailed hardware information, including CPU details,
device type, WLAN interface, and connectivity features.
This method gathers information from system files and other sources to
initialize device-specific attributes such as model, serial number,
compute module type, and optional features like integrated WLAN
or ConBridge support. It performs checks specific to the detected
module type to accurately populate necessary device details.
Attributes
----------
model : str
The model of the CPU based on information from /proc/cpuinfo.
serial : str
The serial number extracted from /proc/cpuinfo.
_cm_type : ComputeModuleTypes, optional
The type of the compute module derived from the hardware revision value.
_wlan_class_path : str, optional
Filesystem path to the detected WLAN interface, if any.
_revpi_with_con_bridge : bool
Indicates whether the device supports the ConBridge feature.
"""
dc_cpuinfo = {}
# Extract CPU information
@@ -143,19 +69,11 @@ class RevPiConfig:
except ValueError:
pass
# Detect WLAN on CM module
could_have_wlan = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5)
if could_have_wlan:
wlan_interface = join(LINUX_WLAN_CLASS_PATH, "phy0")
if grep("DRIVER=brcmfmac", join(wlan_interface, "device", "uevent")):
self._wlan_class_path = wlan_interface
# If no build in WLAN on the CM, detect third party WLAN on RevPi Flat
if not self._wlan_class_path and grep("revpi-flat", "/proc/device-tree/compatible"):
lst_wlan_interfaces = glob("/sys/class/ieee80211/*")
for wlan_interface in lst_wlan_interfaces:
if grep("DRIVER=mwifiex_sdio", join(wlan_interface, "device", "uevent")):
self._wlan_class_path = wlan_interface
# Detect WiFi
could_have_wifi = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5)
if could_have_wifi:
lst_grep = grep("DRIVER=brcmfmac", "/sys/class/ieee80211/phy0/device/uevent")
self._cm_with_wifi = len(lst_grep) > 0 and self._cm_type in (ComputeModuleTypes)
# Detect ConBridge
could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S)
@@ -163,354 +81,33 @@ class RevPiConfig:
lst_grep = grep("kunbus,revpi-connect", "/proc/device-tree/compatible")
self._revpi_with_con_bridge = len(lst_grep) > 0
@property
def class_path_wlan(self) -> str:
"""
Provides access to the WLAN class path.
This property retrieves the stored WLAN class path, allowing the user to access it when
needed.
Returns:
str: The WLAN class path.
"""
return self._wlan_class_path
@property
def cm_type(self) -> ComputeModuleTypes:
"""
Gets the type of the compute module.
The property provides access to the type of the compute
module used. The type is represented as an instance of
the `ComputeModuleTypes` class.
Returns
-------
ComputeModuleTypes
The type of the compute module.
"""
return self._cm_type
@property
def with_con_bridge(self) -> bool:
"""
Indicates if the device is configured with a connection bridge.
This property checks the internal status and determines whether the device setup
includes a connection bridge functionality. It is read-only.
Returns:
bool: True if the connection bridge is configured, False otherwise.
"""
return self._revpi_with_con_bridge
@property
def with_wlan(self) -> bool:
"""
Checks if WLAN is available.
def with_wifi(self) -> bool:
return self._cm_with_wifi
This property evaluates whether WLAN is enabled or available by checking
the presence or value of the internal attribute `_wlan_class_path`.
Returns:
bool: True if WLAN is available, False otherwise.
"""
return bool(self._wlan_class_path)
def configure_avahi_daemon(action: ConfigActions):
return_value = simple_systemd(action, "avahi-daemon.service")
# Post actions for avahi-daemon
if action in (ConfigActions.ENABLE, ConfigActions.DISABLE):
# Apply the enable/disable action to the avahi socket AFTER the service
# unit, because a connected socket could interrupt stop
simple_systemd(action, "avahi-daemon.socket")
class ConfigTxt:
"""
Configuration file handler for managing 'config.txt'.
This class provides an interface to read, modify, save, and reload
Raspbian's configuration file `config.txt`. It includes functionalities
to manipulate specific parameters within the configuration and supports
managing dtoverlay and dtparam entries. The primary aim of this class
is to abstract file operations and make modifications user-friendly.
Attributes:
_config_txt_path (str): The path to the configuration file `config.txt`.
_config_txt_lines (list[str]): Contains all lines of the configuration
file as a list of strings, where each string represents a line.
"""
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self):
self._config_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._config_txt_path = path
break
if not self._config_txt_path:
raise FileNotFoundError("no config.txt found")
self._config_txt_lines = []
def _clear_name_values(self, name: str, values: str or list) -> int:
"""
Removes all occurrences of specified name-value pairs from the configuration.
This method searches for all name-value pairs in the configuration and
removes those that match the given name and value(s). It returns the
number of occurrences removed.
Arguments:
name: str
The name of the configuration variable to search for.
values: str or list
The value or list of values to match the configuration variable
against.
Returns:
int: The number of name-value pairs removed from the configuration.
"""
counter = 0
if type(values) is str:
values = [values]
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value in values:
self._config_txt_lines.pop(config_var.line_index)
counter += 1
return counter
def _get_all_name_values(self) -> List[ConfigVariable]:
"""
Retrieves all name-value pairs from the configuration text lines.
This method parses the configuration text lines to extract all name-value
pairs. If the configuration text lines are not loaded, it reloads the
configuration before processing. Each extracted name-value pair is added to a
list as a ConfigVariable object, which also holds the index of the match in
the text lines. The method returns the compiled list of these ConfigVariable
objects.
Returns:
List[ConfigVariable]: A list of ConfigVariable objects representing the
name-value pairs found in the configuration text lines, along with their
corresponding indexes.
"""
if not self._config_txt_lines:
self.reload_config()
lst_return = []
for i in range(len(self._config_txt_lines)):
match = self.re_name_value.match(self._config_txt_lines[i])
if match:
lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i))
return lst_return
def reload_config(self):
"""
Reloads the configuration file and updates the list of configuration lines.
This method reads the content of the configuration file specified by the
attribute `_config_txt_path` and updates `_config_txt_lines` with the file
contents as a list of strings, where each string represents a line.
Returns:
None
"""
with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines()
def save_config(self):
"""
Saves the current configuration to a file. The method ensures atomicity by first writing
to a temporary file and then moving it to the desired path. After the configuration is
saved, the internal list of configuration lines is cleared.
Raises:
OSError: If there is an issue writing to or moving the file.
"""
if not self._config_txt_lines:
return
tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear()
def add_name_value(self, name: str, value: str):
"""
Adds a name-value pair to the configuration if it does not already exist.
This method checks if the given name-value pair is already present in
the configuration. If it is not present, the pair is appended to the
configuration text lines.
Parameters:
name (str): The name to be added to the configuration.
value (str): The value corresponding to the name to be added.
Returns:
None
"""
# Check weather name and value already exists
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value == value:
return
self._config_txt_lines.append(f"{name}={value}\n")
def clear_dtoverlays(self, dtoverlays: str or list) -> int:
"""
Clears the specified device tree overlays. This method removes one or more
device tree overlays by clearing their corresponding name-value pairs.
Args:
dtoverlays (str or list): A device tree overlay name as a string, or a
list of such overlay names to be cleared.
Returns:
int: The number of device tree overlay name-value pairs successfully
cleared.
"""
return self._clear_name_values("dtoverlay", dtoverlays)
def clear_dtparams(self, dtparams: str or list) -> int:
"""
Clears the specified device tree parameters.
This method removes the given device tree parameters by utilizing
the underlying `_clear_name_values` function with a predefined
parameter type.
Parameters:
dtparams: str or list
A string or list of strings specifying the device tree
parameters to remove.
Returns:
int
The number of parameters cleared.
"""
return self._clear_name_values("dtparam", dtparams)
def get_values(self, var_name: str) -> list:
"""
Get all values associated with a given variable name.
This method retrieves a list of values corresponding to the specified
variable name by iterating through a collection of configuration
variables. Each configuration variable is checked for a matching name,
and its value is appended to the resulting list if a match is found.
Parameters:
var_name (str): The name of the variable for which values are to
be retrieved.
Returns:
list: A list of values associated with the specified variable name.
"""
var_values = []
for config_var in self._get_all_name_values():
if config_var.name == var_name:
var_values.append(config_var.value)
return var_values
@property
def config_txt_path(self) -> str:
"""
Get the file path for the configuration text file.
This property provides access to the private attribute `_config_txt_path` which
stores the file path to the configuration text file.
Returns:
str
The file path to the configuration text file.
"""
return self._config_txt_path
def configure_bluetooth(action: ConfigActions):
hci_device = join(LINUX_BT_CLASS_PATH, "hci0")
bt_rfkill_index = get_rfkill_index(hci_device)
# If the bluetooth device is not present, the device should have been
# brought up by revpi-bluetooth's udev rules or vendor magic (devices
# based on CM4 and newer). Nothing we can do here, so treat the interface
# as disabled.
if action is ConfigActions.ENABLE:
if bt_rfkill_index is not None:
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("0")
elif action is ConfigActions.DISABLE:
if bt_rfkill_index is not None:
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.STATUS:
if bt_rfkill_index is None:
return False
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
elif action is ConfigActions.AVAILABLE:
return bt_rfkill_index is not None
else:
raise ValueError(f"action {action} not supported")
return None
def configure_con_can(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_con_bridge
dt_overlay = "revpi-con-can"
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.add_name_value("dtoverlay", dt_overlay)
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", dt_overlay])
elif action is ConfigActions.DISABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay])
elif action is ConfigActions.STATUS:
return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
return return_value
def configure_dphys_swapfile(action: ConfigActions):
# Translate config action to systemd action
if action is ConfigActions.ENABLE:
systemd_action = ServiceActions.ENABLE
elif action is ConfigActions.DISABLE:
systemd_action = ServiceActions.DISABLE
elif action is ConfigActions.STATUS:
systemd_action = ServiceActions.STATUS
else:
systemd_action = ServiceActions.AVAILABLE
return_value = simple_systemd(systemd_action, "dphys-swapfile.service")
return_value = simple_systemd(action, "dphys-swapfile.service")
# Post actions for dphys-swapfile
if action is ConfigActions.DISABLE:
@@ -524,31 +121,6 @@ def configure_dphys_swapfile(action: ConfigActions):
return return_value
def configure_external_antenna(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_wlan
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.add_name_value("dtparam", "ant2")
config_txt.save_config()
elif action is ConfigActions.DISABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.save_config()
elif action is ConfigActions.STATUS:
return revpi.with_wlan and "ant2" in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_gui(action: ConfigActions):
gui_available = access("/usr/bin/startx", X_OK)
@@ -556,11 +128,7 @@ def configure_gui(action: ConfigActions):
return gui_available
bus = SystemBus()
systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True)
@@ -575,76 +143,47 @@ def configure_gui(action: ConfigActions):
raise ValueError(f"action {action} not supported")
def configure_wlan(action: ConfigActions):
revpi = RevPiConfig()
def simple_systemd(action: ConfigActions, unit: str):
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
if revpi.with_wlan:
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("0")
systemd_manager.UnmaskUnitFiles([unit], False)
systemd_manager.EnableUnitFiles([unit], False, False)
systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE:
if revpi.with_wlan:
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.AVAILABLE:
return revpi.with_wlan
systemd_manager.StopUnit(unit, "replace")
systemd_manager.DisableUnitFiles([unit], False)
elif action is ConfigActions.STATUS:
if not revpi.with_wlan:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
elif action is ConfigActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
return None
def get_rfkill_index(device_class_path: str) -> Optional[int]:
"""
Get the rfkill index for a device under a specific device class path.
This function searches for and extracts the rfkill index associated with
devices located under the given device class path. It uses a regular
expression to identify and parse the rfkill index from the paths
of matching rfkill device files.
Parameters:
device_class_path: str
The path to the device class directory where rfkill entries
are located.
Returns:
Optional[int]:
The index of the rfkill device if found, otherwise None.
"""
re_rfkill_index = re.compile(r"^/.+/rfkill(?P<index>\d+)$")
for rfkill_path in glob(join(device_class_path, "rfkill*")):
match_index = re_rfkill_index.match(rfkill_path)
if match_index:
return int(match_index.group("index"))
return None
if __name__ == "__main__":
rc = RevPiConfig()
print("Model:", rc.model)
print("Serial: ", rc.serial)
print("CM Type: ", rc.cm_type.name)
print("With WLAN: ", rc.with_wlan)
if rc.with_wlan:
print(" class path: ", rc.class_path_wlan)
print(" rfkill index: ", get_rfkill_index(rc.class_path_wlan))
print("With wifi: ", rc.with_wifi)
print("With con-bridge:", rc.with_con_bridge)
config_txt = ConfigTxt()
print("Config file: ", config_txt.config_txt_path)

View File

@@ -1,121 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from enum import Enum
from logging import getLogger
from threading import Thread
from typing import Optional
from pydbus import SystemBus
log = getLogger(__name__)
class ServiceActions(Enum):
"""
Enumeration class for defining configuration actions.
This enumeration provides predefined constants for common configuration
actions. It can be used to ensure consistency when working with or defining
such actions in a system.
"""
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
def simple_systemd(action: ServiceActions, unit: str, unmask: bool = False) -> Optional[bool]:
"""
Perform systemd service actions such as enable, disable, check status, or availability.
This function interacts with the systemd D-Bus API to manage and query the
state of services on a system. The supported actions include enabling a systemd
unit, disabling it, starting/stopping a unit, and checking its status or
availability. The function supports asynchronous configuration changes through
threads where applicable.
Parameters:
action (ServiceActions): The action to perform on the systemd service.
Supported actions are ENABLE, DISABLE, STATUS, and AVAILABLE.
unit (str): The name of the systemd unit to operate on (e.g., "example.service").
unmask (bool): When enabling a unit, if True, any masked unit file will
first be unmasked before proceeding with the operation. Defaults to False.
Returns:
Optional[bool]: The return value depends on the action. For STATUS or
AVAILABLE actions, it returns True if the unit satisfies the condition
(e.g., enabled and active, or available and loaded), False otherwise.
For other actions, it returns None.
"""
bus = SystemBus()
systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ServiceActions.ENABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
lst_change_unmask = []
if unmask:
# Dbus call: UnmaskUnitFiles(in as files, in b runtime, out a(sss) changes
lst_change_unmask = systemd_manager.UnmaskUnitFiles([unit], False)
# Dbus call: EnableUnitFiles(in as files, in b runtime, in b force,
# out b carries_install_info, out a(sss) changes
lst_change_enable = systemd_manager.EnableUnitFiles([unit], False, False)
if lst_change_unmask or lst_change_enable:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StartUnit(in s name, in s mode, out o job
systemd_manager.StartUnit(unit, "replace")
elif action is ServiceActions.DISABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
# Dbus call: DisableUnitFiles (in as files, in b runtime, out a(sss) changes)
change = systemd_manager.DisableUnitFiles([unit], False)
if change:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StopUnit(in s name,in s mode, out o job
systemd_manager.StopUnit(unit, "replace")
elif action is ServiceActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState in (
"active",
"activating",
)
elif action is ServiceActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
return None