2 Commits

Author SHA1 Message Date
Sven Sager
1172954ad7 feat(revpiconfig): Add CmdLineTxt class for managing cmdline.txt
Introduced the `CmdLineTxt` class to handle parsing, modifying, and
writing operations on the `cmdline.txt` file with thread safety.
Implemented methods for setting, removing keys, and ensuring safe file
operations.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-06-26 08:41:49 +02:00
Sven Sager
fedb0f8924 feat(revpiconfig): Add thread safety for config.txt file operations
Introduced a threading lock to safeguard read and write operations on
the `config.txt` file. This ensures thread-safe access and prevents
potential race conditions during concurrent execution.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-06-26 06:41:59 +02:00
34 changed files with 102 additions and 959 deletions

View File

@@ -7,8 +7,3 @@ SPDX-License-Identifier: GPL-2.0-or-later
# Middleware for Revolution Pi # Middleware for Revolution Pi
This middleware will support D-Bus as IPC interface. This middleware will support D-Bus as IPC interface.
## D-Bus
See [docs/dbus.md](docs/dbus.md) for an overview of the standard D-Bus data
types.

View File

@@ -1,19 +0,0 @@
<!-- /usr/share/dbus-1/system.d/com.revolutionpi.ios1.conf -->
<busconfig>
<!-- Allow full access to root as the bus owner -->
<policy user="root">
<allow own="com.revolutionpi.ios1"/>
<allow send_destination="com.revolutionpi.ios1"/>
<allow receive_sender="com.revolutionpi.ios1"/>
</policy>
<!-- System group picontrol -->
<policy group="picontrol">
<allow send_destination="com.revolutionpi.ios1"/>
</policy>
<!-- Standard-Policy -->
<policy context="default">
<deny send_destination="com.revolutionpi.ios1"/>
</policy>
</busconfig>

View File

@@ -1,8 +0,0 @@
# Additional options that are passed to revpi-ios.
# add '-f /var/log/revpi-ios.log' to write logs to own log file
# add '-v' or '-vv' for verbose logging
DAEMON_OPTS=""
# In addition to journalctl, use your own additional log file
# DAEMON_OPTS="-f /var/log/revpi-ios.log"

View File

@@ -1,14 +0,0 @@
/var/log/revpi-ios.log
{
rotate 6
weekly
maxsize 1M
compress
delaycompress
missingok
notifempty
sharedscripts
postrotate
systemctl kill --signal=SIGUSR1 revpi-ios > /dev/null 2>&1 || true
endscript
}

View File

@@ -1,65 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Switches 14 outputs on a DIO to the same value as the first input."""
from time import perf_counter
from gi.repository import GLib
from pydbus import SystemBus
detected_signal = False
timestamp = perf_counter()
# Get system bus
bus = SystemBus()
# Get IoManager interface on ios1 bus
iface_io_manager = bus.get(
"com.revolutionpi.ios1",
"/com/revolutionpi/ios1",
)["com.revolutionpi.ios1.IoManager"]
# Query object path of RevPiLED output
path_revpi_led = iface_io_manager.Get("RevPiLED")
# Get Output interface in the queried object path.
out_RevPiLED = bus.get("com.revolutionpi.ios1", path_revpi_led)["com.revolutionpi.ios1.Output"]
# Get all 14 outputs
lst_path_outputs = [iface_io_manager.Get(f"O_{i}") for i in range(1, 15)]
lst_out_outputs = [
bus.get("com.revolutionpi.ios1", path)["com.revolutionpi.ios1.Output"]
for path in lst_path_outputs
]
def signal_handler(io_name, io_value):
global timestamp
print(f"Signal received: {io_name} = {io_value}")
if io_name == "O_14":
print(f"Time since input detection: {perf_counter() - timestamp}")
elif io_name == "I_1":
timestamp = perf_counter()
out_RevPiLED.SetValue(GLib.Variant("i", io_value))
for output in lst_out_outputs:
output.SetValue(GLib.Variant("b", io_value))
# Start change detection to fire signals on dbus
iface_io_manager.ActivateIoSignals()
iface_io_manager.onIoChanged = signal_handler
try:
loop = GLib.MainLoop()
loop.run()
except KeyboardInterrupt:
pass
# Stop change detection
iface_io_manager.DeactivateIoSignals()

View File

@@ -1,12 +0,0 @@
[Unit]
Description=D-Bus interface for Inputs/Outputs of Revolution Pi
[Service]
EnvironmentFile=-/etc/default/revpi-ios
Type=notify
NotifyAccess=all
ExecStart=/usr/sbin/revpi-middleware ios $DAEMON_OPTS
ExecReload=/bin/kill -HUP $MAINPID
[Install]
WantedBy=multi-user.target

View File

@@ -1,11 +0,0 @@
[Unit]
Description=D-Bus interface for Inputs/Outputs of Revolution Pi
[Service]
EnvironmentFile=-/etc/default/revpi-ios
Type=notify-reload
NotifyAccess=all
ExecStart=/usr/sbin/revpi-middleware ios $DAEMON_OPTS
[Install]
WantedBy=multi-user.target

View File

@@ -1,51 +0,0 @@
<!--
SPDX-FileCopyrightText: 2026 KUNBUS GmbH <support@kunbus.com>
SPDX-License-Identifier: GPL-2.0-or-later
-->
# D-Bus
## Datentypen
Hier ist eine **Liste der standardisierten DBus-Datentypen** (Signaturen), wie
sie u. a. in Introspection/XML (`type="..."`) verwendet werden.
## Basis-Typen (Basic Types)
| Signatur | Bedeutung |
|----------|-------------------------------------|
| `y` | Byte (unsigned 8-bit) |
| `b` | Boolean |
| `n` | Int16 (signed) |
| `q` | UInt16 (unsigned) |
| `i` | Int32 (signed) |
| `u` | UInt32 (unsigned) |
| `x` | Int64 (signed) |
| `t` | UInt64 (unsigned) |
| `d` | Double (IEEE 754) |
| `s` | String (UTF8) |
| `o` | Object Path |
| `g` | Signature (Typ-Signatur als String) |
## Container-Typen (Compound/Container)
| Form | Bedeutung | Beispiel |
|------------|-------------------------------------------------|---------------------------------|
| `aX` | Array von `X` | `as` = Array von Strings |
| `(XYZ...)` | Struct/Tupel | `(is)` = (Int32, String) |
| `{KV}` | Dict-Entry (nur innerhalb eines Arrays erlaubt) | `a{sv}` = Dict String → Variant |
## Spezial-Typ
| Signatur | Bedeutung |
|----------|---------------------------------------------------|
| `v` | Variant (beliebiger Typ, zur Laufzeit festgelegt) |
## Häufig genutzte Kombis (Praxis)
- `as` → Liste von Strings
- `ao` → Liste von Object Paths
- `a{sv}` → „Properties“-Map (String → Variant), sehr verbreitet bei
`org.freedesktop.DBus.Properties`
- `a{ss}` → String → String Map

View File

@@ -2,8 +2,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Metadata of package.""" """Metadata of package."""
__author__ = "Sven Sager" __author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2025 KUNBUS GmbH" __copyright__ = "Copyright (C) 2025 KUNBUS GmbH"
__license__ = " GPL-2.0-or-later" __license__ = " GPL-2.0-or-later"
__version__ = "0.0.6" __version__ = "0.0.1"

