# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2020-2023 Sven Sager
# SPDX-License-Identifier: GPL-2.0-or-later
"""
Helper for the process image.

The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs"
https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py
"""

import os
from fcntl import ioctl
from logging import getLogger
from threading import Thread

log = getLogger(__name__)

REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"

# IOCTL request number used to wait for a piControl event (KB_WAIT_FOR_EVENT).
_KB_WAIT_FOR_EVENT = 19250


class ResetDriverWatchdog(Thread):
    """Watchdog to catch the reset_driver action."""

    def __init__(self, pi_control_device="/dev/piControl0"):
        """
        Create the watchdog and start its thread immediately.

        :param pi_control_device: Path of the process image device to watch
        """
        super().__init__()
        self.procimg = pi_control_device
        self.daemon = True
        self._calls = []
        self._exit = False
        self._fh = None
        self.not_implemented = False
        """True, if KB_WAIT_FOR_EVENT is not implemented in piControl."""
        self._triggered = False
        self.start()

    def _close_fh(self):
        """Close the process image descriptor once, tolerating a prior close."""
        # Detach the descriptor first so a second caller (stop() vs. run())
        # sees None and does not close the same number twice.
        fh, self._fh = self._fh, None
        if fh is None:
            return
        try:
            os.close(fh)
        except OSError:
            # Already closed elsewhere; nothing left to release.
            pass

    def run(self):
        """
        Mainloop of watchdog for reset_driver.

        If the thread can not open the process image or the IOCTL fails
        (e.g. it is not implemented, wheezy), the thread function will stop
        and 'not_implemented' is set to True. An IOCTL failure caused by a
        call of stop() just ends the loop without setting that flag.

        Exceptions raised by registered functions are logged and do not stop
        the watchdog.
        """
        log.debug("enter ResetDriverWatchdog.run()")

        try:
            self._fh = os.open(self.procimg, os.O_RDONLY)
        except Exception:
            self.not_implemented = True
            log.error(
                "can not open process image at '%s' for reset_driver watchdog",
                self.procimg,
            )
            return

        # The ioctl will return 2 byte (c-type int)
        byte_buff = bytearray(2)
        while not self._exit:
            try:
                rc = ioctl(self._fh, _KB_WAIT_FOR_EVENT, byte_buff)
            except Exception:
                if self._exit:
                    # stop() invalidated the descriptor: regular shutdown,
                    # not a missing IOCTL.
                    break
                self.not_implemented = True
                self._close_fh()
                log.warning("IOCTL KB_WAIT_FOR_EVENT is not implemented")
                return

            if rc == 0 and byte_buff[0] == 1:
                self._triggered = True
                log.debug("reset_driver detected")
                # Iterate over a snapshot, the list may be changed by
                # register_call / unregister_call from other threads.
                for func in tuple(self._calls):
                    try:
                        func()
                    except Exception:
                        # A broken callback must not stop the watchdog.
                        log.exception("registered reset_driver function failed")

        # Covers a stop() that happened before the device was opened.
        self._close_fh()
        log.debug("leave ResetDriverWatchdog.run()")

    def register_call(self, function):
        """
        Register a function, if watchdog triggers.

        :param function: Callable without arguments, called on reset_driver
        :raises ValueError: If the given object is not callable
        """
        if not callable(function):
            raise ValueError("Function is not callable.")
        if function not in self._calls:
            self._calls.append(function)

    def stop(self):
        """Stop watchdog for reset_driver."""
        log.debug("enter ResetDriverWatchdog.stop()")

        # Set the flag first, so run() can tell shutdown from an IOCTL error.
        self._exit = True
        self._close_fh()

        log.debug("leave ResetDriverWatchdog.stop()")

    def unregister_call(self, function=None):
        """
        Remove a function call on watchdog trigger.

        :param function: Function to remove, None removes all functions
        """
        if function is None:
            self._calls.clear()
        elif function in self._calls:
            self._calls.remove(function)

    @property
    def triggered(self):
        """Will return True one time after watchdog was triggered."""
        rc = self._triggered
        self._triggered = False
        return rc


def extend_interface(*args: str) -> str:
    """
    Extends an interface name by appending additional segments to a pre-defined base name.

    This function takes multiple arguments, concatenates them with a predefined base
    interface name, and returns the resulting string, effectively constructing an
    extended interface name. Without arguments the base name itself is returned.

    Args:
        *args: str
            Components to be appended to the base interface name.

    Returns:
        str
            Fully constructed interface name by joining the base interface name with
            the provided segments.
    """
    return ".".join([REVPI_DBUS_NAME, *args])