The new docstrings provide detailed explanations of the purpose, parameters, and return values for both functions, improving code readability and maintainability. This ensures better understanding for future contributors and reduces ambiguity.
713 lines
24 KiB
Python
713 lines
24 KiB
Python
# -*- coding: utf-8 -*-
|
|
# SPDX-FileCopyrightText: 2025 KUNBUS GmbH
|
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
from collections import namedtuple
|
|
from enum import Enum, IntEnum
|
|
from glob import glob
|
|
from logging import getLogger
|
|
from os import X_OK, access
|
|
from os.path import exists, join
|
|
from typing import List, Optional
|
|
|
|
from pydbus import SystemBus
|
|
|
|
from ..dbus_helper import grep
|
|
|
|
# Logger for this module, named after the module itself.
log = getLogger(__name__)

# One "name=value" entry found in config.txt together with the index of the
# line (in the cached list of lines) it was parsed from.
ConfigVariable = namedtuple("ConfigVariable", "name value line_index")

# sysfs class directories in which the radio devices are looked up.
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"

# Candidate locations of config.txt; ConfigTxt uses the first one that exists.
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
|
|
|
|
|
|
class ComputeModuleTypes(IntEnum):
    """
    Compute module generations this module can tell apart.

    The integer value of each member is the type code that ``RevPiConfig``
    extracts from the ``Revision`` field of ``/proc/cpuinfo`` (the revision
    masked with ``0xFF0`` and shifted right by four bits). Codes that have no
    member here make the enum lookup raise ``ValueError``.

    Attributes:
        UNKNOWN (int): No (known) compute module type was detected.
        CM1 (int): Compute Module 1.
        CM3 (int): Compute Module 3.
        CM4 (int): Compute Module 4.
        CM4S (int): Compute Module 4S.
        CM5 (int): Compute Module 5.
    """

    UNKNOWN = 0
    CM1 = 6
    CM3 = 10
    CM4 = 20
    CM4S = 21
    CM5 = 24
|
|
|
|
|
|
class ConfigActions(Enum):
    """
    Actions understood by the ``configure_*`` functions and ``simple_systemd``.

    Attributes:
        ENABLE: Switch the feature on.
        DISABLE: Switch the feature off.
        STATUS: Ask whether the feature is currently switched on.
        AVAILABLE: Ask whether the feature exists on this device at all.
    """

    ENABLE = "enable"
    DISABLE = "disable"
    STATUS = "status"
    AVAILABLE = "available"
