diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..d5998d4 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later diff --git a/tests/dbus_middleware1/__init__.py b/tests/dbus_middleware1/__init__.py new file mode 100644 index 0000000..286ffd6 --- /dev/null +++ b/tests/dbus_middleware1/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from os import environ + +# D-BUS needs a DISPLAY variable to work with the session bus +environ["DISPLAY"] = ":0" diff --git a/tests/dbus_middleware1/bus_provider.py b/tests/dbus_middleware1/bus_provider.py new file mode 100644 index 0000000..0f753d8 --- /dev/null +++ b/tests/dbus_middleware1/bus_provider.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from time import sleep + +from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup + + +class TestBusProvider(PiControlDeviceMockup): + + def setUp(self): + super().setUp() + + # Do not import things on top of the module. Some classes or functions need to be mocked up first. + from revpi_middleware.dbus_middleware1 import BusProvider + + # Prepare the bus provider and start it + self.bp = BusProvider( + self.picontrol.name, + use_system_bus=False, + ) + self.bp.start() + + # Wait 5 seconds until the bus provider has started the main loop + counter = 50 + while not self.bp.running and counter > 0: + counter -= 1 + sleep(0.1) + + def tearDown(self): + self.bp.stop() + self.bp.join(10.0) + if self.bp.is_alive(): + raise RuntimeError("Bus provider thread is still running") + + super().tearDown() diff --git a/tests/dbus_middleware1/fake_devices.py b/tests/dbus_middleware1/fake_devices.py new file mode 100644 index 0000000..6d62477 --- /dev/null +++ b/tests/dbus_middleware1/fake_devices.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from ctypes import c_int +from queue import Queue +from tempfile import NamedTemporaryFile +from unittest import TestCase + +IOCTL_QUEUE = Queue() + + +class FakePiControlDevice: + IOCTL_RESET_DRIVER = 19212 + + def __init__(self, picontrol_device: str): + self._fh = NamedTemporaryFile("wb+", 0, prefix="fake_device_") + self.name = self._fh.name + + def __del__(self): + self._fh.close() + + def reset_process_image(self): + self._fh.write(b"\x00" * 4096) + self._fh.seek(0) + + def ioctl(self, request, arg=0) -> int: + if type(arg) is c_int: + arg = arg.value + + if request == self.IOCTL_RESET_DRIVER: + pass + else: + raise NotImplementedError(f"Unknown IOCTL request: {request}") + + IOCTL_QUEUE.put_nowait((request, arg)) + return arg + + def close(self): + self._fh.close() + + def read(self, size): + return self._fh.read(size) + + def seek(self, offset, whence): + self._fh.seek(offset, whence) + + def write(self, buffer): + return self._fh.write(buffer) + + +class PiControlDeviceMockup(TestCase): + + def setUp(self): + super().setUp() + + # Replace classes with mockup classes + import revpi_middleware.dbus_middleware1.process_image.interface_picontrol as test_helpers + test_helpers.PiControlIoctl = FakePiControlDevice + + # Create a fake picontrol0 device + self.picontrol = FakePiControlDevice(picontrol_device="/dev/fake_device_0") + + def tearDown(self): + self.picontrol.close() + super().tearDown()