4 Commits

Author SHA1 Message Date
acdb814007 doc(revpiconfig): Docstrings for get_rfkill_index and simple_systemd
The new docstrings provide detailed explanations of the purpose,
parameters, and return values for both functions, improving code
readability and maintainability. This ensures better understanding for
future contributors and reduces ambiguity.
2025-04-21 14:44:50 +02:00
8451b5401b doc(revpiconfig): Add docstrings to enums in revpi_config.py
This update introduces detailed docstrings for the `ComputeModuleTypes`
and `ConfigActions` enumeration classes. The docstrings provide
descriptions for each class and their attributes, improving code
readability and maintainability.
2025-04-21 14:44:04 +02:00
0fe8be6515 doc(revpiconfig): Add docstrings to RevPiConfig class and methods
Enhance documentation for the `RevPiConfig` class, its methods, and
properties to improve code readability and ease of use. The added
docstrings provide clear explanations of the class's purpose,
attributes, and functionality for developers and users. This update
supports better maintainability and understanding of the codebase.
2025-04-21 14:42:14 +02:00
5d376acf49 doc(revpiconfig): Add detailed docstrings to ConfigTxt methods
Enhance the `ConfigTxt` class with comprehensive docstrings for all
methods, providing clear explanations of their functionality,
parameters, and return values. This improves code readability and
facilitates easier maintenance and understanding for future developers.
2025-04-21 14:38:32 +02:00
43 changed files with 121 additions and 1483 deletions

4
.gitignore vendored
View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]

View File

@@ -1,40 +0,0 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
default:
tags:
- self-hosted
- host-arm64
- high-perf
include:
- project: "revolutionpi/infrastructure/ci-templates"
file: "base.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "check-commit/lint-commit.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "reuse-lint.yml"
- project: "revolutionpi/infrastructure/ci-templates"
file: "package-devel.yml"
- local: debian/gitlab-ci.yml
rules:
- exists:
- debian/gitlab-ci.yml
run_tests:
stage: test
image: python:3.11-bookworm
script:
- apt-get update
- apt-get -y install dbus libgirepository1.0-dev
- dbus-uuidgen --ensure=/etc/machine-id
- pip install -r requirements.txt
- PYTHONPATH=src dbus-run-session -- pytest -v --junitxml=report.xml --cov=src --cov-report term --cov-report xml:coverage.xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
junit: ${CI_PROJECT_DIR}/report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml

View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
recursive-include .reuse * recursive-include .reuse *
recursive-include data * recursive-include data *
recursive-include LICENSES * recursive-include LICENSES *

View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
SHELL := bash SHELL := bash
MAKEFLAGS = --no-print-directory --no-builtin-rules MAKEFLAGS = --no-print-directory --no-builtin-rules
.DEFAULT_GOAL = all .DEFAULT_GOAL = all

View File

@@ -1,14 +1,3 @@
<!--
SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
SPDX-License-Identifier: GPL-2.0-or-later
-->
# Middleware for Revolution Pi # Middleware for Revolution Pi
This middleware will support D-Bus as IPC interface. This middleware will support D-Bus as IPC interface.
## D-Bus
See [docs/dbus.md](docs/dbus.md) for an overview of the standard D-Bus data
types.

View File

@@ -1,19 +0,0 @@
<!-- /usr/share/dbus-1/system.d/com.revolutionpi.ios1.conf -->
<busconfig>
<!-- Allow full access to root as the bus owner -->
<policy user="root">
<allow own="com.revolutionpi.ios1"/>
<allow send_destination="com.revolutionpi.ios1"/>
<allow receive_sender="com.revolutionpi.ios1"/>
</policy>
<!-- System group picontrol -->
<policy group="picontrol">
<allow send_destination="com.revolutionpi.ios1"/>
</policy>
<!-- Standard-Policy -->
<policy context="default">
<deny send_destination="com.revolutionpi.ios1"/>
</policy>
</busconfig>

View File

@@ -1,4 +1,4 @@
<!-- /usr/share/dbus-1/system.d/com.revolutionpi.middleware1.conf --> <!-- /etc/dbus-1/system.d/revpi-middleware.conf -->
<busconfig> <busconfig>
<!-- Allow full access to root as the bus owner --> <!-- Allow full access to root as the bus owner -->
<policy user="root"> <policy user="root">
@@ -12,7 +12,7 @@
<allow send_destination="com.revolutionpi.middleware1" <allow send_destination="com.revolutionpi.middleware1"
send_interface="org.freedesktop.DBus.Introspectable"/> send_interface="org.freedesktop.DBus.Introspectable"/>
<allow send_destination="com.revolutionpi.middleware1" <allow send_destination="com.revolutionpi.middleware1"
send_interface="com.revolutionpi.middleware1.PiControl"/> send_interface="com.revolutionpi.middleware1.picontrol"/>
</policy> </policy>
<!-- Standard-Policy --> <!-- Standard-Policy -->

View File

@@ -1,8 +0,0 @@
# Additional options that are passed to revpi-ios.
# add '-f /var/log/revpi-ios.log' to write logs to own log file
# add '-v' or '-vv' for verbose logging
DAEMON_OPTS=""
# In addition to journalctl, use your own additional log file
# DAEMON_OPTS="-f /var/log/revpi-ios.log"

View File

@@ -1,14 +0,0 @@
/var/log/revpi-ios.log
{
rotate 6
weekly
maxsize 1M
compress
delaycompress
missingok
notifempty
sharedscripts
postrotate
systemctl kill --signal=SIGUSR1 revpi-ios > /dev/null 2>&1 || true
endscript
}

View File

