# -*- coding: utf-8 -*- # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus bus provider for revpi_middleware.""" from logging import getLogger from threading import Thread from gi.repository import GLib from pydbus import SessionBus, SystemBus from . import REVPI_DBUS_NAME from .process_image import InterfacePiControl log = getLogger(__name__) class BusProvider(Thread): def __init__( self, picontrol_device="/dev/piControl0", config_rsc="/etc/revpi/config.rsc", use_system_bus=True, ): log.debug("enter BusProvider.__init__") super().__init__() self._bus = SystemBus() if use_system_bus else SessionBus() self._loop = GLib.MainLoop() self.picontrol_device = picontrol_device self.config_rsc = config_rsc def run(self): log.debug("enter BusProvider.run") try: self._bus.publish( REVPI_DBUS_NAME, InterfacePiControl(self.picontrol_device, self.config_rsc), ) except Exception as e: log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}") try: self._loop.run() except Exception as e: log.error(f"can not run dbus mainloop: {e}") log.debug("leave BusProvider.run") def stop(self): log.debug("enter BusProvider.stop") self._loop.quit() log.debug("leave BusProvider.stop") @property def running(self): return self._loop.is_running()