6 Commits

16 changed files with 147 additions and 510 deletions

48
debian/changelog vendored Normal file
View File

@@ -0,0 +1,48 @@
revpi-middleware (0.0.2-1+deb12+1) bookworm; urgency=medium
* refactor(dbus): Move D-Bus helper functions to a dedicated file
* refactor(dbus): Move ResetDriverWatchdog to process_image_helper.py
* refactor(dbus): Parameterize `picontrol_device` and `config_rsc`
* feat: Add session bus option for local testing and development
* feat(dbus): Add import for BusProvider in dbus_middleware1 module
* feat(dbus): Add `running` property to `BusProvider`
* refactor(cli): D-Bus helpers support session and system bus types
* fix(dbus): Add error handling for DBus publishing and main loop
* refactor(dbus): piControl driver reset with PiControlIoctl class
* test(dbus): Add unit test framework for dbus_middleware1 module
* test(dbus): Add unit tests for PiControl D-Bus interface
* refactor(dbus): Fix typo and remove unused thread instance
* refactor(dbus): D-Bus interface management with cleanup support.
* test(dbus): Add support for testing driver reset notification
* feat(deb): Add dbus for testing to build dependencies
* fix(deb): Skip tests because of missing SystemBus in build container
-- Sven Sager <s.sager@kunbus.com> Sat, 19 Apr 2025 16:34:20 +0200
revpi-middleware (0.0.1-1+deb12+1) bookworm; urgency=medium
* docs: Start git project with python git-ignore and Readme
* docs: Use 'reuse' for SPDX Headers and Licenses
* feat: Add python base project files
* feat: Add proginit application basic module
* feat: Add the data directory for additional data files for the project
* test: Add tests directory with a dummy test
* build: Add all necessary files for the build system
* feat: Add dummy main application script
* feat: Add systemd file and data to integrate the app as a daemon
* chore: Update proginit to 1.4.0
* feat(dbus): Add ResetDriverWatchdog helper as global dbus helper
* feat(process_image): Add D-Bus interface for piControl driver
* feat(dbus): Add initial D-Bus middleware implementation
* feat(dbus): Add `extend_interface` function for dynamic interface naming
* feat(dbus): Add DBus policy configuration for revpi-middleware
* feat: Add MiddlewareDaemon implementation to revpi-middleware
* feat: Add daemon mode and signal handling to main application
* feat(cli): Add D-Bus helper functions for CLI commands.
* feat(cli): Add await_signal function to handle D-Bus signals
* feat(cli): Add `await-reset` to wait for piControl reset signal
* chore(build): Update requirements for this project
* feat(cli): Add new CLI tool entry point for `revpictl`
* feat(deb): Start packaging branch
-- Sven Sager <s.sager@kunbus.com> Fri, 18 Apr 2025 19:02:20 +0200

38
debian/control vendored Normal file
View File

@@ -0,0 +1,38 @@
Source: revpi-middleware
Section: python
Priority: optional
Maintainer: KUNBUS GmbH <support@kunbus.com>
Rules-Requires-Root: no
Homepage: https://revolutionpi.com/
Vcs-Browser: https://gitlab.com/revolutionpi/revpi-middleware
Vcs-Git: https://gitlab.com/revolutionpi/revpi-middleware.git -b debian/bookworm
Build-Depends:
dbus,
dbus-x11,
debhelper-compat (= 13),
dh-python,
python3-all,
python3-gi (>= 3.42.2),
python3-pydbus (>= 0.6.0),
python3-setuptools,
Standards-Version: 4.6.2
Package: revpi-middleware
Architecture: all
Pre-Depends: ${misc:Pre-Depends}
Depends:
${python3:Depends},
${misc:Depends}
Description: Revolution Pi middleware with D-Bus interface
The Revolution Pi middleware provides a robust communication interface for
Revolution Pi industrial computers. It enables seamless integration between
hardware components and applications through a D-Bus interface. The middleware
serves as a bridge for data exchange, device configuration, and system
monitoring.
.
Key features:
* Hardware abstraction layer for Revolution Pi I/O modules
* Real-time data processing and event handling
* Simplified API for accessing Revolution Pi hardware features
* Extensive configuration options for industrial automation tasks
* Built-in monitoring and diagnostic capabilities

