diff --git a/src/revpi_middleware/cli_commands/cli_picontrol.py b/src/revpi_middleware/cli_commands/cli_picontrol.py index 8fdaf67..6eb355f 100644 --- a/src/revpi_middleware/cli_commands/cli_picontrol.py +++ b/src/revpi_middleware/cli_commands/cli_picontrol.py @@ -4,7 +4,7 @@ from argparse import ArgumentParser from logging import getLogger -from .dbus_helper import await_signal, simple_call +from .dbus_helper import BusType, await_signal, simple_call from .. import proginit as pi from ..dbus_middleware1 import extend_interface @@ -36,12 +36,21 @@ def add_subparsers(parent_parser: ArgumentParser): def method_reset(): log.debug("D-Bus call of method ResetDriver") - simple_call("ResetDriver", interface=extend_interface("picontrol")) + simple_call( + "ResetDriver", + interface=extend_interface("picontrol"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) log.info("ResetDriver called via D-Bus") -def method_await_reset(timout: int = 0): - detected_signal = await_signal("NotifyDriverReset", timout, extend_interface("picontrol")) +def method_await_reset(timeout: int = 0): + detected_signal = await_signal( + "NotifyDriverReset", + timeout, + extend_interface("picontrol"), + bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, + ) if detected_signal: log.info("ResetDriver signal received") else: diff --git a/src/revpi_middleware/cli_commands/dbus_helper.py b/src/revpi_middleware/cli_commands/dbus_helper.py index 4418736..e14f072 100644 --- a/src/revpi_middleware/cli_commands/dbus_helper.py +++ b/src/revpi_middleware/cli_commands/dbus_helper.py @@ -1,50 +1,101 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus helper functions for cli commands.""" +from enum import Enum from threading import Thread from time import sleep from gi.repository import GLib -from pydbus import SystemBus +from pydbus import SessionBus, SystemBus from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH from ..dbus_middleware1 import REVPI_DBUS_NAME -PICONTROL_INTERFACE = "com.revolutionpi.middleware1.picontrol" -RESET_DRIVER_METHOD = "ResetDriver" + +class BusType(Enum): + SESSION = "session" + SYSTEM = "system" -def simple_call(method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_PATH): +def simple_call( + method: str, + *args, + interface: str, + object_path=REVPI_DBUS_BASE_PATH, + bus_type=BusType.SYSTEM, +): """ - Executes a method on a specific D-Bus object interface within the RevPi system. This function - connects to the system bus, retrieves the desired interface and object path, and invokes - the specified method with provided arguments. + Performs a call to a D-Bus method on a specified interface and object. + + This function uses the D-Bus messaging system to dynamically call a method + of a specified interface, using the given object path and bus type. It + provides a way to interact with D-Bus interfaces, using either a system or + session bus, and returns the result of executing the specified method. Parameters: method: str - The name of the method to be invoked on the targeted interface. - *args: tuple - Positional arguments to be passed to the method being invoked. + The name of the method to invoke on the D-Bus interface. + *args: + Additional positional arguments to pass to the specified D-Bus method. interface: str - The name of the D-Bus interface providing the required functionality. - object_path: str, optional - The D-Bus object path of the RevPi interface. Defaults to REVPI_DBUS_BASE_PATH. + The name of the D-Bus interface containing the method. + object_path: + The path of the D-Bus object on which the interface is defined. Defaults + to REVPI_DBUS_BASE_PATH. + bus_type: BusType + Specifies whether to use the system or session bus. Defaults to BusType.SYSTEM. Returns: - Any - The result of the method invocation on the targeted D-Bus interface. + The value returned by the D-Bus method. + + Raises: + Any errors raised from the D-Bus call will propagate to the caller. """ - bus = SystemBus() + bus = SessionBus() if bus_type is BusType.SESSION else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] return getattr(iface, method)(*args) -def await_signal(signal_name: str, timeout: int, interface: str, object_path=REVPI_DBUS_BASE_PATH): +def await_signal( + signal_name: str, + timeout: int, + interface: str, + object_path=REVPI_DBUS_BASE_PATH, + bus_type=BusType.SYSTEM, +): + """ + Waits for a specific signal and returns whether the signal was detected. + + This function connects to a D-Bus interface and waits for a specific signal + to be emitted. If the signal is not received within the specified timeout + period, the function will return False. If the signal is detected within + the timeout, the function will return True. It can connect to either the + system bus or the session bus, depending on the provided `bus_type`. + + Parameters: + signal_name: str + The name of the signal to be awaited. + timeout: int + The maximum time to wait for the signal, in seconds. A value of 0 or + less means that there is no timeout. + interface: str + The name of the D-Bus interface to listen on. + object_path + The D-Bus object path where the interface resides. Defaults to + REVPI_DBUS_BASE_PATH. + bus_type + The type of D-Bus to connect to. Can be either BusType.SYSTEM or + BusType.SESSION. Defaults to BusType.SYSTEM. + + Returns: + bool + True if the signal was detected within the timeout period, False + otherwise. + """ detected_signal = False timeout = int(timeout) loop = GLib.MainLoop() - th_sleep = Thread() def th_timeout(): sleep(timeout) @@ -55,7 +106,7 @@ def await_signal(signal_name: str, timeout: int, interface: str, object_path=REV detected_signal = True loop.quit() - bus = SystemBus() + bus = SessionBus() if bus_type is BusType.SESSION else SystemBus() revpi = bus.get(REVPI_DBUS_NAME, object_path) iface = revpi[interface] diff --git a/src/revpi_middleware/daemon.py b/src/revpi_middleware/daemon.py index 41ca9a5..1ea8523 100644 --- a/src/revpi_middleware/daemon.py +++ b/src/revpi_middleware/daemon.py @@ -42,7 +42,7 @@ class MiddlewareDaemon: if self.bus_provider and self.bus_provider.is_alive(): return - self.bus_provider = BusProvider() + self.bus_provider = BusProvider(use_system_bus=not pi.pargs.use_session_bus) self.bus_provider.start() log.debug("leave MiddlewareDaemon.dbus_start") diff --git a/src/revpi_middleware/dbus_middleware1/__init__.py b/src/revpi_middleware/dbus_middleware1/__init__.py index 1c14733..6b65020 100644 --- a/src/revpi_middleware/dbus_middleware1/__init__.py +++ b/src/revpi_middleware/dbus_middleware1/__init__.py @@ -2,27 +2,7 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus middleware version 1 of revpi_middleware.""" -from ..__about__ import __author__, __copyright__, __license__, __version__ +from .dbus_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME +from .dbus_helper import extend_interface -REVPI_DBUS_NAME = "com.revolutionpi.middleware1" -REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1" - - -def extend_interface(*args) -> str: - """ - Extends an interface name by appending additional segments to a pre-defined base name. - - This function takes multiple arguments, concatenates them with a predefined base - interface name, and returns the resulting string, effectively constructing an - extended interface name. - - Args: - *args: str - Components to be appended to the base interface name. - - Returns: - str - Fully constructed interface name by joining the base interface name with - the provided segments. - """ - return ".".join([REVPI_DBUS_NAME, *args]) +from .bus_provider import BusProvider diff --git a/src/revpi_middleware/dbus_middleware1/bus_provider.py b/src/revpi_middleware/dbus_middleware1/bus_provider.py index 937ad75..6f91bce 100644 --- a/src/revpi_middleware/dbus_middleware1/bus_provider.py +++ b/src/revpi_middleware/dbus_middleware1/bus_provider.py @@ -6,7 +6,7 @@ from logging import getLogger from threading import Thread from gi.repository import GLib -from pydbus import SystemBus +from pydbus import SessionBus, SystemBus from . import REVPI_DBUS_NAME from .process_image import InterfacePiControl @@ -16,25 +16,59 @@ log = getLogger(__name__) class BusProvider(Thread): - def __init__(self): + def __init__( + self, + picontrol_device="/dev/piControl0", + config_rsc="/etc/revpi/config.rsc", + use_system_bus=True, + ): log.debug("enter BusProvider.__init__") super().__init__() - self._bus = SystemBus() + self._bus = SystemBus() if use_system_bus else SessionBus() self._loop = GLib.MainLoop() + self.picontrol_device = picontrol_device + self.config_rsc = config_rsc + def run(self): log.debug("enter BusProvider.run") - self._bus.publish( - REVPI_DBUS_NAME, - InterfacePiControl(), - ) + # The 2nd, 3rd, ... arguments can be objects or tuples of a path and an object + # Example(), + # ("Subdir1", Example()), + # ("Subdir2", Example()), + # ("Subdir2/Whatever", Example()) + lst_interfaces = [ + InterfacePiControl(self.picontrol_device, self.config_rsc), + ] + + try: + self._bus.publish( + REVPI_DBUS_NAME, + *lst_interfaces, + ) + except Exception as e: + log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}") + + try: + self._loop.run() + except Exception as e: + log.error(f"can not run dbus mainloop: {e}") + + # Clean up all interfaces + for interface in lst_interfaces: + if type(interface) is tuple: + _, interface = interface + interface.cleanup() - self._loop.run() log.debug("leave BusProvider.run") def stop(self): log.debug("enter BusProvider.stop") self._loop.quit() log.debug("leave BusProvider.stop") + + @property + def running(self): + return self._loop.is_running() diff --git a/src/revpi_middleware/dbus_middleware1/dbus_helper.py b/src/revpi_middleware/dbus_middleware1/dbus_helper.py new file mode 100644 index 0000000..23bcd2e --- /dev/null +++ b/src/revpi_middleware/dbus_middleware1/dbus_helper.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +"""Helper for dbus.""" + +from logging import getLogger + +log = getLogger(__name__) + +REVPI_DBUS_NAME = "com.revolutionpi.middleware1" +REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1" + + +class DbusInterface: + + def cleanup(self): + """ + Represents a method responsible for performing cleanup operations. This method is executed to properly + release resources, close connections, or perform other necessary finalization tasks. + + This method does not take any arguments or return a value. + """ + pass + + +def extend_interface(*args) -> str: + """ + Extends an interface name by appending additional segments to a pre-defined base name. + + This function takes multiple arguments, concatenates them with a predefined base + interface name, and returns the resulting string, effectively constructing an + extended interface name. + + Args: + *args: str + Components to be appended to the base interface name. + + Returns: + str + Fully constructed interface name by joining the base interface name with + the provided segments. + """ + return ".".join([REVPI_DBUS_NAME, *args]) diff --git a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py index ada732a..f62a972 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py @@ -2,18 +2,17 @@ # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """D-Bus interfaces for piControl.""" -import os -from fcntl import ioctl from logging import getLogger from pydbus.generic import signal -from ..interface_helper import ResetDriverWatchdog +from .process_image_helper import PiControlIoctl, ResetDriverWatchdog +from ..dbus_helper import DbusInterface log = getLogger(__name__) -class InterfacePiControl: +class InterfacePiControl(DbusInterface): """ @@ -27,12 +26,16 @@ class InterfacePiControl: NotifyDriverReset = signal() - def __init__(self): - self.pi_control = "/dev/piControl0" + def __init__(self, picontrol_device: str, config_rsc: str): + self.picontrol_device = picontrol_device + self.config_rsc = config_rsc - self.wd_reset_driver = ResetDriverWatchdog(self.pi_control) + self.wd_reset_driver = ResetDriverWatchdog(self.picontrol_device) self.wd_reset_driver.register_call(self.notify_reset_driver) + def cleanup(self): + self.wd_reset_driver.stop() + def notify_reset_driver(self): self.NotifyDriverReset() @@ -40,21 +43,9 @@ class InterfacePiControl: log.debug("enter InterfacePiControl.ResetDriver") try: - fd = os.open(self.pi_control, os.O_WRONLY) - except Exception as e: - log.warning(f"could not open ${self.pi_control} to reset driver") - raise e - - execption = None - try: - # KB_RESET _IO('K', 12 ) // reset the piControl driver including the config file - ioctl(fd, 19212) + picontrol_ioctl = PiControlIoctl(self.picontrol_device) + picontrol_ioctl.ioctl(PiControlIoctl.IOCTL_RESET_DRIVER) log.info("reset piControl driver") except Exception as e: log.warning(f"could not reset piControl driver: ${e}") - execption = e - finally: - os.close(fd) - - if execption: - raise execption + raise e diff --git a/src/revpi_middleware/dbus_middleware1/interface_helper.py b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py similarity index 77% rename from src/revpi_middleware/dbus_middleware1/interface_helper.py rename to src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py index 3306519..5fcb2a6 100644 --- a/src/revpi_middleware/dbus_middleware1/interface_helper.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# SPDX-FileCopyrightText: 2020-2023 Sven Sager +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later """ Helper for the process image. @@ -7,8 +7,8 @@ Helper for the process image. The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs" https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py """ - import os +from ctypes import c_int from fcntl import ioctl from logging import getLogger from threading import Thread @@ -19,9 +19,9 @@ log = getLogger(__name__) class ResetDriverWatchdog(Thread): """Watchdog to catch the reset_driver action.""" - def __init__(self, pi_control_device="/dev/piControl0"): + def __init__(self, picontrol_device: str): super(ResetDriverWatchdog, self).__init__() - self.procimg = pi_control_device + self.procimg = picontrol_device self.daemon = True self._calls = [] self._exit = False @@ -35,7 +35,7 @@ class ResetDriverWatchdog(Thread): """ Mainloop of watchdog for reset_driver. - If the thread can not open the process image or the IOCTL is not + If the thread cannot open the process image or the IOCTL is not implemented (wheezy), the thread function will stop. The trigger property will always return True. """ @@ -51,7 +51,7 @@ class ResetDriverWatchdog(Thread): ) return - # The ioctl will return 2 byte (c-type int) + # The ioctl will return 2 bytes (c-type int) byte_buff = bytearray(2) while not self._exit: try: @@ -73,7 +73,7 @@ class ResetDriverWatchdog(Thread): def register_call(self, function): """Register a function, if watchdog triggers.""" if not callable(function): - return ValueError("Function is not callable.") + raise ValueError("Function is not callable.") if function not in self._calls: self._calls.append(function) @@ -89,7 +89,7 @@ class ResetDriverWatchdog(Thread): log.debug("leave ResetDriverWatchdog.stop()") def unregister_call(self, function=None): - """Remove a function call on watchdog trigger.""" + """Remove a function from the watchdog trigger.""" if function is None: self._calls.clear() elif function in self._calls: @@ -101,3 +101,24 @@ class ResetDriverWatchdog(Thread): rc = self._triggered self._triggered = False return rc + + +class PiControlIoctl: + IOCTL_RESET_DRIVER = 19212 + + def __init__(self, picontrol_device: str): + self.picontrol_device = picontrol_device + + def ioctl(self, request, arg=0): + if type(arg) is c_int: + arg = arg.value + + _fd = os.open(self.picontrol_device, os.O_WRONLY) + return_value = ioctl(_fd, 19212, arg) + os.close(_fd) + + return return_value + + @property + def name(self): + return self.picontrol_device diff --git a/src/revpi_middleware/proginit.py b/src/revpi_middleware/proginit.py index 75d2fb4..5a0a092 100644 --- a/src/revpi_middleware/proginit.py +++ b/src/revpi_middleware/proginit.py @@ -9,7 +9,7 @@ __version__ = "1.4.0" import logging import sys -from argparse import ArgumentParser, Namespace +from argparse import ArgumentParser, Namespace, SUPPRESS from configparser import ConfigParser from enum import Enum from os import R_OK, W_OK, access, environ, getpid, remove @@ -262,6 +262,16 @@ parser = ArgumentParser( prog=programname, description="Program description", ) + +# Use session bus of D-Bus for local testing and development proposes (hidden) +parser.add_argument( + "--use-session-bus", + dest="use_session_bus", + action="store_true", + default=False, + help=SUPPRESS, +) + parser.add_argument("--version", action="version", version=f"%(prog)s {program_version}") parser.add_argument( "-f", diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..d5998d4 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later diff --git a/tests/dbus_middleware1/__init__.py b/tests/dbus_middleware1/__init__.py new file mode 100644 index 0000000..286ffd6 --- /dev/null +++ b/tests/dbus_middleware1/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from os import environ + +# D-BUS needs a DISPLAY variable to work with the session bus +environ["DISPLAY"] = ":0" diff --git a/tests/dbus_middleware1/bus_provider.py b/tests/dbus_middleware1/bus_provider.py new file mode 100644 index 0000000..0f753d8 --- /dev/null +++ b/tests/dbus_middleware1/bus_provider.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from time import sleep + +from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup + + +class TestBusProvider(PiControlDeviceMockup): + + def setUp(self): + super().setUp() + + # Do not import things on top of the module. Some classes or functions need to be mocked up first. + from revpi_middleware.dbus_middleware1 import BusProvider + + # Prepare the bus provider and start it + self.bp = BusProvider( + self.picontrol.name, + use_system_bus=False, + ) + self.bp.start() + + # Wait 5 seconds until the bus provider has started the main loop + counter = 50 + while not self.bp.running and counter > 0: + counter -= 1 + sleep(0.1) + + def tearDown(self): + self.bp.stop() + self.bp.join(10.0) + if self.bp.is_alive(): + raise RuntimeError("Bus provider thread is still running") + + super().tearDown() diff --git a/tests/dbus_middleware1/fake_devices.py b/tests/dbus_middleware1/fake_devices.py new file mode 100644 index 0000000..edd16d1 --- /dev/null +++ b/tests/dbus_middleware1/fake_devices.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from ctypes import c_int +from queue import Empty, Queue +from tempfile import NamedTemporaryFile +from threading import Event, Thread +from unittest import TestCase + +IOCTL_QUEUE = Queue() +RESET_DRIVER_EVENT = Event() + + +class FakePiControlDevice: + IOCTL_RESET_DRIVER = 19212 + + def __init__(self, picontrol_device: str): + self._fh = NamedTemporaryFile("wb+", 0, prefix="fake_device_") + self.name = self._fh.name + + def __del__(self): + self._fh.close() + + def reset_process_image(self): + self._fh.write(b"\x00" * 4096) + self._fh.seek(0) + + def ioctl(self, request, arg=0) -> int: + if type(arg) is c_int: + arg = arg.value + + if request == self.IOCTL_RESET_DRIVER: + pass + else: + raise NotImplementedError(f"Unknown IOCTL request: {request}") + + IOCTL_QUEUE.put_nowait((request, arg)) + return arg + + def close(self): + self._fh.close() + + def read(self, size): + return self._fh.read(size) + + def seek(self, offset, whence): + self._fh.seek(offset, whence) + + def write(self, buffer): + return self._fh.write(buffer) + + +class FakeResetDriverWatchdog(Thread): + + def __init__(self, picontrol_device: str): + super().__init__() + self.daemon = True + self._calls = [] + self._exit = False + self.not_implemented = True + self._triggered = False + self.start() + + def run(self): + while not self._exit: + if RESET_DRIVER_EVENT.wait(0.1): + RESET_DRIVER_EVENT.clear() + self._triggered = True + for func in self._calls: + func() + + def register_call(self, function): + """Register a function, if watchdog triggers.""" + if not callable(function): + raise ValueError("Function is not callable.") + if function not in self._calls: + self._calls.append(function) + + def stop(self): + """Stop watchdog for reset_driver.""" + self._exit = True + + def unregister_call(self, function=None): + """Remove a function from the watchdog trigger.""" + if function is None: + self._calls.clear() + elif function in self._calls: + self._calls.remove(function) + + @property + def triggered(self): + """Will return True one time after watchdog was triggered.""" + rc = self._triggered + self._triggered = False + return rc + + +class PiControlDeviceMockup(TestCase): + + def setUp(self): + super().setUp() + + # Empty the queue + while True: + try: + IOCTL_QUEUE.get_nowait() + except Empty: + break + + # Replace classes with mockup classes + import revpi_middleware.dbus_middleware1.process_image.interface_picontrol as test_helpers + test_helpers.PiControlIoctl = FakePiControlDevice + test_helpers.ResetDriverWatchdog = FakeResetDriverWatchdog + + # Create a fake picontrol0 device + self.picontrol = FakePiControlDevice(picontrol_device="/dev/fake_device_0") + + def tearDown(self): + self.picontrol.close() + super().tearDown() diff --git a/tests/dbus_middleware1/process_image/__init__.py b/tests/dbus_middleware1/process_image/__init__.py new file mode 100644 index 0000000..d5998d4 --- /dev/null +++ b/tests/dbus_middleware1/process_image/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later diff --git a/tests/dbus_middleware1/process_image/test_interface_picontrol.py b/tests/dbus_middleware1/process_image/test_interface_picontrol.py new file mode 100644 index 0000000..cd11a6e --- /dev/null +++ b/tests/dbus_middleware1/process_image/test_interface_picontrol.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +from threading import Thread +from time import sleep + +from revpi_middleware.cli_commands.dbus_helper import BusType, await_signal, simple_call +from revpi_middleware.dbus_middleware1 import extend_interface +from tests.dbus_middleware1.bus_provider import TestBusProvider +from tests.dbus_middleware1.fake_devices import IOCTL_QUEUE, RESET_DRIVER_EVENT + + +class TestObjectPicontrol(TestBusProvider): + + def test_is_active(self): + self.assertTrue(self.bp.running) + + def test_reset_driver(self): + simple_call( + "ResetDriver", + interface=extend_interface("picontrol"), + bus_type=BusType.SESSION, + ) + ioctl_call = IOCTL_QUEUE.get(timeout=2.0) + self.assertEqual((19212, 0), ioctl_call) + + def test_notify_reset_driver(self): + timeout = 5 + + def target_call_reset_driver(): + sleep(1.0) + RESET_DRIVER_EVENT.set() + + th_wait_for_reset = Thread(target=target_call_reset_driver, daemon=True) + th_wait_for_reset.start() + + result = await_signal( + "NotifyDriverReset", + timeout, + extend_interface("picontrol"), + bus_type=BusType.SESSION, + ) + self.assertTrue(result) + + th_wait_for_reset.join(timeout=timeout)