View File

@@ -2,5 +2,4 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Package: revpi_middleware.""" """Package: revpi_middleware."""
from .__about__ import __author__, __copyright__, __license__, __version__ from .__about__ import __author__, __copyright__, __license__, __version__

View File

@@ -2,5 +2,4 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""CLI commands to control revpi_middleware.""" """CLI commands to control revpi_middleware."""
from ..__about__ import __author__, __copyright__, __license__, __version__ from ..__about__ import __author__, __copyright__, __license__, __version__

View File

@@ -5,7 +5,6 @@
This module provides the foundation for the RevPi middleware CLI commands This module provides the foundation for the RevPi middleware CLI commands
and argument parsing setup. and argument parsing setup.
""" """
from logging import getLogger from logging import getLogger
from . import cli_config, cli_picontrol from . import cli_config, cli_picontrol

View File

@@ -1,7 +1,6 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI.""" """Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser from argparse import ArgumentParser
from logging import getLogger from logging import getLogger

View File

@@ -1,7 +1,6 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI.""" """Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser from argparse import ArgumentParser
from logging import getLogger from logging import getLogger

View File

@@ -1,7 +1,6 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus helper functions for cli commands.""" """D-Bus helper functions for cli commands."""
from enum import Enum from enum import Enum
from threading import Thread from threading import Thread
from time import sleep from time import sleep

View File

@@ -3,43 +3,29 @@
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Main daemon of revpi-middleware.""" """Main daemon of revpi-middleware."""
from enum import Enum
from logging import getLogger from logging import getLogger
from os import environ
from threading import Event from threading import Event
from time import perf_counter from time import perf_counter
from gi.repository import Gio
from . import proginit as pi from . import proginit as pi
from .dbus_ios1 import BusProviderIos1 from .dbus_middleware1.bus_provider import BusProvider
from .dbus_middleware1 import BusProviderMiddleware1
from .dbus_middleware1.process_image.process_image_helper import ResetDriverWatchdog
log = getLogger(__name__) log = getLogger(__name__)
class BusProvider(Enum):
middleware = "middleware"
ios = "ios"
class MiddlewareDaemon: class MiddlewareDaemon:
"""Main program of MiddlewareDaemon class.""" """Main program of MiddlewareDaemon class."""
def __init__(self, bus_provider: BusProvider): def __init__(self):
"""Init MiddlewareDaemon class.""" """Init MiddlewareDaemon class."""
log.debug("enter MiddlewareDaemon.__init__") log.debug("enter MiddlewareDaemon.__init__")
self._cycle_time = 1.0 self._cycle_time = 1.0
self.do_cycle = Event() self.do_cycle = Event()
self._force_dbus_restart = False
self._reconfigure = False self._reconfigure = False
self._running = True self._running = True
self.bus_provider = None self.bus_provider = None
self.bus_provider_selected = bus_provider
self.wd_reset = ResetDriverWatchdog("")
self._configure() self._configure()
log.debug("leave MiddlewareDaemon.__init__") log.debug("leave MiddlewareDaemon.__init__")
@@ -49,56 +35,14 @@ class MiddlewareDaemon:
log.debug("enter MiddlewareDaemon._configure") log.debug("enter MiddlewareDaemon._configure")
pi.reload_conf() pi.reload_conf()
if self._force_dbus_restart:
self.dbus_stop()
self._force_dbus_restart = False
if pi.pargs.procimg != self.wd_reset.procimg:
self.dbus_stop()
self.wd_reset.stop()
self.wd_reset = ResetDriverWatchdog(pi.pargs.procimg)
self.wd_reset.register_call(self._reset_driver_callack)
log.debug("leave MiddlewareDaemon._configure") log.debug("leave MiddlewareDaemon._configure")
@staticmethod
def _get_bus_address() -> str:
if pi.pargs.use_session_bus:
environ_name = "DBUS_SESSION_BUS_ADDRESS"
bus_address = Gio.dbus_address_get_for_bus_sync(Gio.BusType.SESSION)
else:
environ_name = "DBUS_SYSTEM_BUS_ADDRESS"
bus_address = Gio.dbus_address_get_for_bus_sync(Gio.BusType.SYSTEM)
return environ.get(environ_name, bus_address)
def _reset_driver_callack(self, *args) -> None:
log.debug("enter MiddlewareDaemon._reset_driver_callack")
if self.bus_provider_selected is BusProvider.ios:
self._force_dbus_restart = True
self.reload_config()
log.debug("leave MiddlewareDaemon._reset_driver_callack")
def dbus_start(self): def dbus_start(self):
log.debug("enter MiddlewareDaemon.dbus_start") log.debug("enter MiddlewareDaemon.dbus_start")
if self.bus_provider and self.bus_provider.is_alive():
return
dbus_running = self.bus_provider and self.bus_provider.is_alive() self.bus_provider = BusProvider(use_system_bus=not pi.pargs.use_session_bus)
if not dbus_running:
if self.bus_provider_selected is BusProvider.middleware:
self.bus_provider = BusProviderMiddleware1(
dbus_address=self._get_bus_address(),
picontrol_device=pi.pargs.procimg,
)
elif self.bus_provider_selected is BusProvider.ios:
self.bus_provider = BusProviderIos1(
dbus_address=self._get_bus_address(),
picontrol_device=pi.pargs.procimg,
)
else:
raise ValueError("Unknown bus provider")
self.bus_provider.start() self.bus_provider.start()
log.debug("leave MiddlewareDaemon.dbus_start") log.debug("leave MiddlewareDaemon.dbus_start")
@@ -110,7 +54,7 @@ class MiddlewareDaemon:
self.bus_provider.stop() self.bus_provider.stop()
self.bus_provider.join(timeout=10.0) self.bus_provider.join(timeout=10.0)
if self.bus_provider.is_alive(): if self.bus_provider.is_alive():
log.warning(f"dbus {self.bus_provider.name} thread is still alive") log.warning("dbus provider thread is still alive")
log.debug("leave MiddlewareDaemon.dbus_stop") log.debug("leave MiddlewareDaemon.dbus_stop")
@@ -141,6 +85,7 @@ class MiddlewareDaemon:
# Startup tasks # Startup tasks
self.dbus_start() self.dbus_start()
pi.startup_complete()
# Go into mainloop of daemon # Go into mainloop of daemon
while self._running: while self._running:
ot = perf_counter() ot = perf_counter()
@@ -150,13 +95,6 @@ class MiddlewareDaemon:
if self._reconfigure: if self._reconfigure:
self._configure() self._configure()
self._reconfigure = False self._reconfigure = False
# Monitor bus providers for errors and restart them
if not (self.bus_provider and self.bus_provider.is_alive()):
log.warning(f"dbus {self.bus_provider.name} thread is not alive - restarting")
self.dbus_start()
if self.bus_provider and self.bus_provider.published.is_set():
pi.startup_complete() pi.startup_complete()
# Cycle time calculation # Cycle time calculation

