5 Commits

Author SHA1 Message Date
8d8d3bbae4 feat(dbus): Add dphys-swapfile configuration functionality
Implemented `configure_dphys_swapfile` to manage the dphys-swapfile
service, including automatic swapfile removal when the service is
disabled. Updated feature registry to support dphys-swapfile
configuration.
2025-04-20 16:32:57 +02:00
1ae661a5e9 feat(dbus): Add avahi-daemon configuration to system services
Introduce a `configure_avahi_daemon` function to manage the avahi-daemon
service and socket with proper post actions. Update the interface
configuration to enable avahi management using the new function.
2025-04-20 16:16:51 +02:00
ffdb27049e feat(dbus): Remove 'var-log.mount' feature
The 'var-log.mount' feature was dropped and has been removed from
the AVAILABLE_FEATURES dictionary.
2025-04-20 16:02:39 +02:00
38f495e864 refactor(dbus): Move system configuration methods to revpi_config.py
Centralized `ConfigActions`, `configure_gui`, and `simple_systemd` into
`revpi_config.py` to eliminate duplication and improve maintainability.
Updated `interface_config.py` to import these methods, ensuring
consistent functionality across modules.
2025-04-20 15:43:09 +02:00
85dc6a7e56 feat(dbus): Add RevPiConfig class for device information handling
This commit introduces a new `RevPiConfig` class to manage RevPi device
configuration details, such as model, serial, compute module type, WiFi,
and ConBridge detection. It parses CPU info from `/proc/cpuinfo` and
leverages helper methods for hardware-specific checks, enhancing the
middleware's ability to identify and manage hardware features.
2025-04-20 15:39:02 +02:00
6 changed files with 13 additions and 377 deletions

View File

@@ -29,11 +29,6 @@ def setup_command_line_arguments():
help="RevPi PiControl object", help="RevPi PiControl object",
) )
cli_picontrol.add_subparsers(obj_picontrol) cli_picontrol.add_subparsers(obj_picontrol)
obj_config = rpictl_obj.add_parser(
"config",
help="RevPi configuration object (revpi-config)",
)
cli_config.add_subparsers(obj_config)
def main() -> int: def main() -> int:
@@ -44,9 +39,6 @@ def main() -> int:
if obj == "picontrol": if obj == "picontrol":
rc = cli_picontrol.main() rc = cli_picontrol.main()
elif obj == "config":
rc = cli_config.main()
else: else:
log.error(f"Unknown object: {obj}") log.error(f"Unknown object: {obj}")
rc = 1 rc = 1

View File

