feat(cli): Add await_signal function to handle D-Bus signals

Introduce the await_signal function to monitor D-Bus signals with a
timeout. This uses GLib's MainLoop and threading to wait for signals
efficiently while respecting a specified timeout duration.
This commit is contained in:
2025-04-18 17:30:19 +02:00
parent 6dca9880c8
commit 527a921bfd

View File

@@ -1,6 +1,10 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus helper functions for cli commands."""
from threading import Thread
from time import sleep
from gi.repository import GLib
from pydbus import SystemBus
from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH
@@ -34,3 +38,36 @@ def simple_call(method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_
revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface]
return getattr(iface, method)(*args)
def await_signal(signal_name: str, timeout: int, interface: str, object_path=REVPI_DBUS_BASE_PATH):
detected_signal = False
timeout = int(timeout)
loop = GLib.MainLoop()
th_sleep = Thread()
def th_timeout():
sleep(timeout)
loop.quit()
def signal_handler(*args, **kwargs):
nonlocal detected_signal
detected_signal = True
loop.quit()
bus = SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface]
setattr(iface, f"on{signal_name}", signal_handler)
if timeout > 0:
th_sleep = Thread(target=th_timeout, daemon=True)
th_sleep.start()
try:
loop.run()
except KeyboardInterrupt:
pass
return detected_signal