refactor(cli): D-Bus helpers support session and system bus types

Introduced `BusType` enum to differentiate between session and system
bus usage in D-Bus calls. Updated `simple_call` and `await_signal`
functions to include a `bus_type` parameter, improving flexibility.
Adjusted `cli_picontrol` to leverage the new `BusType` parameter for
D-Bus interactions.
This commit is contained in:
2025-04-19 12:24:37 +02:00
parent 487d5b3d46
commit 114cbd8099
2 changed files with 80 additions and 21 deletions

View File

@@ -4,7 +4,7 @@
from argparse import ArgumentParser from argparse import ArgumentParser
from logging import getLogger from logging import getLogger
from .dbus_helper import await_signal, simple_call from .dbus_helper import BusType, await_signal, simple_call
from .. import proginit as pi from .. import proginit as pi
from ..dbus_middleware1 import extend_interface from ..dbus_middleware1 import extend_interface
@@ -36,12 +36,20 @@ def add_subparsers(parent_parser: ArgumentParser):
def method_reset(): def method_reset():
log.debug("D-Bus call of method ResetDriver") log.debug("D-Bus call of method ResetDriver")
simple_call("ResetDriver", interface=extend_interface("picontrol")) simple_call(
"ResetDriver",
interface=extend_interface("picontrol"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
log.info("ResetDriver called via D-Bus") log.info("ResetDriver called via D-Bus")
def method_await_reset(timout: int = 0): def method_await_reset(timout: int = 0):
detected_signal = await_signal("NotifyDriverReset", timout, extend_interface("picontrol")) detected_signal = await_signal(
"NotifyDriverReset",
timout, extend_interface("picontrol"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
if detected_signal: if detected_signal:
log.info("ResetDriver signal received") log.info("ResetDriver signal received")
else: else:

View File

@@ -1,47 +1,98 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus helper functions for cli commands.""" """D-Bus helper functions for cli commands."""
from enum import Enum
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from gi.repository import GLib from gi.repository import GLib
from pydbus import SessionBus, SystemBus from pydbus import SessionBus, SystemBus
from .. import proginit as pi
from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH
from ..dbus_middleware1 import REVPI_DBUS_NAME from ..dbus_middleware1 import REVPI_DBUS_NAME
PICONTROL_INTERFACE = "com.revolutionpi.middleware1.picontrol"
RESET_DRIVER_METHOD = "ResetDriver" class BusType(Enum):
SESSION = "session"
SYSTEM = "system"
def simple_call(method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_PATH): def simple_call(
method: str,
*args,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
""" """
Executes a method on a specific D-Bus object interface within the RevPi system. This function Performs a call to a D-Bus method on a specified interface and object.
connects to the system bus, retrieves the desired interface and object path, and invokes
the specified method with provided arguments. This function uses the D-Bus messaging system to dynamically call a method
of a specified interface, using the given object path and bus type. It
provides a way to interact with D-Bus interfaces, using either a system or
session bus, and returns the result of executing the specified method.
Parameters: Parameters:
method: str method: str
The name of the method to be invoked on the targeted interface. The name of the method to invoke on the D-Bus interface.
*args: tuple *args:
Positional arguments to be passed to the method being invoked. Additional positional arguments to pass to the specified D-Bus method.
interface: str interface: str
The name of the D-Bus interface providing the required functionality. The name of the D-Bus interface containing the method.
object_path: str, optional object_path:
The D-Bus object path of the RevPi interface. Defaults to REVPI_DBUS_BASE_PATH. The path of the D-Bus object on which the interface is defined. Defaults
to REVPI_DBUS_BASE_PATH.
bus_type: BusType
Specifies whether to use the system or session bus. Defaults to BusType.SYSTEM.
Returns: Returns:
Any The value returned by the D-Bus method.
The result of the method invocation on the targeted D-Bus interface.
Raises:
Any errors raised from the D-Bus call will propagate to the caller.
""" """
bus = SessionBus() if pi.pargs.use_session_bus else SystemBus() bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path) revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface] iface = revpi[interface]
return getattr(iface, method)(*args) return getattr(iface, method)(*args)
def await_signal(signal_name: str, timeout: int, interface: str, object_path=REVPI_DBUS_BASE_PATH): def await_signal(
signal_name: str,
timeout: int,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
"""
Waits for a specific signal and returns whether the signal was detected.
This function connects to a D-Bus interface and waits for a specific signal
to be emitted. If the signal is not received within the specified timeout
period, the function will return False. If the signal is detected within
the timeout, the function will return True. It can connect to either the
system bus or the session bus, depending on the provided `bus_type`.
Parameters:
signal_name: str
The name of the signal to be awaited.
timeout: int
The maximum time to wait for the signal, in seconds. A value of 0 or
less means that there is no timeout.
interface: str
The name of the D-Bus interface to listen on.
object_path
The D-Bus object path where the interface resides. Defaults to
REVPI_DBUS_BASE_PATH.
bus_type
The type of D-Bus to connect to. Can be either BusType.SYSTEM or
BusType.SESSION. Defaults to BusType.SYSTEM.
Returns:
bool
True if the signal was detected within the timeout period, False
otherwise.
"""
detected_signal = False detected_signal = False
timeout = int(timeout) timeout = int(timeout)
loop = GLib.MainLoop() loop = GLib.MainLoop()
@@ -56,7 +107,7 @@ def await_signal(signal_name: str, timeout: int, interface: str, object_path=REV
detected_signal = True detected_signal = True
loop.quit() loop.quit()
bus = SessionBus() if pi.pargs.use_session_bus else SystemBus() bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path) revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface] iface = revpi[interface]