View File

@@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus ios version 1 of revpi_middleware."""
from .ios1_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME
from .bus_provider_ios1 import BusProviderIos1

View File

@@ -1,121 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus bus provider for revpi_middleware."""
from logging import getLogger
from threading import Thread, Event
import revpimodio2
from gi.repository import GLib
from pydbus import connect
from . import REVPI_DBUS_NAME
from .interface_devices import (
InterfaceDeviceManager,
InterfaceDevice,
)
from .interface_ios import (
InterfaceIoManager,
InterfaceInput,
InterfaceOutput,
)
log = getLogger(__name__)
class BusProviderIos1(Thread):
def __init__(
self,
dbus_address: str,
picontrol_device="/dev/piControl0",
config_rsc="/etc/revpi/config.rsc",
):
log.debug("enter BusProviderIos1.__init__")
super().__init__()
self._bus_address = dbus_address
self._loop = GLib.MainLoop()
self._lst_device_interfaces = []
self._lst_io_interfaces = []
self._modio = revpimodio2.RevPiModIO(
procimg=picontrol_device,
configrsc=config_rsc,
shared_procimg=True,
)
self.picontrol_device = picontrol_device
self.published = Event()
self.config_rsc = config_rsc
def run(self):
log.debug("enter BusProviderIos1.run")
bus = connect(self._bus_address)
self._lst_device_interfaces.clear()
self._lst_io_interfaces.clear()
for device in self._modio.device:
self._lst_device_interfaces.append(
InterfaceDevice(bus, device),
)
for io in self._modio.io:
interface = None
try:
if io.type == revpimodio2.INP:
interface = InterfaceInput(bus, io)
elif io.type == revpimodio2.OUT:
interface = InterfaceOutput(bus, io)
elif io.type == revpimodio2.MEM:
# todo: Implement memory
pass
except Exception as e:
log.warning(f"can not create dbus interface for {io.name}: {e}")
if interface is not None:
self._lst_io_interfaces.append(interface)
lst_interfaces = []
lst_interfaces += [
(interface.object_path, interface) for interface in self._lst_device_interfaces
]
lst_interfaces += [
(interface.object_path, interface) for interface in self._lst_io_interfaces
]
try:
bus.publish(
REVPI_DBUS_NAME,
InterfaceDeviceManager(self._lst_device_interfaces),
InterfaceIoManager(self._lst_io_interfaces, self._modio),
*lst_interfaces,
)
self.published.set()
log.info(f"published {REVPI_DBUS_NAME} on {self._bus_address}")
except Exception as e:
log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}")
try:
self._loop.run()
except Exception as e:
log.error(f"can not run dbus mainloop: {e}")
bus.con.close()
self._modio.cleanup()
log.info(f"closed {REVPI_DBUS_NAME} connection to {self._bus_address}")
log.debug("leave BusProviderIos1.run")
def stop(self):
log.debug("enter BusProviderIos1.stop")
self._loop.quit()
log.debug("leave BusProviderIos1.stop")
@property
def name(self) -> str:
return REVPI_DBUS_NAME
@property
def running(self):
return self._loop.is_running()

