refactor(io): Simplify InterfaceDeviceManager and device handling

Streamlined device management by introducing a dedicated `object_path`
property in `InterfaceDevice`. Refactored `InterfaceDeviceManager` to
use this property and accept a list of `InterfaceDevice` instances,
removing direct `RevPiModIO` dependencies. Updated D-Bus publishing to
reflect the new structure.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
This commit is contained in:
Sven Sager
2026-02-06 08:30:18 +01:00
parent 35dbed0798
commit 0e69ef432b
4 changed files with 65 additions and 56 deletions

View File

@@ -37,6 +37,7 @@ class BusProviderIo(Thread):
self._bus = SystemBus() if use_system_bus else SessionBus()
self._loop = GLib.MainLoop()
self._lst_device_interfaces = []
self._lst_io_interfaces = []
self._modio = revpimodio2.RevPiModIO(
procimg=picontrol_device,
@@ -50,8 +51,14 @@ class BusProviderIo(Thread):
def run(self):
log.debug("enter BusProviderIo.run")
self._lst_device_interfaces.clear()
self._lst_io_interfaces.clear()
for device in self._modio.device:
self._lst_device_interfaces.append(
InterfaceDevice(self._bus, device),
)
for io in self._modio.io:
interface = None
try:
@@ -68,18 +75,18 @@ class BusProviderIo(Thread):
if interface is not None:
self._lst_io_interfaces.append(interface)
lst_interfaces = [
(interface.object_path, interface) for interface in self._lst_io_interfaces
lst_interfaces = []
lst_interfaces += [
(interface.object_path, interface) for interface in self._lst_device_interfaces
]
lst_interfaces += [
(f"device/{device.position}", InterfaceDevice(self._bus, device))
for device in self._modio.device
(interface.object_path, interface) for interface in self._lst_io_interfaces
]
try:
self._bus.publish(
REVPI_DBUS_NAME,
InterfaceDeviceManager(self._modio),
InterfaceIoManager(self._modio, self._lst_io_interfaces),
InterfaceDeviceManager(self._lst_device_interfaces),
InterfaceIoManager(self._lst_io_interfaces, self._modio),
*lst_interfaces,
)
except Exception as e:

View File

@@ -7,57 +7,9 @@ from typing import Union
from dbus import SystemBus, SessionBus
from pydbus.generic import signal
from revpimodio2 import RevPiModIO
from revpimodio2.device import Device
from .ios1_helper import REVPI_DBUS_BASE_PATH, get_io_object_path
class InterfaceDeviceManager:
"""
<node>
<interface name="com.revolutionpi.ios1.DeviceManager">
<method name="GetAllDevices">
<arg type="ao" direction="out"/>
</method>
<method name="GetByName">
<arg name="device-name" type="s" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
<method name="GetByPosition">
<arg name="device-position" type="n" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.DeviceManager"
def __init__(self, modio: RevPiModIO):
self.modio = modio
self.lst_device = []
for dev in self.modio.device:
self.lst_device.append(self._get_device_path(dev.position))
def _get_device_path(self, position: int) -> str:
return f"{REVPI_DBUS_BASE_PATH}/device/{position}"
def GetAllDevices(self) -> list[str]:
return self.lst_device
def GetByName(self, device_name) -> str:
if device_name in self.modio.device:
return self._get_device_path(self.modio.device[device_name].position)
raise KeyError(f"No device with name '{device_name}' found.")
def GetByPosition(self, device_position) -> str:
if device_position in self.modio.device:
return self._get_device_path(device_position)
raise KeyError(f"No device on position '{device_position}' found.")
from .ios1_helper import get_io_object_path, get_device_object_path
class InterfaceDevice:
@@ -83,6 +35,7 @@ class InterfaceDevice:
def __init__(self, dbus: Union[SystemBus, SessionBus], device: Device):
self.dbus = dbus
self.device = device
self.object_path = get_device_object_path(device.position)
@property
def bmk(self) -> str:
@@ -119,3 +72,48 @@ class InterfaceDevice:
@property
def type(self):
return self.device.type
class InterfaceDeviceManager:
"""
<node>
<interface name="com.revolutionpi.ios1.DeviceManager">
<method name="GetAllDevices">
<arg type="ao" direction="out"/>
</method>
<method name="GetByName">
<arg name="device-name" type="s" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
<method name="GetByPosition">
<arg name="device-position" type="n" direction="in"/>
<arg name="object-path" type="o" direction="out"/>
</method>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.DeviceManager"
def __init__(
self,
device_interfaces: list[InterfaceDevice],
):
self._lst_device_interfaces = device_interfaces
def GetAllDevices(self) -> list[str]:
return [interface.object_path for interface in self._lst_device_interfaces]
def GetByName(self, device_name) -> str:
for interface in self._lst_device_interfaces:
if interface.device.name == device_name:
return interface.object_path
raise KeyError(f"No device with name '{device_name}' found.")
def GetByPosition(self, device_position) -> str:
for interface in self._lst_device_interfaces:
if interface.device.position == device_position:
return interface.object_path
raise KeyError(f"No device on position '{device_position}' found.")

View File

@@ -261,8 +261,8 @@ class InterfaceIoManager:
def __init__(
self,
modio: RevPiModIO,
io_interfaces: List[Union[InterfaceInput, InterfaceOutput]],
modio: RevPiModIO,
):
self._dc_io_interfaces = {interface.name: interface for interface in io_interfaces}
self.modio = modio

View File

@@ -17,6 +17,10 @@ def get_io_object_path(io_name: str) -> str:
return f"{REVPI_DBUS_BASE_PATH}/io/{io_name}"
def get_device_object_path(device_name: str) -> str:
return f"{REVPI_DBUS_BASE_PATH}/device/{device_name}"
def get_variant_type(io: IOBase) -> str:
value_type = type(io.value)
byte_length = io.length