docs: translate German comments and docstrings to English

Translate all inline comments, docstrings, and documentation from German to English across the codebase to improve accessibility for international developers.

Fixes #27

Signed-off-by: Nicolai Buchwitz <n.buchwitz@kunbus.com>
This commit is contained in:
Nicolai Buchwitz
2026-02-03 10:16:40 +01:00
committed by Sven Sager
parent 21d8c523ae
commit c26841cf94
17 changed files with 1050 additions and 1086 deletions

View File

@@ -1,16 +1,16 @@
# -*- coding: utf-8 -*-
"""
Stellt alle Klassen fuer den RevolutionPi zur Verfuegung.
Provides all classes for the RevolutionPi.
Webpage: https://revpimodio.org/
Stellt Klassen fuer die einfache Verwendung des Revolution Pis der
KUNBUS GmbH (https://revolution.kunbus.de/) zur Verfuegung. Alle I/Os werden
aus der piCtory Konfiguration eingelesen und mit deren Namen direkt zugreifbar
gemacht. Fuer Gateways sind eigene IOs ueber mehrere Bytes konfigurierbar
Mit den definierten Namen greift man direkt auf die gewuenschten Daten zu.
Auf alle IOs kann der Benutzer Funktionen als Events registrieren. Diese
fuehrt das Modul bei Datenaenderung aus.
Provides classes for easy use of the Revolution Pi from
KUNBUS GmbH (https://revolutionpi.com/) . All I/Os are
read from the piCtory configuration and made directly accessible by their names.
For gateways, custom IOs can be configured across multiple bytes.
With the defined names, the desired data is accessed directly.
The user can register functions as events for all IOs. The module
executes these when data changes.
"""
__all__ = [
"IOEvent",

View File

@@ -45,12 +45,12 @@ def acheck(check_type, **kwargs) -> None:
def consttostr(value) -> str:
"""
Gibt <class 'str'> fuer Konstanten zurueck.
Returns <class 'str'> for constants.
Diese Funktion ist erforderlich, da enum in Python 3.2 nicht existiert.
This function is required because enum does not exist in Python 3.2.
:param value: Konstantenwert
:return: <class 'str'> Name der Konstanten
:param value: Constant value
:return: <class 'str'> Name of the constant
"""
if value == 0:
return "OFF"

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
"""Bildet die App Sektion von piCtory ab."""
"""Maps the App section from piCtory."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2"
@@ -8,13 +8,13 @@ from time import gmtime, strptime
class App:
"""Bildet die App Sektion der config.rsc ab."""
"""Maps the App section of config.rsc."""
__slots__ = "name", "version", "language", "layout", "savets"
def __init__(self, app: dict):
"""
Instantiiert die App-Klasse.
Instantiates the App class.
:param app: piCtory Appinformationen
"""
@@ -36,5 +36,5 @@ class App:
except Exception:
self.savets = gmtime(0)
# TODO: Layout untersuchen und anders abbilden
# TODO: Examine layout and map differently
self.layout = app.get("layout", {})

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
"""RevPiModIO Helperklassen und Tools."""
"""RevPiModIO helper classes and tools."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2"
@@ -16,22 +16,14 @@ from .io import IOBase
class EventCallback(Thread):
"""Thread fuer das interne Aufrufen von Event-Funktionen.
"""Thread for internal calling of event functions.
Der Eventfunktion, welche dieser Thread aufruft, wird der Thread selber
als Parameter uebergeben. Darauf muss bei der definition der Funktion
geachtet werden z.B. "def event(th):". Bei umfangreichen Funktionen kann
dieser ausgewertet werden um z.B. doppeltes Starten zu verhindern.
Ueber EventCallback.ioname kann der Name des IO-Objekts abgerufen werden,
welches das Event ausgeloest hast. EventCallback.iovalue gibt den Wert des
IO-Objekts zum Ausloesezeitpunkt zurueck.
Der Thread stellt das EventCallback.exit Event als Abbruchbedingung fuer
die aufgerufene Funktion zur Verfuegung.
Durch Aufruf der Funktion EventCallback.stop() wird das exit-Event gesetzt
und kann bei Schleifen zum Abbrechen verwendet werden.
Mit dem .exit() Event auch eine Wartefunktion realisiert
werden: "th.exit.wait(0.5)" - Wartet 500ms oder bricht sofort ab, wenn
fuer den Thread .stop() aufgerufen wird.
The event function that this thread calls will receive the thread itself as a parameter. This must be considered when defining the function, e.g., "def event(th):". For extensive functions, this can be evaluated to prevent duplicate starts.
The name of the IO object can be retrieved via EventCallback.ioname,
which triggered the event. EventCallback.iovalue returns the value of the IO object at the time of triggering.
The thread provides the EventCallback.exit event as an abort condition for the called function.
By calling the EventCallback.stop() function, the exit event is set and can be used to abort loops.
A wait function can also be implemented with the .exit() event: "th.exit.wait(0.5)" - waits 500ms or aborts immediately if .stop() is called on the thread.
while not th.exit.is_set():
# IO-Arbeiten
@@ -44,9 +36,9 @@ class EventCallback(Thread):
"""
Init EventCallback class.
:param func: Funktion die beim Start aufgerufen werden soll
:param func: Function that should be called at startup
:param name: IO-Name
:param value: IO-Value zum Zeitpunkt des Events
:param value: IO value at the time of the event
"""
super().__init__()
self.daemon = True
@@ -56,35 +48,33 @@ class EventCallback(Thread):
self.iovalue = value
def run(self):
"""Ruft die registrierte Funktion auf."""
"""Calls the registered function."""
self.func(self)
def stop(self):
"""Setzt das exit-Event mit dem die Funktion beendet werden kann."""
"""Sets the exit event that can be used to terminate the function."""
self.exit.set()
class Cycletools:
"""
Werkzeugkasten fuer Cycleloop-Funktion.
Toolbox for cycle loop function.
Diese Klasse enthaelt Werkzeuge fuer Zyklusfunktionen, wie Taktmerker
und Flankenmerker.
Zu beachten ist, dass die Flankenmerker beim ersten Zyklus alle den Wert
True haben! Ueber den Merker Cycletools.first kann ermittelt werden,
ob es sich um den ersten Zyklus handelt.
This class contains tools for cycle functions, such as clock flags and edge flags.
Note that all edge flags have the value True on the first cycle! The Cycletools.first
flag can be used to determine if it is the first cycle.
Taktmerker flag1c, flag5c, flag10c, usw. haben den als Zahl angegebenen
Wert an Zyklen jeweils False und True.
Beispiel: flag5c hat 5 Zyklen den Wert False und in den naechsten 5 Zyklen
den Wert True.
Clock flags flag1c, flag5c, flag10c, etc. have the numerically specified
value for the specified number of cycles, alternating between False and True.
Flankenmerker flank5c, flank10c, usw. haben immer im, als Zahl angebenen
Zyklus fuer einen Zyklusdurchlauf den Wert True, sonst False.
Beispiel: flank5c hat immer alle 5 Zyklen den Wert True.
Example: flag5c has the value False for 5 cycles and True for the next 5 cycles.
Diese Merker koennen z.B. verwendet werden um, an Outputs angeschlossene,
Lampen synchron blinken zu lassen.
Edge flags flank5c, flank10c, etc. always have the value True for one cycle
at the numerically specified cycle, otherwise False.
Example: flank5c always has the value True every 5 cycles.
These flags can be used, for example, to make lamps connected to outputs blink synchronously.
"""
__slots__ = (
@@ -129,7 +119,7 @@ class Cycletools:
self.device = revpi_object.device
self.io = revpi_object.io
# Taktmerker
# Clock flags
self.first = True
self.flag1c = False
self.flag5c = False
@@ -138,28 +128,28 @@ class Cycletools:
self.flag20c = False
self.last = False
# Flankenmerker
# Edge flags
self.flank5c = True
self.flank10c = True
self.flank15c = True
self.flank20c = True
# Benutzerdaten
# User data
class Var:
"""Hier remanente Variablen anfuegen."""
"""Add remanent variables here."""
pass
self.var = Var()
def _docycle(self) -> None:
"""Zyklusarbeiten."""
# Einschaltverzoegerung
"""Cycle operations."""
# Turn-off delay
for tof in self.__dict_tof:
if self.__dict_tof[tof] > 0:
self.__dict_tof[tof] -= 1
# Ausschaltverzoegerung
# Turn-on delay
for ton in self.__dict_ton:
if self.__dict_ton[ton][1]:
if self.__dict_ton[ton][0] > 0:
@@ -168,7 +158,7 @@ class Cycletools:
else:
self.__dict_ton[ton][0] = -1
# Impuls
# Pulse
for tp in self.__dict_tp:
if self.__dict_tp[tp][1]:
if self.__dict_tp[tp][0] > 0:
@@ -178,7 +168,7 @@ class Cycletools:
else:
self.__dict_tp[tp][0] = -1
# Flankenmerker
# Edge flags
self.flank5c = False
self.flank10c = False
self.flank15c = False
@@ -188,7 +178,7 @@ class Cycletools:
self.first = False
self.flag1c = not self.flag1c
# Berechnete Flags
# Calculated flags
self.__cycle += 1
if self.__cycle == 5:
self.__ucycle += 1
@@ -239,64 +229,64 @@ class Cycletools:
def get_tof(self, name: str) -> bool:
"""
Wert der Ausschaltverzoegerung.
Value of the off-delay.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Ausschaltverzoegerung
:param name: Unique name of the timer
:return: Value <class 'bool'> of the off-delay
"""
return self.__dict_tof.get(name, 0) > 0
def get_tofc(self, name: str) -> bool:
"""
Wert der Ausschaltverzoegerung.
Value of the off-delay.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Ausschaltverzoegerung
:param name: Unique name of the timer
:return: Value <class 'bool'> of the off-delay
"""
return self.__dict_tof.get(name, 0) > 0
def set_tof(self, name: str, milliseconds: int) -> None:
"""
Startet bei Aufruf einen ausschaltverzoegerten Timer.
Starts an off-delay timer when called.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param milliseconds: Verzoegerung in Millisekunden
:param name: Unique name for accessing the timer
:param milliseconds: Delay in milliseconds
"""
self.__dict_tof[name] = ceil(milliseconds / self.__cycletime)
def set_tofc(self, name: str, cycles: int) -> None:
"""
Startet bei Aufruf einen ausschaltverzoegerten Timer.
Starts an off-delay timer when called.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param cycles: Zyklusanzahl, der Verzoegerung wenn nicht neu gestartet
:param name: Unique name for accessing the timer
:param cycles: Number of cycles for the delay if not restarted
"""
self.__dict_tof[name] = cycles
def get_ton(self, name: str) -> bool:
"""
Einschaltverzoegerung.
On-delay.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Einschaltverzoegerung
:param name: Unique name of the timer
:return: Value <class 'bool'> of the on-delay
"""
return self.__dict_ton.get(name, [-1])[0] == 0
def get_tonc(self, name: str) -> bool:
"""
Einschaltverzoegerung.
On-delay.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Einschaltverzoegerung
:param name: Unique name of the timer
:return: Value <class 'bool'> of the on-delay
"""
return self.__dict_ton.get(name, [-1])[0] == 0
def set_ton(self, name: str, milliseconds: int) -> None:
"""
Startet einen einschaltverzoegerten Timer.
Starts an on-delay timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param milliseconds: Millisekunden, der Verzoegerung wenn neu gestartet
:param name: Unique name for accessing the timer
:param milliseconds: Milliseconds for the delay if restarted
"""
if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = [ceil(milliseconds / self.__cycletime), True]
@@ -305,10 +295,10 @@ class Cycletools:
def set_tonc(self, name: str, cycles: int) -> None:
"""
Startet einen einschaltverzoegerten Timer.
Starts an on-delay timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param cycles: Zyklusanzahl, der Verzoegerung wenn neu gestartet
:param name: Unique name for accessing the timer
:param cycles: Number of cycles for the delay if restarted
"""
if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = [cycles, True]
@@ -317,28 +307,28 @@ class Cycletools:
def get_tp(self, name: str) -> bool:
"""
Impulstimer.
Pulse timer.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> des Impulses
:param name: Unique name of the timer
:return: Value <class 'bool'> of the pulse
"""
return self.__dict_tp.get(name, [-1])[0] > 0
def get_tpc(self, name: str) -> bool:
"""
Impulstimer.
Pulse timer.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> des Impulses
:param name: Unique name of the timer
:return: Value <class 'bool'> of the pulse
"""
return self.__dict_tp.get(name, [-1])[0] > 0
def set_tp(self, name: str, milliseconds: int) -> None:
"""
Startet einen Impuls Timer.
Starts a pulse timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param milliseconds: Millisekunden, die der Impuls anstehen soll
:param name: Unique name for accessing the timer
:param milliseconds: Milliseconds the pulse should be active
"""
if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = [ceil(milliseconds / self.__cycletime), True]
@@ -347,10 +337,10 @@ class Cycletools:
def set_tpc(self, name: str, cycles: int) -> None:
"""
Startet einen Impuls Timer.
Starts a pulse timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param cycles: Zyklusanzahl, die der Impuls anstehen soll
:param name: Unique name for accessing the timer
:param cycles: Number of cycles the pulse should be active
"""
if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = [cycles, True]
@@ -371,11 +361,9 @@ class Cycletools:
class ProcimgWriter(Thread):
"""
Klasse fuer Synchroniseriungs-Thread.
Class for synchronization thread.
Diese Klasse wird als Thread gestartet, wenn das Prozessabbild zyklisch
synchronisiert werden soll. Diese Funktion wird hauptsaechlich fuer das
Event-Handling verwendet.
This class is started as a thread if the process image should be synchronized cyclically. This function is mainly used for event handling.
"""
__slots__ = (
@@ -409,7 +397,7 @@ class ProcimgWriter(Thread):
self.newdata = Event()
def __check_change(self, dev) -> None:
"""Findet Aenderungen fuer die Eventueberwachung."""
"""Finds changes for event monitoring."""
for io_event in dev._dict_events:
if dev._ba_datacp[io_event._slc_address] == dev._ba_devdata[io_event._slc_address]:
continue
@@ -435,7 +423,7 @@ class ProcimgWriter(Thread):
else:
self._eventq.put((regfunc, io_event._name, io_event.value), False)
else:
# Verzögertes Event in dict einfügen
# Insert delayed event into dict
tup_fire = (
regfunc,
io_event._name,
@@ -454,7 +442,7 @@ class ProcimgWriter(Thread):
else:
self._eventq.put((regfunc, io_event._name, io_event.value), False)
else:
# Verzögertes Event in dict einfügen
# Insert delayed event into dict
tup_fire = (
regfunc,
io_event._name,
@@ -464,11 +452,11 @@ class ProcimgWriter(Thread):
if regfunc.overwrite or tup_fire not in self.__dict_delay:
self.__dict_delay[tup_fire] = ceil(regfunc.delay / 1000 / self._refresh)
# Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf)
# Copy the bytes after processing all IOs (lock is still active)
dev._ba_datacp = dev._ba_devdata[:]
def __exec_th(self) -> None:
"""Laeuft als Thread, der Events als Thread startet."""
"""Runs as thread that starts events as threads."""
while self.__eventwork:
try:
tup_fireth = self._eventqth.get(timeout=1)
@@ -480,15 +468,15 @@ class ProcimgWriter(Thread):
def _collect_events(self, value: bool) -> bool:
"""
Aktiviert oder Deaktiviert die Eventueberwachung.
Enables or disables event monitoring.
:param value: True aktiviert / False deaktiviert
:return: True, wenn Anforderung erfolgreich war
:param value: True activates / False deactivates
:return: True, if request was successful
"""
if type(value) != bool:
raise TypeError("value must be <class 'bool'>")
# Nur starten, wenn System läuft
# Only start if system is running
if not self.is_alive():
self.__eventwork = False
return False
@@ -497,12 +485,12 @@ class ProcimgWriter(Thread):
with self.lck_refresh:
self.__eventwork = value
if not value:
# Nur leeren beim deaktivieren
# Only empty when deactivating
self._eventqth = queue.Queue()
self._eventq = queue.Queue()
self.__dict_delay = {}
# Threadmanagement
# Thread management
if value and not self.__eventth.is_alive():
self.__eventth = Thread(target=self.__exec_th)
self.__eventth.daemon = True
@@ -512,14 +500,14 @@ class ProcimgWriter(Thread):
def get_refresh(self) -> int:
"""
Gibt Zykluszeit zurueck.
Returns cycle time.
:return: <class 'int'> Zykluszeit in Millisekunden
:return: <class 'int'> cycle time in milliseconds
"""
return int(self._refresh * 1000)
def run(self):
"""Startet die automatische Prozessabbildsynchronisierung."""
"""Starts automatic process image synchronization."""
fh = self._modio._create_myfh()
mrk_delay = self._refresh
@@ -537,7 +525,7 @@ class ProcimgWriter(Thread):
RuntimeWarning,
)
mrk_delay = self._refresh
# Nur durch cycleloop erreichbar - keine verzögerten Events
# Only reachable through cycleloop - no delayed events
continue
try:
@@ -558,7 +546,7 @@ class ProcimgWriter(Thread):
bytesbuff[dev._slc_devoff] = fh.read(len(dev._ba_devdata))
if self._modio._monitoring or dev._shared_procimg:
# Inputs und Outputs in Puffer
# Inputs and outputs in buffer
dev._ba_devdata[:] = bytesbuff[dev._slc_devoff]
if (
self.__eventwork
@@ -567,7 +555,7 @@ class ProcimgWriter(Thread):
):
self.__check_change(dev)
else:
# Inputs in Puffer, Outputs in Prozessabbild
# Inputs in buffer, outputs in process image
dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff]
if (
self.__eventwork
@@ -601,12 +589,12 @@ class ProcimgWriter(Thread):
)
mrk_warn = True
# Alle aufwecken
# Wake all
self.lck_refresh.release()
self.newdata.set()
finally:
# Verzögerte Events prüfen
# Check delayed events
if self.__eventwork:
for tup_fire in tuple(self.__dict_delay.keys()):
if tup_fire[0].overwrite and tup_fire[3].value != tup_fire[2]:
@@ -614,7 +602,7 @@ class ProcimgWriter(Thread):
else:
self.__dict_delay[tup_fire] -= 1
if self.__dict_delay[tup_fire] <= 0:
# Verzögertes Event übernehmen und löschen
# Accept and delete delayed event
if tup_fire[0].as_thread:
self._eventqth.put(tup_fire, False)
else:
@@ -633,18 +621,18 @@ class ProcimgWriter(Thread):
# Sleep and not .wait (.wait uses system clock)
sleep(self._refresh - mrk_delay)
# Alle am Ende erneut aufwecken
# Wake all again at the end
self._collect_events(False)
self.newdata.set()
fh.close()
def stop(self):
"""Beendet die automatische Prozessabbildsynchronisierung."""
"""Terminates automatic process image synchronization."""
self._work.set()
def set_refresh(self, value):
"""Setzt die Zykluszeit in Millisekunden.
@param value <class 'int'> Millisekunden"""
"""Sets the cycle time in milliseconds.
@param value <class 'int'> Milliseconds"""
if type(value) == int and 5 <= value <= 2000:
self._refresh = value / 1000
else:

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
"""RevPiModIO Hauptklasse fuer Netzwerkzugriff."""
"""RevPiModIO main class for network access."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2"
@@ -17,16 +17,16 @@ from .errors import DeviceNotFoundError
from .modio import DevSelect, RevPiModIO as _RevPiModIO
from .pictory import DeviceType
# Synchronisierungsbefehl
# Synchronization command
_syssync = b"\x01\x06\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
# Disconnectbefehl
_sysexit = b"\x01EX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
# DirtyBytes von Server entfernen
_sysdeldirty = b"\x01EY\x00\x00\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x17"
# piCtory Konfiguration laden
# Remove DirtyBytes from server
_sysdeldirty = b"\x01EY\x00\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x17"
# Load piCtory configuration
_syspictory = b"\x01PI\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
_syspictoryh = b"\x01PH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
# ReplaceIO Konfiguration laden
# Load ReplaceIO configuration
_sysreplaceio = b"\x01RP\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
_sysreplaceioh = b"\x01RH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
# Hashvalues
@@ -37,24 +37,23 @@ HEADER_STOP = b"\x17"
class AclException(Exception):
"""Probleme mit Berechtigungen."""
"""Problems with permissions."""
pass
class ConfigChanged(Exception):
"""Aenderung der piCtory oder replace_ios Datei."""
"""Change to the piCtory or replace_ios file."""
pass
class NetFH(Thread):
"""
Netzwerk File Handler fuer das Prozessabbild.
Network file handler for the process image.
Dieses FileObject-like Object verwaltet das Lesen und Schriben des
Prozessabbilds ueber das Netzwerk. Ein entfernter Revolution Pi kann
so gesteuert werden.
This file-object-like object manages reading and writing of the
process image via the network. A remote Revolution Pi can be controlled this way.
"""
__slots__ = (
@@ -84,9 +83,9 @@ class NetFH(Thread):
"""
Init NetFH-class.
:param address: IP Adresse, Port des RevPi als <class 'tuple'>
:param check_replace_ios: Prueft auf Veraenderungen der Datei
:param timeout: Timeout in Millisekunden der Verbindung
:param address: IP address, port of the RevPi as <class 'tuple'>
:param check_replace_ios: Checks for changes to the file
:param timeout: Timeout in milliseconds for the connection
"""
super().__init__()
self.daemon = True
@@ -109,38 +108,38 @@ class NetFH(Thread):
self._address = address
self._serversock = None # type: socket.socket
# Parameterprüfung
# Parameter validation
if not isinstance(address, tuple):
raise TypeError("parameter address must be <class 'tuple'> ('IP', PORT)")
if not isinstance(timeout, int):
raise TypeError("parameter timeout must be <class 'int'>")
# Verbindung herstellen
# Establish connection
self.__set_systimeout(timeout)
self._connect()
if self._serversock is None:
raise FileNotFoundError("can not connect to revpi server")
# NetFH konfigurieren
# Configure NetFH
self.__position = 0
self.start()
def __del__(self):
"""NetworkFileHandler beenden."""
"""Terminate NetworkFileHandler."""
self.close()
def __check_acl(self, bytecode: bytes) -> None:
"""
Pueft ob ACL auf RevPi den Vorgang erlaubt.
Checks if ACL allows the operation on RevPi.
Ist der Vorgang nicht zulässig, wird der Socket sofort geschlossen
und eine Exception geworfen.
If the operation is not permitted, the socket is immediately closed
and an exception is thrown.
:param bytecode: Antwort, die geprueft werden solll
:param bytecode: Response to be checked
"""
if bytecode == b"\x18":
# Alles beenden, wenn nicht erlaubt
# Terminate everything if not permitted
self.__sockend.set()
self.__sockerr.set()
self._serversock.close()
@@ -152,39 +151,39 @@ class NetFH(Thread):
def __set_systimeout(self, value: int) -> None:
"""
Systemfunktion fuer Timeoutberechnung.
System function for timeout calculation.
:param value: Timeout in Millisekunden 100 - 60000
:param value: Timeout in milliseconds 100 - 60000
"""
if isinstance(value, int) and (100 <= value <= 60000):
self.__timeout = value / 1000
# Timeouts in Socket setzen
# Set timeouts in socket
if self._serversock is not None:
self._serversock.settimeout(self.__timeout)
# 45 Prozent vom Timeout für Synctimer verwenden
# Use 45 percent of timeout for sync timer
self.__waitsync = self.__timeout / 100 * 45
else:
raise ValueError("value must between 10 and 60000 milliseconds")
def _connect(self) -> None:
"""Stellt die Verbindung zu einem RevPiPlcServer her."""
# Neuen Socket aufbauen
"""Establishes the connection to a RevPiPlcServer."""
# Build new socket
so = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
so.connect(self._address)
so.settimeout(self.__timeout)
# Hashwerte anfordern
# Request hash values
recv_len = 16
so.sendall(_syspictoryh)
if self.__check_replace_ios:
so.sendall(_sysreplaceioh)
recv_len += 16
# Hashwerte empfangen mit eigenen Puffern, da nicht gelocked
# Receive hash values with own buffers, as not locked
buff_recv = bytearray(recv_len)
while recv_len > 0:
block = so.recv(recv_len)
@@ -193,7 +192,7 @@ class NetFH(Thread):
buff_recv += block
recv_len -= len(block)
# Änderung an piCtory prüfen
# Check for changes to piCtory
if self.__pictory_h and buff_recv[:16] != self.__pictory_h:
self.__config_changed = True
self.close()
@@ -201,7 +200,7 @@ class NetFH(Thread):
else:
self.__pictory_h = buff_recv[:16]
# Änderung an replace_ios prüfen
# Check for changes to replace_ios
if (
self.__check_replace_ios
and self.__replace_ios_h
@@ -218,7 +217,7 @@ class NetFH(Thread):
except Exception:
so.close()
else:
# Alten Socket trennen
# Disconnect old socket
with self.__socklock:
if self._serversock is not None:
self._serversock.close()
@@ -226,10 +225,10 @@ class NetFH(Thread):
self._serversock = so
self.__sockerr.clear()
# Timeout setzen
# Set timeout
self.set_timeout(int(self.__timeout * 1000))
# DirtyBytes übertragen
# Transfer dirty bytes
for pos in self.__dictdirty:
self.set_dirtybytes(pos, self.__dictdirty[pos])
@@ -285,19 +284,19 @@ class NetFH(Thread):
def clear_dirtybytes(self, position=None) -> None:
"""
Entfernt die konfigurierten Dirtybytes vom RevPi Server.
Removes the configured dirty bytes from the RevPi server.
Diese Funktion wirft keine Exception bei einem uebertragungsfehler,
veranlasst aber eine Neuverbindung.
This function does not throw an exception on transmission error,
but triggers a reconnection.
:param position: Startposition der Dirtybytes
:param position: Start position of the dirty bytes
"""
if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed")
if self.__sockend.is_set():
raise ValueError("I/O operation on closed file")
# Daten immer übernehmen
# Always accept data
if position is None:
self.__dictdirty.clear()
elif position in self.__dictdirty:
@@ -305,10 +304,10 @@ class NetFH(Thread):
try:
if position is None:
# Alle Dirtybytes löschen
# Clear all dirty bytes
buff = self._direct_sr(_sysdeldirty, 1)
else:
# Nur bestimmte Dirtybytes löschen
# Only clear specific dirty bytes
# b CM ii xx c0000000 b = 16
buff = self._direct_sr(
pack(
@@ -322,7 +321,7 @@ class NetFH(Thread):
1,
)
if buff != b"\x1e":
# ACL prüfen und ggf Fehler werfen
# Check ACL and throw error if necessary
self.__check_acl(buff)
raise IOError("clear dirtybytes error on network")
@@ -333,14 +332,14 @@ class NetFH(Thread):
self.__sockerr.set()
def close(self) -> None:
"""Verbindung trennen."""
"""Disconnect connection."""
if self.__sockend.is_set():
return
self.__sockend.set()
self.__sockerr.set()
# Vom Socket sauber trennen
# Cleanly disconnect from socket
if self._serversock is not None:
try:
self.__socklock.acquire()
@@ -354,7 +353,7 @@ class NetFH(Thread):
self._serversock.close()
def flush(self) -> None:
"""Schreibpuffer senden."""
"""Send write buffer."""
if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed")
if self.__sockend.is_set():
@@ -380,12 +379,12 @@ class NetFH(Thread):
except Exception:
raise
finally:
# Puffer immer leeren
# Always clear buffer
self.__int_buff = 0
self.__by_buff.clear()
if buff != b"\x1e":
# ACL prüfen und ggf Fehler werfen
# Check ACL and throw error if necessary
self.__check_acl(buff)
self.__sockerr.set()
@@ -393,23 +392,23 @@ class NetFH(Thread):
def get_closed(self) -> bool:
"""
Pruefen ob Verbindung geschlossen ist.
Check if connection is closed.
:return: True, wenn Verbindung geschlossen ist
:return: True if connection is closed
"""
return self.__sockend.is_set()
def get_config_changed(self) -> bool:
"""
Pruefen ob RevPi Konfiguration geaendert wurde.
Check if RevPi configuration was changed.
:return: True, wenn RevPi Konfiguration geaendert ist
:return: True if RevPi configuration was changed
"""
return self.__config_changed
def get_name(self) -> str:
"""
Verbindugnsnamen zurueckgeben.
Return connection name.
:return: <class 'str'> IP:PORT
"""
@@ -417,23 +416,23 @@ class NetFH(Thread):
def get_reconnecting(self) -> bool:
"""
Interner reconnect aktiv wegen Netzwerkfehlern.
Internal reconnect active due to network errors.
:return: True, wenn reconnect aktiv
:return: True if reconnect is active
"""
return self.__sockerr.is_set()
def get_timeout(self) -> int:
"""
Gibt aktuellen Timeout zurueck.
Returns current timeout.
:return: <class 'int'> in Millisekunden
:return: <class 'int'> in milliseconds
"""
return int(self.__timeout * 1000)
def ioctl(self, request: int, arg=b"") -> None:
"""
IOCTL Befehle ueber das Netzwerk senden.
Send IOCTL commands via the network.
:param request: Request as <class 'int'>
:param arg: Argument as <class 'byte'>
@@ -451,7 +450,7 @@ class NetFH(Thread):
pack("=c2s2xHI4xc", HEADER_START, b"IC", len(arg), request, HEADER_STOP) + arg, 1
)
if buff != b"\x1e":
# ACL prüfen und ggf Fehler werfen
# Check ACL and throw error if necessary
self.__check_acl(buff)
self.__sockerr.set()
@@ -459,10 +458,10 @@ class NetFH(Thread):
def read(self, length: int) -> bytes:
"""
Daten ueber das Netzwerk lesen.
Read data via the network.
:param length: Anzahl der Bytes
:return: Gelesene <class 'bytes'>
:param length: Number of bytes
:return: Read <class 'bytes'>
"""
if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed")
@@ -501,9 +500,9 @@ class NetFH(Thread):
def readpictory(self) -> bytes:
"""
Ruft die piCtory Konfiguration ab.
Retrieves the piCtory configuration.
:return: <class 'bytes'> piCtory Datei
:return: <class 'bytes'> piCtory file
"""
if self.__sockend.is_set():
raise ValueError("read of closed file")
@@ -517,7 +516,7 @@ class NetFH(Thread):
def readreplaceio(self) -> bytes:
"""
Ruft die replace_io Konfiguration ab.
Retrieves the replace_io configuration.
:return: <class 'bytes'> replace_io_file
"""
@@ -532,24 +531,24 @@ class NetFH(Thread):
return self._direct_sr(b"", recv_length)
def run(self) -> None:
"""Handler fuer Synchronisierung."""
"""Handler for synchronization."""
state_reconnect = False
while not self.__sockend.is_set():
# Bei Fehlermeldung neu verbinden
# Reconnect on error message
if self.__sockerr.is_set():
if not state_reconnect:
state_reconnect = True
warnings.warn("got a network error and try to reconnect", RuntimeWarning)
self._connect()
if self.__sockerr.is_set():
# Verhindert beim Scheitern 100% CPU last
# Prevents 100% CPU load on failure
self.__sockend.wait(self.__waitsync)
continue
else:
state_reconnect = False
warnings.warn("successfully reconnected after network error", RuntimeWarning)
# Kein Fehler aufgetreten, sync durchführen wenn socket frei
# No error occurred, perform sync if socket is free
if self.__socklock.acquire(blocking=False):
try:
self._serversock.sendall(_syssync)
@@ -573,12 +572,12 @@ class NetFH(Thread):
finally:
self.__socklock.release()
# Warten nach Sync damit Instantiierung funktioniert
# Wait after sync so instantiation works
self.__sockerr.wait(self.__waitsync)
def seek(self, position: int) -> None:
"""Springt an angegebene Position.
@param position An diese Position springen"""
"""Jump to specified position.
@param position Jump to this position"""
if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed")
if self.__sockend.is_set():
@@ -587,20 +586,20 @@ class NetFH(Thread):
def set_dirtybytes(self, position: int, dirtybytes: bytes) -> None:
"""
Konfiguriert Dirtybytes fuer Prozessabbild bei Verbindungsfehler.
Configures dirty bytes for process image on connection error.
Diese Funktion wirft keine Exception bei einem uebertragungsfehler,
veranlasst aber eine Neuverbindung.
This function does not throw an exception on transmission error,
but triggers a reconnection.
:param position: Startposition zum Schreiben
:param dirtybytes: <class 'bytes'> die geschrieben werden sollen
:param position: Start position for writing
:param dirtybytes: <class 'bytes'> to be written
"""
if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed")
if self.__sockend.is_set():
raise ValueError("I/O operation on closed file")
# Daten immer übernehmen
# Always accept data
self.__dictdirty[position] = dirtybytes
try:
@@ -612,7 +611,7 @@ class NetFH(Thread):
)
if buff != b"\x1e":
# ACL prüfen und ggf Fehler werfen
# Check ACL and throw error if necessary
self.__check_acl(buff)
raise IOError("set dirtybytes error on network")
@@ -625,14 +624,14 @@ class NetFH(Thread):
def set_timeout(self, value: int) -> None:
"""
Setzt Timeoutwert fuer Verbindung.
Sets timeout value for connection.
:param value: Timeout in Millisekunden
:param value: Timeout in milliseconds
"""
if self.__sockend.is_set():
raise ValueError("I/O operation on closed file")
# Timeoutwert verarbeiten (könnte Exception auslösen)
# Process timeout value (could throw exception)
self.__set_systimeout(value)
try:
@@ -645,7 +644,7 @@ class NetFH(Thread):
def tell(self) -> int:
"""
Gibt aktuelle Position zurueck.
Returns aktuelle Position.
:return: Aktuelle Position
"""
@@ -657,10 +656,10 @@ class NetFH(Thread):
def write(self, bytebuff: bytes) -> int:
"""
Daten ueber das Netzwerk schreiben.
Write data via the network.
:param bytebuff: Bytes zum schreiben
:return: <class 'int'> Anzahl geschriebener bytes
:param bytebuff: Bytes to write
:return: <class 'int'> Number of written bytes
"""
if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed")
@@ -672,14 +671,14 @@ class NetFH(Thread):
with self.__socklock:
self.__int_buff += 1
# Datenblock mit Position und Länge in Puffer ablegen
# Store data block with position and length in buffer
self.__by_buff += (
self.__position.to_bytes(length=2, byteorder="little")
+ len(bytebuff).to_bytes(length=2, byteorder="little")
+ bytebuff
)
# TODO: Bufferlänge und dann flushen?
# TODO: Bufferlänge and dann flushen?
return len(bytebuff)
@@ -692,14 +691,13 @@ class NetFH(Thread):
class RevPiNetIO(_RevPiModIO):
"""
Klasse fuer die Verwaltung der piCtory Konfiguration ueber das Netzwerk.
Class for managing the piCtory configuration via the network.
Diese Klasse uebernimmt die gesamte Konfiguration aus piCtory und bilded
die Devices und IOs ab. Sie uebernimmt die exklusive Verwaltung des
Prozessabbilds und stellt sicher, dass die Daten synchron sind.
Sollten nur einzelne Devices gesteuert werden, verwendet man
RevPiModIOSelected() und uebergibt bei Instantiierung eine Liste mit
Device Positionen oder Device Namen.
This class takes over the entire configuration from piCtory and maps
the devices and IOs. It takes over exclusive management of the
process image and ensures that the data is synchronized.
If only individual devices should be controlled, use
RevPiNetIOSelected() and pass a list with device positions or device names during instantiation.
"""
__slots__ = "_address"
@@ -716,26 +714,26 @@ class RevPiNetIO(_RevPiModIO):
shared_procimg=False,
):
"""
Instantiiert die Grundfunktionen.
Instantiates the basic functions.
:param address: IP-Adresse <class 'str'> / (IP, Port) <class 'tuple'>
:param autorefresh: Wenn True, alle Devices zu autorefresh hinzufuegen
:param monitoring: In- und Outputs werden gelesen, niemals geschrieben
:param syncoutputs: Aktuell gesetzte Outputs vom Prozessabbild einlesen
:param simulator: Laedt das Modul als Simulator und vertauscht IOs
:param debug: Gibt bei allen Fehlern komplette Meldungen aus
:param replace_io_file: Replace IO Konfiguration aus Datei laden
:param shared_procimg: Share process image with other processes, this
could be insecure for automation
:param address: IP address <class 'str'> / (IP, Port) <class 'tuple'>
:param autorefresh: If True, add all devices to autorefresh
:param monitoring: Inputs and outputs are read, never written
:param syncoutputs: Read currently set outputs from process image
:param simulator: Loads the module as simulator and swaps IOs
:param debug: Output complete messages for all errors
:param replace_io_file: Load replace IO configuration from file
:param shared_procimg: Share process image with other processes, this
could be insecure for automation
"""
check_ip = compile(r"^(25[0-5]|(2[0-4]|[01]?\d|)\d)(\.(25[0-5]|(2[0-4]|[01]?\d|)\d)){3}$")
# Adresse verarbeiten
# Process address
if isinstance(address, str):
self._address = (address, 55234)
elif isinstance(address, tuple):
if len(address) == 2 and isinstance(address[0], str) and isinstance(address[1], int):
# Werte prüfen
# Check values
if not 0 < address[1] <= 65535:
raise ValueError("port number out of range 1 - 65535")
@@ -748,7 +746,7 @@ class RevPiNetIO(_RevPiModIO):
"like (<class 'str'>, <class 'int'>)"
)
# IP-Adresse prüfen und ggf. auflösen
# Check IP address and resolve if necessary
if check_ip.match(self._address[0]) is None:
try:
ipv4 = socket.gethostbyname(self._address[0])
@@ -772,16 +770,16 @@ class RevPiNetIO(_RevPiModIO):
)
self._set_device_based_cycle_time = False
# Netzwerkfilehandler anlegen
# Create network file handler
self._myfh = self._create_myfh()
# Nur Konfigurieren, wenn nicht vererbt
# Only configure if not inherited
if type(self) == RevPiNetIO:
self._configure(self.get_jconfigrsc())
def _create_myfh(self):
"""
Erstellt NetworkFileObject.
Creates NetworkFileObject.
:return: FileObject
"""
@@ -790,15 +788,15 @@ class RevPiNetIO(_RevPiModIO):
def _get_cpreplaceio(self) -> ConfigParser:
"""
Laed die replace_io Konfiguration ueber das Netzwerk.
Loads the replace_io configuration via the network.
:return: <class 'ConfigParser'> der replace io daten
:return: <class 'ConfigParser'> of the replace io data
"""
# Normale Verwendung über Elternklasse erledigen
# Handle normal usage via parent class
if self._replace_io_file != ":network:":
return super()._get_cpreplaceio()
# Replace IO Daten über das Netzwerk beziehen
# Obtain replace IO data via the network
byte_buff = self._myfh.readreplaceio()
cp = ConfigParser()
@@ -809,12 +807,12 @@ class RevPiNetIO(_RevPiModIO):
return cp
def disconnect(self) -> None:
"""Trennt Verbindungen und beendet autorefresh inkl. alle Threads."""
"""Disconnects connections and terminates autorefresh including all threads."""
self.cleanup()
def exit(self, full=True) -> None:
"""
Beendet mainloop() und optional autorefresh.
Terminates mainloop() and optionally autorefresh.
:ref: :func:`RevPiModIO.exit()`
"""
@@ -825,20 +823,20 @@ class RevPiNetIO(_RevPiModIO):
def get_config_changed(self) -> bool:
"""
Pruefen ob RevPi Konfiguration geaendert wurde.
Check if RevPi configuration was changed.
In diesem Fall ist die Verbindung geschlossen und RevPiNetIO muss
neu instanziert werden.
In this case, the connection is closed and RevPiNetIO must be
reinstantiated.
:return: True, wenn RevPi Konfiguration geaendert ist
:return: True if RevPi configuration was changed
"""
return self._myfh.config_changed
def get_jconfigrsc(self) -> dict:
"""
Laedt die piCotry Konfiguration und erstellt ein <class 'dict'>.
Loads the piCtory configuration and creates a <class 'dict'>.
:return: <class 'dict'> der piCtory Konfiguration
:return: <class 'dict'> of the piCtory configuration
"""
mynh = NetFH(self._address, False)
byte_buff = mynh.readpictory()
@@ -847,20 +845,20 @@ class RevPiNetIO(_RevPiModIO):
def get_reconnecting(self) -> bool:
"""
Interner reconnect aktiv wegen Netzwerkfehlern.
Internal reconnect active due to network errors.
Das Modul versucht intern die Verbindung neu herzustellen. Es ist
kein weiteres Zutun noetig.
The module tries internally to reestablish the connection. No
further action is needed.
:return: True, wenn reconnect aktiv
:return: True if reconnect is active
"""
return self._myfh.reconnecting
def net_cleardefaultvalues(self, device=None) -> None:
"""
Loescht Defaultwerte vom PLC Server.
Clears default values from the PLC server.
:param device: nur auf einzelnes Device anwenden, sonst auf Alle
:param device: Only apply to single device, otherwise to all
"""
if self.monitoring:
raise RuntimeError("can not send default values, while system is in monitoring mode")
@@ -876,12 +874,12 @@ class RevPiNetIO(_RevPiModIO):
def net_setdefaultvalues(self, device=None) -> None:
"""
Konfiguriert den PLC Server mit den piCtory Defaultwerten.
Configures the PLC server with the piCtory default values.
Diese Werte werden auf dem RevPi gesetzt, wenn die Verbindung
unerwartet (Netzwerkfehler) unterbrochen wird.
These values are set on the RevPi if the connection is
unexpectedly interrupted (network error).
:param device: nur auf einzelnes Device anwenden, sonst auf Alle
:param device: Only apply to single device, otherwise to all
"""
if self.monitoring:
raise RuntimeError("can not send default values, while system is in monitoring mode")
@@ -898,25 +896,25 @@ class RevPiNetIO(_RevPiModIO):
listlen = len(lst_io)
if listlen == 1:
# Byteorientierte Outputs direkt übernehmen
# Take byte-oriented outputs directly
dirtybytes += lst_io[0]._defaultvalue
elif listlen > 1:
# Bitorientierte Outputs in ein Byte zusammenfassen
# Combine bit-oriented outputs into one byte
int_byte = 0
lstbyte = lst_io.copy()
lstbyte.reverse()
for bitio in lstbyte:
# Von hinten die bits nach vorne schieben
# Shift the bits from back to front
int_byte <<= 1
if bitio is not None:
int_byte += 1 if bitio._defaultvalue else 0
# Errechneten Int-Wert in ein Byte umwandeln
# Convert calculated int value to a byte
dirtybytes += int_byte.to_bytes(length=1, byteorder="little")
# Dirtybytes an PLC Server senden
# Send dirtybytes to PLC server
self._myfh.set_dirtybytes(dev._offset + dev._slc_out.start, dirtybytes)
config_changed = property(get_config_changed)
@@ -925,12 +923,12 @@ class RevPiNetIO(_RevPiModIO):
class RevPiNetIOSelected(RevPiNetIO):
"""
Klasse fuer die Verwaltung einzelner Devices aus piCtory.
Class for managing individual devices from piCtory.
Diese Klasse uebernimmt nur angegebene Devices der piCtory Konfiguration
und bildet sie inkl. IOs ab. Sie uebernimmt die exklusive Verwaltung des
Adressbereichs im Prozessabbild an dem sich die angegebenen Devices
befinden und stellt sicher, dass die Daten synchron sind.
This class only takes over specified devices from the piCtory configuration
and maps them including IOs. It takes over exclusive management of the
address range in the process image where the specified devices are located
and ensures that the data is synchronized.
"""
__slots__ = ()
@@ -948,15 +946,15 @@ class RevPiNetIOSelected(RevPiNetIO):
shared_procimg=False,
):
"""
Instantiiert nur fuer angegebene Devices die Grundfunktionen.
Instantiates the basic functions only for specified devices.
Der Parameter deviceselection kann eine einzelne
Device Position / einzelner Device Name sein oder eine Liste mit
mehreren Positionen / Namen
The parameter deviceselection can be a single
device position / single device name or a list with
multiple positions / names
:param address: IP-Adresse <class 'str'> / (IP, Port) <class 'tuple'>
:param deviceselection: Positionsnummer oder Devicename
:ref: :func:`RevPiNetIO.__init__()`
:param address: IP address <class 'str'> / (IP, Port) <class 'tuple'>
:param deviceselection: Position number or device name
:ref: :func:`RevPiNetIO.__init__()`
"""
super().__init__(
address,
@@ -1002,12 +1000,11 @@ class RevPiNetIOSelected(RevPiNetIO):
class RevPiNetIODriver(RevPiNetIOSelected):
"""
Klasse um eigene Treiber fuer die virtuellen Devices zu erstellen.
Class to create custom drivers for virtual devices.
Mit dieser Klasse werden nur angegebene Virtuelle Devices mit RevPiModIO
verwaltet. Bei Instantiierung werden automatisch die Inputs und Outputs
verdreht, um das Schreiben der Inputs zu ermoeglichen. Die Daten koennen
dann ueber logiCAD an den Devices abgerufen werden.
With this class, only specified virtual devices are managed with RevPiModIO.
During instantiation, inputs and outputs are automatically swapped to allow
writing of inputs. The data can then be retrieved from the devices via logiCAD.
"""
__slots__ = ()
@@ -1023,16 +1020,16 @@ class RevPiNetIODriver(RevPiNetIOSelected):
shared_procimg=False,
):
"""
Instantiiert die Grundfunktionen.
Instantiates the basic functions.
Parameter 'monitoring' und 'simulator' stehen hier nicht zur
Verfuegung, da diese automatisch gesetzt werden.
Parameters 'monitoring' and 'simulator' are not available here,
as these are set automatically.
:param address: IP-Adresse <class 'str'> / (IP, Port) <class 'tuple'>
:param virtdev: Virtuelles Device oder mehrere als <class 'list'>
:ref: :func:`RevPiModIO.__init__()`
:param address: IP address <class 'str'> / (IP, Port) <class 'tuple'>
:param virtdev: Virtual device or multiple as <class 'list'>
:ref: :func:`RevPiModIO.__init__()`
"""
# Parent mit monitoring=False und simulator=True laden
# Load parent with monitoring=False and simulator=True
if type(virtdev) not in (list, tuple):
virtdev = (virtdev,)
dev_select = DevSelect(DeviceType.VIRTUAL, "", virtdev)
@@ -1062,7 +1059,7 @@ def run_net_plc(address, func, cycletime=50, replace_io_file=None, debug=True):
rpi.handlesignalend()
return rpi.cycleloop(func, cycletime)
:param address: IP-Adresse <class 'str'> / (IP, Port) <class 'tuple'>
:param address: IP address <class 'str'> / (IP, Port) <class 'tuple'>
:param func: Function to run every set milliseconds
:param cycletime: Cycle time in milliseconds
:param replace_io_file: Load replace IO configuration from file

