From 4c1dc1c9b5a66864cfd70c64e19fb55b88284b41 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sat, 19 Apr 2025 08:13:02 +0200 Subject: [PATCH] refactor(dbus): Move ResetDriverWatchdog to process_image_helper.py The ResetDriverWatchdog class was relocated from dbus_helper.py to a new helper module, process_image_helper.py, to improve code organization and maintainability. Updated imports in relevant files to reflect this change. --- .../dbus_middleware1/dbus_helper.py | 99 +---------------- .../process_image/interface_picontrol.py | 2 +- .../process_image/process_image_helper.py | 102 ++++++++++++++++++ 3 files changed, 105 insertions(+), 98 deletions(-) create mode 100644 src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py diff --git a/src/revpi_middleware/dbus_middleware1/dbus_helper.py b/src/revpi_middleware/dbus_middleware1/dbus_helper.py index 5b742c4..bec4697 100644 --- a/src/revpi_middleware/dbus_middleware1/dbus_helper.py +++ b/src/revpi_middleware/dbus_middleware1/dbus_helper.py @@ -1,17 +1,9 @@ # -*- coding: utf-8 -*- -# SPDX-FileCopyrightText: 2020-2023 Sven Sager +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later -""" -Helper for the process image. +"""Helper for dbus.""" -The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs" -https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py -""" - -import os -from fcntl import ioctl from logging import getLogger -from threading import Thread log = getLogger(__name__) @@ -19,93 +11,6 @@ REVPI_DBUS_NAME = "com.revolutionpi.middleware1" REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1" -class ResetDriverWatchdog(Thread): - """Watchdog to catch the reset_driver action.""" - - def __init__(self, pi_control_device="/dev/piControl0"): - super(ResetDriverWatchdog, self).__init__() - self.procimg = pi_control_device - self.daemon = True - self._calls = [] - self._exit = False - self._fh = None - self.not_implemented = False - """True, if KB_WAIT_FOR_EVENT is not implemented in piControl.""" - self._triggered = False - self.start() - - def run(self): - """ - Mainloop of watchdog for reset_driver. - - If the thread can not open the process image or the IOCTL is not - implemented (wheezy), the thread function will stop. The trigger - property will always return True. - """ - log.debug("enter ResetDriverWatchdog.run()") - - try: - self._fh = os.open(self.procimg, os.O_RDONLY) - except Exception: - self.not_implemented = True - log.error( - "can not open process image at '{0}' for reset_driver watchdog" - "".format(self.procimg) - ) - return - - # The ioctl will return 2 byte (c-type int) - byte_buff = bytearray(2) - while not self._exit: - try: - rc = ioctl(self._fh, 19250, byte_buff) - if rc == 0 and byte_buff[0] == 1: - self._triggered = True - log.debug("reset_driver detected") - for func in self._calls: - func() - except Exception: - self.not_implemented = True - os.close(self._fh) - self._fh = None - log.warning("IOCTL KB_WAIT_FOR_EVENT is not implemented") - return - - log.debug("leave ResetDriverWatchdog.run()") - - def register_call(self, function): - """Register a function, if watchdog triggers.""" - if not callable(function): - return ValueError("Function is not callable.") - if function not in self._calls: - self._calls.append(function) - - def stop(self): - """Stop watchdog for reset_driver.""" - log.debug("enter ResetDriverWatchdog.stop()") - - self._exit = True - if self._fh is not None: - os.close(self._fh) - self._fh = None - - log.debug("leave ResetDriverWatchdog.stop()") - - def unregister_call(self, function=None): - """Remove a function call on watchdog trigger.""" - if function is None: - self._calls.clear() - elif function in self._calls: - self._calls.remove(function) - - @property - def triggered(self): - """Will return True one time after watchdog was triggered.""" - rc = self._triggered - self._triggered = False - return rc - - def extend_interface(*args) -> str: """ Extends an interface name by appending additional segments to a pre-defined base name. diff --git a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py index 24e212f..3166545 100644 --- a/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py +++ b/src/revpi_middleware/dbus_middleware1/process_image/interface_picontrol.py @@ -8,7 +8,7 @@ from logging import getLogger from pydbus.generic import signal -from ..dbus_helper import ResetDriverWatchdog +from .process_image_helper import ResetDriverWatchdog log = getLogger(__name__) diff --git a/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py new file mode 100644 index 0000000..bee8ec7 --- /dev/null +++ b/src/revpi_middleware/dbus_middleware1/process_image/process_image_helper.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2025 KUNBUS GmbH +# SPDX-License-Identifier: GPL-2.0-or-later +""" +Helper for the process image. + +The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs" +https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py +""" +import os +from fcntl import ioctl +from logging import getLogger +from threading import Thread + +log = getLogger(__name__) + + +class ResetDriverWatchdog(Thread): + """Watchdog to catch the reset_driver action.""" + + def __init__(self, pi_control_device="/dev/piControl0"): + super(ResetDriverWatchdog, self).__init__() + self.procimg = pi_control_device + self.daemon = True + self._calls = [] + self._exit = False + self._fh = None + self.not_implemented = False + """True, if KB_WAIT_FOR_EVENT is not implemented in piControl.""" + self._triggered = False + self.start() + + def run(self): + """ + Mainloop of watchdog for reset_driver. + + If the thread cannot open the process image or the IOCTL is not + implemented (wheezy), the thread function will stop. The trigger + property will always return True. + """ + log.debug("enter ResetDriverWatchdog.run()") + + try: + self._fh = os.open(self.procimg, os.O_RDONLY) + except Exception: + self.not_implemented = True + log.error( + "can not open process image at '{0}' for reset_driver watchdog" + "".format(self.procimg) + ) + return + + # The ioctl will return 2 bytes (c-type int) + byte_buff = bytearray(2) + while not self._exit: + try: + rc = ioctl(self._fh, 19250, byte_buff) + if rc == 0 and byte_buff[0] == 1: + self._triggered = True + log.debug("reset_driver detected") + for func in self._calls: + func() + except Exception: + self.not_implemented = True + os.close(self._fh) + self._fh = None + log.warning("IOCTL KB_WAIT_FOR_EVENT is not implemented") + return + + log.debug("leave ResetDriverWatchdog.run()") + + def register_call(self, function): + """Register a function, if watchdog triggers.""" + if not callable(function): + raise ValueError("Function is not callable.") + if function not in self._calls: + self._calls.append(function) + + def stop(self): + """Stop watchdog for reset_driver.""" + log.debug("enter ResetDriverWatchdog.stop()") + + self._exit = True + if self._fh is not None: + os.close(self._fh) + self._fh = None + + log.debug("leave ResetDriverWatchdog.stop()") + + def unregister_call(self, function=None): + """Remove a function from the watchdog trigger.""" + if function is None: + self._calls.clear() + elif function in self._calls: + self._calls.remove(function) + + @property + def triggered(self): + """Will return True one time after watchdog was triggered.""" + rc = self._triggered + self._triggered = False + return rc