feat(io): Simplify IO interface hierarchy and unify property handling

Replaced specific IO interface classes (`InterfaceInpBool`,
`InterfaceInpInt`, etc.) with generic `InterfaceInput` and
`InterfaceOutput` classes to simplify the hierarchy.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
This commit is contained in:
Sven Sager
2026-02-03 11:27:42 +01:00
parent bbbbd3e0e1
commit a2d1531e77
3 changed files with 154 additions and 196 deletions

View File

@@ -17,10 +17,8 @@ from .interface_devices import (
) )
from .interface_ios import ( from .interface_ios import (
InterfaceIoManager, InterfaceIoManager,
InterfaceInpBool, InterfaceInput,
InterfaceOutBool, InterfaceOutput,
InterfaceInpInt,
InterfaceOutInt,
) )
log = getLogger(__name__) log = getLogger(__name__)
@@ -56,19 +54,16 @@ class BusProviderIo(Thread):
for io in self._modio.io: for io in self._modio.io:
interface = None interface = None
value_type = type(io.value) try:
if value_type is bool: if io.type == revpimodio2.INP:
interface = ( interface = InterfaceInput(self._bus, io)
InterfaceInpBool(self._bus, io) elif io.type == revpimodio2.OUT:
if io.type == revpimodio2.INP interface = InterfaceOutput(self._bus, io)
else InterfaceOutBool(self._bus, io) elif io.type == revpimodio2.MEM:
) # todo: Implement memory
elif value_type is int: pass
interface = ( except Exception as e:
InterfaceInpInt(self._bus, io) log.warning(f"can not create dbus interface for {io.name}: {e}")
if io.type == revpimodio2.INP
else InterfaceOutInt(self._bus, io)
)
if interface is not None: if interface is not None:
self._dc_io_interfaces[io.name] = interface self._dc_io_interfaces[io.name] = interface

View File

@@ -2,13 +2,123 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for IOs.""" """D-Bus interfaces for IOs."""
from typing import Union
from dbus import SystemBus, SessionBus
from gi.overrides.GLib import Variant from gi.overrides.GLib import Variant
from pydbus.generic import signal from pydbus.generic import signal
from revpimodio2 import RevPiModIO, Cycletools from revpimodio2 import RevPiModIO, Cycletools
from revpimodio2.io import IntIO from revpimodio2.io import IOBase
from .ios1_helper import DbusInterfaceIo, get_io_object_path from .ios1_helper import get_io_object_path, get_variant_type
class InterfaceInput:
"""
<node>
<interface name="com.revolutionpi.ios1.Input">
<property name="address" type="n" access="read"/>
<property name="bmk" type="s" access="read"/>
<property name="byteorder" type="s" access="readwrite"/>
<property name="defaultvalue" type="v" access="read"/>
<property name="length" type="q" access="read"/>
<property name="name" type="s" access="read"/>
<property name="signed" type="b" access="readwrite"/>
<property name="value" type="v" access="read"/>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.Input"
PropertiesChanged = signal()
def __init__(self, dbus: Union[SystemBus, SessionBus], io: IOBase):
self.dbus = dbus
self.io = io
self.variant_type = get_variant_type(self.io)
def emit_io_change(self):
if self.interface_name:
self.PropertiesChanged(
self.interface_name,
{"value": Variant(self.variant_type, self.io.value)},
[],
)
@property
def address(self) -> int:
return self.io.address
@property
def bmk(self) -> str:
return self.io.bmk
@property
def byteorder(self) -> str:
return self.io.byteorder
@byteorder.setter
def byteorder(self, value: str) -> None:
if hasattr(self.io, "_set_byteorder"):
self.io._set_byteorder(value)
self.variant_type = get_variant_type(self.io)
@property
def defaultvalue(self) -> Variant:
return Variant(self.variant_type, self.io.defaultvalue)
@property
def length(self) -> int:
# 0 length for boolean
return 0 if self.variant_type == "b" else self.io.length
@property
def name(self) -> str:
return self.io.name
@property
def signed(self) -> bool:
if hasattr(self.io, "signed"):
return self.io.signed
return False
@signed.setter
def signed(self, value: bool) -> None:
if hasattr(self.io, "_set_signed"):
self.io._set_signed(value)
self.variant_type = get_variant_type(self.io)
@property
def value(self) -> Variant:
return Variant(self.variant_type, self.io.value)
class InterfaceOutput(InterfaceInput):
"""
<node>
<interface name="com.revolutionpi.ios1.Output">
<property name="address" type="n" access="read"/>
<property name="bmk" type="s" access="read"/>
<property name="byteorder" type="s" access="readwrite"/>
<property name="defaultvalue" type="v" access="read"/>
<property name="length" type="q" access="read"/>
<property name="name" type="s" access="read"/>
<property name="signed" type="b" access="readwrite"/>
<property name="value" type="v" access="readwrite"/>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.Output"
@property
def value(self) -> Variant:
return super().value
@value.setter
def value(self, value: Variant) -> None:
self.io.value = value
self.io._parentdevice._modio.writeprocimg()
class InterfaceIoManager: class InterfaceIoManager:
@@ -40,7 +150,7 @@ class InterfaceIoManager:
interface_name = "com.revolutionpi.ios1.IoManager" interface_name = "com.revolutionpi.ios1.IoManager"
IoChanged = signal() IoChanged = signal()
def __init__(self, modio: RevPiModIO, io_interfaces: dict[str, DbusInterfaceIo]): def __init__(self, modio: RevPiModIO, io_interfaces: dict[str, InterfaceInput]):
self._dc_io_interfaces = io_interfaces self._dc_io_interfaces = io_interfaces
self.modio = modio self.modio = modio
@@ -82,135 +192,3 @@ class InterfaceIoManager:
def DeactivateIoEvents(self) -> None: def DeactivateIoEvents(self) -> None:
self.modio.exit(False) self.modio.exit(False)
class InterfaceInpBool(DbusInterfaceIo):
"""
<node>
<interface name="com.revolutionpi.ios1.InpBool">
<property name="address" type="i" access="read"/>
<property name="bmk" type="s" access="read"/>
<property name="defaultvalue" type="b" access="read"/>
<property name="length" type="i" access="read"/>
<property name="name" type="s" access="read"/>
<property name="value" type="b" access="read"/>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.InpBool"
@property
def value(self) -> bool:
return self.io.value
class InterfaceOutBool(InterfaceInpBool):
"""
<node>
<interface name="com.revolutionpi.ios1.OutBool">
<property name="address" type="i" access="read"/>
<property name="bmk" type="s" access="read"/>
<property name="defaultvalue" type="b" access="read"/>
<property name="length" type="i" access="read"/>
<property name="name" type="s" access="read"/>
<property name="value" type="b" access="readwrite"/>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.OutBool"
@property
def value(self) -> bool:
return super().value
@value.setter
def value(self, value: bool) -> None:
self.io.value = value
self.io._parentdevice._modio.writeprocimg()
class InterfaceInpInt(DbusInterfaceIo):
"""
<node>
<interface name="com.revolutionpi.ios1.InpInt">
<property name="address" type="i" access="read"/>
<property name="bmk" type="s" access="read"/>
<property name="byteorder" type="s" access="readwrite"/>
<property name="defaultvalue" type="i" access="read"/>
<property name="length" type="i" access="read"/>
<property name="min_value" type="i" access="read"/>
<property name="max_value" type="i" access="read"/>
<property name="name" type="s" access="read"/>
<property name="signed" type="b" access="readwrite"/>
<property name="value" type="i" access="read"/>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.InpInt"
io = IntIO # type: IntIO
@property
def byteorder(self) -> str:
return self.io.byteorder
@byteorder.setter
def byteorder(self, value: str) -> None:
self.io.byteorder = value
@property
def min_value(self) -> int:
if self.io.signed:
return -(1 << (self.io.length * 8 - 1))
return 0
@property
def max_value(self) -> int:
bit_length = self.io.length * 8
if self.io.signed:
return (1 << (bit_length - 1)) - 1
return (1 << bit_length) - 1
@property
def signed(self) -> bool:
return self.io.signed
@signed.setter
def signed(self, value: bool) -> None:
self.io.signed = value
@property
def value(self) -> int:
return self.io.value
class InterfaceOutInt(InterfaceInpInt):
"""
<node>
<interface name="com.revolutionpi.ios1.OutInt">
<property name="address" type="i" access="read"/>
<property name="bmk" type="s" access="read"/>
<property name="byteorder" type="s" access="readwrite"/>
<property name="defaultvalue" type="i" access="read"/>
<property name="length" type="i" access="read"/>
<property name="min_value" type="i" access="read"/>
<property name="max_value" type="i" access="read"/>
<property name="name" type="s" access="read"/>
<property name="signed" type="b" access="readwrite"/>
<property name="value" type="i" access="readwrite"/>
</interface>
</node>
"""
interface_name = "com.revolutionpi.ios1.OutInt"
@property
def value(self) -> int:
return super().value
@value.setter
def value(self, value: int):
self.io.value = value
self.io._parentdevice._modio.writeprocimg()