@@ -1,65 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Switches 14 outputs on a DIO to the same value as the first input."""
from time import perf_counter
from gi.repository import GLib
from pydbus import SystemBus
detected_signal = False
timestamp = perf_counter()
# Get system bus
bus = SystemBus()
# Get IoManager interface on ios1 bus
iface_io_manager = bus.get(
"com.revolutionpi.ios1",
"/com/revolutionpi/ios1",
)["com.revolutionpi.ios1.IoManager"]
# Query object path of RevPiLED output
path_revpi_led = iface_io_manager.Get("RevPiLED")
# Get Output interface in the queried object path.
out_RevPiLED = bus.get("com.revolutionpi.ios1", path_revpi_led)["com.revolutionpi.ios1.Output"]
# Get all 14 outputs
lst_path_outputs = [iface_io_manager.Get(f"O_{i}") for i in range(1, 15)]
lst_out_outputs = [
bus.get("com.revolutionpi.ios1", path)["com.revolutionpi.ios1.Output"]
for path in lst_path_outputs
]
def signal_handler(io_name, io_value):
global timestamp
print(f"Signal received: {io_name} = {io_value}")
if io_name == "O_14":
print(f"Time since input detection: {perf_counter() - timestamp}")
elif io_name == "I_1":
timestamp = perf_counter()
out_RevPiLED.SetValue(GLib.Variant("i", io_value))
for output in lst_out_outputs:
output.SetValue(GLib.Variant("b", io_value))
# Start change detection to fire signals on dbus
iface_io_manager.ActivateIoSignals()
iface_io_manager.onIoChanged = signal_handler
try:
loop = GLib.MainLoop()
loop.run()
except KeyboardInterrupt:
pass
# Stop change detection
iface_io_manager.DeactivateIoSignals()

View File

@@ -1,12 +0,0 @@
[Unit]
Description=D-Bus interface for Inputs/Outputs of Revolution Pi
[Service]
EnvironmentFile=-/etc/default/revpi-ios
Type=notify
NotifyAccess=all
ExecStart=/usr/sbin/revpi-middleware ios $DAEMON_OPTS
ExecReload=/bin/kill -HUP $MAINPID
[Install]
WantedBy=multi-user.target

View File

@@ -1,11 +0,0 @@
[Unit]
Description=D-Bus interface for Inputs/Outputs of Revolution Pi
[Service]
EnvironmentFile=-/etc/default/revpi-ios
Type=notify-reload
NotifyAccess=all
ExecStart=/usr/sbin/revpi-middleware ios $DAEMON_OPTS
[Install]
WantedBy=multi-user.target

View File

@@ -1,51 +0,0 @@
<!--
SPDX-FileCopyrightText: 2026 KUNBUS GmbH <support@kunbus.com>
SPDX-License-Identifier: GPL-2.0-or-later
-->
# D-Bus
## Datentypen
Hier ist eine **Liste der standardisierten DBus-Datentypen** (Signaturen), wie
sie u. a. in Introspection/XML (`type="..."`) verwendet werden.
## Basis-Typen (Basic Types)
| Signatur | Bedeutung |
|----------|-------------------------------------|
| `y` | Byte (unsigned 8-bit) |
| `b` | Boolean |
| `n` | Int16 (signed) |
| `q` | UInt16 (unsigned) |
| `i` | Int32 (signed) |
| `u` | UInt32 (unsigned) |
| `x` | Int64 (signed) |
| `t` | UInt64 (unsigned) |
| `d` | Double (IEEE 754) |
| `s` | String (UTF8) |
| `o` | Object Path |
| `g` | Signature (Typ-Signatur als String) |
## Container-Typen (Compound/Container)
| Form | Bedeutung | Beispiel |
|------------|-------------------------------------------------|---------------------------------|
| `aX` | Array von `X` | `as` = Array von Strings |
| `(XYZ...)` | Struct/Tupel | `(is)` = (Int32, String) |
| `{KV}` | Dict-Entry (nur innerhalb eines Arrays erlaubt) | `a{sv}` = Dict String → Variant |
## Spezial-Typ
| Signatur | Bedeutung |
|----------|---------------------------------------------------|
| `v` | Variant (beliebiger Typ, zur Laufzeit festgelegt) |
## Häufig genutzte Kombis (Praxis)
- `as` → Liste von Strings
- `ao` → Liste von Object Paths
- `a{sv}` → „Properties“-Map (String → Variant), sehr verbreitet bei
`org.freedesktop.DBus.Properties`
- `a{ss}` → String → String Map

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from argparse import ArgumentParser, SUPPRESS
from typing import NamedTuple
from pydbus import SessionBus, SystemBus
FeatureMapping = NamedTuple("FeatureMapping", [("dbus_interface", str), ("name", str)])
REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
IFACE_SOFTWARE_SERVICES = "com.revolutionpi.middleware1.SoftwareServices"
IFACE_REVPI_CONFIG = "com.revolutionpi.middleware1.RevpiConfig"
REVPI_FEATURE_MAPPINGS = {
"gui": FeatureMapping(IFACE_REVPI_CONFIG, "gui"),
"revpi-con-can": FeatureMapping(IFACE_REVPI_CONFIG, "revpi-con-can"),
"dphys-swapfile": FeatureMapping(IFACE_REVPI_CONFIG, "swapfile"),
"pimodbus-master": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-master"),
"pimodbus-slave": FeatureMapping(IFACE_SOFTWARE_SERVICES, "pimodbus-slave"),
"systemd-timesyncd": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ntp"),
"ssh": FeatureMapping(IFACE_SOFTWARE_SERVICES, "ssh"),
"nodered": FeatureMapping(IFACE_SOFTWARE_SERVICES, "nodered"),
"noderedrevpinodes-server": FeatureMapping(IFACE_SOFTWARE_SERVICES, "noderedrevpinodes-server"),
"revpipyload": FeatureMapping(IFACE_SOFTWARE_SERVICES, "revpipyload"),
"bluetooth": FeatureMapping(IFACE_REVPI_CONFIG, "bluetooth"),
"ieee80211": FeatureMapping(IFACE_REVPI_CONFIG, "wlan"),
"avahi": FeatureMapping(IFACE_SOFTWARE_SERVICES, "avahi"),
"external-antenna": FeatureMapping(IFACE_REVPI_CONFIG, "external-antenna"),
}
# Generate command arguments
parser = ArgumentParser(
prog="revpi-config",
description="Configuration tool for Revolution Pi.",
)
parser.add_argument(
"--use-session-bus",
dest="use_session_bus",
action="store_true",
default=False,
help=SUPPRESS,
)
parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "availstat"],
help="Action to be executed: enable, disable, status or available.",
)
parser.add_argument(
"feature",
nargs="*",
help="Name of the feature to configure.",
)
args = parser.parse_args()
# Init dbus
bus = SessionBus() if args.use_session_bus else SystemBus()
revpi_middleware = bus.get(REVPI_DBUS_NAME, REVPI_DBUS_BASE_PATH)
lst_results = []
for feature in args.feature:
# Get the mappings
feature_mapping = REVPI_FEATURE_MAPPINGS.get(feature, None)
if feature_mapping is None:
if args.action in ("enable", "disable"):
# Missing feature with action enable/disable will return 5
lst_results.append(5)
elif args.action == "availstat":
# Missing feature with action availstat will return 2
lst_results.append(2)
else:
# Missing feature with action status/available will return 0
lst_results.append(0)
continue
dbus_interface = revpi_middleware[feature_mapping.dbus_interface]
if args.action == "enable":
dbus_interface.Enable(feature_mapping.name)
lst_results.append(0)
elif args.action == "disable":
dbus_interface.Disable(feature_mapping.name)
lst_results.append(0)
elif args.action in ("status", "availstat"):
status = dbus_interface.GetStatus(feature_mapping.name)
lst_results.append(int(status))
elif args.action == "available":
availability = dbus_interface.GetAvailability(feature_mapping.name)
lst_results.append(int(availability))
if lst_results:
print(" ".join(map(str, lst_results)))

View File

@@ -1,6 +1,2 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
[tool.black] [tool.black]
line-length = 100 line-length = 100

View File

@@ -1,7 +1,3 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH <support@kunbus.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# Build dependencies # Build dependencies
pip-licenses pip-licenses
Pyinstaller Pyinstaller

View File

@@ -2,8 +2,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Metadata of package.""" """Metadata of package."""
__author__ = "Sven Sager" __author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2025 KUNBUS GmbH" __copyright__ = "Copyright (C) 2025 KUNBUS GmbH"
__license__ = " GPL-2.0-or-later" __license__ = " GPL-2.0-or-later"
__version__ = "0.0.6" __version__ = "0.0.1"

View File

@@ -2,5 +2,4 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Package: revpi_middleware.""" """Package: revpi_middleware."""
from .__about__ import __author__, __copyright__, __license__, __version__ from .__about__ import __author__, __copyright__, __license__, __version__

View File

@@ -2,5 +2,4 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""CLI commands to control revpi_middleware.""" """CLI commands to control revpi_middleware."""
from ..__about__ import __author__, __copyright__, __license__, __version__ from ..__about__ import __author__, __copyright__, __license__, __version__

