From 01515e28c5fa11b48b813095ca4eb9a8f864a711 Mon Sep 17 00:00:00 2001 From: NaruX Date: Wed, 15 Nov 2017 09:18:33 +0100 Subject: [PATCH] =?UTF-8?q?Event=C3=BCberwachung=20=C3=BCber=20Queues=20re?= =?UTF-8?q?alisiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/revpimodio2.helper.html | 23 ++++++ doc/revpimodio2.modio.html | 13 +-- eric-revpimodio2.api | 3 +- revpimodio2/helper.py | 100 ++++++++++++++++++++++ revpimodio2/modio.py | 160 +++++++----------------------------- 5 files changed, 158 insertions(+), 141 deletions(-) diff --git a/doc/revpimodio2.helper.html b/doc/revpimodio2.helper.html index c6dc4d0..8907d61 100644 --- a/doc/revpimodio2.helper.html +++ b/doc/revpimodio2.helper.html @@ -440,6 +440,12 @@ Methods ProcimgWriter Init ProcimgWriter class. +__check_change +Findet Aenderungen fuer die Eventueberwachung. + +_collect_events +Aktiviert oder Deaktiviert die Eventueberwachung. + _get_ioerrors Ruft aktuelle Anzahl der Fehler ab. @@ -481,6 +487,23 @@ Init ProcimgWriter class.
Parent Object
+ +

+ProcimgWriter.__check_change

+__check_change(dev) +

+Findet Aenderungen fuer die Eventueberwachung. +

+

+ProcimgWriter._collect_events

+_collect_events(value) +

+Aktiviert oder Deaktiviert die Eventueberwachung. +

+
value
+
+True aktiviert / False deaktiviert +

ProcimgWriter._get_ioerrors

diff --git a/doc/revpimodio2.modio.html b/doc/revpimodio2.modio.html index 711c979..fcc6d20 100644 --- a/doc/revpimodio2.modio.html +++ b/doc/revpimodio2.modio.html @@ -440,7 +440,7 @@ Funktion wird nach dem letzten Lesen der Inputs

RevPiModIO.mainloop

-mainloop(freeze=False, blocking=True) +mainloop(blocking=True)

Startet den Mainloop mit Eventueberwachung.

@@ -449,12 +449,6 @@ Startet den Mainloop mit Eventueberwachung. durchlaeuft die Eventueberwachung und prueft Aenderungen der, mit einem Event registrierten, IOs. Wird eine Veraenderung erkannt, fuert das Programm die dazugehoerigen Funktionen der Reihe nach aus. -

- Wenn der Parameter "freeze" mit True angegeben ist, wird die - Prozessabbildsynchronisierung angehalten bis alle Eventfunktionen - ausgefuehrt wurden. Inputs behalten fuer die gesamte Dauer ihren - aktuellen Wert und Outputs werden erst nach Durchlauf aller Funktionen - in das Prozessabbild geschrieben.

Wenn der Parameter "blocking" mit False angegeben wird, aktiviert dies die Eventueberwachung und blockiert das Programm NICHT an der @@ -462,10 +456,7 @@ Startet den Mainloop mit Eventueberwachung. Events vom RevPi benoetigt werden, aber das Programm weiter ausgefuehrt werden soll.

