Files
revpi-middleware/src/revpi_middleware/cli_commands/cli_picontrol.py
Sven Sager 865d2ca7a9 refactor: Update interface name from 'picontrol' to 'PiControl'
Renamed all occurrences of 'picontrol' to 'PiControl' in the D-Bus
interface definitions, method calls, and test cases for consistency and
adherence to naming conventions. This ensures uniformity across the
codebase and resolves potential naming-related issues.
2025-04-20 12:19:41 +02:00

80 lines
2.0 KiB
Python

# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser
from logging import getLogger
from .dbus_helper import BusType, await_signal, simple_call
from .. import proginit as pi
from ..dbus_middleware1 import extend_interface
log = getLogger(__name__)
def add_subparsers(parent_parser: ArgumentParser):
    """Register the PiControl sub-commands on ``parent_parser``.

    Two sub-commands are added: ``reset`` and ``await-reset``. The name of
    the selected sub-command is stored in the ``methods`` attribute of the
    parsed namespace. ``await-reset`` additionally accepts ``-t`` /
    ``--timeout`` as an integer option that defaults to ``0``.

    Args:
        parent_parser: Parser that receives the sub-command group.
    """
    subcommands = parent_parser.add_subparsers(
        title="methods",
        dest="methods",
        help="Available RevPi PiControl methods",
    )

    # Plain command without any options of its own.
    subcommands.add_parser("reset", help="Reset the piControl driver")

    # Command with an optional timeout.
    await_reset_parser = subcommands.add_parser(
        "await-reset",
        help="Wait for the piControl driver to be reset",
    )
    timeout_options = {
        "type": int,
        "default": 0,
        "help": "Timeout in seconds. Default: 0 (no timeout).",
    }
    await_reset_parser.add_argument("-t", "--timeout", **timeout_options)
def method_reset():
    """Call the ``ResetDriver`` method on the PiControl D-Bus interface.

    The session bus is used when ``pi.pargs.use_session_bus`` is truthy,
    otherwise the system bus. A debug message is logged before the call and
    an info message after it returns.
    """
    log.debug("D-Bus call of method ResetDriver")

    picontrol_interface = extend_interface("PiControl")
    if pi.pargs.use_session_bus:
        selected_bus = BusType.SESSION
    else:
        selected_bus = BusType.SYSTEM

    simple_call(
        "ResetDriver",
        interface=picontrol_interface,
        bus_type=selected_bus,
    )
    log.info("ResetDriver called via D-Bus")
def method_await_reset(timeout: int = 0):
    """Wait for the ``NotifyDriverReset`` signal of the PiControl interface.

    The session bus is used when ``pi.pargs.use_session_bus`` is truthy,
    otherwise the system bus.

    Args:
        timeout: Forwarded unchanged to ``await_signal``. The CLI option
            feeding this value describes it as seconds, with ``0`` meaning
            no timeout.

    Raises:
        Exception: If ``await_signal`` returns a falsy result.
    """
    picontrol_interface = extend_interface("PiControl")
    if pi.pargs.use_session_bus:
        selected_bus = BusType.SESSION
    else:
        selected_bus = BusType.SYSTEM

    signal_seen = await_signal(
        "NotifyDriverReset",
        timeout,
        picontrol_interface,
        bus_type=selected_bus,
    )

    # Guard clause: bail out first, so the success path stays unindented.
    if not signal_seen:
        raise Exception("Interrupted or timeout reached before signal was received")

    log.info("ResetDriver signal received")
def main() -> int:
    """Run the PiControl method selected on the command line.

    The method name is read from ``pi.pargs.methods``. ``"reset"`` runs
    ``method_reset``; ``"await-reset"`` runs ``method_await_reset`` with
    ``pi.pargs.timeout``. Any other value is treated as an error.

    Returns:
        ``0`` when the selected method finished without raising, ``1`` when
        the method is unknown or an exception occurred (the error is logged).
    """
    method = pi.pargs.methods

    try:
        if method == "await-reset":
            method_await_reset(pi.pargs.timeout)
        elif method == "reset":
            method_reset()
        else:
            raise Exception(f"Unknown method: {method}")
    except Exception as e:
        # Report every failure through the exit code instead of a traceback.
        log.error(f"Error: {e}")
        return 1
    else:
        return 0