View File

@@ -5,7 +5,6 @@
This module provides the foundation for the RevPi middleware CLI commands This module provides the foundation for the RevPi middleware CLI commands
and argument parsing setup. and argument parsing setup.
""" """
from logging import getLogger from logging import getLogger
from . import cli_config, cli_picontrol from . import cli_config, cli_picontrol

View File

@@ -1,7 +1,6 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI.""" """Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser from argparse import ArgumentParser
from logging import getLogger from logging import getLogger
@@ -17,7 +16,7 @@ def add_subparsers(parent_parser: ArgumentParser):
"action", "action",
choices=["enable", "disable", "status", "available", "list-features"], choices=["enable", "disable", "status", "available", "list-features"],
help="Action to be executed: enable, disable, status or available. " help="Action to be executed: enable, disable, status or available. "
"To get all available features, use 'list-features'.", "To get all available features, use 'list-features'.",
) )
parent_parser.add_argument( parent_parser.add_argument(
"feature", "feature",

View File

@@ -1,7 +1,6 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI.""" """Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser from argparse import ArgumentParser
from logging import getLogger from logging import getLogger

View File

@@ -1,7 +1,6 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus helper functions for cli commands.""" """D-Bus helper functions for cli commands."""
from enum import Enum from enum import Enum
from threading import Thread from threading import Thread
from time import sleep from time import sleep

View File

@@ -3,43 +3,29 @@
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Main daemon of revpi-middleware.""" """Main daemon of revpi-middleware."""
from enum import Enum
from logging import getLogger from logging import getLogger
from os import environ
from threading import Event from threading import Event
from time import perf_counter from time import perf_counter
from gi.repository import Gio
from . import proginit as pi from . import proginit as pi
from .dbus_ios1 import BusProviderIos1 from .dbus_middleware1.bus_provider import BusProvider
from .dbus_middleware1 import BusProviderMiddleware1
from .dbus_middleware1.process_image.process_image_helper import ResetDriverWatchdog
log = getLogger(__name__) log = getLogger(__name__)
class BusProvider(Enum):
middleware = "middleware"
ios = "ios"
class MiddlewareDaemon: class MiddlewareDaemon:
"""Main program of MiddlewareDaemon class.""" """Main program of MiddlewareDaemon class."""
def __init__(self, bus_provider: BusProvider): def __init__(self):
"""Init MiddlewareDaemon class.""" """Init MiddlewareDaemon class."""
log.debug("enter MiddlewareDaemon.__init__") log.debug("enter MiddlewareDaemon.__init__")
self._cycle_time = 1.0 self._cycle_time = 1.0
self.do_cycle = Event() self.do_cycle = Event()
self._force_dbus_restart = False
self._reconfigure = False self._reconfigure = False
self._running = True self._running = True
self.bus_provider = None self.bus_provider = None
self.bus_provider_selected = bus_provider
self.wd_reset = ResetDriverWatchdog("")
self._configure() self._configure()
log.debug("leave MiddlewareDaemon.__init__") log.debug("leave MiddlewareDaemon.__init__")
@@ -49,57 +35,15 @@ class MiddlewareDaemon:
log.debug("enter MiddlewareDaemon._configure") log.debug("enter MiddlewareDaemon._configure")
pi.reload_conf() pi.reload_conf()
if self._force_dbus_restart:
self.dbus_stop()
self._force_dbus_restart = False
if pi.pargs.procimg != self.wd_reset.procimg:
self.dbus_stop()
self.wd_reset.stop()
self.wd_reset = ResetDriverWatchdog(pi.pargs.procimg)
self.wd_reset.register_call(self._reset_driver_callack)
log.debug("leave MiddlewareDaemon._configure") log.debug("leave MiddlewareDaemon._configure")
@staticmethod
def _get_bus_address() -> str:
if pi.pargs.use_session_bus:
environ_name = "DBUS_SESSION_BUS_ADDRESS"
bus_address = Gio.dbus_address_get_for_bus_sync(Gio.BusType.SESSION)
else:
environ_name = "DBUS_SYSTEM_BUS_ADDRESS"
bus_address = Gio.dbus_address_get_for_bus_sync(Gio.BusType.SYSTEM)
return environ.get(environ_name, bus_address)
def _reset_driver_callack(self, *args) -> None:
log.debug("enter MiddlewareDaemon._reset_driver_callack")
if self.bus_provider_selected is BusProvider.ios:
self._force_dbus_restart = True
self.reload_config()
log.debug("leave MiddlewareDaemon._reset_driver_callack")
def dbus_start(self): def dbus_start(self):
log.debug("enter MiddlewareDaemon.dbus_start") log.debug("enter MiddlewareDaemon.dbus_start")
if self.bus_provider and self.bus_provider.is_alive():
return
dbus_running = self.bus_provider and self.bus_provider.is_alive() self.bus_provider = BusProvider(use_system_bus=not pi.pargs.use_session_bus)
if not dbus_running: self.bus_provider.start()
if self.bus_provider_selected is BusProvider.middleware:
self.bus_provider = BusProviderMiddleware1(
dbus_address=self._get_bus_address(),
picontrol_device=pi.pargs.procimg,
)
elif self.bus_provider_selected is BusProvider.ios:
self.bus_provider = BusProviderIos1(
dbus_address=self._get_bus_address(),
picontrol_device=pi.pargs.procimg,
)
else:
raise ValueError("Unknown bus provider")
self.bus_provider.start()
log.debug("leave MiddlewareDaemon.dbus_start") log.debug("leave MiddlewareDaemon.dbus_start")
@@ -110,7 +54,7 @@ class MiddlewareDaemon:
self.bus_provider.stop() self.bus_provider.stop()
self.bus_provider.join(timeout=10.0) self.bus_provider.join(timeout=10.0)
if self.bus_provider.is_alive(): if self.bus_provider.is_alive():
log.warning(f"dbus {self.bus_provider.name} thread is still alive") log.warning("dbus provider thread is still alive")
log.debug("leave MiddlewareDaemon.dbus_stop") log.debug("leave MiddlewareDaemon.dbus_stop")
@@ -141,6 +85,7 @@ class MiddlewareDaemon:
# Startup tasks # Startup tasks
self.dbus_start() self.dbus_start()
pi.startup_complete()
# Go into mainloop of daemon # Go into mainloop of daemon
while self._running: while self._running:
ot = perf_counter() ot = perf_counter()
@@ -150,13 +95,6 @@ class MiddlewareDaemon:
if self._reconfigure: if self._reconfigure:
self._configure() self._configure()
self._reconfigure = False self._reconfigure = False
# Monitor bus providers for errors and restart them
if not (self.bus_provider and self.bus_provider.is_alive()):
log.warning(f"dbus {self.bus_provider.name} thread is not alive - restarting")
self.dbus_start()
if self.bus_provider and self.bus_provider.published.is_set():
pi.startup_complete() pi.startup_complete()
# Cycle time calculation # Cycle time calculation

View File

@@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus ios version 1 of revpi_middleware."""
from .ios1_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME
from .bus_provider_ios1 import BusProviderIos1

View File

@@ -1,121 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus bus provider for revpi_middleware."""
from logging import getLogger
from threading import Thread, Event
import revpimodio2
from gi.repository import GLib
from pydbus import connect
from . import REVPI_DBUS_NAME
from .interface_devices import (
InterfaceDeviceManager,
InterfaceDevice,
)
from .interface_ios import (
InterfaceIoManager,
InterfaceInput,
InterfaceOutput,
)
log = getLogger(__name__)
class BusProviderIos1(Thread):
def __init__(
self,
dbus_address: str,
picontrol_device="/dev/piControl0",
config_rsc="/etc/revpi/config.rsc",
):
log.debug("enter BusProviderIos1.__init__")
super().__init__()
self._bus_address = dbus_address
self._loop = GLib.MainLoop()
self._lst_device_interfaces = []
self._lst_io_interfaces = []
self._modio = revpimodio2.RevPiModIO(
procimg=picontrol_device,
configrsc=config_rsc,
shared_procimg=True,
)
self.picontrol_device = picontrol_device
self.published = Event()
self.config_rsc = config_rsc
def run(self):
log.debug("enter BusProviderIos1.run")
bus = connect(self._bus_address)
self._lst_device_interfaces.clear()
self._lst_io_interfaces.clear()
for device in self._modio.device:
self._lst_device_interfaces.append(
InterfaceDevice(bus, device),
)
for io in self._modio.io:
interface = None
try:
if io.type == revpimodio2.INP:
interface = InterfaceInput(bus, io)
elif io.type == revpimodio2.OUT:
interface = InterfaceOutput(bus, io)
elif io.type == revpimodio2.MEM:
# todo: Implement memory
pass
except Exception as e:
log.warning(f"can not create dbus interface for {io.name}: {e}")
if interface is not None:
self._lst_io_interfaces.append(interface)
lst_interfaces = []
lst_interfaces += [
(interface.object_path, interface) for interface in self._lst_device_interfaces
]
lst_interfaces += [
(interface.object_path, interface) for interface in self._lst_io_interfaces
]
try:
bus.publish(
REVPI_DBUS_NAME,
InterfaceDeviceManager(self._lst_device_interfaces),
InterfaceIoManager(self._lst_io_interfaces, self._modio),
*lst_interfaces,
)
self.published.set()
log.info(f"published {REVPI_DBUS_NAME} on {self._bus_address}")
except Exception as e:
log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}")
try:
self._loop.run()
except Exception as e:
log.error(f"can not run dbus mainloop: {e}")
bus.con.close()
self._modio.cleanup()
log.info(f"closed {REVPI_DBUS_NAME} connection to {self._bus_address}")
log.debug("leave BusProviderIos1.run")
def stop(self):
log.debug("enter BusProviderIos1.stop")
self._loop.quit()
log.debug("leave BusProviderIos1.stop")
@property
def name(self) -> str:
return REVPI_DBUS_NAME
@property
def running(self):
return self._loop.is_running()

View File

@@ -1,133 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for IOs."""
from pydbus.bus import Bus
from pydbus.generic import signal
from revpimodio2.device import Device
from .ios1_helper import get_io_object_path, get_device_object_path
class InterfaceDevice:
"""
<node>
<interface name="com.revolutionpi.ios1.Device">
<method name="GetDeviceInputs">
<arg name="object-path-list" type="ao" direction="out"/>
</method>
<method name="GetDeviceOutputs">
<arg name="object-path-list" type="ao" direction="out"/>
</method>
<property name="bmk" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="catalognr" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="comment" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="id" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="name" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="position" type="n" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="type" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.Device"
PropertiesChanged = signal()
def __init__(self, dbus: Bus, device: Device):
self.dbus = dbus
self.device = device
self.object_path = get_device_object_path(device)
def GetDeviceInputs(self) -> list[str]:
return [get_io_object_path(io) for io in self.device.get_inputs()]
def GetDeviceOutputs(self) -> list[str]:
return [get_io_object_path(io) for io in self.device.get_outputs()]
@property
def bmk(self) -> str:
return self.device.bmk
@property
def catalognr(self):
return self.device.catalognr
@property
def comment(self):
return self.device.comment
@property
def id(self):
return self.device.id
@property
def name(self) -> str:
return self.device.name
@property
def position(self) -> int:
return self.device.position
@property
def type(self):
return self.device.type
class InterfaceDeviceManager:
"""
<node>
<interface name="com.revolutionpi.ios1.DeviceManager">
<method name="GetAllDevices">
<arg type="ao" direction="out"/>
</method>
<method name="GetByName">
<arg name="device-name" type="s" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
<method name="GetByPosition">
<arg name="device-position" type="n" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.DeviceManager"
def __init__(
self,
device_interfaces: list[InterfaceDevice],
):
self._lst_device_interfaces = device_interfaces
def GetAllDevices(self) -> list[str]:
return [interface.object_path for interface in self._lst_device_interfaces]
def GetByName(self, device_name) -> str:
for interface in self._lst_device_interfaces:
if interface.device.name == device_name:
return interface.object_path
raise KeyError(f"No device with name '{device_name}' found.")
def GetByPosition(self, device_position) -> str:
for interface in self._lst_device_interfaces:
if interface.device.position == device_position:
return interface.object_path
raise KeyError(f"No device on position '{device_position}' found.")

View File

@@ -1,306 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for IOs."""
from typing import Union, List
from pydbus import Variant
from pydbus.bus import Bus
from pydbus.generic import signal
from revpimodio2 import RevPiModIO, Cycletools, INP, OUT
from revpimodio2.io import IOBase
from .ios1_helper import get_io_object_path, get_variant_type
class InterfaceInput:
"""
<node>
<interface name="com.revolutionpi.ios1.Input">
<method name="SetByteorder">
<arg name="order" type="s" direction="in"/>
</method>
<method name="SetSigned">
<arg name="signed" type="b" direction="in"/>
</method>
<property name="address" type="n" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="bmk" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="bitaddress" type="n" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="byteorder" type="s" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="invalidates"/>
</property>
<property name="defaultvalue" type="v" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="length" type="q" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="name" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="signed" type="b" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="invalidates"/>
</property>
<property name="value" type="v" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="true"/>
</property>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.Input"
PropertiesChanged = signal()
def __init__(self, dbus: Bus, io: IOBase):
self._raw = False
self.dbus = dbus
self.io = io
self.object_path = get_io_object_path(io)
try:
self.variant_type = get_variant_type(self.io)
except ValueError:
# Fallback to bytes if the integer is too large
self._raw = True
self.variant_type = "ay"
def emit_io_change(self):
self.PropertiesChanged(
self.interface_name,
{
"value": Variant(
self.variant_type,
self.io.get_value() if self._raw else self.io.value,
),
},
[],
)
def SetByteorder(self, order: str) -> None:
self.byteorder = order
def SetSigned(self, signed: bool) -> None:
self.signed = signed
@property
def address(self) -> int:
return self.io.address
@property
def bmk(self) -> str:
return self.io.bmk
@property
def bitaddress(self) -> int:
return self.io._bitaddress
@property
def byteorder(self) -> str:
return self.io.byteorder
@byteorder.setter
def byteorder(self, value: str) -> None:
if hasattr(self.io, "_set_byteorder"):
self.io._set_byteorder(value)
self.variant_type = get_variant_type(self.io)
# Changing the byteorder can change the value, but we do NOT send a signal for that
# because the real value of the process image was not changed. But we inform the client
# about the changed byteorder property.
self.PropertiesChanged(
self.interface_name,
{},
["byteorder"],
)
@property
def defaultvalue(self) -> Variant:
return Variant(
self.variant_type,
self.io.get_value() if self._raw else self.io.defaultvalue,
)
@property
def length(self) -> int:
# 0 length for boolean
return 0 if self.variant_type == "b" else self.io.length
@property
def name(self) -> str:
return self.io.name
@property
def signed(self) -> bool:
if hasattr(self.io, "signed"):
return self.io.signed
return False
@signed.setter
def signed(self, value: bool) -> None:
if hasattr(self.io, "_set_signed"):
self.io._set_signed(value)
self.variant_type = get_variant_type(self.io)
# Changing the signedness can change the value, but we do NOT send a signal for that
# because the real value of the process image was not changed. But we inform the client
# about the changed signedness property.
self.PropertiesChanged(
self.interface_name,
{},
["signed"],
)
@property
def value(self) -> Variant:
if not self.io._parentdevice._selfupdate:
self.io._parentdevice.readprocimg()
return Variant(
self.variant_type,
self.io.get_value() if self._raw else self.io.value,
)
class InterfaceOutput(InterfaceInput):
"""
<node>
<interface name="com.revolutionpi.ios1.Output">
<method name="SetByteorder">
<arg name="order" type="s" direction="in"/>
</method>
<method name="SetSigned">
<arg name="signed" type="b" direction="in"/>
</method>
<method name="SetValue">
<arg name="value" type="v" direction="in"/>
</method>
<property name="address" type="n" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="bmk" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="bitaddress" type="n" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="byteorder" type="s" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="invalidates"/>
</property>
<property name="defaultvalue" type="v" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="length" type="q" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="name" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="signed" type="b" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="invalidates"/>
</property>
<property name="value" type="v" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="true"/>
</property>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.Output"
def SetValue(self, value: Variant) -> None:
self.value = value
@property
def value(self) -> Variant:
return super().value
@value.setter
def value(self, value: Variant) -> None:
if self._raw:
self.io.set_value(value)
else:
self.io.value = value
if not self.io._parentdevice._selfupdate:
self.io._parentdevice.writeprocimg()
class InterfaceIoManager:
"""
<node>
<interface name="com.revolutionpi.ios1.IoManager">
<method name="GetAllInputs">
<arg name="object-path-list" type="ao" direction="out"/>
</method>
<method name="GetAllOutputs">
<arg name="object-path-list" type="ao" direction="out"/>
</method>
<method name="GetByName">
<arg name="name" type="s" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
<method name="ActivateIoSignals">
</method>
<method name="DeactivateIoSignals">
</method>
<signal name="IoChanged">
<arg name="name" type="s" direction="out"/>
<arg name="value" type="v" direction="out"/>
</signal>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.IoManager"
IoChanged = signal()
def __init__(
self,
io_interfaces: List[Union[InterfaceInput, InterfaceOutput]],
modio: RevPiModIO,
):
self._dc_io_interfaces = {interface.name: interface for interface in io_interfaces}
self.modio = modio
self.lst_inp_object_path = []
self.lst_out_object_path = []
for interface in io_interfaces:
if interface.io.type == INP:
self.lst_inp_object_path.append(interface.object_path)
elif interface.io.type == OUT:
self.lst_out_object_path.append(interface.object_path)
def _modio_cycle(self, ct: Cycletools) -> None:
for io_name in self._dc_io_interfaces:
interface = self._dc_io_interfaces[io_name]
if ct.changed(interface.io):
interface.emit_io_change()
self.IoChanged(
interface.io.name,
Variant(interface.variant_type, interface.io.value),
)
def GetAllInputs(self) -> list[str]:
return self.lst_inp_object_path
def GetAllOutputs(self) -> list[str]:
return self.lst_out_object_path
def GetByName(self, io_name) -> str:
if io_name in self._dc_io_interfaces:
return self._dc_io_interfaces[io_name].object_path
raise KeyError(f"No IO with name '{io_name}' found.")
def ActivateIoSignals(self) -> None:
if not self.modio._looprunning:
self.modio.autorefresh_all()
self.modio.cycleloop(self._modio_cycle, cycletime=50, blocking=False)
def DeactivateIoSignals(self) -> None:
self.modio.exit(False)

View File

@@ -1,51 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Helper for io read and write."""
from logging import getLogger
from revpimodio2.device import Device
from revpimodio2.io import IOBase
log = getLogger(__name__)
REVPI_DBUS_NAME = "com.revolutionpi.ios1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/ios1"
def get_io_object_path(io: IOBase) -> str:
return f"{REVPI_DBUS_BASE_PATH}/io/{io.name}"
def get_device_object_path(device: Device) -> str:
return f"{REVPI_DBUS_BASE_PATH}/device/{device.position}"
def get_variant_type(io: IOBase) -> str:
value_type = type(io.value)
byte_length = io.length
signed = io._signed
if value_type is bool:
return "b"
if value_type is float:
return "d"
if value_type is int:
if byte_length <= 2:
return "n" if signed else "q"
if byte_length <= 4:
return "i" if signed else "u"
if byte_length <= 8:
return "x" if signed else "t"
raise ValueError(f"Unsupported byte length: {byte_length}")
if value_type is str:
return "s"
if value_type is bytes:
return "ay"
raise TypeError(f"Unsupported type: {value_type}")

View File

@@ -2,8 +2,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus middleware version 1 of revpi_middleware.""" """D-Bus middleware version 1 of revpi_middleware."""
from .dbus_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME from .dbus_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME
from .dbus_helper import extend_interface from .dbus_helper import extend_interface
from .bus_provider_middleware1 import BusProviderMiddleware1 from .bus_provider import BusProvider

View File

@@ -2,41 +2,38 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus bus provider for revpi_middleware.""" """D-Bus bus provider for revpi_middleware."""
from logging import getLogger from logging import getLogger
from threading import Thread, Event from threading import Thread
from gi.repository import GLib from gi.repository import GLib
from pydbus import connect from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl from .process_image import InterfacePiControl
from .system_config import InterfaceRevpiConfig, InterfaceSoftwareServices from .system_config import InterfaceRevpiConfig
log = getLogger(__name__) log = getLogger(__name__)
class BusProviderMiddleware1(Thread): class BusProvider(Thread):
def __init__( def __init__(
self, self,
dbus_address: str,
picontrol_device="/dev/piControl0", picontrol_device="/dev/piControl0",
config_rsc="/etc/revpi/config.rsc", config_rsc="/etc/revpi/config.rsc",
use_system_bus=True,
): ):
log.debug("enter BusProviderMiddleware1.__init__") log.debug("enter BusProvider.__init__")
super().__init__() super().__init__()
self._bus_address = dbus_address self._bus = SystemBus() if use_system_bus else SessionBus()
self._loop = GLib.MainLoop() self._loop = GLib.MainLoop()
self.picontrol_device = picontrol_device self.picontrol_device = picontrol_device
self.published = Event()
self.config_rsc = config_rsc self.config_rsc = config_rsc
def run(self): def run(self):
log.debug("enter BusProviderMiddleware1.run") log.debug("enter BusProvider.run")
bus = connect(self._bus_address)
# The 2nd, 3rd, ... arguments can be objects or tuples of a path and an object # The 2nd, 3rd, ... arguments can be objects or tuples of a path and an object
# Example(), # Example(),
@@ -44,18 +41,15 @@ class BusProviderMiddleware1(Thread):
# ("Subdir2", Example()), # ("Subdir2", Example()),
# ("Subdir2/Whatever", Example()) # ("Subdir2/Whatever", Example())
lst_interfaces = [ lst_interfaces = [
InterfacePiControl(bus, self.picontrol_device, self.config_rsc), InterfacePiControl(self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(bus), InterfaceRevpiConfig(),
InterfaceSoftwareServices(bus),
] ]
try: try:
bus.publish( self._bus.publish(
REVPI_DBUS_NAME, REVPI_DBUS_NAME,
*lst_interfaces, *lst_interfaces,
) )
self.published.set()
log.info(f"published {REVPI_DBUS_NAME} on {self._bus_address}")
except Exception as e: except Exception as e:
log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}") log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}")
@@ -64,25 +58,18 @@ class BusProviderMiddleware1(Thread):
except Exception as e: except Exception as e:
log.error(f"can not run dbus mainloop: {e}") log.error(f"can not run dbus mainloop: {e}")
bus.con.close()
log.info(f"closed {REVPI_DBUS_NAME} connection to {self._bus_address}")
# Clean up all interfaces # Clean up all interfaces
for interface in lst_interfaces: for interface in lst_interfaces:
if type(interface) is tuple: if type(interface) is tuple:
_, interface = interface _, interface = interface
interface.cleanup() interface.cleanup()
log.debug("leave BusProviderMiddleware1.run") log.debug("leave BusProvider.run")
def stop(self): def stop(self):
log.debug("enter BusProviderMiddleware1.stop") log.debug("enter BusProvider.stop")
self._loop.quit() self._loop.quit()
log.debug("leave BusProviderMiddleware1.stop") log.debug("leave BusProvider.stop")
@property
def name(self) -> str:
return REVPI_DBUS_NAME
@property @property
def running(self): def running(self):