-
freeze
-
-Wenn True, Prozessabbildsynchronisierung anhalten -
blocking
+
blocking
Wenn False, blockiert das Programm NICHT
diff --git a/eric-revpimodio2.api b/eric-revpimodio2.api index 6288d91..ade70dc 100644 --- a/eric-revpimodio2.api +++ b/eric-revpimodio2.api @@ -70,6 +70,7 @@ revpimodio2.helper.Cycletools?1(cycletime) revpimodio2.helper.EventCallback.run?4() revpimodio2.helper.EventCallback.stop?4() revpimodio2.helper.EventCallback?1(func, name, value) +revpimodio2.helper.ProcimgWriter._collect_events?5(value) revpimodio2.helper.ProcimgWriter._get_ioerrors?5() revpimodio2.helper.ProcimgWriter._gotioerror?5() revpimodio2.helper.ProcimgWriter.get_maxioerrors?4() @@ -149,7 +150,7 @@ revpimodio2.modio.RevPiModIO.get_jconfigrsc?4() revpimodio2.modio.RevPiModIO.handlesignalend?4(cleanupfunc=None) revpimodio2.modio.RevPiModIO.ioerrors?7 revpimodio2.modio.RevPiModIO.length?7 -revpimodio2.modio.RevPiModIO.mainloop?4(freeze=False, blocking=True) +revpimodio2.modio.RevPiModIO.mainloop?4(blocking=True) revpimodio2.modio.RevPiModIO.maxioerrors?7 revpimodio2.modio.RevPiModIO.monitoring?7 revpimodio2.modio.RevPiModIO.procimg?7 diff --git a/revpimodio2/helper.py b/revpimodio2/helper.py index c51e10a..6d9213b 100644 --- a/revpimodio2/helper.py +++ b/revpimodio2/helper.py @@ -6,10 +6,12 @@ # (c) Sven Sager, License: LGPLv3 # """RevPiModIO Helperklassen und Tools.""" +import queue import warnings from math import ceil from threading import Event, Lock, Thread from timeit import default_timer +from revpimodio2 import RISING, FALLING, BOTH class EventCallback(Thread): @@ -281,7 +283,10 @@ class ProcimgWriter(Thread): """Init ProcimgWriter class. @param parentmodio Parent Object""" super().__init__() + self.__dict_delay = {} + self.__eventwork = False self._adjwait = 0 + self._eventq = queue.Queue() self._ioerror = 0 self._maxioerrors = 0 self._modio = parentmodio @@ -292,6 +297,77 @@ class ProcimgWriter(Thread): self.lck_refresh = Lock() self.newdata = Event() + def __check_change(self, dev): + """Findet Aenderungen fuer die Eventueberwachung.""" + for io_event in dev._dict_events: + + if dev._ba_datacp[io_event._slc_address] == \ + dev._ba_devdata[io_event._slc_address]: + continue + + if io_event._bitaddress >= 0: + boolcp = bool(int.from_bytes( + dev._ba_datacp[io_event._slc_address], + byteorder=io_event._byteorder + ) & 1 << io_event._bitaddress) + boolor = bool(int.from_bytes( + dev._ba_devdata[io_event._slc_address], + byteorder=io_event._byteorder + ) & 1 << io_event._bitaddress) + + if boolor == boolcp: + continue + + for regfunc in dev._dict_events[io_event]: + if regfunc[1] == BOTH \ + or regfunc[1] == RISING and boolor \ + or regfunc[1] == FALLING and not boolor: + if regfunc[3] == 0: + self._eventq.put( + (regfunc, io_event._name, io_event.value), + False + ) + else: + # Verzögertes Event in dict einfügen + tupfire = ( + regfunc, io_event._name, io_event.value + ) + if regfunc[4] or tupfire not in self.__dict_delay: + self.__dict_delay[tupfire] = ceil( + regfunc[3] / 1000 / self._refresh + ) + else: + for regfunc in dev._dict_events[io_event]: + if regfunc[3] == 0: + self._eventq.put( + (regfunc, io_event._name, io_event.value), + False + ) + else: + # Verzögertes Event in dict einfügen + tupfire = ( + regfunc, io_event._name, io_event.value + ) + if regfunc[4] or tupfire not in self.__dict_delay: + self.__dict_delay[tupfire] = ceil( + regfunc[3] / 1000 / self._refresh + ) + + # Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf) + dev._ba_datacp = dev._ba_devdata[:] + + def _collect_events(self, value): + """Aktiviert oder Deaktiviert die Eventueberwachung. + @param value True aktiviert / False deaktiviert""" + if type(value) != bool: + raise ValueError("value must be ") + + if self.__eventwork != value: + with self.lck_refresh: + self.__eventwork = value + self._eventq = queue.Queue() + self.__dict_delay = {} + def _get_ioerrors(self): """Ruft aktuelle Anzahl der Fehler ab. @return Aktuelle Fehleranzahl""" @@ -328,6 +404,21 @@ class ProcimgWriter(Thread): while not self._work.is_set(): ot = default_timer() + # Verzögerte Events prüfen + # NOTE: Darf dies VOR der Aktualisierung der Daten gemacht werden? + if self.__eventwork: + for tup_fire in list(self.__dict_delay.keys()): + if tup_fire[0][4] \ + and getattr(self._modio.io, tup_fire[1]).value != \ + tup_fire[2]: + del self.__dict_delay[tup_fire] + else: + self.__dict_delay[tup_fire] -= 1 + if self.__dict_delay[tup_fire] <= 1: + # Verzögertes Event übernehmen und löschen + self._eventq.put(tup_fire, False) + del self.__dict_delay[tup_fire] + # Lockobjekt holen und Fehler werfen, wenn nicht schnell genug if not self.lck_refresh.acquire(timeout=self._adjwait): warnings.warn( @@ -352,6 +443,10 @@ class ProcimgWriter(Thread): for dev in self._modio._lst_refresh: dev._filelock.acquire() dev._ba_devdata[:] = bytesbuff[dev._slc_devoff] + if self.__eventwork \ + and len(dev._dict_events) > 0 \ + and dev._ba_datacp != dev._ba_devdata: + self.__check_change(dev) dev._filelock.release() else: # Inputs in Puffer, Outputs in Prozessabbild @@ -359,6 +454,11 @@ class ProcimgWriter(Thread): for dev in self._modio._lst_refresh: dev._filelock.acquire() dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff] + if self.__eventwork\ + and len(dev._dict_events) > 0 \ + and dev._ba_datacp != dev._ba_devdata: + self.__check_change(dev) + try: fh.seek(dev._slc_outoff.start) fh.write(dev._ba_devdata[dev._slc_out]) diff --git a/revpimodio2/modio.py b/revpimodio2/modio.py index cc7e8bc..b974995 100644 --- a/revpimodio2/modio.py +++ b/revpimodio2/modio.py @@ -8,8 +8,8 @@ """RevPiModIO Hauptklasse fuer piControl0 Zugriff.""" import warnings from json import load as jload -from math import ceil from os import access, F_OK, R_OK +from queue import Empty from signal import signal, SIG_DFL, SIGINT, SIGTERM from threading import Thread, Event @@ -18,7 +18,6 @@ from . import device as devicemodule from . import helper as helpermodule from . import summary as summarymodule from .io import IOList -from revpimodio2 import RISING, FALLING, BOTH class RevPiModIO(object): @@ -419,12 +418,15 @@ class RevPiModIO(object): if self._imgwriter is not None and self._imgwriter.is_alive(): self._imgwriter.stop() self._imgwriter.join(self._imgwriter._refresh) + # NOTE: Prüfen, ob es sauber läuft! + if self._th_mainloop is not None and self._th_mainloop.is_alive(): + self._th_mainloop.join(1) while len(self._lst_refresh) > 0: dev = self._lst_refresh.pop() dev._selfupdate = False if not self._monitoring: self.writeprocimg(dev) - self._looprunning = False + # NOTE: Loops müssen sich selber IMMER sauber beenden def get_jconfigrsc(self): """Laed die piCotry Konfiguration und erstellt ein . @@ -483,7 +485,7 @@ class RevPiModIO(object): signal(SIGINT, self.__evt_exit) signal(SIGTERM, self.__evt_exit) - def mainloop(self, freeze=False, blocking=True): + def mainloop(self, blocking=True): """Startet den Mainloop mit Eventueberwachung. Der aktuelle Programmthread wird hier bis Aufruf von @@ -492,19 +494,12 @@ class RevPiModIO(object): einem Event registrierten, IOs. Wird eine Veraenderung erkannt, fuert das Programm die dazugehoerigen Funktionen der Reihe nach aus. - Wenn der Parameter "freeze" mit True angegeben ist, wird die - Prozessabbildsynchronisierung angehalten bis alle Eventfunktionen - ausgefuehrt wurden. Inputs behalten fuer die gesamte Dauer ihren - aktuellen Wert und Outputs werden erst nach Durchlauf aller Funktionen - in das Prozessabbild geschrieben. - Wenn der Parameter "blocking" mit False angegeben wird, aktiviert dies die Eventueberwachung und blockiert das Programm NICHT an der Stelle des Aufrufs. Eignet sich gut fuer die GUI Programmierung, wenn Events vom RevPi benoetigt werden, aber das Programm weiter ausgefuehrt werden soll. - @param freeze Wenn True, Prozessabbildsynchronisierung anhalten @param blocking Wenn False, blockiert das Programm NICHT @return None @@ -522,8 +517,7 @@ class RevPiModIO(object): # Thread erstellen, wenn nicht blockieren soll if not blocking: self._th_mainloop = Thread( - target=self.mainloop, - kwargs={"freeze": freeze, "blocking": True} + target=self.mainloop, kwargs={"blocking": True} ) self._th_mainloop.start() return @@ -538,129 +532,37 @@ class RevPiModIO(object): dev._ba_datacp = dev._ba_devdata[:] dev._filelock.release() - lst_fire = [] - dict_delay = {} + # ImgWriter mit Eventüberwachung aktivieren + self._imgwriter._collect_events(True) + e = None + while not self._exit.is_set(): - # Auf neue Daten warten und nur ausführen wenn set() - if not self._imgwriter.newdata.wait(2.5): + try: + tup_fire = self._imgwriter._eventq.get(timeout=1) + if tup_fire[0][2]: + th = helpermodule.EventCallback( + tup_fire[0][0], tup_fire[1], tup_fire[2] + ) + th.start() + else: + # Direct callen da Prüfung in io.IOBase.reg_event ist + tup_fire[0][0](tup_fire[1], tup_fire[2]) + except Empty: if not self._exit.is_set() and not self._imgwriter.is_alive(): self.exit(full=False) - self._looprunning = False - raise RuntimeError("autorefresh thread not running") - continue - - self._imgwriter.newdata.clear() - - # Während Auswertung refresh sperren - self._imgwriter.lck_refresh.acquire() - - for dev in self._lst_refresh: - - if len(dev._dict_events) == 0 \ - or dev._ba_datacp == dev._ba_devdata: - continue - - for io_event in dev._dict_events: - - if dev._ba_datacp[io_event._slc_address] == \ - dev._ba_devdata[io_event._slc_address]: - continue - - if io_event._bitaddress >= 0: - boolcp = bool(int.from_bytes( - dev._ba_datacp[io_event._slc_address], - byteorder=io_event._byteorder - ) & 1 << io_event._bitaddress) - boolor = bool(int.from_bytes( - dev._ba_devdata[io_event._slc_address], - byteorder=io_event._byteorder - ) & 1 << io_event._bitaddress) - - if boolor == boolcp: - continue - - for regfunc in dev._dict_events[io_event]: - if regfunc[1] == BOTH \ - or regfunc[1] == RISING and boolor \ - or regfunc[1] == FALLING and not boolor: - if regfunc[3] == 0: - lst_fire.append(( - regfunc, io_event._name, io_event.value - )) - else: - # Verzögertes Event in dict einfügen - tupfire = ( - regfunc, io_event._name, io_event.value - ) - if regfunc[4] or tupfire not in dict_delay: - dict_delay[tupfire] = ceil( - regfunc[3] / - self._imgwriter.refresh - ) - else: - for regfunc in dev._dict_events[io_event]: - if regfunc[3] == 0: - lst_fire.append( - (regfunc, io_event._name, io_event.value) - ) - else: - # Verzögertes Event in dict einfügen - if regfunc[4] or regfunc not in dict_delay: - dict_delay[( - regfunc, io_event._name, io_event.value - )] = ceil( - regfunc[3] / self._imgwriter.refresh - ) - - # Nach Verarbeitung aller IOs die Bytes kopieren - dev._filelock.acquire() - dev._ba_datacp = dev._ba_devdata[:] - dev._filelock.release() - - # Refreshsperre aufheben wenn nicht freeze - if not freeze: - self._imgwriter.lck_refresh.release() - - # EventTuple: - # ((func, edge, as_thread, delay, löschen), ioname, iovalue) - - # Verzögerte Events prüfen - for tup_fire in list(dict_delay.keys()): - if tup_fire[0][4] \ - and getattr(self.io, tup_fire[1]).value != tup_fire[2]: - del dict_delay[tup_fire] - else: - dict_delay[tup_fire] -= 1 - if dict_delay[tup_fire] <= 0: - # Verzögertes Event übernehmen und löschen - lst_fire.append(tup_fire) - del dict_delay[tup_fire] - - # Erst nach Datenübernahme alle Events feuern - try: - while len(lst_fire) > 0: - tup_fire = lst_fire.pop() - if tup_fire[0][2]: - th = helpermodule.EventCallback( - tup_fire[0][0], tup_fire[1], tup_fire[2] - ) - th.start() - else: - # Direct callen da Prüfung in io.IOBase.reg_event ist - tup_fire[0][0](tup_fire[1], tup_fire[2]) - except Exception as e: - if self._imgwriter.lck_refresh.locked(): - self._imgwriter.lck_refresh.release() + e = RuntimeError("autorefresh thread not running") + except Exception as ex: self.exit(full=False) - self._looprunning = False - raise e - - # Refreshsperre aufheben wenn freeze - if freeze: - self._imgwriter.lck_refresh.release() + e = ex # Mainloop verlassen + self._imgwriter._collect_events(False) self._looprunning = False + self._th_mainloop = None + + # Fehler prüfen + if e is not None: + raise e def readprocimg(self, device=None): """Einlesen aller Inputs aller/eines Devices vom Prozessabbild.