10 Commits

Author SHA1 Message Date
cc560770ce feat(revpiconfig): Make unit config changes asynchronous
Refactored unit enable/disable actions to run in separate threads,
ensuring non-blocking operations. This enhances performance and avoids
potential delays during systemd operations.
2025-04-22 13:42:33 +02:00
4df903783c fix(revpiconfig): Ensure systemd reloads after unit changes
Added systemd reload calls after unit enable/disable to reflect changes.
Adjusted DBus method calls to capture and utilize change outputs
effectively. This improves reliability in applying unit modifications.
2025-04-22 12:35:43 +02:00
8db1f59cfe doc(revpiconfig): Docstrings for get_rfkill_index and simple_systemd
The new docstrings provide detailed explanations of the purpose,
parameters, and return values for both functions, improving code
readability and maintainability. This ensures better understanding for
future contributors and reduces ambiguity.
2025-04-22 11:06:32 +02:00
7051eba9b9 doc(revpiconfig): Add docstrings to enums in revpi_config.py
This update introduces detailed docstrings for the `ComputeModuleTypes`
and `ConfigActions` enumeration classes. The docstrings provide
descriptions for each class and their attributes, improving code
readability and maintainability.
2025-04-22 11:06:32 +02:00
04780bd0dd doc(revpiconfig): Add docstrings to RevPiConfig class and methods
Enhance documentation for the `RevPiConfig` class, its methods, and
properties to improve code readability and ease of use. The added
docstrings provide clear explanations of the class's purpose,
attributes, and functionality for developers and users. This update
supports better maintainability and understanding of the codebase.
2025-04-22 11:06:32 +02:00
41fb2b3c61 doc(revpiconfig): Add detailed docstrings to ConfigTxt methods
Enhance the `ConfigTxt` class with comprehensive docstrings for all
methods, providing clear explanations of their functionality,
parameters, and return values. This improves code readability and
facilitates easier maintenance and understanding for future developers.
2025-04-22 11:06:32 +02:00
41d9b13e71 fix(dbus): Update systemd interface and path handling
Revised DBus interactions to explicitly use `org.freedesktop.systemd1`
interface and path. This ensures that the correct interfaces are used
and bypasses ".systemd1" magic from the library `pydbus`.
2025-04-22 10:59:59 +02:00
463a61a001 fix(dbus): Update DBus policy file path and interface name
Change the comment to reflect the new DBus policy file location. Adjust
the interface name to use `PiControl` instead of `picontrol` for
consistency.
2025-04-22 10:37:36 +02:00
1fab228272 refactor: Rename WiFi to WLAN for consistent terminology
Updated variable names, function names, and comments to replace "WiFi"
with "WLAN" throughout the codebase. This ensures alignment with
standardized terminology and improves clarity in functionality and
configuration handling. Adjusted related configurations and interface
mappings accordingly.
2025-04-21 13:34:10 +02:00
fc82ec0eb9 feat(revpiconfig): Replace rfkill subprocess calls with sysfs writes
Updated Bluetooth and WiFi rfkill handling by replacing subprocess calls
to "rfkill" with direct writes to sysfs files. This change simplifies
the implementation and improves performance by avoiding external
command execution.
2025-04-21 13:25:56 +02:00
3 changed files with 357 additions and 46 deletions

View File

@@ -1,4 +1,4 @@
<!-- /etc/dbus-1/system.d/revpi-middleware.conf --> <!-- /usr/share/dbus-1/system.d/com.revolutionpi.middleware1.conf -->
<busconfig> <busconfig>
<!-- Allow full access to root as the bus owner --> <!-- Allow full access to root as the bus owner -->
<policy user="root"> <policy user="root">
@@ -12,7 +12,7 @@
<allow send_destination="com.revolutionpi.middleware1" <allow send_destination="com.revolutionpi.middleware1"
send_interface="org.freedesktop.DBus.Introspectable"/> send_interface="org.freedesktop.DBus.Introspectable"/>
<allow send_destination="com.revolutionpi.middleware1" <allow send_destination="com.revolutionpi.middleware1"
send_interface="com.revolutionpi.middleware1.picontrol"/> send_interface="com.revolutionpi.middleware1.PiControl"/>
</policy> </policy>
<!-- Standard-Policy --> <!-- Standard-Policy -->

