44 Commits

Author SHA1 Message Date
Sven Sager
8fc058c7ee Update changelog for 0.0.3-1+deb12+1 release 2025-04-21 13:44:28 +02:00
0618bf8693 feat(deb): Add dependencies for dtoverlay command
This command is used in the `revpi_config.py` file to load overlays.
2025-04-21 13:43:00 +02:00
e1e9db8a21 Merge tag '0.0.3' into debian/bookworm
Release version 0.0.3
2025-04-21 13:36:02 +02:00
1fab228272 refactor: Rename WiFi to WLAN for consistent terminology
Updated variable names, function names, and comments to replace "WiFi"
with "WLAN" throughout the codebase. This ensures alignment with
standardized terminology and improves clarity in functionality and
configuration handling. Adjusted related configurations and interface
mappings accordingly.
2025-04-21 13:34:10 +02:00
fc82ec0eb9 feat(revpiconfig): Replace rfkill subprocess calls with sysfs writes
Updated Bluetooth and WiFi rfkill handling by replacing subprocess calls
to "rfkill" with direct writes to sysfs files. This change simplifies
the implementation and improves performance by avoiding external
command execution.
2025-04-21 13:25:56 +02:00
de1774f60e feat(cli): Add CLI support for RevPi configuration object (revpi-config)
This implements a new command "config" in the CLI to handle RevPi
configuration. It includes parsing and subparser setup for
configuration-related operations. The change improves usability by
extending CLI functionality to manage RevPi configuration objects.
2025-04-21 10:55:32 +02:00
8c145ef2ff feat(cli): Add CLI command for configuring Revpi features
Introduced a new CLI command to enable, disable, check the status, and
list available features for Revpi using D-Bus calls. This implementation
provides a structured interface for managing Revpi configurations via
command-line actions.
2025-04-21 10:55:11 +02:00
0ecd86bd64 feat(cli): Add get_properties helper function for DBus interactions
This function facilitates retrieving specific properties from a DBus
interface, improving code modularity and reusability. It supports both
system and session bus types, streamlining access to DBus resources.
2025-04-21 10:54:47 +02:00
555c781aed feat(dbus): Add InterfaceRevpiConfig to DBus interfaces list
Added the InterfaceRevpiConfig class to the list of DBus interfaces in
`bus_provider.py`. This ensures the new system configuration interface
is properly registered and accessible.
2025-04-21 10:21:36 +02:00
604cb61870 feat(dbus): Add Bluetooth configuration functionality
Introduce functionality to enable, disable, and check the status and
availability of Bluetooth devices using the `configure_bluetooth`
method. Integrate Bluetooth configuration into the feature management
system by mapping it in `interface_config.py`.
2025-04-21 10:17:56 +02:00
69e370f964 refactor(revpiconfig: Change Wi-Fi detection and rfkill index logic
Replaced inline rfkill index detection with a standalone
`get_rfkill_index` function for improved modularity. Removed
`_cm_with_wifi` and `_wlan_rfkill_index` attributes, utilizing
`_wlan_class_path` for Wi-Fi presence checks. Adjusted property and
output logic to incorporate the new function.
2025-04-21 10:05:20 +02:00
6eb7eeea40 feat(dbus): Add Wi-Fi configuration support to the system config
Introduces the `configure_wifi` function to handle Wi-Fi actions such as
enabling, disabling, and checking status. Updates the `ieee80211`
feature to use the new function, integrating Wi-Fi management into the
existing configuration framework.
2025-04-21 09:28:01 +02:00
18e6fb3d14 feat(revpiconfig): Enhance Wi-Fi detection and add rfkill index support
Improved logic to detect built-in and third-party Wi-Fi interfaces,
including integration of `rfkill` index retrieval for Wi-Fi devices.
This enhances support for various hardware setups and allows better
control over Wi-Fi functionality.
2025-04-21 09:05:47 +02:00
fe614d026a feat(dbus): Add support for configuring 'revpi-con-can' feature
Introduce the `configure_con_can` function to manage enabling,
disabling, status checking, and availability of the 'revpi-con-can'
feature. Update the `AVAILABLE_FEATURES` dictionary to integrate
'revpi-con-can' as a configurable feature.
2025-04-21 09:05:47 +02:00
870a55042e feat(dbus): Add support for configuring the external antenna
Introduce the `configure_external_antenna` function to manage external
antenna settings, including enable, disable, and status checks. Update
the feature configuration in `interface_config` to include this
functionality. This enhances WiFi-related configuration options.
2025-04-21 09:05:47 +02:00
2848fdcf54 feat(revpiconfig): Add ConfigTxt class for managing config.txt file
Introduced the ConfigTxt class to handle parsing, editing, and saving of
config.txt files. This includes methods for adding, clearing, and
retrieving configuration values, as well as handling dtoverlays and
dtparams. Enhanced system configuration capabilities by providing
structured support for config file operations.
2025-04-21 09:05:47 +02:00
d0f641f2b5 feat(dbus): Add dphys-swapfile configuration functionality
Implemented `configure_dphys_swapfile` to manage the dphys-swapfile
service, including automatic swapfile removal when the service is
disabled. Updated feature registry to support dphys-swapfile
configuration.
2025-04-21 09:05:47 +02:00
e8eba43647 feat(dbus): Add avahi-daemon configuration to system services
Introduce a `configure_avahi_daemon` function to manage the avahi-daemon
service and socket with proper post actions. Update the interface
configuration to enable avahi management using the new function.
2025-04-21 09:05:47 +02:00
0d601f623c feat(dbus): Remove 'var-log.mount' feature
The 'var-log.mount' feature was dropped and has been removed from
the AVAILABLE_FEATURES dictionary.
2025-04-21 09:05:47 +02:00
66735a9eba refactor(dbus): Move system configuration methods to revpi_config.py
Centralized `ConfigActions`, `configure_gui`, and `simple_systemd` into
`revpi_config.py` to eliminate duplication and improve maintainability.
Updated `interface_config.py` to import these methods, ensuring
consistent functionality across modules.
2025-04-21 09:05:47 +02:00
7525c9f653 feat(revpiconfig): Add RevPiConfig class for device information handling
This commit introduces a new `RevPiConfig` class to manage RevPi device
configuration details, such as model, serial, compute module type, WiFi,
and ConBridge detection. It parses CPU info from `/proc/cpuinfo` and
leverages helper methods for hardware-specific checks, enhancing the
middleware's ability to identify and manage hardware features.
2025-04-21 09:05:46 +02:00
3909fab379 feat(dbus): Add GUI configuration handling to interface_config.py
Introduced the `configure_gui` function to manage GUI enabling,
disabling, availability, and status retrieval. Updated the
`AVAILABLE_FEATURES` dictionary to include GUI management functionality,
leveraging systemd and os module operations.
2025-04-20 15:34:16 +02:00
9ce36c78e7 feat(dbus): Add D-Bus interface for system configuration in middleware
Introduced `InterfaceRevpiConfig` to manage feature actions like
enable/disable, status, and availability using D-Bus. Includes support
for predefined features with systemd integration and exception handling
for unsupported features.
2025-04-20 15:18:27 +02:00
c24c78761f fix(cli): Change absolute imports to relative imports 2025-04-20 15:12:09 +02:00
3cc64f514f feat(dbus): Add grep function to search for patterns in a file
The new `grep` function reads a specified file and returns lines
containing a given pattern. It handles file not found errors and logs
unexpected exceptions, improving resilience in file operations.
2025-04-20 15:11:43 +02:00
865d2ca7a9 refactor: Update interface name from 'picontrol' to 'PiControl'
Renamed all occurrences of 'picontrol' to 'PiControl' in the D-Bus
interface definitions, method calls, and test cases for consistency and
adherence to naming conventions. This ensures uniformity across the
codebase and resolves potential naming-related issues.
2025-04-20 12:19:41 +02:00
Sven Sager
686bf407a8 Update changelog for 0.0.2-1+deb12+1 release 2025-04-19 16:37:26 +02:00
c12c0d36f0 fix(deb): Skip tests because of missing SystemBus in build container 2025-04-19 16:36:40 +02:00
e2bf154185 feat(deb): Add dbus for testing to build dependencies 2025-04-19 16:34:03 +02:00
7506f56121 Merge tag 'v0.0.2' into debian/bookworm
Release version 0.0.2
2025-04-19 15:58:57 +02:00
157b7bd118 test(dbus): Add support for testing driver reset notification
Introduce `FakeResetDriverWatchdog` to simulate driver reset triggers.
Update existing tests and add a new test, `test_notify_reset_driver`, to
verify reset notifications using the event signaling mechanism.
2025-04-19 15:56:59 +02:00
26c3ac0afb refactor(dbus): D-Bus interface management with cleanup support.
Introduced a `DbusInterface` base class with a `cleanup` method to
standardize resource cleanup for D-Bus interfaces. Modified
`InterfacePiControl` to inherit from it and implemented `cleanup` logic
for proper watchdog resource handling. Adjusted `BusProvider` to manage
multiple interfaces and ensure cleanup on shutdown.
2025-04-19 15:55:49 +02:00
f8bc1532e3 refactor(dbus): Fix typo and remove unused thread instance
Corrected a typo in the `timeout` parameter name in `method_await_reset`
and removed an unused thread instance `th_sleep` from `dbus_helper.py`.
These changes improve code clarity and eliminate redundant components.
2025-04-19 15:05:47 +02:00
7207845b13 test(dbus): Add unit tests for PiControl D-Bus interface
Introduces initial tests for the PiControl interface in
`dbus_middleware1`. These tests cover basic functionality like checking
activity status and resetting the driver while verifying IOCTL calls.
2025-04-19 14:40:05 +02:00
3f808c55f8 test(dbus): Add unit test framework for dbus_middleware1 module
Introduces test infrastructure and mockup classes for
`dbus_middleware1`. Includes `BusProvider` test setup, fake device
implementations, and initialization logic to support session bus
testing. Enhances testability and isolation of the D-Bus middleware
components.
2025-04-19 14:40:05 +02:00
4ccc328d0b refactor(dbus): piControl driver reset with PiControlIoctl class
Replaces inline ioctl logic with the new `PiControlIoctl` class for
cleaner and reusable code. This improves readability, encapsulates
device operations, and simplifies error handling for resetting the
piControl driver.
2025-04-19 13:57:56 +02:00
76b53423c1 fix(dbus): Add error handling for DBus publishing and main loop
Wrap DBus publishing and main loop execution in try-except blocks to
capture and log failures. This ensures better visibility into errors and
prevents silent failures during runtime.
2025-04-19 12:43:22 +02:00
114cbd8099 refactor(cli): D-Bus helpers support session and system bus types
Introduced `BusType` enum to differentiate between session and system
bus usage in D-Bus calls. Updated `simple_call` and `await_signal`
functions to include a `bus_type` parameter, improving flexibility.
Adjusted `cli_picontrol` to leverage the new `BusType` parameter for
D-Bus interactions.
2025-04-19 12:24:37 +02:00
487d5b3d46 feat(dbus): Add running property to BusProvider
This property checks if the event loop is running, enhancing code
readability and convenience. It provides an easier way to monitor the
status of the loop, which may improve debugging and control flow
handling.
2025-04-19 12:13:45 +02:00
93b328bf3f feat(dbus): Add import for BusProvider in dbus_middleware1 module
This change includes the BusProvider import in the `__init__.py` file of
dbus_middleware1. It ensures the module has access to BusProvider
functionality, improving modularity and readiness for use.
2025-04-19 12:13:44 +02:00
bde3920fc1 feat: Add session bus option for local testing and development
Introduced a `--use-session-bus` flag to optionally use the D-Bus
session bus instead of the system bus. This allows better flexibility
for local testing and development scenarios without requiring
system-level changes. Updated related classes and functions to respect
the new flag.
2025-04-19 09:33:54 +02:00
a4ccb9081f refactor(dbus): Parameterize picontrol_device and config_rsc
Replaced hardcoded paths with configurable parameters `picontrol_device`
and `config_rsc` across multiple classes. This improves flexibility,
making the components adaptable to various environments or setups.
Updated corresponding initialization and method implementations to use
these parameters.
2025-04-19 08:21:24 +02:00
4c1dc1c9b5 refactor(dbus): Move ResetDriverWatchdog to process_image_helper.py
The ResetDriverWatchdog class was relocated from dbus_helper.py to a new
helper module, process_image_helper.py, to improve code organization
and maintainability. Updated imports in relevant files to reflect this
change.
2025-04-19 08:13:02 +02:00
e756d68556 refactor(dbus): Move D-Bus helper functions to a dedicated file
Consolidated `REVPI_DBUS_*` constants and `extend_interface` function
into `dbus_helper.py` for better modularity and reusability. Updated
imports across modules to reflect this change.
2025-04-19 07:56:17 +02:00
23 changed files with 1189 additions and 90 deletions

50
debian/changelog vendored
View File

@@ -1,3 +1,53 @@
revpi-middleware (0.0.3-1+deb12+1) bookworm; urgency=medium
* refactor: Update interface name from 'picontrol' to 'PiControl'
* feat(dbus): Add `grep` function to search for patterns in a file
* fix(cli): Change absolute imports to relative imports
* feat(dbus): Add D-Bus interface for system configuration in middleware
* feat(dbus): Add GUI configuration handling to interface_config.py
* feat(revpiconfig): Add RevPiConfig class for device information handling
* refactor(dbus): Move system configuration methods to revpi_config.py
* feat(dbus): Remove 'var-log.mount' feature
* feat(dbus): Add avahi-daemon configuration to system services
* feat(dbus): Add dphys-swapfile configuration functionality
* feat(revpiconfig): Add ConfigTxt class for managing config.txt file
* feat(dbus): Add support for configuring the external antenna
* feat(dbus): Add support for configuring 'revpi-con-can' feature
* feat(revpiconfig): Enhance Wi-Fi detection and add rfkill index support
* feat(dbus): Add Wi-Fi configuration support to the system config
* refactor(revpiconfig: Change Wi-Fi detection and rfkill index logic
* feat(dbus): Add Bluetooth configuration functionality
* feat(dbus): Add InterfaceRevpiConfig to DBus interfaces list
* feat(cli): Add `get_properties` helper function for DBus interactions
* feat(cli): Add CLI command for configuring Revpi features
* feat(cli): Add CLI support for RevPi configuration object (revpi-config)
* feat(revpiconfig): Replace rfkill subprocess calls with sysfs writes
* refactor: Rename WiFi to WLAN for consistent terminology
* feat(deb): Add dependencies for `dtoverlay` command
-- Sven Sager <s.sager@kunbus.com> Mon, 21 Apr 2025 13:44:18 +0200
revpi-middleware (0.0.2-1+deb12+1) bookworm; urgency=medium
* refactor(dbus): Move D-Bus helper functions to a dedicated file
* refactor(dbus): Move ResetDriverWatchdog to process_image_helper.py
* refactor(dbus): Parameterize `picontrol_device` and `config_rsc`
* feat: Add session bus option for local testing and development
* feat(dbus): Add import for BusProvider in dbus_middleware1 module
* feat(dbus): Add `running` property to `BusProvider`
* refactor(cli): D-Bus helpers support session and system bus types
* fix(dbus): Add error handling for DBus publishing and main loop
* refactor(dbus): piControl driver reset with PiControlIoctl class
* test(dbus): Add unit test framework for dbus_middleware1 module
* test(dbus): Add unit tests for PiControl D-Bus interface
* refactor(dbus): Fix typo and remove unused thread instance
* refactor(dbus): D-Bus interface management with cleanup support.
* test(dbus): Add support for testing driver reset notification
* feat(deb): Add dbus for testing to build dependencies
* fix(deb): Skip tests because of missing SystemBus in build container
-- Sven Sager <s.sager@kunbus.com> Sat, 19 Apr 2025 16:34:20 +0200
revpi-middleware (0.0.1-1+deb12+1) bookworm; urgency=medium
* docs: Start git project with python git-ignore and Readme

3
debian/control vendored
View File

@@ -7,6 +7,8 @@ Homepage: https://revolutionpi.com/
Vcs-Browser: https://gitlab.com/revolutionpi/revpi-middleware
Vcs-Git: https://gitlab.com/revolutionpi/revpi-middleware.git -b debian/bookworm
Build-Depends:
dbus,
dbus-x11,
debhelper-compat (= 13),
dh-python,
python3-all,
@@ -19,6 +21,7 @@ Package: revpi-middleware
Architecture: all
Pre-Depends: ${misc:Pre-Depends}
Depends:
raspi-utils-dt | libraspberrypi-bin,
${python3:Depends},
${misc:Depends}
Description: Revolution Pi middleware with D-Bus interface

5
debian/rules vendored
View File

@@ -5,3 +5,8 @@ export PYBUILD_INSTALL_ARGS=--install-lib=/usr/share/$(PYBUILD_NAME)/ --install-
%:
dh $@ --with python3 --buildsystem=pybuild
override_dh_auto_test:
# Currently, the tests have to be skipped, because no SystemBus is
# available in the Docker container.
@echo "Skipped tests"

View File

@@ -5,12 +5,11 @@
This module provides the foundation for the RevPi middleware CLI commands
and argument parsing setup.
"""
from argparse import ArgumentParser
from logging import getLogger
from revpi_middleware.cli_commands import cli_picontrol
from revpi_middleware.proginit import StdLogOutput
from . import cli_config, cli_picontrol
from .. import proginit as pi
from ..proginit import StdLogOutput
log = getLogger(__name__)
@@ -30,6 +29,11 @@ def setup_command_line_arguments():
help="RevPi PiControl object",
)
cli_picontrol.add_subparsers(obj_picontrol)
obj_config = rpictl_obj.add_parser(
"config",
help="RevPi configuration object (revpi-config)",
)
cli_config.add_subparsers(obj_config)
def main() -> int:
@@ -40,6 +44,9 @@ def main() -> int:
if obj == "picontrol":
rc = cli_picontrol.main()
elif obj == "config":
rc = cli_config.main()
else:
log.error(f"Unknown object: {obj}")
rc = 1

View File

@@ -0,0 +1,93 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser
from logging import getLogger
from .dbus_helper import BusType, get_properties, simple_call
from .. import proginit as pi
from ..dbus_middleware1 import extend_interface
log = getLogger(__name__)
def add_subparsers(parent_parser: ArgumentParser):
parent_parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "list-features"],
help="Action to be executed: enable, disable, status or available. "
"To get all available features, use 'list-features'.",
)
parent_parser.add_argument(
"feature",
nargs="?",
default="",
help="Name of the feature to configer. To list all features use 'list-features' as action.",
)
def main() -> int:
action = pi.pargs.action
dbus_value = False
try:
if action == "list-features":
dbus_value = get_properties(
"available_features",
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
for feature in dbus_value:
print(feature)
return 0
# For the following actions, a feature name is required
if pi.pargs.feature == "":
raise Exception("Feature name is required")
if action == "enable":
simple_call(
"Enable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "disable":
simple_call(
"Disable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "status":
dbus_value = simple_call(
"GetStatus",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "available":
dbus_value = simple_call(
"GetAvailability",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
else:
raise Exception(f"Unknown action: {action}")
except Exception as e:
log.error(f"Error: {e}")
return 1
log.debug(
f"D-Bus call of method {action} for feature {pi.pargs.feature} returned: {dbus_value}"
)
print(int(dbus_value))
return 0

View File

@@ -4,7 +4,7 @@
from argparse import ArgumentParser
from logging import getLogger
from .dbus_helper import await_signal, simple_call
from .dbus_helper import BusType, await_signal, simple_call
from .. import proginit as pi
from ..dbus_middleware1 import extend_interface
@@ -36,12 +36,21 @@ def add_subparsers(parent_parser: ArgumentParser):
def method_reset():
log.debug("D-Bus call of method ResetDriver")
simple_call("ResetDriver", interface=extend_interface("picontrol"))
simple_call(
"ResetDriver",
interface=extend_interface("PiControl"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
log.info("ResetDriver called via D-Bus")
def method_await_reset(timout: int = 0):
detected_signal = await_signal("NotifyDriverReset", timout, extend_interface("picontrol"))
def method_await_reset(timeout: int = 0):
detected_signal = await_signal(
"NotifyDriverReset",
timeout,
extend_interface("PiControl"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
if detected_signal:
log.info("ResetDriver signal received")
else:

View File

@@ -1,50 +1,113 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus helper functions for cli commands."""
from enum import Enum
from threading import Thread
from time import sleep
from gi.repository import GLib
from pydbus import SystemBus
from pydbus import SessionBus, SystemBus
from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH
from ..dbus_middleware1 import REVPI_DBUS_NAME
PICONTROL_INTERFACE = "com.revolutionpi.middleware1.picontrol"
RESET_DRIVER_METHOD = "ResetDriver"
class BusType(Enum):
SESSION = "session"
SYSTEM = "system"
def simple_call(method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_PATH):
def get_properties(
property_name: str,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface]
return getattr(iface, property_name)
def simple_call(
method: str,
*args,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
"""
Executes a method on a specific D-Bus object interface within the RevPi system. This function
connects to the system bus, retrieves the desired interface and object path, and invokes
the specified method with provided arguments.
Performs a call to a D-Bus method on a specified interface and object.
This function uses the D-Bus messaging system to dynamically call a method
of a specified interface, using the given object path and bus type. It
provides a way to interact with D-Bus interfaces, using either a system or
session bus, and returns the result of executing the specified method.
Parameters:
method: str
The name of the method to be invoked on the targeted interface.
*args: tuple
Positional arguments to be passed to the method being invoked.
The name of the method to invoke on the D-Bus interface.
*args:
Additional positional arguments to pass to the specified D-Bus method.
interface: str
The name of the D-Bus interface providing the required functionality.
object_path: str, optional
The D-Bus object path of the RevPi interface. Defaults to REVPI_DBUS_BASE_PATH.
The name of the D-Bus interface containing the method.
object_path:
The path of the D-Bus object on which the interface is defined. Defaults
to REVPI_DBUS_BASE_PATH.
bus_type: BusType
Specifies whether to use the system or session bus. Defaults to BusType.SYSTEM.
Returns:
Any
The result of the method invocation on the targeted D-Bus interface.
The value returned by the D-Bus method.
Raises:
Any errors raised from the D-Bus call will propagate to the caller.
"""
bus = SystemBus()
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface]
return getattr(iface, method)(*args)
def await_signal(signal_name: str, timeout: int, interface: str, object_path=REVPI_DBUS_BASE_PATH):
def await_signal(
signal_name: str,
timeout: int,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
"""
Waits for a specific signal and returns whether the signal was detected.
This function connects to a D-Bus interface and waits for a specific signal
to be emitted. If the signal is not received within the specified timeout
period, the function will return False. If the signal is detected within
the timeout, the function will return True. It can connect to either the
system bus or the session bus, depending on the provided `bus_type`.
Parameters:
signal_name: str
The name of the signal to be awaited.
timeout: int
The maximum time to wait for the signal, in seconds. A value of 0 or
less means that there is no timeout.
interface: str
The name of the D-Bus interface to listen on.
object_path
The D-Bus object path where the interface resides. Defaults to
REVPI_DBUS_BASE_PATH.
bus_type
The type of D-Bus to connect to. Can be either BusType.SYSTEM or
BusType.SESSION. Defaults to BusType.SYSTEM.
Returns:
bool
True if the signal was detected within the timeout period, False
otherwise.
"""
detected_signal = False
timeout = int(timeout)
loop = GLib.MainLoop()
th_sleep = Thread()
def th_timeout():
sleep(timeout)
@@ -55,7 +118,7 @@ def await_signal(signal_name: str, timeout: int, interface: str, object_path=REV
detected_signal = True
loop.quit()
bus = SystemBus()
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface]

View File

@@ -42,7 +42,7 @@ class MiddlewareDaemon:
if self.bus_provider and self.bus_provider.is_alive():
return
self.bus_provider = BusProvider()
self.bus_provider = BusProvider(use_system_bus=not pi.pargs.use_session_bus)
self.bus_provider.start()
log.debug("leave MiddlewareDaemon.dbus_start")

View File

@@ -2,27 +2,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus middleware version 1 of revpi_middleware."""
from ..__about__ import __author__, __copyright__, __license__, __version__
from .dbus_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME
from .dbus_helper import extend_interface
REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
def extend_interface(*args) -> str:
"""
Extends an interface name by appending additional segments to a pre-defined base name.
This function takes multiple arguments, concatenates them with a predefined base
interface name, and returns the resulting string, effectively constructing an
extended interface name.
Args:
*args: str
Components to be appended to the base interface name.
Returns:
str
Fully constructed interface name by joining the base interface name with
the provided segments.
"""
return ".".join([REVPI_DBUS_NAME, *args])
from .bus_provider import BusProvider

View File

@@ -6,35 +6,71 @@ from logging import getLogger
from threading import Thread
from gi.repository import GLib
from pydbus import SystemBus
from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl
from .system_config import InterfaceRevpiConfig
log = getLogger(__name__)
class BusProvider(Thread):
def __init__(self):
def __init__(
self,
picontrol_device="/dev/piControl0",
config_rsc="/etc/revpi/config.rsc",
use_system_bus=True,
):
log.debug("enter BusProvider.__init__")
super().__init__()
self._bus = SystemBus()
self._bus = SystemBus() if use_system_bus else SessionBus()
self._loop = GLib.MainLoop()
self.picontrol_device = picontrol_device
self.config_rsc = config_rsc
def run(self):
log.debug("enter BusProvider.run")
self._bus.publish(
REVPI_DBUS_NAME,
InterfacePiControl(),
)
# The 2nd, 3rd, ... arguments can be objects or tuples of a path and an object
# Example(),
# ("Subdir1", Example()),
# ("Subdir2", Example()),
# ("Subdir2/Whatever", Example())
lst_interfaces = [
InterfacePiControl(self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(),
]
try:
self._bus.publish(
REVPI_DBUS_NAME,
*lst_interfaces,
)
except Exception as e:
log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}")
try:
self._loop.run()
except Exception as e:
log.error(f"can not run dbus mainloop: {e}")
# Clean up all interfaces
for interface in lst_interfaces:
if type(interface) is tuple:
_, interface = interface
interface.cleanup()
self._loop.run()
log.debug("leave BusProvider.run")
def stop(self):
log.debug("enter BusProvider.stop")
self._loop.quit()
log.debug("leave BusProvider.stop")
@property
def running(self):
return self._loop.is_running()

View File

@@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Helper for dbus."""
from logging import getLogger
log = getLogger(__name__)
REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
class DbusInterface:
def cleanup(self):
"""
Represents a method responsible for performing cleanup operations. This method is executed to properly
release resources, close connections, or perform other necessary finalization tasks.
This method does not take any arguments or return a value.
"""
pass
def extend_interface(*args) -> str:
"""
Extends an interface name by appending additional segments to a pre-defined base name.
This function takes multiple arguments, concatenates them with a predefined base
interface name, and returns the resulting string, effectively constructing an
extended interface name.
Args:
*args: str
Components to be appended to the base interface name.
Returns:
str
Fully constructed interface name by joining the base interface name with
the provided segments.
"""
return ".".join([REVPI_DBUS_NAME, *args])
def grep(pattern, filename):
"""
Searches for lines in a file that contain a given pattern and returns them as a list.
The function reads lines from the specified file and checks whether each line
contains the provided pattern. It returns a list of lines that match the
pattern. If the file is not found, an empty list is returned. Any other
exceptions during the file reading process are caught and logged.
Args:
pattern (str): The substring to search for within the file's lines.
filename (str): The path to the file that will be searched.
Returns:
list[str]: A list containing lines that include the provided pattern, with
leading and trailing spaces removed.
Raises:
FileNotFoundError: This error is caught if the file specified is not
found.
Exception: Any unforeseen exception during file operations is caught and
logged.
"""
try:
with open(filename, "r") as file:
# Gibt alle Zeilen zurück, die das Muster enthalten
matching_lines = [line.strip() for line in file if pattern in line]
return matching_lines
except FileNotFoundError:
return []
except Exception as e:
log.error(f"Error reading file: {e}")
return []

View File

@@ -2,21 +2,20 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for piControl."""
import os
from fcntl import ioctl
from logging import getLogger
from pydbus.generic import signal
from ..interface_helper import ResetDriverWatchdog
from .process_image_helper import PiControlIoctl, ResetDriverWatchdog
from ..dbus_helper import DbusInterface
log = getLogger(__name__)
class InterfacePiControl:
class InterfacePiControl(DbusInterface):
"""
<node>
<interface name='com.revolutionpi.middleware1.picontrol'>
<interface name='com.revolutionpi.middleware1.PiControl'>
<signal name="NotifyDriverReset">
</signal>
<method name='ResetDriver'>
@@ -27,12 +26,16 @@ class InterfacePiControl:
NotifyDriverReset = signal()
def __init__(self):
self.pi_control = "/dev/piControl0"
def __init__(self, picontrol_device: str, config_rsc: str):
self.picontrol_device = picontrol_device
self.config_rsc = config_rsc
self.wd_reset_driver = ResetDriverWatchdog(self.pi_control)
self.wd_reset_driver = ResetDriverWatchdog(self.picontrol_device)
self.wd_reset_driver.register_call(self.notify_reset_driver)
def cleanup(self):
self.wd_reset_driver.stop()
def notify_reset_driver(self):
self.NotifyDriverReset()
@@ -40,21 +43,9 @@ class InterfacePiControl:
log.debug("enter InterfacePiControl.ResetDriver")
try:
fd = os.open(self.pi_control, os.O_WRONLY)
except Exception as e:
log.warning(f"could not open ${self.pi_control} to reset driver")
raise e
execption = None
try:
# KB_RESET _IO('K', 12 ) // reset the piControl driver including the config file
ioctl(fd, 19212)
picontrol_ioctl = PiControlIoctl(self.picontrol_device)
picontrol_ioctl.ioctl(PiControlIoctl.IOCTL_RESET_DRIVER)
log.info("reset piControl driver")
except Exception as e:
log.warning(f"could not reset piControl driver: ${e}")
execption = e
finally:
os.close(fd)
if execption:
raise execption
raise e

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2020-2023 Sven Sager
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""
Helper for the process image.
@@ -7,8 +7,8 @@ Helper for the process image.
The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs"
https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py
"""
import os
from ctypes import c_int
from fcntl import ioctl
from logging import getLogger
from threading import Thread
@@ -19,9 +19,9 @@ log = getLogger(__name__)
class ResetDriverWatchdog(Thread):
"""Watchdog to catch the reset_driver action."""
def __init__(self, pi_control_device="/dev/piControl0"):
def __init__(self, picontrol_device: str):
super(ResetDriverWatchdog, self).__init__()
self.procimg = pi_control_device
self.procimg = picontrol_device
self.daemon = True
self._calls = []
self._exit = False
@@ -35,7 +35,7 @@ class ResetDriverWatchdog(Thread):
"""
Mainloop of watchdog for reset_driver.
If the thread can not open the process image or the IOCTL is not
If the thread cannot open the process image or the IOCTL is not
implemented (wheezy), the thread function will stop. The trigger
property will always return True.
"""
@@ -51,7 +51,7 @@ class ResetDriverWatchdog(Thread):
)
return
# The ioctl will return 2 byte (c-type int)
# The ioctl will return 2 bytes (c-type int)
byte_buff = bytearray(2)
while not self._exit:
try:
@@ -73,7 +73,7 @@ class ResetDriverWatchdog(Thread):
def register_call(self, function):
"""Register a function, if watchdog triggers."""
if not callable(function):
return ValueError("Function is not callable.")
raise ValueError("Function is not callable.")
if function not in self._calls:
self._calls.append(function)
@@ -89,7 +89,7 @@ class ResetDriverWatchdog(Thread):
log.debug("leave ResetDriverWatchdog.stop()")
def unregister_call(self, function=None):
"""Remove a function call on watchdog trigger."""
"""Remove a function from the watchdog trigger."""
if function is None:
self._calls.clear()
elif function in self._calls:
@@ -101,3 +101,24 @@ class ResetDriverWatchdog(Thread):
rc = self._triggered
self._triggered = False
return rc
class PiControlIoctl:
IOCTL_RESET_DRIVER = 19212
def __init__(self, picontrol_device: str):
self.picontrol_device = picontrol_device
def ioctl(self, request, arg=0):
if type(arg) is c_int:
arg = arg.value
_fd = os.open(self.picontrol_device, os.O_WRONLY)
return_value = ioctl(_fd, 19212, arg)
os.close(_fd)
return return_value
@property
def name(self):
return self.picontrol_device

View File

@@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig

View File

@@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for hardware configuration."""
from collections import namedtuple
from logging import getLogger
from .revpi_config import (
ConfigActions,
configure_avahi_daemon,
configure_bluetooth,
configure_con_can,
configure_dphys_swapfile,
configure_external_antenna,
configure_gui,
configure_wlan,
simple_systemd,
)
from ..dbus_helper import DbusInterface
log = getLogger(__name__)
FeatureFunction = namedtuple("FeatureFunction", ["function", "args"])
class InterfaceRevpiConfig(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.RevpiConfig">
<method name="Disable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="Enable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="GetStatus">
<arg name="feature" type="s" direction="in"/>
<arg name="status" type="b" direction="out"/>
</method>
<method name="GetAvailability">
<arg name="feature" type="s" direction="in"/>
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
</interface>
</node>
"""
def Disable(self, feature: str) -> None:
"""Disable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.DISABLE, *feature_function.args)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.ENABLE, *feature_function.args)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
feature_function = get_feature(feature)
return feature_function.function(ConfigActions.STATUS, *feature_function.args)
def GetAvailability(self, feature: str) -> bool:
"""Get feature availability on the RevPi."""
feature_function = get_feature(feature)
return feature_function.function(ConfigActions.AVAILABLE, *feature_function.args)
@property
def available_features(self) -> list[str]:
return list(AVAILABLE_FEATURES.keys())
def get_feature(feature: str) -> FeatureFunction:
if feature not in AVAILABLE_FEATURES:
raise ValueError(f"feature {feature} does not exist")
feature_function = AVAILABLE_FEATURES[feature]
if not feature_function:
raise NotImplementedError(f"feature {feature} is not implemented")
return feature_function
AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []),
"revpi-con-can": FeatureFunction(configure_con_can, []),
"dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
"systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]),
"ssh": FeatureFunction(simple_systemd, ["ssh.service"]),
"nodered": FeatureFunction(simple_systemd, ["nodered.service"]),
"noderedrevpinodes-server": FeatureFunction(
simple_systemd, ["noderedrevpinodes-server.service"]
),
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"bluetooth": FeatureFunction(configure_bluetooth, []),
"wlan": FeatureFunction(configure_wlan, []),
"avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": FeatureFunction(configure_external_antenna, []),
}

