From 01515e28c5fa11b48b813095ca4eb9a8f864a711 Mon Sep 17 00:00:00 2001 From: NaruX Date: Wed, 15 Nov 2017 09:18:33 +0100 Subject: [PATCH 1/5] =?UTF-8?q?Event=C3=BCberwachung=20=C3=BCber=20Queues?= =?UTF-8?q?=20realisiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/revpimodio2.helper.html | 23 ++++++ doc/revpimodio2.modio.html | 13 +-- eric-revpimodio2.api | 3 +- revpimodio2/helper.py | 100 ++++++++++++++++++++++ revpimodio2/modio.py | 160 +++++++----------------------------- 5 files changed, 158 insertions(+), 141 deletions(-) diff --git a/doc/revpimodio2.helper.html b/doc/revpimodio2.helper.html index c6dc4d0..8907d61 100644 --- a/doc/revpimodio2.helper.html +++ b/doc/revpimodio2.helper.html @@ -440,6 +440,12 @@ Methods ProcimgWriter Init ProcimgWriter class. +__check_change +Findet Aenderungen fuer die Eventueberwachung. + +_collect_events +Aktiviert oder Deaktiviert die Eventueberwachung. + _get_ioerrors Ruft aktuelle Anzahl der Fehler ab. @@ -481,6 +487,23 @@ Init ProcimgWriter class.
Parent Object
+ +

+ProcimgWriter.__check_change

+__check_change(dev) +

+Findet Aenderungen fuer die Eventueberwachung. +

+

+ProcimgWriter._collect_events

+_collect_events(value) +

+Aktiviert oder Deaktiviert die Eventueberwachung. +

+
value
+
+True aktiviert / False deaktiviert +

ProcimgWriter._get_ioerrors

diff --git a/doc/revpimodio2.modio.html b/doc/revpimodio2.modio.html index 711c979..fcc6d20 100644 --- a/doc/revpimodio2.modio.html +++ b/doc/revpimodio2.modio.html @@ -440,7 +440,7 @@ Funktion wird nach dem letzten Lesen der Inputs

RevPiModIO.mainloop

-mainloop(freeze=False, blocking=True) +mainloop(blocking=True)

Startet den Mainloop mit Eventueberwachung.

@@ -449,12 +449,6 @@ Startet den Mainloop mit Eventueberwachung. durchlaeuft die Eventueberwachung und prueft Aenderungen der, mit einem Event registrierten, IOs. Wird eine Veraenderung erkannt, fuert das Programm die dazugehoerigen Funktionen der Reihe nach aus. -

- Wenn der Parameter "freeze" mit True angegeben ist, wird die - Prozessabbildsynchronisierung angehalten bis alle Eventfunktionen - ausgefuehrt wurden. Inputs behalten fuer die gesamte Dauer ihren - aktuellen Wert und Outputs werden erst nach Durchlauf aller Funktionen - in das Prozessabbild geschrieben.

Wenn der Parameter "blocking" mit False angegeben wird, aktiviert dies die Eventueberwachung und blockiert das Programm NICHT an der @@ -462,10 +456,7 @@ Startet den Mainloop mit Eventueberwachung. Events vom RevPi benoetigt werden, aber das Programm weiter ausgefuehrt werden soll.