View File

@@ -1,133 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for IOs."""
from pydbus.bus import Bus
from pydbus.generic import signal
from revpimodio2.device import Device
from .ios1_helper import get_io_object_path, get_device_object_path
class InterfaceDevice:
"""
<node>
<interface name="com.revolutionpi.ios1.Device">
<method name="GetDeviceInputs">
<arg name="object-path-list" type="ao" direction="out"/>
</method>
<method name="GetDeviceOutputs">
<arg name="object-path-list" type="ao" direction="out"/>
</method>
<property name="bmk" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="catalognr" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="comment" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="id" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="name" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="position" type="n" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="type" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.Device"
PropertiesChanged = signal()
def __init__(self, dbus: Bus, device: Device):
self.dbus = dbus
self.device = device
self.object_path = get_device_object_path(device)
def GetDeviceInputs(self) -> list[str]:
return [get_io_object_path(io) for io in self.device.get_inputs()]
def GetDeviceOutputs(self) -> list[str]:
return [get_io_object_path(io) for io in self.device.get_outputs()]
@property
def bmk(self) -> str:
return self.device.bmk
@property
def catalognr(self):
return self.device.catalognr
@property
def comment(self):
return self.device.comment
@property
def id(self):
return self.device.id
@property
def name(self) -> str:
return self.device.name
@property
def position(self) -> int:
return self.device.position
@property
def type(self):
return self.device.type
class InterfaceDeviceManager:
"""
<node>
<interface name="com.revolutionpi.ios1.DeviceManager">
<method name="GetAllDevices">
<arg type="ao" direction="out"/>
</method>
<method name="GetByName">
<arg name="device-name" type="s" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
<method name="GetByPosition">
<arg name="device-position" type="n" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.DeviceManager"
def __init__(
self,
device_interfaces: list[InterfaceDevice],
):
self._lst_device_interfaces = device_interfaces
def GetAllDevices(self) -> list[str]:
return [interface.object_path for interface in self._lst_device_interfaces]
def GetByName(self, device_name) -> str:
for interface in self._lst_device_interfaces:
if interface.device.name == device_name:
return interface.object_path
raise KeyError(f"No device with name '{device_name}' found.")
def GetByPosition(self, device_position) -> str:
for interface in self._lst_device_interfaces:
if interface.device.position == device_position:
return interface.object_path
raise KeyError(f"No device on position '{device_position}' found.")

View File

@@ -1,306 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for IOs."""
from typing import Union, List
from pydbus import Variant
from pydbus.bus import Bus
from pydbus.generic import signal
from revpimodio2 import RevPiModIO, Cycletools, INP, OUT
from revpimodio2.io import IOBase
from .ios1_helper import get_io_object_path, get_variant_type
class InterfaceInput:
"""
<node>
<interface name="com.revolutionpi.ios1.Input">
<method name="SetByteorder">
<arg name="order" type="s" direction="in"/>
</method>
<method name="SetSigned">
<arg name="signed" type="b" direction="in"/>
</method>
<property name="address" type="n" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="bmk" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="bitaddress" type="n" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="byteorder" type="s" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="invalidates"/>
</property>
<property name="defaultvalue" type="v" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="length" type="q" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="name" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="signed" type="b" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="invalidates"/>
</property>
<property name="value" type="v" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="true"/>
</property>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.Input"
PropertiesChanged = signal()
def __init__(self, dbus: Bus, io: IOBase):
self._raw = False
self.dbus = dbus
self.io = io
self.object_path = get_io_object_path(io)
try:
self.variant_type = get_variant_type(self.io)
except ValueError:
# Fallback to bytes if the integer is too large
self._raw = True
self.variant_type = "ay"
def emit_io_change(self):
self.PropertiesChanged(
self.interface_name,
{
"value": Variant(
self.variant_type,
self.io.get_value() if self._raw else self.io.value,
),
},
[],
)
def SetByteorder(self, order: str) -> None:
self.byteorder = order
def SetSigned(self, signed: bool) -> None:
self.signed = signed
@property
def address(self) -> int:
return self.io.address
@property
def bmk(self) -> str:
return self.io.bmk
@property
def bitaddress(self) -> int:
return self.io._bitaddress
@property
def byteorder(self) -> str:
return self.io.byteorder
@byteorder.setter
def byteorder(self, value: str) -> None:
if hasattr(self.io, "_set_byteorder"):
self.io._set_byteorder(value)
self.variant_type = get_variant_type(self.io)
# Changing the byteorder can change the value, but we do NOT send a signal for that
# because the real value of the process image was not changed. But we inform the client
# about the changed byteorder property.
self.PropertiesChanged(
self.interface_name,
{},
["byteorder"],
)
@property
def defaultvalue(self) -> Variant:
return Variant(
self.variant_type,
self.io.get_value() if self._raw else self.io.defaultvalue,
)
@property
def length(self) -> int:
# 0 length for boolean
return 0 if self.variant_type == "b" else self.io.length
@property
def name(self) -> str:
return self.io.name
@property
def signed(self) -> bool:
if hasattr(self.io, "signed"):
return self.io.signed
return False
@signed.setter
def signed(self, value: bool) -> None:
if hasattr(self.io, "_set_signed"):
self.io._set_signed(value)
self.variant_type = get_variant_type(self.io)
# Changing the signedness can change the value, but we do NOT send a signal for that
# because the real value of the process image was not changed. But we inform the client
# about the changed signedness property.
self.PropertiesChanged(
self.interface_name,
{},
["signed"],
)
@property
def value(self) -> Variant:
if not self.io._parentdevice._selfupdate:
self.io._parentdevice.readprocimg()
return Variant(
self.variant_type,
self.io.get_value() if self._raw else self.io.value,
)
class InterfaceOutput(InterfaceInput):
"""
<node>
<interface name="com.revolutionpi.ios1.Output">
<method name="SetByteorder">
<arg name="order" type="s" direction="in"/>
</method>
<method name="SetSigned">
<arg name="signed" type="b" direction="in"/>
</method>
<method name="SetValue">
<arg name="value" type="v" direction="in"/>
</method>
<property name="address" type="n" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="bmk" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="bitaddress" type="n" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="byteorder" type="s" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="invalidates"/>
</property>
<property name="defaultvalue" type="v" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="length" type="q" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="name" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="signed" type="b" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="invalidates"/>
</property>
<property name="value" type="v" access="readwrite">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="true"/>
</property>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.Output"
def SetValue(self, value: Variant) -> None:
self.value = value
@property
def value(self) -> Variant:
return super().value
@value.setter
def value(self, value: Variant) -> None:
if self._raw:
self.io.set_value(value)
else:
self.io.value = value
if not self.io._parentdevice._selfupdate:
self.io._parentdevice.writeprocimg()
class InterfaceIoManager:
"""
<node>
<interface name="com.revolutionpi.ios1.IoManager">
<method name="GetAllInputs">
<arg name="object-path-list" type="ao" direction="out"/>
</method>
<method name="GetAllOutputs">
<arg name="object-path-list" type="ao" direction="out"/>
</method>
<method name="GetByName">
<arg name="name" type="s" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
<method name="ActivateIoSignals">
</method>
<method name="DeactivateIoSignals">
</method>
<signal name="IoChanged">
<arg name="name" type="s" direction="out"/>
<arg name="value" type="v" direction="out"/>
</signal>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.IoManager"
IoChanged = signal()
def __init__(
self,
io_interfaces: List[Union[InterfaceInput, InterfaceOutput]],
modio: RevPiModIO,
):
self._dc_io_interfaces = {interface.name: interface for interface in io_interfaces}
self.modio = modio
self.lst_inp_object_path = []
self.lst_out_object_path = []
for interface in io_interfaces:
if interface.io.type == INP:
self.lst_inp_object_path.append(interface.object_path)
elif interface.io.type == OUT:
self.lst_out_object_path.append(interface.object_path)
def _modio_cycle(self, ct: Cycletools) -> None:
for io_name in self._dc_io_interfaces:
interface = self._dc_io_interfaces[io_name]
if ct.changed(interface.io):
interface.emit_io_change()
self.IoChanged(
interface.io.name,
Variant(interface.variant_type, interface.io.value),
)
def GetAllInputs(self) -> list[str]:
return self.lst_inp_object_path
def GetAllOutputs(self) -> list[str]:
return self.lst_out_object_path
def GetByName(self, io_name) -> str:
if io_name in self._dc_io_interfaces:
return self._dc_io_interfaces[io_name].object_path
raise KeyError(f"No IO with name '{io_name}' found.")
def ActivateIoSignals(self) -> None:
if not self.modio._looprunning:
self.modio.autorefresh_all()
self.modio.cycleloop(self._modio_cycle, cycletime=50, blocking=False)
def DeactivateIoSignals(self) -> None:
self.modio.exit(False)

View File

@@ -1,51 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Helper for io read and write."""
from logging import getLogger
from revpimodio2.device import Device
from revpimodio2.io import IOBase
log = getLogger(__name__)
REVPI_DBUS_NAME = "com.revolutionpi.ios1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/ios1"
def get_io_object_path(io: IOBase) -> str:
return f"{REVPI_DBUS_BASE_PATH}/io/{io.name}"
def get_device_object_path(device: Device) -> str:
return f"{REVPI_DBUS_BASE_PATH}/device/{device.position}"
def get_variant_type(io: IOBase) -> str:
value_type = type(io.value)
byte_length = io.length
signed = io._signed
if value_type is bool:
return "b"
if value_type is float:
return "d"
if value_type is int:
if byte_length <= 2:
return "n" if signed else "q"
if byte_length <= 4:
return "i" if signed else "u"
if byte_length <= 8:
return "x" if signed else "t"
raise ValueError(f"Unsupported byte length: {byte_length}")
if value_type is str:
return "s"
if value_type is bytes:
return "ay"
raise TypeError(f"Unsupported type: {value_type}")

