diff --git a/src/revpi_middleware/cli_commands/cli_picontrol.py b/src/revpi_middleware/cli_commands/cli_picontrol.py index 8fdaf67..9d76e26 100644 --- a/src/revpi_middleware/cli_commands/cli_picontrol.py +++ b/src/revpi_middleware/cli_commands/cli_picontrol.py @@ -4,7 +4,7 @@ from argparse import ArgumentParser from logging import getLogger -from .dbus_helper import await_signal, simple_call +from .dbus_helper import BusType, await_signal, simple_call from .. import proginit as pi from ..dbus_middleware1 import extend_interface @@ -36,12 +36,20 @@ def add_subparsers(parent_parser: ArgumentParser): def method_reset(): log.debug("D-Bus call of method ResetDriver") - simple_call("ResetDriver", interface=extend_interface("picontrol")) + simple_call( + "ResetDriver", + interface=extend_interface("picontrol"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) log.info("ResetDriver called via D-Bus") def method_await_reset(timout: int = 0): - detected_signal = await_signal("NotifyDriverReset", timout, extend_interface("picontrol")) + detected_signal = await_signal( + "NotifyDriverReset", + timout, extend_interface("picontrol"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) if detected_signal: log.info("ResetDriver signal received") else: diff --git a/src/revpi_middleware/cli_commands/dbus_helper.py b/src/revpi_middleware/cli_commands/dbus_helper.py index af3ae48..e11e90e 100644 --- a/src/revpi_middleware/cli_commands/dbus_helper.py +++ b/src/revpi_middleware/cli_commands/dbus_helper.py @@ -1,47 +1,98 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus helper functions for cli commands.""" +from enum import Enum from threading import Thread from time import sleep from gi.repository import GLib from pydbus import SessionBus, SystemBus -from .. import proginit as pi from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH from ..dbus_middleware1 import REVPI_DBUS_NAME -PICONTROL_INTERFACE = "com.revolutionpi.middleware1.picontrol" -RESET_DRIVER_METHOD = "ResetDriver" + +class BusType(Enum): + SESSION = "session" + SYSTEM = "system" -def simple_call(method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_PATH): +def simple_call( + method: str, + *args, + interface: str, + object_path=REVPI_DBUS_BASE_PATH, + bus_type=BusType.SYSTEM, +): """ - Executes a method on a specific D-Bus object interface within the RevPi system. This function - connects to the system bus, retrieves the desired interface and object path, and invokes - the specified method with provided arguments. + Performs a call to a D-Bus method on a specified interface and object. + + This function uses the D-Bus messaging system to dynamically call a method + of a specified interface, using the given object path and bus type. It + provides a way to interact with D-Bus interfaces, using either a system or + session bus, and returns the result of executing the specified method. Parameters: method: str - The name of the method to be invoked on the targeted interface. - *args: tuple - Positional arguments to be passed to the method being invoked. + The name of the method to invoke on the D-Bus interface. + *args: + Additional positional arguments to pass to the specified D-Bus method. interface: str - The name of the D-Bus interface providing the required functionality. - object_path: str, optional - The D-Bus object path of the RevPi interface. Defaults to REVPI_DBUS_BASE_PATH. + The name of the D-Bus interface containing the method. + object_path: + The path of the D-Bus object on which the interface is defined. Defaults + to REVPI_DBUS_BASE_PATH. + bus_type: BusType + Specifies whether to use the system or session bus. Defaults to BusType.SYSTEM. Returns: - Any - The result of the method invocation on the targeted D-Bus interface. + The value returned by the D-Bus method. + + Raises: + Any errors raised from the D-Bus call will propagate to the caller. """ - bus = SessionBus() if pi.pargs.use_session_bus else SystemBus() + bus = SessionBus() if bus_type is BusType.SESSION else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] return getattr(iface, method)(*args) -def await_signal(signal_name: str, timeout: int, interface: str, object_path=REVPI_DBUS_BASE_PATH): +def await_signal( + signal_name: str, + timeout: int, + interface: str, + object_path=REVPI_DBUS_BASE_PATH, + bus_type=BusType.SYSTEM, +): + """ + Waits for a specific signal and returns whether the signal was detected. + + This function connects to a D-Bus interface and waits for a specific signal + to be emitted. If the signal is not received within the specified timeout + period, the function will return False. If the signal is detected within + the timeout, the function will return True. It can connect to either the + system bus or the session bus, depending on the provided `bus_type`. + + Parameters: + signal_name: str + The name of the signal to be awaited. + timeout: int + The maximum time to wait for the signal, in seconds. A value of 0 or + less means that there is no timeout. + interface: str + The name of the D-Bus interface to listen on. + object_path + The D-Bus object path where the interface resides. Defaults to + REVPI_DBUS_BASE_PATH. + bus_type + The type of D-Bus to connect to. Can be either BusType.SYSTEM or + BusType.SESSION. Defaults to BusType.SYSTEM. + + Returns: + bool + True if the signal was detected within the timeout period, False + otherwise. + """ detected_signal = False timeout = int(timeout) loop = GLib.MainLoop() @@ -56,7 +107,7 @@ def await_signal(signal_name: str, timeout: int, interface: str, object_path=REV detected_signal = True loop.quit() - bus = SessionBus() if pi.pargs.use_session_bus else SystemBus() + bus = SessionBus() if bus_type is BusType.SESSION else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface]