49 Commits

Author SHA1 Message Date
Sven Sager
b4f817b477 feat(examples): Add revpi-config CLI tool for RevPi configuration
Introduced a new Python-based command-line tool `revpi-config` to manage
Revolution Pi features via D-Bus. The tool supports enabling,
disabling, checking status, and availability for various features.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-06-04 08:10:52 +02:00
Sven Sager
66046b6a9d feat(dbus): Update systemd unit state check to include 'activating'
Previously, the check only considered 'active' as a valid ActiveState.
This update ensures the system also accounts for units in the
'activating' state, improving compatibility with transitional states in
systemd services.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:54:44 +02:00
Sven Sager
9ec1e04d2b feat(dbus): Update service key for systemd-timesyncd to 'ntp'
Renamed the key 'systemd-timesyncd' to 'ntp' in the services mapping.
This improves naming consistency and aligns it with the expected service
identifier. No functional changes were introduced.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:54:14 +02:00
Sven Sager
1cb1062138 feat(dbus): Update feature key from 'dphys-swapfile' to 'swapfile'
Renamed the feature key for swapfile configuration to ensure consistency
and simplify naming.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:28:07 +02:00
Sven Sager
8d01eee66d feat(dbus): Update status change notification in feature toggle methods
Added calls to `StatusChanged` in `Disable` and `Enable` methods to
notify when a feature's state is updated. This ensures proper tracking
and external communication of feature status changes.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:28:07 +02:00
Sven Sager
40059d6068 feat(dbus): Add InterfaceSoftwareServices to bus provider initialization
This change integrates the InterfaceSoftwareServices class into the list
of interfaces initialized by the bus provider. It ensures the new
interface is properly registered and available for use, improving
system functionality and extensibility.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:18:37 +02:00
Sven Sager
50789853e9 feat(dbus): Add D-Bus interface for managing software services
Introduce `InterfaceSoftwareServices` to handle service enable/disable
actions, status, and availability via D-Bus. Consolidate `avahi` service
configuration into the new interface by removing redundant logic from
`revpi_config.py`.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:18:37 +02:00
Sven Sager
d2e2f1c7f3 refactor(dbus): Move simple_systemd to systemd_helper
Moved the `simple_systemd` function to a dedicated `systemd_helper`
module to improve code organization and reusability. Updated imports
accordingly in affected files.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 14:18:37 +02:00
Sven Sager
32574e0815 refactor: DBus interface includes bus instance
Updated `InterfaceRevpiConfig` to require a bus parameter, ensuring
proper initialization. Introduced a constructor in `DbusInterface` to
store the bus instance.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 12:37:15 +02:00
Sven Sager
a29979ad78 feat(dbus): Add AvailabilityChanged signal to .RevpiConfig
This signal provides a mechanism for notifying changes in availability
status. It will complement the existing `StatusChanged` signal and
enhance the system's event-handling capabilities.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-27 07:58:40 +02:00
Sven Sager
431f9308de feat(dbus): Add StatusChanged DBus signal to interface configuration
Introduced the `StatusChanged` signal in `.RevpiConfig` interface to
notify changes in feature status. Emitting this signal in `Enable` and
`Disable` methods ensures real-time updates for feature state changes.
This enhances communication and monitoring within the DBus middleware.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-26 15:21:08 +02:00
Sven Sager
dc2d1ab3a0 ci: Test in a bookworm container
In order to be able to provide all functions of the DBus during testing,
the CI uses the Python 3.11 version in a Bookworm container for
testing. This is the version that is pre-installed in Debian Bookworm.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:54:30 +02:00
Sven Sager
983c6cefea ci: Start dbus session bus for testing
A dbus session bus must be started for testing. The command
`dbus-run-session` creates a session for the following command. As a
result, `pytest` runs with its own session bus for the tests.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:54:30 +02:00
Sven Sager
a977884e17 ci: Set machine-id for dbus
Systemd does not exist in the docker file. So we just set an machine-id,
which the dbus-daemon can use to start a session.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:54:30 +02:00
Sven Sager
20fb85a3f0 ci: Install system dependencies for testing
Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:52:43 +02:00
Sven Sager
f4efb1daae fix: Add missing reuse license information
Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-23 09:36:41 +02:00
Sven Sager
fb181f8810 ci: Add Revolution Pi pipelines
Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-22 13:14:22 +02:00
Sven Sager
3dee7784e2 ci: Add GitLab python pipelines
Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-05-22 13:14:22 +02:00
cc560770ce feat(revpiconfig): Make unit config changes asynchronous
Refactored unit enable/disable actions to run in separate threads,
ensuring non-blocking operations. This enhances performance and avoids
potential delays during systemd operations.
2025-04-22 13:42:33 +02:00
4df903783c fix(revpiconfig): Ensure systemd reloads after unit changes
Added systemd reload calls after unit enable/disable to reflect changes.
Adjusted DBus method calls to capture and utilize change outputs
effectively. This improves reliability in applying unit modifications.
2025-04-22 12:35:43 +02:00
8db1f59cfe doc(revpiconfig): Docstrings for get_rfkill_index and simple_systemd
The new docstrings provide detailed explanations of the purpose,
parameters, and return values for both functions, improving code
readability and maintainability. This ensures better understanding for
future contributors and reduces ambiguity.
2025-04-22 11:06:32 +02:00
7051eba9b9 doc(revpiconfig): Add docstrings to enums in revpi_config.py
This update introduces detailed docstrings for the `ComputeModuleTypes`
and `ConfigActions` enumeration classes. The docstrings provide
descriptions for each class and their attributes, improving code
readability and maintainability.
2025-04-22 11:06:32 +02:00
04780bd0dd doc(revpiconfig): Add docstrings to RevPiConfig class and methods
Enhance documentation for the `RevPiConfig` class, its methods, and
properties to improve code readability and ease of use. The added
docstrings provide clear explanations of the class's purpose,
attributes, and functionality for developers and users. This update
supports better maintainability and understanding of the codebase.
2025-04-22 11:06:32 +02:00
41fb2b3c61 doc(revpiconfig): Add detailed docstrings to ConfigTxt methods
Enhance the `ConfigTxt` class with comprehensive docstrings for all
methods, providing clear explanations of their functionality,
parameters, and return values. This improves code readability and
facilitates easier maintenance and understanding for future developers.
2025-04-22 11:06:32 +02:00
41d9b13e71 fix(dbus): Update systemd interface and path handling
Revised DBus interactions to explicitly use `org.freedesktop.systemd1`
interface and path. This ensures that the correct interfaces are used
and bypasses ".systemd1" magic from the library `pydbus`.
2025-04-22 10:59:59 +02:00
463a61a001 fix(dbus): Update DBus policy file path and interface name
Change the comment to reflect the new DBus policy file location. Adjust
the interface name to use `PiControl` instead of `picontrol` for
consistency.
2025-04-22 10:37:36 +02:00
1fab228272 refactor: Rename WiFi to WLAN for consistent terminology
Updated variable names, function names, and comments to replace "WiFi"
with "WLAN" throughout the codebase. This ensures alignment with
standardized terminology and improves clarity in functionality and
configuration handling. Adjusted related configurations and interface
mappings accordingly.
2025-04-21 13:34:10 +02:00
fc82ec0eb9 feat(revpiconfig): Replace rfkill subprocess calls with sysfs writes
Updated Bluetooth and WiFi rfkill handling by replacing subprocess calls
to "rfkill" with direct writes to sysfs files. This change simplifies
the implementation and improves performance by avoiding external
command execution.
2025-04-21 13:25:56 +02:00
de1774f60e feat(cli): Add CLI support for RevPi configuration object (revpi-config)
This implements a new command "config" in the CLI to handle RevPi
configuration. It includes parsing and subparser setup for
configuration-related operations. The change improves usability by
extending CLI functionality to manage RevPi configuration objects.
2025-04-21 10:55:32 +02:00
8c145ef2ff feat(cli): Add CLI command for configuring Revpi features
Introduced a new CLI command to enable, disable, check the status, and
list available features for Revpi using D-Bus calls. This implementation
provides a structured interface for managing Revpi configurations via
command-line actions.
2025-04-21 10:55:11 +02:00
0ecd86bd64 feat(cli): Add get_properties helper function for DBus interactions
This function facilitates retrieving specific properties from a DBus
interface, improving code modularity and reusability. It supports both
system and session bus types, streamlining access to DBus resources.
2025-04-21 10:54:47 +02:00
555c781aed feat(dbus): Add InterfaceRevpiConfig to DBus interfaces list
Added the InterfaceRevpiConfig class to the list of DBus interfaces in
`bus_provider.py`. This ensures the new system configuration interface
is properly registered and accessible.
2025-04-21 10:21:36 +02:00
604cb61870 feat(dbus): Add Bluetooth configuration functionality
Introduce functionality to enable, disable, and check the status and
availability of Bluetooth devices using the `configure_bluetooth`
method. Integrate Bluetooth configuration into the feature management
system by mapping it in `interface_config.py`.
2025-04-21 10:17:56 +02:00
69e370f964 refactor(revpiconfig: Change Wi-Fi detection and rfkill index logic
Replaced inline rfkill index detection with a standalone
`get_rfkill_index` function for improved modularity. Removed
`_cm_with_wifi` and `_wlan_rfkill_index` attributes, utilizing
`_wlan_class_path` for Wi-Fi presence checks. Adjusted property and
output logic to incorporate the new function.
2025-04-21 10:05:20 +02:00
6eb7eeea40 feat(dbus): Add Wi-Fi configuration support to the system config
Introduces the `configure_wifi` function to handle Wi-Fi actions such as
enabling, disabling, and checking status. Updates the `ieee80211`
feature to use the new function, integrating Wi-Fi management into the
existing configuration framework.
2025-04-21 09:28:01 +02:00
18e6fb3d14 feat(revpiconfig): Enhance Wi-Fi detection and add rfkill index support
Improved logic to detect built-in and third-party Wi-Fi interfaces,
including integration of `rfkill` index retrieval for Wi-Fi devices.
This enhances support for various hardware setups and allows better
control over Wi-Fi functionality.
2025-04-21 09:05:47 +02:00
fe614d026a feat(dbus): Add support for configuring 'revpi-con-can' feature
Introduce the `configure_con_can` function to manage enabling,
disabling, status checking, and availability of the 'revpi-con-can'
feature. Update the `AVAILABLE_FEATURES` dictionary to integrate
'revpi-con-can' as a configurable feature.
2025-04-21 09:05:47 +02:00
870a55042e feat(dbus): Add support for configuring the external antenna
Introduce the `configure_external_antenna` function to manage external
antenna settings, including enable, disable, and status checks. Update
the feature configuration in `interface_config` to include this
functionality. This enhances WiFi-related configuration options.
2025-04-21 09:05:47 +02:00
2848fdcf54 feat(revpiconfig): Add ConfigTxt class for managing config.txt file
Introduced the ConfigTxt class to handle parsing, editing, and saving of
config.txt files. This includes methods for adding, clearing, and
retrieving configuration values, as well as handling dtoverlays and
dtparams. Enhanced system configuration capabilities by providing
structured support for config file operations.
2025-04-21 09:05:47 +02:00
d0f641f2b5 feat(dbus): Add dphys-swapfile configuration functionality
Implemented `configure_dphys_swapfile` to manage the dphys-swapfile
service, including automatic swapfile removal when the service is
disabled. Updated feature registry to support dphys-swapfile
configuration.
2025-04-21 09:05:47 +02:00
e8eba43647 feat(dbus): Add avahi-daemon configuration to system services
Introduce a `configure_avahi_daemon` function to manage the avahi-daemon
service and socket with proper post actions. Update the interface
configuration to enable avahi management using the new function.
2025-04-21 09:05:47 +02:00
0d601f623c feat(dbus): Remove 'var-log.mount' feature
The 'var-log.mount' feature was dropped and has been removed from
the AVAILABLE_FEATURES dictionary.
2025-04-21 09:05:47 +02:00
66735a9eba refactor(dbus): Move system configuration methods to revpi_config.py
Centralized `ConfigActions`, `configure_gui`, and `simple_systemd` into
`revpi_config.py` to eliminate duplication and improve maintainability.
Updated `interface_config.py` to import these methods, ensuring
consistent functionality across modules.
2025-04-21 09:05:47 +02:00
7525c9f653 feat(revpiconfig): Add RevPiConfig class for device information handling
This commit introduces a new `RevPiConfig` class to manage RevPi device
configuration details, such as model, serial, compute module type, WiFi,
and ConBridge detection. It parses CPU info from `/proc/cpuinfo` and
leverages helper methods for hardware-specific checks, enhancing the
middleware's ability to identify and manage hardware features.
2025-04-21 09:05:46 +02:00
3909fab379 feat(dbus): Add GUI configuration handling to interface_config.py
Introduced the `configure_gui` function to manage GUI enabling,
disabling, availability, and status retrieval. Updated the
`AVAILABLE_FEATURES` dictionary to include GUI management functionality,
leveraging systemd and os module operations.
2025-04-20 15:34:16 +02:00
9ce36c78e7 feat(dbus): Add D-Bus interface for system configuration in middleware
Introduced `InterfaceRevpiConfig` to manage feature actions like
enable/disable, status, and availability using D-Bus. Includes support
for predefined features with systemd integration and exception handling
for unsupported features.
2025-04-20 15:18:27 +02:00
c24c78761f fix(cli): Change absolute imports to relative imports 2025-04-20 15:12:09 +02:00
3cc64f514f feat(dbus): Add grep function to search for patterns in a file
The new `grep` function reads a specified file and returns lines
containing a given pattern. It handles file not found errors and logs
unexpected exceptions, improving resilience in file operations.
2025-04-20 15:11:43 +02:00
865d2ca7a9 refactor: Update interface name from 'picontrol' to 'PiControl'
Renamed all occurrences of 'picontrol' to 'PiControl' in the D-Bus
interface definitions, method calls, and test cases for consistency and
adherence to naming conventions. This ensures uniformity across the
codebase and resolves potential naming-related issues.
2025-04-20 12:19:41 +02:00
30 changed files with 1421 additions and 152 deletions

4
.gitignore vendored
View File

@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

40
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,40 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
default:
tags:
- self-hosted
- host-arm64
- high-perf
include:
- project: "revolutionpi/infrastructure/ci-templates"
file: "base.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "check-commit/lint-commit.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "reuse-lint.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "package-devel.yml"
- local: debian/gitlab-ci.yml
rules:
- exists:
- debian/gitlab-ci.yml
run_tests:
stage: test
image: python:3.11-bookworm
script:
- apt-get update
- apt-get -y install dbus libgirepository1.0-dev
- dbus-uuidgen --ensure=/etc/machine-id
- pip install -r requirements.txt
- PYTHONPATH=src dbus-run-session -- pytest -v --junitxml=report.xml --cov=src --cov-report term --cov-report xml:coverage.xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
junit: ${CI_PROJECT_DIR}/report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml

View File

@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
recursive-include .reuse *
recursive-include data *
recursive-include LICENSES *

View File

@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
SHELL := bash
MAKEFLAGS = --no-print-directory --no-builtin-rules
.DEFAULT_GOAL = all

View File

@@ -1,3 +1,9 @@
<!--
SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
SPDX-License-Identifier: GPL-2.0-or-later
-->
# Middleware for Revolution Pi
This middleware will support D-Bus as IPC interface.

View File

@@ -1,4 +1,4 @@
<!-- /etc/dbus-1/system.d/revpi-middleware.conf -->
<!-- /usr/share/dbus-1/system.d/com.revolutionpi.middleware1.conf -->
<busconfig>
<!-- Allow full access to root as the bus owner -->
<policy user="root">
@@ -12,7 +12,7 @@
<allow send_destination="com.revolutionpi.middleware1"
send_interface="org.freedesktop.DBus.Introspectable"/>
<allow send_destination="com.revolutionpi.middleware1"
send_interface="com.revolutionpi.middleware1.picontrol"/>
send_interface="com.revolutionpi.middleware1.PiControl"/>
</policy>
<!-- Standard-Policy -->

48
debian/changelog vendored
View File

@@ -1,48 +0,0 @@
revpi-middleware (0.0.2-1+deb12+1) bookworm; urgency=medium
* refactor(dbus): Move D-Bus helper functions to a dedicated file
* refactor(dbus): Move ResetDriverWatchdog to process_image_helper.py
* refactor(dbus): Parameterize `picontrol_device` and `config_rsc`
* feat: Add session bus option for local testing and development
* feat(dbus): Add import for BusProvider in dbus_middleware1 module
* feat(dbus): Add `running` property to `BusProvider`
* refactor(cli): D-Bus helpers support session and system bus types
* fix(dbus): Add error handling for DBus publishing and main loop
* refactor(dbus): piControl driver reset with PiControlIoctl class
* test(dbus): Add unit test framework for dbus_middleware1 module
* test(dbus): Add unit tests for PiControl D-Bus interface
* refactor(dbus): Fix typo and remove unused thread instance
* refactor(dbus): D-Bus interface management with cleanup support.
* test(dbus): Add support for testing driver reset notification
* feat(deb): Add dbus for testing to build dependencies
* fix(deb): Skip tests because of missing SystemBus in build container
-- Sven Sager <s.sager@kunbus.com> Sat, 19 Apr 2025 16:34:20 +0200
revpi-middleware (0.0.1-1+deb12+1) bookworm; urgency=medium
* docs: Start git project with python git-ignore and Readme
* docs: Use 'reuse' for SPDX Headers and Licenses
* feat: Add python base project files
* feat: Add proginit application basic module
* feat: Add the data directory for additional data files for the project
* test: Add tests directory with a dummy test
* build: Add all necessary files for the build system
* feat: Add dummy main application script
* feat: Add systemd file and data to integrate the app as a daemon
* chore: Update proginit to 1.4.0
* feat(dbus): Add ResetDriverWatchdog helper as global dbus helper
* feat(process_image): Add D-Bus interface for piControl driver
* feat(dbus): Add initial D-Bus middleware implementation
* feat(dbus): Add `extend_interface` function for dynamic interface naming
* feat(dbus): Add DBus policy configuration for revpi-middleware
* feat: Add MiddlewareDaemon implementation to revpi-middleware
* feat: Add daemon mode and signal handling to main application
* feat(cli): Add D-Bus helper functions for CLI commands.
* feat(cli): Add await_signal function to handle D-Bus signals
* feat(cli): Add `await-reset` to wait for piControl reset signal
* chore(build): Update requirements for this project
* feat(cli): Add new CLI tool entry point for `revpictl`
* feat(deb): Start packaging branch
-- Sven Sager <s.sager@kunbus.com> Fri, 18 Apr 2025 19:02:20 +0200

38
debian/control vendored
View File

@@ -1,38 +0,0 @@
Source: revpi-middleware
Section: python
Priority: optional
Maintainer: KUNBUS GmbH <support@kunbus.com>
Rules-Requires-Root: no
Homepage: https://revolutionpi.com/
Vcs-Browser: https://gitlab.com/revolutionpi/revpi-middleware
Vcs-Git: https://gitlab.com/revolutionpi/revpi-middleware.git -b debian/bookworm
Build-Depends:
dbus,
dbus-x11,
debhelper-compat (= 13),
dh-python,
python3-all,
python3-gi (>= 3.42.2),
python3-pydbus (>= 0.6.0),
python3-setuptools,
Standards-Version: 4.6.2
Package: revpi-middleware
Architecture: all
Pre-Depends: ${misc:Pre-Depends}
Depends:
${python3:Depends},
${misc:Depends}
Description: Revolution Pi middleware with D-Bus interface
The Revolution Pi middleware provides a robust communication interface for
Revolution Pi industrial computers. It enables seamless integration between
hardware components and applications through a D-Bus interface. The middleware
serves as a bridge for data exchange, device configuration, and system
monitoring.
.
Key features:
* Hardware abstraction layer for Revolution Pi I/O modules
* Real-time data processing and event handling
* Simplified API for accessing Revolution Pi hardware features
* Extensive configuration options for industrial automation tasks
* Built-in monitoring and diagnostic capabilities

27
debian/copyright vendored
View File

@@ -1,27 +0,0 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Source: https://gitlab.com/revolutionpi/opcua-revpi-server
Files: *
Copyright: 2025 KUNBUS GmbH
License: GPL-2+
Files: debian/*
Copyright: 2025 KUNBUS GmbH
License: GPL-2+
License: GPL-2+
This package is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
.
This package is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
.
On Debian systems, the complete text of the GNU General
Public License version 2 can be found in "/usr/share/common-licenses/GPL-2".

7
debian/gbp.conf vendored
View File

@@ -1,7 +0,0 @@
[DEFAULT]
upstream-branch = main
upstream-tag = v%(version)s
debian-branch=debian/bookworm
debian-tag = debian/%(version)s
debian-tag-msg = %(pkg)s Debian release %(version)s
pristine-tar = True

View File

@@ -1,4 +0,0 @@
data/dbus-policy/com.revolutionpi.middleware1.conf /usr/share/dbus-1/system.d
data/etc/default/revpi-middleware /etc/default/
data/etc/revpi-middleware/revpi-middleware.conf /etc/revpi-middleware/
data/systemd/before_253/revpi-middleware.service /lib/systemd/system/

View File

@@ -1,2 +0,0 @@
/usr/share/revpi-middleware/revpi-middleware /usr/sbin/revpi-middleware
/usr/share/revpi-middleware/revpicli /usr/bin/revpicli

12
debian/rules vendored
View File

@@ -1,12 +0,0 @@
#!/usr/bin/make -f
export PYBUILD_NAME=revpi-middleware
export PYBUILD_INSTALL_ARGS=--install-lib=/usr/share/$(PYBUILD_NAME)/ --install-scripts=/usr/share/$(PYBUILD_NAME)/
%:
dh $@ --with python3 --buildsystem=pybuild
override_dh_auto_test:
# Currently, the tests have to be skipped, because no SystemBus is
# available in the Docker container.
@echo "Skipped tests"

View File

@@ -1 +0,0 @@
3.0 (quilt)

100
examples/revpi-config Executable file
View File

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from argparse import ArgumentParser, SUPPRESS
from typing import NamedTuple
from pydbus import SessionBus, SystemBus
FeatureMapping = NamedTuple("FeatureMapping", [("dbus_interface", str), ("name", str)])
REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
IFACE_SOFTWARE_SERVICES = "com.revolutionpi.middleware1.SoftwareServices"
IFACE_REVPI_CONFIG = "com.revolutionpi.middleware1.RevpiConfig"
REVPI_FEATURE_MAPPINGS = {
"gui": FeatureMapping(IFACE_REVPI_CONFIG, "gui"),
"revpi-con-can": FeatureMapping(IFACE_REVPI_CONFIG, "revpi-con-can"),
"dphys-swapfile": FeatureMapping(IFACE_REVPI_CONFIG, "swapfile"),
"pimodbus-master": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-master"),
"pimodbus-slave": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-slave"),
"systemd-timesyncd": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ntp"),
"ssh": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ssh"),
"nodered": FeatureMapping(IFACE_SOFTWARE_SERVICES, "nodered"),
"noderedrevpinodes-server": FeatureMapping(IFACE_SOFTWARE_SERVICES, "noderedrevpinodes-server"),
"revpipyload": FeatureMapping(IFACE_SOFTWARE_SERVICES, "revpipyload"),
"bluetooth": FeatureMapping(IFACE_REVPI_CONFIG, "bluetooth"),
"ieee80211": FeatureMapping(IFACE_REVPI_CONFIG, "wlan"),
"avahi": FeatureMapping(IFACE_SOFTWARE_SERVICES, "avahi"),
"external-antenna": FeatureMapping(IFACE_REVPI_CONFIG, "external-antenna"),
}
# Generate command arguments
parser = ArgumentParser(
prog="revpi-config",
description="Configuration tool for Revolution Pi.",
)
parser.add_argument(
"--use-session-bus",
dest="use_session_bus",
action="store_true",
default=False,
help=SUPPRESS,
)
parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "availstat"],
help="Action to be executed: enable, disable, status or available.",
)
parser.add_argument(
"feature",
nargs="*",
help="Name of the feature to configure.",
)
args = parser.parse_args()
# Init dbus
bus = SessionBus() if args.use_session_bus else SystemBus()
revpi_middleware = bus.get(REVPI_DBUS_NAME, REVPI_DBUS_BASE_PATH)
lst_results = []
for feature in args.feature:
# Get the mappings
feature_mapping = REVPI_FEATURE_MAPPINGS.get(feature, None)
if feature_mapping is None:
if args.action in ("enable", "disable"):
# Missing feature with action enable/disable will return 5
lst_results.append(5)
elif args.action == "availstat":
# Missing feature with action availstat will return 2
lst_results.append(2)
else:
# Missing feature with action status/available will return 0
lst_results.append(0)
continue
dbus_interface = revpi_middleware[feature_mapping.dbus_interface]
if args.action == "enable":
dbus_interface.Enable(feature_mapping.name)
lst_results.append(0)
elif args.action == "disable":
dbus_interface.Disable(feature_mapping.name)
lst_results.append(0)
elif args.action in ("status", "availstat"):
status = dbus_interface.GetStatus(feature_mapping.name)
lst_results.append(int(status))
elif args.action == "available":
availability = dbus_interface.GetAvailability(feature_mapping.name)
lst_results.append(int(availability))
if lst_results:
print(" ".join(map(str, lst_results)))

View File

@@ -1,2 +1,6 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
[tool.black]
line-length = 100

View File

@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Build dependencies
pip-licenses
Pyinstaller

View File

@@ -5,12 +5,11 @@
This module provides the foundation for the RevPi middleware CLI commands
and argument parsing setup.
"""
from argparse import ArgumentParser
from logging import getLogger
from revpi_middleware.cli_commands import cli_picontrol
from revpi_middleware.proginit import StdLogOutput
from . import cli_config, cli_picontrol
from .. import proginit as pi
from ..proginit import StdLogOutput
log = getLogger(__name__)
@@ -30,6 +29,11 @@ def setup_command_line_arguments():
help="RevPi PiControl object",
)
cli_picontrol.add_subparsers(obj_picontrol)
obj_config = rpictl_obj.add_parser(
"config",
help="RevPi configuration object (revpi-config)",
)
cli_config.add_subparsers(obj_config)
def main() -> int:
@@ -40,6 +44,9 @@ def main() -> int:
if obj == "picontrol":
rc = cli_picontrol.main()
elif obj == "config":
rc = cli_config.main()
else:
log.error(f"Unknown object: {obj}")
rc = 1

View File

@@ -0,0 +1,93 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser
from logging import getLogger
from .dbus_helper import BusType, get_properties, simple_call
from .. import proginit as pi
from ..dbus_middleware1 import extend_interface
log = getLogger(__name__)
def add_subparsers(parent_parser: ArgumentParser):
parent_parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "list-features"],
help="Action to be executed: enable, disable, status or available. "
"To get all available features, use 'list-features'.",
)
parent_parser.add_argument(
"feature",
nargs="?",
default="",
help="Name of the feature to configer. To list all features use 'list-features' as action.",
)
def main() -> int:
action = pi.pargs.action
dbus_value = False
try:
if action == "list-features":
dbus_value = get_properties(
"available_features",
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
for feature in dbus_value:
print(feature)
return 0
# For the following actions, a feature name is required
if pi.pargs.feature == "":
raise Exception("Feature name is required")
if action == "enable":
simple_call(
"Enable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "disable":
simple_call(
"Disable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "status":
dbus_value = simple_call(
"GetStatus",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "available":
dbus_value = simple_call(
"GetAvailability",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
else:
raise Exception(f"Unknown action: {action}")
except Exception as e:
log.error(f"Error: {e}")
return 1
log.debug(
f"D-Bus call of method {action} for feature {pi.pargs.feature} returned: {dbus_value}"
)
print(int(dbus_value))
return 0

View File

@@ -38,7 +38,7 @@ def method_reset():
log.debug("D-Bus call of method ResetDriver")
simple_call(
"ResetDriver",
interface=extend_interface("picontrol"),
interface=extend_interface("PiControl"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
log.info("ResetDriver called via D-Bus")
@@ -48,7 +48,7 @@ def method_await_reset(timeout: int = 0):
detected_signal = await_signal(
"NotifyDriverReset",
timeout,
extend_interface("picontrol"),
extend_interface("PiControl"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
if detected_signal:

View File

@@ -17,6 +17,18 @@ class BusType(Enum):
SYSTEM = "system"
def get_properties(
property_name: str,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface]
return getattr(iface, property_name)
def simple_call(
method: str,
*args,

View File

@@ -10,6 +10,7 @@ from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl
from .system_config import InterfaceRevpiConfig, InterfaceSoftwareServices
log = getLogger(__name__)
@@ -40,7 +41,9 @@ class BusProvider(Thread):
# ("Subdir2", Example()),
# ("Subdir2/Whatever", Example())
lst_interfaces = [
InterfacePiControl(self.picontrol_device, self.config_rsc),
InterfacePiControl(self._bus, self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(self._bus),
InterfaceSoftwareServices(self._bus),
]
try:

View File

@@ -2,8 +2,10 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Helper for dbus."""
from logging import getLogger
from typing import Union
from pydbus import SessionBus, SystemBus
log = getLogger(__name__)
@@ -13,6 +15,9 @@ REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
class DbusInterface:
def __init__(self, bus: Union[SessionBus, SystemBus]):
self.bus = bus
def cleanup(self):
"""
Represents a method responsible for performing cleanup operations. This method is executed to properly
@@ -41,3 +46,38 @@ def extend_interface(*args) -> str:
the provided segments.
"""
return ".".join([REVPI_DBUS_NAME, *args])
def grep(pattern, filename):
"""
Searches for lines in a file that contain a given pattern and returns them as a list.
The function reads lines from the specified file and checks whether each line
contains the provided pattern. It returns a list of lines that match the
pattern. If the file is not found, an empty list is returned. Any other
exceptions during the file reading process are caught and logged.
Args:
pattern (str): The substring to search for within the file's lines.
filename (str): The path to the file that will be searched.
Returns:
list[str]: A list containing lines that include the provided pattern, with
leading and trailing spaces removed.
Raises:
FileNotFoundError: This error is caught if the file specified is not
found.
Exception: Any unforeseen exception during file operations is caught and
logged.
"""
try:
with open(filename, "r") as file:
# Gibt alle Zeilen zurück, die das Muster enthalten
matching_lines = [line.strip() for line in file if pattern in line]
return matching_lines
except FileNotFoundError:
return []
except Exception as e:
log.error(f"Error reading file: {e}")
return []

View File

@@ -15,7 +15,7 @@ log = getLogger(__name__)
class InterfacePiControl(DbusInterface):
"""
<node>
<interface name='com.revolutionpi.middleware1.picontrol'>
<interface name='com.revolutionpi.middleware1.PiControl'>
<signal name="NotifyDriverReset">
</signal>
<method name='ResetDriver'>
@@ -26,7 +26,9 @@ class InterfacePiControl(DbusInterface):
NotifyDriverReset = signal()
def __init__(self, picontrol_device: str, config_rsc: str):
def __init__(self, bus, picontrol_device: str, config_rsc: str):
super().__init__(bus)
self.picontrol_device = picontrol_device
self.config_rsc = config_rsc

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig
from .interface_services import InterfaceSoftwareServices

View File

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for hardware configuration."""
from collections import namedtuple
from logging import getLogger
from pydbus.generic import signal
from .revpi_config import (
ConfigActions,
configure_bluetooth,
configure_con_can,
configure_dphys_swapfile,
configure_external_antenna,
configure_gui,
configure_wlan,
)
from ..dbus_helper import DbusInterface
log = getLogger(__name__)
FeatureFunction = namedtuple("FeatureFunction", ["function", "args"])
class InterfaceRevpiConfig(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.RevpiConfig">
<method name="Disable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="Enable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="GetStatus">
<arg name="feature" type="s" direction="in"/>
<arg name="status" type="b" direction="out"/>
</method>
<method name="GetAvailability">
<arg name="feature" type="s" direction="in"/>
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface>
</node>
"""
AvailabilityChanged = signal()
StatusChanged = signal()
def Disable(self, feature: str) -> None:
"""Disable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.DISABLE, *feature_function.args)
self.StatusChanged(feature, False)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.ENABLE, *feature_function.args)
self.StatusChanged(feature, True)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
feature_function = get_feature(feature)
return feature_function.function(ConfigActions.STATUS, *feature_function.args)
def GetAvailability(self, feature: str) -> bool:
"""Get feature availability on the RevPi."""
feature_function = get_feature(feature)
return feature_function.function(ConfigActions.AVAILABLE, *feature_function.args)
@property
def available_features(self) -> list[str]:
return list(AVAILABLE_FEATURES.keys())
def get_feature(feature: str) -> FeatureFunction:
if feature not in AVAILABLE_FEATURES:
raise ValueError(f"feature {feature} does not exist")
feature_function = AVAILABLE_FEATURES[feature]
if not feature_function:
raise NotImplementedError(f"feature {feature} is not implemented")
return feature_function
AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []),
"revpi-con-can": FeatureFunction(configure_con_can, []),
"swapfile": FeatureFunction(configure_dphys_swapfile, []),
"bluetooth": FeatureFunction(configure_bluetooth, []),
"wlan": FeatureFunction(configure_wlan, []),
"external-antenna": FeatureFunction(configure_external_antenna, []),
}

View File

@@ -0,0 +1,205 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for software services."""
from logging import getLogger
from typing import List
from pydbus.generic import signal
from ..dbus_helper import DbusInterface
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__)
class InterfaceSoftwareServices(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.SoftwareServices">
<method name="Disable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="Enable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="GetStatus">
<arg name="feature" type="s" direction="in"/>
<arg name="status" type="b" direction="out"/>
</method>
<method name="GetAvailability">
<arg name="feature" type="s" direction="in"/>
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface>
</node>
"""
AvailabilityChanged = signal()
StatusChanged = signal()
def __init__(self, bus):
super().__init__(bus)
self.mrk_available = {}
self.mrk_status = {}
self.services = {
"pimodbus-master": ["pimodbus-master.service"],
"pimodbus-slave": ["pimodbus-slave.service"],
"ntp": ["systemd-timesyncd.service"],
"ssh": ["ssh.service"],
"nodered": ["nodered.service"],
"noderedrevpinodes-server": ["noderedrevpinodes-server.service"],
"revpipyload": ["revpipyload.service"],
"avahi": ["avahi-daemon.service", "avahi-daemon.socket"],
}
# Create a systemd manager interface object
systemd = self.bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
# Load all unit paths and subscribe to properties changed signal
self.object_paths = {}
for feature in self.services:
# Get the status and availability of the feature
self.mrk_available[feature] = self.GetAvailability(feature)
self.mrk_status[feature] = self.GetStatus(feature)
# Subscribe to properties changed signal for each unit
for unit_name in self.services[feature]:
unit_path = systemd_manager.LoadUnit(unit_name)
self.object_paths[unit_path] = feature
self.bus.subscribe(
iface="org.freedesktop.DBus.Properties",
signal="PropertiesChanged",
object=unit_path,
signal_fired=self._callback_properties_changed,
)
# Subscribe to the reloading signal to update the availability of the feature
self.bus.subscribe(
iface="org.freedesktop.systemd1.Manager",
signal="Reloading",
object="/org/freedesktop/systemd1",
signal_fired=self._callback_reloading_signal,
)
def _callback_reloading_signal(self, sender, object_path, interface, signal, parameters):
"""
Handles the signal emitted for reloading and checks for changes in availability
and status for a set of services. If changes are identified, corresponding
update methods are triggered to reflect the new states.
Args:
sender: The entity sending the signal.
object_path: Path to the object emitting the signal.
interface: Interface through which the signal is sent.
signal: The signal being received.
parameters: A list of parameters associated with the signal.
Raises:
None
"""
if parameters[0]:
return
for feature in self.services:
availability = self.GetAvailability(feature)
if self.mrk_available[feature] != availability:
self.mrk_available[feature] = availability
self.AvailabilityChanged(feature, availability)
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _callback_properties_changed(self, sender, object_path, interface, signal, parameters):
"""
Handles the 'PropertiesChanged' signal callback by updating internal status tracking for given
features and invoking status change notifications if necessary.
Args:
sender (Any): Information about the signal sender.
object_path (str): The path of the object emitting the signal.
interface (str): The interface where the signal was emitted.
signal (str): The name of the emitted signal.
parameters (tuple): Signal parameters containing interface name, changed properties, and
invalidated properties.
Raises:
TypeError: If the 'parameters' argument does not unpack to three expected values.
"""
interface, changed_properties, invalidated_properties = parameters
if "ActiveState" not in changed_properties:
return
feature = self.object_paths[object_path]
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _get_unit_names(self, feature: str) -> List[str]:
if feature not in self.services:
raise ValueError(f"feature {feature} does not exist")
return self.services[feature]
def Disable(self, feature: str) -> None:
"""Disable the feature."""
action = ServiceActions.DISABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
action = ServiceActions.ENABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
unit_names = self._get_unit_names(feature)
rc_status = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.STATUS, unit_name):
rc_status = False
break
return rc_status
def GetAvailability(self, feature: str) -> bool:
"""Get feature availability on the RevPi."""
unit_names = self._get_unit_names(feature)
rc_available = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.AVAILABLE, unit_name):
rc_available = False
break
return rc_available
@property
def available_features(self) -> list[str]:
return list(self.services.keys())

View File

@@ -0,0 +1,650 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
import re
import shutil
import subprocess
from collections import namedtuple
from enum import Enum, IntEnum
from glob import glob
from logging import getLogger
from os import X_OK, access
from os.path import exists, join
from typing import List, Optional
from pydbus import SystemBus
from ..dbus_helper import grep
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__)
ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
class ComputeModuleTypes(IntEnum):
"""
Enumeration class to represent compute module types.
This class is an enumeration that defines various types of compute
modules and assigns them associated integer values for identifying
different module types.
Attributes:
UNKNOWN (int): Represents an unknown or undefined compute module type.
CM1 (int): Represents a Compute Module 1.
CM3 (int): Represents a Compute Module 3.
CM4 (int): Represents a Compute Module 4.
CM4S (int): Represents a Compute Module 4S.
CM5 (int): Represents a Compute Module 5.
"""
UNKNOWN = 0
CM1 = 6
CM3 = 10
CM4 = 20
CM4S = 21
CM5 = 24
class ConfigActions(Enum):
"""
Enumeration class for defining configuration actions.
This enumeration provides predefined constants for common configuration
actions. It can be used to ensure consistency when working with or defining
such actions in a system.
"""
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
class RevPiConfig:
"""
Represents the configuration and hardware details of a Revolution Pi system.
This class provides methods and properties to initialize and fetch
information related to the Revolution Pi device, such as model, serial
number, compute module type, WLAN capability, and the presence of a
connection bridge. The class works by parsing system-level files (e.g.,
`/proc/cpuinfo`) and using this data to identify hardware characteristics
and features.
Attributes:
serial (str): The serial number of the Revolution Pi device.
model (str): The model name of the Revolution Pi device.
"""
def __init__(self):
self._cm_type = ComputeModuleTypes.UNKNOWN
self._revpi_with_con_bridge = False
self._wlan_class_path = ""
self.serial = ""
self.model = ""
self._init_device_info()
def _init_device_info(self):
"""
Initialize and retrieve detailed hardware information, including CPU details,
device type, WLAN interface, and connectivity features.
This method gathers information from system files and other sources to
initialize device-specific attributes such as model, serial number,
compute module type, and optional features like integrated WLAN
or ConBridge support. It performs checks specific to the detected
module type to accurately populate necessary device details.
Attributes
----------
model : str
The model of the CPU based on information from /proc/cpuinfo.
serial : str
The serial number extracted from /proc/cpuinfo.
_cm_type : ComputeModuleTypes, optional
The type of the compute module derived from the hardware revision value.
_wlan_class_path : str, optional
Filesystem path to the detected WLAN interface, if any.
_revpi_with_con_bridge : bool
Indicates whether the device supports the ConBridge feature.
"""
dc_cpuinfo = {}
# Extract CPU information
with open("/proc/cpuinfo", "r") as f:
line = "\n"
while line:
line = f.readline()
if line.startswith(("Revision", "Serial", "Model")):
key, value = line.split(":", 1)
key = key.strip().lower()
value = value.strip()
dc_cpuinfo[key] = value
self.model = dc_cpuinfo.get("model", "")
self.serial = dc_cpuinfo.get("serial", "")
# Detect Compute Module type
revision = dc_cpuinfo.get("revision", "")
if revision:
revision = int(revision, 16)
mask = 4080 # 0xFF0 in dezimal
try:
self._cm_type = ComputeModuleTypes((revision & mask) >> 4)
except ValueError:
pass
# Detect WLAN on CM module
could_have_wlan = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5)
if could_have_wlan:
wlan_interface = join(LINUX_WLAN_CLASS_PATH, "phy0")
if grep("DRIVER=brcmfmac", join(wlan_interface, "device", "uevent")):
self._wlan_class_path = wlan_interface
# If no build in WLAN on the CM, detect third party WLAN on RevPi Flat
if not self._wlan_class_path and grep("revpi-flat", "/proc/device-tree/compatible"):
lst_wlan_interfaces = glob("/sys/class/ieee80211/*")
for wlan_interface in lst_wlan_interfaces:
if grep("DRIVER=mwifiex_sdio", join(wlan_interface, "device", "uevent")):
self._wlan_class_path = wlan_interface
# Detect ConBridge
could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S)
if could_have_con_bridge:
lst_grep = grep("kunbus,revpi-connect", "/proc/device-tree/compatible")
self._revpi_with_con_bridge = len(lst_grep) > 0
@property
def class_path_wlan(self) -> str:
"""
Provides access to the WLAN class path.
This property retrieves the stored WLAN class path, allowing the user to access it when
needed.
Returns:
str: The WLAN class path.
"""
return self._wlan_class_path
@property
def cm_type(self) -> ComputeModuleTypes:
"""
Gets the type of the compute module.
The property provides access to the type of the compute
module used. The type is represented as an instance of
the `ComputeModuleTypes` class.
Returns
-------
ComputeModuleTypes
The type of the compute module.
"""
return self._cm_type
@property
def with_con_bridge(self) -> bool:
"""
Indicates if the device is configured with a connection bridge.
This property checks the internal status and determines whether the device setup
includes a connection bridge functionality. It is read-only.
Returns:
bool: True if the connection bridge is configured, False otherwise.
"""
return self._revpi_with_con_bridge
@property
def with_wlan(self) -> bool:
"""
Checks if WLAN is available.
This property evaluates whether WLAN is enabled or available by checking
the presence or value of the internal attribute `_wlan_class_path`.
Returns:
bool: True if WLAN is available, False otherwise.
"""
return bool(self._wlan_class_path)
class ConfigTxt:
"""
Configuration file handler for managing 'config.txt'.
This class provides an interface to read, modify, save, and reload
Raspbian's configuration file `config.txt`. It includes functionalities
to manipulate specific parameters within the configuration and supports
managing dtoverlay and dtparam entries. The primary aim of this class
is to abstract file operations and make modifications user-friendly.
Attributes:
_config_txt_path (str): The path to the configuration file `config.txt`.
_config_txt_lines (list[str]): Contains all lines of the configuration
file as a list of strings, where each string represents a line.
"""
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self):
self._config_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._config_txt_path = path
break
if not self._config_txt_path:
raise FileNotFoundError("no config.txt found")
self._config_txt_lines = []
def _clear_name_values(self, name: str, values: str or list) -> int:
"""
Removes all occurrences of specified name-value pairs from the configuration.
This method searches for all name-value pairs in the configuration and
removes those that match the given name and value(s). It returns the
number of occurrences removed.
Arguments:
name: str
The name of the configuration variable to search for.
values: str or list
The value or list of values to match the configuration variable
against.
Returns:
int: The number of name-value pairs removed from the configuration.
"""
counter = 0
if type(values) is str:
values = [values]
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value in values:
self._config_txt_lines.pop(config_var.line_index)
counter += 1
return counter
def _get_all_name_values(self) -> List[ConfigVariable]:
"""
Retrieves all name-value pairs from the configuration text lines.
This method parses the configuration text lines to extract all name-value
pairs. If the configuration text lines are not loaded, it reloads the
configuration before processing. Each extracted name-value pair is added to a
list as a ConfigVariable object, which also holds the index of the match in
the text lines. The method returns the compiled list of these ConfigVariable
objects.
Returns:
List[ConfigVariable]: A list of ConfigVariable objects representing the
name-value pairs found in the configuration text lines, along with their
corresponding indexes.
"""
if not self._config_txt_lines:
self.reload_config()
lst_return = []
for i in range(len(self._config_txt_lines)):
match = self.re_name_value.match(self._config_txt_lines[i])
if match:
lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i))
return lst_return
def reload_config(self):
"""
Reloads the configuration file and updates the list of configuration lines.
This method reads the content of the configuration file specified by the
attribute `_config_txt_path` and updates `_config_txt_lines` with the file
contents as a list of strings, where each string represents a line.
Returns:
None
"""
with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines()
def save_config(self):
"""
Saves the current configuration to a file. The method ensures atomicity by first writing
to a temporary file and then moving it to the desired path. After the configuration is
saved, the internal list of configuration lines is cleared.
Raises:
OSError: If there is an issue writing to or moving the file.
"""
if not self._config_txt_lines:
return
tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear()
def add_name_value(self, name: str, value: str):
"""
Adds a name-value pair to the configuration if it does not already exist.
This method checks if the given name-value pair is already present in
the configuration. If it is not present, the pair is appended to the
configuration text lines.
Parameters:
name (str): The name to be added to the configuration.
value (str): The value corresponding to the name to be added.
Returns:
None
"""
# Check weather name and value already exists
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value == value:
return
self._config_txt_lines.append(f"{name}={value}\n")
def clear_dtoverlays(self, dtoverlays: str or list) -> int:
"""
Clears the specified device tree overlays. This method removes one or more
device tree overlays by clearing their corresponding name-value pairs.
Args:
dtoverlays (str or list): A device tree overlay name as a string, or a
list of such overlay names to be cleared.
Returns:
int: The number of device tree overlay name-value pairs successfully
cleared.
"""
return self._clear_name_values("dtoverlay", dtoverlays)
def clear_dtparams(self, dtparams: str or list) -> int:
"""
Clears the specified device tree parameters.
This method removes the given device tree parameters by utilizing
the underlying `_clear_name_values` function with a predefined
parameter type.
Parameters:
dtparams: str or list
A string or list of strings specifying the device tree
parameters to remove.
Returns:
int
The number of parameters cleared.
"""
return self._clear_name_values("dtparam", dtparams)
def get_values(self, var_name: str) -> list:
"""
Get all values associated with a given variable name.
This method retrieves a list of values corresponding to the specified
variable name by iterating through a collection of configuration
variables. Each configuration variable is checked for a matching name,
and its value is appended to the resulting list if a match is found.
Parameters:
var_name (str): The name of the variable for which values are to
be retrieved.
Returns:
list: A list of values associated with the specified variable name.
"""
var_values = []
for config_var in self._get_all_name_values():
if config_var.name == var_name:
var_values.append(config_var.value)
return var_values
@property
def config_txt_path(self) -> str:
"""
Get the file path for the configuration text file.
This property provides access to the private attribute `_config_txt_path` which
stores the file path to the configuration text file.
Returns:
str
The file path to the configuration text file.
"""
return self._config_txt_path
def configure_bluetooth(action: ConfigActions):
hci_device = join(LINUX_BT_CLASS_PATH, "hci0")
bt_rfkill_index = get_rfkill_index(hci_device)
# If the bluetooth device is not present, the device should have been
# brought up by revpi-bluetooth's udev rules or vendor magic (devices
# based on CM4 and newer). Nothing we can do here, so treat the interface
# as disabled.
if action is ConfigActions.ENABLE:
if bt_rfkill_index is not None:
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("0")
elif action is ConfigActions.DISABLE:
if bt_rfkill_index is not None:
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.STATUS:
if bt_rfkill_index is None:
return False
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
elif action is ConfigActions.AVAILABLE:
return bt_rfkill_index is not None
else:
raise ValueError(f"action {action} not supported")
return None
def configure_con_can(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_con_bridge
dt_overlay = "revpi-con-can"
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.add_name_value("dtoverlay", dt_overlay)
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", dt_overlay])
elif action is ConfigActions.DISABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay])
elif action is ConfigActions.STATUS:
return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_dphys_swapfile(action: ConfigActions):
# Translate config action to systemd action
if action is ConfigActions.ENABLE:
systemd_action = ServiceActions.ENABLE
elif action is ConfigActions.DISABLE:
systemd_action = ServiceActions.DISABLE
elif action is ConfigActions.STATUS:
systemd_action = ServiceActions.STATUS
else:
systemd_action = ServiceActions.AVAILABLE
return_value = simple_systemd(systemd_action, "dphys-swapfile.service")
# Post actions for dphys-swapfile
if action is ConfigActions.DISABLE:
# Remove swapfile afer disabling the service unit
subprocess.call(
["/sbin/dphys-swapfile", "uninstall"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return return_value
def configure_external_antenna(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_wlan
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.add_name_value("dtparam", "ant2")
config_txt.save_config()
elif action is ConfigActions.DISABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.save_config()
elif action is ConfigActions.STATUS:
return revpi.with_wlan and "ant2" in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_gui(action: ConfigActions):
gui_available = access("/usr/bin/startx", X_OK)
if action is ConfigActions.AVAILABLE:
return gui_available
bus = SystemBus()
systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True)
elif action is ConfigActions.DISABLE:
systemd_manager.SetDefaultTarget("multi-user.target", True)
elif action is ConfigActions.STATUS:
return systemd_manager.GetDefaultTarget() == "graphical.target"
else:
raise ValueError(f"action {action} not supported")
def configure_wlan(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.ENABLE:
if revpi.with_wlan:
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("0")
elif action is ConfigActions.DISABLE:
if revpi.with_wlan:
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.AVAILABLE:
return revpi.with_wlan
elif action is ConfigActions.STATUS:
if not revpi.with_wlan:
return False
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
else:
raise ValueError(f"action {action} not supported")
return None
def get_rfkill_index(device_class_path: str) -> Optional[int]:
"""
Get the rfkill index for a device under a specific device class path.
This function searches for and extracts the rfkill index associated with
devices located under the given device class path. It uses a regular
expression to identify and parse the rfkill index from the paths
of matching rfkill device files.
Parameters:
device_class_path: str
The path to the device class directory where rfkill entries
are located.
Returns:
Optional[int]:
The index of the rfkill device if found, otherwise None.
"""
re_rfkill_index = re.compile(r"^/.+/rfkill(?P<index>\d+)$")
for rfkill_path in glob(join(device_class_path, "rfkill*")):
match_index = re_rfkill_index.match(rfkill_path)
if match_index:
return int(match_index.group("index"))
return None
if __name__ == "__main__":
rc = RevPiConfig()
print("Model:", rc.model)
print("Serial: ", rc.serial)
print("CM Type: ", rc.cm_type.name)
print("With WLAN: ", rc.with_wlan)
if rc.with_wlan:
print(" class path: ", rc.class_path_wlan)
print(" rfkill index: ", get_rfkill_index(rc.class_path_wlan))
print("With con-bridge:", rc.with_con_bridge)
config_txt = ConfigTxt()
print("Config file: ", config_txt.config_txt_path)

View File

@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from enum import Enum
from logging import getLogger
from threading import Thread
from typing import Optional
from pydbus import SystemBus
log = getLogger(__name__)
class ServiceActions(Enum):
"""
Enumeration class for defining configuration actions.
This enumeration provides predefined constants for common configuration
actions. It can be used to ensure consistency when working with or defining
such actions in a system.
"""
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
def simple_systemd(action: ServiceActions, unit: str, unmask: bool = False) -> Optional[bool]:
"""
Perform systemd service actions such as enable, disable, check status, or availability.
This function interacts with the systemd D-Bus API to manage and query the
state of services on a system. The supported actions include enabling a systemd
unit, disabling it, starting/stopping a unit, and checking its status or
availability. The function supports asynchronous configuration changes through
threads where applicable.
Parameters:
action (ServiceActions): The action to perform on the systemd service.
Supported actions are ENABLE, DISABLE, STATUS, and AVAILABLE.
unit (str): The name of the systemd unit to operate on (e.g., "example.service").
unmask (bool): When enabling a unit, if True, any masked unit file will
first be unmasked before proceeding with the operation. Defaults to False.
Returns:
Optional[bool]: The return value depends on the action. For STATUS or
AVAILABLE actions, it returns True if the unit satisfies the condition
(e.g., enabled and active, or available and loaded), False otherwise.
For other actions, it returns None.
"""
bus = SystemBus()
systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ServiceActions.ENABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
lst_change_unmask = []
if unmask:
# Dbus call: UnmaskUnitFiles(in as files, in b runtime, out a(sss) changes
lst_change_unmask = systemd_manager.UnmaskUnitFiles([unit], False)
# Dbus call: EnableUnitFiles(in as files, in b runtime, in b force,
# out b carries_install_info, out a(sss) changes
lst_change_enable = systemd_manager.EnableUnitFiles([unit], False, False)
if lst_change_unmask or lst_change_enable:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StartUnit(in s name, in s mode, out o job
systemd_manager.StartUnit(unit, "replace")
elif action is ServiceActions.DISABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
# Dbus call: DisableUnitFiles (in as files, in b runtime, out a(sss) changes)
change = systemd_manager.DisableUnitFiles([unit], False)
if change:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StopUnit(in s name,in s mode, out o job
systemd_manager.StopUnit(unit, "replace")
elif action is ServiceActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState in (
"active",
"activating",
)
elif action is ServiceActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
return None

View File

@@ -18,7 +18,7 @@ class TestObjectPicontrol(TestBusProvider):
def test_reset_driver(self):
simple_call(
"ResetDriver",
interface=extend_interface("picontrol"),
interface=extend_interface("PiControl"),
bus_type=BusType.SESSION,
)
ioctl_call = IOCTL_QUEUE.get(timeout=2.0)
@@ -37,7 +37,7 @@ class TestObjectPicontrol(TestBusProvider):
result = await_signal(
"NotifyDriverReset",
timeout,
extend_interface("picontrol"),
extend_interface("PiControl"),
bus_type=BusType.SESSION,
)
self.assertTrue(result)