View File

@@ -2,8 +2,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus middleware version 1 of revpi_middleware.""" """D-Bus middleware version 1 of revpi_middleware."""
from .dbus_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME from .dbus_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME
from .dbus_helper import extend_interface from .dbus_helper import extend_interface
from .bus_provider_middleware1 import BusProviderMiddleware1 from .bus_provider import BusProvider

View File

@@ -2,12 +2,11 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus bus provider for revpi_middleware.""" """D-Bus bus provider for revpi_middleware."""
from logging import getLogger from logging import getLogger
from threading import Thread, Event from threading import Thread
from gi.repository import GLib from gi.repository import GLib
from pydbus import connect from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl from .process_image import InterfacePiControl
@@ -16,27 +15,25 @@ from .system_config import InterfaceRevpiConfig, InterfaceSoftwareServices
log = getLogger(__name__) log = getLogger(__name__)
class BusProviderMiddleware1(Thread): class BusProvider(Thread):
def __init__( def __init__(
self, self,
dbus_address: str,
picontrol_device="/dev/piControl0", picontrol_device="/dev/piControl0",
config_rsc="/etc/revpi/config.rsc", config_rsc="/etc/revpi/config.rsc",
use_system_bus=True,
): ):
log.debug("enter BusProviderMiddleware1.__init__") log.debug("enter BusProvider.__init__")
super().__init__() super().__init__()
self._bus_address = dbus_address self._bus = SystemBus() if use_system_bus else SessionBus()
self._loop = GLib.MainLoop() self._loop = GLib.MainLoop()
self.picontrol_device = picontrol_device self.picontrol_device = picontrol_device
self.published = Event()
self.config_rsc = config_rsc self.config_rsc = config_rsc
def run(self): def run(self):
log.debug("enter BusProviderMiddleware1.run") log.debug("enter BusProvider.run")
bus = connect(self._bus_address)
# The 2nd, 3rd, ... arguments can be objects or tuples of a path and an object # The 2nd, 3rd, ... arguments can be objects or tuples of a path and an object
# Example(), # Example(),
@@ -44,18 +41,16 @@ class BusProviderMiddleware1(Thread):
# ("Subdir2", Example()), # ("Subdir2", Example()),
# ("Subdir2/Whatever", Example()) # ("Subdir2/Whatever", Example())
lst_interfaces = [ lst_interfaces = [
InterfacePiControl(bus, self.picontrol_device, self.config_rsc), InterfacePiControl(self._bus, self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(bus), InterfaceRevpiConfig(self._bus),
InterfaceSoftwareServices(bus), InterfaceSoftwareServices(self._bus),
] ]
try: try:
bus.publish( self._bus.publish(
REVPI_DBUS_NAME, REVPI_DBUS_NAME,
*lst_interfaces, *lst_interfaces,
) )
self.published.set()
log.info(f"published {REVPI_DBUS_NAME} on {self._bus_address}")
except Exception as e: except Exception as e:
log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}") log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}")
@@ -64,25 +59,18 @@ class BusProviderMiddleware1(Thread):
except Exception as e: except Exception as e:
log.error(f"can not run dbus mainloop: {e}") log.error(f"can not run dbus mainloop: {e}")
bus.con.close()
log.info(f"closed {REVPI_DBUS_NAME} connection to {self._bus_address}")
# Clean up all interfaces # Clean up all interfaces
for interface in lst_interfaces: for interface in lst_interfaces:
if type(interface) is tuple: if type(interface) is tuple:
_, interface = interface _, interface = interface
interface.cleanup() interface.cleanup()
log.debug("leave BusProviderMiddleware1.run") log.debug("leave BusProvider.run")
def stop(self): def stop(self):
log.debug("enter BusProviderMiddleware1.stop") log.debug("enter BusProvider.stop")
self._loop.quit() self._loop.quit()
log.debug("leave BusProviderMiddleware1.stop") log.debug("leave BusProvider.stop")
@property
def name(self) -> str:
return REVPI_DBUS_NAME
@property @property
def running(self): def running(self):

