# -*- coding: utf-8 -*- # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus interfaces for IOs.""" from typing import Union from dbus import SystemBus, SessionBus from gi.overrides.GLib import Variant from pydbus.generic import signal from revpimodio2 import RevPiModIO, Cycletools from revpimodio2.io import IOBase from .ios1_helper import get_io_object_path, get_variant_type class InterfaceInput: """ """ interface_name = "com.revolutionpi.ios1.Input" PropertiesChanged = signal() def __init__(self, dbus: Union[SystemBus, SessionBus], io: IOBase): self._raw = False self.dbus = dbus self.io = io try: self.variant_type = get_variant_type(self.io) except ValueError: # Fallback to bytes if the integer is too large self._raw = True self.variant_type = "ay" def emit_io_change(self): if self.interface_name: self.PropertiesChanged( self.interface_name, { "value": Variant( self.variant_type, self.io.get_value() if self._raw else self.io.value, ), }, [], ) @property def address(self) -> int: return self.io.address @property def bmk(self) -> str: return self.io.bmk @property def bitaddress(self) -> int: return self.io._bitaddress @property def byteorder(self) -> str: return self.io.byteorder @byteorder.setter def byteorder(self, value: str) -> None: if hasattr(self.io, "_set_byteorder"): self.io._set_byteorder(value) self.variant_type = get_variant_type(self.io) @property def defaultvalue(self) -> Variant: return Variant( self.variant_type, self.io.get_value() if self._raw else self.io.defaultvalue, ) @property def length(self) -> int: # 0 length for boolean return 0 if self.variant_type == "b" else self.io.length @property def name(self) -> str: return self.io.name @property def signed(self) -> bool: if hasattr(self.io, "signed"): return self.io.signed return False @signed.setter def signed(self, value: bool) -> None: if hasattr(self.io, "_set_signed"): self.io._set_signed(value) self.variant_type = get_variant_type(self.io) @property def value(self) -> Variant: return Variant( self.variant_type, self.io.get_value() if self._raw else self.io.value, ) class InterfaceOutput(InterfaceInput): """ """ interface_name = "com.revolutionpi.ios1.Output" @property def value(self) -> Variant: return super().value @value.setter def value(self, value: Variant) -> None: if self._raw: self.io.set_value(value) else: self.io.value = value self.io._parentdevice._modio.writeprocimg() class InterfaceIoManager: """ """ interface_name = "com.revolutionpi.ios1.IoManager" IoChanged = signal() def __init__(self, modio: RevPiModIO, io_interfaces: dict[str, InterfaceInput]): self._dc_io_interfaces = io_interfaces self.modio = modio self.lst_inp = [] for dev in self.modio.device: for io in dev.get_inputs(): self.lst_inp.append(get_io_object_path(io.name)) self.lst_out = [] for dev in self.modio.device: for io in dev.get_outputs(): self.lst_out.append(get_io_object_path(io.name)) def _modio_cycle(self, ct: Cycletools) -> None: for io_name in self._dc_io_interfaces: interface = self._dc_io_interfaces[io_name] if ct.changed(interface.io): interface.emit_io_change() self.IoChanged( interface.io.name, Variant(interface.variant_type, interface.io.value), ) def GetAllInputs(self) -> list[str]: return self.lst_inp def GetAllOutputs(self) -> list[str]: return self.lst_out def Get(self, io_name) -> str: if io_name in self.modio.io: return get_io_object_path(io_name) raise KeyError(f"No IO with name '{io_name}' found.") def ActivateIoEvents(self) -> None: if not self.modio._looprunning: self.modio.autorefresh_all() self.modio.cycleloop(self._modio_cycle, cycletime=50, blocking=False) def DeactivateIoEvents(self) -> None: self.modio.exit(False)