Eventüberwachung über Queues realisiert

This commit is contained in:
2017-11-15 09:18:33 +01:00
parent 1f7ecd0e22
commit 01515e28c5
5 changed files with 158 additions and 141 deletions

View File

@@ -440,6 +440,12 @@ Methods</h3>
<td><a style="color:#0000FF" href="#ProcimgWriter.__init__">ProcimgWriter</a></td>
<td>Init ProcimgWriter class.</td>
</tr><tr>
<td><a style="color:#0000FF" href="#ProcimgWriter.__check_change">__check_change</a></td>
<td>Findet Aenderungen fuer die Eventueberwachung.</td>
</tr><tr>
<td><a style="color:#0000FF" href="#ProcimgWriter._collect_events">_collect_events</a></td>
<td>Aktiviert oder Deaktiviert die Eventueberwachung.</td>
</tr><tr>
<td><a style="color:#0000FF" href="#ProcimgWriter._get_ioerrors">_get_ioerrors</a></td>
<td>Ruft aktuelle Anzahl der Fehler ab.</td>
</tr><tr>
@@ -481,6 +487,23 @@ Init ProcimgWriter class.
<dd>
Parent Object
</dd>
</dl><a NAME="ProcimgWriter.__check_change" ID="ProcimgWriter.__check_change"></a>
<h3 style="background-color:#FFFFFF;color:#FF0000">
ProcimgWriter.__check_change</h3>
<b>__check_change</b>(<i>dev</i>)
<p>
Findet Aenderungen fuer die Eventueberwachung.
</p><a NAME="ProcimgWriter._collect_events" ID="ProcimgWriter._collect_events"></a>
<h3 style="background-color:#FFFFFF;color:#FF0000">
ProcimgWriter._collect_events</h3>
<b>_collect_events</b>(<i>value</i>)
<p>
Aktiviert oder Deaktiviert die Eventueberwachung.
</p><dl>
<dt><i>value</i></dt>
<dd>
True aktiviert / False deaktiviert
</dd>
</dl><a NAME="ProcimgWriter._get_ioerrors" ID="ProcimgWriter._get_ioerrors"></a>
<h3 style="background-color:#FFFFFF;color:#FF0000">
ProcimgWriter._get_ioerrors</h3>

View File

@@ -440,7 +440,7 @@ Funktion wird nach dem letzten Lesen der Inputs
</dl><a NAME="RevPiModIO.mainloop" ID="RevPiModIO.mainloop"></a>
<h3 style="background-color:#FFFFFF;color:#FF0000">
RevPiModIO.mainloop</h3>
<b>mainloop</b>(<i>freeze=False, blocking=True</i>)
<b>mainloop</b>(<i>blocking=True</i>)
<p>
Startet den Mainloop mit Eventueberwachung.
</p><p>
@@ -449,12 +449,6 @@ Startet den Mainloop mit Eventueberwachung.
durchlaeuft die Eventueberwachung und prueft Aenderungen der, mit
einem Event registrierten, IOs. Wird eine Veraenderung erkannt,
fuert das Programm die dazugehoerigen Funktionen der Reihe nach aus.
</p><p>
Wenn der Parameter "freeze" mit True angegeben ist, wird die
Prozessabbildsynchronisierung angehalten bis alle Eventfunktionen
ausgefuehrt wurden. Inputs behalten fuer die gesamte Dauer ihren
aktuellen Wert und Outputs werden erst nach Durchlauf aller Funktionen
in das Prozessabbild geschrieben.
</p><p>
Wenn der Parameter "blocking" mit False angegeben wird, aktiviert
dies die Eventueberwachung und blockiert das Programm NICHT an der
@@ -462,10 +456,7 @@ Startet den Mainloop mit Eventueberwachung.
Events vom RevPi benoetigt werden, aber das Programm weiter ausgefuehrt
werden soll.
</p><dl>
<dt><i>freeze</i></dt>
<dd>
Wenn True, Prozessabbildsynchronisierung anhalten
</dd><dt><i>blocking</i></dt>
<dt><i>blocking</i></dt>
<dd>
Wenn False, blockiert das Programm NICHT
</dd>

View File

