diff --git a/src/revpi_middleware/cli_commands/dbus_helper.py b/src/revpi_middleware/cli_commands/dbus_helper.py index 7415773..4418736 100644 --- a/src/revpi_middleware/cli_commands/dbus_helper.py +++ b/src/revpi_middleware/cli_commands/dbus_helper.py @@ -1,6 +1,10 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus helper functions for cli commands.""" +from threading import Thread +from time import sleep + +from gi.repository import GLib from pydbus import SystemBus from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH @@ -34,3 +38,36 @@ def simple_call(method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_ revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] return getattr(iface, method)(*args) + + +def await_signal(signal_name: str, timeout: int, interface: str, object_path=REVPI_DBUS_BASE_PATH): + detected_signal = False + timeout = int(timeout) + loop = GLib.MainLoop() + th_sleep = Thread() + + def th_timeout(): + sleep(timeout) + loop.quit() + + def signal_handler(*args, **kwargs): + nonlocal detected_signal + detected_signal = True + loop.quit() + + bus = SystemBus() + revpi = bus.get(REVPI_DBUS_NAME, object_path) + iface = revpi[interface] + + setattr(iface, f"on{signal_name}", signal_handler) + + if timeout > 0: + th_sleep = Thread(target=th_timeout, daemon=True) + th_sleep.start() + + try: + loop.run() + except KeyboardInterrupt: + pass + + return detected_signal