# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Test doubles for the piControl device and its reset-driver watchdog."""
from ctypes import c_int
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from threading import Event, Thread
from unittest import TestCase

# Every accepted FakePiControlDevice.ioctl() call is recorded here as a
# (request, arg) tuple so that tests can inspect what was requested.
IOCTL_QUEUE = Queue()

# Setting this event makes a running FakeResetDriverWatchdog fire its
# registered callbacks; the watchdog clears the event again itself.
RESET_DRIVER_EVENT = Event()


class FakePiControlDevice:
    """File-backed fake of a piControl device.

    All data access is redirected to an unbuffered temporary file, and ioctl
    requests are recorded in ``IOCTL_QUEUE`` instead of being executed.
    """

    # The only request number that ioctl() accepts.
    IOCTL_RESET_DRIVER = 19212

    def __init__(self, picontrol_device: str):
        """Create the backing temporary file.

        :param picontrol_device: Ignored; the fake always works on its own
            temporary file, whose path is published as ``self.name``.
        """
        # Buffering 0 -> every read/write goes straight to the file.
        self._fh = NamedTemporaryFile("wb+", 0, prefix="fake_device_")
        self.name = self._fh.name

    def __del__(self):
        # __init__ may have failed before _fh was assigned; do not raise a
        # secondary AttributeError during garbage collection in that case.
        fh = getattr(self, "_fh", None)
        if fh is not None:
            fh.close()

    def reset_process_image(self):
        """Fill the first 4096 bytes with zeros and rewind to offset 0."""
        # Rewind first: without this the zeros would be written at the
        # current position, leaving stale bytes at the start of the image.
        self._fh.seek(0)
        self._fh.write(b"\x00" * 4096)
        self._fh.seek(0)

    def ioctl(self, request, arg=0) -> int:
        """Record an ioctl request in ``IOCTL_QUEUE`` and echo its argument.

        :param request: Request number; only ``IOCTL_RESET_DRIVER`` is known.
        :param arg: Plain value or a ``ctypes.c_int`` (unwrapped to its value).
        :return: The (unwrapped) argument.
        :raises NotImplementedError: For any other request number; nothing is
            queued in that case.
        """
        if isinstance(arg, c_int):
            arg = arg.value

        if request != self.IOCTL_RESET_DRIVER:
            raise NotImplementedError(f"Unknown IOCTL request: {request}")

        IOCTL_QUEUE.put_nowait((request, arg))
        return arg

    def close(self):
        """Close the backing temporary file."""
        self._fh.close()

    def read(self, size):
        """Read up to ``size`` bytes from the current position."""
        return self._fh.read(size)

    def seek(self, offset, whence):
        """Move the file position and return the new absolute position."""
        return self._fh.seek(offset, whence)

    def write(self, buffer):
        """Write ``buffer`` at the current position; return the byte count."""
        return self._fh.write(buffer)


class FakeResetDriverWatchdog(Thread):
    """Daemon thread that fires callbacks when ``RESET_DRIVER_EVENT`` is set.

    The thread starts itself on construction and polls the event every
    0.1 seconds until ``stop()`` is called.
    """

    def __init__(self, picontrol_device: str):
        """Set up the state and start the thread.

        :param picontrol_device: Ignored by this fake.
        """
        super().__init__()
        self.daemon = True
        self._calls = []
        self._exit = False
        self.not_implemented = True
        self._triggered = False
        self.start()

    def run(self):
        """Wait for the reset event and call every registered function."""
        while not self._exit:
            # Short timeout so that stop() is noticed promptly.
            if not RESET_DRIVER_EVENT.wait(0.1):
                continue
            RESET_DRIVER_EVENT.clear()
            self._triggered = True
            # Iterate over a snapshot: a callback may (un)register functions,
            # which would otherwise make the loop skip the next callback.
            for func in tuple(self._calls):
                func()

    def register_call(self, function):
        """Register a function, if watchdog triggers.

        Registering the same function twice has no additional effect.

        :raises ValueError: If ``function`` is not callable.
        """
        if not callable(function):
            raise ValueError("Function is not callable.")
        if function not in self._calls:
            self._calls.append(function)

    def stop(self):
        """Stop watchdog for reset_driver."""
        self._exit = True

    def unregister_call(self, function=None):
        """Remove a function from the watchdog trigger.

        With ``function=None`` all registered functions are removed; an
        unknown function is ignored.
        """
        if function is None:
            self._calls.clear()
        elif function in self._calls:
            self._calls.remove(function)

    @property
    def triggered(self):
        """Will return True one time after watchdog was triggered."""
        rc = self._triggered
        self._triggered = False
        return rc


class PiControlDeviceMockup(TestCase):
    """Test case base that swaps the piControl classes for the fakes above."""

    def setUp(self):
        super().setUp()

        # Empty the queue so each test only sees its own ioctl records
        while True:
            try:
                IOCTL_QUEUE.get_nowait()
            except Empty:
                break

        # Replace classes with mockup classes
        # NOTE(review): the replacement is not undone in tearDown, so it stays
        # in effect for later tests in the same process - confirm intended.
        import revpi_middleware.dbus_middleware1.process_image.interface_picontrol as test_helpers

        test_helpers.PiControlIoctl = FakePiControlDevice
        test_helpers.ResetDriverWatchdog = FakeResetDriverWatchdog

        # Create a fake picontrol0 device
        self.picontrol = FakePiControlDevice(picontrol_device="/dev/fake_device_0")

    def tearDown(self):
        self.picontrol.close()
        super().tearDown()