21 Commits

Author SHA1 Message Date
de1774f60e feat(cli): Add CLI support for RevPi configuration object (revpi-config)
This implements a new command "config" in the CLI to handle RevPi
configuration. It includes parsing and subparser setup for
configuration-related operations. The change improves usability by
extending CLI functionality to manage RevPi configuration objects.
2025-04-21 10:55:32 +02:00
8c145ef2ff feat(cli): Add CLI command for configuring Revpi features
Introduced a new CLI command to enable, disable, check the status, and
list available features for Revpi using D-Bus calls. This implementation
provides a structured interface for managing Revpi configurations via
command-line actions.
2025-04-21 10:55:11 +02:00
0ecd86bd64 feat(cli): Add get_properties helper function for DBus interactions
This function facilitates retrieving specific properties from a DBus
interface, improving code modularity and reusability. It supports both
system and session bus types, streamlining access to DBus resources.
2025-04-21 10:54:47 +02:00
555c781aed feat(dbus): Add InterfaceRevpiConfig to DBus interfaces list
Added the InterfaceRevpiConfig class to the list of DBus interfaces in
`bus_provider.py`. This ensures the new system configuration interface
is properly registered and accessible.
2025-04-21 10:21:36 +02:00
604cb61870 feat(dbus): Add Bluetooth configuration functionality
Introduce functionality to enable, disable, and check the status and
availability of Bluetooth devices using the `configure_bluetooth`
method. Integrate Bluetooth configuration into the feature management
system by mapping it in `interface_config.py`.
2025-04-21 10:17:56 +02:00
69e370f964 refactor(revpiconfig: Change Wi-Fi detection and rfkill index logic
Replaced inline rfkill index detection with a standalone
`get_rfkill_index` function for improved modularity. Removed
`_cm_with_wifi` and `_wlan_rfkill_index` attributes, utilizing
`_wlan_class_path` for Wi-Fi presence checks. Adjusted property and
output logic to incorporate the new function.
2025-04-21 10:05:20 +02:00
6eb7eeea40 feat(dbus): Add Wi-Fi configuration support to the system config
Introduces the `configure_wifi` function to handle Wi-Fi actions such as
enabling, disabling, and checking status. Updates the `ieee80211`
feature to use the new function, integrating Wi-Fi management into the
existing configuration framework.
2025-04-21 09:28:01 +02:00
18e6fb3d14 feat(revpiconfig): Enhance Wi-Fi detection and add rfkill index support
Improved logic to detect built-in and third-party Wi-Fi interfaces,
including integration of `rfkill` index retrieval for Wi-Fi devices.
This enhances support for various hardware setups and allows better
control over Wi-Fi functionality.
2025-04-21 09:05:47 +02:00
fe614d026a feat(dbus): Add support for configuring 'revpi-con-can' feature
Introduce the `configure_con_can` function to manage enabling,
disabling, status checking, and availability of the 'revpi-con-can'
feature. Update the `AVAILABLE_FEATURES` dictionary to integrate
'revpi-con-can' as a configurable feature.
2025-04-21 09:05:47 +02:00
870a55042e feat(dbus): Add support for configuring the external antenna
Introduce the `configure_external_antenna` function to manage external
antenna settings, including enable, disable, and status checks. Update
the feature configuration in `interface_config` to include this
functionality. This enhances WiFi-related configuration options.
2025-04-21 09:05:47 +02:00
2848fdcf54 feat(revpiconfig): Add ConfigTxt class for managing config.txt file
Introduced the ConfigTxt class to handle parsing, editing, and saving of
config.txt files. This includes methods for adding, clearing, and
retrieving configuration values, as well as handling dtoverlays and
dtparams. Enhanced system configuration capabilities by providing
structured support for config file operations.
2025-04-21 09:05:47 +02:00
d0f641f2b5 feat(dbus): Add dphys-swapfile configuration functionality
Implemented `configure_dphys_swapfile` to manage the dphys-swapfile
service, including automatic swapfile removal when the service is
disabled. Updated feature registry to support dphys-swapfile
configuration.
2025-04-21 09:05:47 +02:00
e8eba43647 feat(dbus): Add avahi-daemon configuration to system services
Introduce a `configure_avahi_daemon` function to manage the avahi-daemon
service and socket with proper post actions. Update the interface
configuration to enable avahi management using the new function.
2025-04-21 09:05:47 +02:00
0d601f623c feat(dbus): Remove 'var-log.mount' feature
The 'var-log.mount' feature was dropped and has been removed from
the AVAILABLE_FEATURES dictionary.
2025-04-21 09:05:47 +02:00
66735a9eba refactor(dbus): Move system configuration methods to revpi_config.py
Centralized `ConfigActions`, `configure_gui`, and `simple_systemd` into
`revpi_config.py` to eliminate duplication and improve maintainability.
Updated `interface_config.py` to import these methods, ensuring
consistent functionality across modules.
2025-04-21 09:05:47 +02:00
7525c9f653 feat(revpiconfig): Add RevPiConfig class for device information handling
This commit introduces a new `RevPiConfig` class to manage RevPi device
configuration details, such as model, serial, compute module type, WiFi,
and ConBridge detection. It parses CPU info from `/proc/cpuinfo` and
leverages helper methods for hardware-specific checks, enhancing the
middleware's ability to identify and manage hardware features.
2025-04-21 09:05:46 +02:00
3909fab379 feat(dbus): Add GUI configuration handling to interface_config.py
Introduced the `configure_gui` function to manage GUI enabling,
disabling, availability, and status retrieval. Updated the
`AVAILABLE_FEATURES` dictionary to include GUI management functionality,
leveraging systemd and os module operations.
2025-04-20 15:34:16 +02:00
9ce36c78e7 feat(dbus): Add D-Bus interface for system configuration in middleware
Introduced `InterfaceRevpiConfig` to manage feature actions like
enable/disable, status, and availability using D-Bus. Includes support
for predefined features with systemd integration and exception handling
for unsupported features.
2025-04-20 15:18:27 +02:00
c24c78761f fix(cli): Change absolute imports to relative imports 2025-04-20 15:12:09 +02:00
3cc64f514f feat(dbus): Add grep function to search for patterns in a file
The new `grep` function reads a specified file and returns lines
containing a given pattern. It handles file not found errors and logs
unexpected exceptions, improving resilience in file operations.
2025-04-20 15:11:43 +02:00
865d2ca7a9 refactor: Update interface name from 'picontrol' to 'PiControl'
Renamed all occurrences of 'picontrol' to 'PiControl' in the D-Bus
interface definitions, method calls, and test cases for consistency and
adherence to naming conventions. This ensures uniformity across the
codebase and resolves potential naming-related issues.
2025-04-20 12:19:41 +02:00
11 changed files with 692 additions and 8 deletions

View File

@@ -5,12 +5,11 @@
This module provides the foundation for the RevPi middleware CLI commands
and argument parsing setup.
"""
from argparse import ArgumentParser
from logging import getLogger
from revpi_middleware.cli_commands import cli_picontrol
from revpi_middleware.proginit import StdLogOutput
from . import cli_config, cli_picontrol
from .. import proginit as pi
from ..proginit import StdLogOutput
log = getLogger(__name__)
@@ -30,6 +29,11 @@ def setup_command_line_arguments():
help="RevPi PiControl object",
)
cli_picontrol.add_subparsers(obj_picontrol)
obj_config = rpictl_obj.add_parser(
"config",
help="RevPi configuration object (revpi-config)",
)
cli_config.add_subparsers(obj_config)
def main() -> int:
@@ -40,6 +44,9 @@ def main() -> int:
if obj == "picontrol":
rc = cli_picontrol.main()
elif obj == "config":
rc = cli_config.main()
else:
log.error(f"Unknown object: {obj}")
rc = 1

View File

@@ -0,0 +1,93 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser
from logging import getLogger
from .dbus_helper import BusType, get_properties, simple_call
from .. import proginit as pi
from ..dbus_middleware1 import extend_interface
log = getLogger(__name__)
def add_subparsers(parent_parser: ArgumentParser):
parent_parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "list-features"],
help="Action to be executed: enable, disable, status or available. "
"To get all available features, use 'list-features'.",
)
parent_parser.add_argument(
"feature",
nargs="?",
default="",
help="Name of the feature to configer. To list all features use 'list-features' as action.",
)
def main() -> int:
action = pi.pargs.action
dbus_value = False
try:
if action == "list-features":
dbus_value = get_properties(
"available_features",
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
for feature in dbus_value:
print(feature)
return 0
# For the following actions, a feature name is required
if pi.pargs.feature == "":
raise Exception("Feature name is required")
if action == "enable":
simple_call(
"Enable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "disable":
simple_call(
"Disable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "status":
dbus_value = simple_call(
"GetStatus",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "available":
dbus_value = simple_call(
"GetAvailability",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
else:
raise Exception(f"Unknown action: {action}")
except Exception as e:
log.error(f"Error: {e}")
return 1
log.debug(
f"D-Bus call of method {action} for feature {pi.pargs.feature} returned: {dbus_value}"
)
print(int(dbus_value))
return 0

View File

@@ -38,7 +38,7 @@ def method_reset():
log.debug("D-Bus call of method ResetDriver")
simple_call(
"ResetDriver",
interface=extend_interface("picontrol"),
interface=extend_interface("PiControl"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
log.info("ResetDriver called via D-Bus")
@@ -48,7 +48,7 @@ def method_await_reset(timeout: int = 0):
detected_signal = await_signal(
"NotifyDriverReset",
timeout,
extend_interface("picontrol"),
extend_interface("PiControl"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
if detected_signal:

View File

@@ -17,6 +17,18 @@ class BusType(Enum):
SYSTEM = "system"
def get_properties(
property_name: str,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface]
return getattr(iface, property_name)
def simple_call(
method: str,
*args,

View File

@@ -10,6 +10,7 @@ from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl
from .system_config import InterfaceRevpiConfig
log = getLogger(__name__)
@@ -41,6 +42,7 @@ class BusProvider(Thread):
# ("Subdir2/Whatever", Example())
lst_interfaces = [
InterfacePiControl(self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(),
]
try:

View File

@@ -41,3 +41,38 @@ def extend_interface(*args) -> str:
the provided segments.
"""
return ".".join([REVPI_DBUS_NAME, *args])
def grep(pattern, filename):
"""
Searches for lines in a file that contain a given pattern and returns them as a list.
The function reads lines from the specified file and checks whether each line
contains the provided pattern. It returns a list of lines that match the
pattern. If the file is not found, an empty list is returned. Any other
exceptions during the file reading process are caught and logged.
Args:
pattern (str): The substring to search for within the file's lines.
filename (str): The path to the file that will be searched.
Returns:
list[str]: A list containing lines that include the provided pattern, with
leading and trailing spaces removed.
Raises:
FileNotFoundError: This error is caught if the file specified is not
found.
Exception: Any unforeseen exception during file operations is caught and
logged.
"""
try:
with open(filename, "r") as file:
# Gibt alle Zeilen zurück, die das Muster enthalten
matching_lines = [line.strip() for line in file if pattern in line]
return matching_lines
except FileNotFoundError:
return []
except Exception as e:
log.error(f"Error reading file: {e}")
return []

View File

@@ -15,7 +15,7 @@ log = getLogger(__name__)
class InterfacePiControl(DbusInterface):
"""
<node>
<interface name='com.revolutionpi.middleware1.picontrol'>
<interface name='com.revolutionpi.middleware1.PiControl'>
<signal name="NotifyDriverReset">
</signal>
<method name='ResetDriver'>

View File

@@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig

View File

@@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for hardware configuration."""
from collections import namedtuple
from logging import getLogger
from .revpi_config import (
ConfigActions,
configure_avahi_daemon,
configure_bluetooth,
configure_con_can,
configure_dphys_swapfile,
configure_external_antenna,
configure_gui,
configure_wifi,
simple_systemd,
)
from ..dbus_helper import DbusInterface
log = getLogger(__name__)
FeatureFunction = namedtuple("FeatureFunction", ["function", "args"])
class InterfaceRevpiConfig(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.RevpiConfig">
<method name="Disable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="Enable">
<arg name="feature" type="s" direction="in"/>
</method>
<method name="GetStatus">
<arg name="feature" type="s" direction="in"/>
<arg name="status" type="b" direction="out"/>
</method>
<method name="GetAvailability">
<arg name="feature" type="s" direction="in"/>
<arg name="available" type="b" direction="out"/>
</method>
<property name="available_features" type="as" access="read"/>
</interface>
</node>
"""
def Disable(self, feature: str) -> None:
"""Disable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.DISABLE, *feature_function.args)
def Enable(self, feature: str) -> None:
"""Enable the feature."""
feature_function = get_feature(feature)
feature_function.function(ConfigActions.ENABLE, *feature_function.args)
def GetStatus(self, feature: str) -> bool:
"""Get feature status."""
feature_function = get_feature(feature)
return feature_function.function(ConfigActions.STATUS, *feature_function.args)
def GetAvailability(self, feature: str) -> bool:
"""Get feature availability on the RevPi."""
feature_function = get_feature(feature)
return feature_function.function(ConfigActions.AVAILABLE, *feature_function.args)
@property
def available_features(self) -> list[str]:
return list(AVAILABLE_FEATURES.keys())
def get_feature(feature: str) -> FeatureFunction:
if feature not in AVAILABLE_FEATURES:
raise ValueError(f"feature {feature} does not exist")
feature_function = AVAILABLE_FEATURES[feature]
if not feature_function:
raise NotImplementedError(f"feature {feature} is not implemented")
return feature_function
AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []),
"revpi-con-can": FeatureFunction(configure_con_can, []),
"dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
"systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]),
"ssh": FeatureFunction(simple_systemd, ["ssh.service"]),
"nodered": FeatureFunction(simple_systemd, ["nodered.service"]),
"noderedrevpinodes-server": FeatureFunction(
simple_systemd, ["noderedrevpinodes-server.service"]
),
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"bluetooth": FeatureFunction(configure_bluetooth, []),
"ieee80211": FeatureFunction(configure_wifi, []),
"avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": FeatureFunction(configure_external_antenna, []),
}