|
|
|
|
|
|
class RevPiConfig:
    """
    Hardware facts about the Revolution Pi this code is running on.

    Everything is detected once, while the object is constructed:
    ``/proc/cpuinfo`` is read for model, serial number and board revision, and
    sysfs / device-tree files are searched for WLAN hardware and a ConBridge.

    Attributes:
        serial (str): ``Serial`` value of ``/proc/cpuinfo`` ("" if absent).
        model (str): ``Model`` value of ``/proc/cpuinfo`` ("" if absent).
    """

    def __init__(self):
        self.model = ""
        self.serial = ""

        self._wlan_class_path = ""
        self._revpi_with_con_bridge = False
        self._cm_type = ComputeModuleTypes.UNKNOWN

        self._init_device_info()

    def _init_device_info(self):
        """
        Fill in model, serial, compute module type, WLAN path and ConBridge flag.

        Steps, in order:

        1. Collect the ``Revision``, ``Serial`` and ``Model`` lines of
           ``/proc/cpuinfo`` (for repeated keys the last line wins).
        2. Derive the compute module type from bits 4..11 of the hexadecimal
           revision. A code without a ``ComputeModuleTypes`` member leaves the
           type at ``UNKNOWN``.
        3. On CM4 / CM5 accept ``phy0`` as on-board WLAN if its uevent file
           names the ``brcmfmac`` driver.
        4. If no WLAN was found and the device tree is ``revpi-flat``
           compatible, look through all ieee80211 devices for one driven by
           ``mwifiex_sdio`` (the last match wins).
        5. On CM3 / CM4S check the device tree for ``kunbus,revpi-connect`` to
           detect the ConBridge.

        ``grep`` is the project helper imported from ``..dbus_helper``; this
        code relies on its result being falsy / of length 0 when nothing
        matched.

        Raises:
            OSError: If ``/proc/cpuinfo`` cannot be opened.
            ValueError: If the revision value is not valid hexadecimal.
        """
        cpuinfo = {}

        with open("/proc/cpuinfo", "r") as f:
            for line in f:
                if not line.startswith(("Revision", "Serial", "Model")):
                    continue
                key, value = line.split(":", 1)
                cpuinfo[key.strip().lower()] = value.strip()

        self.model = cpuinfo.get("model", "")
        self.serial = cpuinfo.get("serial", "")

        # Compute module type: bits 4..11 of the board revision
        revision_hex = cpuinfo.get("revision", "")
        if revision_hex:
            type_code = (int(revision_hex, 16) & 0xFF0) >> 4
            try:
                self._cm_type = ComputeModuleTypes(type_code)
            except ValueError:
                # Type code without an enum member: stay at UNKNOWN
                pass

        # On-board WLAN of the compute module
        if self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5):
            onboard_phy = join(LINUX_WLAN_CLASS_PATH, "phy0")
            if grep("DRIVER=brcmfmac", join(onboard_phy, "device", "uevent")):
                self._wlan_class_path = onboard_phy

        # Without on-board WLAN, look for the third party WLAN of a RevPi Flat
        if not self._wlan_class_path and grep("revpi-flat", "/proc/device-tree/compatible"):
            for phy_path in glob("/sys/class/ieee80211/*"):
                if grep("DRIVER=mwifiex_sdio", join(phy_path, "device", "uevent")):
                    self._wlan_class_path = phy_path

        # ConBridge is only looked for on CM3 / CM4S based devices
        if self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S):
            matches = grep("kunbus,revpi-connect", "/proc/device-tree/compatible")
            self._revpi_with_con_bridge = len(matches) > 0

    @property
    def class_path_wlan(self) -> str:
        """
        sysfs path of the detected WLAN device.

        Returns:
            str: Path below ``/sys/class/ieee80211``, or "" if no WLAN device
                was detected.
        """
        return self._wlan_class_path

    @property
    def cm_type(self) -> ComputeModuleTypes:
        """
        Compute module type detected from the board revision.

        Returns:
            ComputeModuleTypes: The detected type, ``UNKNOWN`` if it could not
                be determined.
        """
        return self._cm_type

    @property
    def with_con_bridge(self) -> bool:
        """
        Whether a ConBridge was detected on this device.

        Returns:
            bool: True if the device tree check for the ConBridge succeeded.
        """
        return self._revpi_with_con_bridge

    @property
    def with_wlan(self) -> bool:
        """
        Whether a WLAN device was detected.

        Returns:
            bool: True if ``class_path_wlan`` is a non-empty path.
        """
        return bool(self._wlan_class_path)
