2 Commits

Author SHA1 Message Date
Sven Sager
1172954ad7 feat(revpiconfig): Add CmdLineTxt class for managing cmdline.txt
Introduced the `CmdLineTxt` class to handle parsing, modifying, and
writing operations on the `cmdline.txt` file with thread safety.
Implemented methods for setting, removing keys, and ensuring safe file
operations.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-06-26 08:41:49 +02:00
Sven Sager
fedb0f8924 feat(revpiconfig): Add thread safety for config.txt file operations
Introduced a threading lock to safeguard read and write operations on
the `config.txt` file. This ensures thread-safe access and prevents
potential race conditions during concurrent execution.

Signed-off-by: Sven Sager <s.sager@kunbus.com>
2025-06-26 06:41:59 +02:00

View File

@@ -10,6 +10,7 @@ from glob import glob
from logging import getLogger from logging import getLogger
from os import X_OK, access from os import X_OK, access
from os.path import exists, join from os.path import exists, join
from threading import Lock
from typing import List, Optional from typing import List, Optional
from pydbus import SystemBus from pydbus import SystemBus
@@ -24,6 +25,8 @@ ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth" LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211" LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt") CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
CMDLINE_TXT_LOCK = Lock()
CONFIG_TXT_LOCK = Lock()
class ComputeModuleTypes(IntEnum): class ComputeModuleTypes(IntEnum):
@@ -219,6 +222,61 @@ class RevPiConfig:
return bool(self._wlan_class_path) return bool(self._wlan_class_path)
class CmdLineTxt:
# Value is optional, "?:=" non-capturing the "="
re_name_value = re.compile(r"(?P<key>[^\s=]+)(?:=(?P<value>\S+))?")
def __init__(self):
self._cmdline_txt_path = ""
for path in CONFIG_TXT_LOCATIONS:
if exists(path):
self._cmdline_txt_path = path
break
if not self._cmdline_txt_path:
raise FileNotFoundError("no config.txt found")
def _get_cmdline_dict(self) -> dict:
with CMDLINE_TXT_LOCK:
with open(self._cmdline_txt_path, "r") as file:
cmdline = file.read()
return {
match.group("key"): match.group("value")
for match in self.re_name_value.finditer(cmdline)
}
def _write_cmdline_dict(self, cmdline_dict: dict) -> None:
with CMDLINE_TXT_LOCK:
tmp_path = f"{self._cmdline_txt_path}.tmp"
with open(tmp_path, "w") as file:
str_cmdline = ""
for key, value in cmdline_dict.items():
if value is None:
str_cmdline += f"{key} "
else:
str_cmdline += f"{key}={value} "
str_cmdline = str_cmdline.strip()
file.write(str_cmdline + "\n")
shutil.move(tmp_path, self._cmdline_txt_path)
def remove_key(self, key: str) -> None:
dc_cmdline = self._get_cmdline_dict()
if key in dc_cmdline:
del dc_cmdline[key]
self._write_cmdline_dict(dc_cmdline)
def set_key_value(self, key: str, value: Optional[str] = None) -> None:
dc_cmdline = self._get_cmdline_dict()
if key not in dc_cmdline or dc_cmdline.get(key, value) != value:
dc_cmdline[key] = value
self._write_cmdline_dict(dc_cmdline)
class ConfigTxt: class ConfigTxt:
""" """
Configuration file handler for managing 'config.txt'. Configuration file handler for managing 'config.txt'.
@@ -228,11 +286,6 @@ class ConfigTxt:
to manipulate specific parameters within the configuration and supports to manipulate specific parameters within the configuration and supports
managing dtoverlay and dtparam entries. The primary aim of this class managing dtoverlay and dtparam entries. The primary aim of this class
is to abstract file operations and make modifications user-friendly. is to abstract file operations and make modifications user-friendly.
Attributes:
_config_txt_path (str): The path to the configuration file `config.txt`.
_config_txt_lines (list[str]): Contains all lines of the configuration
file as a list of strings, where each string represents a line.
""" """
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$") re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
@@ -318,6 +371,7 @@ class ConfigTxt:
Returns: Returns:
None None
""" """
with CONFIG_TXT_LOCK:
with open(self._config_txt_path, "r") as f: with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines() self._config_txt_lines = f.readlines()
@@ -333,6 +387,7 @@ class ConfigTxt:
if not self._config_txt_lines: if not self._config_txt_lines:
return return
with CONFIG_TXT_LOCK:
tmp_path = f"{self._config_txt_path}.tmp" tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f: with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines) f.writelines(self._config_txt_lines)