View File

@@ -2,10 +2,10 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Helper for dbus.""" """Helper for dbus."""
from logging import getLogger from logging import getLogger
from typing import Union
from pydbus.bus import Bus from pydbus import SessionBus, SystemBus
log = getLogger(__name__) log = getLogger(__name__)
@@ -15,7 +15,7 @@ REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
class DbusInterface: class DbusInterface:
def __init__(self, bus: Bus): def __init__(self, bus: Union[SessionBus, SystemBus]):
self.bus = bus self.bus = bus
def cleanup(self): def cleanup(self):

View File

@@ -2,5 +2,4 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for piControl driver.""" """D-Bus interfaces for piControl driver."""
from .interface_picontrol import InterfacePiControl from .interface_picontrol import InterfacePiControl

View File

@@ -2,10 +2,8 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for piControl.""" """D-Bus interfaces for piControl."""
from logging import getLogger from logging import getLogger
from pydbus.bus import Bus
from pydbus.generic import signal from pydbus.generic import signal
from .process_image_helper import PiControlIoctl, ResetDriverWatchdog from .process_image_helper import PiControlIoctl, ResetDriverWatchdog
@@ -28,7 +26,7 @@ class InterfacePiControl(DbusInterface):
NotifyDriverReset = signal() NotifyDriverReset = signal()
def __init__(self, bus: Bus, picontrol_device: str, config_rsc: str): def __init__(self, bus, picontrol_device: str, config_rsc: str):
super().__init__(bus) super().__init__(bus)
self.picontrol_device = picontrol_device self.picontrol_device = picontrol_device

View File

@@ -7,7 +7,6 @@ Helper for the process image.
The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs" The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs"
https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py
""" """
import os import os
from ctypes import c_int from ctypes import c_int
from fcntl import ioctl from fcntl import ioctl

View File

@@ -2,6 +2,5 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for system configuration.""" """D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig from .interface_config import InterfaceRevpiConfig
from .interface_services import InterfaceSoftwareServices from .interface_services import InterfaceSoftwareServices

View File

@@ -2,7 +2,6 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for hardware configuration.""" """D-Bus interfaces for hardware configuration."""
from collections import namedtuple from collections import namedtuple
from logging import getLogger from logging import getLogger

