refactor(io): Simplify generation of io object path

Signed-off-by: Sven Sager <s.sager@kunbus.com>
This commit is contained in:
Sven Sager
2026-02-05 11:29:35 +01:00
parent 5528f6dff4
commit 3d64cd5837
2 changed files with 25 additions and 21 deletions

View File

@@ -36,8 +36,8 @@ class BusProviderIo(Thread):
super().__init__()
self._bus = SystemBus() if use_system_bus else SessionBus()
self._dc_io_interfaces = {}
self._loop = GLib.MainLoop()
self._lst_io_interfaces = []
self._modio = revpimodio2.RevPiModIO(
procimg=picontrol_device,
configrsc=config_rsc,
@@ -50,7 +50,7 @@ class BusProviderIo(Thread):
def run(self):
log.debug("enter BusProviderIo.run")
self._dc_io_interfaces.clear()
self._lst_io_interfaces.clear()
for io in self._modio.io:
interface = None
@@ -66,10 +66,10 @@ class BusProviderIo(Thread):
log.warning(f"can not create dbus interface for {io.name}: {e}")
if interface is not None:
self._dc_io_interfaces[io.name] = interface
self._lst_io_interfaces.append(interface)
lst_interfaces = [
(f"io/{io_name}", self._dc_io_interfaces[io_name]) for io_name in self._dc_io_interfaces
(interface.object_path, interface) for interface in self._lst_io_interfaces
]
lst_interfaces += [
(f"device/{device.position}", InterfaceDevice(self._bus, device))
@@ -79,7 +79,7 @@ class BusProviderIo(Thread):
self._bus.publish(
REVPI_DBUS_NAME,
InterfaceDeviceManager(self._modio),
InterfaceIoManager(self._modio, self._dc_io_interfaces),
InterfaceIoManager(self._modio, self._lst_io_interfaces),
*lst_interfaces,
)
except Exception as e:

View File

@@ -3,12 +3,12 @@
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for IOs."""
from typing import Union
from typing import Union, List
from dbus import SystemBus, SessionBus
from gi.overrides.GLib import Variant
from pydbus.generic import signal
from revpimodio2 import RevPiModIO, Cycletools
from revpimodio2 import RevPiModIO, Cycletools, INP, OUT
from revpimodio2.io import IOBase
from .ios1_helper import get_io_object_path, get_variant_type
@@ -44,6 +44,7 @@ class InterfaceInput:
self._raw = False
self.dbus = dbus
self.io = io
self.object_path = get_io_object_path(io.name)
try:
self.variant_type = get_variant_type(self.io)
except ValueError:
@@ -205,18 +206,21 @@ class InterfaceIoManager:
interface_name = "com.revolutionpi.ios1.IoManager"
IoChanged = signal()
def __init__(self, modio: RevPiModIO, io_interfaces: dict[str, InterfaceInput]):
self._dc_io_interfaces = io_interfaces
def __init__(
self,
modio: RevPiModIO,
io_interfaces: List[Union[InterfaceInput, InterfaceOutput]],
):
self._dc_io_interfaces = {interface.name: interface for interface in io_interfaces}
self.modio = modio
self.lst_inp = []
for dev in self.modio.device:
for io in dev.get_inputs():
self.lst_inp.append(get_io_object_path(io.name))
self.lst_out = []
for dev in self.modio.device:
for io in dev.get_outputs():
self.lst_out.append(get_io_object_path(io.name))
self.lst_inp_object_path = []
self.lst_out_object_path = []
for interface in io_interfaces:
if interface.io.type == INP:
self.lst_inp_object_path.append(interface.object_path)
elif interface.io.type == OUT:
self.lst_out_object_path.append(interface.object_path)
def _modio_cycle(self, ct: Cycletools) -> None:
for io_name in self._dc_io_interfaces:
@@ -229,14 +233,14 @@ class InterfaceIoManager:
)
def GetAllInputs(self) -> list[str]:
return self.lst_inp
return self.lst_inp_object_path
def GetAllOutputs(self) -> list[str]:
return self.lst_out
return self.lst_out_object_path
def Get(self, io_name) -> str:
if io_name in self.modio.io:
return get_io_object_path(io_name)
if io_name in self._dc_io_interfaces:
return self._dc_io_interfaces[io_name].object_path
raise KeyError(f"No IO with name '{io_name}' found.")