@@ -70,6 +70,7 @@ revpimodio2.helper.Cycletools?1(cycletime)
revpimodio2.helper.EventCallback.run?4()
revpimodio2.helper.EventCallback.stop?4()
revpimodio2.helper.EventCallback?1(func, name, value)
revpimodio2.helper.ProcimgWriter._collect_events?5(value)
revpimodio2.helper.ProcimgWriter._get_ioerrors?5()
revpimodio2.helper.ProcimgWriter._gotioerror?5()
revpimodio2.helper.ProcimgWriter.get_maxioerrors?4()
@@ -149,7 +150,7 @@ revpimodio2.modio.RevPiModIO.get_jconfigrsc?4()
revpimodio2.modio.RevPiModIO.handlesignalend?4(cleanupfunc=None)
revpimodio2.modio.RevPiModIO.ioerrors?7
revpimodio2.modio.RevPiModIO.length?7
revpimodio2.modio.RevPiModIO.mainloop?4(freeze=False, blocking=True)
revpimodio2.modio.RevPiModIO.mainloop?4(blocking=True)
revpimodio2.modio.RevPiModIO.maxioerrors?7
revpimodio2.modio.RevPiModIO.monitoring?7
revpimodio2.modio.RevPiModIO.procimg?7

View File

@@ -6,10 +6,12 @@
# (c) Sven Sager, License: LGPLv3
#
"""RevPiModIO Helperklassen und Tools."""
import queue
import warnings
from math import ceil
from threading import Event, Lock, Thread
from timeit import default_timer
from revpimodio2 import RISING, FALLING, BOTH
class EventCallback(Thread):
@@ -281,7 +283,10 @@ class ProcimgWriter(Thread):
"""Init ProcimgWriter class.
@param parentmodio Parent Object"""
super().__init__()
self.__dict_delay = {}
self.__eventwork = False
self._adjwait = 0
self._eventq = queue.Queue()
self._ioerror = 0
self._maxioerrors = 0
self._modio = parentmodio
@@ -292,6 +297,77 @@ class ProcimgWriter(Thread):
self.lck_refresh = Lock()
self.newdata = Event()
def __check_change(self, dev):
"""Findet Aenderungen fuer die Eventueberwachung."""
for io_event in dev._dict_events:
if dev._ba_datacp[io_event._slc_address] == \
dev._ba_devdata[io_event._slc_address]:
continue
if io_event._bitaddress >= 0:
boolcp = bool(int.from_bytes(
dev._ba_datacp[io_event._slc_address],
byteorder=io_event._byteorder
) & 1 << io_event._bitaddress)
boolor = bool(int.from_bytes(
dev._ba_devdata[io_event._slc_address],
byteorder=io_event._byteorder
) & 1 << io_event._bitaddress)
if boolor == boolcp:
continue
for regfunc in dev._dict_events[io_event]:
if regfunc[1] == BOTH \
or regfunc[1] == RISING and boolor \
or regfunc[1] == FALLING and not boolor:
if regfunc[3] == 0:
self._eventq.put(
(regfunc, io_event._name, io_event.value),
False
)
else:
# Verzögertes Event in dict einfügen
tupfire = (
regfunc, io_event._name, io_event.value
)
if regfunc[4] or tupfire not in self.__dict_delay:
self.__dict_delay[tupfire] = ceil(
regfunc[3] / 1000 / self._refresh
)
else:
for regfunc in dev._dict_events[io_event]:
if regfunc[3] == 0:
self._eventq.put(
(regfunc, io_event._name, io_event.value),
False
)
else:
# Verzögertes Event in dict einfügen
tupfire = (
regfunc, io_event._name, io_event.value
)
if regfunc[4] or tupfire not in self.__dict_delay:
self.__dict_delay[tupfire] = ceil(
regfunc[3] / 1000 / self._refresh
)
# Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf)
dev._ba_datacp = dev._ba_devdata[:]
def _collect_events(self, value):
"""Aktiviert oder Deaktiviert die Eventueberwachung.
@param value True aktiviert / False deaktiviert"""
if type(value) != bool:
raise ValueError("value must be <class 'bool'>")
if self.__eventwork != value:
with self.lck_refresh:
self.__eventwork = value
self._eventq = queue.Queue()
self.__dict_delay = {}
def _get_ioerrors(self):
"""Ruft aktuelle Anzahl der Fehler ab.
@return Aktuelle Fehleranzahl"""
@@ -328,6 +404,21 @@ class ProcimgWriter(Thread):
while not self._work.is_set():
ot = default_timer()
# Verzögerte Events prüfen
# NOTE: Darf dies VOR der Aktualisierung der Daten gemacht werden?
if self.__eventwork:
for tup_fire in list(self.__dict_delay.keys()):
if tup_fire[0][4] \
and getattr(self._modio.io, tup_fire[1]).value != \
tup_fire[2]:
del self.__dict_delay[tup_fire]
else:
self.__dict_delay[tup_fire] -= 1
if self.__dict_delay[tup_fire] <= 1:
# Verzögertes Event übernehmen und löschen
self._eventq.put(tup_fire, False)
del self.__dict_delay[tup_fire]
# Lockobjekt holen und Fehler werfen, wenn nicht schnell genug
if not self.lck_refresh.acquire(timeout=self._adjwait):
warnings.warn(
@@ -352,6 +443,10 @@ class ProcimgWriter(Thread):
for dev in self._modio._lst_refresh:
dev._filelock.acquire()
dev._ba_devdata[:] = bytesbuff[dev._slc_devoff]
if self.__eventwork \
and len(dev._dict_events) > 0 \
and dev._ba_datacp != dev._ba_devdata:
self.__check_change(dev)
dev._filelock.release()
else:
# Inputs in Puffer, Outputs in Prozessabbild
@@ -359,6 +454,11 @@ class ProcimgWriter(Thread):
for dev in self._modio._lst_refresh:
dev._filelock.acquire()
dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff]
if self.__eventwork\
and len(dev._dict_events) > 0 \
and dev._ba_datacp != dev._ba_devdata:
self.__check_change(dev)
try:
fh.seek(dev._slc_outoff.start)
fh.write(dev._ba_devdata[dev._slc_out])