View File

@@ -13,7 +13,7 @@ from .revpi_config import (
configure_dphys_swapfile, configure_dphys_swapfile,
configure_external_antenna, configure_external_antenna,
configure_gui, configure_gui,
configure_wifi, configure_wlan,
simple_systemd, simple_systemd,
) )
from ..dbus_helper import DbusInterface from ..dbus_helper import DbusInterface
@@ -94,7 +94,7 @@ AVAILABLE_FEATURES = {
), ),
"revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]), "revpipyload": FeatureFunction(simple_systemd, ["revpipyload.service"]),
"bluetooth": FeatureFunction(configure_bluetooth, []), "bluetooth": FeatureFunction(configure_bluetooth, []),
"ieee80211": FeatureFunction(configure_wifi, []), "wlan": FeatureFunction(configure_wlan, []),
"avahi": FeatureFunction(configure_avahi_daemon, []), "avahi": FeatureFunction(configure_avahi_daemon, []),
"external-antenna": FeatureFunction(configure_external_antenna, []), "external-antenna": FeatureFunction(configure_external_antenna, []),
} }

View File

@@ -10,6 +10,7 @@ from glob import glob
from logging import getLogger from logging import getLogger
from os import X_OK, access from os import X_OK, access
from os.path import exists, join from os.path import exists, join
from threading import Thread
from typing import List, Optional from typing import List, Optional
from pydbus import SystemBus from pydbus import SystemBus
@@ -21,11 +22,26 @@ log = getLogger(__name__)
ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"]) ConfigVariable = namedtuple("ConfigVariable", ["name", "value", "line_index"])
LINUX_BT_CLASS_PATH = "/sys/class/bluetooth" LINUX_BT_CLASS_PATH = "/sys/class/bluetooth"
LINUX_WIFI_CLASS_PATH = "/sys/class/ieee80211" LINUX_WLAN_CLASS_PATH = "/sys/class/ieee80211"
CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt") CONFIG_TXT_LOCATIONS = ("/boot/firmware/config.txt", "/boot/config.txt")
class ComputeModuleTypes(IntEnum): class ComputeModuleTypes(IntEnum):
"""
Enumeration class to represent compute module types.
This class is an enumeration that defines various types of compute
modules and assigns them associated integer values for identifying
different module types.
Attributes:
UNKNOWN (int): Represents an unknown or undefined compute module type.
CM1 (int): Represents a Compute Module 1.
CM3 (int): Represents a Compute Module 3.
CM4 (int): Represents a Compute Module 4.
CM4S (int): Represents a Compute Module 4S.
CM5 (int): Represents a Compute Module 5.
"""
UNKNOWN = 0 UNKNOWN = 0
CM1 = 6 CM1 = 6
CM3 = 10 CM3 = 10
@@ -35,6 +51,13 @@ class ComputeModuleTypes(IntEnum):
class ConfigActions(Enum): class ConfigActions(Enum):
"""
Enumeration class for defining configuration actions.
This enumeration provides predefined constants for common configuration
actions. It can be used to ensure consistency when working with or defining
such actions in a system.
"""
ENABLE = "enable" ENABLE = "enable"
DISABLE = "disable" DISABLE = "disable"
STATUS = "status" STATUS = "status"
@@ -42,6 +65,20 @@ class ConfigActions(Enum):
class RevPiConfig: class RevPiConfig:
"""
Represents the configuration and hardware details of a Revolution Pi system.
This class provides methods and properties to initialize and fetch
information related to the Revolution Pi device, such as model, serial
number, compute module type, WLAN capability, and the presence of a
connection bridge. The class works by parsing system-level files (e.g.,
`/proc/cpuinfo`) and using this data to identify hardware characteristics
and features.
Attributes:
serial (str): The serial number of the Revolution Pi device.
model (str): The model name of the Revolution Pi device.
"""
def __init__(self): def __init__(self):
self._cm_type = ComputeModuleTypes.UNKNOWN self._cm_type = ComputeModuleTypes.UNKNOWN
@@ -55,6 +92,29 @@ class RevPiConfig:
self._init_device_info() self._init_device_info()
def _init_device_info(self): def _init_device_info(self):
"""
Initialize and retrieve detailed hardware information, including CPU details,
device type, WLAN interface, and connectivity features.
This method gathers information from system files and other sources to
initialize device-specific attributes such as model, serial number,
compute module type, and optional features like integrated WLAN
or ConBridge support. It performs checks specific to the detected
module type to accurately populate necessary device details.
Attributes
----------
model : str
The model of the CPU based on information from /proc/cpuinfo.
serial : str
The serial number extracted from /proc/cpuinfo.
_cm_type : ComputeModuleTypes, optional
The type of the compute module derived from the hardware revision value.
_wlan_class_path : str, optional
Filesystem path to the detected WLAN interface, if any.
_revpi_with_con_bridge : bool
Indicates whether the device supports the ConBridge feature.
"""
dc_cpuinfo = {} dc_cpuinfo = {}
# Extract CPU information # Extract CPU information
@@ -81,19 +141,19 @@ class RevPiConfig:
except ValueError: except ValueError:
pass pass
# Detect WiFi on CM module # Detect WLAN on CM module
could_have_wifi = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5) could_have_wlan = self._cm_type in (ComputeModuleTypes.CM4, ComputeModuleTypes.CM5)
if could_have_wifi: if could_have_wlan:
wifi_interface = join(LINUX_WIFI_CLASS_PATH, "phy0") wlan_interface = join(LINUX_WLAN_CLASS_PATH, "phy0")
if grep("DRIVER=brcmfmac", join(wifi_interface, "device", "uevent")): if grep("DRIVER=brcmfmac", join(wlan_interface, "device", "uevent")):
self._wlan_class_path = wifi_interface self._wlan_class_path = wlan_interface
# If no build in Wi-Fi on the CM, detect third party Wi-Fi on RevPi Flat # If no build in WLAN on the CM, detect third party WLAN on RevPi Flat
if not self._wlan_class_path and grep("revpi-flat", "/proc/device-tree/compatible"): if not self._wlan_class_path and grep("revpi-flat", "/proc/device-tree/compatible"):
lst_wifi_interfaces = glob("/sys/class/ieee80211/*") lst_wlan_interfaces = glob("/sys/class/ieee80211/*")
for wifi_interface in lst_wifi_interfaces: for wlan_interface in lst_wlan_interfaces:
if grep("DRIVER=mwifiex_sdio", join(wifi_interface, "device", "uevent")): if grep("DRIVER=mwifiex_sdio", join(wlan_interface, "device", "uevent")):
self._wlan_class_path = wifi_interface self._wlan_class_path = wlan_interface
# Detect ConBridge # Detect ConBridge
could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S) could_have_con_bridge = self._cm_type in (ComputeModuleTypes.CM3, ComputeModuleTypes.CM4S)
@@ -102,23 +162,76 @@ class RevPiConfig:
self._revpi_with_con_bridge = len(lst_grep) > 0 self._revpi_with_con_bridge = len(lst_grep) > 0
@property @property
def class_path_wifi(self) -> str: def class_path_wlan(self) -> str:
"""
Provides access to the WLAN class path.
This property retrieves the stored WLAN class path, allowing the user to access it when
needed.
Returns:
str: The WLAN class path.
"""
return self._wlan_class_path return self._wlan_class_path
@property @property
def cm_type(self) -> ComputeModuleTypes: def cm_type(self) -> ComputeModuleTypes:
"""
Gets the type of the compute module.
The property provides access to the type of the compute
module used. The type is represented as an instance of
the `ComputeModuleTypes` class.
Returns
-------
ComputeModuleTypes
The type of the compute module.
"""
return self._cm_type return self._cm_type
@property @property
def with_con_bridge(self) -> bool: def with_con_bridge(self) -> bool:
"""
Indicates if the device is configured with a connection bridge.
This property checks the internal status and determines whether the device setup
includes a connection bridge functionality. It is read-only.
Returns:
bool: True if the connection bridge is configured, False otherwise.
"""
return self._revpi_with_con_bridge return self._revpi_with_con_bridge
@property @property
def with_wifi(self) -> bool: def with_wlan(self) -> bool:
"""
Checks if WLAN is available.
This property evaluates whether WLAN is enabled or available by checking
the presence or value of the internal attribute `_wlan_class_path`.
Returns:
bool: True if WLAN is available, False otherwise.
"""
return bool(self._wlan_class_path) return bool(self._wlan_class_path)
class ConfigTxt: class ConfigTxt:
"""
Configuration file handler for managing 'config.txt'.
This class provides an interface to read, modify, save, and reload
Raspbian's configuration file `config.txt`. It includes functionalities
to manipulate specific parameters within the configuration and supports
managing dtoverlay and dtparam entries. The primary aim of this class
is to abstract file operations and make modifications user-friendly.
Attributes:
_config_txt_path (str): The path to the configuration file `config.txt`.
_config_txt_lines (list[str]): Contains all lines of the configuration
file as a list of strings, where each string represents a line.
"""
re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$") re_name_value = re.compile(r"^\s*(?!#)(?P<name>[^=\s].+?)\s*=\s*(?P<value>\S+)\s*$")
def __init__(self): def __init__(self):
@@ -134,6 +247,24 @@ class ConfigTxt:
self._config_txt_lines = [] self._config_txt_lines = []
def _clear_name_values(self, name: str, values: str or list) -> int: def _clear_name_values(self, name: str, values: str or list) -> int:
"""
Removes all occurrences of specified name-value pairs from the configuration.
This method searches for all name-value pairs in the configuration and
removes those that match the given name and value(s). It returns the
number of occurrences removed.
Arguments:
name: str
The name of the configuration variable to search for.
values: str or list
The value or list of values to match the configuration variable
against.
Returns:
int: The number of name-value pairs removed from the configuration.
"""
counter = 0 counter = 0
if type(values) is str: if type(values) is str:
values = [values] values = [values]
@@ -146,6 +277,21 @@ class ConfigTxt:
return counter return counter
def _get_all_name_values(self) -> List[ConfigVariable]: def _get_all_name_values(self) -> List[ConfigVariable]:
"""
Retrieves all name-value pairs from the configuration text lines.
This method parses the configuration text lines to extract all name-value
pairs. If the configuration text lines are not loaded, it reloads the
configuration before processing. Each extracted name-value pair is added to a
list as a ConfigVariable object, which also holds the index of the match in
the text lines. The method returns the compiled list of these ConfigVariable
objects.
Returns:
List[ConfigVariable]: A list of ConfigVariable objects representing the
name-value pairs found in the configuration text lines, along with their
corresponding indexes.
"""
if not self._config_txt_lines: if not self._config_txt_lines:
self.reload_config() self.reload_config()
@@ -159,10 +305,28 @@ class ConfigTxt:
return lst_return return lst_return
def reload_config(self): def reload_config(self):
"""
Reloads the configuration file and updates the list of configuration lines.
This method reads the content of the configuration file specified by the
attribute `_config_txt_path` and updates `_config_txt_lines` with the file
contents as a list of strings, where each string represents a line.
Returns:
None
"""
with open(self._config_txt_path, "r") as f: with open(self._config_txt_path, "r") as f:
self._config_txt_lines = f.readlines() self._config_txt_lines = f.readlines()
def save_config(self): def save_config(self):
"""
Saves the current configuration to a file. The method ensures atomicity by first writing
to a temporary file and then moving it to the desired path. After the configuration is
saved, the internal list of configuration lines is cleared.
Raises:
OSError: If there is an issue writing to or moving the file.
"""
if not self._config_txt_lines: if not self._config_txt_lines:
return return
@@ -174,6 +338,20 @@ class ConfigTxt:
self._config_txt_lines.clear() self._config_txt_lines.clear()
def add_name_value(self, name: str, value: str): def add_name_value(self, name: str, value: str):
"""
Adds a name-value pair to the configuration if it does not already exist.
This method checks if the given name-value pair is already present in
the configuration. If it is not present, the pair is appended to the
configuration text lines.
Parameters:
name (str): The name to be added to the configuration.
value (str): The value corresponding to the name to be added.
Returns:
None
"""
# Check weather name and value already exists # Check weather name and value already exists
for config_var in self._get_all_name_values(): for config_var in self._get_all_name_values():
if config_var.name == name and config_var.value == value: if config_var.name == name and config_var.value == value:
@@ -182,12 +360,55 @@ class ConfigTxt:
self._config_txt_lines.append(f"{name}={value}\n") self._config_txt_lines.append(f"{name}={value}\n")
def clear_dtoverlays(self, dtoverlays: str or list) -> int: def clear_dtoverlays(self, dtoverlays: str or list) -> int:
"""
Clears the specified device tree overlays. This method removes one or more
device tree overlays by clearing their corresponding name-value pairs.
Args:
dtoverlays (str or list): A device tree overlay name as a string, or a
list of such overlay names to be cleared.
Returns:
int: The number of device tree overlay name-value pairs successfully
cleared.
"""
return self._clear_name_values("dtoverlay", dtoverlays) return self._clear_name_values("dtoverlay", dtoverlays)
def clear_dtparams(self, dtparams: str or list) -> int: def clear_dtparams(self, dtparams: str or list) -> int:
"""
Clears the specified device tree parameters.
This method removes the given device tree parameters by utilizing
the underlying `_clear_name_values` function with a predefined
parameter type.
Parameters:
dtparams: str or list
A string or list of strings specifying the device tree
parameters to remove.
Returns:
int
The number of parameters cleared.
"""
return self._clear_name_values("dtparam", dtparams) return self._clear_name_values("dtparam", dtparams)
def get_values(self, var_name: str) -> list: def get_values(self, var_name: str) -> list:
"""
Get all values associated with a given variable name.
This method retrieves a list of values corresponding to the specified
variable name by iterating through a collection of configuration
variables. Each configuration variable is checked for a matching name,
and its value is appended to the resulting list if a match is found.
Parameters:
var_name (str): The name of the variable for which values are to
be retrieved.
Returns:
list: A list of values associated with the specified variable name.
"""
var_values = [] var_values = []
for config_var in self._get_all_name_values(): for config_var in self._get_all_name_values():
@@ -198,6 +419,16 @@ class ConfigTxt:
@property @property
def config_txt_path(self) -> str: def config_txt_path(self) -> str:
"""
Get the file path for the configuration text file.
This property provides access to the private attribute `_config_txt_path` which
stores the file path to the configuration text file.
Returns:
str
The file path to the configuration text file.
"""
return self._config_txt_path return self._config_txt_path
@@ -224,11 +455,13 @@ def configure_bluetooth(action: ConfigActions):
if action is ConfigActions.ENABLE: if action is ConfigActions.ENABLE:
if bt_rfkill_index is not None: if bt_rfkill_index is not None:
subprocess.call(["rfkill", "unblock", str(bt_rfkill_index)]) with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("0")
elif action is ConfigActions.DISABLE: elif action is ConfigActions.DISABLE:
if bt_rfkill_index is not None: if bt_rfkill_index is not None:
subprocess.call(["rfkill", "block", str(bt_rfkill_index)]) with open(f"/sys/class/rfkill/rfkill{bt_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.STATUS: elif action is ConfigActions.STATUS:
if bt_rfkill_index is None: if bt_rfkill_index is None:
@@ -293,21 +526,21 @@ def configure_dphys_swapfile(action: ConfigActions):
def configure_external_antenna(action: ConfigActions): def configure_external_antenna(action: ConfigActions):
revpi = RevPiConfig() revpi = RevPiConfig()
if action is ConfigActions.AVAILABLE: if action is ConfigActions.AVAILABLE:
return revpi.with_wifi return revpi.with_wlan
config_txt = ConfigTxt() config_txt = ConfigTxt()
if action is ConfigActions.ENABLE and revpi.with_wifi: if action is ConfigActions.ENABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"]) config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.add_name_value("dtparam", "ant2") config_txt.add_name_value("dtparam", "ant2")
config_txt.save_config() config_txt.save_config()
elif action is ConfigActions.DISABLE and revpi.with_wifi: elif action is ConfigActions.DISABLE and revpi.with_wlan:
config_txt.clear_dtparams(["ant1", "ant2"]) config_txt.clear_dtparams(["ant1", "ant2"])
config_txt.save_config() config_txt.save_config()
elif action is ConfigActions.STATUS: elif action is ConfigActions.STATUS:
return revpi.with_wifi and "ant2" in config_txt.get_values("dtparam") return revpi.with_wlan and "ant2" in config_txt.get_values("dtparam")
else: else:
raise ValueError(f"action {action} not supported") raise ValueError(f"action {action} not supported")
@@ -322,7 +555,11 @@ def configure_gui(action: ConfigActions):
return gui_available return gui_available
bus = SystemBus() bus = SystemBus()
systemd_manager = bus.get(".systemd1") systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ConfigActions.ENABLE: if action is ConfigActions.ENABLE:
systemd_manager.SetDefaultTarget("graphical.target", True) systemd_manager.SetDefaultTarget("graphical.target", True)
@@ -337,28 +574,30 @@ def configure_gui(action: ConfigActions):
raise ValueError(f"action {action} not supported") raise ValueError(f"action {action} not supported")
def configure_wifi(action: ConfigActions): def configure_wlan(action: ConfigActions):
revpi = RevPiConfig() revpi = RevPiConfig()
if action is ConfigActions.ENABLE: if action is ConfigActions.ENABLE:
if revpi.with_wifi: if revpi.with_wlan:
wifi_rfkill_index = get_rfkill_index(revpi.class_path_wifi) wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
subprocess.call(["rfkill", "unblock", str(wifi_rfkill_index)]) with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("0")
elif action is ConfigActions.DISABLE: elif action is ConfigActions.DISABLE:
if revpi.with_wifi: if revpi.with_wlan:
wifi_rfkill_index = get_rfkill_index(revpi.class_path_wifi) wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
subprocess.call(["rfkill", "block", str(wifi_rfkill_index)]) with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "w") as f:
f.write("1")
elif action is ConfigActions.AVAILABLE: elif action is ConfigActions.AVAILABLE:
return revpi.with_wifi return revpi.with_wlan
elif action is ConfigActions.STATUS: elif action is ConfigActions.STATUS:
if not revpi.with_wifi: if not revpi.with_wlan:
return False return False
wifi_rfkill_index = get_rfkill_index(revpi.class_path_wifi) wlan_rfkill_index = get_rfkill_index(revpi.class_path_wlan)
with open(f"/sys/class/rfkill/rfkill{wifi_rfkill_index}/soft", "r") as f: with open(f"/sys/class/rfkill/rfkill{wlan_rfkill_index}/soft", "r") as f:
buffer = f.read().strip() buffer = f.read().strip()
return buffer == "0" return buffer == "0"
@@ -369,6 +608,23 @@ def configure_wifi(action: ConfigActions):
def get_rfkill_index(device_class_path: str) -> Optional[int]: def get_rfkill_index(device_class_path: str) -> Optional[int]:
"""
Get the rfkill index for a device under a specific device class path.
This function searches for and extracts the rfkill index associated with
devices located under the given device class path. It uses a regular
expression to identify and parse the rfkill index from the paths
of matching rfkill device files.
Parameters:
device_class_path: str
The path to the device class directory where rfkill entries
are located.
Returns:
Optional[int]:
The index of the rfkill device if found, otherwise None.
"""
re_rfkill_index = re.compile(r"^/.+/rfkill(?P<index>\d+)$") re_rfkill_index = re.compile(r"^/.+/rfkill(?P<index>\d+)$")
for rfkill_path in glob(join(device_class_path, "rfkill*")): for rfkill_path in glob(join(device_class_path, "rfkill*")):
match_index = re_rfkill_index.match(rfkill_path) match_index = re_rfkill_index.match(rfkill_path)
@@ -379,22 +635,75 @@ def get_rfkill_index(device_class_path: str) -> Optional[int]:
def simple_systemd(action: ConfigActions, unit: str): def simple_systemd(action: ConfigActions, unit: str):
"""
Performs specified actions on systemd units.
This function allows interaction with systemd units for various operations
such as enabling, disabling, checking the status, and verifying availability.
It communicates with the systemd manager via the SystemBus and handles units
based on the action specified.
Parameters:
action (ConfigActions): Specifies the action to be performed on the
systemd unit. Supported actions include ENABLE,
DISABLE, STATUS, and AVAILABLE.
unit (str): The name of the systemd unit on which the action is to be
performed.
Returns:
bool: For STATUS and AVAILABLE actions, returns True if the corresponding
criteria are met (e.g., enabled and active for STATUS, or not found
for AVAILABLE). Otherwise, returns False.
Raises:
ValueError: If the specified action is not supported.
"""
bus = SystemBus() bus = SystemBus()
systemd_manager = bus.get(".systemd1") systemd = bus.get(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
)
systemd_manager = systemd["org.freedesktop.systemd1.Manager"]
if action is ConfigActions.ENABLE: if action is ConfigActions.ENABLE:
systemd_manager.UnmaskUnitFiles([unit], False)
systemd_manager.EnableUnitFiles([unit], False, False) def thread_unit_config():
"""Change configuration asynchronously."""
# Dbus call: UnmaskUnitFiles(in as files, in b runtime, out a(sss) changes
lst_change_unmask = systemd_manager.UnmaskUnitFiles([unit], False)
# Dbus call: EnableUnitFiles(in as files, in b runtime, in b force,
# out b carries_install_info, out a(sss) changes
lst_change_enable = systemd_manager.EnableUnitFiles([unit], False, False)
if lst_change_unmask or lst_change_enable:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StartUnit(in s name, in s mode, out o job
systemd_manager.StartUnit(unit, "replace") systemd_manager.StartUnit(unit, "replace")
elif action is ConfigActions.DISABLE: elif action is ConfigActions.DISABLE:
def thread_unit_config():
"""Change configuration asynchronously."""
# Dbus call: DisableUnitFiles (in as files, in b runtime, out a(sss) changes)
change = systemd_manager.DisableUnitFiles([unit], False)
if change:
# Reload systemd after modified unit property
systemd_manager.Reload()
Thread(target=thread_unit_config, daemon=True).start()
# Dbus call: StopUnit(in s name,in s mode, out o job
systemd_manager.StopUnit(unit, "replace") systemd_manager.StopUnit(unit, "replace")
systemd_manager.DisableUnitFiles([unit], False)
elif action is ConfigActions.STATUS: elif action is ConfigActions.STATUS:
try: try:
unit_path = systemd_manager.LoadUnit(unit) unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path) properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception: except Exception:
log.warning(f"could not get systemd unit {unit}") log.warning(f"could not get systemd unit {unit}")
return False return False
@@ -404,7 +713,7 @@ def simple_systemd(action: ConfigActions, unit: str):
elif action is ConfigActions.AVAILABLE: elif action is ConfigActions.AVAILABLE:
try: try:
unit_path = systemd_manager.LoadUnit(unit) unit_path = systemd_manager.LoadUnit(unit)
properties = bus.get(".systemd1", unit_path) properties = bus.get("org.freedesktop.systemd1", unit_path)
except Exception: except Exception:
log.warning(f"could not get systemd unit {unit}") log.warning(f"could not get systemd unit {unit}")
return False return False
@@ -414,16 +723,18 @@ def simple_systemd(action: ConfigActions, unit: str):
else: else:
raise ValueError(f"action {action} not supported") raise ValueError(f"action {action} not supported")
return None
if __name__ == "__main__": if __name__ == "__main__":
rc = RevPiConfig() rc = RevPiConfig()
print("Model:", rc.model) print("Model:", rc.model)
print("Serial: ", rc.serial) print("Serial: ", rc.serial)
print("CM Type: ", rc.cm_type.name) print("CM Type: ", rc.cm_type.name)
print("With wifi: ", rc.with_wifi) print("With WLAN: ", rc.with_wlan)
if rc.with_wifi: if rc.with_wlan:
print(" class path: ", rc.class_path_wifi) print(" class path: ", rc.class_path_wlan)
print(" rfkill index: ", get_rfkill_index(rc.class_path_wifi)) print(" rfkill index: ", get_rfkill_index(rc.class_path_wlan))
print("With con-bridge:", rc.with_con_bridge) print("With con-bridge:", rc.with_con_bridge)
config_txt = ConfigTxt() config_txt = ConfigTxt()