View File

@@ -1,18 +1,18 @@
# -*- coding: utf-8 -*-
"""Bildet die Summary-Sektion von piCtory ab."""
"""Maps the Summary section from piCtory."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2"
class Summary:
"""Bildet die Summary-Sektion der config.rsc ab."""
"""Maps the Summary section of config.rsc."""
__slots__ = "inptotal", "outtotal"
def __init__(self, summary: dict):
"""
Instantiiert die RevPiSummary-Klasse.
Instantiates the RevPiSummary class.
:param summary: piCtory Summaryinformationen
"""

View File

@@ -96,7 +96,7 @@ class TestDevicesModule(TestRevPiModIO):
self.assertEqual(33 in rpi.device.virt01, False)
self.assertEqual(552 in rpi.device.virt01, True)
# Löschen
# Delete
del rpi.device.virt01
with self.assertRaises(AttributeError):
rpi.device.virt01

View File

@@ -93,10 +93,10 @@ class TestInitModio(TestRevPiModIO):
self.assertEqual(2, len(rpi.device))
del rpi
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
# Liste mit einem ungültigen Device als <class 'list'>
# List with an invalid device as <class 'list'>
rpi = revpimodio2.RevPiModIOSelected([32, 10], **defaultkwargs)
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
# Ungültiges Device als <class 'int'>
# Invalid device as <class 'int'>
rpi = revpimodio2.RevPiModIOSelected(100, **defaultkwargs)
with self.assertRaises(ValueError):
# Ungültiger Devicetype
@@ -116,10 +116,10 @@ class TestInitModio(TestRevPiModIO):
# RevPiModIODriver
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
# Liste mit einem ungültigen Device als <class 'list'>
# List with an invalid device as <class 'list'>
rpi = revpimodio2.RevPiModIODriver([64, 100], **defaultkwargs)
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
# Ungültiges Device als <class 'int'>
# Invalid device as <class 'int'>
rpi = revpimodio2.RevPiModIODriver([100], **defaultkwargs)
with self.assertRaises(ValueError):
# Ungültiger Devicetype
@@ -132,7 +132,7 @@ class TestInitModio(TestRevPiModIO):
self.assertEqual(1, len(rpi.device))
del rpi
# Core ios als bits
# Core ios as bits
rpi = self.modio(configrsc="config_core_bits.json")
del rpi

View File

@@ -27,7 +27,7 @@ class TestModioClassBasics(TestRevPiModIO):
self.assertEqual(rpi.app.savets.tm_hour, 12)
del rpi
# Alte config ohne saveTS
# Old config without saveTS
with self.assertWarnsRegex(Warning, r"equal device name '.*' in pictory configuration."):
rpi = self.modio(configrsc="config_old.rsc")
self.assertIsNone(rpi.app.savets)
@@ -79,7 +79,7 @@ class TestModioClassBasics(TestRevPiModIO):
"""Test interaction with process image."""
rpi = self.modio()
# Procimg IO alle
# Procimg IO all
self.assertIsNone(rpi.setdefaultvalues())
self.assertEqual(rpi.writeprocimg(), True)
self.assertEqual(rpi.syncoutputs(), True)

View File

@@ -19,7 +19,7 @@ class TestCompact(TestRevPiModIO):
self.assertIsInstance(rpi.core, revpimodio2.device.Compact)
# COMPACT LEDs prüfen
# Check COMPACT LEDs
self.assertEqual(rpi.io.RevPiLED.get_value(), b"\x00")
rpi.core.A1 = revpimodio2.OFF
self.assertEqual(rpi.core.A1, 0)
@@ -44,12 +44,12 @@ class TestCompact(TestRevPiModIO):
with self.assertRaises(ValueError):
rpi.core.A2 = 5
# Spezielle Werte aufrufen
# Call special values
self.assertIsInstance(rpi.core.temperature, int)
self.assertIsInstance(rpi.core.frequency, int)
rpi.core.wd_toggle()
# Directzuweisung nicht erlaubt
# Direct assignment not allowed
with self.assertRaisesRegex(AttributeError, r"direct assignment is not supported"):
rpi.core.a1green = True

View File

@@ -20,7 +20,7 @@ class TestFlat(TestRevPiModIO):
self.assertIsInstance(rpi.core, revpimodio2.device.Flat)
# FLAT LEDs prüfen
# Check FLAT LEDs
rpi.core.A1 = revpimodio2.OFF
self.assertEqual(rpi.io.RevPiLED.get_value(), b"\x00\x00")
self.assertEqual(rpi.core.A1, 0)
@@ -77,12 +77,12 @@ class TestFlat(TestRevPiModIO):
with self.assertRaises(ValueError):
rpi.core.A5 = 5
# Spezielle Werte aufrufen
# Call special values
self.assertIsInstance(rpi.core.temperature, int)
self.assertIsInstance(rpi.core.frequency, int)
rpi.core.wd_toggle()
# Directzuweisung nicht erlaubt
# Direct assignment not allowed
with self.assertRaisesRegex(AttributeError, r"direct assignment is not supported"):
rpi.core.a1green = True

View File

@@ -84,7 +84,7 @@ class TestIos(TestRevPiModIO):
"""Testet mehrbittige IOs."""
rpi = self.modio(configrsc="config_supervirt.rsc")
# Adressen und Längen prüfen
# Check addresses and lengths
self.assertEqual(rpi.device[65]._offset, 75)
self.assertEqual(rpi.io.InBit_1.length, 1)

View File

@@ -48,7 +48,7 @@ class TestRevPiConnect(TestRevPiModIO):
with self.assertRaises(ValueError):
rpi.core.A3 = BLUE
# Direkte Zuweisung darf nicht funktionieren
# Direct assignment must not work
with self.assertRaises(AttributeError):
rpi.core.a3green = True
with self.assertRaises(AttributeError):

View File

@@ -97,7 +97,7 @@ class TestRevPiCore(TestRevPiModIO):
with self.assertWarnsRegex(Warning, r"equal device name '.*' in pictory configuration."):
rpi = self.modio(configrsc="config_old.rsc")
# Errorlimits testen, die es nicht gibt (damals None, jetzt -1)
# Test error limits that don't exist (formerly None, now -1)
self.assertEqual(rpi.core.errorlimit1, -1)
self.assertEqual(rpi.core.errorlimit2, -1)