@@ -1,93 +0,0 @@
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""Command-Line for the picontrol object of CLI."""
from argparse import ArgumentParser
from logging import getLogger
from .dbus_helper import BusType, get_properties, simple_call
from .. import proginit as pi
from ..dbus_middleware1 import extend_interface
log = getLogger(__name__)
def add_subparsers(parent_parser: ArgumentParser):
parent_parser.add_argument(
"action",
choices=["enable", "disable", "status", "available", "list-features"],
help="Action to be executed: enable, disable, status or available. "
"To get all available features, use 'list-features'.",
)
parent_parser.add_argument(
"feature",
nargs="?",
default="",
help="Name of the feature to configer. To list all features use 'list-features' as action.",
)
def main() -> int:
action = pi.pargs.action
dbus_value = False
try:
if action == "list-features":
dbus_value = get_properties(
"available_features",
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
for feature in dbus_value:
print(feature)
return 0
# For the following actions, a feature name is required
if pi.pargs.feature == "":
raise Exception("Feature name is required")
if action == "enable":
simple_call(
"Enable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "disable":
simple_call(
"Disable",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "status":
dbus_value = simple_call(
"GetStatus",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
elif action == "available":
dbus_value = simple_call(
"GetAvailability",
pi.pargs.feature,
interface=extend_interface("RevpiConfig"),
bus_type=BusType.SESSION if pi.pargs.use_session_bus else BusType.SYSTEM,
)
else:
raise Exception(f"Unknown action: {action}")
except Exception as e:
log.error(f"Error: {e}")
return 1
log.debug(
f"D-Bus call of method {action} for feature {pi.pargs.feature} returned: {dbus_value}"
)
print(int(dbus_value))
return 0

View File

@@ -17,18 +17,6 @@ class BusType(Enum):
SYSTEM = "system" SYSTEM = "system"
def get_properties(
property_name: str,
interface: str,
object_path=REVPI_DBUS_BASE_PATH,
bus_type=BusType.SYSTEM,
):
bus = SessionBus() if bus_type is BusType.SESSION else SystemBus()
revpi = bus.get(REVPI_DBUS_NAME, object_path)
iface = revpi[interface]
return getattr(iface, property_name)
def simple_call( def simple_call(
method: str, method: str,
*args, *args,

View File

@@ -10,7 +10,6 @@ from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl from .process_image import InterfacePiControl
from .system_config import InterfaceRevpiConfig
log = getLogger(__name__) log = getLogger(__name__)
@@ -42,7 +41,6 @@ class BusProvider(Thread):
# ("Subdir2/Whatever", Example()) # ("Subdir2/Whatever", Example())
lst_interfaces = [ lst_interfaces = [
InterfacePiControl(self.picontrol_device, self.config_rsc), InterfacePiControl(self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(),
] ]
try: try:

View File

@@ -8,12 +8,8 @@ from logging import getLogger
from .revpi_config import ( from .revpi_config import (
ConfigActions, ConfigActions,
configure_avahi_daemon, configure_avahi_daemon,
configure_bluetooth,
configure_con_can,
configure_dphys_swapfile, configure_dphys_swapfile,
configure_external_antenna,
configure_gui, configure_gui,
configure_wlan,
simple_systemd, simple_systemd,
) )
from ..dbus_helper import DbusInterface from ..dbus_helper import DbusInterface
@@ -82,7 +78,7 @@ def get_feature(feature: str) -> FeatureFunction:
AVAILABLE_FEATURES = { AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []), "gui": FeatureFunction(configure_gui, []),
"revpi-con-can": FeatureFunction(configure_con_can, []), "revpi-con-can": False,
"dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []), "dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]), "pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]), "pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
@@ -93,8 +89,8 @@ AVAILABLE_FEATURES = {
simple_systemd, ["noderedrevpinodes-server.service"] simple_systemd, ["noderedrevpinodes-server.service"]
), ),
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]), "revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"bluetooth": FeatureFunction(configure_bluetooth, []), "bluetooth": False,
"wlan": FeatureFunction(configure_wlan, []), "ieee80211": False,
"avahi": FeatureFunction(configure_avahi_daemon, []), "avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": FeatureFunction(configure_external_antenna, []), "external-antenna": False,
} }

View File

@@ -1,16 +1,10 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
import re
import shutil
import subprocess import subprocess
from collections import namedtuple
from enum import Enum, IntEnum from enum import Enum, IntEnum
from glob import glob
from logging import getLogger from logging import getLogger
from os import X_OK, access from os import X_OK, access
from os.path import exists, join
from typing import List, Optional
from pydbus import SystemBus from pydbus import SystemBus
@@ -18,12 +12,6 @@ from ..dbus_helper import grep
log = getLogger(__name__) log = getLogger(__name__)
ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
class ComputeModuleTypes(IntEnum): class ComputeModuleTypes(IntEnum):
UNKNOWN = 0 UNKNOWN = 0
@@ -45,9 +33,9 @@ class RevPiConfig:
def __init__(self): def __init__(self):
self._cm_type = ComputeModuleTypes.UNKNOWN self._cm_type = ComputeModuleTypes.UNKNOWN
self._cm_with_wifi = False
self._revpi_with_con_bridge = False self._revpi_with_con_bridge = False
self._wlan_class_path = ""
self.serial = "" self.serial = ""
self.model = "" self.model = ""
@@ -81,19 +69,11 @@ class RevPiConfig:
except ValueError: except ValueError:
pass pass
# Detect WLAN on CM module # Detect WiFi
could_have_wlan = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5) could_have_wifi = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5)
if could_have_wlan: if could_have_wifi:
wlan_interface = join(LINUX_WLAN_CLASS_PATH, "phy0") lst_grep = grep("DRIVER=brcmfmac", "/sys/class/ieee80211/phy0/device/uevent")
if grep("DRIVER=brcmfmac", join(wlan_interface, "device", "uevent")): self._cm_with_wifi = len(lst_grep) > 0 and self._cm_type in (ComputeModuleTypes)
self._wlan_class_path = wlan_interface
# If no build in WLAN on the CM, detect third party WLAN on RevPi Flat
if not self._wlan_class_path and grep("revpi-flat", "/proc/device-tree/compatible"):
lst_wlan_interfaces = glob("/sys/class/ieee80211/*")
for wlan_interface in lst_wlan_interfaces:
if grep("DRIVER=mwifiex_sdio", join(wlan_interface, "device", "uevent")):
self._wlan_class_path = wlan_interface
# Detect ConBridge # Detect ConBridge
could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S) could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S)
@@ -101,10 +81,6 @@ class RevPiConfig:
lst_grep = grep("kunbus,revpi-connect", "/proc/device-tree/compatible") lst_grep = grep("kunbus,revpi-connect", "/proc/device-tree/compatible")
self._revpi_with_con_bridge = len(lst_grep) > 0 self._revpi_with_con_bridge = len(lst_grep) > 0
@property
def class_path_wlan(self) -> str:
return self._wlan_class_path
@property @property
def cm_type(self) -> ComputeModuleTypes: def cm_type(self) -> ComputeModuleTypes:
return self._cm_type return self._cm_type
@@ -114,91 +90,8 @@ class RevPiConfig:
return self._revpi_with_con_bridge return self._revpi_with_con_bridge
@property @property
def with_wlan(self) -> bool: def with_wifi(self) -> bool:
return bool(self._wlan_class_path) return self._cm_with_wifi
class ConfigTxt:
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self):
self._config_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._config_txt_path = path
break
if not self._config_txt_path:
raise FileNotFoundError("no config.txt found")
self._config_txt_lines = []
def _clear_name_values(self, name: str, values: str or list) -> int:
counter = 0
if type(values) is str:
values = [values]
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value in values:
self._config_txt_lines.pop(config_var.line_index)
counter += 1
return counter
def _get_all_name_values(self) -> List[ConfigVariable]:
if not self._config_txt_lines:
self.reload_config()
lst_return = []
for i in range(len(self._config_txt_lines)):
match = self.re_name_value.match(self._config_txt_lines[i])
if match:
lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i))
return lst_return
def reload_config(self):
with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines()
def save_config(self):
if not self._config_txt_lines:
return
tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear()
def add_name_value(self, name: str, value: str):
# Check weather name and value already exists
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value == value:
return
self._config_txt_lines.append(f"{name}={value}\n")
def clear_dtoverlays(self, dtoverlays: str or list) -> int:
return self._clear_name_values("dtoverlay", dtoverlays)
def clear_dtparams(self, dtparams: str or list) -> int:
return self._clear_name_values("dtparam", dtparams)
def get_values(self, var_name: str) -> list:
var_values = []
for config_var in self._get_all_name_values():
if config_var.name == var_name:
var_values.append(config_var.value)
return var_values
@property
def config_txt_path(self) -> str:
return self._config_txt_path
def configure_avahi_daemon(action: ConfigActions): def configure_avahi_daemon(action: ConfigActions):
@@ -213,70 +106,6 @@ def configure_avahi_daemon(action: ConfigActions):
return return_value return return_value
def configure_bluetooth(action: ConfigActions):
hci_device = join(LINUX_BT_CLASS_PATH, "hci0")
bt_rfkill_index = get_rfkill_index(hci_device)
# If the bluetooth device is not present, the device should have been
# brought up by revpi-bluetooth's udev rules or vendor magic (devices
# based on CM4 and newer). Nothing we can do here, so treat the interface
# as disabled.
if action is ConfigActions.ENABLE:
if bt_rfkill_index is not None:
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("0")
elif action is ConfigActions.DISABLE:
if bt_rfkill_index is not None:
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.STATUS:
if bt_rfkill_index is None:
return False
with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
elif action is ConfigActions.AVAILABLE:
return bt_rfkill_index is not None
else:
raise ValueError(f"action {action} not supported")
return None
def configure_con_can(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_con_bridge
dt_overlay = "revpi-con-can"
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.add_name_value("dtoverlay", dt_overlay)
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", dt_overlay])
elif action is ConfigActions.DISABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay])
elif action is ConfigActions.STATUS:
return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_dphys_swapfile(action: ConfigActions): def configure_dphys_swapfile(action: ConfigActions):
return_value = simple_systemd(action, "dphys-swapfile.service") return_value = simple_systemd(action, "dphys-swapfile.service")
@@ -292,31 +121,6 @@ def configure_dphys_swapfile(action: ConfigActions):
return return_value return return_value
def configure_external_antenna(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_wlan
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.add_name_value("dtparam", "ant2")
config_txt.save_config()
elif action is ConfigActions.DISABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.save_config()
elif action is ConfigActions.STATUS:
return revpi.with_wlan and "ant2" in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_gui(action: ConfigActions): def configure_gui(action: ConfigActions):
gui_available = access("/usr/bin/startx", X_OK) gui_available = access("/usr/bin/startx", X_OK)
@@ -339,49 +143,6 @@ def configure_gui(action: ConfigActions):
raise ValueError(f"action {action} not supported") raise ValueError(f"action {action} not supported")
def configure_wlan(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.ENABLE:
if revpi.with_wlan:
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("0")
elif action is ConfigActions.DISABLE:
if revpi.with_wlan:
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.AVAILABLE:
return revpi.with_wlan
elif action is ConfigActions.STATUS:
if not revpi.with_wlan:
return False
wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "r") as f:
buffer = f.read().strip()
return buffer == "0"
else:
raise ValueError(f"action {action} not supported")
return None
def get_rfkill_index(device_class_path: str) -> Optional[int]:
re_rfkill_index = re.compile(r"^/.+/rfkill(?P<index>\d+)$")
for rfkill_path in glob(join(device_class_path, "rfkill*")):
match_index = re_rfkill_index.match(rfkill_path)
if match_index:
return int(match_index.group("index"))
return None
def simple_systemd(action: ConfigActions, unit: str): def simple_systemd(action: ConfigActions, unit: str):
bus = SystemBus() bus = SystemBus()
systemd_manager = bus.get(".systemd1") systemd_manager = bus.get(".systemd1")
@@ -424,11 +185,5 @@ if __name__ == "__main__":
print("Model:", rc.model) print("Model:", rc.model)
print("Serial: ", rc.serial) print("Serial: ", rc.serial)
print("CM Type: ", rc.cm_type.name) print("CM Type: ", rc.cm_type.name)
print("With WLAN: ", rc.with_wlan) print("With wifi: ", rc.with_wifi)
if rc.with_wlan:
print(" class path: ", rc.class_path_wlan)
print(" rfkill index: ", get_rfkill_index(rc.class_path_wlan))
print("With con-bridge:", rc.with_con_bridge) print("With con-bridge:", rc.with_con_bridge)
config_txt = ConfigTxt()
print("Config file: ", config_txt.config_txt_path)