-
freeze
-
-Wenn True, Prozessabbildsynchronisierung anhalten -
blocking
+
blocking
Wenn False, blockiert das Programm NICHT
diff --git a/eric-revpimodio2.api b/eric-revpimodio2.api index 6288d91..ade70dc 100644 --- a/eric-revpimodio2.api +++ b/eric-revpimodio2.api @@ -70,6 +70,7 @@ revpimodio2.helper.Cycletools?1(cycletime) revpimodio2.helper.EventCallback.run?4() revpimodio2.helper.EventCallback.stop?4() revpimodio2.helper.EventCallback?1(func, name, value) +revpimodio2.helper.ProcimgWriter._collect_events?5(value) revpimodio2.helper.ProcimgWriter._get_ioerrors?5() revpimodio2.helper.ProcimgWriter._gotioerror?5() revpimodio2.helper.ProcimgWriter.get_maxioerrors?4() @@ -149,7 +150,7 @@ revpimodio2.modio.RevPiModIO.get_jconfigrsc?4() revpimodio2.modio.RevPiModIO.handlesignalend?4(cleanupfunc=None) revpimodio2.modio.RevPiModIO.ioerrors?7 revpimodio2.modio.RevPiModIO.length?7 -revpimodio2.modio.RevPiModIO.mainloop?4(freeze=False, blocking=True) +revpimodio2.modio.RevPiModIO.mainloop?4(blocking=True) revpimodio2.modio.RevPiModIO.maxioerrors?7 revpimodio2.modio.RevPiModIO.monitoring?7 revpimodio2.modio.RevPiModIO.procimg?7 diff --git a/revpimodio2/helper.py b/revpimodio2/helper.py index c51e10a..6d9213b 100644 --- a/revpimodio2/helper.py +++ b/revpimodio2/helper.py @@ -6,10 +6,12 @@ # (c) Sven Sager, License: LGPLv3 # """RevPiModIO Helperklassen und Tools.""" +import queue import warnings from math import ceil from threading import Event, Lock, Thread from timeit import default_timer +from revpimodio2 import RISING, FALLING, BOTH class EventCallback(Thread): @@ -281,7 +283,10 @@ class ProcimgWriter(Thread): """Init ProcimgWriter class. @param parentmodio Parent Object""" super().__init__() + self.__dict_delay = {} + self.__eventwork = False self._adjwait = 0 + self._eventq = queue.Queue() self._ioerror = 0 self._maxioerrors = 0 self._modio = parentmodio @@ -292,6 +297,77 @@ class ProcimgWriter(Thread): self.lck_refresh = Lock() self.newdata = Event() + def __check_change(self, dev): + """Findet Aenderungen fuer die Eventueberwachung.""" + for io_event in dev._dict_events: + + if dev._ba_datacp[io_event._slc_address] == \ + dev._ba_devdata[io_event._slc_address]: + continue + + if io_event._bitaddress >= 0: + boolcp = bool(int.from_bytes( + dev._ba_datacp[io_event._slc_address], + byteorder=io_event._byteorder + ) & 1 << io_event._bitaddress) + boolor = bool(int.from_bytes( + dev._ba_devdata[io_event._slc_address], + byteorder=io_event._byteorder + ) & 1 << io_event._bitaddress) + + if boolor == boolcp: + continue + + for regfunc in dev._dict_events[io_event]: + if regfunc[1] == BOTH \ + or regfunc[1] == RISING and boolor \ + or regfunc[1] == FALLING and not boolor: + if regfunc[3] == 0: + self._eventq.put( + (regfunc, io_event._name, io_event.value), + False + ) + else: + # Verzögertes Event in dict einfügen + tupfire = ( + regfunc, io_event._name, io_event.value + ) + if regfunc[4] or tupfire not in self.__dict_delay: + self.__dict_delay[tupfire] = ceil( + regfunc[3] / 1000 / self._refresh + ) + else: + for regfunc in dev._dict_events[io_event]: + if regfunc[3] == 0: + self._eventq.put( + (regfunc, io_event._name, io_event.value), + False + ) + else: + # Verzögertes Event in dict einfügen + tupfire = ( + regfunc, io_event._name, io_event.value + ) + if regfunc[4] or tupfire not in self.__dict_delay: + self.__dict_delay[tupfire] = ceil( + regfunc[3] / 1000 / self._refresh + ) + + # Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf) + dev._ba_datacp = dev._ba_devdata[:] + + def _collect_events(self, value): + """Aktiviert oder Deaktiviert die Eventueberwachung. + @param value True aktiviert / False deaktiviert""" + if type(value) != bool: + raise ValueError("value must be ") + + if self.__eventwork != value: + with self.lck_refresh: + self.__eventwork = value + self._eventq = queue.Queue() + self.__dict_delay = {} + def _get_ioerrors(self): """Ruft aktuelle Anzahl der Fehler ab. @return Aktuelle Fehleranzahl""" @@ -328,6 +404,21 @@ class ProcimgWriter(Thread): while not self._work.is_set(): ot = default_timer() + # Verzögerte Events prüfen + # NOTE: Darf dies VOR der Aktualisierung der Daten gemacht werden? + if self.__eventwork: + for tup_fire in list(self.__dict_delay.keys()): + if tup_fire[0][4] \ + and getattr(self._modio.io, tup_fire[1]).value != \ + tup_fire[2]: + del self.__dict_delay[tup_fire] + else: + self.__dict_delay[tup_fire] -= 1 + if self.__dict_delay[tup_fire] <= 1: + # Verzögertes Event übernehmen und löschen + self._eventq.put(tup_fire, False) + del self.__dict_delay[tup_fire] + # Lockobjekt holen und Fehler werfen, wenn nicht schnell genug if not self.lck_refresh.acquire(timeout=self._adjwait): warnings.warn( @@ -352,6 +443,10 @@ class ProcimgWriter(Thread): for dev in self._modio._lst_refresh: dev._filelock.acquire() dev._ba_devdata[:] = bytesbuff[dev._slc_devoff] + if self.__eventwork \ + and len(dev._dict_events) > 0 \ + and dev._ba_datacp != dev._ba_devdata: + self.__check_change(dev) dev._filelock.release() else: # Inputs in Puffer, Outputs in Prozessabbild @@ -359,6 +454,11 @@ class ProcimgWriter(Thread): for dev in self._modio._lst_refresh: dev._filelock.acquire() dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff] + if self.__eventwork\ + and len(dev._dict_events) > 0 \ + and dev._ba_datacp != dev._ba_devdata: + self.__check_change(dev) + try: fh.seek(dev._slc_outoff.start) fh.write(dev._ba_devdata[dev._slc_out]) diff --git a/revpimodio2/modio.py b/revpimodio2/modio.py index cc7e8bc..b974995 100644 --- a/revpimodio2/modio.py +++ b/revpimodio2/modio.py @@ -8,8 +8,8 @@ """RevPiModIO Hauptklasse fuer piControl0 Zugriff.""" import warnings from json import load as jload -from math import ceil from os import access, F_OK, R_OK +from queue import Empty from signal import signal, SIG_DFL, SIGINT, SIGTERM from threading import Thread, Event @@ -18,7 +18,6 @@ from . import device as devicemodule from . import helper as helpermodule from . import summary as summarymodule from .io import IOList -from revpimodio2 import RISING, FALLING, BOTH class RevPiModIO(object): @@ -419,12 +418,15 @@ class RevPiModIO(object): if self._imgwriter is not None and self._imgwriter.is_alive(): self._imgwriter.stop() self._imgwriter.join(self._imgwriter._refresh) + # NOTE: Prüfen, ob es sauber läuft! + if self._th_mainloop is not None and self._th_mainloop.is_alive(): + self._th_mainloop.join(1) while len(self._lst_refresh) > 0: dev = self._lst_refresh.pop() dev._selfupdate = False if not self._monitoring: self.writeprocimg(dev) - self._looprunning = False + # NOTE: Loops müssen sich selber IMMER sauber beenden def get_jconfigrsc(self): """Laed die piCotry Konfiguration und erstellt ein . @@ -483,7 +485,7 @@ class RevPiModIO(object): signal(SIGINT, self.__evt_exit) signal(SIGTERM, self.__evt_exit) - def mainloop(self, freeze=False, blocking=True): + def mainloop(self, blocking=True): """Startet den Mainloop mit Eventueberwachung. Der aktuelle Programmthread wird hier bis Aufruf von @@ -492,19 +494,12 @@ class RevPiModIO(object): einem Event registrierten, IOs. Wird eine Veraenderung erkannt, fuert das Programm die dazugehoerigen Funktionen der Reihe nach aus. - Wenn der Parameter "freeze" mit True angegeben ist, wird die - Prozessabbildsynchronisierung angehalten bis alle Eventfunktionen - ausgefuehrt wurden. Inputs behalten fuer die gesamte Dauer ihren - aktuellen Wert und Outputs werden erst nach Durchlauf aller Funktionen - in das Prozessabbild geschrieben. - Wenn der Parameter "blocking" mit False angegeben wird, aktiviert dies die Eventueberwachung und blockiert das Programm NICHT an der Stelle des Aufrufs. Eignet sich gut fuer die GUI Programmierung, wenn Events vom RevPi benoetigt werden, aber das Programm weiter ausgefuehrt werden soll. - @param freeze Wenn True, Prozessabbildsynchronisierung anhalten @param blocking Wenn False, blockiert das Programm NICHT @return None @@ -522,8 +517,7 @@ class RevPiModIO(object): # Thread erstellen, wenn nicht blockieren soll if not blocking: self._th_mainloop = Thread( - target=self.mainloop, - kwargs={"freeze": freeze, "blocking": True} + target=self.mainloop, kwargs={"blocking": True} ) self._th_mainloop.start() return @@ -538,129 +532,37 @@ class RevPiModIO(object): dev._ba_datacp = dev._ba_devdata[:] dev._filelock.release() - lst_fire = [] - dict_delay = {} + # ImgWriter mit Eventüberwachung aktivieren + self._imgwriter._collect_events(True) + e = None + while not self._exit.is_set(): - # Auf neue Daten warten und nur ausführen wenn set() - if not self._imgwriter.newdata.wait(2.5): + try: + tup_fire = self._imgwriter._eventq.get(timeout=1) + if tup_fire[0][2]: + th = helpermodule.EventCallback( + tup_fire[0][0], tup_fire[1], tup_fire[2] + ) + th.start() + else: + # Direct callen da Prüfung in io.IOBase.reg_event ist + tup_fire[0][0](tup_fire[1], tup_fire[2]) + except Empty: if not self._exit.is_set() and not self._imgwriter.is_alive(): self.exit(full=False) - self._looprunning = False - raise RuntimeError("autorefresh thread not running") - continue - - self._imgwriter.newdata.clear() - - # Während Auswertung refresh sperren - self._imgwriter.lck_refresh.acquire() - - for dev in self._lst_refresh: - - if len(dev._dict_events) == 0 \ - or dev._ba_datacp == dev._ba_devdata: - continue - - for io_event in dev._dict_events: - - if dev._ba_datacp[io_event._slc_address] == \ - dev._ba_devdata[io_event._slc_address]: - continue - - if io_event._bitaddress >= 0: - boolcp = bool(int.from_bytes( - dev._ba_datacp[io_event._slc_address], - byteorder=io_event._byteorder - ) & 1 << io_event._bitaddress) - boolor = bool(int.from_bytes( - dev._ba_devdata[io_event._slc_address], - byteorder=io_event._byteorder - ) & 1 << io_event._bitaddress) - - if boolor == boolcp: - continue - - for regfunc in dev._dict_events[io_event]: - if regfunc[1] == BOTH \ - or regfunc[1] == RISING and boolor \ - or regfunc[1] == FALLING and not boolor: - if regfunc[3] == 0: - lst_fire.append(( - regfunc, io_event._name, io_event.value - )) - else: - # Verzögertes Event in dict einfügen - tupfire = ( - regfunc, io_event._name, io_event.value - ) - if regfunc[4] or tupfire not in dict_delay: - dict_delay[tupfire] = ceil( - regfunc[3] / - self._imgwriter.refresh - ) - else: - for regfunc in dev._dict_events[io_event]: - if regfunc[3] == 0: - lst_fire.append( - (regfunc, io_event._name, io_event.value) - ) - else: - # Verzögertes Event in dict einfügen - if regfunc[4] or regfunc not in dict_delay: - dict_delay[( - regfunc, io_event._name, io_event.value - )] = ceil( - regfunc[3] / self._imgwriter.refresh - ) - - # Nach Verarbeitung aller IOs die Bytes kopieren - dev._filelock.acquire() - dev._ba_datacp = dev._ba_devdata[:] - dev._filelock.release() - - # Refreshsperre aufheben wenn nicht freeze - if not freeze: - self._imgwriter.lck_refresh.release() - - # EventTuple: - # ((func, edge, as_thread, delay, löschen), ioname, iovalue) - - # Verzögerte Events prüfen - for tup_fire in list(dict_delay.keys()): - if tup_fire[0][4] \ - and getattr(self.io, tup_fire[1]).value != tup_fire[2]: - del dict_delay[tup_fire] - else: - dict_delay[tup_fire] -= 1 - if dict_delay[tup_fire] <= 0: - # Verzögertes Event übernehmen und löschen - lst_fire.append(tup_fire) - del dict_delay[tup_fire] - - # Erst nach Datenübernahme alle Events feuern - try: - while len(lst_fire) > 0: - tup_fire = lst_fire.pop() - if tup_fire[0][2]: - th = helpermodule.EventCallback( - tup_fire[0][0], tup_fire[1], tup_fire[2] - ) - th.start() - else: - # Direct callen da Prüfung in io.IOBase.reg_event ist - tup_fire[0][0](tup_fire[1], tup_fire[2]) - except Exception as e: - if self._imgwriter.lck_refresh.locked(): - self._imgwriter.lck_refresh.release() + e = RuntimeError("autorefresh thread not running") + except Exception as ex: self.exit(full=False) - self._looprunning = False - raise e - - # Refreshsperre aufheben wenn freeze - if freeze: - self._imgwriter.lck_refresh.release() + e = ex # Mainloop verlassen + self._imgwriter._collect_events(False) self._looprunning = False + self._th_mainloop = None + + # Fehler prüfen + if e is not None: + raise e def readprocimg(self, device=None): """Einlesen aller Inputs aller/eines Devices vom Prozessabbild. From 8b0e4652050e28a776ac6216c047af3938913d30 Mon Sep 17 00:00:00 2001 From: NaruX Date: Wed, 15 Nov 2017 11:32:56 +0100 Subject: [PATCH 2/5] Fehlerabfang und Leistung in ProcimgWriter.run() verbessert --- revpimodio2/helper.py | 108 +++++++++++++++++++----------------------- 1 file changed, 49 insertions(+), 59 deletions(-) diff --git a/revpimodio2/helper.py b/revpimodio2/helper.py index 6d9213b..151ce30 100644 --- a/revpimodio2/helper.py +++ b/revpimodio2/helper.py @@ -401,24 +401,10 @@ class ProcimgWriter(Thread): """Startet die automatische Prozessabbildsynchronisierung.""" fh = self._modio._create_myfh() self._adjwait = self._refresh + while not self._work.is_set(): ot = default_timer() - # Verzögerte Events prüfen - # NOTE: Darf dies VOR der Aktualisierung der Daten gemacht werden? - if self.__eventwork: - for tup_fire in list(self.__dict_delay.keys()): - if tup_fire[0][4] \ - and getattr(self._modio.io, tup_fire[1]).value != \ - tup_fire[2]: - del self.__dict_delay[tup_fire] - else: - self.__dict_delay[tup_fire] -= 1 - if self.__dict_delay[tup_fire] <= 1: - # Verzögertes Event übernehmen und löschen - self._eventq.put(tup_fire, False) - del self.__dict_delay[tup_fire] - # Lockobjekt holen und Fehler werfen, wenn nicht schnell genug if not self.lck_refresh.acquire(timeout=self._adjwait): warnings.warn( @@ -427,63 +413,67 @@ class ProcimgWriter(Thread): ), RuntimeWarning ) + # Verzögerte Events pausieren an dieser Stelle continue try: fh.seek(0) bytesbuff = bytearray(fh.read(self._modio._length)) + + if self._modio._monitoring: + # Inputs und Outputs in Puffer + for dev in self._modio._lst_refresh: + dev._filelock.acquire() + dev._ba_devdata[:] = bytesbuff[dev._slc_devoff] + if self.__eventwork \ + and len(dev._dict_events) > 0 \ + and dev._ba_datacp != dev._ba_devdata: + self.__check_change(dev) + dev._filelock.release() + else: + # Inputs in Puffer, Outputs in Prozessabbild + for dev in self._modio._lst_refresh: + with dev._filelock: + dev._ba_devdata[dev._slc_inp] = \ + bytesbuff[dev._slc_inpoff] + if self.__eventwork\ + and len(dev._dict_events) > 0 \ + and dev._ba_datacp != dev._ba_devdata: + self.__check_change(dev) + + fh.seek(dev._slc_outoff.start) + fh.write(dev._ba_devdata[dev._slc_out]) + + if self._modio._buffedwrite: + fh.flush() + except IOError: self._gotioerror() self.lck_refresh.release() - self._work.wait(self._adjwait) continue - if self._modio._monitoring: - # Inputs und Outputs in Puffer - for dev in self._modio._lst_refresh: - dev._filelock.acquire() - dev._ba_devdata[:] = bytesbuff[dev._slc_devoff] - if self.__eventwork \ - and len(dev._dict_events) > 0 \ - and dev._ba_datacp != dev._ba_devdata: - self.__check_change(dev) - dev._filelock.release() else: - # Inputs in Puffer, Outputs in Prozessabbild - ioerr = False - for dev in self._modio._lst_refresh: - dev._filelock.acquire() - dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff] - if self.__eventwork\ - and len(dev._dict_events) > 0 \ - and dev._ba_datacp != dev._ba_devdata: - self.__check_change(dev) + # Alle aufwecken + self.lck_refresh.release() + self.newdata.set() - try: - fh.seek(dev._slc_outoff.start) - fh.write(dev._ba_devdata[dev._slc_out]) - except IOError: - ioerr = True - finally: - dev._filelock.release() + finally: + # Verzögerte Events prüfen + if self.__eventwork: + for tup_fire in list(self.__dict_delay.keys()): + if tup_fire[0][4] \ + and getattr(self._modio.io, tup_fire[1]).value != \ + tup_fire[2]: + del self.__dict_delay[tup_fire] + else: + self.__dict_delay[tup_fire] -= 1 + if self.__dict_delay[tup_fire] <= 0: + # Verzögertes Event übernehmen und löschen + self._eventq.put(tup_fire, False) + del self.__dict_delay[tup_fire] - if self._modio._buffedwrite: - try: - fh.flush() - except IOError: - ioerr = True - - if ioerr: - self._gotioerror() - self.lck_refresh.release() - self._work.wait(self._adjwait) - continue - - self.lck_refresh.release() - - # Alle aufwecken - self.newdata.set() - self._work.wait(self._adjwait) + # Refresh abwarten + self._work.wait(self._adjwait) # Wartezeit anpassen um echte self._refresh zu erreichen if default_timer() - ot >= self._refresh: From b16af483dca34d1a0ecfe566d50a36f9b41ff988 Mon Sep 17 00:00:00 2001 From: NaruX Date: Sat, 2 Dec 2017 14:36:04 +0100 Subject: [PATCH 3/5] =?UTF-8?q?Eigene=20EventQueue=20f=C3=BCr=20Threads,?= =?UTF-8?q?=20um=20diese=20direkt=20zu=20starten?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/revpimodio2.helper.html | 9 +++++++ revpimodio2/helper.py | 53 ++++++++++++++++++++++++++++++------- revpimodio2/modio.py | 10 ++----- 3 files changed, 55 insertions(+), 17 deletions(-) diff --git a/doc/revpimodio2.helper.html b/doc/revpimodio2.helper.html index 8907d61..c1c26d0 100644 --- a/doc/revpimodio2.helper.html +++ b/doc/revpimodio2.helper.html @@ -443,6 +443,9 @@ Methods __check_change Findet Aenderungen fuer die Eventueberwachung. +__exec_th +Fuehrt Events aus, die als Thread registriert wurden. + _collect_events Aktiviert oder Deaktiviert die Eventueberwachung. @@ -493,6 +496,12 @@ ProcimgWriter.__check_change __check_change(dev)

