Docstrings optimieren 1

This commit is contained in:
2019-10-20 22:36:09 +02:00
parent 56da8e4cf6
commit 10af9a01d5
16 changed files with 828 additions and 587 deletions

3
.idea/dictionaries/akira.xml generated Normal file
View File

@@ -0,0 +1,3 @@
<component name="ProjectDictionaryState">
<dictionary name="akira" />
</component>

View File

@@ -1,5 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="PROJECT_PROFILE" value="Default" />
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>

4
.idea/revpimodio2.iml generated
View File

@@ -1,7 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/revpimodio2" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Python 3.6" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>

7
docs/revpimodio2.app.rst Normal file
View File

@@ -0,0 +1,7 @@
revpimodio2\.app module
=======================
.. automodule:: revpimodio2.app
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
revpimodio2\.device module
==========================
.. automodule:: revpimodio2.device
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
revpimodio2\.helper module
==========================
.. automodule:: revpimodio2.helper
:members:
:undoc-members:
:show-inheritance:

7
docs/revpimodio2.io.rst Normal file
View File

@@ -0,0 +1,7 @@
revpimodio2\.io module
======================
.. automodule:: revpimodio2.io
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
revpimodio2\.modio module
=========================
.. automodule:: revpimodio2.modio
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
revpimodio2\.netio module
=========================
.. automodule:: revpimodio2.netio
:members:
:undoc-members:
:show-inheritance:

View File

@@ -4,62 +4,15 @@ revpimodio2 package
Submodules
----------
revpimodio2\.app module
-----------------------
.. automodule:: revpimodio2.app
:members:
:undoc-members:
:show-inheritance:
revpimodio2\.device module
--------------------------
.. automodule:: revpimodio2.device
:members:
:undoc-members:
:show-inheritance:
revpimodio2\.helper module
--------------------------
.. automodule:: revpimodio2.helper
:members:
:undoc-members:
:show-inheritance:
revpimodio2\.io module
----------------------
.. automodule:: revpimodio2.io
:members:
:undoc-members:
:show-inheritance:
revpimodio2\.modio module
-------------------------
.. automodule:: revpimodio2.modio
:members:
:undoc-members:
:show-inheritance:
revpimodio2\.netio module
-------------------------
.. automodule:: revpimodio2.netio
:members:
:undoc-members:
:show-inheritance:
revpimodio2\.summary module
---------------------------
.. automodule:: revpimodio2.summary
:members:
:undoc-members:
:show-inheritance:
.. toctree::
revpimodio2.app
revpimodio2.device
revpimodio2.helper
revpimodio2.io
revpimodio2.modio
revpimodio2.netio
revpimodio2.summary
Module contents
---------------

View File

@@ -0,0 +1,7 @@
revpimodio2\.summary module
===========================
.. automodule:: revpimodio2.summary
:members:
:undoc-members:
:show-inheritance:

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""Stellt alle Klassen fuer den RevolutionPi zur Verfuegung.
"""
Stellt alle Klassen fuer den RevolutionPi zur Verfuegung.
Webpage: https://revpimodio.org/
@@ -10,11 +11,10 @@ gemacht. Fuer Gateways sind eigene IOs ueber mehrere Bytes konfigurierbar
Mit den definierten Namen greift man direkt auf die gewuenschten Daten zu.
Auf alle IOs kann der Benutzer Funktionen als Events registrieren. Diese
fuehrt das Modul bei Datenaenderung aus.
"""
__all__ = [
"RevPiModIO", "RevPiModIOSelected", "RevPiModIODriver",
"RevPiNetIO", "RevPiNetIOSelected", "RevPiNetIODriver",
"RevPiModIO", "RevPiModIODriver", "RevPiModIOSelected",
"RevPiNetIO", "RevPiNetIODriver", "RevPiNetIOSelected",
"Cycletools",
]
__author__ = "Sven Sager <akira@revpimodio.org>"
@@ -36,27 +36,25 @@ MEM = 302
class DeviceNotFoundError(Exception):
"""Fehler wenn ein Device nicht gefunden wird."""
pass
def acheck(check_type, **kwargs):
"""Check type of given arguments.
def acheck(check_type, **kwargs) -> None:
"""
Check type of given arguments.
Use the argument name as keyword and the argument itself as value.
@param check_type Type to check
@param kwargs Arguments to check
:param check_type: Type to check
:param kwargs: Arguments to check
"""
for var_name in kwargs:
none_okay = var_name.endswith("_noneok")
if not (isinstance(kwargs[var_name], check_type) or
none_okay and kwargs[var_name] is None):
msg = "Argument '{0}' must be {1}{2}".format(
var_name.rstrip("_noneok"), str(check_type),
" or <class 'NoneType'>" if none_okay else ""
@@ -64,14 +62,14 @@ def acheck(check_type, **kwargs):
raise TypeError(msg)
def consttostr(value):
"""Gibt <class 'str'> fuer Konstanten zurueck.
def consttostr(value) -> str:
"""
Gibt <class 'str'> fuer Konstanten zurueck.
Diese Funktion ist erforderlich, da enum in Python 3.2 nicht existiert.
@param value Konstantenwert
@return <class 'str'> Name der Konstanten
:param value: Konstantenwert
:return: <class 'str'> Name der Konstanten
"""
if value == 0:
return "OFF"
@@ -97,5 +95,5 @@ def consttostr(value):
# Benötigte Klassen importieren
from .helper import Cycletools
from .modio import RevPiModIO, RevPiModIOSelected, RevPiModIODriver
from .netio import RevPiNetIO, RevPiNetIOSelected, RevPiNetIODriver
from .modio import RevPiModIO, RevPiModIODriver, RevPiModIOSelected
from .netio import RevPiNetIO, RevPiNetIODriver, RevPiNetIOSelected

View File