View File

@@ -5,8 +5,6 @@
from logging import getLogger from logging import getLogger
from pydbus.bus import Bus
log = getLogger(__name__) log = getLogger(__name__)
REVPI_DBUS_NAME = "com.revolutionpi.middleware1" REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
@@ -15,9 +13,6 @@ REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
class DbusInterface: class DbusInterface:
def __init__(self, bus: Bus):
self.bus = bus
def cleanup(self): def cleanup(self):
""" """
Represents a method responsible for performing cleanup operations. This method is executed to properly Represents a method responsible for performing cleanup operations. This method is executed to properly

View File

@@ -2,5 +2,4 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for piControl driver.""" """D-Bus interfaces for piControl driver."""
from .interface_picontrol import InterfacePiControl from .interface_picontrol import InterfacePiControl

View File

@@ -2,10 +2,8 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for piControl.""" """D-Bus interfaces for piControl."""
from logging import getLogger from logging import getLogger
from pydbus.bus import Bus
from pydbus.generic import signal from pydbus.generic import signal
from .process_image_helper import PiControlIoctl, ResetDriverWatchdog from .process_image_helper import PiControlIoctl, ResetDriverWatchdog
@@ -28,9 +26,7 @@ class InterfacePiControl(DbusInterface):
NotifyDriverReset = signal() NotifyDriverReset = signal()
def __init__(self, bus: Bus, picontrol_device: str, config_rsc: str): def __init__(self, picontrol_device: str, config_rsc: str):
super().__init__(bus)
self.picontrol_device = picontrol_device self.picontrol_device = picontrol_device
self.config_rsc = config_rsc self.config_rsc = config_rsc

