3 Commits

Author SHA1 Message Date
Sven Sager
1e4bfeccdf feat(dbus): Add InterfaceWlan for WLAN configuration management
Introduced `InterfaceWlan` to enable/disable WLAN hardware, check
availability, and track status via D-Bus. Integrated the new interface
with the bus provider initialization to ensure proper registration.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-06-26 13:01:57 +02:00
Sven Sager
1f80063eb2 feat(revpiconfig): Add CmdLineTxt class for managing cmdline.txt
Introduced the `CmdLineTxt` class to handle parsing, modifying, and
writing operations on the `cmdline.txt` file with thread safety.
Implemented methods for setting, removing keys, and ensuring safe file
operations.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-06-26 08:47:52 +02:00
Sven Sager
fedb0f8924 feat(revpiconfig): Add thread safety for config.txt file operations
Introduced a threading lock to safeguard read and write operations on
the `config.txt` file. This ensures thread-safe access and prevents
potential race conditions during concurrent execution.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-06-26 06:41:59 +02:00
5 changed files with 185 additions and 14 deletions

View File

@@ -10,7 +10,7 @@ from pydbus import SessionBus, SystemBus
from . import REVPI_DBUS_NAME from . import REVPI_DBUS_NAME
from .process_image import InterfacePiControl from .process_image import InterfacePiControl
from .system_config import InterfaceRevpiConfig, InterfaceSoftwareServices from .system_config import InterfaceRevpiConfig, InterfaceSoftwareServices, InterfaceWlan
log = getLogger(__name__) log = getLogger(__name__)
@@ -44,6 +44,7 @@ class BusProvider(Thread):
InterfacePiControl(self._bus, self.picontrol_device, self.config_rsc), InterfacePiControl(self._bus, self.picontrol_device, self.config_rsc),
InterfaceRevpiConfig(self._bus), InterfaceRevpiConfig(self._bus),
InterfaceSoftwareServices(self._bus), InterfaceSoftwareServices(self._bus),
InterfaceWlan(self._bus),
] ]
try: try:

View File

@@ -4,3 +4,4 @@
"""D-Bus interfaces for system configuration.""" """D-Bus interfaces for system configuration."""
from .interface_config import InterfaceRevpiConfig from .interface_config import InterfaceRevpiConfig
from .interface_services import InterfaceSoftwareServices from .interface_services import InterfaceSoftwareServices
from .interface_wlan import InterfaceWlan

View File

@@ -14,7 +14,6 @@ from .revpi_config import (
configure_dphys_swapfile, configure_dphys_swapfile,
configure_external_antenna, configure_external_antenna,
configure_gui, configure_gui,
configure_wlan,
) )
from ..dbus_helper import DbusInterface from ..dbus_helper import DbusInterface
@@ -98,6 +97,5 @@ AVAILABLE_FEATURES = {
"revpi-con-can": FeatureFunction(configure_con_can, []), "revpi-con-can": FeatureFunction(configure_con_can, []),
"swapfile": FeatureFunction(configure_dphys_swapfile, []), "swapfile": FeatureFunction(configure_dphys_swapfile, []),
"bluetooth": FeatureFunction(configure_bluetooth, []), "bluetooth": FeatureFunction(configure_bluetooth, []),
"wlan": FeatureFunction(configure_wlan, []),
"external-antenna": FeatureFunction(configure_external_antenna, []), "external-antenna": FeatureFunction(configure_external_antenna, []),
} }

View File

@@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later
"""D-Bus interfaces for wlan configuration."""
from logging import getLogger
from threading import Event, Thread
from pydbus.generic import signal
from .revpi_config import configure_wlan, ConfigActions
from ..dbus_helper import DbusInterface
log = getLogger(__name__)
class InterfaceWlan(DbusInterface):
"""
<node>
<interface name="com.revolutionpi.middleware1.WlanConfiguration">
<method name="Enable">
<annotation name="org.freedesktop.DBus.Method.NoReply" value="true"/>
</method>
<method name="Disable">
<annotation name="org.freedesktop.DBus.Method.NoReply" value="true"/>
</method>
<property name="available" type="b" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const"/>
</property>
<property name="status" type="s" access="read">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="true"/>
</property>
</interface>
</node>
"""
PropertiesChanged = signal()
def __init__(self, bus):
super().__init__(bus)
self._status = ""
# NetworkManager-Objekt abrufen
self.bus_nm = self.bus.get(
"org.freedesktop.NetworkManager",
"/org/freedesktop/NetworkManager",
)
# Prepare status value and thread
self.evt_stop_threading = Event()
self._update_status(suppress_signal=True)
self.th_run_status_update = Thread(
target=self._run_status_update,
daemon=True,
).start()
def _run_status_update(self, *args, **kwargs):
while not self.evt_stop_threading.wait(1.0):
self._update_status()
def _update_status(self, suppress_signal=False) -> None:
str_status = "enabled" if configure_wlan(ConfigActions.STATUS) else "disabled"
if self._status != str_status:
self._status = str_status
if not suppress_signal:
self.PropertiesChanged(
"com.revolutionpi.middleware1.WlanConfiguration",
{"status": str_status},
[],
)
def cleanup(self):
self.evt_stop_threading.set()
def Disable(self) -> None:
"""Disable integrated WLAN hardware."""
configure_wlan(ConfigActions.DISABLE)
def Enable(self) -> None:
"""Enable integrated WLAN hardware."""
configure_wlan(ConfigActions.ENABLE)
@property
def available(self) -> bool:
"""Check if WLAN hardware is available."""
return configure_wlan(ConfigActions.AVAILABLE)
@property
def status(self) -> str:
return self._status

