9 Commits

Author SHA1 Message Date
18e6fb3d14 feat(revpiconfig): Enhance Wi-Fi detection and add rfkill index support
Improved logic to detect built-in and third-party Wi-Fi interfaces,
including integration of `rfkill` index retrieval for Wi-Fi devices.
This enhances support for various hardware setups and allows better
control over Wi-Fi functionality.
2025-04-21 09:05:47 +02:00
fe614d026a feat(dbus): Add support for configuring 'revpi-con-can' feature
Introduce the `configure_con_can` function to manage enabling,
disabling, status checking, and availability of the 'revpi-con-can'
feature. Update the `AVAILABLE_FEATURES` dictionary to integrate
'revpi-con-can' as a configurable feature.
2025-04-21 09:05:47 +02:00
870a55042e feat(dbus): Add support for configuring the external antenna
Introduce the `configure_external_antenna` function to manage external
antenna settings, including enable, disable, and status checks. Update
the feature configuration in `interface_config` to include this
functionality. This enhances WiFi-related configuration options.
2025-04-21 09:05:47 +02:00
2848fdcf54 feat(revpiconfig): Add ConfigTxt class for managing config.txt file
Introduced the ConfigTxt class to handle parsing, editing, and saving of
config.txt files. This includes methods for adding, clearing, and
retrieving configuration values, as well as handling dtoverlays and
dtparams. Enhanced system configuration capabilities by providing
structured support for config file operations.
2025-04-21 09:05:47 +02:00
d0f641f2b5 feat(dbus): Add dphys-swapfile configuration functionality
Implemented `configure_dphys_swapfile` to manage the dphys-swapfile
service, including automatic swapfile removal when the service is
disabled. Updated feature registry to support dphys-swapfile
configuration.
2025-04-21 09:05:47 +02:00
e8eba43647 feat(dbus): Add avahi-daemon configuration to system services
Introduce a `configure_avahi_daemon` function to manage the avahi-daemon
service and socket with proper post actions. Update the interface
configuration to enable avahi management using the new function.
2025-04-21 09:05:47 +02:00
0d601f623c feat(dbus): Remove 'var-log.mount' feature
The 'var-log.mount' feature was dropped and has been removed from
the AVAILABLE_FEATURES dictionary.
2025-04-21 09:05:47 +02:00
66735a9eba refactor(dbus): Move system configuration methods to revpi_config.py
Centralized `ConfigActions`, `configure_gui`, and `simple_systemd` into
`revpi_config.py` to eliminate duplication and improve maintainability.
Updated `interface_config.py` to import these methods, ensuring
consistent functionality across modules.
2025-04-21 09:05:47 +02:00
7525c9f653 feat(revpiconfig): Add RevPiConfig class for device information handling
This commit introduces a new `RevPiConfig` class to manage RevPi device
configuration details, such as model, serial, compute module type, WiFi,
and ConBridge detection. It parses CPU info from `/proc/cpuinfo` and
leverages helper methods for hardware-specific checks, enhancing the
middleware's ability to identify and manage hardware features.
2025-04-21 09:05:46 +02:00
2 changed files with 378 additions and 75 deletions

View File

@@ -3,12 +3,17 @@
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for hardware configuration."""
from collections import namedtuple
from enum import Enum
from logging import getLogger
from os import X_OK, access
from pydbus import SystemBus
from .revpi_config import (
ConfigActions,
configure_avahi_daemon,
configure_con_can,
configure_dphys_swapfile,
configure_external_antenna,
configure_gui,
simple_systemd,
)
from ..dbus_helper import DbusInterface
log = getLogger(__name__)
@@ -16,13 +21,6 @@ log = getLogger(__name__)
FeatureFunction = namedtuple("FeatureFunction", ["function", "args"])
class ConfigActions(Enum):
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
class InterfaceRevpiConfig(DbusInterface):
"""
<node>
@@ -71,28 +69,6 @@ class InterfaceRevpiConfig(DbusInterface):
return list(AVAILABLE_FEATURES.keys())
def configure_gui(action: ConfigActions):
gui_available = access("/usr/bin/startx", X_OK)
if action is ConfigActions.AVAILABLE:
return gui_available
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True)
elif action is ConfigActions.DISABLE:
systemd_manager.SetDefaultTarget("multi-user.target", True)
elif action is ConfigActions.STATUS:
return systemd_manager.GetDefaultTarget() == "graphical.target"
else:
raise ValueError(f"action {action} not supported")
def get_feature(feature: str) -> FeatureFunction:
if feature not in AVAILABLE_FEATURES:
raise ValueError(f"feature {feature} does not exist")
@@ -102,48 +78,10 @@ def get_feature(feature: str) -> FeatureFunction:
return feature_function
def simple_systemd(action: ConfigActions, unit: str):
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.UnmaskUnitFiles([unit], False)
systemd_manager.EnableUnitFiles([unit], False, False)
systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE:
systemd_manager.StopUnit(unit, "replace")
systemd_manager.DisableUnitFiles([unit], False)
elif action is ConfigActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
elif action is ConfigActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []),
"revpi-con-can": False,
"var-log.mount": False,
"dphys-swapfile": False,
"revpi-con-can": FeatureFunction(configure_con_can, []),
"dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
"systemd-timesyncd": FeatureFunction(simple_systemd, ["systemd-timesyncd.service"]),
@@ -155,6 +93,6 @@ AVAILABLE_FEATURES = {
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"bluetooth": False,
"ieee80211": False,
"avahi": False,
"external-antenna": False,
"avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": FeatureFunction(configure_external_antenna, []),
}

