This function facilitates retrieving specific properties from a DBus interface, improving code modularity and reusability. It supports both system and session bus types, streamlining access to DBus resources.
137 lines
4.1 KiB
Python
137 lines
4.1 KiB
Python
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
|
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
|
"""D-Bus helper functions for cli commands."""
|
|
from enum import Enum
|
|
from threading import Thread
|
|
from time import sleep
|
|
|
|
from gi.repository import GLib
|
|
from pydbus import SessionBus, SystemBus
|
|
|
|
from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH
|
|
from ..dbus_middleware1 import REVPI_DBUS_NAME
|
|
|
|
|
|
class BusType(Enum):
|
|
SESSION = "session"
|
|
SYSTEM = "system"
|
|
|
|
|
|
def get_properties(
|
|
property_name: str,
|
|
interface: str,
|
|
object_path=REVPI_DBUS_BASE_PATH,
|
|
bus_type=BusType.SYSTEM,
|
|
):
|
|
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
|
|
revpi = bus.get(REVPI_DBUS_NAME, object_path)
|
|
iface = revpi[interface]
|
|
return getattr(iface, property_name)
|
|
|
|
|
|
def simple_call(
|
|
method: str,
|
|
*args,
|
|
interface: str,
|
|
object_path=REVPI_DBUS_BASE_PATH,
|
|
bus_type=BusType.SYSTEM,
|
|
):
|
|
"""
|
|
Performs a call to a D-Bus method on a specified interface and object.
|
|
|
|
This function uses the D-Bus messaging system to dynamically call a method
|
|
of a specified interface, using the given object path and bus type. It
|
|
provides a way to interact with D-Bus interfaces, using either a system or
|
|
session bus, and returns the result of executing the specified method.
|
|
|
|
Parameters:
|
|
method: str
|
|
The name of the method to invoke on the D-Bus interface.
|
|
*args:
|
|
Additional positional arguments to pass to the specified D-Bus method.
|
|
interface: str
|
|
The name of the D-Bus interface containing the method.
|
|
object_path:
|
|
The path of the D-Bus object on which the interface is defined. Defaults
|
|
to REVPI_DBUS_BASE_PATH.
|
|
bus_type: BusType
|
|
Specifies whether to use the system or session bus. Defaults to BusType.SYSTEM.
|
|
|
|
Returns:
|
|
The value returned by the D-Bus method.
|
|
|
|
Raises:
|
|
Any errors raised from the D-Bus call will propagate to the caller.
|
|
"""
|
|
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
|
|
revpi = bus.get(REVPI_DBUS_NAME, object_path)
|
|
iface = revpi[interface]
|
|
return getattr(iface, method)(*args)
|
|
|
|
|
|
def await_signal(
|
|
signal_name: str,
|
|
timeout: int,
|
|
interface: str,
|
|
object_path=REVPI_DBUS_BASE_PATH,
|
|
bus_type=BusType.SYSTEM,
|
|
):
|
|
"""
|
|
Waits for a specific signal and returns whether the signal was detected.
|
|
|
|
This function connects to a D-Bus interface and waits for a specific signal
|
|
to be emitted. If the signal is not received within the specified timeout
|
|
period, the function will return False. If the signal is detected within
|
|
the timeout, the function will return True. It can connect to either the
|
|
system bus or the session bus, depending on the provided `bus_type`.
|
|
|
|
Parameters:
|
|
signal_name: str
|
|
The name of the signal to be awaited.
|
|
timeout: int
|
|
The maximum time to wait for the signal, in seconds. A value of 0 or
|
|
less means that there is no timeout.
|
|
interface: str
|
|
The name of the D-Bus interface to listen on.
|
|
object_path
|
|
The D-Bus object path where the interface resides. Defaults to
|
|
REVPI_DBUS_BASE_PATH.
|
|
bus_type
|
|
The type of D-Bus to connect to. Can be either BusType.SYSTEM or
|
|
BusType.SESSION. Defaults to BusType.SYSTEM.
|
|
|
|
Returns:
|
|
bool
|
|
True if the signal was detected within the timeout period, False
|
|
otherwise.
|
|
"""
|
|
detected_signal = False
|
|
timeout = int(timeout)
|
|
loop = GLib.MainLoop()
|
|
|
|
def th_timeout():
|
|
sleep(timeout)
|
|
loop.quit()
|
|
|
|
def signal_handler(*args, **kwargs):
|
|
nonlocal detected_signal
|
|
detected_signal = True
|
|
loop.quit()
|
|
|
|
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
|
|
revpi = bus.get(REVPI_DBUS_NAME, object_path)
|
|
iface = revpi[interface]
|
|
|
|
setattr(iface, f"on{signal_name}", signal_handler)
|
|
|
|
if timeout > 0:
|
|
th_sleep = Thread(target=th_timeout, daemon=True)
|
|
th_sleep.start()
|
|
|
|
try:
|
|
loop.run()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
return detected_signal
|