View File

@@ -8,8 +8,8 @@
"""RevPiModIO Hauptklasse fuer piControl0 Zugriff."""
import warnings
from json import load as jload
from math import ceil
from os import access, F_OK, R_OK
from queue import Empty
from signal import signal, SIG_DFL, SIGINT, SIGTERM
from threading import Thread, Event
@@ -18,7 +18,6 @@ from . import device as devicemodule
from . import helper as helpermodule
from . import summary as summarymodule
from .io import IOList
from revpimodio2 import RISING, FALLING, BOTH
class RevPiModIO(object):
@@ -419,12 +418,15 @@ class RevPiModIO(object):
if self._imgwriter is not None and self._imgwriter.is_alive():
self._imgwriter.stop()
self._imgwriter.join(self._imgwriter._refresh)
# NOTE: Prüfen, ob es sauber läuft!
if self._th_mainloop is not None and self._th_mainloop.is_alive():
self._th_mainloop.join(1)
while len(self._lst_refresh) > 0:
dev = self._lst_refresh.pop()
dev._selfupdate = False
if not self._monitoring:
self.writeprocimg(dev)
self._looprunning = False
# NOTE: Loops müssen sich selber IMMER sauber beenden
def get_jconfigrsc(self):
"""Laed die piCotry Konfiguration und erstellt ein <class 'dict'>.
@@ -483,7 +485,7 @@ class RevPiModIO(object):
signal(SIGINT, self.__evt_exit)
signal(SIGTERM, self.__evt_exit)
def mainloop(self, freeze=False, blocking=True):
def mainloop(self, blocking=True):
"""Startet den Mainloop mit Eventueberwachung.
Der aktuelle Programmthread wird hier bis Aufruf von
@@ -492,19 +494,12 @@ class RevPiModIO(object):
einem Event registrierten, IOs. Wird eine Veraenderung erkannt,
fuert das Programm die dazugehoerigen Funktionen der Reihe nach aus.
Wenn der Parameter "freeze" mit True angegeben ist, wird die
Prozessabbildsynchronisierung angehalten bis alle Eventfunktionen
ausgefuehrt wurden. Inputs behalten fuer die gesamte Dauer ihren
aktuellen Wert und Outputs werden erst nach Durchlauf aller Funktionen
in das Prozessabbild geschrieben.
Wenn der Parameter "blocking" mit False angegeben wird, aktiviert
dies die Eventueberwachung und blockiert das Programm NICHT an der
Stelle des Aufrufs. Eignet sich gut fuer die GUI Programmierung, wenn
Events vom RevPi benoetigt werden, aber das Programm weiter ausgefuehrt
werden soll.
@param freeze Wenn True, Prozessabbildsynchronisierung anhalten
@param blocking Wenn False, blockiert das Programm NICHT
@return None
@@ -522,8 +517,7 @@ class RevPiModIO(object):
# Thread erstellen, wenn nicht blockieren soll
if not blocking:
self._th_mainloop = Thread(
target=self.mainloop,
kwargs={"freeze": freeze, "blocking": True}
target=self.mainloop, kwargs={"blocking": True}
)
self._th_mainloop.start()
return
@@ -538,129 +532,37 @@ class RevPiModIO(object):
dev._ba_datacp = dev._ba_devdata[:]
dev._filelock.release()
lst_fire = []
dict_delay = {}
# ImgWriter mit Eventüberwachung aktivieren
self._imgwriter._collect_events(True)
e = None
while not self._exit.is_set():
# Auf neue Daten warten und nur ausführen wenn set()
if not self._imgwriter.newdata.wait(2.5):
try:
tup_fire = self._imgwriter._eventq.get(timeout=1)
if tup_fire[0][2]:
th = helpermodule.EventCallback(
tup_fire[0][0], tup_fire[1], tup_fire[2]
)
th.start()
else:
# Direct callen da Prüfung in io.IOBase.reg_event ist
tup_fire[0][0](tup_fire[1], tup_fire[2])
except Empty:
if not self._exit.is_set() and not self._imgwriter.is_alive():
self.exit(full=False)
self._looprunning = False
raise RuntimeError("autorefresh thread not running")
continue
self._imgwriter.newdata.clear()
# Während Auswertung refresh sperren
self._imgwriter.lck_refresh.acquire()
for dev in self._lst_refresh:
if len(dev._dict_events) == 0 \
or dev._ba_datacp == dev._ba_devdata:
continue
for io_event in dev._dict_events:
if dev._ba_datacp[io_event._slc_address] == \
dev._ba_devdata[io_event._slc_address]:
continue
if io_event._bitaddress >= 0:
boolcp = bool(int.from_bytes(
dev._ba_datacp[io_event._slc_address],
byteorder=io_event._byteorder
) & 1 << io_event._bitaddress)
boolor = bool(int.from_bytes(
dev._ba_devdata[io_event._slc_address],
byteorder=io_event._byteorder
) & 1 << io_event._bitaddress)
if boolor == boolcp:
continue
for regfunc in dev._dict_events[io_event]:
if regfunc[1] == BOTH \
or regfunc[1] == RISING and boolor \
or regfunc[1] == FALLING and not boolor:
if regfunc[3] == 0:
lst_fire.append((
regfunc, io_event._name, io_event.value
))
else:
# Verzögertes Event in dict einfügen
tupfire = (
regfunc, io_event._name, io_event.value
)
if regfunc[4] or tupfire not in dict_delay:
dict_delay[tupfire] = ceil(
regfunc[3] /
self._imgwriter.refresh
)
else:
for regfunc in dev._dict_events[io_event]:
if regfunc[3] == 0:
lst_fire.append(
(regfunc, io_event._name, io_event.value)
)
else:
# Verzögertes Event in dict einfügen
if regfunc[4] or regfunc not in dict_delay:
dict_delay[(
regfunc, io_event._name, io_event.value
)] = ceil(
regfunc[3] / self._imgwriter.refresh
)
# Nach Verarbeitung aller IOs die Bytes kopieren
dev._filelock.acquire()
dev._ba_datacp = dev._ba_devdata[:]
dev._filelock.release()
# Refreshsperre aufheben wenn nicht freeze
if not freeze:
self._imgwriter.lck_refresh.release()
# EventTuple:
# ((func, edge, as_thread, delay, löschen), ioname, iovalue)
# Verzögerte Events prüfen
for tup_fire in list(dict_delay.keys()):
if tup_fire[0][4] \
and getattr(self.io, tup_fire[1]).value != tup_fire[2]:
del dict_delay[tup_fire]
else:
dict_delay[tup_fire] -= 1
if dict_delay[tup_fire] <= 0:
# Verzögertes Event übernehmen und löschen
lst_fire.append(tup_fire)
del dict_delay[tup_fire]
# Erst nach Datenübernahme alle Events feuern
try:
while len(lst_fire) > 0:
tup_fire = lst_fire.pop()
if tup_fire[0][2]:
th = helpermodule.EventCallback(
tup_fire[0][0], tup_fire[1], tup_fire[2]
)
th.start()
else:
# Direct callen da Prüfung in io.IOBase.reg_event ist
tup_fire[0][0](tup_fire[1], tup_fire[2])
except Exception as e:
if self._imgwriter.lck_refresh.locked():
self._imgwriter.lck_refresh.release()
e = RuntimeError("autorefresh thread not running")
except Exception as ex:
self.exit(full=False)
self._looprunning = False
raise e
# Refreshsperre aufheben wenn freeze
if freeze:
self._imgwriter.lck_refresh.release()
e = ex
# Mainloop verlassen
self._imgwriter._collect_events(False)
self._looprunning = False
self._th_mainloop = None
# Fehler prüfen
if e is not None:
raise e
def readprocimg(self, device=None):
"""Einlesen aller Inputs aller/eines Devices vom Prozessabbild.