# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus bus provider for revpi_middleware."""
from logging import getLogger
from threading import Thread

import revpimodio2
from gi.repository import GLib
from pydbus import SessionBus, SystemBus
from revpimodio2 import Cycletools

from . import REVPI_DBUS_NAME
from .interface_ios import (
    InterfaceIoManager,
    InterfaceInpBool,
    InterfaceOutBool,
    InterfaceInpInt,
    InterfaceOutInt,
)

log = getLogger(__name__)


class BusProviderIo(Thread):
    """Thread that publishes the IOs of a RevPiModIO instance on D-Bus.

    When the thread runs, every IO whose current value is a ``bool`` or an
    ``int`` is wrapped in a matching interface object and published below
    ``REVPI_DBUS_NAME`` as ``io/<io name>``, together with one
    ``InterfaceIoManager``. The thread then blocks in a GLib main loop until
    :meth:`stop` is called.
    """

    def __init__(
        self,
        picontrol_device="/dev/piControl0",
        config_rsc="/etc/revpi/config.rsc",
        use_system_bus=True,
    ):
        """Create the bus connection, the main loop and the RevPiModIO object.

        :param picontrol_device: Path passed to RevPiModIO as ``procimg``
        :param config_rsc: Path passed to RevPiModIO as ``configrsc``
        :param use_system_bus: Use the system bus if True, otherwise the
            session bus
        """
        log.debug("enter BusProviderIo.__init__")
        super().__init__()

        self._bus = SystemBus() if use_system_bus else SessionBus()
        # Maps IO name -> interface object; filled in place by run().
        self._dc_io_interfaces = {}
        self._loop = GLib.MainLoop()
        self._modio = revpimodio2.RevPiModIO(
            procimg=picontrol_device,
            configrsc=config_rsc,
            shared_procimg=True,
        )

        self.picontrol_device = picontrol_device
        self.config_rsc = config_rsc

    def _create_io_interfaces(self):
        """Rebuild ``self._dc_io_interfaces`` from the IOs of ``self._modio``."""
        # Clear in place: the same dict object is handed to InterfaceIoManager.
        self._dc_io_interfaces.clear()

        for io in self._modio.io:
            is_input = io.type == revpimodio2.INP

            # Exact type comparison on purpose: bool is a subclass of int and
            # must be mapped to the bool interfaces, not the int ones.
            value_type = type(io.value)
            if value_type is bool:
                interface_class = InterfaceInpBool if is_input else InterfaceOutBool
            elif value_type is int:
                interface_class = InterfaceInpInt if is_input else InterfaceOutInt
            else:
                # IOs with any other value type are not published.
                continue

            self._dc_io_interfaces[io.name] = interface_class(self._bus, io)

    def run(self):
        """Publish all IO interfaces and run the main loop until stopped.

        Errors while publishing or while running the main loop are logged and
        not raised. When the main loop ends, the published objects and the bus
        name are released again before the RevPiModIO object is cleaned up.
        """
        log.debug("enter BusProviderIo.run")

        self._create_io_interfaces()
        lst_interfaces = [
            (f"io/{io_name}", interface)
            for io_name, interface in self._dc_io_interfaces.items()
        ]

        # Keep the handle returned by publish() so it can be released later.
        publication = None
        try:
            publication = self._bus.publish(
                REVPI_DBUS_NAME,
                InterfaceIoManager(self._modio, self._dc_io_interfaces),
                *lst_interfaces,
            )
        except Exception as e:
            # NOTE: a failed publish is only logged; the main loop is started
            # regardless, so stop() and running keep working.
            log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}")

        try:
            self._loop.run()
        except Exception as e:
            log.error(f"can not run dbus mainloop: {e}")
        finally:
            # Release the bus name and the exported objects first, so nothing
            # stays registered on the bus that refers to the cleaned up modio.
            if publication is not None:
                try:
                    publication.unpublish()
                except Exception as e:
                    log.error(f"can not unpublish dbus {REVPI_DBUS_NAME}: {e}")
            self._modio.cleanup()

        log.debug("leave BusProviderIo.run")

    def stop(self):
        """Quit the main loop, which lets :meth:`run` finish."""
        log.debug("enter BusProviderIo.stop")
        self._loop.quit()
        log.debug("leave BusProviderIo.stop")

    @property
    def running(self) -> bool:
        """Return True while the GLib main loop is running."""
        return self._loop.is_running()