|
|
|
|
|
|
class ConfigTxt:
    """
    Reader/writer for the boot configuration file ``config.txt``.

    The file is located once at construction time (first existing entry of
    ``CONFIG_TXT_LOCATIONS``). Its lines are loaded lazily into an in-memory
    list, modified there, and written back with ``save_config()``.

    Attributes:
        re_name_value: Pattern for a non-comment ``name=value`` line.
        _config_txt_path (str): Path of the ``config.txt`` in use.
        _config_txt_lines (list[str]): Cached lines of the file, including
            their line endings. An empty list means "not loaded".
    """

    re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")

    def __init__(self):
        """
        Locate ``config.txt``.

        Raises:
            FileNotFoundError: If none of ``CONFIG_TXT_LOCATIONS`` exists.
        """
        self._config_txt_path = ""
        for path in CONFIG_TXT_LOCATIONS:
            if exists(path):
                self._config_txt_path = path
                break

        if not self._config_txt_path:
            raise FileNotFoundError("no config.txt found")

        self._config_txt_lines = []

    def _clear_name_values(self, name: str, values: str or list) -> int:
        """
        Remove every ``name=value`` line whose value is one of ``values``.

        Arguments:
            name (str): Name of the configuration variable.
            values (str or list): A single value or a list of values to match.

        Returns:
            int: Number of lines removed from the cached configuration.
        """
        if isinstance(values, str):
            values = [values]

        counter = 0

        # The line indexes are collected before anything is removed. Deleting
        # from the bottom up keeps the indexes of all entries still to be
        # visited valid; deleting top-down would shift them and remove the
        # wrong lines as soon as more than one entry matches.
        for config_var in reversed(self._get_all_name_values()):
            if config_var.name == name and config_var.value in values:
                del self._config_txt_lines[config_var.line_index]
                counter += 1

        return counter

    def _get_all_name_values(self) -> "List[ConfigVariable]":
        """
        Parse the cached lines into ``ConfigVariable`` entries.

        If no lines are cached, the file is (re)loaded first. Lines that do not
        match ``re_name_value`` (comments, blank lines, section headers) are
        skipped.

        Returns:
            List[ConfigVariable]: All name-value pairs in file order, each with
                the index of its line in the cached line list.
        """
        if not self._config_txt_lines:
            self.reload_config()

        lst_return = []

        for index, line in enumerate(self._config_txt_lines):
            match = self.re_name_value.match(line)
            if match:
                lst_return.append(ConfigVariable(match.group("name"), match.group("value"), index))

        return lst_return

    def reload_config(self):
        """
        Read the file again and replace the cached lines with its content.

        Unsaved modifications of the cached lines are discarded.

        Raises:
            OSError: If the file cannot be read.
        """
        with open(self._config_txt_path, "r") as f:
            self._config_txt_lines = f.readlines()

    def save_config(self):
        """
        Write the cached lines back to ``config.txt``.

        The content is written to ``<path>.tmp`` first and then moved over the
        real file. Afterwards the cache is cleared, so the next access reloads
        the file. Nothing is written if the cache is empty.

        Raises:
            OSError: If writing or moving the file fails.
        """
        if not self._config_txt_lines:
            return

        tmp_path = f"{self._config_txt_path}.tmp"
        with open(tmp_path, "w") as f:
            f.writelines(self._config_txt_lines)
        shutil.move(tmp_path, self._config_txt_path)

        self._config_txt_lines.clear()

    def add_name_value(self, name: str, value: str):
        """
        Append ``name=value`` unless exactly this pair is already present.

        Parameters:
            name (str): Name of the configuration variable.
            value (str): Value to assign to the variable.
        """
        # Check whether name and value already exist
        for config_var in self._get_all_name_values():
            if config_var.name == name and config_var.value == value:
                return

        # A file whose last line has no trailing newline would otherwise get
        # the new entry glued onto that last line when saved.
        if self._config_txt_lines and not self._config_txt_lines[-1].endswith("\n"):
            self._config_txt_lines[-1] += "\n"

        self._config_txt_lines.append(f"{name}={value}\n")

    def clear_dtoverlays(self, dtoverlays: str or list) -> int:
        """
        Remove the given ``dtoverlay=...`` entries.

        Args:
            dtoverlays (str or list): One overlay value or a list of values.

        Returns:
            int: Number of lines removed.
        """
        return self._clear_name_values("dtoverlay", dtoverlays)

    def clear_dtparams(self, dtparams: str or list) -> int:
        """
        Remove the given ``dtparam=...`` entries.

        Args:
            dtparams (str or list): One parameter value or a list of values.

        Returns:
            int: Number of lines removed.
        """
        return self._clear_name_values("dtparam", dtparams)

    def get_values(self, var_name: str) -> list:
        """
        Collect the values of all entries named ``var_name``.

        Parameters:
            var_name (str): Name of the configuration variable.

        Returns:
            list: Values in file order; empty if the name does not occur.
        """
        return [
            config_var.value
            for config_var in self._get_all_name_values()
            if config_var.name == var_name
        ]

    @property
    def config_txt_path(self) -> str:
        """
        Path of the ``config.txt`` file this object works on.

        Returns:
            str: The path selected at construction time.
        """
        return self._config_txt_path
