Reorder package to src-layout.

Move package to src directory and update project base files for build
process.
This commit is contained in:
2023-01-19 13:20:26 +01:00
parent 36c30ae6d6
commit 1057e6daa4
16 changed files with 237 additions and 168 deletions

View File

@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
"""
Stellt alle Klassen fuer den RevolutionPi zur Verfuegung.
Webpage: https://revpimodio.org/
Stellt Klassen fuer die einfache Verwendung des Revolution Pis der
Kunbus GmbH (https://revolution.kunbus.de/) zur Verfuegung. Alle I/Os werden
aus der piCtory Konfiguration eingelesen und mit deren Namen direkt zugreifbar
gemacht. Fuer Gateways sind eigene IOs ueber mehrere Bytes konfigurierbar
Mit den definierten Namen greift man direkt auf die gewuenschten Daten zu.
Auf alle IOs kann der Benutzer Funktionen als Events registrieren. Diese
fuehrt das Modul bei Datenaenderung aus.
"""
__all__ = [
"IOEvent",
"RevPiModIO", "RevPiModIODriver", "RevPiModIOSelected", "run_plc",
"RevPiNetIO", "RevPiNetIODriver", "RevPiNetIOSelected",
"Cycletools", "EventCallback",
"ProductType", "AIO", "COMPACT", "DI", "DO", "DIO", "FLAT", "MIO",
]
__author__ = "Sven Sager <akira@revpimodio.org>"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv3"
__name__ = "revpimodio2"
__version__ = "2.6.0rc2"
from ._internal import *
from .helper import Cycletools, EventCallback
from .io import IOEvent
from .modio import RevPiModIO, RevPiModIODriver, RevPiModIOSelected, run_plc
from .netio import RevPiNetIO, RevPiNetIODriver, RevPiNetIOSelected
from .pictory import ProductType, AIO, COMPACT, DI, DO, DIO, FLAT, MIO

View File

@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
"""Internal functions and values for this package."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "GPLv3"
# Global package values
OFF = 0
GREEN = 1
RED = 2
RISING = 31
FALLING = 32
BOTH = 33
INP = 300
OUT = 301
MEM = 302
PROCESS_IMAGE_SIZE = 4096
def acheck(check_type, **kwargs) -> None:
"""
Check type of given arguments.
Use the argument name as keyword and the argument itself as value.
:param check_type: Type to check
:param kwargs: Arguments to check
"""
for var_name in kwargs:
none_okay = var_name.endswith("_noneok")
if not (isinstance(kwargs[var_name], check_type) or
none_okay and kwargs[var_name] is None):
msg = "Argument '{0}' must be {1}{2}".format(
var_name.rstrip("_noneok"), str(check_type),
" or <class 'NoneType'>" if none_okay else ""
)
raise TypeError(msg)
def consttostr(value) -> str:
"""
Gibt <class 'str'> fuer Konstanten zurueck.
Diese Funktion ist erforderlich, da enum in Python 3.2 nicht existiert.
:param value: Konstantenwert
:return: <class 'str'> Name der Konstanten
"""
if value == 0:
return "OFF"
elif value == 1:
return "GREEN"
elif value == 2:
return "RED"
elif value == 31:
return "RISING"
elif value == 32:
return "FALLING"
elif value == 33:
return "BOTH"
elif value == 300:
return "INP"
elif value == 301:
return "OUT"
elif value == 302:
return "MEM"
else:
return ""

41
src/revpimodio2/app.py Normal file
View File

@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""Bildet die App Sektion von piCtory ab."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv3"
from time import strptime
class App(object):
"""Bildet die App Sektion der config.rsc ab."""
__slots__ = "name", "version", "language", "layout", "savets"
def __init__(self, app: dict):
"""
Instantiiert die App-Klasse.
:param app: piCtory Appinformationen
"""
self.name = app["name"]
"""Name of creating app"""
self.version = app["version"]
"""Version of creating app"""
self.language = app["language"]
"""Language of creating app"""
# Speicherungszeitpunkt laden, wenn vorhanden
self.savets = app.get("saveTS", None)
"""Timestamp of configuraiton"""
if self.savets is not None:
try:
self.savets = strptime(self.savets, "%Y%m%d%H%M%S")
except ValueError:
self.savets = None
# TODO: Layout untersuchen und anders abbilden
self.layout = app["layout"]

1542
src/revpimodio2/device.py Normal file

File diff suppressed because it is too large Load Diff

13
src/revpimodio2/errors.py Normal file
View File

@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
"""Error classes of RevPiModIO."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv3"
class RevPiModIOError(Exception):
pass
class DeviceNotFoundError(RevPiModIOError):
pass