View File

@@ -0,0 +1,365 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
import re
import shutil
import subprocess
from collections import namedtuple
from enum import Enum, IntEnum
from glob import glob
from logging import getLogger
from os import X_OK, access
from os.path import exists, join
from typing import List, Optional
from pydbus import SystemBus
from ..dbus_helper import grep
log = getLogger(__name__)
ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_WIFI_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
class ComputeModuleTypes(IntEnum):
UNKNOWN = 0
CM1 = 6
CM3 = 10
CM4 = 20
CM4S = 21
CM5 = 24
class ConfigActions(Enum):
ENABLE = "enable"
DISABLE = "disable"
STATUS = "status"
AVAILABLE = "available"
class RevPiConfig:
def __init__(self):
self._cm_type = ComputeModuleTypes.UNKNOWN
self._cm_with_wifi = False
self._revpi_with_con_bridge = False
self._wlan_class_path = ""
self._wlan_rfkill_index = None
self.serial = ""
self.model = ""
self._init_device_info()
def _init_device_info(self):
dc_cpuinfo = {}
# Extract CPU information
with open("/proc/cpuinfo", "r") as f:
line = "\n"
while line:
line = f.readline()
if line.startswith(("Revision", "Serial", "Model")):
key, value = line.split(":", 1)
key = key.strip().lower()
value = value.strip()
dc_cpuinfo[key] = value
self.model = dc_cpuinfo.get("model", "")
self.serial = dc_cpuinfo.get("serial", "")
# Detect Compute Module type
revision = dc_cpuinfo.get("revision", "")
if revision:
revision = int(revision, 16)
mask = 4080 # 0xFF0 in dezimal
try:
self._cm_type = ComputeModuleTypes((revision & mask) >> 4)
except ValueError:
pass
# Detect WiFi on CM module
could_have_wifi = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5)
if could_have_wifi:
wifi_interface = join(LINUX_WIFI_CLASS_PATH, "phy0")
lst_grep = grep("DRIVER=brcmfmac", join(wifi_interface, "device", "uevent"))
self._cm_with_wifi = len(lst_grep) > 0
self._wlan_class_path = wifi_interface
# If no build in Wi-Fi on the CM, detect third party Wi-Fi on RevPi Flat
if not self._cm_with_wifi and grep("revpi-flat", "/proc/device-tree/compatible"):
lst_wifi_interfaces = glob("/sys/class/ieee80211/*")
for wifi_interface in lst_wifi_interfaces:
if grep("DRIVER=mwifiex_sdio", join(wifi_interface, "device", "uevent")):
self._cm_with_wifi = True
self._wlan_class_path = wifi_interface
# Detect rfkill index of the integrated Wi-Fi device
if self._wlan_class_path:
for rfkill_path in glob(join(self._wlan_class_path, "rfkill*")):
match_index = re.match(r"^/.+/rfkill(?P<index>\d+)$", rfkill_path)
if match_index:
self._wlan_rfkill_index = int(match_index.group("index"))
break
# Detect ConBridge
could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S)
if could_have_con_bridge:
lst_grep = grep("kunbus,revpi-connect", "/proc/device-tree/compatible")
self._revpi_with_con_bridge = len(lst_grep) > 0
@property
def cm_type(self) -> ComputeModuleTypes:
return self._cm_type
@property
def rfkill_index(self) -> Optional[int]:
return self._wlan_rfkill_index
@property
def with_con_bridge(self) -> bool:
return self._revpi_with_con_bridge
@property
def with_wifi(self) -> bool:
return self._cm_with_wifi
class ConfigTxt:
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self):
self._config_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._config_txt_path = path
break
if not self._config_txt_path:
raise FileNotFoundError("no config.txt found")
self._config_txt_lines = []
def _clear_name_values(self, name: str, values: str or list) -> int:
counter = 0
if type(values) is str:
values = [values]
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value in values:
self._config_txt_lines.pop(config_var.line_index)
counter += 1
return counter
def _get_all_name_values(self) -> List[ConfigVariable]:
if not self._config_txt_lines:
self.reload_config()
lst_return = []
for i in range(len(self._config_txt_lines)):
match = self.re_name_value.match(self._config_txt_lines[i])
if match:
lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i))
return lst_return
def reload_config(self):
with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines()
def save_config(self):
if not self._config_txt_lines:
return
tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear()
def add_name_value(self, name: str, value: str):
# Check weather name and value already exists
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value == value:
return
self._config_txt_lines.append(f"{name}={value}\n")
def clear_dtoverlays(self, dtoverlays: str or list) -> int:
return self._clear_name_values("dtoverlay", dtoverlays)
def clear_dtparams(self, dtparams: str or list) -> int:
return self._clear_name_values("dtparam", dtparams)
def get_values(self, var_name: str) -> list:
var_values = []
for config_var in self._get_all_name_values():
if config_var.name == var_name:
var_values.append(config_var.value)
return var_values
@property
def config_txt_path(self) -> str:
return self._config_txt_path
def configure_avahi_daemon(action: ConfigActions):
return_value = simple_systemd(action, "avahi-daemon.service")
# Post actions for avahi-daemon
if action in (ConfigActions.ENABLE, ConfigActions.DISABLE):
# Apply the enable/disable action to the avahi socket AFTER the service
# unit, because a connected socket could interrupt stop
simple_systemd(action, "avahi-daemon.socket")
return return_value
def configure_con_can(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_con_bridge
dt_overlay = "revpi-con-can"
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.add_name_value("dtoverlay", dt_overlay)
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", dt_overlay])
elif action is ConfigActions.DISABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay])
elif action is ConfigActions.STATUS:
return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_dphys_swapfile(action: ConfigActions):
return_value = simple_systemd(action, "dphys-swapfile.service")
# Post actions for dphys-swapfile
if action is ConfigActions.DISABLE:
# Remove swapfile afer disabling the service unit
subprocess.call(
["/sbin/dphys-swapfile", "uninstall"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return return_value
def configure_external_antenna(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_wifi
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_wifi:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.add_name_value("dtparam", "ant2")
config_txt.save_config()
elif action is ConfigActions.DISABLE and revpi.with_wifi:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.save_config()
elif action is ConfigActions.STATUS:
return revpi.with_wifi and "ant2" in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_gui(action: ConfigActions):
gui_available = access("/usr/bin/startx", X_OK)
if action is ConfigActions.AVAILABLE:
return gui_available
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True)
elif action is ConfigActions.DISABLE:
systemd_manager.SetDefaultTarget("multi-user.target", True)
elif action is ConfigActions.STATUS:
return systemd_manager.GetDefaultTarget() == "graphical.target"
else:
raise ValueError(f"action {action} not supported")
def simple_systemd(action: ConfigActions, unit: str):
bus = SystemBus()
systemd_manager = bus.get(".systemd1")
if action is ConfigActions.ENABLE:
systemd_manager.UnmaskUnitFiles([unit], False)
systemd_manager.EnableUnitFiles([unit], False, False)
systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE:
systemd_manager.StopUnit(unit, "replace")
systemd_manager.DisableUnitFiles([unit], False)
elif action is ConfigActions.STATUS:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.UnitFileState == "enabled" and properties.ActiveState == "active"
elif action is ConfigActions.AVAILABLE:
try:
unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path)
except Exception:
log.warning(f"could not get systemd unit {unit}")
return False
return properties.LoadState != "not-found"
else:
raise ValueError(f"action {action} not supported")
if __name__ == "__main__":
rc = RevPiConfig()
print("Model:", rc.model)
print("Serial: ", rc.serial)
print("CM Type: ", rc.cm_type.name)
print("With wifi: ", rc.with_wifi)
if rc.with_wifi:
print(" rfkill index: ", rc.rfkill_index)
print("With con-bridge:", rc.with_con_bridge)
config_txt = ConfigTxt()
print("Config file: ", config_txt.config_txt_path)