|
|
|
|
|
|
def configure_avahi_daemon(action: ConfigActions):
    """
    Apply ``action`` to the avahi daemon.

    The action is always applied to ``avahi-daemon.service``. For ENABLE and
    DISABLE it is afterwards applied to ``avahi-daemon.socket`` as well.

    Parameters:
        action (ConfigActions): Action to perform.

    Returns:
        Whatever ``simple_systemd`` returned for the service unit; the result
        of the socket unit is not reported.
    """
    service_result = simple_systemd(action, "avahi-daemon.service")

    if action in (ConfigActions.ENABLE, ConfigActions.DISABLE):
        # The socket unit comes AFTER the service unit, because a connected
        # socket could interrupt stopping the service.
        simple_systemd(action, "avahi-daemon.socket")

    return service_result
|
|
|
|
|
|
def configure_bluetooth(action: ConfigActions):
    """
    Switch bluetooth on/off or query it through the rfkill soft block of hci0.

    If no rfkill entry exists below ``hci0`` the interface is treated as
    disabled: the device should have been brought up by revpi-bluetooth's udev
    rules or vendor magic (devices based on CM4 and newer), and there is
    nothing this function can do about it.

    Parameters:
        action (ConfigActions): Action to perform.

    Returns:
        bool for AVAILABLE (an rfkill entry exists) and STATUS (the entry
        exists and its soft block reads "0"); None for ENABLE and DISABLE.

    Raises:
        ValueError: If ``action`` is not one of the four known actions.
    """
    rfkill_index = get_rfkill_index(join(LINUX_BT_CLASS_PATH, "hci0"))
    device_present = rfkill_index is not None
    soft_block_file = f"/sys/class/rfkill/rfkill{rfkill_index}/soft"

    if action is ConfigActions.AVAILABLE:
        return device_present

    if action is ConfigActions.STATUS:
        if not device_present:
            return False
        with open(soft_block_file, "r") as f:
            return f.read().strip() == "0"

    if action is ConfigActions.ENABLE or action is ConfigActions.DISABLE:
        if device_present:
            # "0" lifts the soft block (radio on), "1" sets it (radio off)
            with open(soft_block_file, "w") as f:
                f.write("0" if action is ConfigActions.ENABLE else "1")
        return None

    raise ValueError(f"action {action} not supported")
|
|
|
|
|
|
def configure_con_can(action: ConfigActions):
    """
    Manage the ``revpi-con-can`` device tree overlay of the ConBridge.

    ENABLE writes ``dtoverlay=revpi-con-can`` to config.txt and calls
    ``/usr/bin/dtoverlay`` with the overlay name; DISABLE removes the entry
    and calls ``/usr/bin/dtoverlay -r``. The exit codes of these calls are not
    evaluated.

    Parameters:
        action (ConfigActions): Action to perform.

    Returns:
        bool for AVAILABLE (a ConBridge was detected) and STATUS (a ConBridge
        was detected and the overlay is listed in config.txt); None for
        ENABLE and DISABLE.

    Raises:
        ValueError: If ``action`` is not supported. This includes ENABLE and
            DISABLE on a device without ConBridge.
        FileNotFoundError: If no config.txt exists (all actions but AVAILABLE).
    """
    revpi = RevPiConfig()
    if action is ConfigActions.AVAILABLE:
        return revpi.with_con_bridge

    dt_overlay = "revpi-con-can"
    config_txt = ConfigTxt()

    if action is ConfigActions.ENABLE and revpi.with_con_bridge:
        # Drop existing entries first so the overlay is listed exactly once
        config_txt.clear_dtoverlays([dt_overlay])
        config_txt.add_name_value("dtoverlay", dt_overlay)
        config_txt.save_config()
        subprocess.call(["/usr/bin/dtoverlay", dt_overlay])

    elif action is ConfigActions.DISABLE and revpi.with_con_bridge:
        config_txt.clear_dtoverlays([dt_overlay])
        config_txt.save_config()
        subprocess.call(["/usr/bin/dtoverlay", "-r", dt_overlay])

    elif action is ConfigActions.STATUS:
        # The overlay is stored as "dtoverlay=revpi-con-can" (see ENABLE), so
        # it has to be looked up among the dtoverlay values, not the dtparams.
        return revpi.with_con_bridge and dt_overlay in config_txt.get_values("dtoverlay")

    else:
        raise ValueError(f"action {action} not supported")

    return None
