refactor(dbus): Move ResetDriverWatchdog to process_image_helper.py

The ResetDriverWatchdog class was relocated from dbus_helper.py to a new
helper module, process_image_helper.py, to improve code organization
and maintainability. Updated imports in relevant files to reflect this
change.
This commit is contained in:
2025-04-19 08:13:02 +02:00
parent e756d68556
commit 4c1dc1c9b5
3 changed files with 105 additions and 98 deletions

View File

@@ -1,17 +1,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2020-2023 Sven Sager # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
""" """Helper for dbus."""
Helper for the process image.
The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs"
https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py
"""
import os
from fcntl import ioctl
from logging import getLogger from logging import getLogger
from threading import Thread
log = getLogger(__name__) log = getLogger(__name__)
@@ -19,93 +11,6 @@ REVPI_DBUS_NAME = "com.revolutionpi.middleware1"
REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1" REVPI_DBUS_BASE_PATH = "/com/revolutionpi/middleware1"
class ResetDriverWatchdog(Thread):
"""Watchdog to catch the reset_driver action."""
def __init__(self, pi_control_device="/dev/piControl0"):
super(ResetDriverWatchdog, self).__init__()
self.procimg = pi_control_device
self.daemon = True
self._calls = []
self._exit = False
self._fh = None
self.not_implemented = False
"""True, if KB_WAIT_FOR_EVENT is not implemented in piControl."""
self._triggered = False
self.start()
def run(self):
"""
Mainloop of watchdog for reset_driver.
If the thread can not open the process image or the IOCTL is not
implemented (wheezy), the thread function will stop. The trigger
property will always return True.
"""
log.debug("enter ResetDriverWatchdog.run()")
try:
self._fh = os.open(self.procimg, os.O_RDONLY)
except Exception:
self.not_implemented = True
log.error(
"can not open process image at '{0}' for reset_driver watchdog"
"".format(self.procimg)
)
return
# The ioctl will return 2 byte (c-type int)
byte_buff = bytearray(2)
while not self._exit:
try:
rc = ioctl(self._fh, 19250, byte_buff)
if rc == 0 and byte_buff[0] == 1:
self._triggered = True
log.debug("reset_driver detected")
for func in self._calls:
func()
except Exception:
self.not_implemented = True
os.close(self._fh)
self._fh = None
log.warning("IOCTL KB_WAIT_FOR_EVENT is not implemented")
return
log.debug("leave ResetDriverWatchdog.run()")
def register_call(self, function):
"""Register a function, if watchdog triggers."""
if not callable(function):
return ValueError("Function is not callable.")
if function not in self._calls:
self._calls.append(function)
def stop(self):
"""Stop watchdog for reset_driver."""
log.debug("enter ResetDriverWatchdog.stop()")
self._exit = True
if self._fh is not None:
os.close(self._fh)
self._fh = None
log.debug("leave ResetDriverWatchdog.stop()")
def unregister_call(self, function=None):
"""Remove a function call on watchdog trigger."""
if function is None:
self._calls.clear()
elif function in self._calls:
self._calls.remove(function)
@property
def triggered(self):
"""Will return True one time after watchdog was triggered."""
rc = self._triggered
self._triggered = False
return rc
def extend_interface(*args) -> str: def extend_interface(*args) -> str:
""" """
Extends an interface name by appending additional segments to a pre-defined base name. Extends an interface name by appending additional segments to a pre-defined base name.

View File

@@ -8,7 +8,7 @@ from logging import getLogger
from pydbus.generic import signal from pydbus.generic import signal
from ..dbus_helper import ResetDriverWatchdog from .process_image_helper import ResetDriverWatchdog
log = getLogger(__name__) log = getLogger(__name__)

View File

@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""
Helper for the process image.
The ResetDriverWatchdog class is a copy of revpipyload project module "watchdogs"
https://github.com/naruxde/revpipyload/blob/b51c2b617a57cc7d96fd67e1da9f090a0624eacb/src/revpipyload/watchdogs.py
"""
import os
from fcntl import ioctl
from logging import getLogger
from threading import Thread
log = getLogger(__name__)
class ResetDriverWatchdog(Thread):
"""Watchdog to catch the reset_driver action."""
def __init__(self, pi_control_device="/dev/piControl0"):
super(ResetDriverWatchdog, self).__init__()
self.procimg = pi_control_device
self.daemon = True
self._calls = []
self._exit = False
self._fh = None
self.not_implemented = False
"""True, if KB_WAIT_FOR_EVENT is not implemented in piControl."""
self._triggered = False
self.start()
def run(self):
"""
Mainloop of watchdog for reset_driver.
If the thread cannot open the process image or the IOCTL is not
implemented (wheezy), the thread function will stop. The trigger
property will always return True.
"""
log.debug("enter ResetDriverWatchdog.run()")
try:
self._fh = os.open(self.procimg, os.O_RDONLY)
except Exception:
self.not_implemented = True
log.error(
"can not open process image at '{0}' for reset_driver watchdog"
"".format(self.procimg)
)
return
# The ioctl will return 2 bytes (c-type int)
byte_buff = bytearray(2)
while not self._exit:
try:
rc = ioctl(self._fh, 19250, byte_buff)
if rc == 0 and byte_buff[0] == 1:
self._triggered = True
log.debug("reset_driver detected")
for func in self._calls:
func()
except Exception:
self.not_implemented = True
os.close(self._fh)
self._fh = None
log.warning("IOCTL KB_WAIT_FOR_EVENT is not implemented")
return
log.debug("leave ResetDriverWatchdog.run()")
def register_call(self, function):
"""Register a function, if watchdog triggers."""
if not callable(function):
raise ValueError("Function is not callable.")
if function not in self._calls:
self._calls.append(function)
def stop(self):
"""Stop watchdog for reset_driver."""
log.debug("enter ResetDriverWatchdog.stop()")
self._exit = True
if self._fh is not None:
os.close(self._fh)
self._fh = None
log.debug("leave ResetDriverWatchdog.stop()")
def unregister_call(self, function=None):
"""Remove a function from the watchdog trigger."""
if function is None:
self._calls.clear()
elif function in self._calls:
self._calls.remove(function)
@property
def triggered(self):
"""Will return True one time after watchdog was triggered."""
rc = self._triggered
self._triggered = False
return rc