# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus helper functions for cli commands.""" from enum import Enum from threading import Thread from time import sleep from gi.repository import GLib from pydbus import SessionBus, SystemBus from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH from ..dbus_middleware1 import REVPI_DBUS_NAME class BusType(Enum): SESSION = "session" SYSTEM = "system" def simple_call( method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_PATH, bus_type=BusType.SYSTEM, ): """ Performs a call to a D-Bus method on a specified interface and object. This function uses the D-Bus messaging system to dynamically call a method of a specified interface, using the given object path and bus type. It provides a way to interact with D-Bus interfaces, using either a system or session bus, and returns the result of executing the specified method. Parameters: method: str The name of the method to invoke on the D-Bus interface. *args: Additional positional arguments to pass to the specified D-Bus method. interface: str The name of the D-Bus interface containing the method. object_path: The path of the D-Bus object on which the interface is defined. Defaults to REVPI_DBUS_BASE_PATH. bus_type: BusType Specifies whether to use the system or session bus. Defaults to BusType.SYSTEM. Returns: The value returned by the D-Bus method. Raises: Any errors raised from the D-Bus call will propagate to the caller. """ bus = SessionBus() if bus_type is BusType.SESSION else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] return getattr(iface, method)(*args) def await_signal( signal_name: str, timeout: int, interface: str, object_path=REVPI_DBUS_BASE_PATH, bus_type=BusType.SYSTEM, ): """ Waits for a specific signal and returns whether the signal was detected. This function connects to a D-Bus interface and waits for a specific signal to be emitted. If the signal is not received within the specified timeout period, the function will return False. If the signal is detected within the timeout, the function will return True. It can connect to either the system bus or the session bus, depending on the provided `bus_type`. Parameters: signal_name: str The name of the signal to be awaited. timeout: int The maximum time to wait for the signal, in seconds. A value of 0 or less means that there is no timeout. interface: str The name of the D-Bus interface to listen on. object_path The D-Bus object path where the interface resides. Defaults to REVPI_DBUS_BASE_PATH. bus_type The type of D-Bus to connect to. Can be either BusType.SYSTEM or BusType.SESSION. Defaults to BusType.SYSTEM. Returns: bool True if the signal was detected within the timeout period, False otherwise. """ detected_signal = False timeout = int(timeout) loop = GLib.MainLoop() def th_timeout(): sleep(timeout) loop.quit() def signal_handler(*args, **kwargs): nonlocal detected_signal detected_signal = True loop.quit() bus = SessionBus() if bus_type is BusType.SESSION else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] setattr(iface, f"on{signal_name}", signal_handler) if timeout > 0: th_sleep = Thread(target=th_timeout, daemon=True) th_sleep.start() try: loop.run() except KeyboardInterrupt: pass return detected_signal