|
|
|
|
|
|
def configure_dphys_swapfile(action: ConfigActions):
    """
    Apply ``action`` to ``dphys-swapfile.service``.

    After DISABLE, ``/sbin/dphys-swapfile uninstall`` is run to remove the
    swapfile. Its output is discarded and its exit code is not evaluated.

    Parameters:
        action (ConfigActions): Action to perform.

    Returns:
        Whatever ``simple_systemd`` returned for the service unit.
    """
    unit_result = simple_systemd(action, "dphys-swapfile.service")

    if action is not ConfigActions.DISABLE:
        return unit_result

    # Remove the swapfile after the service unit has been disabled
    subprocess.call(
        ["/sbin/dphys-swapfile", "uninstall"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return unit_result
|
|
|
|
|
|
def configure_external_antenna(action: ConfigActions):
    """
    Select the external WLAN antenna through ``dtparam`` entries in config.txt.

    ENABLE removes any ``dtparam=ant1`` / ``dtparam=ant2`` entries and adds
    ``dtparam=ant2``; DISABLE only removes them. Both save config.txt.

    Parameters:
        action (ConfigActions): Action to perform.

    Returns:
        bool for AVAILABLE (WLAN was detected) and STATUS (WLAN was detected
        and ``dtparam=ant2`` is present); None for ENABLE and DISABLE.

    Raises:
        ValueError: If ``action`` is not supported. This includes ENABLE and
            DISABLE on a device without WLAN.
        FileNotFoundError: If no config.txt exists (all actions but AVAILABLE).
    """
    has_wlan = RevPiConfig().with_wlan
    if action is ConfigActions.AVAILABLE:
        return has_wlan

    config_txt = ConfigTxt()

    if action is ConfigActions.STATUS:
        return has_wlan and "ant2" in config_txt.get_values("dtparam")

    switch_on = action is ConfigActions.ENABLE
    switch_off = action is ConfigActions.DISABLE
    if not (has_wlan and (switch_on or switch_off)):
        raise ValueError(f"action {action} not supported")

    config_txt.clear_dtparams(["ant1", "ant2"])
    if switch_on:
        config_txt.add_name_value("dtparam", "ant2")
    config_txt.save_config()

    return None
|
|
|
|
|
|
def configure_gui(action: ConfigActions):
    """
    Switch the systemd default target between graphical and multi-user.

    Parameters:
        action (ConfigActions): Action to perform.

    Returns:
        bool for AVAILABLE (``/usr/bin/startx`` is executable) and STATUS (the
        default target is ``graphical.target``); None for ENABLE and DISABLE.

    Raises:
        ValueError: If ``action`` is not one of the four known actions.
    """
    startx_executable = access("/usr/bin/startx", X_OK)
    if action is ConfigActions.AVAILABLE:
        return startx_executable

    systemd = SystemBus().get(
        "org.freedesktop.systemd1",
        "/org/freedesktop/systemd1",
    )
    manager = systemd["org.freedesktop.systemd1.Manager"]

    if action is ConfigActions.STATUS:
        return manager.GetDefaultTarget() == "graphical.target"

    if action is ConfigActions.ENABLE:
        default_target = "graphical.target"
    elif action is ConfigActions.DISABLE:
        default_target = "multi-user.target"
    else:
        raise ValueError(f"action {action} not supported")

    manager.SetDefaultTarget(default_target, True)
|
|
|
|
|
|
def configure_wlan(action: ConfigActions):
    """
    Switch WLAN on/off or query it through the rfkill soft block of the device.

    If WLAN was detected but no rfkill entry exists below its class path, the
    interface is treated like a missing one (same handling as in
    ``configure_bluetooth``) instead of trying to open a non-existent
    ``rfkillNone`` file; a warning is logged in that case.

    Parameters:
        action (ConfigActions): Action to perform.

    Returns:
        bool for AVAILABLE (WLAN was detected) and STATUS (an rfkill entry
        exists and its soft block reads "0"); None for ENABLE and DISABLE.

    Raises:
        ValueError: If ``action`` is not one of the four known actions.
    """
    revpi = RevPiConfig()

    if action is ConfigActions.AVAILABLE:
        return revpi.with_wlan

    is_switch = action is ConfigActions.ENABLE or action is ConfigActions.DISABLE
    if not is_switch and action is not ConfigActions.STATUS:
        raise ValueError(f"action {action} not supported")

    wlan_rfkill_index = None
    if revpi.with_wlan:
        wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
        if wlan_rfkill_index is None:
            log.warning("no rfkill entry found for %s", revpi.class_path_wlan)

    if action is ConfigActions.STATUS:
        if wlan_rfkill_index is None:
            return False

        with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "r") as f:
            buffer = f.read().strip()
        return buffer == "0"

    if wlan_rfkill_index is not None:
        # "0" lifts the soft block (radio on), "1" sets it (radio off)
        with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
            f.write("0" if action is ConfigActions.ENABLE else "1")

    return None
|
|
|
|
|
|
def get_rfkill_index(device_class_path: str) -> Optional[int]:
    """
    Find the rfkill index of a device from its class directory.

    The directory is searched for entries named ``rfkill*``. The first entry
    (in the order ``glob`` reports them) whose absolute path ends in
    ``rfkill<digits>`` determines the result.

    Parameters:
        device_class_path (str): Directory of the device, for example
            ``/sys/class/bluetooth/hci0``.

    Returns:
        Optional[int]: The number following ``rfkill``, or None if no matching
            entry exists.
    """
    index_pattern = re.compile(r"^/.+/rfkill(?P<index>\d+)$")
    candidates = glob(join(device_class_path, "rfkill*"))
    matches = (index_pattern.match(candidate) for candidate in candidates)

    return next((int(found.group("index")) for found in matches if found), None)
|
|
|
|
|
|
def simple_systemd(action: ConfigActions, unit: str):
    """
    Perform ``action`` on a systemd unit through the systemd D-Bus manager.

    * ENABLE: unmask and enable the unit file, then start the unit.
    * DISABLE: stop the unit, then disable the unit file.
    * STATUS: True if the unit file state is "enabled" and the unit's active
      state is "active".
    * AVAILABLE: True if the unit's load state is anything but "not-found".

    For STATUS and AVAILABLE a failure to load the unit or to obtain its D-Bus
    object is logged as a warning and reported as False.

    Parameters:
        action (ConfigActions): Action to perform.
        unit (str): Name of the systemd unit, e.g. ``avahi-daemon.service``.

    Returns:
        bool for STATUS and AVAILABLE; None for ENABLE and DISABLE.

    Raises:
        ValueError: If ``action`` is not one of the four known actions.
    """
    bus = SystemBus()
    systemd = bus.get(
        "org.freedesktop.systemd1",
        "/org/freedesktop/systemd1",
    )
    manager = systemd["org.freedesktop.systemd1.Manager"]

    if action is ConfigActions.STATUS or action is ConfigActions.AVAILABLE:
        # Both queries need the D-Bus object of the unit
        try:
            unit_path = manager.LoadUnit(unit)
            unit_object = bus.get("org.freedesktop.systemd1", unit_path)
        except Exception:
            log.warning(f"could not get systemd unit {unit}")
            return False

        if action is ConfigActions.AVAILABLE:
            return unit_object.LoadState != "not-found"

        return unit_object.UnitFileState == "enabled" and unit_object.ActiveState == "active"

    if action is ConfigActions.ENABLE:
        manager.UnmaskUnitFiles([unit], False)
        manager.EnableUnitFiles([unit], False, False)
        manager.StartUnit(unit, "replace")
        return

    if action is ConfigActions.DISABLE:
        manager.StopUnit(unit, "replace")
        manager.DisableUnitFiles([unit], False)
        return

    raise ValueError(f"action {action} not supported")
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual check: print the detected hardware facts and the config.txt path
    device = RevPiConfig()

    report = [
        ("Model:", device.model),
        ("Serial: ", device.serial),
        ("CM Type: ", device.cm_type.name),
        ("With WLAN: ", device.with_wlan),
    ]
    if device.with_wlan:
        report.append((" class path: ", device.class_path_wlan))
        report.append((" rfkill index: ", get_rfkill_index(device.class_path_wlan)))
    report.append(("With con-bridge:", device.with_con_bridge))

    for label, content in report:
        print(label, content)

    # Raises FileNotFoundError if no config.txt exists on this system
    print("Config file: ", ConfigTxt().config_txt_path)
|