View File

@@ -5,8 +5,6 @@
from logging import getLogger from logging import getLogger
from pydbus import SessionBus, SystemBus
from pydbus.generic import signal
from revpimodio2.io import IOBase from revpimodio2.io import IOBase
log = getLogger(__name__) log = getLogger(__name__)
@@ -15,47 +13,34 @@ REVPI_DBUS_NAME = "com.revolutionpi.ios1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/ios1" REVPI_DBUS_BASE_PATH = "/com/revolutionpi/ios1"
class DbusInterfaceIo:
interface_name = ""
PropertiesChanged = signal()
def __init__(self, dbus: SystemBus or SessionBus, io: IOBase):
self.dbus = dbus
self.io = io
self.variant_type = "b" if type(self.io.value) is bool else "i"
def emit_io_change(self):
if not self.interface_name:
return
if self.interface_name:
print(type(self.io.value))
self.PropertiesChanged(
self.interface_name,
{"value": int(self.io.value)},
[],
)
@property
def address(self):
return self.io.address
@property
def bmk(self) -> str:
return self.io.bmk
@property
def defaultvalue(self):
return self.io.defaultvalue
@property
def length(self):
return self.io.length
@property
def name(self) -> str:
return self.io.name
def get_io_object_path(io_name: str) -> str: def get_io_object_path(io_name: str) -> str:
return f"{REVPI_DBUS_BASE_PATH}/io/{io_name}" return f"{REVPI_DBUS_BASE_PATH}/io/{io_name}"
def get_variant_type(io: IOBase) -> str:
value_type = type(io.value)
byte_length = io.length
signed = io._signed
if value_type is bool:
return "b"
if value_type is float:
return "d"
if value_type is int:
if byte_length <= 2:
return "n" if signed else "q"
if byte_length <= 4:
return "i" if signed else "u"
if byte_length <= 8:
return "x" if signed else "t"
raise ValueError(f"Unsupported byte length: {byte_length}")
if value_type is str:
return "s"
if value_type is bytes:
return "ay"
raise TypeError(f"Unsupported type: {value_type}")