View File

@@ -7,7 +7,6 @@ Helper for the process image.
The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs" The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs"
https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py
""" """
import os import os
from ctypes import c_int from ctypes import c_int
from fcntl import ioctl from fcntl import ioctl

View File

@@ -2,6 +2,4 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for system configuration.""" """D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig from .interface_config import InterfaceRevpiConfig
from .interface_services import InterfaceSoftwareServices

View File

@@ -2,20 +2,19 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for hardware configuration.""" """D-Bus interfaces for hardware configuration."""
from collections import namedtuple from collections import namedtuple
from logging import getLogger from logging import getLogger
from pydbus.generic import signal
from .revpi_config import ( from .revpi_config import (
ConfigActions, ConfigActions,
configure_avahi_daemon,
configure_bluetooth, configure_bluetooth,
configure_con_can, configure_con_can,
configure_dphys_swapfile, configure_dphys_swapfile,
configure_external_antenna, configure_external_antenna,
configure_gui, configure_gui,
configure_wlan, configure_wlan,
simple_systemd,
) )
from ..dbus_helper import DbusInterface from ..dbus_helper import DbusInterface
@@ -43,32 +42,19 @@ class InterfaceRevpiConfig(DbusInterface):
<arg name="available" type="b" direction="out"/> <arg name="available" type="b" direction="out"/>
</method> </method>
<property name="available_features" type="as" access="read"/> <property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface> </interface>
</node> </node>
""" """
AvailabilityChanged = signal()
StatusChanged = signal()
def Disable(self, feature: str) -> None: def Disable(self, feature: str) -> None:
"""Disable the feature.""" """Disable the feature."""
feature_function = get_feature(feature) feature_function = get_feature(feature)
feature_function.function(ConfigActions.DISABLE, *feature_function.args) feature_function.function(ConfigActions.DISABLE, *feature_function.args)
self.StatusChanged(feature, False)
def Enable(self, feature: str) -> None: def Enable(self, feature: str) -> None:
"""Enable the feature.""" """Enable the feature."""
feature_function = get_feature(feature) feature_function = get_feature(feature)
feature_function.function(ConfigActions.ENABLE, *feature_function.args) feature_function.function(ConfigActions.ENABLE, *feature_function.args)
self.StatusChanged(feature, True)
def GetStatus(self, feature: str) -> bool: def GetStatus(self, feature: str) -> bool:
"""Get feature status.""" """Get feature status."""
@@ -97,8 +83,18 @@ def get_feature(feature: str) -> FeatureFunction:
AVAILABLE_FEATURES = { AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []), "gui": FeatureFunction(configure_gui, []),
"revpi-con-can": FeatureFunction(configure_con_can, []), "revpi-con-can": FeatureFunction(configure_con_can, []),
"swapfile": FeatureFunction(configure_dphys_swapfile, []), "dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
"systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]),
"ssh": FeatureFunction(simple_systemd, ["ssh.service"]),
"nodered": FeatureFunction(simple_systemd, ["nodered.service"]),
"noderedrevpinodes-server": FeatureFunction(
simple_systemd, ["noderedrevpinodes-server.service"]
),
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"bluetooth": FeatureFunction(configure_bluetooth, []), "bluetooth": FeatureFunction(configure_bluetooth, []),
"wlan": FeatureFunction(configure_wlan, []), "wlan": FeatureFunction(configure_wlan, []),
"avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": FeatureFunction(configure_external_antenna, []), "external-antenna": FeatureFunction(configure_external_antenna, []),
} }

View File

@@ -1,207 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for software services."""
from logging import getLogger
from typing import List
from pydbus.bus import Bus
from pydbus.generic import signal
from ..dbus_helper import DbusInterface
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__)
class InterfaceSoftwareServices(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.SoftwareServices">
<method name="Disable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="Enable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="GetStatus">
<arg name="feature" type="s" direction="in"/>
<arg name="status" type="b" direction="out"/>
</method>
<method name="GetAvailability">
<arg name="feature" type="s" direction="in"/>
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
<signal name="StatusChanged">
<arg name="feature" type="s"/>
<arg name="status" type="b"/>
</signal>
<signal name="AvailabilityChanged">
<arg name="feature" type="s"/>
<arg name="available" type="b"/>
</signal>
</interface>
</node>
"""
AvailabilityChanged = signal()
StatusChanged = signal()
def __init__(self, bus: Bus):
super().__init__(bus)
self.mrk_available = {}
self.mrk_status = {}
self.services = {
"pimodbus-master": ["pimodbus-master.service"],
"pimodbus-slave": ["pimodbus-slave.service"],
"ntp": ["systemd-timesyncd.service"],
"ssh": ["ssh.service"],
"nodered": ["nodered.service"],
"noderedrevpinodes-server": ["noderedrevpinodes-server.service"],
"revpipyload": ["revpipyload.service"],
"avahi": ["avahi-daemon.service", "avahi-daemon.socket"],
}
# Create a systemd manager interface object
systemd = self.bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
# Load all unit paths and subscribe to properties changed signal
self.object_paths = {}
for feature in self.services:
# Get the status and availability of the feature
self.mrk_available[feature] = self.GetAvailability(feature)
self.mrk_status[feature] = self.GetStatus(feature)
# Subscribe to properties changed signal for each unit
for unit_name in self.services[feature]:
unit_path = systemd_manager.LoadUnit(unit_name)
self.object_paths[unit_path] = feature
self.bus.subscribe(
iface="org.freedesktop.DBus.Properties",
signal="PropertiesChanged",
object=unit_path,
signal_fired=self._callback_properties_changed,
)
# Subscribe to the reloading signal to update the availability of the feature
self.bus.subscribe(
iface="org.freedesktop.systemd1.Manager",
signal="Reloading",
object="/org/freedesktop/systemd1",
signal_fired=self._callback_reloading_signal,
)
def _callback_reloading_signal(self, sender, object_path, interface, signal, parameters):
"""
Handles the signal emitted for reloading and checks for changes in availability
and status for a set of services. If changes are identified, corresponding
update methods are triggered to reflect the new states.
Args:
sender: The entity sending the signal.
object_path: Path to the object emitting the signal.
interface: Interface through which the signal is sent.
signal: The signal being received.
parameters: A list of parameters associated with the signal.
Raises:
None
"""
if parameters[0]:
return
for feature in self.services:
availability = self.GetAvailability(feature)
if self.mrk_available[feature] != availability:
self.mrk_available[feature] = availability
self.AvailabilityChanged(feature, availability)
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _callback_properties_changed(self, sender, object_path, interface, signal, parameters):
"""
Handles the 'PropertiesChanged' signal callback by updating internal status tracking for given
features and invoking status change notifications if necessary.
Args:
sender (Any): Information about the signal sender.
object_path (str): The path of the object emitting the signal.
interface (str): The interface where the signal was emitted.
signal (str): The name of the emitted signal.
parameters (tuple): Signal parameters containing interface name, changed properties, and
invalidated properties.
Raises:
TypeError: If the 'parameters' argument does not unpack to three expected values.
"""
interface, changed_properties, invalidated_properties = parameters
if "ActiveState" not in changed_properties:
return
feature = self.object_paths[object_path]
status = self.GetStatus(feature)
if self.mrk_status[feature] != status:
self.mrk_status[feature] = status
self.StatusChanged(feature, status)
def _get_unit_names(self, feature: str) -> List[str]:
if feature not in self.services:
raise ValueError(f"feature {feature} does not exist")
return self.services[feature]
def Disable(self, feature: str) -> None:
"""Disable the feature."""
action = ServiceActions.DISABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
action = ServiceActions.ENABLE
unit_names = self._get_unit_names(feature)
for unit_name in unit_names:
simple_systemd(action, unit_name)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
unit_names = self._get_unit_names(feature)
rc_status = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.STATUS, unit_name):
rc_status = False
break
return rc_status
def GetAvailability(self, feature: str) -> bool:
"""Get feature availability on the RevPi."""
unit_names = self._get_unit_names(feature)
rc_available = True
for unit_name in unit_names:
if not simple_systemd(ServiceActions.AVAILABLE, unit_name):
rc_available = False
break
return rc_available
@property
def available_features(self) -> list[str]:
return list(self.services.keys())

View File

@@ -15,7 +15,6 @@ from typing import List, Optional
from pydbus import SystemBus from pydbus import SystemBus
from ..dbus_helper import grep from ..dbus_helper import grep
from ..systemd_helper import simple_systemd, ServiceActions
log = getLogger(__name__) log = getLogger(__name__)
@@ -42,7 +41,6 @@ class ComputeModuleTypes(IntEnum):
CM4S (int): Represents a Compute Module 4S. CM4S (int): Represents a Compute Module 4S.
CM5 (int): Represents a Compute Module 5. CM5 (int): Represents a Compute Module 5.
""" """
UNKNOWN = 0 UNKNOWN = 0
CM1 = 6 CM1 = 6
CM3 = 10 CM3 = 10
@@ -59,7 +57,6 @@ class ConfigActions(Enum):
actions. It can be used to ensure consistency when working with or defining actions. It can be used to ensure consistency when working with or defining
such actions in a system. such actions in a system.
""" """
ENABLE = "enable" ENABLE = "enable"
DISABLE = "disable" DISABLE = "disable"
STATUS = "status" STATUS = "status"
@@ -234,7 +231,6 @@ class ConfigTxt:
_config_txt_lines (list[str]): Contains all lines of the configuration _config_txt_lines (list[str]): Contains all lines of the configuration
file as a list of strings, where each string represents a line. file as a list of strings, where each string represents a line.
""" """
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$") re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self): def __init__(self):
@@ -435,6 +431,18 @@ class ConfigTxt:
return self._config_txt_path return self._config_txt_path
def configure_avahi_daemon(action: ConfigActions):
return_value = simple_systemd(action, "avahi-daemon.service")
# Post actions for avahi-daemon
if action in (ConfigActions.ENABLE, ConfigActions.DISABLE):
# Apply the enable/disable action to the avahi socket AFTER the service
# unit, because a connected socket could interrupt stop
simple_systemd(action, "avahi-daemon.socket")
return return_value
def configure_bluetooth(action: ConfigActions): def configure_bluetooth(action: ConfigActions):
hci_device = join(LINUX_BT_CLASS_PATH, "hci0") hci_device = join(LINUX_BT_CLASS_PATH, "hci0")
bt_rfkill_index = get_rfkill_index(hci_device) bt_rfkill_index = get_rfkill_index(hci_device)
@@ -500,17 +508,7 @@ def configure_con_can(action: ConfigActions):
def configure_dphys_swapfile(action: ConfigActions): def configure_dphys_swapfile(action: ConfigActions):
# Translate config action to systemd action return_value = simple_systemd(action, "dphys-swapfile.service")
if action is ConfigActions.ENABLE:
systemd_action = ServiceActions.ENABLE
elif action is ConfigActions.DISABLE:
systemd_action = ServiceActions.DISABLE
elif action is ConfigActions.STATUS:
systemd_action = ServiceActions.STATUS
else:
systemd_action = ServiceActions.AVAILABLE
return_value = simple_systemd(systemd_action, "dphys-swapfile.service")
# Post actions for dphys-swapfile # Post actions for dphys-swapfile
if action is ConfigActions.DISABLE: if action is ConfigActions.DISABLE:
@@ -556,11 +554,7 @@ def configure_gui(action: ConfigActions):
return gui_available return gui_available
bus = SystemBus() bus = SystemBus()
systemd = bus.get( systemd_manager = bus.get(".systemd1")
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ConfigActions.ENABLE: if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True) systemd_manager.SetDefaultTarget("graphical.target", True)
@@ -635,6 +629,66 @@ def get_rfkill_index(device_class_path: str) -> Optional[int]:
return None return None
def simple_systemd(action: ConfigActions, unit: str):
"""
Performs specified actions on systemd units.
This function allows interaction with systemd units for various operations
such as enabling, disabling, checking the status, and verifying availability.
It communicates with the systemd manager via the SystemBus and handles units
based on the action specified.
Parameters:
action (ConfigActions): Specifies the action to be performed on the
systemd unit. Supported actions include ENABLE,
DISABLE, STATUS, and AVAILABLE.
unit (str): The name of the systemd unit on which the action is to be
performed.
Returns:
bool: For STATUS and AVAILABLE actions, returns True if the corresponding
criteria are met (e.g., enabled and active for STATUS, or not found
for AVAILABLE). Otherwise, returns False.
Raises:
ValueError: If the specified action is not supported.
"""
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.UnmaskUnitFiles([unit], False)
systemd_manager.EnableUnitFiles([unit], False, False)
systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE:
systemd_manager.StopUnit(unit, "replace")
systemd_manager.DisableUnitFiles([unit], False)
elif action is ConfigActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
elif action is ConfigActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
if __name__ == "__main__": if __name__ == "__main__":
rc = RevPiConfig() rc = RevPiConfig()
print("Model:", rc.model) print("Model:", rc.model)

View File

@@ -1,121 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from enum import Enum
from logging import getLogger
from threading import Thread
from typing import Optional
from pydbus import SystemBus
log = getLogger(__name__)
class ServiceActions(Enum):
"""
Enumeration class for defining configuration actions.
This enumeration provides predefined constants for common configuration
actions. It can be used to ensure consistency when working with or defining
such actions in a system.
"""
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
def simple_systemd(action: ServiceActions, unit: str, unmask: bool = False) -> Optional[bool]:
"""
Perform systemd service actions such as enable, disable, check status, or availability.
This function interacts with the systemd D-Bus API to manage and query the
state of services on a system. The supported actions include enabling a systemd
unit, disabling it, starting/stopping a unit, and checking its status or
availability. The function supports asynchronous configuration changes through
threads where applicable.
Parameters:
action (ServiceActions): The action to perform on the systemd service.
Supported actions are ENABLE, DISABLE, STATUS, and AVAILABLE.
unit (str): The name of the systemd unit to operate on (e.g., "example.service").
unmask (bool): When enabling a unit, if True, any masked unit file will
first be unmasked before proceeding with the operation. Defaults to False.
Returns:
Optional[bool]: The return value depends on the action. For STATUS or
AVAILABLE actions, it returns True if the unit satisfies the condition
(e.g., enabled and active, or available and loaded), False otherwise.
For other actions, it returns None.
"""
bus = SystemBus()
systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ServiceActions.ENABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
lst_change_unmask = []
if unmask:
# Dbus call: UnmaskUnitFiles(in as files, in b runtime, out a(sss) changes
lst_change_unmask = systemd_manager.UnmaskUnitFiles([unit], False)
# Dbus call: EnableUnitFiles(in as files, in b runtime, in b force,
# out b carries_install_info, out a(sss) changes
lst_change_enable = systemd_manager.EnableUnitFiles([unit], False, False)
if lst_change_unmask or lst_change_enable:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StartUnit(in s name, in s mode, out o job
systemd_manager.StartUnit(unit, "replace")
elif action is ServiceActions.DISABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
# Dbus call: DisableUnitFiles (in as files, in b runtime, out a(sss) changes)
change = systemd_manager.DisableUnitFiles([unit], False)
if change:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StopUnit(in s name,in s mode, out o job
systemd_manager.StopUnit(unit, "replace")
elif action is ServiceActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState in (
"active",
"activating",
)
elif action is ServiceActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
return None

View File

@@ -2,10 +2,9 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Main application of revpi-middleware daemon.""" """Main application of revpi-middleware daemon."""
from logging import getLogger from logging import getLogger
from .daemon import MiddlewareDaemon, BusProvider from .daemon import MiddlewareDaemon
from . import proginit as pi from . import proginit as pi
log = getLogger(__name__) log = getLogger(__name__)
@@ -27,19 +26,6 @@ pi.parser.add_argument(
default="/etc/{0}/{0}.conf".format(pi.programname), default="/etc/{0}/{0}.conf".format(pi.programname),
help="application configuration file", help="application configuration file",
) )
pi.parser.add_argument(
"--procimg",
dest="procimg",
default="/dev/piControl0",
help="Path to process image",
)
pi.parser.add_argument(
"bus_provider",
default="middleware",
nargs="?",
choices=["middleware", "ios"],
help="bus provider to use",
)
def main() -> int: def main() -> int:
@@ -48,7 +34,7 @@ def main() -> int:
# Parse command line arguments # Parse command line arguments
pi.init_app() pi.init_app()
root = MiddlewareDaemon(BusProvider(pi.pargs.bus_provider)) root = MiddlewareDaemon()
# Set signals # Set signals
signal.signal(signal.SIGHUP, lambda n, f: root.reload_config()) signal.signal(signal.SIGHUP, lambda n, f: root.reload_config())

