diff --git a/tests/dbus_middleware1/fake_devices.py b/tests/dbus_middleware1/fake_devices.py index 6d62477..edd16d1 100644 --- a/tests/dbus_middleware1/fake_devices.py +++ b/tests/dbus_middleware1/fake_devices.py @@ -2,11 +2,13 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later from ctypes import c_int -from queue import Queue +from queue import Empty, Queue from tempfile import NamedTemporaryFile +from threading import Event, Thread from unittest import TestCase IOCTL_QUEUE = Queue() +RESET_DRIVER_EVENT = Event() class FakePiControlDevice: @@ -48,14 +50,67 @@ class FakePiControlDevice: return self._fh.write(buffer) +class FakeResetDriverWatchdog(Thread): + + def __init__(self, picontrol_device: str): + super().__init__() + self.daemon = True + self._calls = [] + self._exit = False + self.not_implemented = True + self._triggered = False + self.start() + + def run(self): + while not self._exit: + if RESET_DRIVER_EVENT.wait(0.1): + RESET_DRIVER_EVENT.clear() + self._triggered = True + for func in self._calls: + func() + + def register_call(self, function): + """Register a function, if watchdog triggers.""" + if not callable(function): + raise ValueError("Function is not callable.") + if function not in self._calls: + self._calls.append(function) + + def stop(self): + """Stop watchdog for reset_driver.""" + self._exit = True + + def unregister_call(self, function=None): + """Remove a function from the watchdog trigger.""" + if function is None: + self._calls.clear() + elif function in self._calls: + self._calls.remove(function) + + @property + def triggered(self): + """Will return True one time after watchdog was triggered.""" + rc = self._triggered + self._triggered = False + return rc + + class PiControlDeviceMockup(TestCase): def setUp(self): super().setUp() + # Empty the queue + while True: + try: + IOCTL_QUEUE.get_nowait() + except Empty: + break + # Replace classes with mockup classes import revpi_middleware.dbus_middleware1.process_image.interface_picontrol as test_helpers test_helpers.PiControlIoctl = FakePiControlDevice + test_helpers.ResetDriverWatchdog = FakeResetDriverWatchdog # Create a fake picontrol0 device self.picontrol = FakePiControlDevice(picontrol_device="/dev/fake_device_0") diff --git a/tests/dbus_middleware1/process_image/test_interface_picontrol.py b/tests/dbus_middleware1/process_image/test_interface_picontrol.py index 320be1f..cd11a6e 100644 --- a/tests/dbus_middleware1/process_image/test_interface_picontrol.py +++ b/tests/dbus_middleware1/process_image/test_interface_picontrol.py @@ -1,10 +1,13 @@ # -*- coding: utf-8 -*- # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later -from revpi_middleware.cli_commands.dbus_helper import BusType, simple_call +from threading import Thread +from time import sleep + +from revpi_middleware.cli_commands.dbus_helper import BusType, await_signal, simple_call from revpi_middleware.dbus_middleware1 import extend_interface from tests.dbus_middleware1.bus_provider import TestBusProvider -from tests.dbus_middleware1.fake_devices import IOCTL_QUEUE +from tests.dbus_middleware1.fake_devices import IOCTL_QUEUE, RESET_DRIVER_EVENT class TestObjectPicontrol(TestBusProvider): @@ -20,3 +23,23 @@ class TestObjectPicontrol(TestBusProvider): ) ioctl_call = IOCTL_QUEUE.get(timeout=2.0) self.assertEqual((19212, 0), ioctl_call) + + def test_notify_reset_driver(self): + timeout = 5 + + def target_call_reset_driver(): + sleep(1.0) + RESET_DRIVER_EVENT.set() + + th_wait_for_reset = Thread(target=target_call_reset_driver, daemon=True) + th_wait_for_reset.start() + + result = await_signal( + "NotifyDriverReset", + timeout, + extend_interface("picontrol"), + bus_type=BusType.SESSION, + ) + self.assertTrue(result) + + th_wait_for_reset.join(timeout=timeout)