@@ -1,27 +1,36 @@
# -*- coding: utf-8 -*-
"""Bildet die App Sektion von piCtory ab."""
from time import strptime
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2018 Sven Sager"
__license__ = "LGPLv3"
from time import strptime
class App(object):
"""Bildet die App Sektion der config.rsc ab."""
__slots__ = "name", "version", "language", "layout", "savets"
def __init__(self, app):
"""Instantiiert die App-Klasse.
@param app piCtory Appinformationen"""
"""
Instantiiert die App-Klasse.
:param app: piCtory Appinformationen
"""
self.name = app["name"]
"""Name of creating app"""
self.version = app["version"]
"""Version of creating app"""
self.language = app["language"]
"""Language of creating app"""
# Speicherungszeitpunkt laden, wenn vorhanden
self.savets = app.get("saveTS", None)
"""Timestamp of configuraiton"""
if self.savets is not None:
try:
self.savets = strptime(self.savets, "%Y%m%d%H%M%S")

File diff suppressed because it is too large Load Diff

View File

@@ -1,19 +1,19 @@
# -*- coding: utf-8 -*-
"""RevPiModIO Helperklassen und Tools."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2018 Sven Sager"
__license__ = "LGPLv3"
import queue
import warnings
from math import ceil
from threading import Event, Lock, Thread
from timeit import default_timer
from revpimodio2 import RISING, FALLING, BOTH
from revpimodio2 import BOTH, FALLING, RISING
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2018 Sven Sager"
__license__ = "LGPLv3"
class EventCallback(Thread):
"""Thread fuer das interne Aufrufen von Event-Funktionen.
Der Eventfunktion, welche dieser Thread aufruft, wird der Thread selber
@@ -34,18 +34,17 @@ class EventCallback(Thread):
while not th.exit.is_set():
# IO-Arbeiten
th.exit.wait(0.5)
"""
__slots__ = "daemon", "exit", "func", "ioname", "iovalue"
def __init__(self, func, name, value):
"""Init EventCallback class.
@param func Funktion die beim Start aufgerufen werden soll
@param name IO-Name
@param value IO-Value zum Zeitpunkt des Events
def __init__(self, func, name: str, value):
"""
Init EventCallback class.
:param func: Funktion die beim Start aufgerufen werden soll
:param name: IO-Name
:param value: IO-Value zum Zeitpunkt des Events
"""
super().__init__()
self.daemon = True
@@ -63,9 +62,9 @@ class EventCallback(Thread):
self.exit.set()
class Cycletools():
"""Werkzeugkasten fuer Cycleloop-Funktion.
class Cycletools:
"""
Werkzeugkasten fuer Cycleloop-Funktion.
Diese Klasse enthaelt Werkzeuge fuer Zyklusfunktionen, wie Taktmerker
und Flankenmerker.
@@ -84,13 +83,12 @@ class Cycletools():
Diese Merker koennen z.B. verwendet werden um, an Outputs angeschlossene,
Lampen synchron blinken zu lassen.
"""
__slots__ = "__cycle", "__cycletime", "__ucycle", \
"__dict_ton", "__dict_tof", "__dict_tp", "first", \
"flag1c", "flag5c", "flag10c", "flag15c", "flag20c", \
"flank5c", "flank10c", "flank15c", "flank20c", "var"
"__dict_ton", "__dict_tof", "__dict_tp", "first", \
"flag1c", "flag5c", "flag10c", "flag15c", "flag20c", \
"flank5c", "flank10c", "flank15c", "flank20c", "var"
def __init__(self, cycletime):
"""Init Cycletools class."""
@@ -117,10 +115,13 @@ class Cycletools():
# Benutzerdaten
class Var:
"""Hier remanente Variablen anfuegen."""
pass
self.var = Var()
def _docycle(self):
def _docycle(self) -> None:
"""Zyklusarbeiten."""
# Einschaltverzoegerung
for tof in self.__dict_tof:
@@ -174,54 +175,66 @@ class Cycletools():
self.flag5c = not self.flag5c
self.__cycle = 0
def get_tof(self, name):
"""Wert der Ausschaltverzoegerung.
@param name Eindeutiger Name des Timers
@return Wert <class 'bool'> der Ausschaltverzoegerung"""
def get_tof(self, name: str) -> bool:
"""
Wert der Ausschaltverzoegerung.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Ausschaltverzoegerung
"""
return self.__dict_tof.get(name, 0) > 0
def get_tofc(self, name):
"""Wert der Ausschaltverzoegerung.
@param name Eindeutiger Name des Timers
@return Wert <class 'bool'> der Ausschaltverzoegerung"""
def get_tofc(self, name: str) -> bool:
"""
Wert der Ausschaltverzoegerung.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Ausschaltverzoegerung
"""
return self.__dict_tof.get(name, 0) > 0
def set_tof(self, name, milliseconds):
"""Startet bei Aufruf einen ausschaltverzoegerten Timer.
@param name Eindeutiger Name fuer Zugriff auf Timer
@param milliseconds Verzoegerung in Millisekunden
def set_tof(self, name: str, milliseconds: int) -> None:
"""
Startet bei Aufruf einen ausschaltverzoegerten Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param milliseconds: Verzoegerung in Millisekunden
"""
self.__dict_tof[name] = ceil(milliseconds / self.__cycletime)
def set_tofc(self, name, cycles):
"""Startet bei Aufruf einen ausschaltverzoegerten Timer.
@param name Eindeutiger Name fuer Zugriff auf Timer
@param cycles Zyklusanzahl, der Verzoegerung wenn nicht neu gestartet
def set_tofc(self, name: str, cycles: int) -> None:
"""
Startet bei Aufruf einen ausschaltverzoegerten Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param cycles: Zyklusanzahl, der Verzoegerung wenn nicht neu gestartet
"""
self.__dict_tof[name] = cycles
def get_ton(self, name):
"""Einschaltverzoegerung.
@param name Eindeutiger Name des Timers
@return Wert <class 'bool'> der Einschaltverzoegerung"""
def get_ton(self, name: str) -> bool:
"""
Einschaltverzoegerung.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Einschaltverzoegerung
"""
return self.__dict_ton.get(name, [-1])[0] == 0
def get_tonc(self, name):
"""Einschaltverzoegerung.
@param name Eindeutiger Name des Timers
@return Wert <class 'bool'> der Einschaltverzoegerung"""
def get_tonc(self, name: str) -> bool:
"""
Einschaltverzoegerung.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Einschaltverzoegerung
"""
return self.__dict_ton.get(name, [-1])[0] == 0
def set_ton(self, name, milliseconds):
"""Startet einen einschaltverzoegerten Timer.
@param name Eindeutiger Name fuer Zugriff auf Timer
@param milliseconds Millisekunden, der Verzoegerung wenn neu gestartet
def set_ton(self, name: str, milliseconds: int) -> None:
"""
Startet einen einschaltverzoegerten Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param milliseconds: Millisekunden, der Verzoegerung wenn neu gestartet
"""
if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = \
@@ -229,36 +242,42 @@ class Cycletools():
else:
self.__dict_ton[name][1] = True
def set_tonc(self, name, cycles):
"""Startet einen einschaltverzoegerten Timer.
@param name Eindeutiger Name fuer Zugriff auf Timer
@param cycles Zyklusanzahl, der Verzoegerung wenn neu gestartet
def set_tonc(self, name: str, cycles: int) -> None:
"""
Startet einen einschaltverzoegerten Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param cycles: Zyklusanzahl, der Verzoegerung wenn neu gestartet
"""
if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = [cycles, True]
else:
self.__dict_ton[name][1] = True
def get_tp(self, name):
"""Impulstimer.
@param name Eindeutiger Name des Timers
@return Wert <class 'bool'> des Impulses"""
def get_tp(self, name: str) -> bool:
"""
Impulstimer.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> des Impulses
"""
return self.__dict_tp.get(name, [-1])[0] > 0
def get_tpc(self, name):
"""Impulstimer.
@param name Eindeutiger Name des Timers
@return Wert <class 'bool'> des Impulses"""
def get_tpc(self, name: str) -> bool:
"""
Impulstimer.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> des Impulses
"""
return self.__dict_tp.get(name, [-1])[0] > 0
def set_tp(self, name, milliseconds):
"""Startet einen Impuls Timer.
@param name Eindeutiger Name fuer Zugriff auf Timer
@param milliseconds Millisekunden, die der Impuls anstehen soll
def set_tp(self, name: str, milliseconds: int) -> None:
"""
Startet einen Impuls Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param milliseconds: Millisekunden, die der Impuls anstehen soll
"""
if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = \
@@ -266,12 +285,12 @@ class Cycletools():
else:
self.__dict_tp[name][1] = True
def set_tpc(self, name, cycles):
"""Startet einen Impuls Timer.
@param name Eindeutiger Name fuer Zugriff auf Timer
@param cycles Zyklusanzahl, die der Impuls anstehen soll
def set_tpc(self, name: str, cycles: int) -> None:
"""
Startet einen Impuls Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param cycles: Zyklusanzahl, die der Impuls anstehen soll
"""
if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = [cycles, True]
@@ -280,22 +299,20 @@ class Cycletools():
class ProcimgWriter(Thread):
"""Klasse fuer Synchroniseriungs-Thread.
"""
Klasse fuer Synchroniseriungs-Thread.
Diese Klasse wird als Thread gestartet, wenn das Prozessabbild zyklisch
synchronisiert werden soll. Diese Funktion wird hauptsaechlich fuer das
Event-Handling verwendet.
"""
__slots__ = "__dict_delay", "__eventth", "_eventqth", "__eventwork", \
"_adjwait", "_eventq", "_modio", \
"_refresh", "_work", "daemon", "lck_refresh", "newdata"
"_adjwait", "_eventq", "_modio", \
"_refresh", "_work", "daemon", "lck_refresh", "newdata"
def __init__(self, parentmodio):
"""Init ProcimgWriter class.
@param parentmodio Parent Object"""
"""Init ProcimgWriter class."""
super().__init__()
self.__dict_delay = {}
self.__eventth = Thread(target=self.__exec_th)
@@ -311,7 +328,7 @@ class ProcimgWriter(Thread):
self.lck_refresh = Lock()
self.newdata = Event()
def __check_change(self, dev):
def __check_change(self, dev) -> None:
"""Findet Aenderungen fuer die Eventueberwachung."""
for io_event in dev._dict_events:
@@ -386,7 +403,7 @@ class ProcimgWriter(Thread):
# Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf)
dev._ba_datacp = dev._ba_devdata[:]
def __exec_th(self):
def __exec_th(self) -> None:
"""Laeuft als Thread, der Events als Thread startet."""
while self.__eventwork:
try:
@@ -398,10 +415,13 @@ class ProcimgWriter(Thread):
except queue.Empty:
pass
def _collect_events(self, value):
"""Aktiviert oder Deaktiviert die Eventueberwachung.
@param value True aktiviert / False deaktiviert
@return True, wenn Anforderung erfolgreich war"""
def _collect_events(self, value: bool) -> bool:
"""
Aktiviert oder Deaktiviert die Eventueberwachung.
:param value: True aktiviert / False deaktiviert
:return: True, wenn Anforderung erfolgreich war
"""
if type(value) != bool:
raise TypeError("value must be <class 'bool'>")
@@ -427,9 +447,12 @@ class ProcimgWriter(Thread):
return True
def get_refresh(self):
"""Gibt Zykluszeit zurueck.
@return <class 'int'> Zykluszeit in Millisekunden"""
def get_refresh(self) -> int:
"""
Gibt Zykluszeit zurueck.
:return: <class 'int'> Zykluszeit in Millisekunden
"""
return int(self._refresh * 1000)
def run(self):
@@ -472,7 +495,7 @@ class ProcimgWriter(Thread):
with dev._filelock:
dev._ba_devdata[dev._slc_inp] = \
bytesbuff[dev._slc_inpoff]
if self.__eventwork\
if self.__eventwork \
and len(dev._dict_events) > 0 \
and dev._ba_datacp != dev._ba_devdata:
self.__check_change(dev)