Findet Aenderungen fuer die Eventueberwachung. +

+

+ProcimgWriter.__exec_th

+__exec_th() +

+Fuehrt Events aus, die als Thread registriert wurden.

ProcimgWriter._collect_events

diff --git a/revpimodio2/helper.py b/revpimodio2/helper.py index 151ce30..bca9ef2 100644 --- a/revpimodio2/helper.py +++ b/revpimodio2/helper.py @@ -284,6 +284,8 @@ class ProcimgWriter(Thread): @param parentmodio Parent Object""" super().__init__() self.__dict_delay = {} + self.__eventth = None + self.__eventqth = queue.Queue() self.__eventwork = False self._adjwait = 0 self._eventq = queue.Queue() @@ -323,10 +325,16 @@ class ProcimgWriter(Thread): or regfunc[1] == RISING and boolor \ or regfunc[1] == FALLING and not boolor: if regfunc[3] == 0: - self._eventq.put( - (regfunc, io_event._name, io_event.value), - False - ) + if regfunc[2]: + self.__eventqth.put( + (regfunc, io_event._name, io_event.value), + False + ) + else: + self._eventq.put( + (regfunc, io_event._name, io_event.value), + False + ) else: # Verzögertes Event in dict einfügen tupfire = ( @@ -339,10 +347,16 @@ class ProcimgWriter(Thread): else: for regfunc in dev._dict_events[io_event]: if regfunc[3] == 0: - self._eventq.put( - (regfunc, io_event._name, io_event.value), - False - ) + if regfunc[2]: + self.__eventqth.put( + (regfunc, io_event._name, io_event.value), + False + ) + else: + self._eventq.put( + (regfunc, io_event._name, io_event.value), + False + ) else: # Verzögertes Event in dict einfügen tupfire = ( @@ -356,6 +370,19 @@ class ProcimgWriter(Thread): # Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf) dev._ba_datacp = dev._ba_devdata[:] + def __exec_th(self): + """Fuehrt Events aus, die als Thread registriert wurden.""" + while self.__eventwork: + try: + tup_fireth = self.__eventqth.get(timeout=1) + th = EventCallback( + tup_fireth[0][0], tup_fireth[1], tup_fireth[2] + ) + th.start() + # TODO: Error handling + except queue.Empty: + pass + def _collect_events(self, value): """Aktiviert oder Deaktiviert die Eventueberwachung. @param value True aktiviert / False deaktiviert""" @@ -365,8 +392,12 @@ class ProcimgWriter(Thread): if self.__eventwork != value: with self.lck_refresh: self.__eventwork = value + self.__eventqth = queue.Queue() self._eventq = queue.Queue() self.__dict_delay = {} + if value: + self.__eventth = Thread(target=self.__exec_th) + self.__eventth.start() def _get_ioerrors(self): """Ruft aktuelle Anzahl der Fehler ab. @@ -469,7 +500,10 @@ class ProcimgWriter(Thread): self.__dict_delay[tup_fire] -= 1 if self.__dict_delay[tup_fire] <= 0: # Verzögertes Event übernehmen und löschen - self._eventq.put(tup_fire, False) + if tup_fire[0][2]: + self.__eventqth.put(tup_fire, False) + else: + self._eventq.put(tup_fire, False) del self.__dict_delay[tup_fire] # Refresh abwarten @@ -490,6 +524,7 @@ class ProcimgWriter(Thread): self._adjwait += 0.001 # Alle am Ende erneut aufwecken + self._collect_events(False) self.newdata.set() fh.close() diff --git a/revpimodio2/modio.py b/revpimodio2/modio.py index b974995..f88cce0 100644 --- a/revpimodio2/modio.py +++ b/revpimodio2/modio.py @@ -539,14 +539,8 @@ class RevPiModIO(object): while not self._exit.is_set(): try: tup_fire = self._imgwriter._eventq.get(timeout=1) - if tup_fire[0][2]: - th = helpermodule.EventCallback( - tup_fire[0][0], tup_fire[1], tup_fire[2] - ) - th.start() - else: - # Direct callen da Prüfung in io.IOBase.reg_event ist - tup_fire[0][0](tup_fire[1], tup_fire[2]) + # Direct callen da Prüfung in io.IOBase.reg_event ist + tup_fire[0][0](tup_fire[1], tup_fire[2]) except Empty: if not self._exit.is_set() and not self._imgwriter.is_alive(): self.exit(full=False) From 87a648cbc68d5b079047edd3325858e7d80df02a Mon Sep 17 00:00:00 2001 From: NaruX Date: Sat, 2 Dec 2017 16:17:02 +0100 Subject: [PATCH 4/5] =?UTF-8?q?IOEvent-Klasse=20eingebaut=20-=20Ersetzt=20?= =?UTF-8?q?tuple()=20reg=5Fevent,=20reg=5Ftimerevent=20=C3=BCber=20zentral?= =?UTF-8?q?e=20Funktion=20verwaltet?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/revpimodio2.io.html | 69 +++++++++++++++ eric-revpimodio2.api | 1 + revpimodio2/helper.py | 32 +++---- revpimodio2/io.py | 180 +++++++++++++++++----------------------- revpimodio2/modio.py | 4 +- 5 files changed, 166 insertions(+), 120 deletions(-) diff --git a/doc/revpimodio2.io.html b/doc/revpimodio2.io.html index 223107a..d8ca045 100644 --- a/doc/revpimodio2.io.html +++ b/doc/revpimodio2.io.html @@ -24,6 +24,9 @@ Classes IOBase Basisklasse fuer alle IO-Objekte. +IOEvent +Basisklasse fuer IO-Events. + IOList Basisklasse fuer direkten Zugriff auf IO Objekte. @@ -141,6 +144,9 @@ Methods __len__ Gibt die Bytelaenge des IO zurueck. +__reg_xevent +Verwaltet reg_event und reg_timerevent. + __str__ -Wert der Klasse. @@ -228,6 +234,29 @@ Gibt die Bytelaenge des IO zurueck.
Bytelaenge des IO - 0 bei BITs
+
+