View File

@@ -2,11 +2,9 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for software services.""" """D-Bus interfaces for software services."""
from logging import getLogger from logging import getLogger
from typing import List from typing import List
from pydbus.bus import Bus
from pydbus.generic import signal from pydbus.generic import signal
from ..dbus_helper import DbusInterface from ..dbus_helper import DbusInterface
@@ -49,7 +47,7 @@ class InterfaceSoftwareServices(DbusInterface):
AvailabilityChanged = signal() AvailabilityChanged = signal()
StatusChanged = signal() StatusChanged = signal()
def __init__(self, bus: Bus): def __init__(self, bus):
super().__init__(bus) super().__init__(bus)
self.mrk_available = {} self.mrk_available = {}

View File

@@ -10,6 +10,7 @@ from glob import glob
from logging import getLogger from logging import getLogger
from os import X_OK, access from os import X_OK, access
from os.path import exists, join from os.path import exists, join
from threading import Lock
from typing import List, Optional from typing import List, Optional
from pydbus import SystemBus from pydbus import SystemBus
@@ -24,6 +25,8 @@ ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth" LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211" LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt") CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
CMDLINE_TXT_LOCK = Lock()
CONFIG_TXT_LOCK = Lock()
class ComputeModuleTypes(IntEnum): class ComputeModuleTypes(IntEnum):
@@ -219,6 +222,61 @@ class RevPiConfig:
return bool(self._wlan_class_path) return bool(self._wlan_class_path)
class CmdLineTxt:
# Value is optional, "?:=" non-capturing the "="
re_name_value = re.compile(r"(?P<key>[^\s=]+)(?:=(?P<value>\S+))?")
def __init__(self):
self._cmdline_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._cmdline_txt_path = path
break
if not self._cmdline_txt_path:
raise FileNotFoundError("no config.txt found")
def _get_cmdline_dict(self) -> dict:
with CMDLINE_TXT_LOCK:
with open(self._cmdline_txt_path, "r") as file:
cmdline = file.read()
return {
match.group("key"): match.group("value")
for match in self.re_name_value.finditer(cmdline)
}
def _write_cmdline_dict(self, cmdline_dict: dict) -> None:
with CMDLINE_TXT_LOCK:
tmp_path = f"{self._cmdline_txt_path}.tmp"
with open(tmp_path, "w") as file:
str_cmdline = ""
for key, value in cmdline_dict.items():
if value is None:
str_cmdline += f"{key} "
else:
str_cmdline += f"{key}={value} "
str_cmdline = str_cmdline.strip()
file.write(str_cmdline + "\n")
shutil.move(tmp_path, self._cmdline_txt_path)
def remove_key(self, key: str) -> None:
dc_cmdline = self._get_cmdline_dict()
if key in dc_cmdline:
del dc_cmdline[key]
self._write_cmdline_dict(dc_cmdline)
def set_key_value(self, key: str, value: Optional[str] = None) -> None:
dc_cmdline = self._get_cmdline_dict()
if key not in dc_cmdline or dc_cmdline.get(key, value) != value:
dc_cmdline[key] = value
self._write_cmdline_dict(dc_cmdline)
class ConfigTxt: class ConfigTxt:
""" """
Configuration file handler for managing 'config.txt'. Configuration file handler for managing 'config.txt'.
@@ -228,11 +286,6 @@ class ConfigTxt:
to manipulate specific parameters within the configuration and supports to manipulate specific parameters within the configuration and supports
managing dtoverlay and dtparam entries. The primary aim of this class managing dtoverlay and dtparam entries. The primary aim of this class
is to abstract file operations and make modifications user-friendly. is to abstract file operations and make modifications user-friendly.
Attributes:
_config_txt_path (str): The path to the configuration file `config.txt`.
_config_txt_lines (list[str]): Contains all lines of the configuration
file as a list of strings, where each string represents a line.
""" """
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$") re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
@@ -318,6 +371,7 @@ class ConfigTxt:
Returns: Returns:
None None
""" """
with CONFIG_TXT_LOCK:
with open(self._config_txt_path, "r") as f: with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines() self._config_txt_lines = f.readlines()
@@ -333,6 +387,7 @@ class ConfigTxt:
if not self._config_txt_lines: if not self._config_txt_lines:
return return
with CONFIG_TXT_LOCK:
tmp_path = f"{self._config_txt_path}.tmp" tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f: with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines) f.writelines(self._config_txt_lines)

View File

@@ -2,10 +2,9 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""Main application of revpi-middleware daemon.""" """Main application of revpi-middleware daemon."""
from logging import getLogger from logging import getLogger
from .daemon import MiddlewareDaemon, BusProvider from .daemon import MiddlewareDaemon
from . import proginit as pi from . import proginit as pi
log = getLogger(__name__) log = getLogger(__name__)
@@ -27,19 +26,6 @@ pi.parser.add_argument(
default="/etc/{0}/{0}.conf".format(pi.programname), default="/etc/{0}/{0}.conf".format(pi.programname),
help="application configuration file", help="application configuration file",
) )
pi.parser.add_argument(
"--procimg",
dest="procimg",
default="/dev/piControl0",
help="Path to process image",
)
pi.parser.add_argument(
"bus_provider",
default="middleware",
nargs="?",
choices=["middleware", "ios"],
help="bus provider to use",
)
def main() -> int: def main() -> int:
@@ -48,7 +34,7 @@ def main() -> int:
# Parse command line arguments # Parse command line arguments
pi.init_app() pi.init_app()
root = MiddlewareDaemon(BusProvider(pi.pargs.bus_provider)) root = MiddlewareDaemon()
# Set signals # Set signals
signal.signal(signal.SIGHUP, lambda n, f: root.reload_config()) signal.signal(signal.SIGHUP, lambda n, f: root.reload_config())

