doc(io): Update io_controller.py example file
Signed-off-by: Sven Sager <s.sager@kunbus.com>
This commit is contained in:
@@ -1,40 +1,59 @@
|
|||||||
from time import time
|
# -*- coding: utf-8 -*-
|
||||||
|
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
|
||||||
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
||||||
|
"""Switches 14 outputs on a DIO to the same value as the first input."""
|
||||||
|
|
||||||
|
from time import perf_counter
|
||||||
|
|
||||||
from gi.repository import GLib
|
from gi.repository import GLib
|
||||||
from pydbus import SystemBus
|
from pydbus import SystemBus
|
||||||
|
|
||||||
detected_signal = False
|
detected_signal = False
|
||||||
timestamp = time()
|
timestamp = perf_counter()
|
||||||
|
|
||||||
|
# Get system bus
|
||||||
bus = SystemBus()
|
bus = SystemBus()
|
||||||
bus_revpi = bus.get("com.revolutionpi.ios1", "/com/revolutionpi/ios1")
|
|
||||||
interface = bus_revpi["com.revolutionpi.ios1.IoManager"]
|
|
||||||
|
|
||||||
io_o_1_path = interface.Get("O_1")
|
# Get IoManager interface on ios1 bus
|
||||||
io_revpiled_path = interface.Get("RevPiLED")
|
iface_io_manager = bus.get(
|
||||||
|
"com.revolutionpi.ios1",
|
||||||
|
"/com/revolutionpi/ios1",
|
||||||
|
)["com.revolutionpi.ios1.IoManager"]
|
||||||
|
|
||||||
io_RevPiLED = bus.get("com.revolutionpi.ios1", io_revpiled_path)
|
# Query object path of RevPiLED output
|
||||||
io_O_1 = bus.get("com.revolutionpi.ios1", io_o_1_path)
|
path_revpi_led = iface_io_manager.Get("RevPiLED")
|
||||||
|
|
||||||
|
# Get Output interface in the queried object path.
|
||||||
|
out_RevPiLED = bus.get("com.revolutionpi.ios1", path_revpi_led)["com.revolutionpi.ios1.Output"]
|
||||||
|
|
||||||
|
# Get all 14 outputs
|
||||||
|
lst_path_outputs = [iface_io_manager.Get(f"O_{i}") for i in range(1, 15)]
|
||||||
|
lst_out_outputs = [
|
||||||
|
bus.get("com.revolutionpi.ios1", path)["com.revolutionpi.ios1.Output"]
|
||||||
|
for path in lst_path_outputs
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def signal_handler(io_name, io_value):
|
def signal_handler(io_name, io_value):
|
||||||
global timestamp
|
global timestamp
|
||||||
|
|
||||||
print(f"Signal received: {io_name} = {io_value}")
|
print(f"Signal received: {io_name} = {io_value}")
|
||||||
if io_name == "O_1":
|
|
||||||
timestamp = time()
|
if io_name == "O_14":
|
||||||
io_RevPiLED.Set("com.revolutionpi.ios1.OutInt", "value", GLib.Variant("i", int(io_value)))
|
print(f"Time since input detection: {perf_counter() - timestamp}")
|
||||||
|
|
||||||
elif io_name == "I_1":
|
elif io_name == "I_1":
|
||||||
io_RevPiLED.Set("com.revolutionpi.ios1.OutInt", "value", GLib.Variant("i", int(io_value)))
|
timestamp = perf_counter()
|
||||||
io_O_1.Set("com.revolutionpi.ios1.OutBool", "value", GLib.Variant("b", not io_value))
|
|
||||||
print(time() - timestamp)
|
out_RevPiLED.SetValue(GLib.Variant("i", io_value))
|
||||||
|
for output in lst_out_outputs:
|
||||||
|
output.SetValue(GLib.Variant("b", io_value))
|
||||||
|
|
||||||
|
|
||||||
# Start event detection
|
# Start change detection to fire signals on dbus
|
||||||
interface.ActivateIoEvents()
|
iface_io_manager.ActivateIoSignals()
|
||||||
|
|
||||||
interface.onIoChanged = signal_handler
|
iface_io_manager.onIoChanged = signal_handler
|
||||||
|
|
||||||
try:
|
try:
|
||||||
loop = GLib.MainLoop()
|
loop = GLib.MainLoop()
|
||||||
@@ -42,5 +61,5 @@ try:
|
|||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Stop event detection
|
# Stop change detection
|
||||||
interface.DeactivateIoEvents()
|
iface_io_manager.DeactivateIoSignals()
|
||||||
|
|||||||
Reference in New Issue
Block a user