View File

@@ -1,13 +1,15 @@
# -*- coding: utf-8 -*-
"""RevPiModIO Modul fuer die Verwaltung der IOs."""
import struct
from re import match as rematch
from threading import Event
from revpimodio2 import BOTH, FALLING, INP, MEM, RISING, consttostr
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2018 Sven Sager"
__license__ = "LGPLv3"
import struct
from re import match as rematch
from threading import Event
from revpimodio2 import RISING, FALLING, BOTH, INP, MEM, consttostr
try:
# Funktioniert nur auf Unix
from fcntl import ioctl
@@ -16,7 +18,6 @@ except Exception:
class IOEvent(object):
"""Basisklasse fuer IO-Events."""
__slots__ = "as_thread", "delay", "edge", "func", "overwrite", "prefire"
@@ -32,7 +33,6 @@ class IOEvent(object):
class IOList(object):
"""Basisklasse fuer direkten Zugriff auf IO Objekte."""
def __init__(self):
@@ -41,17 +41,23 @@ class IOList(object):
self.__dict_iorefname = {}
def __contains__(self, key):
"""Prueft ob IO existiert.
@param key IO-Name <class 'str'> oder Bytenummer <class 'int'>
@return True, wenn IO vorhanden / Byte belegt"""
"""
Prueft ob IO existiert.
:param key: IO-Name <class 'str'> oder Bytenummer <class 'int'>
:return: True, wenn IO vorhanden / Byte belegt
"""
if type(key) == int:
return len(self.__dict_iobyte.get(key, [])) > 0
else:
return hasattr(self, key) and type(getattr(self, key)) != DeadIO
def __delattr__(self, key):
"""Entfernt angegebenen IO.
@param key IO zum entfernen"""
"""
Entfernt angegebenen IO.
:param key: IO zum entfernen
"""
io_del = object.__getattribute__(self, key)
# Alte Events vom Device löschen
@@ -70,16 +76,20 @@ class IOList(object):
io_del._parentdevice._update_my_io_list()
def __getattr__(self, key):
"""Verwaltet geloeschte IOs (Attribute, die nicht existieren).
@param key Name oder Byte eines alten IOs
@return Alten IO, wenn in Ref-Listen"""
"""
Verwaltet geloeschte IOs (Attribute, die nicht existieren).
:param key: Name oder Byte eines alten IOs
:return: Alten IO, wenn in Ref-Listen
"""
if key in self.__dict_iorefname:
return self.__dict_iorefname[key]
else:
raise AttributeError("can not find io '{0}'".format(key))
def __getitem__(self, key):
"""Ruft angegebenen IO ab.
"""
Ruft angegebenen IO ab.
Wenn der Key <class 'str'> ist, wird ein einzelner IO geliefert. Wird
der Key als <class 'int'> uebergeben, wird eine <class 'list'>
@@ -87,9 +97,8 @@ class IOList(object):
Wird als Key <class 'slice'> gegeben, werden die Listen in einer Liste
zurueckgegeben.
@param key IO Name als <class 'str> oder Byte als <class 'int'>.
@return IO Objekt oder Liste der IOs
:param key: IO Name als <class 'str> oder Byte als <class 'int'>.
:return: IO Objekt oder Liste der IOs
"""
if type(key) == int:
if key not in self.__dict_iobyte:
@@ -106,16 +115,22 @@ class IOList(object):
return getattr(self, key)
def __iter__(self):
"""Gibt Iterator aller IOs zurueck.
@return Iterator aller IOs"""
"""
Gibt Iterator aller IOs zurueck.
:return: Iterator aller IOs
"""
for int_io in sorted(self.__dict_iobyte):
for io in self.__dict_iobyte[int_io]:
if io is not None:
yield io
def __len__(self):
"""Gibt die Anzahl aller IOs zurueck.
@return Anzahl aller IOs"""
"""
Gibt die Anzahl aller IOs zurueck.
:return: Anzahl aller IOs
"""
int_ios = 0
for int_io in self.__dict_iobyte:
for io in self.__dict_iobyte[int_io]:
@@ -128,17 +143,19 @@ class IOList(object):
if key in (
"_IOList__dict_iobyte",
"_IOList__dict_iorefname"
):
):
object.__setattr__(self, key, value)
else:
raise AttributeError(
"direct assignment is not supported - use .value Attribute"
)
def __private_replace_oldio_with_newio(self, io):
"""Ersetzt bestehende IOs durch den neu Registrierten.
@param io Neuer IO der eingefuegt werden soll"""
def __private_replace_oldio_with_newio(self, io) -> None:
"""
Ersetzt bestehende IOs durch den neu Registrierten.
:param io: Neuer IO der eingefuegt werden soll
"""
# Scanbereich festlegen
if io._bitaddress < 0:
scan_start = io.address
@@ -192,12 +209,15 @@ class IOList(object):
io._defaultvalue = calc_defaultvalue
else:
io._defaultvalue = bool(io._parentio_defaultvalue[
io._parentio_address - io.address
] & (1 << io._bitaddress))
io._parentio_address - io.address
] & (1 << io._bitaddress))
def _private_register_new_io_object(self, new_io):
"""Registriert neues IO Objekt unabhaenging von __setattr__.
@param new_io Neues IO Objekt"""
def _private_register_new_io_object(self, new_io) -> None:
"""
Registriert neues IO Objekt unabhaenging von __setattr__.
:param new_io: Neues IO Objekt
"""
if isinstance(new_io, IOBase):
if hasattr(self, new_io._name):
raise AttributeError(
@@ -229,27 +249,32 @@ class IOList(object):
class DeadIO(object):
"""Klasse, mit der ersetzte IOs verwaltet werden."""
__slots__ = "__deadio"
def __init__(self, deadio):
"""Instantiierung der DeadIO-Klasse.
@param deadio IO, der ersetzt wurde"""
"""
Instantiierung der DeadIO-Klasse.
:param deadio: IO, der ersetzt wurde
"""
self.__deadio = deadio
def replace_io(self, name, frm, **kwargs):
"""Stellt Funktion fuer weiter Bit-Ersetzungen bereit.
@see #IntIOReplaceable.replace_io replace_io(...)"""
def replace_io(self, name: str, frm: str, **kwargs) -> None:
"""
Stellt Funktion fuer weiter Bit-Ersetzungen bereit.
:ref: :func:IntIOReplaceable.replace_io()
"""
self.__deadio.replace_io(name, frm, **kwargs)
_parentdevice = property(lambda self: None)
class IOBase(object):
"""Basisklasse fuer alle IO-Objekte.
"""
Basisklasse fuer alle IO-Objekte.
Die Basisfunktionalitaet ermoeglicht das Lesen und Schreiben der Werte
als <class bytes'> oder <class 'bool'>. Dies entscheidet sich bei der
@@ -259,24 +284,23 @@ class IOBase(object):
Diese Klasse dient als Basis fuer andere IO-Klassen mit denen die Werte
auch als <class 'int'> verwendet werden koennen.
"""
__slots__ = "__bit_ioctl_off", "__bit_ioctl_on", \
"_bitaddress", "_bitlength", "_byteorder", "_defaultvalue", \
"_iotype", "_length", "_name", "_parentdevice", \
"_signed", "_slc_address", "bmk", "export"
"_bitaddress", "_bitlength", "_byteorder", "_defaultvalue", \
"_iotype", "_length", "_name", "_parentdevice", \
"_signed", "_slc_address", "bmk", "export"
def __init__(self, parentdevice, valuelist, iotype, byteorder, signed):
"""Instantiierung der IOBase-Klasse.
def __init__(self, parentdevice, valuelist: list, iotype: int, byteorder: str, signed: bool):
"""
Instantiierung der IOBase-Klasse.
@param parentdevice Parentdevice auf dem der IO liegt
@param valuelist Datenliste fuer Instantiierung
:param parentdevice: Parentdevice auf dem der IO liegt
:param valuelist: Datenliste fuer Instantiierung
["name","defval","bitlen","startaddrdev",exp,"idx","bmk","bitaddr"]
@param iotype <class 'int'> Wert
@param byteorder Byteorder 'little'/'big' fuer <class 'int'> Berechnung
@param sigend Intberechnung mit Vorzeichen durchfuehren
:param iotype: <class 'int'> Wert
:param byteorder: Byteorder 'little'/'big' fuer <class 'int'> Berechnung
:param signed: Intberechnung mit Vorzeichen durchfuehren
"""
# ["name","defval","bitlen","startaddrdev",exp,"idx","bmk","bitaddr"]
# [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ]
@@ -358,8 +382,11 @@ class IOBase(object):
self.__bit_ioctl_on = self.__bit_ioctl_off + b'\x01'
def __bool__(self):
"""<class 'bool'>-Wert der Klasse.
@return <class 'bool'> Nur False wenn False oder 0 sonst True"""
"""
<class 'bool'>-Wert der Klasse.
:return: <class 'bool'> Nur False wenn False oder 0 sonst True
"""
if self._bitaddress >= 0:
int_byte = int.from_bytes(
self._parentdevice._ba_devdata[self._slc_address],
@@ -368,28 +395,34 @@ class IOBase(object):
return bool(int_byte & 1 << self._bitaddress)
else:
return self._parentdevice._ba_devdata[self._slc_address] != \
bytearray(self._length)
bytearray(self._length)
def __len__(self):
"""Gibt die Bytelaenge des IO zurueck.
@return Bytelaenge des IO - 0 bei BITs"""
"""
Gibt die Bytelaenge des IO zurueck.
:return: Bytelaenge des IO - 0 bei BITs
"""
return 0 if self._bitaddress > 0 else self._length
def __str__(self):
"""<class 'str'>-Wert der Klasse.
@return Namen des IOs"""
"""
<class 'str'>-Wert der Klasse.
:return: Namen des IOs
"""
return self._name
def __reg_xevent(self, func, delay, edge, as_thread, overwrite, prefire):
"""Verwaltet reg_event und reg_timerevent.
@param func Funktion die bei Aenderung aufgerufen werden soll
@param delay Verzoegerung in ms zum Ausloesen - auch bei Wertaenderung
@param edge Ausfuehren bei RISING, FALLING or BOTH Wertaenderung
@param as_thread Bei True, Funktion als EventCallback-Thread ausfuehren
@param overwrite Wenn True, wird Event bei ueberschrieben
@param prefire Ausloesen mit aktuellem Wert, wenn mainloop startet
def __reg_xevent(self, func, delay: int, edge: int, as_thread: bool, overwrite: bool, prefire: bool) -> None:
"""
Verwaltet reg_event und reg_timerevent.
:param func: Funktion die bei Aenderung aufgerufen werden soll
:param delay: Verzoegerung in ms zum Ausloesen - auch bei Wertaenderung
:param edge: Ausfuehren bei RISING, FALLING or BOTH Wertaenderung
:param as_thread: Bei True, Funktion als EventCallback-Thread ausfuehren
:param overwrite: Wenn True, wird Event bei ueberschrieben
:param prefire: Ausloesen mit aktuellem Wert, wenn mainloop startet
"""
# Prüfen ob Funktion callable ist
if not callable(func):
@@ -449,29 +482,44 @@ class IOBase(object):
IOEvent(func, edge, as_thread, delay, overwrite, prefire)
)
def _get_address(self):
"""Gibt die absolute Byteadresse im Prozessabbild zurueck.
@return Absolute Byteadresse"""
def _get_address(self) -> int:
"""
Gibt die absolute Byteadresse im Prozessabbild zurueck.
:return: Absolute Byteadresse
"""
return self._parentdevice._offset + self._slc_address.start
def _get_byteorder(self):
"""Gibt konfigurierte Byteorder zurueck.
@return <class 'str'> Byteorder"""
def _get_byteorder(self) -> str:
"""
Gibt konfigurierte Byteorder zurueck.
:return: <class 'str'> Byteorder
"""
return self._byteorder
def _get_iotype(self):
"""Gibt io type zurueck.
@return <class 'int'> io type"""
def _get_iotype(self) -> int:
"""
Gibt io type zurueck.
:return: <class 'int'> io type
"""
return self._iotype
def get_defaultvalue(self):
"""Gibt die Defaultvalue von piCtory zurueck.
@return Defaultvalue als <class 'byte'> oder <class 'bool'>"""
"""
Gibt die Defaultvalue von piCtory zurueck.
:return: Defaultvalue als <class 'byte'> oder <class 'bool'>
"""
return self._defaultvalue
def get_value(self):
"""Gibt den Wert des IOs zurueck.
@return IO-Wert als <class 'bytes'> oder <class 'bool'>"""
"""
Gibt den Wert des IOs zurueck.
:return: IO-Wert als <class 'bytes'> oder <class 'bool'>
"""
if self._bitaddress >= 0:
int_byte = int.from_bytes(
self._parentdevice._ba_devdata[self._slc_address],
@@ -483,7 +531,8 @@ class IOBase(object):
def reg_event(
self, func, delay=0, edge=BOTH, as_thread=False, prefire=False):
"""Registriert fuer IO ein Event bei der Eventueberwachung.
"""
Registriert fuer IO ein Event bei der Eventueberwachung.
Die uebergebene Funktion wird ausgefuehrt, wenn sich der IO Wert
aendert. Mit Angabe von optionalen Parametern kann das
@@ -492,17 +541,17 @@ class IOBase(object):
HINWEIS: Die delay-Zeit muss in die .cycletime passen, ist dies nicht
der Fall, wird IMMER aufgerundet!
@param func Funktion die bei Aenderung aufgerufen werden soll
@param delay Verzoegerung in ms zum Ausloesen wenn Wert gleich bleibt
@param edge Ausfuehren bei RISING, FALLING or BOTH Wertaenderung
@param as_thread Bei True, Funktion als EventCallback-Thread ausfuehren
@param prefire Ausloesen mit aktuellem Wert, wenn mainloop startet
:param func: Funktion die bei Aenderung aufgerufen werden soll
:param delay; Verzoegerung in ms zum Ausloesen wenn Wert gleich bleibt
:param edge: Ausfuehren bei RISING, FALLING or BOTH Wertaenderung
:param as_thread: Bei True, Funktion als EventCallback-Thread ausfuehren
:param prefire: Ausloesen mit aktuellem Wert, wenn mainloop startet
"""
self.__reg_xevent(func, delay, edge, as_thread, True, prefire)
def reg_timerevent(self, func, delay, edge=BOTH, as_thread=False):
"""Registriert fuer IO einen Timer, welcher nach delay func ausfuehrt.
"""
Registriert fuer IO einen Timer, welcher nach delay func ausfuehrt.
Der Timer wird gestartet, wenn sich der IO Wert aendert und fuehrt die
uebergebene Funktion aus - auch wenn sich der IO Wert in der
@@ -514,17 +563,19 @@ class IOBase(object):
HINWEIS: Die delay-Zeit muss in die .cycletime passen, ist dies nicht
der Fall, wird IMMER aufgerundet!
@param func Funktion die bei Aenderung aufgerufen werden soll
@param delay Verzoegerung in ms zum Ausloesen - auch bei Wertaenderung
@param edge Ausfuehren bei RISING, FALLING or BOTH Wertaenderung
@param as_thread Bei True, Funktion als EventCallback-Thread ausfuehren
:param func: Funktion die bei Aenderung aufgerufen werden soll
:param delay: Verzoegerung in ms zum Ausloesen - auch bei Wertaenderung
:param edge: Ausfuehren bei RISING, FALLING or BOTH Wertaenderung
:param as_thread: Bei True, Funktion als EventCallback-Thread ausfuehren
"""
self.__reg_xevent(func, delay, edge, as_thread, False, False)
def set_value(self, value):
"""Setzt den Wert des IOs.
@param value IO-Wert als <class bytes'> oder <class 'bool'>"""
def set_value(self, value) -> None:
"""
Setzt den Wert des IOs.
:param value: IO-Wert als <class bytes'> oder <class 'bool'>
"""
if self._iotype == INP:
if self._parentdevice._modio._simulator:
raise RuntimeError(
@@ -643,12 +694,12 @@ class IOBase(object):
else:
self._parentdevice._ba_devdata[self._slc_address] = value
def unreg_event(self, func=None, edge=None):
"""Entfernt ein Event aus der Eventueberwachung.
@param func Nur Events mit angegebener Funktion
@param edge Nur Events mit angegebener Funktion und angegebener Edge
def unreg_event(self, func=None, edge=None) -> None:
"""
Entfernt ein Event aus der Eventueberwachung.
:param func: Nur Events mit angegebener Funktion
:param edge: Nur Events mit angegebener Funktion und angegebener Edge
"""
if self in self._parentdevice._dict_events:
if func is None:
@@ -659,7 +710,6 @@ class IOBase(object):
for regfunc in self._parentdevice._dict_events[self]:
if regfunc.func != func or edge is not None \
and regfunc.edge != edge:
newlist.append(regfunc)
# Wenn Funktionen übrig bleiben, diese übernehmen
@@ -669,8 +719,9 @@ class IOBase(object):
else:
del self._parentdevice._dict_events[self]
def wait(self, edge=BOTH, exitevent=None, okvalue=None, timeout=0):
"""Wartet auf Wertaenderung eines IOs.
def wait(self, edge=BOTH, exitevent=None, okvalue=None, timeout=0) -> int:
"""
Wartet auf Wertaenderung eines IOs.
Die Wertaenderung wird immer uerberprueft, wenn fuer Devices
mit aktiviertem autorefresh neue Daten gelesen wurden.
@@ -696,19 +747,18 @@ class IOBase(object):
der autorefresh Funktion berechnet, entspricht also nicht exakt den
angegeben Millisekunden! Es wird immer nach oben gerundet!)
@param edge Flanke RISING, FALLING, BOTH die eintreten muss
@param exitevent <class 'thrading.Event'> fuer vorzeitiges Beenden
@param okvalue IO-Wert, bei dem das Warten sofort beendet wird
@param timeout Zeit in ms nach der abgebrochen wird
@return <class 'int'> erfolgreich Werte <= 0
- Erfolgreich gewartet
Wert 0: IO hat den Wert gewechselt
Wert -1: okvalue stimmte mit IO ueberein
- Fehlerhaft gewartet
Wert 1: exitevent wurde gesetzt
Wert 2: timeout abgelaufen
Wert 100: Devicelist.exit() wurde aufgerufen
:param edge: Flanke RISING, FALLING, BOTH die eintreten muss
:param exitevent: <class 'thrading.Event'> fuer vorzeitiges Beenden
:param okvalue: IO-Wert, bei dem das Warten sofort beendet wird
:param timeout: Zeit in ms nach der abgebrochen wird
:return: <class 'int'> erfolgreich Werte <= 0
* Erfolgreich gewartet
** Wert 0: IO hat den Wert gewechselt
** Wert -1: okvalue stimmte mit IO ueberein
* Fehlerhaft gewartet
** Wert 1: exitevent wurde gesetzt
** Wert 2: timeout abgelaufen
** Wert 100: Devicelist.exit() wurde aufgerufen
"""
# Prüfen ob Device in autorefresh ist
if not self._parentdevice._selfupdate:
@@ -791,68 +841,89 @@ class IOBase(object):
class IntIO(IOBase):
"""Klasse fuer den Zugriff auf die Daten mit Konvertierung in int.
"""
Klasse fuer den Zugriff auf die Daten mit Konvertierung in int.
Diese Klasse erweitert die Funktion von <class 'IOBase'> um Funktionen,
ueber die mit <class 'int'> Werten gearbeitet werden kann. Fuer die
Umwandlung koennen 'Byteorder' (Default 'little') und 'signed' (Default
False) als Parameter gesetzt werden.
@see #IOBase IOBase
:ref:`IOBase`
"""
__slots__ = ()
def __int__(self):
"""Gibt IO-Wert zurueck mit Beachtung byteorder/signed.
@return IO-Wert als <class 'int'>"""
"""
Gibt IO-Wert zurueck mit Beachtung byteorder/signed.
:return: IO-Wert als <class 'int'>
"""
return int.from_bytes(
self._parentdevice._ba_devdata[self._slc_address],
byteorder=self._byteorder,
signed=self._signed
)
def _get_signed(self):
"""Ruft ab, ob der Wert Vorzeichenbehaftet behandelt werden soll.
@return True, wenn Vorzeichenbehaftet"""
def _get_signed(self) -> bool:
"""
Ruft ab, ob der Wert Vorzeichenbehaftet behandelt werden soll.
:return: True, wenn Vorzeichenbehaftet
"""
return self._signed
def _set_byteorder(self, value):
"""Setzt Byteorder fuer <class 'int'> Umwandlung.
@param value <class 'str'> 'little' or 'big'"""
def _set_byteorder(self, value: str) -> None:
"""
Setzt Byteorder fuer <class 'int'> Umwandlung.
:param value: <class 'str'> 'little' or 'big'
"""
if not (value == "little" or value == "big"):
raise ValueError("byteorder must be 'little' or 'big'")
if self._byteorder != value:
self._byteorder = value
self._defaultvalue = self._defaultvalue[::-1]
def _set_signed(self, value):
"""Left fest, ob der Wert Vorzeichenbehaftet behandelt werden soll.
@param value True, wenn mit Vorzeichen behandel"""
def _set_signed(self, value: bool) -> None:
"""
Left fest, ob der Wert Vorzeichenbehaftet behandelt werden soll.
:param value: True, wenn mit Vorzeichen behandel
"""
if type(value) != bool:
raise TypeError("signed must be <class 'bool'> True or False")
self._signed = value
def get_intdefaultvalue(self):
"""Gibt die Defaultvalue als <class 'int'> zurueck.
@return <class 'int'> Defaultvalue"""
def get_intdefaultvalue(self) -> int:
"""
Gibt die Defaultvalue als <class 'int'> zurueck.
:return: <class 'int'> Defaultvalue
"""
return int.from_bytes(
self._defaultvalue, byteorder=self._byteorder, signed=self._signed
)
def get_intvalue(self):
"""Gibt IO-Wert zurueck mit Beachtung byteorder/signed.
@return IO-Wert als <class 'int'>"""
def get_intvalue(self) -> int:
"""
Gibt IO-Wert zurueck mit Beachtung byteorder/signed.
:return: IO-Wert als <class 'int'>
"""
return int.from_bytes(
self._parentdevice._ba_devdata[self._slc_address],
byteorder=self._byteorder,
signed=self._signed
)
def set_intvalue(self, value):
"""Setzt IO mit Beachtung byteorder/signed.
@param value <class 'int'> Wert"""
def set_intvalue(self, value: int) -> None:
"""
Setzt IO mit Beachtung byteorder/signed.
:param value: <class 'int'> Wert
"""
if type(value) == int:
self.set_value(value.to_bytes(
self._length,
@@ -872,19 +943,18 @@ class IntIO(IOBase):
class IntIOCounter(IntIO):
"""Erweitert die IntIO-Klasse um die .reset() Funktion fuer Counter."""
__slots__ = ("__ioctl_arg")
__slots__ = ("__ioctl_arg", )
def __init__(
self, counter_id,
parentdevice, valuelist, iotype, byteorder, signed):
"""Instantiierung der IntIOCounter-Klasse.
@param counter_id ID fuer den Counter, zu dem der IO gehoert (0-15)
@see #IOBase.__init__ IOBase.__init__(...)
"""
Instantiierung der IntIOCounter-Klasse.
:param counter_id: ID fuer den Counter, zu dem der IO gehoert (0-15)
:ref: :func:`IOBase.__init__(...)`
"""
if not isinstance(counter_id, int):
raise TypeError("counter_id must be <class 'int'>")
@@ -912,7 +982,7 @@ class IntIOCounter(IntIO):
# Basisklasse laden
super().__init__(parentdevice, valuelist, iotype, byteorder, signed)
def reset(self):
def reset(self) -> None:
"""Setzt den Counter des Inputs zurueck."""
if self._parentdevice._modio._monitoring:
raise RuntimeError(
@@ -957,13 +1027,13 @@ class IntIOCounter(IntIO):
class IntIOReplaceable(IntIO):
"""Erweitert die IntIO-Klasse um die .replace_io Funktion."""
__slots__ = ()
def replace_io(self, name, frm, **kwargs):
"""Ersetzt bestehenden IO mit Neuem.
def replace_io(self, name: str, frm: str, **kwargs) -> None:
"""
Ersetzt bestehenden IO mit Neuem.
Wenn die kwargs fuer byteorder und defaultvalue nicht angegeben werden,
uebernimmt das System die Daten aus dem ersetzten IO.
@@ -983,9 +1053,9 @@ class IntIOReplaceable(IntIO):
der urspruenglige IO hat, werden die nachfolgenden IOs ebenfalls
verwendet und entfernt.
@param name Name des neuen Inputs
@param frm struct formatierung (1 Zeichen) oder 'ANZAHLs' z.B. '8s'
@param kwargs Weitere Parameter:
:param name: Name des neuen Inputs
:param frm: struct formatierung (1 Zeichen) oder 'ANZAHLs' z.B. '8s'
:param kwargs: Weitere Parameter:
- bmk: interne Bezeichnung fuer IO
- bit: Registriert IO als <class 'bool'> am angegebenen Bit im Byte
- byteorder: Byteorder fuer den IO, Standardwert=little
@@ -995,10 +1065,7 @@ class IntIOReplaceable(IntIO):
- edge: Event ausfuehren bei RISING, FALLING or BOTH Wertaenderung
- as_thread: Fuehrt die event-Funktion als RevPiCallback-Thread aus
- prefire: Ausloesen mit aktuellem Wert, wenn mainloop startet
@see <a target="_blank"
href="https://docs.python.org/3/library/struct.html#format-characters"
>Python3 struct</a>
`https://docs.python.org/3/library/struct.html#format-characters`
"""
# StructIO erzeugen
io_new = StructIO(
@@ -1023,30 +1090,29 @@ class IntIOReplaceable(IntIO):
class StructIO(IOBase):
"""Klasse fuer den Zugriff auf Daten ueber ein definierten struct.
"""
Klasse fuer den Zugriff auf Daten ueber ein definierten struct.
Sie stellt ueber struct die Werte in der gewuenschten Formatierung
bereit. Der struct-Formatwert wird bei der Instantiierung festgelegt.
@see #IOBase IOBase
:ref:`IOBase`
"""
__slots__ = "__frm", "_parentio_address", "_parentio_defaultvalue", \
"_parentio_length", "_parentio_name"
"_parentio_length", "_parentio_name"
def __init__(self, parentio, name, frm, **kwargs):
"""Erstellt einen IO mit struct-Formatierung.
def __init__(self, parentio, name: str, frm: str, **kwargs):
"""
Erstellt einen IO mit struct-Formatierung.
@param parentio ParentIO Objekt, welches ersetzt wird
@param name Name des neuen IO
@param frm struct formatierung (1 Zeichen) oder 'ANZAHLs' z.B. '8s'
@param kwargs Weitere Parameter:
:param parentio: ParentIO Objekt, welches ersetzt wird
:param name: Name des neuen IO
:param frm: struct formatierung (1 Zeichen) oder 'ANZAHLs' z.B. '8s'
:param kwargs: Weitere Parameter:
- bmk: Bezeichnung fuer IO
- bit: Registriert IO als <class 'bool'> am angegebenen Bit im Byte
- byteorder: Byteorder fuer IO, Standardwert vom ersetzten IO
- defaultvalue: Standardwert fuer IO, Standard vom ersetzten IO
"""
# Structformatierung prüfen
regex = rematch("^([0-9]*s|[cbB?hHiIlLqQefd])$", frm)
@@ -1118,40 +1184,54 @@ class StructIO(IOBase):
parentio._parentdevice._dict_slc[parentio._iotype].start and
self._slc_address.stop <=
parentio._parentdevice._dict_slc[parentio._iotype].stop):
raise BufferError(
"registered value does not fit process image scope"
)
def _get_frm(self):
"""Ruft die struct Formatierung ab.
@return struct Formatierung"""
def _get_frm(self) -> str:
"""
Ruft die struct Formatierung ab.
:return: struct Formatierung
"""
return self.__frm[1:]
def _get_signed(self):
"""Ruft ab, ob der Wert Vorzeichenbehaftet behandelt werden soll.
@return True, wenn Vorzeichenbehaftet"""
def _get_signed(self) -> bool:
"""
Ruft ab, ob der Wert Vorzeichenbehaftet behandelt werden soll.
:return: True, wenn Vorzeichenbehaftet
"""
return self._signed
def get_structdefaultvalue(self):
"""Gibt die Defaultvalue mit struct Formatierung zurueck.
@return Defaultvalue vom Typ der struct-Formatierung"""
"""
Gibt die Defaultvalue mit struct Formatierung zurueck.
:return: Defaultvalue vom Typ der struct-Formatierung
"""
if self._bitaddress >= 0:
return self._defaultvalue
else:
return struct.unpack(self.__frm, self._defaultvalue)[0]
def get_structvalue(self):
"""Gibt den Wert mit struct Formatierung zurueck.
@return Wert vom Typ der struct-Formatierung"""
"""
Gibt den Wert mit struct Formatierung zurueck.
:return: Wert vom Typ der struct-Formatierung
"""
if self._bitaddress >= 0:
return self.get_value()
else:
return struct.unpack(self.__frm, self.get_value())[0]
def set_structvalue(self, value):
"""Setzt den Wert mit struct Formatierung.
@param value Wert vom Typ der struct-Formatierung"""
"""
Setzt den Wert mit struct Formatierung.
:param value: Wert vom Typ der struct-Formatierung
"""
if self._bitaddress >= 0:
self.set_value(value)
else: