feat(dbus): Add ResetDriverWatchdog helper as global dbus helper

Introduced a new helper class `ResetDriverWatchdog`, a thread-based
watchdog to detect the `reset_driver` action. The implementation is
adapted from the revpipyload project and includes methods to
register/unregister callbacks and manage the watchdog lifecycle. This
addition improves monitoring capabilities for the process image.
This commit is contained in:
2025-04-18 14:11:19 +02:00
parent 049ddfdc0f
commit 91a1fae411

View File

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2020-2023 Sven Sager
# SPDX-License-Identifier: GPL-2.0-or-later
"""
Helper for the process image.
The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs"
https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py
"""
import os
from fcntl import ioctl
from logging import getLogger
from threading import Thread
log = getLogger(__name__)
class ResetDriverWatchdog(Thread):
"""Watchdog to catch the reset_driver action."""
def __init__(self, pi_control_device="/dev/piControl0"):
super(ResetDriverWatchdog, self).__init__()
self.procimg = pi_control_device
self.daemon = True
self._calls = []
self._exit = False
self._fh = None
self.not_implemented = False
"""True, if KB_WAIT_FOR_EVENT is not implemented in piControl."""
self._triggered = False
self.start()
def run(self):
"""
Mainloop of watchdog for reset_driver.
If the thread can not open the process image or the IOCTL is not
implemented (wheezy), the thread function will stop. The trigger
property will always return True.
"""
log.debug("enter ResetDriverWatchdog.run()")
try:
self._fh = os.open(self.procimg, os.O_RDONLY)
except Exception:
self.not_implemented = True
log.error(
"can not open process image at '{0}' for reset_driver watchdog"
"".format(self.procimg)
)
return
# The ioctl will return 2 byte (c-type int)
byte_buff = bytearray(2)
while not self._exit:
try:
rc = ioctl(self._fh, 19250, byte_buff)
if rc == 0 and byte_buff[0] == 1:
self._triggered = True
log.debug("reset_driver detected")
for func in self._calls:
func()
except Exception:
self.not_implemented = True
os.close(self._fh)
self._fh = None
log.warning("IOCTL KB_WAIT_FOR_EVENT is not implemented")
return
log.debug("leave ResetDriverWatchdog.run()")
def register_call(self, function):
"""Register a function, if watchdog triggers."""
if not callable(function):
return ValueError("Function is not callable.")
if function not in self._calls:
self._calls.append(function)
def stop(self):
"""Stop watchdog for reset_driver."""
log.debug("enter ResetDriverWatchdog.stop()")
self._exit = True
if self._fh is not None:
os.close(self._fh)
self._fh = None
log.debug("leave ResetDriverWatchdog.stop()")
def unregister_call(self, function=None):
"""Remove a function call on watchdog trigger."""
if function is None:
self._calls.clear()
elif function in self._calls:
self._calls.remove(function)
@property
def triggered(self):
"""Will return True one time after watchdog was triggered."""
rc = self._triggered
self._triggered = False
return rc