View File

@@ -2,7 +2,6 @@
# SPDX-FileCopyrightText: 2018-2023 Sven Sager # SPDX-FileCopyrightText: 2018-2023 Sven Sager
# SPDX-License-Identifier: LGPL-2.0-or-later # SPDX-License-Identifier: LGPL-2.0-or-later
"""Global program initialization.""" """Global program initialization."""
__author__ = "Sven Sager" __author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2018-2023 Sven Sager" __copyright__ = "Copyright (C) 2018-2023 Sven Sager"
__license__ = "LGPL-2.0-or-later" __license__ = "LGPL-2.0-or-later"

View File

@@ -3,8 +3,6 @@
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
from time import sleep from time import sleep
from gi.repository.Gio import dbus_address_get_for_bus_sync, BusType
from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup
@@ -14,13 +12,12 @@ class TestBusProvider(PiControlDeviceMockup):
super().setUp() super().setUp()
# Do not import things on top of the module. Some classes or functions need to be mocked up first. # Do not import things on top of the module. Some classes or functions need to be mocked up first.
from revpi_middleware.dbus_middleware1 import BusProviderMiddleware1 from revpi_middleware.dbus_middleware1 import BusProvider
# Prepare the bus provider and start it # Prepare the bus provider and start it
bus_address = dbus_address_get_for_bus_sync(BusType.SESSION) self.bp = BusProvider(
self.bp = BusProviderMiddleware1( self.picontrol.name,
bus_address, use_system_bus=False,
picontrol_device=self.picontrol.name,
) )
self.bp.start() self.bp.start()