643
src/revpimodio2/helper.py Normal file
View File

@@ -0,0 +1,643 @@
# -*- coding: utf-8 -*-
"""RevPiModIO Helperklassen und Tools."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv3"
import queue
import warnings
from math import ceil
from threading import Event, Lock, Thread
from time import sleep
from timeit import default_timer
from ._internal import RISING, FALLING, BOTH
from .io import IOBase
class EventCallback(Thread):
"""Thread fuer das interne Aufrufen von Event-Funktionen.
Der Eventfunktion, welche dieser Thread aufruft, wird der Thread selber
als Parameter uebergeben. Darauf muss bei der definition der Funktion
geachtet werden z.B. "def event(th):". Bei umfangreichen Funktionen kann
dieser ausgewertet werden um z.B. doppeltes Starten zu verhindern.
Ueber EventCallback.ioname kann der Name des IO-Objekts abgerufen werden,
welches das Event ausgeloest hast. EventCallback.iovalue gibt den Wert des
IO-Objekts zum Ausloesezeitpunkt zurueck.
Der Thread stellt das EventCallback.exit Event als Abbruchbedingung fuer
die aufgerufene Funktion zur Verfuegung.
Durch Aufruf der Funktion EventCallback.stop() wird das exit-Event gesetzt
und kann bei Schleifen zum Abbrechen verwendet werden.
Mit dem .exit() Event auch eine Wartefunktion realisiert
werden: "th.exit.wait(0.5)" - Wartet 500ms oder bricht sofort ab, wenn
fuer den Thread .stop() aufgerufen wird.
while not th.exit.is_set():
# IO-Arbeiten
th.exit.wait(0.5)
"""
__slots__ = "daemon", "exit", "func", "ioname", "iovalue"
def __init__(self, func, name: str, value):
"""
Init EventCallback class.
:param func: Funktion die beim Start aufgerufen werden soll
:param name: IO-Name
:param value: IO-Value zum Zeitpunkt des Events
"""
super().__init__()
self.daemon = True
self.exit = Event()
self.func = func
self.ioname = name
self.iovalue = value
def run(self):
"""Ruft die registrierte Funktion auf."""
self.func(self)
def stop(self):
"""Setzt das exit-Event mit dem die Funktion beendet werden kann."""
self.exit.set()
class Cycletools:
"""
Werkzeugkasten fuer Cycleloop-Funktion.
Diese Klasse enthaelt Werkzeuge fuer Zyklusfunktionen, wie Taktmerker
und Flankenmerker.
Zu beachten ist, dass die Flankenmerker beim ersten Zyklus alle den Wert
True haben! Ueber den Merker Cycletools.first kann ermittelt werden,
ob es sich um den ersten Zyklus handelt.
Taktmerker flag1c, flag5c, flag10c, usw. haben den als Zahl angegebenen
Wert an Zyklen jeweils False und True.
Beispiel: flag5c hat 5 Zyklen den Wert False und in den naechsten 5 Zyklen
den Wert True.
Flankenmerker flank5c, flank10c, usw. haben immer im, als Zahl angebenen
Zyklus fuer einen Zyklusdurchlauf den Wert True, sonst False.
Beispiel: flank5c hat immer alle 5 Zyklen den Wert True.
Diese Merker koennen z.B. verwendet werden um, an Outputs angeschlossene,
Lampen synchron blinken zu lassen.
"""
__slots__ = "__cycle", "__cycletime", "__ucycle", "__dict_ton", \
"__dict_tof", "__dict_tp", "__dict_change", \
"_start_timer", "core", "device", \
"first", "io", "last", "var", \
"flag1c", "flag5c", "flag10c", "flag15c", "flag20c", \
"flank5c", "flank10c", "flank15c", "flank20c"
def __init__(self, cycletime, revpi_object):
"""Init Cycletools class."""
self.__cycle = 0
self.__cycletime = cycletime
self.__ucycle = 0
self.__dict_change = {}
self.__dict_ton = {}
self.__dict_tof = {}
self.__dict_tp = {}
self._start_timer = 0.0
# Access to core and io
self.core = revpi_object.core
self.device = revpi_object.device
self.io = revpi_object.io
# Taktmerker
self.first = True
self.flag1c = False
self.flag5c = False
self.flag10c = False
self.flag15c = False
self.flag20c = False
self.last = False
# Flankenmerker
self.flank5c = True
self.flank10c = True
self.flank15c = True
self.flank20c = True
# Benutzerdaten
class Var:
"""Hier remanente Variablen anfuegen."""
pass
self.var = Var()
def _docycle(self) -> None:
"""Zyklusarbeiten."""
# Einschaltverzoegerung
for tof in self.__dict_tof:
if self.__dict_tof[tof] > 0:
self.__dict_tof[tof] -= 1
# Ausschaltverzoegerung
for ton in self.__dict_ton:
if self.__dict_ton[ton][1]:
if self.__dict_ton[ton][0] > 0:
self.__dict_ton[ton][0] -= 1
self.__dict_ton[ton][1] = False
else:
self.__dict_ton[ton][0] = -1
# Impuls
for tp in self.__dict_tp:
if self.__dict_tp[tp][1]:
if self.__dict_tp[tp][0] > 0:
self.__dict_tp[tp][0] -= 1
else:
self.__dict_tp[tp][1] = False
else:
self.__dict_tp[tp][0] = -1
# Flankenmerker
self.flank5c = False
self.flank10c = False
self.flank15c = False
self.flank20c = False
# Logische Flags
self.first = False
self.flag1c = not self.flag1c
# Berechnete Flags
self.__cycle += 1
if self.__cycle == 5:
self.__ucycle += 1
if self.__ucycle == 3:
self.flank15c = True
self.flag15c = not self.flag15c
self.__ucycle = 0
if self.flag5c:
if self.flag10c:
self.flank20c = True
self.flag20c = not self.flag20c
self.flank10c = True
self.flag10c = not self.flag10c
self.flank5c = True
self.flag5c = not self.flag5c
self.__cycle = 0
# Process changed values
for io in self.__dict_change:
self.__dict_change[io] = io.get_value()
def changed(self, io: IOBase, edge=BOTH):
"""
Check change of IO value from last to this cycle.
It will always be False on the first use of this function with an
IO object.
:param io: IO to check for changes to last cycle
:param edge: Check for rising or falling on bit io objects
:return: True, if IO value changed
"""
if io in self.__dict_change:
if edge == BOTH:
return self.__dict_change[io] != io.get_value()
else:
value = io.get_value()
return self.__dict_change[io] != value and (
value and edge == RISING or
not value and edge == FALLING
)
else:
if not isinstance(io, IOBase):
raise TypeError("parameter 'io' must be an io object")
if not (edge == BOTH or type(io.value) == bool):
raise ValueError(
"parameter 'edge' can be used with bit io objects only"
)
self.__dict_change[io] = None
return False
def get_tof(self, name: str) -> bool:
"""
Wert der Ausschaltverzoegerung.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Ausschaltverzoegerung
"""
return self.__dict_tof.get(name, 0) > 0
def get_tofc(self, name: str) -> bool:
"""
Wert der Ausschaltverzoegerung.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Ausschaltverzoegerung
"""
return self.__dict_tof.get(name, 0) > 0
def set_tof(self, name: str, milliseconds: int) -> None:
"""
Startet bei Aufruf einen ausschaltverzoegerten Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param milliseconds: Verzoegerung in Millisekunden
"""
self.__dict_tof[name] = ceil(milliseconds / self.__cycletime)
def set_tofc(self, name: str, cycles: int) -> None:
"""
Startet bei Aufruf einen ausschaltverzoegerten Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param cycles: Zyklusanzahl, der Verzoegerung wenn nicht neu gestartet
"""
self.__dict_tof[name] = cycles
def get_ton(self, name: str) -> bool:
"""
Einschaltverzoegerung.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Einschaltverzoegerung
"""
return self.__dict_ton.get(name, [-1])[0] == 0
def get_tonc(self, name: str) -> bool:
"""
Einschaltverzoegerung.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> der Einschaltverzoegerung
"""
return self.__dict_ton.get(name, [-1])[0] == 0
def set_ton(self, name: str, milliseconds: int) -> None:
"""
Startet einen einschaltverzoegerten Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param milliseconds: Millisekunden, der Verzoegerung wenn neu gestartet
"""
if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = \
[ceil(milliseconds / self.__cycletime), True]
else:
self.__dict_ton[name][1] = True
def set_tonc(self, name: str, cycles: int) -> None:
"""
Startet einen einschaltverzoegerten Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param cycles: Zyklusanzahl, der Verzoegerung wenn neu gestartet
"""
if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = [cycles, True]
else:
self.__dict_ton[name][1] = True
def get_tp(self, name: str) -> bool:
"""
Impulstimer.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> des Impulses
"""
return self.__dict_tp.get(name, [-1])[0] > 0
def get_tpc(self, name: str) -> bool:
"""
Impulstimer.
:param name: Eindeutiger Name des Timers
:return: Wert <class 'bool'> des Impulses
"""
return self.__dict_tp.get(name, [-1])[0] > 0
def set_tp(self, name: str, milliseconds: int) -> None:
"""
Startet einen Impuls Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param milliseconds: Millisekunden, die der Impuls anstehen soll
"""
if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = \
[ceil(milliseconds / self.__cycletime), True]
else:
self.__dict_tp[name][1] = True
def set_tpc(self, name: str, cycles: int) -> None:
"""
Startet einen Impuls Timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer
:param cycles: Zyklusanzahl, die der Impuls anstehen soll
"""
if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = [cycles, True]
else:
self.__dict_tp[name][1] = True
@property
def runtime(self) -> float:
"""
Runtime im milliseconds of cycle function till now.
This property will return the actual runtime of the function. So on the
beginning of your function it will be about 0 and will rise during
the runtime to the max in the last line of your function.
"""
return (default_timer() - self._start_timer) * 1000
class ProcimgWriter(Thread):
"""
Klasse fuer Synchroniseriungs-Thread.
Diese Klasse wird als Thread gestartet, wenn das Prozessabbild zyklisch
synchronisiert werden soll. Diese Funktion wird hauptsaechlich fuer das
Event-Handling verwendet.
"""
__slots__ = "__dict_delay", "__eventth", "_eventqth", "__eventwork", \
"_eventq", "_modio", \
"_refresh", "_work", "daemon", "lck_refresh", "newdata"
def __init__(self, parentmodio):
"""Init ProcimgWriter class."""
super().__init__()
self.__dict_delay = {}
self.__eventth = Thread(target=self.__exec_th)
self._eventqth = queue.Queue()
self.__eventwork = False
self._eventq = queue.Queue()
self._modio = parentmodio
self._refresh = 0.05
self._work = Event()
self.daemon = True
self.lck_refresh = Lock()
self.newdata = Event()
def __check_change(self, dev) -> None:
"""Findet Aenderungen fuer die Eventueberwachung."""
for io_event in dev._dict_events:
if dev._ba_datacp[io_event._slc_address] == \
dev._ba_devdata[io_event._slc_address]:
continue
if io_event._bitshift:
boolcp = dev._ba_datacp[io_event._slc_address.start] \
& io_event._bitshift
boolor = dev._ba_devdata[io_event._slc_address.start] \
& io_event._bitshift
if boolor == boolcp:
continue
for regfunc in dev._dict_events[io_event]:
if regfunc.edge == BOTH \
or regfunc.edge == RISING and boolor \
or regfunc.edge == FALLING and not boolor:
if regfunc.delay == 0:
if regfunc.as_thread:
self._eventqth.put(
(regfunc, io_event._name, io_event.value),
False
)
else:
self._eventq.put(
(regfunc, io_event._name, io_event.value),
False
)
else:
# Verzögertes Event in dict einfügen
tup_fire = (
regfunc, io_event._name, io_event.value,
io_event,
)
if regfunc.overwrite \
or tup_fire not in self.__dict_delay:
self.__dict_delay[tup_fire] = ceil(
regfunc.delay / 1000 / self._refresh
)
else:
for regfunc in dev._dict_events[io_event]:
if regfunc.delay == 0:
if regfunc.as_thread:
self._eventqth.put(
(regfunc, io_event._name, io_event.value),
False
)
else:
self._eventq.put(
(regfunc, io_event._name, io_event.value),
False
)
else:
# Verzögertes Event in dict einfügen
tup_fire = (
regfunc, io_event._name, io_event.value,
io_event,
)
if regfunc.overwrite \
or tup_fire not in self.__dict_delay:
self.__dict_delay[tup_fire] = ceil(
regfunc.delay / 1000 / self._refresh
)
# Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf)
dev._ba_datacp = dev._ba_devdata[:]
def __exec_th(self) -> None:
"""Laeuft als Thread, der Events als Thread startet."""
while self.__eventwork:
try:
tup_fireth = self._eventqth.get(timeout=1)
th = EventCallback(
tup_fireth[0].func, tup_fireth[1], tup_fireth[2]
)
th.start()
self._eventqth.task_done()
except queue.Empty:
pass
def _collect_events(self, value: bool) -> bool:
"""
Aktiviert oder Deaktiviert die Eventueberwachung.
:param value: True aktiviert / False deaktiviert
:return: True, wenn Anforderung erfolgreich war
"""
if type(value) != bool:
raise TypeError("value must be <class 'bool'>")
# Nur starten, wenn System läuft
if not self.is_alive():
self.__eventwork = False
return False
if self.__eventwork != value:
with self.lck_refresh:
self.__eventwork = value
if not value:
# Nur leeren beim deaktivieren
self._eventqth = queue.Queue()
self._eventq = queue.Queue()
self.__dict_delay = {}
# Threadmanagement
if value and not self.__eventth.is_alive():
self.__eventth = Thread(target=self.__exec_th)
self.__eventth.daemon = True
self.__eventth.start()
return True
def get_refresh(self) -> int:
"""
Gibt Zykluszeit zurueck.
:return: <class 'int'> Zykluszeit in Millisekunden
"""
return int(self._refresh * 1000)
def run(self):
"""Startet die automatische Prozessabbildsynchronisierung."""
fh = self._modio._create_myfh()
mrk_delay = self._refresh
mrk_warn = True
bytesbuff = bytearray(self._modio._length)
while not self._work.is_set():
ot = default_timer()
# At this point, we slept and have the rest of delay from last cycle
if not self.lck_refresh.acquire(timeout=mrk_delay):
warnings.warn(
"cycle time of {0} ms exceeded in your cycle function"
"".format(int(self._refresh * 1000)),
RuntimeWarning
)
mrk_delay = self._refresh
# Nur durch cycleloop erreichbar - keine verzögerten Events
continue
try:
for dev in self._modio._lst_shared:
# Set shared outputs before reading process image
for io in dev._shared_write:
if not io._write_to_procimg():
raise IOError("error on _write_to_procimg")
dev._shared_write.clear()
fh.seek(0)
fh.readinto(bytesbuff)
for dev in self._modio._lst_refresh:
with dev._filelock:
if self._modio._monitoring or dev._shared_procimg:
# Inputs und Outputs in Puffer
dev._ba_devdata[:] = bytesbuff[dev._slc_devoff]
if self.__eventwork \
and len(dev._dict_events) > 0 \
and dev._ba_datacp != dev._ba_devdata:
self.__check_change(dev)
else:
# Inputs in Puffer, Outputs in Prozessabbild
dev._ba_devdata[dev._slc_inp] = \
bytesbuff[dev._slc_inpoff]
if self.__eventwork \
and len(dev._dict_events) > 0 \
and dev._ba_datacp != dev._ba_devdata:
self.__check_change(dev)
fh.seek(dev._slc_outoff.start)
fh.write(dev._ba_devdata[dev._slc_out])
if self._modio._buffedwrite:
fh.flush()
except IOError as e:
self._modio._gotioerror("autorefresh", e, mrk_warn)
mrk_warn = self._modio._debug == -1
self.lck_refresh.release()
continue
else:
if not mrk_warn:
if self._modio._debug == 0:
warnings.warn(
"recover from io errors on process image",
RuntimeWarning
)
else:
warnings.warn(
"recover from io errors on process image - total "
"count of {0} errors now"
"".format(self._modio._ioerror),
RuntimeWarning
)
mrk_warn = True
# Alle aufwecken
self.lck_refresh.release()
self.newdata.set()
finally:
# Verzögerte Events prüfen
if self.__eventwork:
for tup_fire in tuple(self.__dict_delay.keys()):
if tup_fire[0].overwrite and \
tup_fire[3].value != tup_fire[2]:
del self.__dict_delay[tup_fire]
else:
self.__dict_delay[tup_fire] -= 1
if self.__dict_delay[tup_fire] <= 0:
# Verzögertes Event übernehmen und löschen
if tup_fire[0].as_thread:
self._eventqth.put(tup_fire, False)
else:
self._eventq.put(tup_fire, False)
del self.__dict_delay[tup_fire]
mrk_delay = default_timer() % self._refresh
# Second default_timer call include calculation time from above
if default_timer() - ot > self._refresh:
warnings.warn(
"cycle time of {0} ms exceeded - can not hold cycle time!"
"".format(int(self._refresh * 1000)),
RuntimeWarning
)
mrk_delay = 0.0
else:
# Sleep and not .wait (.wait uses system clock)
sleep(self._refresh - mrk_delay)
# Alle am Ende erneut aufwecken
self._collect_events(False)
self.newdata.set()
fh.close()
def stop(self):
"""Beendet die automatische Prozessabbildsynchronisierung."""
self._work.set()
def set_refresh(self, value):
"""Setzt die Zykluszeit in Millisekunden.
@param value <class 'int'> Millisekunden"""
if type(value) == int and 5 <= value <= 2000:
self._refresh = value / 1000
else:
raise ValueError(
"refresh time must be 5 to 2000 milliseconds"
)
refresh = property(get_refresh, set_refresh)

1434
src/revpimodio2/io.py Normal file

File diff suppressed because it is too large Load Diff

1458
src/revpimodio2/modio.py Normal file

File diff suppressed because it is too large Load Diff

1031
src/revpimodio2/netio.py Normal file

File diff suppressed because it is too large Load Diff

182
src/revpimodio2/pictory.py Normal file
View File

@@ -0,0 +1,182 @@
# -*- coding: utf-8 -*-
"""Pictory aliases for IO values."""
__author__ = "Théo Rozier"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv3"
# RAP files are located under "/var/www/pictory/resources/data/rap/".
# Checked *.rap files already check and do not define any alias :
# - RevPiCore_20160818_1_0.rap
# - RevPiCore_20170210_1_1.rap
# - RevPiCore_20170404_1_2.rap
# - RevPiConCan_20180425_1_0.rap
# - RevPiGateCANopen_20161102_1_0.rap
class ProductType:
GATEWAY_CAN_OPEN = 71
GATEWAY_CCLINK = 72
GATEWAY_DEV_NET = 73
GATEWAY_ETHERCAT = 74
GATEWAY_ETHERNET_IP = 75
GATEWAY_POWERLINK = 76
GATEWAY_PROFIBUS = 77
GATEWAY_PROFINET_RT = 78
GATEWAY_PROFINET_IRT = 79
GATEWAY_CAN_OPEN_MASTER = 80
GATEWAY_SERCOS3 = 81
GATEWAY_SERIAL = 82
GATEWAY_PROFINET_SITARA = 83
GATEWAY_PROFINET_IRT_MASTER = 84
GATEWAY_ETHERCAT_MASTER = 85
GATEWAY_MODBUS_RTU = 92
GATEWAY_MODBUS_TCP = 83
GATEWAY_DMX = 199
DIO = 96
DI = 97
DO = 98
AIO = 103
MIO = 118
REVPI_CORE = 95
REVPI_COMPACT = 104
REVPI_CONNECT = 105
REVPI_FLAT = 135
class AIO:
"""Memory value mappings for RevPi AIO 1.0 (RevPiAIO_20170301_1_0.rap)."""
OUT_RANGE_OFF = 0 # Off
OUT_RANGE_0_5V = 1 # 0 - 5V
OUT_RANGE_0_10V = 2 # 0 - 10V
OUT_RANGE_N5_5V = 3 # -5 - 5V
OUT_RANGE_N10_10V = 4 # -10 - 10V
OUT_RANGE_0_5P5V = 5 # 0 - 5.5V
OUT_RANGE_0_11V = 6 # 0 - 11V
OUT_RANGE_N5P5_5P5V = 7 # -5.5 - 5.5V
OUT_RANGE_N11_11V = 8 # -11 - 11V
OUT_RANGE_4_20MA = 9 # 4 - 20mA
OUT_RANGE_0_20MA = 10 # 0 - 20mA
OUT_RANGE_0_24MA = 11 # 0 - 24mA
# Slew rate deceleration
OUT_SLEW_OFF = 0
OUT_SLEW_ON = 1
# Slew rate step size
OUT_SLEW_STEP_SIZE_1LSB = 0
OUT_SLEW_STEP_SIZE_2LSB = 1
OUT_SLEW_STEP_SIZE_4LSB = 2
OUT_SLEW_STEP_SIZE_8LSB = 3
OUT_SLEW_STEP_SIZE_16LSB = 4
OUT_SLEW_STEP_SIZE_32LSB = 5
OUT_SLEW_STEP_SIZE_64LSB = 6
OUT_SLEW_STEP_SIZE_128LSB = 7
# Clock rate of slew rate deceleration in kHz
OUT_SLEW_CLOCK_258_KZH = 0
OUT_SLEW_CLOCK_200_KZH = 1
OUT_SLEW_CLOCK_154_KZH = 2
OUT_SLEW_CLOCK_131_KZH = 3
OUT_SLEW_CLOCK_116_KZH = 4
OUT_SLEW_CLOCK_70_KZH = 5
OUT_SLEW_CLOCK_38_KZH = 6
OUT_SLEW_CLOCK_26_KZH = 7
OUT_SLEW_CLOCK_20_KZH = 8
OUT_SLEW_CLOCK_16_KZH = 9
OUT_SLEW_CLOCK_10_KZH = 10
OUT_SLEW_CLOCK_8P3_KZH = 11
OUT_SLEW_CLOCK_6P9_KZH = 12
OUT_SLEW_CLOCK_5P5_KZH = 13
OUT_SLEW_CLOCK_4P2_KZH = 14
OUT_SLEW_CLOCK_3P3_KZH = 15
IN_RANGE_N10V_10V = 1 # -10 - 10V
IN_RANGE_0_10V = 2 # 0 - 10V
IN_RANGE_0_5V = 3 # 0 - 5V
IN_RANGE_N5_5V = 4 # -5 - 5V
IN_RANGE_0_20MA = 5 # 0 - 20mA
IN_RANGE_0_24MA = 6 # 0 - 24mA
IN_RANGE_4_20MA = 7 # 4 - 20mA
IN_RANGE_N25_25MA = 8 # -25 - 25mA
ADC_DATARATE_5HZ = 0 # 5 Hz
ADC_DATARATE_10HZ = 1 # 10 Hz
ADC_DATARATE_20HZ = 2 # 20 Hz
ADC_DATARATE_40HZ = 3 # 40 Hz
ADC_DATARATE_80HZ = 4 # 80 Hz
ADC_DATARATE_160HZ = 5 # 160 Hz
ADC_DATARATE_320HZ = 6 # 320 Hz
ADC_DATARATE_640HZ = 7 # 640 Hz
RTD_TYPE_PT100 = 0 # PT100
RTD_TYPE_PT1000 = 1 # PT1000
RTD_2_WIRE = 2 # 2-wire
RTD_3_WIRE = 0 # 3-wire
RTD_4_WIRE = 1 # 4-wire
class DI:
"""Memory value mappings for RevPi DI 1.0 (RevPiDI_20160818_1_0.rap)."""
IN_MODE_DIRECT = 0 # Direct
IN_MODE_COUNT_RISING = 1 # Counter, rising edge
IN_MODE_COUNT_FALLING = 2 # Counter, falling edge
IN_MODE_ENCODER = 3 # Encoder
IN_DEBOUNCE_OFF = 0 # Off
IN_DEBOUNCE_25US = 1 # 25us
IN_DEBOUNCE_750US = 2 # 750us
IN_DEBOUNCE_3MS = 3 # 3ms
class DO:
"""Memory value mappings for RevPi DO 1.0 (RevPiDO_20160818_1_0.rap)."""
OUT_PWM_FREQ_40HZ = 1 # 40Hz 1%
OUT_PWM_FREQ_80HZ = 2 # 80Hz 2%
OUT_PWM_FREQ_160HZ = 4 # 160Hz 4%
OUT_PWM_FREQ_200HZ = 5 # 200Hz 5%
OUT_PWM_FREQ_400HZ = 10 # 400Hz 10%
class DIO(DI, DO):
"""Memory value mappings for RevPi DIO 1.0 (RevPiDIO_20160818_1_0.rap)."""
pass
class MIO:
"""Memory value mappings for RevPi MIO 1.0 (RevPiMIO_20200901_1_0.rap)."""
ENCODER_MODE_DISABLED = 0
ENCODER_MODE_ENABLED = 1
IO_MODE_DIGITAL_IN = 0
IO_MODE_PULSE_IN = 1
IO_MODE_PWM_IN = 2
IO_MODE_DIGITAL_OUT = 3
IO_MODE_PULSE_OUT = 4
IO_MODE_PWM_OUT = 5
AI_MODE_ANALOG_INPUT = 0
AI_MODE_LOGIC_LEVEL_INPUT = 1
AO_MODE_ANALOG_OUTPUT = 0
AO_MODE_LOGIC_LEVEL_OUTPUT = 1
class COMPACT:
"""Memory value mappings for RevPi Compact 1.0 (RevPiCompact_20171023_1_0.rap)."""
DIN_DEBOUNCE_OFF = 0 # Off
DIN_DEBOUNCE_25US = 1 # 25us
DIN_DEBOUNCE_750US = 2 # 750us
DIN_DEBOUNCE_3MS = 3 # 3ms
AIN_MODE_OFF = 0 # Off
AIN_MODE_0_10V = 1 # 0 - 10V
AIN_MODE_PT100 = 3 # PT100
AIN_MODE_PT1000 = 7 # PT1000
class FLAT:
"""Memory value mappings for RevPi Flat 1.0 (RevPiFlat_20200921_1_0.rap)."""
IN_RANGE_0_10V = 0
IN_RANGE_4_20MA = 1

View File

@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
"""Bildet die Summary-Sektion von piCtory ab."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv3"
class Summary(object):
"""Bildet die Summary-Sektion der config.rsc ab."""
__slots__ = "inptotal", "outtotal"
def __init__(self, summary: dict):
"""
Instantiiert die RevPiSummary-Klasse.
:param summary: piCtory Summaryinformationen
"""
self.inptotal = summary["inpTotal"]
self.outtotal = summary["outTotal"]