View File

@@ -0,0 +1,434 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
import re
import shutil
import subprocess
from collections import namedtuple
from enum import Enum, IntEnum
from glob import glob
from logging import getLogger
from os import X_OK, access
from os.path import exists, join
from typing import List, Optional
from pydbus import SystemBus
from ..dbus_helper import grep
log = getLogger(__name__)
ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
class ComputeModuleTypes(IntEnum):
UNKNOWN = 0
CM1 = 6
CM3 = 10
CM4 = 20
CM4S = 21
CM5 = 24
class ConfigActions(Enum):
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
class RevPiConfig:
def __init__(self):
self._cm_type = ComputeModuleTypes.UNKNOWN
self._revpi_with_con_bridge = False
self._wlan_class_path = ""
self.serial = ""
self.model = ""
self._init_device_info()
def _init_device_info(self):
dc_cpuinfo = {}
# Extract CPU information
with open("/proc/cpuinfo", "r") as f:
line = "\n"
while line:
line = f.readline()
if line.startswith(("Revision", "Serial", "Model")):
key, value = line.split(":", 1)
key = key.strip().lower()
value = value.strip()
dc_cpuinfo[key] = value
self.model = dc_cpuinfo.get("model", "")
self.serial = dc_cpuinfo.get("serial", "")
# Detect Compute Module type
revision = dc_cpuinfo.get("revision", "")
if revision:
revision = int(revision, 16)
mask = 4080 # 0xFF0 in dezimal
try:
self._cm_type = ComputeModuleTypes((revision & mask) >> 4)
except ValueError:
pass
# Detect WLAN on CM module
could_have_wlan = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5)
if could_have_wlan:
wlan_interface = join(LINUX_WLAN_CLASS_PATH, "phy0")
if grep("DRIVER=brcmfmac", join(wlan_interface, "device", "uevent")):
self._wlan_class_path = wlan_interface
# If no build in WLAN on the CM, detect third party WLAN on RevPi Flat
if not self._wlan_class_path and grep("revpi-flat", "/proc/device-tree/compatible"):
lst_wlan_interfaces = glob("/sys/class/ieee80211/*")
for wlan_interface in lst_wlan_interfaces:
if grep("DRIVER=mwifiex_sdio", join(wlan_interface, "device", "uevent")):
self._wlan_class_path = wlan_interface
# Detect ConBridge
could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S)
if could_have_con_bridge:
lst_grep = grep("kunbus,revpi-connect", "/proc/device-tree/compatible")
self._revpi_with_con_bridge = len(lst_grep) > 0
@property
def class_path_wlan(self) -> str:
return self._wlan_class_path
@property
def cm_type(self) -> ComputeModuleTypes:
return self._cm_type
@property
def with_con_bridge(self) -> bool:
return self._revpi_with_con_bridge
@property
def with_wlan(self) -> bool:
return bool(self._wlan_class_path)
class ConfigTxt:
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self):
self._config_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._config_txt_path = path
break
if not self._config_txt_path:
raise FileNotFoundError("no config.txt found")
self._config_txt_lines = []
def _clear_name_values(self, name: str, values: str or list) -> int:
counter = 0
if type(values) is str:
values = [values]
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value in values:
self._config_txt_lines.pop(config_var.line_index)
counter += 1
return counter
def _get_all_name_values(self) -> List[ConfigVariable]:
if not self._config_txt_lines:
self.reload_config()
lst_return = []
for i in range(len(self._config_txt_lines)):
match = self.re_name_value.match(self._config_txt_lines[i])
if match:
lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i))
return lst_return
def reload_config(self):
with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines()
def save_config(self):
if not self._config_txt_lines:
return
tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear()
def add_name_value(self, name: str, value: str):
# Check weather name and value already exists
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value == value:
return
self._config_txt_lines.append(f"{name}={value}\n")
def clear_dtoverlays(self, dtoverlays: str or list) -> int:
return self._clear_name_values("dtoverlay", dtoverlays)
def clear_dtparams(self, dtparams: str or list) -> int:
return self._clear_name_values("dtparam", dtparams)
def get_values(self, var_name: str) -> list:
var_values = []
for config_var in self._get_all_name_values():
if config_var.name == var_name:
var_values.append(config_var.value)
return var_values
@property
def config_txt_path(self) -> str:
return self._config_txt_path
def configure_avahi_daemon(action: ConfigActions):
return_value = simple_systemd(action, "avahi-daemon.service")
# Post actions for avahi-daemon
if action in (ConfigActions.ENABLE, ConfigActions.DISABLE):
# Apply the enable/disable action to the avahi socket AFTER the service
# unit, because a connected socket could interrupt stop
simple_systemd(action, "avahi-daemon.socket")
return return_value
def configure_bluetooth(action: ConfigActions):
hci_device = join(LINUX_BT_CLASS_PATH, "hci0")
bt_rfkill_index = get_rfkill_index(hci_device)
# If the bluetooth device is not present, the device should have been
# brought up by revpi-bluetooth's udev rules or vendor magic (devices
# based on CM4 and newer). Nothing we can do here, so treat the interface
# as disabled.
if action is ConfigActions.ENABLE:
if bt_rfkill_index is not None:
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("0")
elif action is ConfigActions.DISABLE:
if bt_rfkill_index is not None:
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.STATUS:
if bt_rfkill_index is None:
return False
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
elif action is ConfigActions.AVAILABLE:
return bt_rfkill_index is not None
else:
raise ValueError(f"action {action} not supported")
return None
def configure_con_can(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_con_bridge
dt_overlay = "revpi-con-can"
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.add_name_value("dtoverlay", dt_overlay)
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", dt_overlay])
elif action is ConfigActions.DISABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay])
elif action is ConfigActions.STATUS:
return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_dphys_swapfile(action: ConfigActions):
return_value = simple_systemd(action, "dphys-swapfile.service")
# Post actions for dphys-swapfile
if action is ConfigActions.DISABLE:
# Remove swapfile afer disabling the service unit
subprocess.call(
["/sbin/dphys-swapfile", "uninstall"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return return_value
def configure_external_antenna(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_wlan
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.add_name_value("dtparam", "ant2")
config_txt.save_config()
elif action is ConfigActions.DISABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.save_config()
elif action is ConfigActions.STATUS:
return revpi.with_wlan and "ant2" in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_gui(action: ConfigActions):
gui_available = access("/usr/bin/startx", X_OK)
if action is ConfigActions.AVAILABLE:
return gui_available
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True)
elif action is ConfigActions.DISABLE:
systemd_manager.SetDefaultTarget("multi-user.target", True)
elif action is ConfigActions.STATUS:
return systemd_manager.GetDefaultTarget() == "graphical.target"
else:
raise ValueError(f"action {action} not supported")
def configure_wlan(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.ENABLE:
if revpi.with_wlan:
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("0")
elif action is ConfigActions.DISABLE:
if revpi.with_wlan:
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.AVAILABLE:
return revpi.with_wlan
elif action is ConfigActions.STATUS:
if not revpi.with_wlan:
return False
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
else:
raise ValueError(f"action {action} not supported")
return None
def get_rfkill_index(device_class_path: str) -> Optional[int]:
re_rfkill_index = re.compile(r"^/.+/rfkill(?P<index>\d+)$")
for rfkill_path in glob(join(device_class_path, "rfkill*")):
match_index = re_rfkill_index.match(rfkill_path)
if match_index:
return int(match_index.group("index"))
return None
def simple_systemd(action: ConfigActions, unit: str):
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.UnmaskUnitFiles([unit], False)
systemd_manager.EnableUnitFiles([unit], False, False)
systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE:
systemd_manager.StopUnit(unit, "replace")
systemd_manager.DisableUnitFiles([unit], False)
elif action is ConfigActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
elif action is ConfigActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
if __name__ == "__main__":
rc = RevPiConfig()
print("Model:", rc.model)
print("Serial: ", rc.serial)
print("CM Type: ", rc.cm_type.name)
print("With WLAN: ", rc.with_wlan)
if rc.with_wlan:
print(" class path: ", rc.class_path_wlan)
print(" rfkill index: ", get_rfkill_index(rc.class_path_wlan))
print("With con-bridge:", rc.with_con_bridge)
config_txt = ConfigTxt()
print("Config file: ", config_txt.config_txt_path)

View File

@@ -9,7 +9,7 @@ __version__ = "1.4.0"
import logging
import sys
from argparse import ArgumentParser, Namespace
from argparse import ArgumentParser, Namespace, SUPPRESS
from configparser import ConfigParser
from enum import Enum
from os import R_OK, W_OK, access, environ, getpid, remove
@@ -262,6 +262,16 @@ parser = ArgumentParser(
prog=programname,
description="Program description",
)
# Use session bus of D-Bus for local testing and development proposes (hidden)
parser.add_argument(
"--use-session-bus",
dest="use_session_bus",
action="store_true",
default=False,
help=SUPPRESS,
)
parser.add_argument("--version", action="version", version=f"%(prog)s {program_version}")
parser.add_argument(
"-f",

3
tests/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later

View File

@@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from os import environ
# D-BUS needs a DISPLAY variable to work with the session bus
environ["DISPLAY"] = ":0"

View File

@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from time import sleep
from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup
class TestBusProvider(PiControlDeviceMockup):
def setUp(self):
super().setUp()
# Do not import things on top of the module. Some classes or functions need to be mocked up first.
from revpi_middleware.dbus_middleware1 import BusProvider
# Prepare the bus provider and start it
self.bp = BusProvider(
self.picontrol.name,
use_system_bus=False,
)
self.bp.start()
# Wait 5 seconds until the bus provider has started the main loop
counter = 50
while not self.bp.running and counter > 0:
counter -= 1
sleep(0.1)
def tearDown(self):
self.bp.stop()
self.bp.join(10.0)
if self.bp.is_alive():
raise RuntimeError("Bus provider thread is still running")
super().tearDown()

View File

@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from ctypes import c_int
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from threading import Event, Thread
from unittest import TestCase
IOCTL_QUEUE = Queue()
RESET_DRIVER_EVENT = Event()
class FakePiControlDevice:
IOCTL_RESET_DRIVER = 19212
def __init__(self, picontrol_device: str):
self._fh = NamedTemporaryFile("wb+", 0, prefix="fake_device_")
self.name = self._fh.name
def __del__(self):
self._fh.close()
def reset_process_image(self):
self._fh.write(b"\x00" * 4096)
self._fh.seek(0)
def ioctl(self, request, arg=0) -> int:
if type(arg) is c_int:
arg = arg.value
if request == self.IOCTL_RESET_DRIVER:
pass
else:
raise NotImplementedError(f"Unknown IOCTL request: {request}")
IOCTL_QUEUE.put_nowait((request, arg))
return arg
def close(self):
self._fh.close()
def read(self, size):
return self._fh.read(size)
def seek(self, offset, whence):
self._fh.seek(offset, whence)
def write(self, buffer):
return self._fh.write(buffer)
class FakeResetDriverWatchdog(Thread):
def __init__(self, picontrol_device: str):
super().__init__()
self.daemon = True
self._calls = []
self._exit = False
self.not_implemented = True
self._triggered = False
self.start()
def run(self):
while not self._exit:
if RESET_DRIVER_EVENT.wait(0.1):
RESET_DRIVER_EVENT.clear()
self._triggered = True
for func in self._calls:
func()
def register_call(self, function):
"""Register a function, if watchdog triggers."""
if not callable(function):
raise ValueError("Function is not callable.")
if function not in self._calls:
self._calls.append(function)
def stop(self):
"""Stop watchdog for reset_driver."""
self._exit = True
def unregister_call(self, function=None):
"""Remove a function from the watchdog trigger."""
if function is None:
self._calls.clear()
elif function in self._calls:
self._calls.remove(function)
@property
def triggered(self):
"""Will return True one time after watchdog was triggered."""
rc = self._triggered
self._triggered = False
return rc
class PiControlDeviceMockup(TestCase):
def setUp(self):
super().setUp()
# Empty the queue
while True:
try:
IOCTL_QUEUE.get_nowait()
except Empty:
break
# Replace classes with mockup classes
import revpi_middleware.dbus_middleware1.process_image.interface_picontrol as test_helpers
test_helpers.PiControlIoctl = FakePiControlDevice
test_helpers.ResetDriverWatchdog = FakeResetDriverWatchdog
# Create a fake picontrol0 device
self.picontrol = FakePiControlDevice(picontrol_device="/dev/fake_device_0")
def tearDown(self):
self.picontrol.close()
super().tearDown()

View File

@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later

View File

@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from threading import Thread
from time import sleep
from revpi_middleware.cli_commands.dbus_helper import BusType, await_signal, simple_call
from revpi_middleware.dbus_middleware1 import extend_interface
from tests.dbus_middleware1.bus_provider import TestBusProvider
from tests.dbus_middleware1.fake_devices import IOCTL_QUEUE, RESET_DRIVER_EVENT
class TestObjectPicontrol(TestBusProvider):
def test_is_active(self):
self.assertTrue(self.bp.running)
def test_reset_driver(self):
simple_call(
"ResetDriver",
interface=extend_interface("PiControl"),
bus_type=BusType.SESSION,
)
ioctl_call = IOCTL_QUEUE.get(timeout=2.0)
self.assertEqual((19212, 0), ioctl_call)
def test_notify_reset_driver(self):
timeout = 5
def target_call_reset_driver():
sleep(1.0)
RESET_DRIVER_EVENT.set()
th_wait_for_reset = Thread(target=target_call_reset_driver, daemon=True)
th_wait_for_reset.start()
result = await_signal(
"NotifyDriverReset",
timeout,
extend_interface("PiControl"),
bus_type=BusType.SESSION,
)
self.assertTrue(result)
th_wait_for_reset.join(timeout=timeout)