3 Commits

Author SHA1 Message Date
52802ae6c5 feat(dbus): Add support for configuring 'revpi-con-can' feature
Introduce the `configure_con_can` function to manage enabling,
disabling, status checking, and availability of the 'revpi-con-can'
feature. Update the `AVAILABLE_FEATURES` dictionary to integrate
'revpi-con-can' as a configurable feature.
2025-04-20 19:19:13 +02:00
f22fbdf814 feat(dbus): Add support for configuring the external antenna
Introduce the `configure_external_antenna` function to manage external
antenna settings, including enable, disable, and status checks. Update
the feature configuration in `interface_config` to include this
functionality. This enhances WiFi-related configuration options.
2025-04-20 19:18:41 +02:00
bbf1bcc4b5 feat(dbus): Add ConfigTxt class for managing config.txt file
Introduced the ConfigTxt class to handle parsing, editing, and saving of
config.txt files. This includes methods for adding, clearing, and
retrieving configuration values, as well as handling dtoverlays and
dtparams. Enhanced system configuration capabilities by providing
structured support for config file operations.
2025-04-20 19:17:41 +02:00
2 changed files with 150 additions and 2 deletions

View File

@@ -8,7 +8,9 @@ from logging import getLogger
from .revpi_config import ( from .revpi_config import (
ConfigActions, ConfigActions,
configure_avahi_daemon, configure_avahi_daemon,
configure_con_can,
configure_dphys_swapfile, configure_dphys_swapfile,
configure_external_antenna,
configure_gui, configure_gui,
simple_systemd, simple_systemd,
) )
@@ -78,7 +80,7 @@ def get_feature(feature: str) -> FeatureFunction:
AVAILABLE_FEATURES = { AVAILABLE_FEATURES = {
"gui": FeatureFunction(configure_gui, []), "gui": FeatureFunction(configure_gui, []),
"revpi-con-can": False, "revpi-con-can": FeatureFunction(configure_con_can, []),
"dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []), "dphys-swapfile": FeatureFunction(configure_dphys_swapfile, []),
"pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]), "pimodbus-master": FeatureFunction(simple_systemd, ["pimodbus-master.service"]),
"pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]), "pimodbus-slave": FeatureFunction(simple_systemd, ["pimodbus-slave.service"]),
@@ -92,5 +94,5 @@ AVAILABLE_FEATURES = {
"bluetooth": False, "bluetooth": False,
"ieee80211": False, "ieee80211": False,
"avahi": FeatureFunction(configure_avahi_daemon, []), "avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": False, "external-antenna": FeatureFunction(configure_external_antenna, []),
} }

View File

@@ -1,10 +1,15 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH # SPDX-FileCopyrightText: 2025 KUNBUS GmbH
# SPDX-License-Identifier: GPL-2.0-or-later # SPDX-License-Identifier: GPL-2.0-or-later
import re
import shutil
import subprocess import subprocess
from collections import namedtuple
from enum import Enum, IntEnum from enum import Enum, IntEnum
from logging import getLogger from logging import getLogger
from os import X_OK, access from os import X_OK, access
from os.path import exists
from typing import List
from pydbus import SystemBus from pydbus import SystemBus
@@ -12,6 +17,8 @@ from ..dbus_helper import grep
log = getLogger(__name__) log = getLogger(__name__)
ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
class ComputeModuleTypes(IntEnum): class ComputeModuleTypes(IntEnum):
UNKNOWN = 0 UNKNOWN = 0
@@ -94,6 +101,89 @@ class RevPiConfig:
return self._cm_with_wifi return self._cm_with_wifi
class ConfigTxt:
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self):
self._config_txt_path = ""
for path in ("/boot/firmware/config.txt", "/boot/config.txt"):
if exists(path):
self._config_txt_path = path
break
if not self._config_txt_path:
raise FileNotFoundError("no config.txt found")
self._config_txt_lines = []
def _clear_name_values(self, name: str, values: str or list) -> int:
counter = 0
if type(values) is str:
values = [values]
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value in values:
self._config_txt_lines.pop(config_var.line_index)
counter += 1
return counter
def _get_all_name_values(self) -> List[ConfigVariable]:
if not self._config_txt_lines:
self.reload_config()
lst_return = []
for i in range(len(self._config_txt_lines)):
match = self.re_name_value.match(self._config_txt_lines[i])
if match:
lst_return.append(ConfigVariable(match.group("name"), match.group("value"), i))
return lst_return
def reload_config(self):
with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines()
def save_config(self):
if not self._config_txt_lines:
return
tmp_path = f"{self._config_txt_path}.tmp"
with open(tmp_path, "w") as f:
f.writelines(self._config_txt_lines)
shutil.move(tmp_path, self._config_txt_path)
self._config_txt_lines.clear()
def add_name_value(self, name: str, value: str):
# Check weather name and value already exists
for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value == value:
return
self._config_txt_lines.append(f"{name}={value}\n")
def clear_dtoverlays(self, dtoverlays: str or list) -> int:
return self._clear_name_values("dtoverlay", dtoverlays)
def clear_dtparams(self, dtparams: str or list) -> int:
return self._clear_name_values("dtparam", dtparams)
def get_values(self, var_name: str) -> list:
var_values = []
for config_var in self._get_all_name_values():
if config_var.name == var_name:
var_values.append(config_var.value)
return var_values
@property
def config_txt_path(self) -> str:
return self._config_txt_path
def configure_avahi_daemon(action: ConfigActions): def configure_avahi_daemon(action: ConfigActions):
return_value = simple_systemd(action, "avahi-daemon.service") return_value = simple_systemd(action, "avahi-daemon.service")
@@ -121,6 +211,31 @@ def configure_dphys_swapfile(action: ConfigActions):
return return_value return return_value
def configure_external_antenna(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_wifi
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_wifi:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.add_name_value("dtparam", "ant2")
config_txt.save_config()
elif action is ConfigActions.DISABLE and revpi.with_wifi:
config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.save_config()
elif action is ConfigActions.STATUS:
return revpi.with_wifi and "ant2" in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def configure_gui(action: ConfigActions): def configure_gui(action: ConfigActions):
gui_available = access("/usr/bin/startx", X_OK) gui_available = access("/usr/bin/startx", X_OK)
@@ -143,6 +258,34 @@ def configure_gui(action: ConfigActions):
raise ValueError(f"action {action} not supported") raise ValueError(f"action {action} not supported")
def configure_con_can(action: ConfigActions):
revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE:
return revpi.with_con_bridge
dt_overlay = "revpi-con-can"
config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.add_name_value("dtoverlay", dt_overlay)
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", dt_overlay])
elif action is ConfigActions.DISABLE and revpi.with_con_bridge:
config_txt.clear_dtoverlays([dt_overlay])
config_txt.save_config()
subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay])
elif action is ConfigActions.STATUS:
return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtparam")
else:
raise ValueError(f"action {action} not supported")
return None
def simple_systemd(action: ConfigActions, unit: str): def simple_systemd(action: ConfigActions, unit: str):
bus = SystemBus() bus = SystemBus()
systemd_manager = bus.get(".systemd1") systemd_manager = bus.get(".systemd1")
@@ -187,3 +330,6 @@ if __name__ == "__main__":
print("CM Type: ", rc.cm_type.name) print("CM Type: ", rc.cm_type.name)
print("With wifi: ", rc.with_wifi) print("With wifi: ", rc.with_wifi)
print("With con-bridge:", rc.with_con_bridge) print("With con-bridge:", rc.with_con_bridge)
config_txt = ConfigTxt()
print("Config file: ", config_txt.config_txt_path)