diff --git a/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py b/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py index f3b7a91..3c6397c 100644 --- a/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py +++ b/src/revpi_middleware/dbus_middleware1/system_config/revpi_config.py @@ -1,10 +1,15 @@ # -*- coding: utf-8 -*- # SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-License-Identifier: GPL-2.0-or-later +import re +import shutil import subprocess +from collections import namedtuple from enum import Enum, IntEnum from logging import getLogger from os import X_OK, access +from os.path import exists +from typing import List from pydbus import SystemBus @@ -12,6 +17,8 @@ from ..dbus_helper import grep log = getLogger(__name__) +ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"]) + class ComputeModuleTypes(IntEnum): UNKNOWN = 0 @@ -94,6 +101,89 @@ class RevPiConfig: return self._cm_with_wifi +class ConfigTxt: + re_name_value = re.compile(r"^\s*(?!#)(?P[^=\s].+?)\s*=\s*(?P\S+)\s*$") + + def __init__(self): + self._config_txt_path = "" + for path in ("/boot/firmware/config.txt", "/boot/config.txt"): + if exists(path): + self._config_txt_path = path + break + + if not self._config_txt_path: + raise FileNotFoundError("no config.txt found") + + self._config_txt_lines = [] + + def _clear_name_values(self, name: str, values: str or list) -> int: + counter = 0 + if type(values) is str: + values = [values] + + for config_var in self._get_all_name_values(): + if config_var.name == name and config_var.value in values: + self._config_txt_lines.pop(config_var.line_index) + counter += 1 + + return counter + + def _get_all_name_values(self) -> List[ConfigVariable]: + if not self._config_txt_lines: + self.reload_config() + + lst_return = [] + + for i in range(len(self._config_txt_lines)): + match = self.re_name_value.match(self._config_txt_lines[i]) + if match: + lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i)) + + return lst_return + + def reload_config(self): + with open(self._config_txt_path, "r") as f: + self._config_txt_lines = f.readlines() + + def save_config(self): + if not self._config_txt_lines: + return + + tmp_path = f"{self._config_txt_path}.tmp" + with open(tmp_path, "w") as f: + f.writelines(self._config_txt_lines) + shutil.move(tmp_path, self._config_txt_path) + + self._config_txt_lines.clear() + + def add_name_value(self, name: str, value: str): + # Check weather name and value already exists + for config_var in self._get_all_name_values(): + if config_var.name == name and config_var.value == value: + return + + self._config_txt_lines.append(f"{name}={value}\n") + + def clear_dtoverlays(self, dtoverlays: str or list) -> int: + return self._clear_name_values("dtoverlay", dtoverlays) + + def clear_dtparams(self, dtparams: str or list) -> int: + return self._clear_name_values("dtparam", dtparams) + + def get_values(self, var_name: str) -> list: + var_values = [] + + for config_var in self._get_all_name_values(): + if config_var.name == var_name: + var_values.append(config_var.value) + + return var_values + + @property + def config_txt_path(self) -> str: + return self._config_txt_path + + def configure_avahi_daemon(action: ConfigActions): return_value = simple_systemd(action, "avahi-daemon.service") @@ -187,3 +277,6 @@ if __name__ == "__main__": print("CM Type: ", rc.cm_type.name) print("With wifi: ", rc.with_wifi) print("With con-bridge:", rc.with_con_bridge) + + config_txt = ConfigTxt() + print("Config file: ", config_txt.config_txt_path)