# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for IOs."""
from typing import Union, List

from pydbus import Variant
from pydbus.bus import Bus
from pydbus.generic import signal
from revpimodio2 import RevPiModIO, Cycletools, INP, OUT
from revpimodio2.io import IOBase

from .ios1_helper import get_io_object_path, get_variant_type


# D-Bus object for a single read-only IO ("com.revolutionpi.ios1.Input").
#
# It wraps one RevPiModIO IO object and exposes its metadata and current value as
# properties. Values are sent as a Variant whose type string comes from
# get_variant_type(); if that helper raises ValueError the interface switches to
# "raw" mode and transports the value as a byte array ("ay") instead.
#
# NOTE(review): the class docstring is intentionally left untouched. pydbus can use
# the class docstring as introspection XML when no node info is passed on publish,
# so prose must not be placed there - confirm how these objects are registered.
class InterfaceInput:
    """ """

    interface_name = "com.revolutionpi.ios1.Input"

    PropertiesChanged = signal()

    def __init__(self, dbus: Bus, io: IOBase):
        """
        Bind the interface to one IO.

        :param dbus: Bus connection, stored on the instance as ``dbus``
        :param io: RevPiModIO IO object represented by this D-Bus object
        """
        self._raw = False
        self.dbus = dbus
        self.io = io
        self.object_path = get_io_object_path(io)
        self._update_variant_type()

    def _update_variant_type(self) -> None:
        """
        Determine the Variant type string for the IO and keep ``_raw`` in sync.

        Used by the constructor and by the byteorder/signed setters so that all of
        them share the same fallback instead of letting ValueError escape.
        """
        try:
            variant_type = get_variant_type(self.io)
        except ValueError:
            # Fallback to bytes if the integer is too large
            self._raw = True
            self.variant_type = "ay"
        else:
            self._raw = False
            self.variant_type = variant_type

    def _current_value(self):
        """Return the IO value in the representation matching ``variant_type``."""
        return self.io.get_value() if self._raw else self.io.value

    def emit_io_change(self):
        """Emit ``PropertiesChanged`` carrying the current ``value`` of the IO."""
        self.PropertiesChanged(
            self.interface_name,
            {"value": Variant(self.variant_type, self._current_value())},
            [],
        )

    def SetByteorder(self, order: str) -> None:
        """D-Bus method: set the byteorder via the ``byteorder`` property."""
        self.byteorder = order

    def SetSigned(self, signed: bool) -> None:
        """D-Bus method: set the signedness via the ``signed`` property."""
        self.signed = signed

    @property
    def address(self) -> int:
        """Address of the IO as reported by the IO object."""
        return self.io.address

    @property
    def bmk(self) -> str:
        """The ``bmk`` attribute of the IO object."""
        return self.io.bmk

    @property
    def bitaddress(self) -> int:
        """Bit address of the IO (read from the IO's private ``_bitaddress``)."""
        return self.io._bitaddress

    @property
    def byteorder(self) -> str:
        """Byteorder currently configured on the IO."""
        return self.io.byteorder

    @byteorder.setter
    def byteorder(self, value: str) -> None:
        # IOs without a byteorder setter are silently left unchanged.
        if not hasattr(self.io, "_set_byteorder"):
            return

        self.io._set_byteorder(value)
        self._update_variant_type()

        # Changing the byteorder can change the value, but we do NOT send a signal for that
        # because the real value of the process image was not changed. But we inform the client
        # about the changed byteorder property.
        self.PropertiesChanged(
            self.interface_name,
            {},
            ["byteorder"],
        )

    @property
    def defaultvalue(self) -> Variant:
        """Default value of the IO, wrapped in a Variant of ``variant_type``."""
        if self._raw:
            # Raw IOs are transported as bytes: use the byte default value, not the
            # current process image value.
            default = self.io.get_defaultvalue()
        else:
            default = self.io.defaultvalue
        return Variant(self.variant_type, default)

    @property
    def length(self) -> int:
        """Length of the IO as reported by the IO object; 0 for boolean IOs."""
        # 0 length for boolean
        return 0 if self.variant_type == "b" else self.io.length

    @property
    def name(self) -> str:
        """Name of the IO."""
        return self.io.name

    @property
    def signed(self) -> bool:
        """Signedness of the IO; ``False`` for IOs without a ``signed`` attribute."""
        if hasattr(self.io, "signed"):
            return self.io.signed
        return False

    @signed.setter
    def signed(self, value: bool) -> None:
        # IOs without a signed setter are silently left unchanged.
        if not hasattr(self.io, "_set_signed"):
            return

        self.io._set_signed(value)
        self._update_variant_type()

        # Changing the signedness can change the value, but we do NOT send a signal for that
        # because the real value of the process image was not changed. But we inform the client
        # about the changed signedness property.
        self.PropertiesChanged(
            self.interface_name,
            {},
            ["signed"],
        )

    @property
    def value(self) -> Variant:
        """Current value of the IO, wrapped in a Variant of ``variant_type``."""
        device = self.io._parentdevice
        if not device._selfupdate:
            # The device is not updating itself, so read the process image explicitly
            # before the value is returned.
            device.readprocimg()
        return Variant(self.variant_type, self._current_value())


