From e756d685565f0c667edf77b543e80845ef2bb842 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 07:56:17 +0200 Subject: [PATCH 01/14] refactor(dbus): Move D-Bus helper functions to a dedicated file Consolidated `REVPI_DBUS_*` constants and `extend_interface` function into `dbus_helper.py` for better modularity and reusability. Updated imports across modules to reflect this change. --- .../dbus_middleware1/__init__.py | 26 ++----------------- .../{interface_helper.py => dbus_helper.py} | 23 ++++++++++++++++ .../process_image/interface_picontrol.py | 2 +- 3 files changed, 26 insertions(+), 25 deletions(-) rename src/revpi_middleware/dbus_middleware1/{interface_helper.py => dbus_helper.py} (81%) diff --git a/src/revpi_middleware/dbus_middleware1/__init__.py b/src/revpi_middleware/dbus_middleware1/__init__.py index 1c14733..7f1cc52 100644 --- a/src/revpi_middleware/dbus_middleware1/__init__.py +++ b/src/revpi_middleware/dbus_middleware1/__init__.py @@ -2,27 +2,5 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus middleware version 1 of revpi_middleware.""" -from ..__about__ import __author__, __copyright__, __license__, __version__ - -REVPI_DBUS_NAME = "com.revolutionpi.middleware1" -REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1" - - -def extend_interface(*args) -> str: - """ - Extends an interface name by appending additional segments to a pre-defined base name. - - This function takes multiple arguments, concatenates them with a predefined base - interface name, and returns the resulting string, effectively constructing an - extended interface name. - - Args: - *args: str - Components to be appended to the base interface name. - - Returns: - str - Fully constructed interface name by joining the base interface name with - the provided segments. - """ - return ".".join([REVPI_DBUS_NAME, *args]) +from .dbus_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME +from .dbus_helper import extend_interface diff --git a/src/revpi_middleware/dbus_middleware1/interface_helper.py b/src/revpi_middleware/dbus_middleware1/dbus_helper.py similarity index 81% rename from src/revpi_middleware/dbus_middleware1/interface_helper.py rename to src/revpi_middleware/dbus_middleware1/dbus_helper.py index 3306519..5b742c4 100644 --- a/src/revpi_middleware/dbus_middleware1/interface_helper.py +++ b/src/revpi_middleware/dbus_middleware1/dbus_helper.py @@ -15,6 +15,9 @@ from threading import Thread log = getLogger(__name__) +REVPI_DBUS_NAME = "com.revolutionpi.middleware1" +REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1" + class ResetDriverWatchdog(Thread): """Watchdog to catch the reset_driver action.""" @@ -101,3 +104,23 @@ class ResetDriverWatchdog(Thread): rc = self._triggered self._triggered = False return rc + + +def extend_interface(*args) -> str: + """ + Extends an interface name by appending additional segments to a pre-defined base name. + + This function takes multiple arguments, concatenates them with a predefined base + interface name, and returns the resulting string, effectively constructing an + extended interface name. + + Args: + *args: str + Components to be appended to the base interface name. + + Returns: + str + Fully constructed interface name by joining the base interface name with + the provided segments. + """ + return ".".join([REVPI_DBUS_NAME, *args]) diff --git a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py index ada732a..24e212f 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py @@ -8,7 +8,7 @@ from logging import getLogger from pydbus.generic import signal -from ..interface_helper import ResetDriverWatchdog +from ..dbus_helper import ResetDriverWatchdog log = getLogger(__name__) From 4c1dc1c9b5a66864cfd70c64e19fb55b88284b41 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 08:13:02 +0200 Subject: [PATCH 02/14] refactor(dbus): Move ResetDriverWatchdog to process_image_helper.py The ResetDriverWatchdog class was relocated from dbus_helper.py to a new helper module, process_image_helper.py, to improve code organization and maintainability. Updated imports in relevant files to reflect this change. --- .../dbus_middleware1/dbus_helper.py | 99 +---------------- .../process_image/interface_picontrol.py | 2 +- .../process_image/process_image_helper.py | 102 ++++++++++++++++++ 3 files changed, 105 insertions(+), 98 deletions(-) create mode 100644 src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py diff --git a/src/revpi_middleware/dbus_middleware1/dbus_helper.py b/src/revpi_middleware/dbus_middleware1/dbus_helper.py index 5b742c4..bec4697 100644 --- a/src/revpi_middleware/dbus_middleware1/dbus_helper.py +++ b/src/revpi_middleware/dbus_middleware1/dbus_helper.py @@ -1,17 +1,9 @@ # -*- coding: utf-8 -*- -# SPDX-FileCopyrightText: 2020-2023 Sven Sager +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later -""" -Helper for the process image. +"""Helper for dbus.""" -The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs" -https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py -""" - -import os -from fcntl import ioctl from logging import getLogger -from threading import Thread log = getLogger(__name__) @@ -19,93 +11,6 @@ REVPI_DBUS_NAME = "com.revolutionpi.middleware1" REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1" -class ResetDriverWatchdog(Thread): - """Watchdog to catch the reset_driver action.""" - - def __init__(self, pi_control_device="/dev/piControl0"): - super(ResetDriverWatchdog, self).__init__() - self.procimg = pi_control_device - self.daemon = True - self._calls = [] - self._exit = False - self._fh = None - self.not_implemented = False - """True, if KB_WAIT_FOR_EVENT is not implemented in piControl.""" - self._triggered = False - self.start() - - def run(self): - """ - Mainloop of watchdog for reset_driver. - - If the thread can not open the process image or the IOCTL is not - implemented (wheezy), the thread function will stop. The trigger - property will always return True. - """ - log.debug("enter ResetDriverWatchdog.run()") - - try: - self._fh = os.open(self.procimg, os.O_RDONLY) - except Exception: - self.not_implemented = True - log.error( - "can not open process image at '{0}' for reset_driver watchdog" - "".format(self.procimg) - ) - return - - # The ioctl will return 2 byte (c-type int) - byte_buff = bytearray(2) - while not self._exit: - try: - rc = ioctl(self._fh, 19250, byte_buff) - if rc == 0 and byte_buff[0] == 1: - self._triggered = True - log.debug("reset_driver detected") - for func in self._calls: - func() - except Exception: - self.not_implemented = True - os.close(self._fh) - self._fh = None - log.warning("IOCTL KB_WAIT_FOR_EVENT is not implemented") - return - - log.debug("leave ResetDriverWatchdog.run()") - - def register_call(self, function): - """Register a function, if watchdog triggers.""" - if not callable(function): - return ValueError("Function is not callable.") - if function not in self._calls: - self._calls.append(function) - - def stop(self): - """Stop watchdog for reset_driver.""" - log.debug("enter ResetDriverWatchdog.stop()") - - self._exit = True - if self._fh is not None: - os.close(self._fh) - self._fh = None - - log.debug("leave ResetDriverWatchdog.stop()") - - def unregister_call(self, function=None): - """Remove a function call on watchdog trigger.""" - if function is None: - self._calls.clear() - elif function in self._calls: - self._calls.remove(function) - - @property - def triggered(self): - """Will return True one time after watchdog was triggered.""" - rc = self._triggered - self._triggered = False - return rc - - def extend_interface(*args) -> str: """ Extends an interface name by appending additional segments to a pre-defined base name. diff --git a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py index 24e212f..3166545 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py @@ -8,7 +8,7 @@ from logging import getLogger from pydbus.generic import signal -from ..dbus_helper import ResetDriverWatchdog +from .process_image_helper import ResetDriverWatchdog log = getLogger(__name__) diff --git a/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py new file mode 100644 index 0000000..bee8ec7 --- /dev/null +++ b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +""" +Helper for the process image. + +The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs" +https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py +""" +import os +from fcntl import ioctl +from logging import getLogger +from threading import Thread + +log = getLogger(__name__) + + +class ResetDriverWatchdog(Thread): + """Watchdog to catch the reset_driver action.""" + + def __init__(self, pi_control_device="/dev/piControl0"): + super(ResetDriverWatchdog, self).__init__() + self.procimg = pi_control_device + self.daemon = True + self._calls = [] + self._exit = False + self._fh = None + self.not_implemented = False + """True, if KB_WAIT_FOR_EVENT is not implemented in piControl.""" + self._triggered = False + self.start() + + def run(self): + """ + Mainloop of watchdog for reset_driver. + + If the thread cannot open the process image or the IOCTL is not + implemented (wheezy), the thread function will stop. The trigger + property will always return True. + """ + log.debug("enter ResetDriverWatchdog.run()") + + try: + self._fh = os.open(self.procimg, os.O_RDONLY) + except Exception: + self.not_implemented = True + log.error( + "can not open process image at '{0}' for reset_driver watchdog" + "".format(self.procimg) + ) + return + + # The ioctl will return 2 bytes (c-type int) + byte_buff = bytearray(2) + while not self._exit: + try: + rc = ioctl(self._fh, 19250, byte_buff) + if rc == 0 and byte_buff[0] == 1: + self._triggered = True + log.debug("reset_driver detected") + for func in self._calls: + func() + except Exception: + self.not_implemented = True + os.close(self._fh) + self._fh = None + log.warning("IOCTL KB_WAIT_FOR_EVENT is not implemented") + return + + log.debug("leave ResetDriverWatchdog.run()") + + def register_call(self, function): + """Register a function, if watchdog triggers.""" + if not callable(function): + raise ValueError("Function is not callable.") + if function not in self._calls: + self._calls.append(function) + + def stop(self): + """Stop watchdog for reset_driver.""" + log.debug("enter ResetDriverWatchdog.stop()") + + self._exit = True + if self._fh is not None: + os.close(self._fh) + self._fh = None + + log.debug("leave ResetDriverWatchdog.stop()") + + def unregister_call(self, function=None): + """Remove a function from the watchdog trigger.""" + if function is None: + self._calls.clear() + elif function in self._calls: + self._calls.remove(function) + + @property + def triggered(self): + """Will return True one time after watchdog was triggered.""" + rc = self._triggered + self._triggered = False + return rc From a4ccb9081f6b09a443f6c079168959d1e4058a72 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 08:21:24 +0200 Subject: [PATCH 03/14] refactor(dbus): Parameterize `picontrol_device` and `config_rsc` Replaced hardcoded paths with configurable parameters `picontrol_device` and `config_rsc` across multiple classes. This improves flexibility, making the components adaptable to various environments or setups. Updated corresponding initialization and method implementations to use these parameters. --- src/revpi_middleware/dbus_middleware1/bus_provider.py | 11 +++++++++-- .../process_image/interface_picontrol.py | 11 ++++++----- .../process_image/process_image_helper.py | 4 ++-- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/revpi_middleware/dbus_middleware1/bus_provider.py b/src/revpi_middleware/dbus_middleware1/bus_provider.py index 937ad75..c99f887 100644 --- a/src/revpi_middleware/dbus_middleware1/bus_provider.py +++ b/src/revpi_middleware/dbus_middleware1/bus_provider.py @@ -16,19 +16,26 @@ log = getLogger(__name__) class BusProvider(Thread): - def __init__(self): + def __init__( + self, + picontrol_device="/dev/piControl0", + config_rsc="/etc/revpi/config.rsc", + ): log.debug("enter BusProvider.__init__") super().__init__() self._bus = SystemBus() self._loop = GLib.MainLoop() + self.picontrol_device = picontrol_device + self.config_rsc = config_rsc + def run(self): log.debug("enter BusProvider.run") self._bus.publish( REVPI_DBUS_NAME, - InterfacePiControl(), + InterfacePiControl(self.picontrol_device, self.config_rsc), ) self._loop.run() diff --git a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py index 3166545..044e8bf 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py @@ -27,10 +27,11 @@ class InterfacePiControl: NotifyDriverReset = signal() - def __init__(self): - self.pi_control = "/dev/piControl0" + def __init__(self, picontrol_device: str, config_rsc: str): + self.picontrol_device = picontrol_device + self.config_rsc = config_rsc - self.wd_reset_driver = ResetDriverWatchdog(self.pi_control) + self.wd_reset_driver = ResetDriverWatchdog(self.picontrol_device) self.wd_reset_driver.register_call(self.notify_reset_driver) def notify_reset_driver(self): @@ -40,9 +41,9 @@ class InterfacePiControl: log.debug("enter InterfacePiControl.ResetDriver") try: - fd = os.open(self.pi_control, os.O_WRONLY) + fd = os.open(self.picontrol_device, os.O_WRONLY) except Exception as e: - log.warning(f"could not open ${self.pi_control} to reset driver") + log.warning(f"could not open ${self.picontrol_device} to reset driver") raise e execption = None diff --git a/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py index bee8ec7..7f29201 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py @@ -18,9 +18,9 @@ log = getLogger(__name__) class ResetDriverWatchdog(Thread): """Watchdog to catch the reset_driver action.""" - def __init__(self, pi_control_device="/dev/piControl0"): + def __init__(self, picontrol_device: str): super(ResetDriverWatchdog, self).__init__() - self.procimg = pi_control_device + self.procimg = picontrol_device self.daemon = True self._calls = [] self._exit = False From bde3920fc15a4366da0fe9337cf2cc33950d5b88 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 09:33:54 +0200 Subject: [PATCH 04/14] feat: Add session bus option for local testing and development Introduced a `--use-session-bus` flag to optionally use the D-Bus session bus instead of the system bus. This allows better flexibility for local testing and development scenarios without requiring system-level changes. Updated related classes and functions to respect the new flag. --- src/revpi_middleware/cli_commands/dbus_helper.py | 7 ++++--- src/revpi_middleware/daemon.py | 2 +- .../dbus_middleware1/bus_provider.py | 5 +++-- src/revpi_middleware/proginit.py | 12 +++++++++++- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/revpi_middleware/cli_commands/dbus_helper.py b/src/revpi_middleware/cli_commands/dbus_helper.py index 4418736..af3ae48 100644 --- a/src/revpi_middleware/cli_commands/dbus_helper.py +++ b/src/revpi_middleware/cli_commands/dbus_helper.py @@ -5,8 +5,9 @@ from threading import Thread from time import sleep from gi.repository import GLib -from pydbus import SystemBus +from pydbus import SessionBus, SystemBus +from .. import proginit as pi from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH from ..dbus_middleware1 import REVPI_DBUS_NAME @@ -34,7 +35,7 @@ def simple_call(method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_ Any The result of the method invocation on the targeted D-Bus interface. """ - bus = SystemBus() + bus = SessionBus() if pi.pargs.use_session_bus else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] return getattr(iface, method)(*args) @@ -55,7 +56,7 @@ def await_signal(signal_name: str, timeout: int, interface: str, object_path=REV detected_signal = True loop.quit() - bus = SystemBus() + bus = SessionBus() if pi.pargs.use_session_bus else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] diff --git a/src/revpi_middleware/daemon.py b/src/revpi_middleware/daemon.py index 41ca9a5..1ea8523 100644 --- a/src/revpi_middleware/daemon.py +++ b/src/revpi_middleware/daemon.py @@ -42,7 +42,7 @@ class MiddlewareDaemon: if self.bus_provider and self.bus_provider.is_alive(): return - self.bus_provider = BusProvider() + self.bus_provider = BusProvider(use_system_bus=not pi.pargs.use_session_bus) self.bus_provider.start() log.debug("leave MiddlewareDaemon.dbus_start") diff --git a/src/revpi_middleware/dbus_middleware1/bus_provider.py b/src/revpi_middleware/dbus_middleware1/bus_provider.py index c99f887..c9508b6 100644 --- a/src/revpi_middleware/dbus_middleware1/bus_provider.py +++ b/src/revpi_middleware/dbus_middleware1/bus_provider.py @@ -6,7 +6,7 @@ from logging import getLogger from threading import Thread from gi.repository import GLib -from pydbus import SystemBus +from pydbus import SessionBus, SystemBus from . import REVPI_DBUS_NAME from .process_image import InterfacePiControl @@ -20,11 +20,12 @@ class BusProvider(Thread): self, picontrol_device="/dev/piControl0", config_rsc="/etc/revpi/config.rsc", + use_system_bus=True, ): log.debug("enter BusProvider.__init__") super().__init__() - self._bus = SystemBus() + self._bus = SystemBus() if use_system_bus else SessionBus() self._loop = GLib.MainLoop() self.picontrol_device = picontrol_device diff --git a/src/revpi_middleware/proginit.py b/src/revpi_middleware/proginit.py index 75d2fb4..5a0a092 100644 --- a/src/revpi_middleware/proginit.py +++ b/src/revpi_middleware/proginit.py @@ -9,7 +9,7 @@ __version__ = "1.4.0" import logging import sys -from argparse import ArgumentParser, Namespace +from argparse import ArgumentParser, Namespace, SUPPRESS from configparser import ConfigParser from enum import Enum from os import R_OK, W_OK, access, environ, getpid, remove @@ -262,6 +262,16 @@ parser = ArgumentParser( prog=programname, description="Program description", ) + +# Use session bus of D-Bus for local testing and development proposes (hidden) +parser.add_argument( + "--use-session-bus", + dest="use_session_bus", + action="store_true", + default=False, + help=SUPPRESS, +) + parser.add_argument("--version", action="version", version=f"%(prog)s {program_version}") parser.add_argument( "-f", From 93b328bf3fba0edaf071c0b8135221e078ff14fa Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 12:12:16 +0200 Subject: [PATCH 05/14] feat(dbus): Add import for BusProvider in dbus_middleware1 module This change includes the BusProvider import in the `__init__.py` file of dbus_middleware1. It ensures the module has access to BusProvider functionality, improving modularity and readiness for use. --- src/revpi_middleware/dbus_middleware1/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/revpi_middleware/dbus_middleware1/__init__.py b/src/revpi_middleware/dbus_middleware1/__init__.py index 7f1cc52..6b65020 100644 --- a/src/revpi_middleware/dbus_middleware1/__init__.py +++ b/src/revpi_middleware/dbus_middleware1/__init__.py @@ -4,3 +4,5 @@ """D-Bus middleware version 1 of revpi_middleware.""" from .dbus_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME from .dbus_helper import extend_interface + +from .bus_provider import BusProvider From 487d5b3d464b27ea6c1ea07c338c79994e797148 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 12:12:54 +0200 Subject: [PATCH 06/14] feat(dbus): Add `running` property to `BusProvider` This property checks if the event loop is running, enhancing code readability and convenience. It provides an easier way to monitor the status of the loop, which may improve debugging and control flow handling. --- src/revpi_middleware/dbus_middleware1/bus_provider.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/revpi_middleware/dbus_middleware1/bus_provider.py b/src/revpi_middleware/dbus_middleware1/bus_provider.py index c9508b6..d3b6701 100644 --- a/src/revpi_middleware/dbus_middleware1/bus_provider.py +++ b/src/revpi_middleware/dbus_middleware1/bus_provider.py @@ -46,3 +46,7 @@ class BusProvider(Thread): log.debug("enter BusProvider.stop") self._loop.quit() log.debug("leave BusProvider.stop") + + @property + def running(self): + return self._loop.is_running() From 114cbd8099725427d8f5b322981e7be7cffaeeac Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 12:24:37 +0200 Subject: [PATCH 07/14] refactor(cli): D-Bus helpers support session and system bus types Introduced `BusType` enum to differentiate between session and system bus usage in D-Bus calls. Updated `simple_call` and `await_signal` functions to include a `bus_type` parameter, improving flexibility. Adjusted `cli_picontrol` to leverage the new `BusType` parameter for D-Bus interactions. --- .../cli_commands/cli_picontrol.py | 14 ++- .../cli_commands/dbus_helper.py | 87 +++++++++++++++---- 2 files changed, 80 insertions(+), 21 deletions(-) diff --git a/src/revpi_middleware/cli_commands/cli_picontrol.py b/src/revpi_middleware/cli_commands/cli_picontrol.py index 8fdaf67..9d76e26 100644 --- a/src/revpi_middleware/cli_commands/cli_picontrol.py +++ b/src/revpi_middleware/cli_commands/cli_picontrol.py @@ -4,7 +4,7 @@ from argparse import ArgumentParser from logging import getLogger -from .dbus_helper import await_signal, simple_call +from .dbus_helper import BusType, await_signal, simple_call from .. import proginit as pi from ..dbus_middleware1 import extend_interface @@ -36,12 +36,20 @@ def add_subparsers(parent_parser: ArgumentParser): def method_reset(): log.debug("D-Bus call of method ResetDriver") - simple_call("ResetDriver", interface=extend_interface("picontrol")) + simple_call( + "ResetDriver", + interface=extend_interface("picontrol"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) log.info("ResetDriver called via D-Bus") def method_await_reset(timout: int = 0): - detected_signal = await_signal("NotifyDriverReset", timout, extend_interface("picontrol")) + detected_signal = await_signal( + "NotifyDriverReset", + timout, extend_interface("picontrol"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) if detected_signal: log.info("ResetDriver signal received") else: diff --git a/src/revpi_middleware/cli_commands/dbus_helper.py b/src/revpi_middleware/cli_commands/dbus_helper.py index af3ae48..e11e90e 100644 --- a/src/revpi_middleware/cli_commands/dbus_helper.py +++ b/src/revpi_middleware/cli_commands/dbus_helper.py @@ -1,47 +1,98 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus helper functions for cli commands.""" +from enum import Enum from threading import Thread from time import sleep from gi.repository import GLib from pydbus import SessionBus, SystemBus -from .. import proginit as pi from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH from ..dbus_middleware1 import REVPI_DBUS_NAME -PICONTROL_INTERFACE = "com.revolutionpi.middleware1.picontrol" -RESET_DRIVER_METHOD = "ResetDriver" + +class BusType(Enum): + SESSION = "session" + SYSTEM = "system" -def simple_call(method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_PATH): +def simple_call( + method: str, + *args, + interface: str, + object_path=REVPI_DBUS_BASE_PATH, + bus_type=BusType.SYSTEM, +): """ - Executes a method on a specific D-Bus object interface within the RevPi system. This function - connects to the system bus, retrieves the desired interface and object path, and invokes - the specified method with provided arguments. + Performs a call to a D-Bus method on a specified interface and object. + + This function uses the D-Bus messaging system to dynamically call a method + of a specified interface, using the given object path and bus type. It + provides a way to interact with D-Bus interfaces, using either a system or + session bus, and returns the result of executing the specified method. Parameters: method: str - The name of the method to be invoked on the targeted interface. - *args: tuple - Positional arguments to be passed to the method being invoked. + The name of the method to invoke on the D-Bus interface. + *args: + Additional positional arguments to pass to the specified D-Bus method. interface: str - The name of the D-Bus interface providing the required functionality. - object_path: str, optional - The D-Bus object path of the RevPi interface. Defaults to REVPI_DBUS_BASE_PATH. + The name of the D-Bus interface containing the method. + object_path: + The path of the D-Bus object on which the interface is defined. Defaults + to REVPI_DBUS_BASE_PATH. + bus_type: BusType + Specifies whether to use the system or session bus. Defaults to BusType.SYSTEM. Returns: - Any - The result of the method invocation on the targeted D-Bus interface. + The value returned by the D-Bus method. + + Raises: + Any errors raised from the D-Bus call will propagate to the caller. """ - bus = SessionBus() if pi.pargs.use_session_bus else SystemBus() + bus = SessionBus() if bus_type is BusType.SESSION else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] return getattr(iface, method)(*args) -def await_signal(signal_name: str, timeout: int, interface: str, object_path=REVPI_DBUS_BASE_PATH): +def await_signal( + signal_name: str, + timeout: int, + interface: str, + object_path=REVPI_DBUS_BASE_PATH, + bus_type=BusType.SYSTEM, +): + """ + Waits for a specific signal and returns whether the signal was detected. + + This function connects to a D-Bus interface and waits for a specific signal + to be emitted. If the signal is not received within the specified timeout + period, the function will return False. If the signal is detected within + the timeout, the function will return True. It can connect to either the + system bus or the session bus, depending on the provided `bus_type`. + + Parameters: + signal_name: str + The name of the signal to be awaited. + timeout: int + The maximum time to wait for the signal, in seconds. A value of 0 or + less means that there is no timeout. + interface: str + The name of the D-Bus interface to listen on. + object_path + The D-Bus object path where the interface resides. Defaults to + REVPI_DBUS_BASE_PATH. + bus_type + The type of D-Bus to connect to. Can be either BusType.SYSTEM or + BusType.SESSION. Defaults to BusType.SYSTEM. + + Returns: + bool + True if the signal was detected within the timeout period, False + otherwise. + """ detected_signal = False timeout = int(timeout) loop = GLib.MainLoop() @@ -56,7 +107,7 @@ def await_signal(signal_name: str, timeout: int, interface: str, object_path=REV detected_signal = True loop.quit() - bus = SessionBus() if pi.pargs.use_session_bus else SystemBus() + bus = SessionBus() if bus_type is BusType.SESSION else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] From 76b53423c11ea0c5c3c6ae10ea748e115701c1b2 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 12:43:22 +0200 Subject: [PATCH 08/14] fix(dbus): Add error handling for DBus publishing and main loop Wrap DBus publishing and main loop execution in try-except blocks to capture and log failures. This ensures better visibility into errors and prevents silent failures during runtime. --- .../dbus_middleware1/bus_provider.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/revpi_middleware/dbus_middleware1/bus_provider.py b/src/revpi_middleware/dbus_middleware1/bus_provider.py index d3b6701..bc45d4a 100644 --- a/src/revpi_middleware/dbus_middleware1/bus_provider.py +++ b/src/revpi_middleware/dbus_middleware1/bus_provider.py @@ -34,12 +34,19 @@ class BusProvider(Thread): def run(self): log.debug("enter BusProvider.run") - self._bus.publish( - REVPI_DBUS_NAME, - InterfacePiControl(self.picontrol_device, self.config_rsc), - ) + try: + self._bus.publish( + REVPI_DBUS_NAME, + InterfacePiControl(self.picontrol_device, self.config_rsc), + ) + except Exception as e: + log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}") + + try: + self._loop.run() + except Exception as e: + log.error(f"can not run dbus mainloop: {e}") - self._loop.run() log.debug("leave BusProvider.run") def stop(self): From 4ccc328d0b2163a19df7c9a4f69a6ebb17cb336c Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 13:57:56 +0200 Subject: [PATCH 09/14] refactor(dbus): piControl driver reset with PiControlIoctl class Replaces inline ioctl logic with the new `PiControlIoctl` class for cleaner and reusable code. This improves readability, encapsulates device operations, and simplifies error handling for resetting the piControl driver. --- .../process_image/interface_picontrol.py | 22 ++++--------------- .../process_image/process_image_helper.py | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py index 044e8bf..34623af 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py @@ -2,13 +2,11 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus interfaces for piControl.""" -import os -from fcntl import ioctl from logging import getLogger from pydbus.generic import signal -from .process_image_helper import ResetDriverWatchdog +from .process_image_helper import PiControlIoctl, ResetDriverWatchdog log = getLogger(__name__) @@ -41,21 +39,9 @@ class InterfacePiControl: log.debug("enter InterfacePiControl.ResetDriver") try: - fd = os.open(self.picontrol_device, os.O_WRONLY) - except Exception as e: - log.warning(f"could not open ${self.picontrol_device} to reset driver") - raise e - - execption = None - try: - # KB_RESET _IO('K', 12 ) // reset the piControl driver including the config file - ioctl(fd, 19212) + picontrol_ioctl = PiControlIoctl(self.picontrol_device) + picontrol_ioctl.ioctl(PiControlIoctl.IOCTL_RESET_DRIVER) log.info("reset piControl driver") except Exception as e: log.warning(f"could not reset piControl driver: ${e}") - execption = e - finally: - os.close(fd) - - if execption: - raise execption + raise e diff --git a/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py index 7f29201..5fcb2a6 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py @@ -8,6 +8,7 @@ The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py """ import os +from ctypes import c_int from fcntl import ioctl from logging import getLogger from threading import Thread @@ -100,3 +101,24 @@ class ResetDriverWatchdog(Thread): rc = self._triggered self._triggered = False return rc + + +class PiControlIoctl: + IOCTL_RESET_DRIVER = 19212 + + def __init__(self, picontrol_device: str): + self.picontrol_device = picontrol_device + + def ioctl(self, request, arg=0): + if type(arg) is c_int: + arg = arg.value + + _fd = os.open(self.picontrol_device, os.O_WRONLY) + return_value = ioctl(_fd, 19212, arg) + os.close(_fd) + + return return_value + + @property + def name(self): + return self.picontrol_device From 3f808c55f8720c5fffb645ff3f67553e0f763666 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 14:38:55 +0200 Subject: [PATCH 10/14] test(dbus): Add unit test framework for dbus_middleware1 module Introduces test infrastructure and mockup classes for `dbus_middleware1`. Includes `BusProvider` test setup, fake device implementations, and initialization logic to support session bus testing. Enhances testability and isolation of the D-Bus middleware components. --- tests/__init__.py | 3 ++ tests/dbus_middleware1/__init__.py | 7 +++ tests/dbus_middleware1/bus_provider.py | 36 ++++++++++++++ tests/dbus_middleware1/fake_devices.py | 65 ++++++++++++++++++++++++++ 4 files changed, 111 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/dbus_middleware1/__init__.py create mode 100644 tests/dbus_middleware1/bus_provider.py create mode 100644 tests/dbus_middleware1/fake_devices.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..d5998d4 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later diff --git a/tests/dbus_middleware1/__init__.py b/tests/dbus_middleware1/__init__.py new file mode 100644 index 0000000..286ffd6 --- /dev/null +++ b/tests/dbus_middleware1/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from os import environ + +# D-BUS needs a DISPLAY variable to work with the session bus +environ["DISPLAY"] = ":0" diff --git a/tests/dbus_middleware1/bus_provider.py b/tests/dbus_middleware1/bus_provider.py new file mode 100644 index 0000000..0f753d8 --- /dev/null +++ b/tests/dbus_middleware1/bus_provider.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from time import sleep + +from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup + + +class TestBusProvider(PiControlDeviceMockup): + + def setUp(self): + super().setUp() + + # Do not import things on top of the module. Some classes or functions need to be mocked up first. + from revpi_middleware.dbus_middleware1 import BusProvider + + # Prepare the bus provider and start it + self.bp = BusProvider( + self.picontrol.name, + use_system_bus=False, + ) + self.bp.start() + + # Wait 5 seconds until the bus provider has started the main loop + counter = 50 + while not self.bp.running and counter > 0: + counter -= 1 + sleep(0.1) + + def tearDown(self): + self.bp.stop() + self.bp.join(10.0) + if self.bp.is_alive(): + raise RuntimeError("Bus provider thread is still running") + + super().tearDown() diff --git a/tests/dbus_middleware1/fake_devices.py b/tests/dbus_middleware1/fake_devices.py new file mode 100644 index 0000000..6d62477 --- /dev/null +++ b/tests/dbus_middleware1/fake_devices.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from ctypes import c_int +from queue import Queue +from tempfile import NamedTemporaryFile +from unittest import TestCase + +IOCTL_QUEUE = Queue() + + +class FakePiControlDevice: + IOCTL_RESET_DRIVER = 19212 + + def __init__(self, picontrol_device: str): + self._fh = NamedTemporaryFile("wb+", 0, prefix="fake_device_") + self.name = self._fh.name + + def __del__(self): + self._fh.close() + + def reset_process_image(self): + self._fh.write(b"\x00" * 4096) + self._fh.seek(0) + + def ioctl(self, request, arg=0) -> int: + if type(arg) is c_int: + arg = arg.value + + if request == self.IOCTL_RESET_DRIVER: + pass + else: + raise NotImplementedError(f"Unknown IOCTL request: {request}") + + IOCTL_QUEUE.put_nowait((request, arg)) + return arg + + def close(self): + self._fh.close() + + def read(self, size): + return self._fh.read(size) + + def seek(self, offset, whence): + self._fh.seek(offset, whence) + + def write(self, buffer): + return self._fh.write(buffer) + + +class PiControlDeviceMockup(TestCase): + + def setUp(self): + super().setUp() + + # Replace classes with mockup classes + import revpi_middleware.dbus_middleware1.process_image.interface_picontrol as test_helpers + test_helpers.PiControlIoctl = FakePiControlDevice + + # Create a fake picontrol0 device + self.picontrol = FakePiControlDevice(picontrol_device="/dev/fake_device_0") + + def tearDown(self): + self.picontrol.close() + super().tearDown() From 7207845b13db0fb238cbc588b9ebc3c2879f8f9c Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 14:39:33 +0200 Subject: [PATCH 11/14] test(dbus): Add unit tests for PiControl D-Bus interface Introduces initial tests for the PiControl interface in `dbus_middleware1`. These tests cover basic functionality like checking activity status and resetting the driver while verifying IOCTL calls. --- .../process_image/__init__.py | 3 +++ .../process_image/test_interface_picontrol.py | 22 +++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 tests/dbus_middleware1/process_image/__init__.py create mode 100644 tests/dbus_middleware1/process_image/test_interface_picontrol.py diff --git a/tests/dbus_middleware1/process_image/__init__.py b/tests/dbus_middleware1/process_image/__init__.py new file mode 100644 index 0000000..d5998d4 --- /dev/null +++ b/tests/dbus_middleware1/process_image/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later diff --git a/tests/dbus_middleware1/process_image/test_interface_picontrol.py b/tests/dbus_middleware1/process_image/test_interface_picontrol.py new file mode 100644 index 0000000..320be1f --- /dev/null +++ b/tests/dbus_middleware1/process_image/test_interface_picontrol.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from revpi_middleware.cli_commands.dbus_helper import BusType, simple_call +from revpi_middleware.dbus_middleware1 import extend_interface +from tests.dbus_middleware1.bus_provider import TestBusProvider +from tests.dbus_middleware1.fake_devices import IOCTL_QUEUE + + +class TestObjectPicontrol(TestBusProvider): + + def test_is_active(self): + self.assertTrue(self.bp.running) + + def test_reset_driver(self): + simple_call( + "ResetDriver", + interface=extend_interface("picontrol"), + bus_type=BusType.SESSION, + ) + ioctl_call = IOCTL_QUEUE.get(timeout=2.0) + self.assertEqual((19212, 0), ioctl_call) From f8bc1532e38dcf36ba96037f214bcc6a69f52a6d Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 15:05:47 +0200 Subject: [PATCH 12/14] refactor(dbus): Fix typo and remove unused thread instance Corrected a typo in the `timeout` parameter name in `method_await_reset` and removed an unused thread instance `th_sleep` from `dbus_helper.py`. These changes improve code clarity and eliminate redundant components. --- src/revpi_middleware/cli_commands/cli_picontrol.py | 5 +++-- src/revpi_middleware/cli_commands/dbus_helper.py | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/revpi_middleware/cli_commands/cli_picontrol.py b/src/revpi_middleware/cli_commands/cli_picontrol.py index 9d76e26..6eb355f 100644 --- a/src/revpi_middleware/cli_commands/cli_picontrol.py +++ b/src/revpi_middleware/cli_commands/cli_picontrol.py @@ -44,10 +44,11 @@ def method_reset(): log.info("ResetDriver called via D-Bus") -def method_await_reset(timout: int = 0): +def method_await_reset(timeout: int = 0): detected_signal = await_signal( "NotifyDriverReset", - timout, extend_interface("picontrol"), + timeout, + extend_interface("picontrol"), bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, ) if detected_signal: diff --git a/src/revpi_middleware/cli_commands/dbus_helper.py b/src/revpi_middleware/cli_commands/dbus_helper.py index e11e90e..e14f072 100644 --- a/src/revpi_middleware/cli_commands/dbus_helper.py +++ b/src/revpi_middleware/cli_commands/dbus_helper.py @@ -96,7 +96,6 @@ def await_signal( detected_signal = False timeout = int(timeout) loop = GLib.MainLoop() - th_sleep = Thread() def th_timeout(): sleep(timeout) From 26c3ac0afbf00cc66f6e7958a18e8800b056e2d9 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 15:55:49 +0200 Subject: [PATCH 13/14] refactor(dbus): D-Bus interface management with cleanup support. Introduced a `DbusInterface` base class with a `cleanup` method to standardize resource cleanup for D-Bus interfaces. Modified `InterfacePiControl` to inherit from it and implemented `cleanup` logic for proper watchdog resource handling. Adjusted `BusProvider` to manage multiple interfaces and ensure cleanup on shutdown. --- .../dbus_middleware1/bus_provider.py | 17 ++++++++++++++++- .../dbus_middleware1/dbus_helper.py | 12 ++++++++++++ .../process_image/interface_picontrol.py | 6 +++++- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/revpi_middleware/dbus_middleware1/bus_provider.py b/src/revpi_middleware/dbus_middleware1/bus_provider.py index bc45d4a..6f91bce 100644 --- a/src/revpi_middleware/dbus_middleware1/bus_provider.py +++ b/src/revpi_middleware/dbus_middleware1/bus_provider.py @@ -34,10 +34,19 @@ class BusProvider(Thread): def run(self): log.debug("enter BusProvider.run") + # The 2nd, 3rd, ... arguments can be objects or tuples of a path and an object + # Example(), + # ("Subdir1", Example()), + # ("Subdir2", Example()), + # ("Subdir2/Whatever", Example()) + lst_interfaces = [ + InterfacePiControl(self.picontrol_device, self.config_rsc), + ] + try: self._bus.publish( REVPI_DBUS_NAME, - InterfacePiControl(self.picontrol_device, self.config_rsc), + *lst_interfaces, ) except Exception as e: log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}") @@ -47,6 +56,12 @@ class BusProvider(Thread): except Exception as e: log.error(f"can not run dbus mainloop: {e}") + # Clean up all interfaces + for interface in lst_interfaces: + if type(interface) is tuple: + _, interface = interface + interface.cleanup() + log.debug("leave BusProvider.run") def stop(self): diff --git a/src/revpi_middleware/dbus_middleware1/dbus_helper.py b/src/revpi_middleware/dbus_middleware1/dbus_helper.py index bec4697..23bcd2e 100644 --- a/src/revpi_middleware/dbus_middleware1/dbus_helper.py +++ b/src/revpi_middleware/dbus_middleware1/dbus_helper.py @@ -11,6 +11,18 @@ REVPI_DBUS_NAME = "com.revolutionpi.middleware1" REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1" +class DbusInterface: + + def cleanup(self): + """ + Represents a method responsible for performing cleanup operations. This method is executed to properly + release resources, close connections, or perform other necessary finalization tasks. + + This method does not take any arguments or return a value. + """ + pass + + def extend_interface(*args) -> str: """ Extends an interface name by appending additional segments to a pre-defined base name. diff --git a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py index 34623af..f62a972 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py @@ -7,11 +7,12 @@ from logging import getLogger from pydbus.generic import signal from .process_image_helper import PiControlIoctl, ResetDriverWatchdog +from ..dbus_helper import DbusInterface log = getLogger(__name__) -class InterfacePiControl: +class InterfacePiControl(DbusInterface): """ @@ -32,6 +33,9 @@ class InterfacePiControl: self.wd_reset_driver = ResetDriverWatchdog(self.picontrol_device) self.wd_reset_driver.register_call(self.notify_reset_driver) + def cleanup(self): + self.wd_reset_driver.stop() + def notify_reset_driver(self): self.NotifyDriverReset() From 157b7bd118246993ed2a36b0090580160c52e3c6 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 15:56:59 +0200 Subject: [PATCH 14/14] test(dbus): Add support for testing driver reset notification Introduce `FakeResetDriverWatchdog` to simulate driver reset triggers. Update existing tests and add a new test, `test_notify_reset_driver`, to verify reset notifications using the event signaling mechanism. --- tests/dbus_middleware1/fake_devices.py | 57 ++++++++++++++++++- .../process_image/test_interface_picontrol.py | 27 ++++++++- 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/tests/dbus_middleware1/fake_devices.py b/tests/dbus_middleware1/fake_devices.py index 6d62477..edd16d1 100644 --- a/tests/dbus_middleware1/fake_devices.py +++ b/tests/dbus_middleware1/fake_devices.py @@ -2,11 +2,13 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later from ctypes import c_int -from queue import Queue +from queue import Empty, Queue from tempfile import NamedTemporaryFile +from threading import Event, Thread from unittest import TestCase IOCTL_QUEUE = Queue() +RESET_DRIVER_EVENT = Event() class FakePiControlDevice: @@ -48,14 +50,67 @@ class FakePiControlDevice: return self._fh.write(buffer) +class FakeResetDriverWatchdog(Thread): + + def __init__(self, picontrol_device: str): + super().__init__() + self.daemon = True + self._calls = [] + self._exit = False + self.not_implemented = True + self._triggered = False + self.start() + + def run(self): + while not self._exit: + if RESET_DRIVER_EVENT.wait(0.1): + RESET_DRIVER_EVENT.clear() + self._triggered = True + for func in self._calls: + func() + + def register_call(self, function): + """Register a function, if watchdog triggers.""" + if not callable(function): + raise ValueError("Function is not callable.") + if function not in self._calls: + self._calls.append(function) + + def stop(self): + """Stop watchdog for reset_driver.""" + self._exit = True + + def unregister_call(self, function=None): + """Remove a function from the watchdog trigger.""" + if function is None: + self._calls.clear() + elif function in self._calls: + self._calls.remove(function) + + @property + def triggered(self): + """Will return True one time after watchdog was triggered.""" + rc = self._triggered + self._triggered = False + return rc + + class PiControlDeviceMockup(TestCase): def setUp(self): super().setUp() + # Empty the queue + while True: + try: + IOCTL_QUEUE.get_nowait() + except Empty: + break + # Replace classes with mockup classes import revpi_middleware.dbus_middleware1.process_image.interface_picontrol as test_helpers test_helpers.PiControlIoctl = FakePiControlDevice + test_helpers.ResetDriverWatchdog = FakeResetDriverWatchdog # Create a fake picontrol0 device self.picontrol = FakePiControlDevice(picontrol_device="/dev/fake_device_0") diff --git a/tests/dbus_middleware1/process_image/test_interface_picontrol.py b/tests/dbus_middleware1/process_image/test_interface_picontrol.py index 320be1f..cd11a6e 100644 --- a/tests/dbus_middleware1/process_image/test_interface_picontrol.py +++ b/tests/dbus_middleware1/process_image/test_interface_picontrol.py @@ -1,10 +1,13 @@ # -*- coding: utf-8 -*- # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later -from revpi_middleware.cli_commands.dbus_helper import BusType, simple_call +from threading import Thread +from time import sleep + +from revpi_middleware.cli_commands.dbus_helper import BusType, await_signal, simple_call from revpi_middleware.dbus_middleware1 import extend_interface from tests.dbus_middleware1.bus_provider import TestBusProvider -from tests.dbus_middleware1.fake_devices import IOCTL_QUEUE +from tests.dbus_middleware1.fake_devices import IOCTL_QUEUE, RESET_DRIVER_EVENT class TestObjectPicontrol(TestBusProvider): @@ -20,3 +23,23 @@ class TestObjectPicontrol(TestBusProvider): ) ioctl_call = IOCTL_QUEUE.get(timeout=2.0) self.assertEqual((19212, 0), ioctl_call) + + def test_notify_reset_driver(self): + timeout = 5 + + def target_call_reset_driver(): + sleep(1.0) + RESET_DRIVER_EVENT.set() + + th_wait_for_reset = Thread(target=target_call_reset_driver, daemon=True) + th_wait_for_reset.start() + + result = await_signal( + "NotifyDriverReset", + timeout, + extend_interface("picontrol"), + bus_type=BusType.SESSION, + ) + self.assertTrue(result) + + th_wait_for_reset.join(timeout=timeout)