# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
from time import sleep

from gi.repository.Gio import dbus_address_get_for_bus_sync, BusType

from tests.dbus_middleware1.fake_devices import PiControlDeviceMockup


class TestBusProvider(PiControlDeviceMockup):
    """Test fixture that runs a ``BusProviderMiddleware1`` on the session bus.

    ``setUp`` starts the bus provider and waits for it to report that it is
    running; ``tearDown`` stops it again and verifies that it terminated.
    The running provider is available to tests as ``self.bp``.
    """

    def setUp(self):
        """Start the bus provider and wait up to ~5 seconds for it to run."""
        super().setUp()

        # Do not import things on top of the module. Some classes or functions
        # need to be mocked up first.
        from revpi_middleware.dbus_middleware1 import BusProviderMiddleware1

        # Prepare the bus provider and start it.
        # ``self.picontrol`` is presumably created by the parent ``setUp``
        # -- confirm against PiControlDeviceMockup.
        bus_address = dbus_address_get_for_bus_sync(BusType.SESSION)
        self.bp = BusProviderMiddleware1(
            bus_address,
            picontrol_device=self.picontrol.name,
        )
        self.bp.start()

        # Wait 5 seconds (50 polls of 0.1 s) until the bus provider has
        # started the main loop.
        # NOTE(review): if the provider never reports ``running`` the wait
        # simply ends and the test continues -- consider failing here instead.
        polls_left = 50
        poll_interval = 0.1
        while not self.bp.running and polls_left > 0:
            polls_left -= 1
            sleep(poll_interval)

    def tearDown(self):
        """Stop the bus provider and always tear down the parent fixture.

        Raises:
            RuntimeError: If the bus provider is still alive 10 seconds after
                it was asked to stop.
        """
        try:
            self.bp.stop()
            self.bp.join(10.0)
            if self.bp.is_alive():
                raise RuntimeError("Bus provider thread is still running")
        finally:
            # Run the parent cleanup even when the shutdown above failed, so
            # a hanging bus provider does not also leave the parent fixture
            # set up for the following tests.
            super().tearDown()