27
debian/copyright vendored Normal file
View File

@@ -0,0 +1,27 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Source: https://gitlab.com/revolutionpi/opcua-revpi-server
Files: *
Copyright: 2025 KUNBUS GmbH
License: GPL-2+
Files: debian/*
Copyright: 2025 KUNBUS GmbH
License: GPL-2+
License: GPL-2+
This package is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
.
This package is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
.
On Debian systems, the complete text of the GNU General
Public License version 2 can be found in "/usr/share/common-licenses/GPL-2".

7
debian/gbp.conf vendored Normal file
View File

@@ -0,0 +1,7 @@
[DEFAULT]
upstream-branch = main
upstream-tag = v%(version)s
debian-branch=debian/bookworm
debian-tag = debian/%(version)s
debian-tag-msg = %(pkg)s Debian release %(version)s
pristine-tar = True

4
debian/revpi-middleware.install vendored Normal file
View File

@@ -0,0 +1,4 @@
data/dbus-policy/com.revolutionpi.middleware1.conf /usr/share/dbus-1/system.d
data/etc/default/revpi-middleware /etc/default/
data/etc/revpi-middleware/revpi-middleware.conf /etc/revpi-middleware/
data/systemd/before_253/revpi-middleware.service /lib/systemd/system/

2
debian/revpi-middleware.links vendored Normal file
View File

@@ -0,0 +1,2 @@
/usr/share/revpi-middleware/revpi-middleware /usr/sbin/revpi-middleware
/usr/share/revpi-middleware/revpicli /usr/bin/revpicli

12
debian/rules vendored Executable file
View File

@@ -0,0 +1,12 @@
#!/usr/bin/make -f
export PYBUILD_NAME=revpi-middleware
export PYBUILD_INSTALL_ARGS=--install-lib=/usr/share/$(PYBUILD_NAME)/ --install-scripts=/usr/share/$(PYBUILD_NAME)/
%:
dh $@ --with python3 --buildsystem=pybuild
override_dh_auto_test:
# Currently, the tests have to be skipped, because no SystemBus is
# available in the Docker container.
@echo "Skipped tests"

1
debian/source/format vendored Normal file
View File

@@ -0,0 +1 @@
3.0 (quilt)

View File

@@ -5,11 +5,12 @@
This module provides the foundation for the RevPi middleware CLI commands This module provides the foundation for the RevPi middleware CLI commands
and argument parsing setup. and argument parsing setup.
""" """
from argparse import ArgumentParser
from logging import getLogger from logging import getLogger
from . import cli_config, cli_picontrol from revpi_middleware.cli_commands import cli_picontrol
from revpi_middleware.proginit import StdLogOutput
from .. import proginit as pi from .. import proginit as pi
from ..proginit import StdLogOutput
log = getLogger(__name__) log = getLogger(__name__)

View File

@@ -38,7 +38,7 @@ def method_reset():
log.debug("D-Bus call of method ResetDriver") log.debug("D-Bus call of method ResetDriver")
simple_call( simple_call(
"ResetDriver", "ResetDriver",
interface=extend_interface("PiControl"), interface=extend_interface("picontrol"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
) )
log.info("ResetDriver called via D-Bus") log.info("ResetDriver called via D-Bus")
@@ -48,7 +48,7 @@ def method_await_reset(timeout: int = 0):
detected_signal = await_signal( detected_signal = await_signal(
"NotifyDriverReset", "NotifyDriverReset",
timeout, timeout,
extend_interface("PiControl"), extend_interface("picontrol"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM, bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
) )
if detected_signal: if detected_signal:

View File

@@ -41,38 +41,3 @@ def extend_interface(*args) -> str:
the provided segments. the provided segments.
""" """
return ".".join([REVPI_DBUS_NAME, *args]) return ".".join([REVPI_DBUS_NAME, *args])
def grep(pattern, filename):
"""
Searches for lines in a file that contain a given pattern and returns them as a list.
The function reads lines from the specified file and checks whether each line
contains the provided pattern. It returns a list of lines that match the
pattern. If the file is not found, an empty list is returned. Any other
exceptions during the file reading process are caught and logged.
Args:
pattern (str): The substring to search for within the file's lines.
filename (str): The path to the file that will be searched.
Returns:
list[str]: A list containing lines that include the provided pattern, with
leading and trailing spaces removed.
Raises:
FileNotFoundError: This error is caught if the file specified is not
found.
Exception: Any unforeseen exception during file operations is caught and
logged.
"""
try:
with open(filename, "r") as file:
# Gibt alle Zeilen zurück, die das Muster enthalten
matching_lines = [line.strip() for line in file if pattern in line]
return matching_lines
except FileNotFoundError:
return []
except Exception as e:
log.error(f"Error reading file: {e}")
return []

View File

@@ -15,7 +15,7 @@ log = getLogger(__name__)
class InterfacePiControl(DbusInterface): class InterfacePiControl(DbusInterface):
""" """
<node> <node>
<interface name='com.revolutionpi.middleware1.PiControl'> <interface name='com.revolutionpi.middleware1.picontrol'>
<signal name="NotifyDriverReset"> <signal name="NotifyDriverReset">
</signal> </signal>
<method name='ResetDriver'> <method name='ResetDriver'>

View File

@@ -1,5 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig

View File

@@ -1,98 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for hardware configuration."""
from collections import namedtuple
from logging import getLogger
from .revpi_config import (
ConfigActions,
configure_avahi_daemon,
configure_con_can,
configure_dphys_swapfile,
configure_external_antenna,
configure_gui,
simple_systemd,
)
from ..dbus_helper import DbusInterface
log = getLogger(__name__)
FeatureFunction = namedtuple("FeatureFunction", ["function", "args"])
class InterfaceRevpiConfig(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.RevpiConfig">
<method name="Disable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="Enable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="GetStatus">
<arg name="feature" type="s" direction="in"/>
<arg name="status" type="b" direction="out"/>
</method>
<method name="GetAvailability">
<arg name="feature" type="s" direction="in"/>
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
</interface>
</node>
"""
def Disable(self, feature: str) -> None:
"""Disable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.DISABLE, *feature_function.args)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.ENABLE, *feature_function.args)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
feature_function = get_feature(feature)
return feature_function.function(ConfigActions.STATUS, *feature_function.args)
def GetAvailability(self, feature: str) -> bool:
"""Get feature availability on the RevPi."""
feature_function = get_feature(feature)
return feature_function.function(ConfigActions.AVAILABLE, *feature_function.args)
@property
def available_features(self) -> list[str]:
return list(AVAILABLE_FEATURES.keys())
def get_feature(feature: str) -> FeatureFunction:
if feature not in AVAILABLE_FEATURES:
raise ValueError(f"feature {feature} does not exist")
feature_function = AVAILABLE_FEATURES[feature]
if not feature_function:
raise NotImplementedError(f"feature {feature} is not implemented")
return feature_function
AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []),
"revpi-con-can": FeatureFunction(configure_con_can, []),
"dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
"systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]),
"ssh": FeatureFunction(simple_systemd, ["ssh.service"]),
"nodered": FeatureFunction(simple_systemd, ["nodered.service"]),
"noderedrevpinodes-server": FeatureFunction(
simple_systemd, ["noderedrevpinodes-server.service"]
),
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"bluetooth": False,
"ieee80211": False,
"avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": FeatureFunction(configure_external_antenna, []),
}

View File

@@ -1,365 +0,0 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
import re
import shutil
import subprocess
from collections import namedtuple
from enum import Enum, IntEnum
from glob import glob
from logging import getLogger
from os import X_OK, access
from os.path import exists, join
from typing import List, Optional
from pydbus import SystemBus
from ..dbus_helper import grep
log = getLogger(__name__)
ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_WIFI_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
class ComputeModuleTypes(IntEnum):
UNKNOWN = 0
CM1 = 6
CM3 = 10
CM4 = 20
CM4S = 21
CM5 = 24
class ConfigActions(Enum):
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
class RevPiConfig:
def __init__(self):
self._cm_type = ComputeModuleTypes.UNKNOWN
self._cm_with_wifi = False
self._revpi_with_con_bridge = False
self._wlan_class_path = ""
self._wlan_rfkill_index = None
self.serial = ""
self.model = ""
self._init_device_info()
def _init_device_info(self):
dc_cpuinfo = {}
# Extract CPU information
with open("/proc/cpuinfo", "r") as f:
line = "\n"
while line:
line = f.readline()
if line.startswith(("Revision", "Serial", "Model")):
key, value = line.split(":", 1)
key = key.strip().lower()
value = value.strip()
dc_cpuinfo[key] = value
self.model = dc_cpuinfo.get("model", "")
self.serial = dc_cpuinfo.get("serial", "")
# Detect Compute Module type
revision = dc_cpuinfo.get("revision", "")
if revision:
revision = int(revision, 16)
mask = 4080 # 0xFF0 in dezimal
try:
self._cm_type = ComputeModuleTypes((revision & mask) >> 4)
except ValueError:
pass
# Detect WiFi on CM module
could_have_wifi = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5)
if could_have_wifi:
wifi_interface = join(LINUX_WIFI_CLASS_PATH, "phy0")
lst_grep = grep("DRIVER=brcmfmac", join(wifi_interface, "device", "uevent"))
self._cm_with_wifi = len(lst_grep) > 0
self._wlan_class_path = wifi_interface
# If no build in Wi-Fi on the CM, detect third party Wi-Fi on RevPi Flat
if not self._cm_with_wifi and grep("revpi-flat", "/proc/device-tree/compatible"):
lst_wifi_interfaces = glob("/sys/class/ieee80211/*")
for wifi_interface in lst_wifi_interfaces:
if grep("DRIVER=mwifiex_sdio", join(wifi_interface, "device", "uevent")):
self._cm_with_wifi = True
self._wlan_class_path = wifi_interface
# Detect rfkill index of the integrated Wi-Fi device
if self._wlan_class_path:
for rfkill_path in glob(join(self._wlan_class_path, "rfkill*")):
match_index = re.match(r"^/.+/rfkill(?P<index>\d+)$", rfkill_path)
if match_index:
self._wlan_rfkill_index = int(match_index.group("index"))
break
# Detect ConBridge
could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S)
if could_have_con_bridge:
lst_grep = grep("kunbus,revpi-connect", "/proc/device-tree/compatible")
self._revpi_with_con_bridge = len(lst_grep) > 0
@property
def cm_type(self) -> ComputeModuleTypes:
return self._cm_type
@property
def rfkill_index(self) -> Optional[int]:
return self._wlan_rfkill_index
@property
def with_con_bridge(self) -> bool:
return self._revpi_with_con_bridge
@property
def with_wifi(self) -> bool:
return self._cm_with_wifi
class ConfigTxt:
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self):
self._config_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._config_txt_path = path
break
if not self._config_txt_path:
raise FileNotFoundError("no config.txt found")
self._config_txt_lines = []
def _clear_name_values(self, name: str, values: str or list) -> int:
counter = 0
if type(values) is str:
values = [values]
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value in values:
self._config_txt_lines.pop(config_var.line_index)
counter += 1
return counter
def _get_all_name_values(self) -> List[ConfigVariable]:
if not self._config_txt_lines:
self.reload_config()
lst_return = []
for i in range(len(self._config_txt_lines)):
match = self.re_name_value.match(self._config_txt_lines[i])
if match:
lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i))
return lst_return
def reload_config(self):
with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines()
def save_config(self):
if not self._config_txt_lines:
return
tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear()
def add_name_value(self, name: str, value: str):
# Check weather name and value already exists
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value == value:
return
self._config_txt_lines.append(f"{name}={value}\n")
def clear_dtoverlays(self, dtoverlays: str or list) -> int:
return self._clear_name_values("dtoverlay", dtoverlays)
def clear_dtparams(self, dtparams: str or list) -> int:
return self._clear_name_values("dtparam", dtparams)
def get_values(self, var_name: str) -> list:
var_values = []
for config_var in self._get_all_name_values():
if config_var.name == var_name:
var_values.append(config_var.value)
return var_values
@property
def config_txt_path(self) -> str:
return self._config_txt_path
def configure_avahi_daemon(action: ConfigActions):
return_value = simple_systemd(action, "avahi-daemon.service")
# Post actions for avahi-daemon
if action in (ConfigActions.ENABLE, ConfigActions.DISABLE):
# Apply the enable/disable action to the avahi socket AFTER the service
# unit, because a connected socket could interrupt stop
simple_systemd(action, "avahi-daemon.socket")
return return_value
def configure_con_can(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_con_bridge
dt_overlay = "revpi-con-can"
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.add_name_value("dtoverlay", dt_overlay)
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", dt_overlay])
elif action is ConfigActions.DISABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay])
elif action is ConfigActions.STATUS:
return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_dphys_swapfile(action: ConfigActions):
return_value = simple_systemd(action, "dphys-swapfile.service")
# Post actions for dphys-swapfile
if action is ConfigActions.DISABLE:
# Remove swapfile afer disabling the service unit
subprocess.call(
["/sbin/dphys-swapfile", "uninstall"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return return_value
def configure_external_antenna(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_wifi
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_wifi:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.add_name_value("dtparam", "ant2")
config_txt.save_config()
elif action is ConfigActions.DISABLE and revpi.with_wifi:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.save_config()
elif action is ConfigActions.STATUS:
return revpi.with_wifi and "ant2" in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_gui(action: ConfigActions):
gui_available = access("/usr/bin/startx", X_OK)
if action is ConfigActions.AVAILABLE:
return gui_available
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True)
elif action is ConfigActions.DISABLE:
systemd_manager.SetDefaultTarget("multi-user.target", True)
elif action is ConfigActions.STATUS:
return systemd_manager.GetDefaultTarget() == "graphical.target"
else:
raise ValueError(f"action {action} not supported")
def simple_systemd(action: ConfigActions, unit: str):
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.UnmaskUnitFiles([unit], False)
systemd_manager.EnableUnitFiles([unit], False, False)
systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE:
systemd_manager.StopUnit(unit, "replace")
systemd_manager.DisableUnitFiles([unit], False)
elif action is ConfigActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
elif action is ConfigActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
if __name__ == "__main__":
rc = RevPiConfig()
print("Model:", rc.model)
print("Serial: ", rc.serial)
print("CM Type: ", rc.cm_type.name)
print("With wifi: ", rc.with_wifi)
if rc.with_wifi:
print(" rfkill index: ", rc.rfkill_index)
print("With con-bridge:", rc.with_con_bridge)
config_txt = ConfigTxt()
print("Config file: ", config_txt.config_txt_path)

View File

@@ -18,7 +18,7 @@ class TestObjectPicontrol(TestBusProvider):
def test_reset_driver(self): def test_reset_driver(self):
simple_call( simple_call(
"ResetDriver", "ResetDriver",
interface=extend_interface("PiControl"), interface=extend_interface("picontrol"),
bus_type=BusType.SESSION, bus_type=BusType.SESSION,
) )
ioctl_call = IOCTL_QUEUE.get(timeout=2.0) ioctl_call = IOCTL_QUEUE.get(timeout=2.0)
@@ -37,7 +37,7 @@ class TestObjectPicontrol(TestBusProvider):
result = await_signal( result = await_signal(
"NotifyDriverReset", "NotifyDriverReset",
timeout, timeout,
extend_interface("PiControl"), extend_interface("picontrol"),
bus_type=BusType.SESSION, bus_type=BusType.SESSION,
) )
self.assertTrue(result) self.assertTrue(result)