View File

@@ -10,6 +10,7 @@ from glob import glob
from logging import getLogger from logging import getLogger
from os import X_OK, access from os import X_OK, access
from os.path import exists, join from os.path import exists, join
from threading import Lock
from typing import List, Optional from typing import List, Optional
from pydbus import SystemBus from pydbus import SystemBus
@@ -24,6 +25,8 @@ ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth" LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211" LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt") CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
CMDLINE_TXT_LOCK = Lock()
CONFIG_TXT_LOCK = Lock()
class ComputeModuleTypes(IntEnum): class ComputeModuleTypes(IntEnum):
@@ -219,6 +222,87 @@ class RevPiConfig:
return bool(self._wlan_class_path) return bool(self._wlan_class_path)
class CmdLineTxt:
"""
Represents operations on a `cmdline.txt` configuration file.
This class provides functionality to read, modify, and save the
`cmdline.txt` file commonly used for system configurations. It allows
setting key-value pairs, removing keys, and manages file locking to ensure
thread-safe modifications.
"""
# Value is optional, "?:=" non-capturing the "="
re_name_value = re.compile(r"(?P<key>[^\s=]+)(?:=(?P<value>\S+))?")
def __init__(self):
self._cmdline_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._cmdline_txt_path = path
break
if not self._cmdline_txt_path:
raise FileNotFoundError("no config.txt found")
def _get_cmdline_dict(self) -> dict:
with CMDLINE_TXT_LOCK:
with open(self._cmdline_txt_path, "r") as file:
cmdline = file.read()
return {
match.group("key"): match.group("value")
for match in self.re_name_value.finditer(cmdline)
}
def _write_cmdline_dict(self, cmdline_dict: dict) -> None:
with CMDLINE_TXT_LOCK:
tmp_path = f"{self._cmdline_txt_path}.tmp"
with open(tmp_path, "w") as file:
str_cmdline = ""
for key, value in cmdline_dict.items():
if value is None:
str_cmdline += f"{key} "
else:
str_cmdline += f"{key}={value} "
str_cmdline = str_cmdline.strip()
file.write(str_cmdline + "\n")
shutil.move(tmp_path, self._cmdline_txt_path)
def remove_key(self, key: str) -> None:
"""
Removes a specified key from the config.txt file.
Parameters:
key: str
The key to be removed from the config.txt file.
"""
dc_cmdline = self._get_cmdline_dict()
if key in dc_cmdline:
del dc_cmdline[key]
self._write_cmdline_dict(dc_cmdline)
def set_key_value(self, key: str, value: Optional[str] = None) -> None:
"""
Sets a given key-value pair in the config.txt file. If the key does not
exist or the value differs from the current one, the pair is updated.
If the value is None, just the key is set without a value.
Parameters:
key: str
The key to set in the config.txt file.
value: Optional[str], default = None
The value to associate with the key, defaulting to None.
"""
dc_cmdline = self._get_cmdline_dict()
if key not in dc_cmdline or dc_cmdline.get(key, value) != value:
dc_cmdline[key] = value
self._write_cmdline_dict(dc_cmdline)
class ConfigTxt: class ConfigTxt:
""" """
Configuration file handler for managing 'config.txt'. Configuration file handler for managing 'config.txt'.
@@ -228,11 +312,6 @@ class ConfigTxt:
to manipulate specific parameters within the configuration and supports to manipulate specific parameters within the configuration and supports
managing dtoverlay and dtparam entries. The primary aim of this class managing dtoverlay and dtparam entries. The primary aim of this class
is to abstract file operations and make modifications user-friendly. is to abstract file operations and make modifications user-friendly.
Attributes:
_config_txt_path (str): The path to the configuration file `config.txt`.
_config_txt_lines (list[str]): Contains all lines of the configuration
file as a list of strings, where each string represents a line.
""" """
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$") re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
@@ -318,8 +397,9 @@ class ConfigTxt:
Returns: Returns:
None None
""" """
with open(self._config_txt_path, "r") as f: with CONFIG_TXT_LOCK:
self._config_txt_lines = f.readlines() with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines()
def save_config(self): def save_config(self):
""" """
@@ -333,10 +413,11 @@ class ConfigTxt:
if not self._config_txt_lines: if not self._config_txt_lines:
return return
tmp_path = f"{self._config_txt_path}.tmp" with CONFIG_TXT_LOCK:
with open(tmp_path, "w") as f: tmp_path = f"{self._config_txt_path}.tmp"
f.writelines(self._config_txt_lines) with open(tmp_path, "w") as f:
shutil.move(tmp_path, self._config_txt_path) f.writelines(self._config_txt_lines)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear() self._config_txt_lines.clear()