feat: Add bus_filter to dbus_stop for selective stopping

Enhanced the `dbus_stop` method in `MiddlewareDaemon` to support an
optional `bus_filter` parameter, allowing selective stopping of bus
providers. Updated imports to include `List` from `typing`.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
This commit is contained in:
Sven Sager
2026-02-06 17:49:00 +01:00
parent cf90b2dd55
commit 3dbad93975

View File

@@ -6,7 +6,7 @@
from logging import getLogger
from threading import Event
from time import perf_counter
from typing import Union
from typing import Union, List
from dbus import SystemBus, SessionBus
@@ -65,17 +65,23 @@ class MiddlewareDaemon:
log.debug("leave MiddlewareDaemon.dbus_start")
def dbus_stop(self):
def dbus_stop(self, bus_filter: List[object] = None):
log.debug("enter MiddlewareDaemon.dbus_stop")
if self.bp_middleware1:
if bus_filter is None:
bus_filter = [
self.bp_middleware1,
self.bp_ios1,
]
if self.bp_middleware1 and self.bp_middleware1 in bus_filter:
self.bp_middleware1.stop()
self.bp_middleware1.join(timeout=10.0)
if self.bp_middleware1.is_alive():
log.warning("dbus middleware1 provider thread is still alive")
self.bp_middleware1.bus.con.close()
if self.bp_ios1:
if self.bp_ios1 and self.bp_ios1 in bus_filter:
self.bp_ios1.stop()
self.bp_ios1.join(timeout=10.0)
if self.bp_ios1.is_alive():