# D-Bus object for a writable IO ("com.revolutionpi.ios1.Output").
#
# Inherits everything from InterfaceInput and adds write access to ``value``.
#
# NOTE(review): the class docstring is intentionally left untouched, see the note
# on pydbus introspection XML above InterfaceInput.
class InterfaceOutput(InterfaceInput):
    """ """

    interface_name = "com.revolutionpi.ios1.Output"

    def SetValue(self, value: Variant) -> None:
        """D-Bus method: write a new value via the ``value`` property."""
        self.value = value

    @property
    def value(self) -> Variant:
        """Current value of the IO, see :attr:`InterfaceInput.value`."""
        return super().value

    @value.setter
    def value(self, value: Variant) -> None:
        if self._raw:
            # Raw IOs are written through set_value(), mirroring the get_value() read path.
            self.io.set_value(value)
        else:
            self.io.value = value

        device = self.io._parentdevice
        if not device._selfupdate:
            # The device is not updating itself, so write the process image explicitly.
            device.writeprocimg()


# D-Bus object "com.revolutionpi.ios1.IoManager".
#
# Provides lookup of the IO object paths and controls the RevPiModIO cycle loop that
# emits change signals for all managed IOs.
#
# NOTE(review): the class docstring is intentionally left untouched, see the note
# on pydbus introspection XML above InterfaceInput.
class InterfaceIoManager:
    """ """

    interface_name = "com.revolutionpi.ios1.IoManager"

    IoChanged = signal()

    def __init__(
        self,
        io_interfaces: List[Union[InterfaceInput, InterfaceOutput]],
        modio: RevPiModIO,
    ):
        """
        Index the given IO interfaces by name and by IO type.

        :param io_interfaces: IO interface objects to manage
        :param modio: RevPiModIO instance used to run the cycle loop
        """
        self._dc_io_interfaces = {interface.name: interface for interface in io_interfaces}
        self.modio = modio

        # IOs whose type is neither INP nor OUT appear in neither list.
        self.lst_inp_object_path = [
            interface.object_path for interface in io_interfaces if interface.io.type == INP
        ]
        self.lst_out_object_path = [
            interface.object_path for interface in io_interfaces if interface.io.type == OUT
        ]

    def _modio_cycle(self, ct: Cycletools) -> None:
        """
        Cycle function: emit signals for every IO reported as changed by ``ct``.

        For each changed IO the IO object emits ``PropertiesChanged`` and the manager
        emits ``IoChanged`` with the IO name and its value.
        """
        for interface in self._dc_io_interfaces.values():
            if not ct.changed(interface.io):
                continue

            interface.emit_io_change()
            # Use the interface's own read path so raw IOs ("ay") are sent as bytes,
            # matching the payload of PropertiesChanged.
            self.IoChanged(
                interface.io.name,
                Variant(interface.variant_type, interface._current_value()),
            )

    def GetAllInputs(self) -> list[str]:
        """D-Bus method: return the object paths of all IOs of type INP."""
        return self.lst_inp_object_path

    def GetAllOutputs(self) -> list[str]:
        """D-Bus method: return the object paths of all IOs of type OUT."""
        return self.lst_out_object_path

    def GetByName(self, io_name: str) -> str:
        """
        D-Bus method: return the object path of the IO with the given name.

        :param io_name: Name of the IO
        :raises KeyError: If no managed IO has this name
        """
        interface = self._dc_io_interfaces.get(io_name)
        if interface is None:
            raise KeyError(f"No IO with name '{io_name}' found.")
        return interface.object_path

    def ActivateIoSignals(self) -> None:
        """D-Bus method: start the non-blocking cycle loop unless it is already running."""
        if self.modio._looprunning:
            return

        self.modio.autorefresh_all()
        # NOTE(review): cycletime is presumably in milliseconds - confirm against RevPiModIO.
        self.modio.cycleloop(self._modio_cycle, cycletime=50, blocking=False)

    def DeactivateIoSignals(self) -> None:
        """D-Bus method: stop signal emission by calling ``modio.exit(False)``."""
        self.modio.exit(False)