+IOBase.__reg_xevent

+__reg_xevent(func, delay, edge, as_thread, overwrite) +

+Verwaltet reg_event und reg_timerevent. +

+
func
+
+Funktion die bei Aenderung aufgerufen werden soll +
delay
+
+Verzoegerung in ms zum Ausloesen - auch bei Wertaenderung +
edge
+
+Ausfuehren bei RISING, FALLING or BOTH Wertaenderung +
as_thread
+
+Bei True, Funktion als EventCallback-Thread ausfuehren +
overwrite
+
+Wenn True, wird Event bei ueberschrieben +

IOBase.__str__

@@ -468,6 +497,46 @@ Zeit in ms nach der abgebrochen wird
Up


+ +

IOEvent

+

+Basisklasse fuer IO-Events. +

+

+Derived from

+object +

+Class Attributes

+ + +
None
+

+Class Methods

+ + +
None
+

+Methods

+ + + + + +
IOEventInit IOEvent class.
+

+Static Methods

+ + +
None
+ +

+IOEvent (Constructor)

+IOEvent(func, edge, as_thread, delay, overwrite) +

+Init IOEvent class. +

+
Up
+

IOList

diff --git a/eric-revpimodio2.api b/eric-revpimodio2.api index ade70dc..6c5077f 100644 --- a/eric-revpimodio2.api +++ b/eric-revpimodio2.api @@ -105,6 +105,7 @@ revpimodio2.io.IOBase.unreg_event?4(func=None, edge=None) revpimodio2.io.IOBase.value?7 revpimodio2.io.IOBase.wait?4(edge=BOTH, exitevent=None, okvalue=None, timeout=0) revpimodio2.io.IOBase?1(parentdevice, valuelist, iotype, byteorder, signed) +revpimodio2.io.IOEvent?1(func, edge, as_thread, delay, overwrite) revpimodio2.io.IOList._private_register_new_io_object?5(new_io) revpimodio2.io.IOList?1() revpimodio2.io.IntIO._get_signed?5() diff --git a/revpimodio2/helper.py b/revpimodio2/helper.py index bca9ef2..b2375f5 100644 --- a/revpimodio2/helper.py +++ b/revpimodio2/helper.py @@ -321,11 +321,11 @@ class ProcimgWriter(Thread): continue for regfunc in dev._dict_events[io_event]: - if regfunc[1] == BOTH \ - or regfunc[1] == RISING and boolor \ - or regfunc[1] == FALLING and not boolor: - if regfunc[3] == 0: - if regfunc[2]: + if regfunc.edge == BOTH \ + or regfunc.edge == RISING and boolor \ + or regfunc.edge == FALLING and not boolor: + if regfunc.delay == 0: + if regfunc.as_thread: self.__eventqth.put( (regfunc, io_event._name, io_event.value), False @@ -340,14 +340,15 @@ class ProcimgWriter(Thread): tupfire = ( regfunc, io_event._name, io_event.value ) - if regfunc[4] or tupfire not in self.__dict_delay: + if regfunc.overwrite \ + or tupfire not in self.__dict_delay: self.__dict_delay[tupfire] = ceil( - regfunc[3] / 1000 / self._refresh + regfunc.delay / 1000 / self._refresh ) else: for regfunc in dev._dict_events[io_event]: - if regfunc[3] == 0: - if regfunc[2]: + if regfunc.delay == 0: + if regfunc.as_thread: self.__eventqth.put( (regfunc, io_event._name, io_event.value), False @@ -362,9 +363,10 @@ class ProcimgWriter(Thread): tupfire = ( regfunc, io_event._name, io_event.value ) - if regfunc[4] or tupfire not in self.__dict_delay: + if regfunc.overwrite \ + or tupfire not in self.__dict_delay: self.__dict_delay[tupfire] = ceil( - regfunc[3] / 1000 / self._refresh + regfunc.delay / 1000 / self._refresh ) # Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf) @@ -376,10 +378,9 @@ class ProcimgWriter(Thread): try: tup_fireth = self.__eventqth.get(timeout=1) th = EventCallback( - tup_fireth[0][0], tup_fireth[1], tup_fireth[2] + tup_fireth[0].func, tup_fireth[1], tup_fireth[2] ) th.start() - # TODO: Error handling except queue.Empty: pass @@ -492,7 +493,7 @@ class ProcimgWriter(Thread): # Verzögerte Events prüfen if self.__eventwork: for tup_fire in list(self.__dict_delay.keys()): - if tup_fire[0][4] \ + if tup_fire[0].overwrite \ and getattr(self._modio.io, tup_fire[1]).value != \ tup_fire[2]: del self.__dict_delay[tup_fire] @@ -500,7 +501,7 @@ class ProcimgWriter(Thread): self.__dict_delay[tup_fire] -= 1 if self.__dict_delay[tup_fire] <= 0: # Verzögertes Event übernehmen und löschen - if tup_fire[0][2]: + if tup_fire[0].as_thread: self.__eventqth.put(tup_fire, False) else: self._eventq.put(tup_fire, False) @@ -530,6 +531,7 @@ class ProcimgWriter(Thread): def stop(self): """Beendet die automatische Prozessabbildsynchronisierung.""" + self._collect_events(False) self._work.set() def set_maxioerrors(self, value): diff --git a/revpimodio2/io.py b/revpimodio2/io.py index 1bc8946..4e50f5b 100644 --- a/revpimodio2/io.py +++ b/revpimodio2/io.py @@ -11,6 +11,19 @@ from threading import Event from revpimodio2 import RISING, FALLING, BOTH, INP, OUT, MEM, consttostr +class IOEvent(object): + + """Basisklasse fuer IO-Events.""" + + def __init__(self, func, edge, as_thread, delay, overwrite): + """Init IOEvent class.""" + self.as_thread = as_thread + self.delay = delay + self.edge = edge + self.func = func + self.overwrite = overwrite + + class IOList(object): """Basisklasse fuer direkten Zugriff auf IO Objekte.""" @@ -317,6 +330,67 @@ class IOBase(object): @return Namen des IOs""" return self._name + def __reg_xevent(self, func, delay, edge, as_thread, overwrite): + """Verwaltet reg_event und reg_timerevent. + + @param func Funktion die bei Aenderung aufgerufen werden soll + @param delay Verzoegerung in ms zum Ausloesen - auch bei Wertaenderung + @param edge Ausfuehren bei RISING, FALLING or BOTH Wertaenderung + @param as_thread Bei True, Funktion als EventCallback-Thread ausfuehren + @param overwrite Wenn True, wird Event bei ueberschrieben + + """ + # Prüfen ob Funktion callable ist + if not callable(func): + raise AttributeError( + "registered function '{}' is not callable".format(func) + ) + if type(delay) != int or delay < 0: + raise AttributeError( + "'delay' must be and greater or equal 0" + ) + if edge != BOTH and self._bitaddress < 0: + raise AttributeError( + "parameter 'edge' can be used with bit io objects only" + ) + + if self not in self._parentdevice._dict_events: + self._parentdevice._dict_events[self] = \ + [IOEvent(func, edge, as_thread, delay, overwrite)] + else: + # Prüfen ob Funktion schon registriert ist + for regfunc in self._parentdevice._dict_events[self]: + if regfunc.func != func: + # Nächsten Eintrag testen + continue + + if edge == BOTH or regfunc.edge == BOTH: + if self._bitaddress < 0: + raise AttributeError( + "io '{}' with function '{}' already in list." + "".format(self._name, func) + ) + else: + raise AttributeError( + "io '{}' with function '{}' already in list with " + "edge '{}' - edge '{}' not allowed anymore".format( + self._name, func, + consttostr(regfunc.edge), consttostr(edge) + ) + ) + elif regfunc.edge == edge: + raise AttributeError( + "io '{}' with function '{}' for given edge '{}' " + "already in list".format( + self._name, func, consttostr(edge) + ) + ) + + # Eventfunktion einfügen + self._parentdevice._dict_events[self].append( + IOEvent(func, edge, as_thread, delay, overwrite) + ) + def _get_address(self): """Gibt die absolute Byteadresse im Prozessabbild zurueck. @return Absolute Byteadresse""" @@ -365,56 +439,7 @@ class IOBase(object): @param as_thread Bei True, Funktion als EventCallback-Thread ausfuehren """ - # Prüfen ob Funktion callable ist - if not callable(func): - raise AttributeError( - "registered function '{}' is not callable".format(func) - ) - if type(delay) != int or delay < 0: - raise AttributeError( - "'delay' must be and greater or equal 0" - ) - if edge != BOTH and self._bitaddress < 0: - raise AttributeError( - "parameter 'edge' can be used with bit io objects only" - ) - - if self not in self._parentdevice._dict_events: - self._parentdevice._dict_events[self] = \ - [(func, edge, as_thread, delay, True)] - else: - # Prüfen ob Funktion schon registriert ist - for regfunc in self._parentdevice._dict_events[self]: - if regfunc[0] != func: - # Nächsten Eintrag testen - continue - - if edge == BOTH or regfunc[1] == BOTH: - if self._bitaddress < 0: - raise AttributeError( - "io '{}' with function '{}' already in list." - "".format(self._name, func) - ) - else: - raise AttributeError( - "io '{}' with function '{}' already in list with " - "edge '{}' - edge '{}' not allowed anymore".format( - self._name, func, - consttostr(regfunc[1]), consttostr(edge) - ) - ) - elif regfunc[1] == edge: - raise AttributeError( - "io '{}' with function '{}' for given edge '{}' " - "already in list".format( - self._name, func, consttostr(edge) - ) - ) - - # Eventfunktion einfügen - self._parentdevice._dict_events[self].append( - (func, edge, as_thread, delay, True) - ) + self.__reg_xevent(func, delay, edge, as_thread, True) def reg_timerevent(self, func, delay, edge=BOTH, as_thread=False): """Registriert fuer IO einen Timer, welcher nach delay func ausfuehrt. @@ -435,56 +460,7 @@ class IOBase(object): @param as_thread Bei True, Funktion als EventCallback-Thread ausfuehren """ - # Prüfen ob Funktion callable ist - if not callable(func): - raise AttributeError( - "registered function '{}' is not callable".format(func) - ) - if type(delay) != int or delay < 0: - raise AttributeError( - "'delay' must be and greater or equal 0" - ) - if edge != BOTH and self._bitaddress < 0: - raise AttributeError( - "parameter 'edge' can be used with bit io objects only" - ) - - if self not in self._parentdevice._dict_events: - self._parentdevice._dict_events[self] = \ - [(func, edge, as_thread, delay, False)] - else: - # Prüfen ob Funktion schon registriert ist - for regfunc in self._parentdevice._dict_events[self]: - if regfunc[0] != func: - # Nächsten Eintrag testen - continue - - if edge == BOTH or regfunc[1] == BOTH: - if self._bitaddress < 0: - raise AttributeError( - "io '{}' with function '{}' already in list." - "".format(self._name, func) - ) - else: - raise AttributeError( - "io '{}' with function '{}' already in list with " - "edge '{}' - edge '{}' not allowed anymore".format( - self._name, func, - consttostr(regfunc[1]), consttostr(edge) - ) - ) - elif regfunc[1] == edge: - raise AttributeError( - "io '{}' with function '{}' for given edge '{}' " - "already in list".format( - self._name, func, consttostr(edge) - ) - ) - - # Eventfunktion einfügen - self._parentdevice._dict_events[self].append( - (func, edge, as_thread, delay, False) - ) + self.__reg_xevent(func, delay, edge, as_thread, False) def replace_io(self, name, frm, **kwargs): """Ersetzt bestehenden IO mit Neuem. @@ -613,8 +589,8 @@ class IOBase(object): else: newlist = [] for regfunc in self._parentdevice._dict_events[self]: - if regfunc[0] != func or edge is not None \ - and regfunc[1] != edge: + if regfunc.func != func or edge is not None \ + and regfunc.edge != edge: newlist.append(regfunc) diff --git a/revpimodio2/modio.py b/revpimodio2/modio.py index f88cce0..f6c740e 100644 --- a/revpimodio2/modio.py +++ b/revpimodio2/modio.py @@ -418,7 +418,6 @@ class RevPiModIO(object): if self._imgwriter is not None and self._imgwriter.is_alive(): self._imgwriter.stop() self._imgwriter.join(self._imgwriter._refresh) - # NOTE: Prüfen, ob es sauber läuft! if self._th_mainloop is not None and self._th_mainloop.is_alive(): self._th_mainloop.join(1) while len(self._lst_refresh) > 0: @@ -426,7 +425,6 @@ class RevPiModIO(object): dev._selfupdate = False if not self._monitoring: self.writeprocimg(dev) - # NOTE: Loops müssen sich selber IMMER sauber beenden def get_jconfigrsc(self): """Laed die piCotry Konfiguration und erstellt ein . @@ -540,7 +538,7 @@ class RevPiModIO(object): try: tup_fire = self._imgwriter._eventq.get(timeout=1) # Direct callen da Prüfung in io.IOBase.reg_event ist - tup_fire[0][0](tup_fire[1], tup_fire[2]) + tup_fire[0].func(tup_fire[1], tup_fire[2]) except Empty: if not self._exit.is_set() and not self._imgwriter.is_alive(): self.exit(full=False) From e4f2c95dda323306e4f54ae9ed31f0d19fde5385 Mon Sep 17 00:00:00 2001 From: NaruX Date: Sat, 2 Dec 2017 17:03:45 +0100 Subject: [PATCH 5/5] =?UTF-8?q?EventThread=20vor=20Mehrfachstarten=20gesch?= =?UTF-8?q?=C3=BCtzt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- revpimodio2/helper.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/revpimodio2/helper.py b/revpimodio2/helper.py index b2375f5..592134f 100644 --- a/revpimodio2/helper.py +++ b/revpimodio2/helper.py @@ -284,7 +284,7 @@ class ProcimgWriter(Thread): @param parentmodio Parent Object""" super().__init__() self.__dict_delay = {} - self.__eventth = None + self.__eventth = Thread(target=self.__exec_th) self.__eventqth = queue.Queue() self.__eventwork = False self._adjwait = 0 @@ -396,7 +396,9 @@ class ProcimgWriter(Thread): self.__eventqth = queue.Queue() self._eventq = queue.Queue() self.__dict_delay = {} - if value: + + # Threadmanagement + if value and not self.__eventth.is_alive(): self.__eventth = Thread(target=self.__exec_th) self.__eventth.start()