diff --git a/src/revpi_middleware/ios1/bus_provider_io.py b/src/revpi_middleware/ios1/bus_provider_io.py
index 8c5f185..46eb15f 100644
--- a/src/revpi_middleware/ios1/bus_provider_io.py
+++ b/src/revpi_middleware/ios1/bus_provider_io.py
@@ -17,10 +17,8 @@ from .interface_devices import (
)
from .interface_ios import (
InterfaceIoManager,
- InterfaceInpBool,
- InterfaceOutBool,
- InterfaceInpInt,
- InterfaceOutInt,
+ InterfaceInput,
+ InterfaceOutput,
)
log = getLogger(__name__)
@@ -56,19 +54,16 @@ class BusProviderIo(Thread):
for io in self._modio.io:
interface = None
- value_type = type(io.value)
- if value_type is bool:
- interface = (
- InterfaceInpBool(self._bus, io)
- if io.type == revpimodio2.INP
- else InterfaceOutBool(self._bus, io)
- )
- elif value_type is int:
- interface = (
- InterfaceInpInt(self._bus, io)
- if io.type == revpimodio2.INP
- else InterfaceOutInt(self._bus, io)
- )
+ try:
+ if io.type == revpimodio2.INP:
+ interface = InterfaceInput(self._bus, io)
+ elif io.type == revpimodio2.OUT:
+ interface = InterfaceOutput(self._bus, io)
+ elif io.type == revpimodio2.MEM:
+ # todo: Implement memory
+ pass
+ except Exception as e:
+ log.warning(f"can not create dbus interface for {io.name}: {e}")
if interface is not None:
self._dc_io_interfaces[io.name] = interface
diff --git a/src/revpi_middleware/ios1/interface_ios.py b/src/revpi_middleware/ios1/interface_ios.py
index 3a08937..21a28ea 100644
--- a/src/revpi_middleware/ios1/interface_ios.py
+++ b/src/revpi_middleware/ios1/interface_ios.py
@@ -2,13 +2,123 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for IOs."""
+from typing import Union
+from dbus import SystemBus, SessionBus
from gi.overrides.GLib import Variant
from pydbus.generic import signal
from revpimodio2 import RevPiModIO, Cycletools
-from revpimodio2.io import IntIO
+from revpimodio2.io import IOBase
-from .ios1_helper import DbusInterfaceIo, get_io_object_path
+from .ios1_helper import get_io_object_path, get_variant_type
+
+
+class InterfaceInput:
+ """
+
+
+
+
+
+
+
+
+
+
+
+
+ """
+
+ interface_name = "com.revolutionpi.ios1.Input"
+ PropertiesChanged = signal()
+
+ def __init__(self, dbus: Union[SystemBus, SessionBus], io: IOBase):
+ self.dbus = dbus
+ self.io = io
+ self.variant_type = get_variant_type(self.io)
+
+ def emit_io_change(self):
+ if self.interface_name:
+ self.PropertiesChanged(
+ self.interface_name,
+ {"value": Variant(self.variant_type, self.io.value)},
+ [],
+ )
+
+ @property
+ def address(self) -> int:
+ return self.io.address
+
+ @property
+ def bmk(self) -> str:
+ return self.io.bmk
+
+ @property
+ def byteorder(self) -> str:
+ return self.io.byteorder
+
+ @byteorder.setter
+ def byteorder(self, value: str) -> None:
+ if hasattr(self.io, "_set_byteorder"):
+ self.io._set_byteorder(value)
+ self.variant_type = get_variant_type(self.io)
+
+ @property
+ def defaultvalue(self) -> Variant:
+ return Variant(self.variant_type, self.io.defaultvalue)
+
+ @property
+ def length(self) -> int:
+ # 0 length for boolean
+ return 0 if self.variant_type == "b" else self.io.length
+
+ @property
+ def name(self) -> str:
+ return self.io.name
+
+ @property
+ def signed(self) -> bool:
+ if hasattr(self.io, "signed"):
+ return self.io.signed
+ return False
+
+ @signed.setter
+ def signed(self, value: bool) -> None:
+ if hasattr(self.io, "_set_signed"):
+ self.io._set_signed(value)
+ self.variant_type = get_variant_type(self.io)
+
+ @property
+ def value(self) -> Variant:
+ return Variant(self.variant_type, self.io.value)
+
+
+class InterfaceOutput(InterfaceInput):
+ """
+
+
+
+
+
+
+
+
+
+
+
+
+ """
+
+ interface_name = "com.revolutionpi.ios1.Output"
+
+ @property
+ def value(self) -> Variant:
+ return super().value
+
+ @value.setter
+ def value(self, value: Variant) -> None:
+ self.io.value = value
+ self.io._parentdevice._modio.writeprocimg()
class InterfaceIoManager:
@@ -40,7 +150,7 @@ class InterfaceIoManager:
interface_name = "com.revolutionpi.ios1.IoManager"
IoChanged = signal()
- def __init__(self, modio: RevPiModIO, io_interfaces: dict[str, DbusInterfaceIo]):
+ def __init__(self, modio: RevPiModIO, io_interfaces: dict[str, InterfaceInput]):
self._dc_io_interfaces = io_interfaces
self.modio = modio
@@ -82,135 +192,3 @@ class InterfaceIoManager:
def DeactivateIoEvents(self) -> None:
self.modio.exit(False)
-
-
-class InterfaceInpBool(DbusInterfaceIo):
- """
-
-
-
-
-
-
-
-
-
-
- """
-
- interface_name = "com.revolutionpi.ios1.InpBool"
-
- @property
- def value(self) -> bool:
- return self.io.value
-
-
-class InterfaceOutBool(InterfaceInpBool):
- """
-
-
-
-
-
-
-
-
-
-
- """
-
- interface_name = "com.revolutionpi.ios1.OutBool"
-
- @property
- def value(self) -> bool:
- return super().value
-
- @value.setter
- def value(self, value: bool) -> None:
- self.io.value = value
- self.io._parentdevice._modio.writeprocimg()
-
-
-class InterfaceInpInt(DbusInterfaceIo):
- """
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- """
-
- interface_name = "com.revolutionpi.ios1.InpInt"
- io = IntIO # type: IntIO
-
- @property
- def byteorder(self) -> str:
- return self.io.byteorder
-
- @byteorder.setter
- def byteorder(self, value: str) -> None:
- self.io.byteorder = value
-
- @property
- def min_value(self) -> int:
- if self.io.signed:
- return -(1 << (self.io.length * 8 - 1))
- return 0
-
- @property
- def max_value(self) -> int:
- bit_length = self.io.length * 8
- if self.io.signed:
- return (1 << (bit_length - 1)) - 1
- return (1 << bit_length) - 1
-
- @property
- def signed(self) -> bool:
- return self.io.signed
-
- @signed.setter
- def signed(self, value: bool) -> None:
- self.io.signed = value
-
- @property
- def value(self) -> int:
- return self.io.value
-
-
-class InterfaceOutInt(InterfaceInpInt):
- """
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- """
-
- interface_name = "com.revolutionpi.ios1.OutInt"
-
- @property
- def value(self) -> int:
- return super().value
-
- @value.setter
- def value(self, value: int):
- self.io.value = value
- self.io._parentdevice._modio.writeprocimg()
diff --git a/src/revpi_middleware/ios1/ios1_helper.py b/src/revpi_middleware/ios1/ios1_helper.py
index 8d7cb72..f0e0b09 100644
--- a/src/revpi_middleware/ios1/ios1_helper.py
+++ b/src/revpi_middleware/ios1/ios1_helper.py
@@ -5,8 +5,6 @@
from logging import getLogger
-from pydbus import SessionBus, SystemBus
-from pydbus.generic import signal
from revpimodio2.io import IOBase
log = getLogger(__name__)
@@ -15,47 +13,34 @@ REVPI_DBUS_NAME = "com.revolutionpi.ios1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/ios1"
-class DbusInterfaceIo:
- interface_name = ""
- PropertiesChanged = signal()
-
- def __init__(self, dbus: SystemBus or SessionBus, io: IOBase):
- self.dbus = dbus
- self.io = io
- self.variant_type = "b" if type(self.io.value) is bool else "i"
-
- def emit_io_change(self):
- if not self.interface_name:
- return
-
- if self.interface_name:
- print(type(self.io.value))
- self.PropertiesChanged(
- self.interface_name,
- {"value": int(self.io.value)},
- [],
- )
-
- @property
- def address(self):
- return self.io.address
-
- @property
- def bmk(self) -> str:
- return self.io.bmk
-
- @property
- def defaultvalue(self):
- return self.io.defaultvalue
-
- @property
- def length(self):
- return self.io.length
-
- @property
- def name(self) -> str:
- return self.io.name
-
-
def get_io_object_path(io_name: str) -> str:
return f"{REVPI_DBUS_BASE_PATH}/io/{io_name}"
+
+
+def get_variant_type(io: IOBase) -> str:
+ value_type = type(io.value)
+ byte_length = io.length
+ signed = io._signed
+
+ if value_type is bool:
+ return "b"
+
+ if value_type is float:
+ return "d"
+
+ if value_type is int:
+ if byte_length <= 2:
+ return "n" if signed else "q"
+ if byte_length <= 4:
+ return "i" if signed else "u"
+ if byte_length <= 8:
+ return "x" if signed else "t"
+ raise ValueError(f"Unsupported byte length: {byte_length}")
+
+ if value_type is str:
+ return "s"
+
+ if value_type is bytes:
+ return "ay"
+
+ raise TypeError(f"Unsupported type: {value_type}")