Merge tag 'v0.0.2' into debian/bookworm

Release version 0.0.2
This commit is contained in:
2025-04-19 15:58:57 +02:00
15 changed files with 439 additions and 86 deletions

View File

@@ -4,7 +4,7 @@
from argparse import ArgumentParser from argparse import ArgumentParser
from logging import getLogger from logging import getLogger
from .dbus_helper import await_signal, simple_call from .dbus_helper import BusType, await_signal, simple_call
from .. import proginit as pi from .. import proginit as pi
from ..dbus_middleware1 import extend_interface from ..dbus_middleware1 import extend_interface
@@ -36,12 +36,21 @@ def add_subparsers(parent_parser: ArgumentParser):
def method_reset(): def method_reset():
log.debug("D-Bus call of method ResetDriver") log.debug("D-Bus call of method ResetDriver")
simple_call("ResetDriver", interface=extend_interface("picontrol")) simple_call(
"ResetDriver",
interface=extend_interface("picontrol"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
log.info("ResetDriver called via D-Bus") log.info("ResetDriver called via D-Bus")
def method_await_reset(timout: int = 0): def method_await_reset(timeout: int = 0):
detected_signal = await_signal("NotifyDriverReset", timout, extend_interface("picontrol")) detected_signal = await_signal(
"NotifyDriverReset",
timeout,
extend_interface("picontrol"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
if detected_signal: if detected_signal:
log.info("ResetDriver signal received") log.info("ResetDriver signal received")
else: else:

View File

@@ -1,50 +1,101 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus helper functions for cli commands.""" """D-Bus helper functions for cli commands."""
from enum import Enum
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from gi.repository import GLib from gi.repository import GLib
from pydbus import SystemBus from pydbus import SessionBus, SystemBus
from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH from ..dbus_middleware1 import REVPI_DBUS_BASE_PATH
from ..dbus_middleware1 import REVPI_DBUS_NAME from ..dbus_middleware1 import REVPI_DBUS_NAME
PICONTROL_INTERFACE = "com.revolutionpi.middleware1.picontrol"
RESET_DRIVER_METHOD = "ResetDriver" class BusType(Enum):
SESSION = "session"
SYSTEM = "system"
def simple_call(method: str, *args, interface: str, object_path=REVPI_DBUS_BASE_PATH): def simple_call(
method: str,
*args,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
""" """
Executes a method on a specific D-Bus object interface within the RevPi system. This function Performs a call to a D-Bus method on a specified interface and object.
connects to the system bus, retrieves the desired interface and object path, and invokes
the specified method with provided arguments. This function uses the D-Bus messaging system to dynamically call a method
of a specified interface, using the given object path and bus type. It
provides a way to interact with D-Bus interfaces, using either a system or
session bus, and returns the result of executing the specified method.
Parameters: Parameters:
method: str method: str
The name of the method to be invoked on the targeted interface. The name of the method to invoke on the D-Bus interface.
*args: tuple *args:
Positional arguments to be passed to the method being invoked. Additional positional arguments to pass to the specified D-Bus method.
interface: str interface: str
The name of the D-Bus interface providing the required functionality. The name of the D-Bus interface containing the method.
object_path: str, optional object_path:
The D-Bus object path of the RevPi interface. Defaults to REVPI_DBUS_BASE_PATH. The path of the D-Bus object on which the interface is defined. Defaults
to REVPI_DBUS_BASE_PATH.
bus_type: BusType
Specifies whether to use the system or session bus. Defaults to BusType.SYSTEM.
Returns: Returns:
Any The value returned by the D-Bus method.
The result of the method invocation on the targeted D-Bus interface.
Raises:
Any errors raised from the D-Bus call will propagate to the caller.
""" """
bus = SystemBus() bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path) revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface] iface = revpi[interface]
return getattr(iface, method)(*args) return getattr(iface, method)(*args)
def await_signal(signal_name: str, timeout: int, interface: str, object_path=REVPI_DBUS_BASE_PATH): def await_signal(
signal_name: str,
timeout: int,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
"""
Waits for a specific signal and returns whether the signal was detected.
This function connects to a D-Bus interface and waits for a specific signal
to be emitted. If the signal is not received within the specified timeout
period, the function will return False. If the signal is detected within
the timeout, the function will return True. It can connect to either the
system bus or the session bus, depending on the provided `bus_type`.
Parameters:
signal_name: str
The name of the signal to be awaited.
timeout: int
The maximum time to wait for the signal, in seconds. A value of 0 or
less means that there is no timeout.
interface: str
The name of the D-Bus interface to listen on.
object_path
The D-Bus object path where the interface resides. Defaults to
REVPI_DBUS_BASE_PATH.
bus_type
The type of D-Bus to connect to. Can be either BusType.SYSTEM or
BusType.SESSION. Defaults to BusType.SYSTEM.
Returns:
bool
True if the signal was detected within the timeout period, False
otherwise.
"""
detected_signal = False detected_signal = False
timeout = int(timeout) timeout = int(timeout)
loop = GLib.MainLoop() loop = GLib.MainLoop()
th_sleep = Thread()
def th_timeout(): def th_timeout():
sleep(timeout) sleep(timeout)
@@ -55,7 +106,7 @@ def await_signal(signal_name: str, timeout: int, interface: str, object_path=REV
detected_signal = True detected_signal = True
loop.quit() loop.quit()
bus = SystemBus() bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path) revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface] iface = revpi[interface]

View File

@@ -42,7 +42,7 @@ class MiddlewareDaemon:
if self.bus_provider and self.bus_provider.is_alive(): if self.bus_provider and self.bus_provider.is_alive():
return return
self.bus_provider = BusProvider() self.bus_provider = BusProvider(use_system_bus=not pi.pargs.use_session_bus)
self.bus_provider.start() self.bus_provider.start()
log.debug("leave MiddlewareDaemon.dbus_start") log.debug("leave MiddlewareDaemon.dbus_start")

View File

@@ -2,27 +2,7 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus middleware version 1 of revpi_middleware.""" """D-Bus middleware version 1 of revpi_middleware."""
from ..__about__ import __author__, __copyright__, __license__, __version__ from .dbus_helper import REVPI_DBUS_BASE_PATH, REVPI_DBUS_NAME
from .dbus_helper import extend_interface
REVPI_DBUS_NAME = "com.revolutionpi.middleware1" from .bus_provider import BusProvider
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
def extend_interface(*args) -> str:
"""
Extends an interface name by appending additional segments to a pre-defined base name.
This function takes multiple arguments, concatenates them with a predefined base
interface name, and returns the resulting string, effectively constructing an
extended interface name.
Args:
*args: str
Components to be appended to the base interface name.
Returns:
str
Fully constructed interface name by joining the base interface name with
the provided segments.
"""
return ".".join([REVPI_DBUS_NAME, *args])

View File

@@ -6,7 +6,7 @@ from logging import getLogger
from threading import Thread from threading import Thread
from gi.repository import GLib from gi.repository import GLib
from pydbus import SystemBus from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl from .process_image import InterfacePiControl
@@ -16,25 +16,59 @@ log = getLogger(__name__)
class BusProvider(Thread): class BusProvider(Thread):
def __init__(self): def __init__(
self,
picontrol_device="/dev/piControl0",
config_rsc="/etc/revpi/config.rsc",
use_system_bus=True,
):
log.debug("enter BusProvider.__init__") log.debug("enter BusProvider.__init__")
super().__init__() super().__init__()
self._bus = SystemBus() self._bus = SystemBus() if use_system_bus else SessionBus()
self._loop = GLib.MainLoop() self._loop = GLib.MainLoop()
self.picontrol_device = picontrol_device
self.config_rsc = config_rsc
def run(self): def run(self):
log.debug("enter BusProvider.run") log.debug("enter BusProvider.run")
self._bus.publish( # The 2nd, 3rd, ... arguments can be objects or tuples of a path and an object
REVPI_DBUS_NAME, # Example(),
InterfacePiControl(), # ("Subdir1", Example()),
) # ("Subdir2", Example()),
# ("Subdir2/Whatever", Example())
lst_interfaces = [
InterfacePiControl(self.picontrol_device, self.config_rsc),
]
try:
self._bus.publish(
REVPI_DBUS_NAME,
*lst_interfaces,
)
except Exception as e:
log.error(f"can not publish dbus {REVPI_DBUS_NAME}: {e}")
try:
self._loop.run()
except Exception as e:
log.error(f"can not run dbus mainloop: {e}")
# Clean up all interfaces
for interface in lst_interfaces:
if type(interface) is tuple:
_, interface = interface
interface.cleanup()
self._loop.run()
log.debug("leave BusProvider.run") log.debug("leave BusProvider.run")
def stop(self): def stop(self):
log.debug("enter BusProvider.stop") log.debug("enter BusProvider.stop")
self._loop.quit() self._loop.quit()
log.debug("leave BusProvider.stop") log.debug("leave BusProvider.stop")
@property
def running(self):
return self._loop.is_running()

View File

@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Helper for dbus."""
from logging import getLogger
log = getLogger(__name__)
REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
class DbusInterface:
def cleanup(self):
"""
Represents a method responsible for performing cleanup operations. This method is executed to properly
release resources, close connections, or perform other necessary finalization tasks.
This method does not take any arguments or return a value.
"""
pass
def extend_interface(*args) -> str:
"""
Extends an interface name by appending additional segments to a pre-defined base name.
This function takes multiple arguments, concatenates them with a predefined base
interface name, and returns the resulting string, effectively constructing an
extended interface name.
Args:
*args: str
Components to be appended to the base interface name.
Returns:
str
Fully constructed interface name by joining the base interface name with
the provided segments.
"""
return ".".join([REVPI_DBUS_NAME, *args])

View File

@@ -2,18 +2,17 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for piControl.""" """D-Bus interfaces for piControl."""
import os
from fcntl import ioctl
from logging import getLogger from logging import getLogger
from pydbus.generic import signal from pydbus.generic import signal
from ..interface_helper import ResetDriverWatchdog from .process_image_helper import PiControlIoctl, ResetDriverWatchdog
from ..dbus_helper import DbusInterface
log = getLogger(__name__) log = getLogger(__name__)
class InterfacePiControl: class InterfacePiControl(DbusInterface):
""" """
<node> <node>
<interface name='com.revolutionpi.middleware1.picontrol'> <interface name='com.revolutionpi.middleware1.picontrol'>
@@ -27,12 +26,16 @@ class InterfacePiControl:
NotifyDriverReset = signal() NotifyDriverReset = signal()
def __init__(self): def __init__(self, picontrol_device: str, config_rsc: str):
self.pi_control = "/dev/piControl0" self.picontrol_device = picontrol_device
self.config_rsc = config_rsc
self.wd_reset_driver = ResetDriverWatchdog(self.pi_control) self.wd_reset_driver = ResetDriverWatchdog(self.picontrol_device)
self.wd_reset_driver.register_call(self.notify_reset_driver) self.wd_reset_driver.register_call(self.notify_reset_driver)
def cleanup(self):
self.wd_reset_driver.stop()
def notify_reset_driver(self): def notify_reset_driver(self):
self.NotifyDriverReset() self.NotifyDriverReset()
@@ -40,21 +43,9 @@ class InterfacePiControl:
log.debug("enter InterfacePiControl.ResetDriver") log.debug("enter InterfacePiControl.ResetDriver")
try: try:
fd = os.open(self.pi_control, os.O_WRONLY) picontrol_ioctl = PiControlIoctl(self.picontrol_device)
except Exception as e: picontrol_ioctl.ioctl(PiControlIoctl.IOCTL_RESET_DRIVER)
log.warning(f"could not open ${self.pi_control} to reset driver")
raise e
execption = None
try:
# KB_RESET _IO('K', 12 ) // reset the piControl driver including the config file
ioctl(fd, 19212)
log.info("reset piControl driver") log.info("reset piControl driver")
except Exception as e: except Exception as e:
log.warning(f"could not reset piControl driver: ${e}") log.warning(f"could not reset piControl driver: ${e}")
execption = e raise e
finally:
os.close(fd)
if execption:
raise execption

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2020-2023 Sven Sager # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
""" """
Helper for the process image. Helper for the process image.
@@ -7,8 +7,8 @@ Helper for the process image.
The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs" The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs"
https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py
""" """
import os import os
from ctypes import c_int
from fcntl import ioctl from fcntl import ioctl
from logging import getLogger from logging import getLogger
from threading import Thread from threading import Thread
@@ -19,9 +19,9 @@ log = getLogger(__name__)
class ResetDriverWatchdog(Thread): class ResetDriverWatchdog(Thread):
"""Watchdog to catch the reset_driver action.""" """Watchdog to catch the reset_driver action."""
def __init__(self, pi_control_device="/dev/piControl0"): def __init__(self, picontrol_device: str):
super(ResetDriverWatchdog, self).__init__() super(ResetDriverWatchdog, self).__init__()
self.procimg = pi_control_device self.procimg = picontrol_device
self.daemon = True self.daemon = True
self._calls = [] self._calls = []
self._exit = False self._exit = False
@@ -35,7 +35,7 @@ class ResetDriverWatchdog(Thread):
""" """
Mainloop of watchdog for reset_driver. Mainloop of watchdog for reset_driver.
If the thread can not open the process image or the IOCTL is not If the thread cannot open the process image or the IOCTL is not
implemented (wheezy), the thread function will stop. The trigger implemented (wheezy), the thread function will stop. The trigger
property will always return True. property will always return True.
""" """
@@ -51,7 +51,7 @@ class ResetDriverWatchdog(Thread):
) )
return return
# The ioctl will return 2 byte (c-type int) # The ioctl will return 2 bytes (c-type int)
byte_buff = bytearray(2) byte_buff = bytearray(2)
while not self._exit: while not self._exit:
try: try:
@@ -73,7 +73,7 @@ class ResetDriverWatchdog(Thread):
def register_call(self, function): def register_call(self, function):
"""Register a function, if watchdog triggers.""" """Register a function, if watchdog triggers."""
if not callable(function): if not callable(function):
return ValueError("Function is not callable.") raise ValueError("Function is not callable.")
if function not in self._calls: if function not in self._calls:
self._calls.append(function) self._calls.append(function)
@@ -89,7 +89,7 @@ class ResetDriverWatchdog(Thread):
log.debug("leave ResetDriverWatchdog.stop()") log.debug("leave ResetDriverWatchdog.stop()")
def unregister_call(self, function=None): def unregister_call(self, function=None):
"""Remove a function call on watchdog trigger.""" """Remove a function from the watchdog trigger."""
if function is None: if function is None:
self._calls.clear() self._calls.clear()
elif function in self._calls: elif function in self._calls:
@@ -101,3 +101,24 @@ class ResetDriverWatchdog(Thread):
rc = self._triggered rc = self._triggered
self._triggered = False self._triggered = False
return rc return rc
class PiControlIoctl:
IOCTL_RESET_DRIVER = 19212
def __init__(self, picontrol_device: str):
self.picontrol_device = picontrol_device
def ioctl(self, request, arg=0):
if type(arg) is c_int:
arg = arg.value
_fd = os.open(self.picontrol_device, os.O_WRONLY)
return_value = ioctl(_fd, 19212, arg)
os.close(_fd)
return return_value
@property
def name(self):
return self.picontrol_device

View File

@@ -9,7 +9,7 @@ __version__ = "1.4.0"
import logging import logging
import sys import sys
from argparse import ArgumentParser, Namespace from argparse import ArgumentParser, Namespace, SUPPRESS
from configparser import ConfigParser from configparser import ConfigParser
from enum import Enum from enum import Enum
from os import R_OK, W_OK, access, environ, getpid, remove from os import R_OK, W_OK, access, environ, getpid, remove
@@ -262,6 +262,16 @@ parser = ArgumentParser(
prog=programname, prog=programname,
description="Program description", description="Program description",
) )
# Use session bus of D-Bus for local testing and development proposes (hidden)
parser.add_argument(
"--use-session-bus",
dest="use_session_bus",
action="store_true",
default=False,
help=SUPPRESS,
)
parser.add_argument("--version", action="version", version=f"%(prog)s {program_version}") parser.add_argument("--version", action="version", version=f"%(prog)s {program_version}")
parser.add_argument( parser.add_argument(
"-f", "-f",

3
tests/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later

View File

@@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from os import environ
# D-BUS needs a DISPLAY variable to work with the session bus
environ["DISPLAY"] = ":0"

View File

@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from time import sleep
from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup
class TestBusProvider(PiControlDeviceMockup):
def setUp(self):
super().setUp()
# Do not import things on top of the module. Some classes or functions need to be mocked up first.
from revpi_middleware.dbus_middleware1 import BusProvider
# Prepare the bus provider and start it
self.bp = BusProvider(
self.picontrol.name,
use_system_bus=False,
)
self.bp.start()
# Wait 5 seconds until the bus provider has started the main loop
counter = 50
while not self.bp.running and counter > 0:
counter -= 1
sleep(0.1)
def tearDown(self):
self.bp.stop()
self.bp.join(10.0)
if self.bp.is_alive():
raise RuntimeError("Bus provider thread is still running")
super().tearDown()

View File

@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from ctypes import c_int
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from threading import Event, Thread
from unittest import TestCase
IOCTL_QUEUE = Queue()
RESET_DRIVER_EVENT = Event()
class FakePiControlDevice:
IOCTL_RESET_DRIVER = 19212
def __init__(self, picontrol_device: str):
self._fh = NamedTemporaryFile("wb+", 0, prefix="fake_device_")
self.name = self._fh.name
def __del__(self):
self._fh.close()
def reset_process_image(self):
self._fh.write(b"\x00" * 4096)
self._fh.seek(0)
def ioctl(self, request, arg=0) -> int:
if type(arg) is c_int:
arg = arg.value
if request == self.IOCTL_RESET_DRIVER:
pass
else:
raise NotImplementedError(f"Unknown IOCTL request: {request}")
IOCTL_QUEUE.put_nowait((request, arg))
return arg
def close(self):
self._fh.close()
def read(self, size):
return self._fh.read(size)
def seek(self, offset, whence):
self._fh.seek(offset, whence)
def write(self, buffer):
return self._fh.write(buffer)
class FakeResetDriverWatchdog(Thread):
def __init__(self, picontrol_device: str):
super().__init__()
self.daemon = True
self._calls = []
self._exit = False
self.not_implemented = True
self._triggered = False
self.start()
def run(self):
while not self._exit:
if RESET_DRIVER_EVENT.wait(0.1):
RESET_DRIVER_EVENT.clear()
self._triggered = True
for func in self._calls:
func()
def register_call(self, function):
"""Register a function, if watchdog triggers."""
if not callable(function):
raise ValueError("Function is not callable.")
if function not in self._calls:
self._calls.append(function)
def stop(self):
"""Stop watchdog for reset_driver."""
self._exit = True
def unregister_call(self, function=None):
"""Remove a function from the watchdog trigger."""
if function is None:
self._calls.clear()
elif function in self._calls:
self._calls.remove(function)
@property
def triggered(self):
"""Will return True one time after watchdog was triggered."""
rc = self._triggered
self._triggered = False
return rc
class PiControlDeviceMockup(TestCase):
def setUp(self):
super().setUp()
# Empty the queue
while True:
try:
IOCTL_QUEUE.get_nowait()
except Empty:
break
# Replace classes with mockup classes
import revpi_middleware.dbus_middleware1.process_image.interface_picontrol as test_helpers
test_helpers.PiControlIoctl = FakePiControlDevice
test_helpers.ResetDriverWatchdog = FakeResetDriverWatchdog
# Create a fake picontrol0 device
self.picontrol = FakePiControlDevice(picontrol_device="/dev/fake_device_0")
def tearDown(self):
self.picontrol.close()
super().tearDown()

View File

@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later

View File

@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from threading import Thread
from time import sleep
from revpi_middleware.cli_commands.dbus_helper import BusType, await_signal, simple_call
from revpi_middleware.dbus_middleware1 import extend_interface
from tests.dbus_middleware1.bus_provider import TestBusProvider
from tests.dbus_middleware1.fake_devices import IOCTL_QUEUE, RESET_DRIVER_EVENT
class TestObjectPicontrol(TestBusProvider):
def test_is_active(self):
self.assertTrue(self.bp.running)
def test_reset_driver(self):
simple_call(
"ResetDriver",
interface=extend_interface("picontrol"),
bus_type=BusType.SESSION,
)
ioctl_call = IOCTL_QUEUE.get(timeout=2.0)
self.assertEqual((19212, 0), ioctl_call)
def test_notify_reset_driver(self):
timeout = 5
def target_call_reset_driver():
sleep(1.0)
RESET_DRIVER_EVENT.set()
th_wait_for_reset = Thread(target=target_call_reset_driver, daemon=True)
th_wait_for_reset.start()
result = await_signal(
"NotifyDriverReset",
timeout,
extend_interface("picontrol"),
bus_type=BusType.SESSION,
)
self.assertTrue(result)
th_wait_for_reset.join(timeout=timeout)