View File

@@ -0,0 +1,430 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
import re
import shutil
import subprocess
from collections import namedtuple
from enum import Enum, IntEnum
from glob import glob
from logging import getLogger
from os import X_OK, access
from os.path import exists, join
from typing import List, Optional
from pydbus import SystemBus
from ..dbus_helper import grep
log = getLogger(__name__)
ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WIFI_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
class ComputeModuleTypes(IntEnum):
UNKNOWN = 0
CM1 = 6
CM3 = 10
CM4 = 20
CM4S = 21
CM5 = 24
class ConfigActions(Enum):
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
class RevPiConfig:
def __init__(self):
self._cm_type = ComputeModuleTypes.UNKNOWN
self._revpi_with_con_bridge = False
self._wlan_class_path = ""
self.serial = ""
self.model = ""
self._init_device_info()
def _init_device_info(self):
dc_cpuinfo = {}
# Extract CPU information
with open("/proc/cpuinfo", "r") as f:
line = "\n"
while line:
line = f.readline()
if line.startswith(("Revision", "Serial", "Model")):
key, value = line.split(":", 1)
key = key.strip().lower()
value = value.strip()
dc_cpuinfo[key] = value
self.model = dc_cpuinfo.get("model", "")
self.serial = dc_cpuinfo.get("serial", "")
# Detect Compute Module type
revision = dc_cpuinfo.get("revision", "")
if revision:
revision = int(revision, 16)
mask = 4080 # 0xFF0 in dezimal
try:
self._cm_type = ComputeModuleTypes((revision & mask) >> 4)
except ValueError:
pass
# Detect WiFi on CM module
could_have_wifi = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5)
if could_have_wifi:
wifi_interface = join(LINUX_WIFI_CLASS_PATH, "phy0")
if grep("DRIVER=brcmfmac", join(wifi_interface, "device", "uevent")):
self._wlan_class_path = wifi_interface
# If no build in Wi-Fi on the CM, detect third party Wi-Fi on RevPi Flat
if not self._wlan_class_path and grep("revpi-flat", "/proc/device-tree/compatible"):
lst_wifi_interfaces = glob("/sys/class/ieee80211/*")
for wifi_interface in lst_wifi_interfaces:
if grep("DRIVER=mwifiex_sdio", join(wifi_interface, "device", "uevent")):
self._wlan_class_path = wifi_interface
# Detect ConBridge
could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S)
if could_have_con_bridge:
lst_grep = grep("kunbus,revpi-connect", "/proc/device-tree/compatible")
self._revpi_with_con_bridge = len(lst_grep) > 0
@property
def class_path_wifi(self) -> str:
return self._wlan_class_path
@property
def cm_type(self) -> ComputeModuleTypes:
return self._cm_type
@property
def with_con_bridge(self) -> bool:
return self._revpi_with_con_bridge
@property
def with_wifi(self) -> bool:
return bool(self._wlan_class_path)
class ConfigTxt:
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self):
self._config_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._config_txt_path = path
break
if not self._config_txt_path:
raise FileNotFoundError("no config.txt found")
self._config_txt_lines = []
def _clear_name_values(self, name: str, values: str or list) -> int:
counter = 0
if type(values) is str:
values = [values]
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value in values:
self._config_txt_lines.pop(config_var.line_index)
counter += 1
return counter
def _get_all_name_values(self) -> List[ConfigVariable]:
if not self._config_txt_lines:
self.reload_config()
lst_return = []
for i in range(len(self._config_txt_lines)):
match = self.re_name_value.match(self._config_txt_lines[i])
if match:
lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i))
return lst_return
def reload_config(self):
with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines()
def save_config(self):
if not self._config_txt_lines:
return
tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear()
def add_name_value(self, name: str, value: str):
# Check weather name and value already exists
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value == value:
return
self._config_txt_lines.append(f"{name}={value}\n")
def clear_dtoverlays(self, dtoverlays: str or list) -> int:
return self._clear_name_values("dtoverlay", dtoverlays)
def clear_dtparams(self, dtparams: str or list) -> int:
return self._clear_name_values("dtparam", dtparams)
def get_values(self, var_name: str) -> list:
var_values = []
for config_var in self._get_all_name_values():
if config_var.name == var_name:
var_values.append(config_var.value)
return var_values
@property
def config_txt_path(self) -> str:
return self._config_txt_path
def configure_avahi_daemon(action: ConfigActions):
return_value = simple_systemd(action, "avahi-daemon.service")
# Post actions for avahi-daemon
if action in (ConfigActions.ENABLE, ConfigActions.DISABLE):
# Apply the enable/disable action to the avahi socket AFTER the service
# unit, because a connected socket could interrupt stop
simple_systemd(action, "avahi-daemon.socket")
return return_value
def configure_bluetooth(action: ConfigActions):
hci_device = join(LINUX_BT_CLASS_PATH, "hci0")
bt_rfkill_index = get_rfkill_index(hci_device)
# If the bluetooth device is not present, the device should have been
# brought up by revpi-bluetooth's udev rules or vendor magic (devices
# based on CM4 and newer). Nothing we can do here, so treat the interface
# as disabled.
if action is ConfigActions.ENABLE:
if bt_rfkill_index is not None:
subprocess.call(["rfkill", "unblock", str(bt_rfkill_index)])
elif action is ConfigActions.DISABLE:
if bt_rfkill_index is not None:
subprocess.call(["rfkill", "block", str(bt_rfkill_index)])
elif action is ConfigActions.STATUS:
if bt_rfkill_index is None:
return False
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
elif action is ConfigActions.AVAILABLE:
return bt_rfkill_index is not None
else:
raise ValueError(f"action {action} not supported")
return None
def configure_con_can(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_con_bridge
dt_overlay = "revpi-con-can"
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.add_name_value("dtoverlay", dt_overlay)
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", dt_overlay])
elif action is ConfigActions.DISABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay])
elif action is ConfigActions.STATUS:
return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_dphys_swapfile(action: ConfigActions):
return_value = simple_systemd(action, "dphys-swapfile.service")
# Post actions for dphys-swapfile
if action is ConfigActions.DISABLE:
# Remove swapfile afer disabling the service unit
subprocess.call(
["/sbin/dphys-swapfile", "uninstall"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return return_value
def configure_external_antenna(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_wifi
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_wifi:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.add_name_value("dtparam", "ant2")
config_txt.save_config()
elif action is ConfigActions.DISABLE and revpi.with_wifi:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.save_config()
elif action is ConfigActions.STATUS:
return revpi.with_wifi and "ant2" in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_gui(action: ConfigActions):
gui_available = access("/usr/bin/startx", X_OK)
if action is ConfigActions.AVAILABLE:
return gui_available
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True)
elif action is ConfigActions.DISABLE:
systemd_manager.SetDefaultTarget("multi-user.target", True)
elif action is ConfigActions.STATUS:
return systemd_manager.GetDefaultTarget() == "graphical.target"
else:
raise ValueError(f"action {action} not supported")
def configure_wifi(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.ENABLE:
if revpi.with_wifi:
wifi_rfkill_index = get_rfkill_index(revpi.class_path_wifi)
subprocess.call(["rfkill", "unblock", str(wifi_rfkill_index)])
elif action is ConfigActions.DISABLE:
if revpi.with_wifi:
wifi_rfkill_index = get_rfkill_index(revpi.class_path_wifi)
subprocess.call(["rfkill", "block", str(wifi_rfkill_index)])
elif action is ConfigActions.AVAILABLE:
return revpi.with_wifi
elif action is ConfigActions.STATUS:
if not revpi.with_wifi:
return False
wifi_rfkill_index = get_rfkill_index(revpi.class_path_wifi)
with open(f"/sys/class/rfkill/rfkill{wifi_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
else:
raise ValueError(f"action {action} not supported")
return None
def get_rfkill_index(device_class_path: str) -> Optional[int]:
re_rfkill_index = re.compile(r"^/.+/rfkill(?P<index>\d+)$")
for rfkill_path in glob(join(device_class_path, "rfkill*")):
match_index = re_rfkill_index.match(rfkill_path)
if match_index:
return int(match_index.group("index"))
return None
def simple_systemd(action: ConfigActions, unit: str):
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.UnmaskUnitFiles([unit], False)
systemd_manager.EnableUnitFiles([unit], False, False)
systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE:
systemd_manager.StopUnit(unit, "replace")
systemd_manager.DisableUnitFiles([unit], False)
elif action is ConfigActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
elif action is ConfigActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
if __name__ == "__main__":
rc = RevPiConfig()
print("Model:", rc.model)
print("Serial: ", rc.serial)
print("CM Type: ", rc.cm_type.name)
print("With wifi: ", rc.with_wifi)
if rc.with_wifi:
print(" class path: ", rc.class_path_wifi)
print(" rfkill index: ", get_rfkill_index(rc.class_path_wifi))
print("With con-bridge:", rc.with_con_bridge)
config_txt = ConfigTxt()
print("Config file: ", config_txt.config_txt_path)

View File

@@ -18,7 +18,7 @@ class TestObjectPicontrol(TestBusProvider):
def test_reset_driver(self):
simple_call(
"ResetDriver",
interface=extend_interface("picontrol"),
interface=extend_interface("PiControl"),
bus_type=BusType.SESSION,
)
ioctl_call = IOCTL_QUEUE.get(timeout=2.0)
@@ -37,7 +37,7 @@ class TestObjectPicontrol(TestBusProvider):
result = await_signal(
"NotifyDriverReset",
timeout,
extend_interface("picontrol"),
extend_interface("PiControl"),
bus_type=BusType.SESSION,
)
self.assertTrue(result)