View File

@@ -2,7 +2,6 @@
# SPDX-FileCopyrightText: 2018-2023 Sven Sager # SPDX-FileCopyrightText: 2018-2023 Sven Sager
# SPDX-License-Identifier: LGPL-2.0-or-later # SPDX-License-Identifier: LGPL-2.0-or-later
"""Global program initialization.""" """Global program initialization."""
__author__ = "Sven Sager" __author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2018-2023 Sven Sager" __copyright__ = "Copyright (C) 2018-2023 Sven Sager"
__license__ = "LGPL-2.0-or-later" __license__ = "LGPL-2.0-or-later"

View File

@@ -3,8 +3,6 @@
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
from time import sleep from time import sleep
from gi.repository.Gio import dbus_address_get_for_bus_sync, BusType
from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup
@@ -14,13 +12,12 @@ class TestBusProvider(PiControlDeviceMockup):
super().setUp() super().setUp()
# Do not import things on top of the module. Some classes or functions need to be mocked up first. # Do not import things on top of the module. Some classes or functions need to be mocked up first.
from revpi_middleware.dbus_middleware1 import BusProviderMiddleware1 from revpi_middleware.dbus_middleware1 import BusProvider
# Prepare the bus provider and start it # Prepare the bus provider and start it
bus_address = dbus_address_get_for_bus_sync(BusType.SESSION) self.bp = BusProvider(
self.bp = BusProviderMiddleware1( self.picontrol.name,
bus_address, use_system_bus=False,
picontrol_device=self.picontrol.name,
) )
self.bp.start() self.bp.start()