From 7f712aaf6397072a0b95ffa24a3d79541f900306 Mon Sep 17 00:00:00 2001 From: NaruX Date: Mon, 9 Apr 2018 13:56:11 +0200 Subject: [PATCH 1/4] =?UTF-8?q?Prozessabbild=C3=BCbertragung=20per=20MQTT?= =?UTF-8?q?=20begonnen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data/etc/revpipyload/revpipyload.conf | 11 ++ doc/index.html | 3 + doc/mqttserver.html | 145 ++++++++++++++++++++++++++ doc/revpipyload.html | 25 ++++- eric-revpipyload.api | 8 ++ eric-revpipyload.bas | 1 + revpipyload.e4p | 3 +- revpipyload/mqttserver.py | 129 +++++++++++++++++++++++ revpipyload/revpipyload.py | 69 ++++++++++++ 9 files changed, 392 insertions(+), 2 deletions(-) create mode 100644 doc/mqttserver.html create mode 100644 revpipyload/mqttserver.py diff --git a/data/etc/revpipyload/revpipyload.conf b/data/etc/revpipyload/revpipyload.conf index 8afb9c1..c1fa1eb 100644 --- a/data/etc/revpipyload/revpipyload.conf +++ b/data/etc/revpipyload/revpipyload.conf @@ -22,3 +22,14 @@ port = 55234 xmlrpc = 0 aclfile = /etc/revpipyload/aclxmlrpc.conf bindip = * + +[MQTT] +mqtt = 0 +basetopic = revpi/data +sendinterval = 15 +host = +port = 1883 +tls_set = 0 +username = +password = +client_id = diff --git a/doc/index.html b/doc/index.html index b9be723..2986894 100644 --- a/doc/index.html +++ b/doc/index.html @@ -27,6 +27,9 @@ Modules logsystem Modul fuer die Verwaltung der Logdateien. +mqttserver +Stellt die MQTT Uebertragung fuer IoT-Zwecke bereit. + picontrolserver Modul fuer die Verwaltung der PLC-Slave Funktionen. diff --git a/doc/mqttserver.html b/doc/mqttserver.html new file mode 100644 index 0000000..841d5e1 --- /dev/null +++ b/doc/mqttserver.html @@ -0,0 +1,145 @@ + + +mqttserver + + + +

+mqttserver

+

+Stellt die MQTT Uebertragung fuer IoT-Zwecke bereit. +

+

+Global Attributes

+ + +
None
+

+Classes

+ + + + + +
MqttServerServer fuer die Uebertragung des Prozessabbilds per MQTT.
+

+Functions

+ + +
None
+

+ +

MqttServer

+

+Server fuer die Uebertragung des Prozessabbilds per MQTT. +

+

+Derived from

+Thread +

+Class Attributes

+ + +
None
+

+Class Methods

+ + +
None
+

+Methods

+ + + + + + + + + + + + + + + + + + + + +
MqttServerInit MqttServer class.
_on_connectVerbindung zu MQTT Broker.
_on_messageSendet piCtory Konfiguration.
newlogfileKonfiguriert die FileHandler auf neue Logdatei.
runStartet die Uebertragung per MQTT.
stopStoppt die Uebertragung per MQTT.
+

+Static Methods

+ + +
None
+ +

+MqttServer (Constructor)

+MqttServer(basetopic, sendinterval, host, port=1883, tls_set=False, username="", password=None, client_id="") +

+Init MqttServer class. +

+
basetopic
+
+Basis-Topic fuer Datenaustausch +
sendinterval
+
+Prozessabbild alle n Sekunden senden +
host
+
+Adresse des MQTT-Servers +
port
+
+Portnummer des MQTT-Servers +
keepalive
+
+MQTT Ping bei leerlauf +
tls_set
+
+TLS fuer Verbindung zum MQTT-Server verwenden +
username
+
+Optional Benutzername fuer MQTT-Server +
password
+
+Optional Password fuer MQTT-Server +
client_id
+
+MQTT ClientID, wenn leer automatisch random erzeugung +
+
+

+MqttServer._on_connect

+_on_connect(client, userdata, flags, rc) +

+Verbindung zu MQTT Broker. +

+

+MqttServer._on_message

+_on_message(client, userdata, msg) +

+Sendet piCtory Konfiguration. +

+

+MqttServer.newlogfile

+newlogfile() +

+Konfiguriert die FileHandler auf neue Logdatei. +

+

+MqttServer.run

+run() +

+Startet die Uebertragung per MQTT. +

+

+MqttServer.stop

+stop() +

+Stoppt die Uebertragung per MQTT. +

+
Up
+
+ \ No newline at end of file diff --git a/doc/revpipyload.html b/doc/revpipyload.html index e478cd8..86682ad 100644 --- a/doc/revpipyload.html +++ b/doc/revpipyload.html @@ -87,6 +87,9 @@ Methods _loadconfig Load configuration file and setup modul. +_plcmqtt +Konfiguriert den MQTT-Thread fuer die Ausfuehrung. + _plcslave Erstellt den PlcSlave-Server Thread. @@ -111,6 +114,9 @@ Methods stop Stop revpipyload. +stop_plcmqtt +Beendet MQTT Sender. + stop_plcprogram Beendet PLC Programm. @@ -217,7 +223,18 @@ RevPiPyLoad._loadconfig _loadconfig()

Load configuration file and setup modul. -

+

+

+RevPiPyLoad._plcmqtt

+_plcmqtt() +

+Konfiguriert den MQTT-Thread fuer die Ausfuehrung. +

+
Returns:
+
+MQTT-Thread Object or None +
+

RevPiPyLoad._plcslave

_plcslave() @@ -288,6 +305,12 @@ RevPiPyLoad.stop stop()

Stop revpipyload. +

+

+RevPiPyLoad.stop_plcmqtt

+stop_plcmqtt() +

+Beendet MQTT Sender.

RevPiPyLoad.stop_plcprogram

diff --git a/eric-revpipyload.api b/eric-revpipyload.api index 354bb8c..0a28dec 100644 --- a/eric-revpipyload.api +++ b/eric-revpipyload.api @@ -12,6 +12,12 @@ logsystem.PipeLogwriter.newlogfile?4() logsystem.PipeLogwriter.run?4() logsystem.PipeLogwriter.stop?4() logsystem.PipeLogwriter?1(logfilename) +mqttserver.MqttServer._on_connect?5(client, userdata, flags, rc) +mqttserver.MqttServer._on_message?5(client, userdata, msg) +mqttserver.MqttServer.newlogfile?4() +mqttserver.MqttServer.run?4() +mqttserver.MqttServer.stop?4() +mqttserver.MqttServer?1(basetopic, sendinterval, host, port=1883, tls_set=False, username="", password=None, client_id="") picontrolserver.RevPiSlave.check_connectedacl?4() picontrolserver.RevPiSlave.newlogfile?4() picontrolserver.RevPiSlave.run?4() @@ -52,6 +58,7 @@ proginit.startdir?7 revpipyload.RevPiPyLoad._check_mustrestart_plcprogram?5() revpipyload.RevPiPyLoad._check_mustrestart_plcslave?5() revpipyload.RevPiPyLoad._loadconfig?5() +revpipyload.RevPiPyLoad._plcmqtt?5() revpipyload.RevPiPyLoad._plcslave?5() revpipyload.RevPiPyLoad._plcthread?5() revpipyload.RevPiPyLoad._sigexit?5(signum, frame) @@ -61,6 +68,7 @@ revpipyload.RevPiPyLoad.packapp?4(mode="tar", pictory=False) revpipyload.RevPiPyLoad.root?7 revpipyload.RevPiPyLoad.start?4() revpipyload.RevPiPyLoad.stop?4() +revpipyload.RevPiPyLoad.stop_plcmqtt?4() revpipyload.RevPiPyLoad.stop_plcprogram?4() revpipyload.RevPiPyLoad.stop_plcslave?4() revpipyload.RevPiPyLoad.stop_xmlrpcserver?4() diff --git a/eric-revpipyload.bas b/eric-revpipyload.bas index 9333951..0fe2ca4 100644 --- a/eric-revpipyload.bas +++ b/eric-revpipyload.bas @@ -1,3 +1,4 @@ +MqttServer Thread PipeLogwriter Thread RevPiPlc Thread RevPiSlave Thread diff --git a/revpipyload.e4p b/revpipyload.e4p index 2fc0562..d10b018 100644 --- a/revpipyload.e4p +++ b/revpipyload.e4p @@ -1,7 +1,7 @@ - + en_US @@ -25,6 +25,7 @@ revpipyload/xrpcserver.py revpipyload/shared/ipaclmanager.py revpipyload/shared/__init__.py + revpipyload/mqttserver.py diff --git a/revpipyload/mqttserver.py b/revpipyload/mqttserver.py new file mode 100644 index 0000000..ae1c768 --- /dev/null +++ b/revpipyload/mqttserver.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# +# RevPiPyLoad +# +# Webpage: https://revpimodio.org/revpipyplc/ +# (c) Sven Sager, License: LGPLv3 +# +"""Stellt die MQTT Uebertragung fuer IoT-Zwecke bereit.""" +import proginit +from ssl import CERT_NONE +from paho.mqtt.client import Client +from threading import Thread, Event + + +class MqttServer(Thread): + + """Server fuer die Uebertragung des Prozessabbilds per MQTT.""" + + def __init__( + self, basetopic, sendinterval, host, port=1883, + tls_set=False, username="", password=None, client_id=""): + """Init MqttServer class. + + @param basetopic Basis-Topic fuer Datenaustausch + @param sendinterval Prozessabbild alle n Sekunden senden + @param host Adresse des MQTT-Servers + @param port Portnummer des MQTT-Servers + @param keepalive MQTT Ping bei leerlauf + @param tls_set TLS fuer Verbindung zum MQTT-Server verwenden + @param username Optional Benutzername fuer MQTT-Server + @param password Optional Password fuer MQTT-Server + @param client_id MQTT ClientID, wenn leer automatisch random erzeugung + + """ + # TODO: Parameterprüfung + + super().__init__() + + # Klassenvariablen + self.__exit = False + self._evt_data = Event() + self._sendinterval = sendinterval + self._host = host + self._port = port + + # Topics konfigurieren + self._mqtt_picontrol = "{}/picontrol".format(basetopic) + self._mqtt_pictory = "{}/pictory".format(basetopic) + self._mqtt_sendpictory = "{}/needpictory".format(basetopic) + + self._mq = Client(client_id) + if username != "": + self._mq.username_pw_set(username, password) + if tls_set: + self._mq.tls_set(cert_reqs=CERT_NONE) + self._mq.tls_insecure_set(True) + + # Handler konfigurieren + self._mq.on_connect = self._on_connect + self._mq.on_message = self._on_message + # TODO: self._mq.on_disconnect = self._on_disconnect + + def _on_connect(self, client, userdata, flags, rc): + """Verbindung zu MQTT Broker.""" + print("Connect rc:", rc) + if rc > 0: + self.__mqttend = True + raise RuntimeError("can not connect to mqtt server") + + # Subscribe piCtory Anforderung + client.subscribe(self._mqtt_sendpictory) + + def _on_message(self, client, userdata, msg): + """Sendet piCtory Konfiguration.""" + + with open(proginit.pargs.configrsc, "rb") as fh: + client.publish(self._mqtt_pictory, fh.read()) + + # Prozessabbild senden + self._evt_data.set() + + def newlogfile(self): + """Konfiguriert die FileHandler auf neue Logdatei.""" + pass + + def run(self): + """Startet die Uebertragung per MQTT.""" + proginit.logger.debug("enter MqttServer.start()") + + # Prozessabbild öffnen + try: + fh_proc = open(proginit.pargs.procimg, "r+b", 0) + except: + fh_proc = None + self.__exit = True + proginit.logger.error( + "can not open process image {}".format(proginit.pargs.procimg) + ) + + # MQTT verbinden + self._mq.connect(self._host, self._port, keepalive=60) + self._mq.loop_start() + + # mainloop + while not self.__exit: + self._evt_data.clear() + + # FIXME: Ganzes Prozessabbild übertragen + self._mq.publish(self._mqtt_picontrol, fh_proc.read(4096)) + fh_proc.seek(0) + + self._evt_data.wait(self._sendinterval) + + # MQTT trennen + self._mq.loop_stop() + self._mq.disconnect() + + # FileHandler schließen + if fh_proc is not None: + fh_proc.close() + + proginit.logger.debug("leave MqttServer.start()") + + def stop(self): + """Stoppt die Uebertragung per MQTT.""" + proginit.logger.debug("enter MqttServer.stop()") + self.__exit = True + self._evt_data.set() + proginit.logger.debug("leave MqttServer.stop()") diff --git a/revpipyload/revpipyload.py b/revpipyload/revpipyload.py index fc101a8..a2b4628 100755 --- a/revpipyload/revpipyload.py +++ b/revpipyload/revpipyload.py @@ -32,6 +32,7 @@ begrenzt werden! """ import gzip import logsystem +import mqttserver import picontrolserver import plcsystem import proginit @@ -185,6 +186,28 @@ class RevPiPyLoad(): self.zeroonexit = \ int(self.globalconfig["DEFAULT"].get("zeroonexit", 1)) + # Konfiguration verarbeiten [MQTT] + self.mqtt = 0 + if "MQTT" in self.globalconfig: + self.mqtt = \ + int(self.globalconfig["MQTT"].get("mqtt", 0)) + self.mqttbasetopic = \ + self.globalconfig["MQTT"].get("basetopic", "") + self.mqttsendinterval = \ + int(self.globalconfig["MQTT"].get("sendinterval", 15)) + self.mqtthost = \ + self.globalconfig["MQTT"].get("host", "") + self.mqttport = \ + int(self.globalconfig["MQTT"].get("port", 1883)) + self.mqtttls_set = \ + int(self.globalconfig["MQTT"].get("tls_set", 0)) + self.mqttusername = \ + self.globalconfig["MQTT"].get("username", "") + self.mqttpassword = \ + self.globalconfig["MQTT"].get("password", "") + self.mqttclient_id = \ + self.globalconfig["MQTT"].get("client_id", "") + # Konfiguration verarbeiten [PLCSLAVE] self.plcslave = 0 if "PLCSLAVE" in self.globalconfig: @@ -241,6 +264,9 @@ class RevPiPyLoad(): ) os.chdir(self.plcworkdir) + # MQTT konfigurieren + self.th_mqtt = self._plcmqtt() + # PLC Programm konfigurieren if restart_plcprogram: self.stop_plcprogram() @@ -368,6 +394,30 @@ class RevPiPyLoad(): proginit.logger.debug("leave RevPiPyLoad._loadconfig()") + def _plcmqtt(self): + """Konfiguriert den MQTT-Thread fuer die Ausfuehrung. + @return MQTT-Thread Object or None""" + proginit.logger.debug("enter RevPiPyLoad._plcmqtt()") + + th_plc = None + if self.mqtt: + try: + th_plc = mqttserver.MqttServer( + self.mqttbasetopic, + self.mqttsendinterval, + self.mqtthost, + self.mqttport, + self.mqtttls_set, + self.mqttusername, + self.mqttpassword, + self.mqttclient_id + ) + except: + pass + + proginit.logger.debug("leave RevPiPyLoad._plcmqtt()") + return th_plc + def _plcthread(self): """Konfiguriert den PLC-Thread fuer die Ausfuehrung. @return PLC-Thread Object or None""" @@ -503,6 +553,10 @@ class RevPiPyLoad(): proginit.logger.info("start xmlrpc-server") self.xsrv.start() + if self.mqtt: + # MQTT Uebertragung starten + self.th_mqtt.start() + if self.plcslave: # Slaveausfuehrung übergeben self.th_plcslave.start() @@ -519,6 +573,8 @@ class RevPiPyLoad(): proginit.logger.info("got reqeust to reload config") self._loadconfig() + # TODO: MQTT prüfen und neu starten + # PLC Server Thread prüfen if self.plcslave and self.th_plcslave is not None \ and not self.th_plcslave.is_alive(): @@ -543,6 +599,7 @@ class RevPiPyLoad(): proginit.logger.info("stopping revpipyload") # Alle Sub-Systeme beenden + self.stop_plcmqtt() self.stop_plcslave() self.stop_plcprogram() self.stop_xmlrpcserver() @@ -558,6 +615,18 @@ class RevPiPyLoad(): self._exit = True proginit.logger.debug("leave RevPiPyLoad.stop()") + def stop_plcmqtt(self): + """Beendet MQTT Sender.""" + proginit.logger.debug("enter RevPiPyLoad.stop_plcmqtt()") + + if self.th_mqtt is not None and self.th_mqtt.is_alive(): + proginit.logger.info("stopping revpiplc thread") + self.th_mqtt.stop() + self.th_mqtt.join() + proginit.logger.debug("mqtt thread successfully closed") + + proginit.logger.debug("leave RevPiPyLoad.stop_plcmqtt()") + def stop_plcprogram(self): """Beendet PLC Programm.""" proginit.logger.debug("enter RevPiPyLoad.stop_plcprogram()") From 2b296b78f9008b2ec03cb12c017bc7a66fce6d1b Mon Sep 17 00:00:00 2001 From: NaruX Date: Mon, 9 Apr 2018 18:32:26 +0200 Subject: [PATCH 2/4] =?UTF-8?q?Nur=20konfigurierte=20Bytes=20aus=20Procimg?= =?UTF-8?q?=20=C3=BCbertragen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data/etc/revpipyload/revpipyload.conf | 2 +- doc/mqttserver.html | 14 ++++++++ eric-revpipyload.api | 1 + revpipyload/mqttserver.py | 50 ++++++++++++++++++++++++--- revpipyload/plcsystem.py | 7 +++- revpipyload/revpipyload.py | 11 +++--- 6 files changed, 74 insertions(+), 11 deletions(-) diff --git a/data/etc/revpipyload/revpipyload.conf b/data/etc/revpipyload/revpipyload.conf index c1fa1eb..a518d5e 100644 --- a/data/etc/revpipyload/revpipyload.conf +++ b/data/etc/revpipyload/revpipyload.conf @@ -26,7 +26,7 @@ bindip = * [MQTT] mqtt = 0 basetopic = revpi/data -sendinterval = 15 +sendinterval = 10 host = port = 1883 tls_set = 0 diff --git a/doc/mqttserver.html b/doc/mqttserver.html index 841d5e1..b40c3af 100644 --- a/doc/mqttserver.html +++ b/doc/mqttserver.html @@ -53,6 +53,9 @@ Methods MqttServer Init MqttServer class. +_get_procimglength +Ermittelt aus piCtory Konfiguraiton die laenge. + _on_connect Verbindung zu MQTT Broker. @@ -109,6 +112,17 @@ Optional Password fuer MQTT-Server
MQTT ClientID, wenn leer automatisch random erzeugung
+ +

+MqttServer._get_procimglength

+_get_procimglength() +

+Ermittelt aus piCtory Konfiguraiton die laenge. +

+
Returns:
+
+Laenge des Prozessabbilds +

MqttServer._on_connect

diff --git a/eric-revpipyload.api b/eric-revpipyload.api index 0a28dec..e9171e3 100644 --- a/eric-revpipyload.api +++ b/eric-revpipyload.api @@ -12,6 +12,7 @@ logsystem.PipeLogwriter.newlogfile?4() logsystem.PipeLogwriter.run?4() logsystem.PipeLogwriter.stop?4() logsystem.PipeLogwriter?1(logfilename) +mqttserver.MqttServer._get_procimglength?5() mqttserver.MqttServer._on_connect?5(client, userdata, flags, rc) mqttserver.MqttServer._on_message?5(client, userdata, msg) mqttserver.MqttServer.newlogfile?4() diff --git a/revpipyload/mqttserver.py b/revpipyload/mqttserver.py index ae1c768..07417a1 100644 --- a/revpipyload/mqttserver.py +++ b/revpipyload/mqttserver.py @@ -7,6 +7,7 @@ # """Stellt die MQTT Uebertragung fuer IoT-Zwecke bereit.""" import proginit +from json import load as jload from ssl import CERT_NONE from paho.mqtt.client import Client from threading import Thread, Event @@ -39,9 +40,10 @@ class MqttServer(Thread): # Klassenvariablen self.__exit = False self._evt_data = Event() - self._sendinterval = sendinterval self._host = host + self._procimglength = self._get_procimglength() self._port = port + self._sendinterval = sendinterval # Topics konfigurieren self._mqtt_picontrol = "{}/picontrol".format(basetopic) @@ -60,9 +62,45 @@ class MqttServer(Thread): self._mq.on_message = self._on_message # TODO: self._mq.on_disconnect = self._on_disconnect + def _get_procimglength(self): + """Ermittelt aus piCtory Konfiguraiton die laenge. + @return Laenge des Prozessabbilds """ + try: + with open(proginit.pargs.configrsc, "r") as fh: + rsc = jload(fh) + except: + return 0 + + length = 0 + + # piCtory Config prüfen + if "Devices" not in rsc: + return 0 + + # Letzes piCtory Device laden + last_dev = rsc["Devices"].pop() + length += last_dev["offset"] + + # bei mem beginnen, weil nur der höchste IO benötigt wird + for type_iom in ["mem", "out", "inp"]: + lst_iom = sorted( + last_dev[type_iom], + key=lambda x: int(x), + reverse=True + ) + + if len(lst_iom) > 0: + # Daten des letzen IOM auswerten + last_iom = last_dev[type_iom][str(lst_iom[0])] + bitlength = int(last_iom[2]) + length += int(last_iom[3]) + length += 1 if bitlength == 1 else int(bitlength / 8) + break + + return length + def _on_connect(self, client, userdata, flags, rc): """Verbindung zu MQTT Broker.""" - print("Connect rc:", rc) if rc > 0: self.__mqttend = True raise RuntimeError("can not connect to mqtt server") @@ -73,6 +111,7 @@ class MqttServer(Thread): def _on_message(self, client, userdata, msg): """Sendet piCtory Konfiguration.""" + # piCtory Konfiguration senden with open(proginit.pargs.configrsc, "rb") as fh: client.publish(self._mqtt_pictory, fh.read()) @@ -105,8 +144,11 @@ class MqttServer(Thread): while not self.__exit: self._evt_data.clear() - # FIXME: Ganzes Prozessabbild übertragen - self._mq.publish(self._mqtt_picontrol, fh_proc.read(4096)) + # Prozessabbild mit Daten übertragen + self._mq.publish( + self._mqtt_picontrol, + fh_proc.read(self._procimglength) + ) fh_proc.seek(0) self._evt_data.wait(self._sendinterval) diff --git a/revpipyload/plcsystem.py b/revpipyload/plcsystem.py index 8bb778f..cd6de76 100644 --- a/revpipyload/plcsystem.py +++ b/revpipyload/plcsystem.py @@ -226,7 +226,12 @@ class RevPiPlc(Thread): # Prozess beenden count = 0 proginit.logger.info("term plc program {}".format(self._program)) - self._procplc.terminate() + try: + self._procplc.terminate() + except ProcessLookupError: + proginit.logger.error("plc program was terminated unexpectedly") + proginit.logger.debug("leave RevPiPlc.stop()") + return while self._procplc.poll() is None and count < 10: count += 1 diff --git a/revpipyload/revpipyload.py b/revpipyload/revpipyload.py index a2b4628..d6f7dc2 100755 --- a/revpipyload/revpipyload.py +++ b/revpipyload/revpipyload.py @@ -32,7 +32,6 @@ begrenzt werden! """ import gzip import logsystem -import mqttserver import picontrolserver import plcsystem import proginit @@ -402,7 +401,8 @@ class RevPiPyLoad(): th_plc = None if self.mqtt: try: - th_plc = mqttserver.MqttServer( + from mqttserver import MqttServer + th_plc = MqttServer( self.mqttbasetopic, self.mqttsendinterval, self.mqtthost, @@ -413,6 +413,7 @@ class RevPiPyLoad(): self.mqttclient_id ) except: + # TODO: Fehlermeldung ausgeben bezüglich paho.mqtt pass proginit.logger.debug("leave RevPiPyLoad._plcmqtt()") @@ -553,8 +554,8 @@ class RevPiPyLoad(): proginit.logger.info("start xmlrpc-server") self.xsrv.start() - if self.mqtt: - # MQTT Uebertragung starten + # MQTT Uebertragung starten + if self.mqtt and self.th_mqtt is not None: self.th_mqtt.start() if self.plcslave: @@ -599,9 +600,9 @@ class RevPiPyLoad(): proginit.logger.info("stopping revpipyload") # Alle Sub-Systeme beenden + self.stop_plcprogram() self.stop_plcmqtt() self.stop_plcslave() - self.stop_plcprogram() self.stop_xmlrpcserver() # Logreader schließen From bd0df81c33dce02b7a8572f65488d4ec79db370a Mon Sep 17 00:00:00 2001 From: NaruX Date: Wed, 11 Apr 2018 12:28:38 +0200 Subject: [PATCH 3/4] =?UTF-8?q?MQTT=20sendet=20beim=20Start=20piCtory=20un?= =?UTF-8?q?d=20Daten=20um=20laufende=20RevPiMqttIO=20zu=20informieren=20MQ?= =?UTF-8?q?TT=20in=20XML-RPC=20eingebaut=20MQTT=20in=20XML=20Settings=20?= =?UTF-8?q?=C3=BCbernommen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/revpipyload.html | 45 +++++++++++++++++++++++ eric-revpipyload.api | 3 ++ revpipyload.e4p | 4 +-- revpipyload/mqttserver.py | 6 ++-- revpipyload/revpipyload.py | 74 +++++++++++++++++++++++++++++++++++--- setup.py | 2 +- 6 files changed, 125 insertions(+), 9 deletions(-) diff --git a/doc/revpipyload.html b/doc/revpipyload.html index 86682ad..149107e 100644 --- a/doc/revpipyload.html +++ b/doc/revpipyload.html @@ -138,6 +138,15 @@ Methods xml_getprocimg Gibt die Rohdaten aus piControl0 zurueck. +xml_mqttrunning +Prueft ob MQTT Uebertragung noch lauft. + +xml_mqttstart +Startet die MQTT Uebertragung. + +xml_mqttstop +Stoppt die MQTT Uebertragung. + xml_plcdownload Uebertraegt ein Archiv vom plcworkdir. @@ -373,6 +382,42 @@ Gibt die Rohdaten aus piControl0 zurueck.
xmlrpc.client.Binary()
+ +

+RevPiPyLoad.xml_mqttrunning

+xml_mqttrunning() +

+Prueft ob MQTT Uebertragung noch lauft. +

+
Returns:
+
+True, wenn MQTT Uebertragung noch lauft +
+
+

+RevPiPyLoad.xml_mqttstart

+xml_mqttstart() +

+Startet die MQTT Uebertragung. +

+
Returns:
+
+Statuscode: + 0: erfolgreich gestartet + -1: Nicht aktiv in Konfiguration + -2: Laeuft bereits +
+
+

+RevPiPyLoad.xml_mqttstop

+xml_mqttstop() +

+Stoppt die MQTT Uebertragung. +

+
Returns:
+
+True, wenn stop erfolgreich +

RevPiPyLoad.xml_plcdownload

diff --git a/eric-revpipyload.api b/eric-revpipyload.api index e9171e3..77d540a 100644 --- a/eric-revpipyload.api +++ b/eric-revpipyload.api @@ -77,6 +77,9 @@ revpipyload.RevPiPyLoad.xml_getconfig?4() revpipyload.RevPiPyLoad.xml_getfilelist?4() revpipyload.RevPiPyLoad.xml_getpictoryrsc?4() revpipyload.RevPiPyLoad.xml_getprocimg?4() +revpipyload.RevPiPyLoad.xml_mqttrunning?4() +revpipyload.RevPiPyLoad.xml_mqttstart?4() +revpipyload.RevPiPyLoad.xml_mqttstop?4() revpipyload.RevPiPyLoad.xml_plcdownload?4(mode="tar", pictory=False) revpipyload.RevPiPyLoad.xml_plcexitcode?4() revpipyload.RevPiPyLoad.xml_plcrunning?4() diff --git a/revpipyload.e4p b/revpipyload.e4p index d10b018..5f2c15c 100644 --- a/revpipyload.e4p +++ b/revpipyload.e4p @@ -1,7 +1,7 @@ - + en_US @@ -9,7 +9,7 @@ Python3 Console Dieser Loader wird über das Init-System geladen und führt das angegebene Pythonprogramm aus. Es ist für den RevolutionPi gedacht um automatisch das SPS-Programm zu starten. - 0.6.5 + 0.7.0 Sven Sager akira@narux.de diff --git a/revpipyload/mqttserver.py b/revpipyload/mqttserver.py index 07417a1..3d5dbcd 100644 --- a/revpipyload/mqttserver.py +++ b/revpipyload/mqttserver.py @@ -60,7 +60,6 @@ class MqttServer(Thread): # Handler konfigurieren self._mq.on_connect = self._on_connect self._mq.on_message = self._on_message - # TODO: self._mq.on_disconnect = self._on_disconnect def _get_procimglength(self): """Ermittelt aus piCtory Konfiguraiton die laenge. @@ -69,7 +68,7 @@ class MqttServer(Thread): with open(proginit.pargs.configrsc, "r") as fh: rsc = jload(fh) except: - return 0 + return 4096 length = 0 @@ -105,6 +104,9 @@ class MqttServer(Thread): self.__mqttend = True raise RuntimeError("can not connect to mqtt server") + # piCtory übertragen um alle RevPiMqttIO zu benachrichtigen + self._on_message(client, userdata, None) + # Subscribe piCtory Anforderung client.subscribe(self._mqtt_sendpictory) diff --git a/revpipyload/revpipyload.py b/revpipyload/revpipyload.py index d6f7dc2..a9806f8 100755 --- a/revpipyload/revpipyload.py +++ b/revpipyload/revpipyload.py @@ -50,7 +50,7 @@ from time import asctime from xmlrpc.client import Binary from xrpcserver import SaveXMLRPCServer -pyloadversion = "0.6.5" +pyloadversion = "0.7.0" class RevPiPyLoad(): @@ -81,6 +81,7 @@ class RevPiPyLoad(): self.xmlrpcacl = IpAclManager(minlevel=0, maxlevel=4) # Threads/Prozesse + self.th_mqtt = None self.th_plcslave = None self.plc = None @@ -147,6 +148,7 @@ class RevPiPyLoad(): proginit.logger.debug("enter RevPiPyLoad._loadconfig()") # Subsysteme herunterfahren + self.stop_plcmqtt() self.stop_xmlrpcserver() # Konfigurationsdatei laden @@ -330,6 +332,8 @@ class RevPiPyLoad(): 0, self.xml_plcstop, "plcstop") self.xsrv.register_function( 0, self.xml_reload, "reload") + self.xsrv.register_function( + 0, self.xml_mqttrunning, "mqttrunning") self.xsrv.register_function( 0, self.xml_plcslaverunning, "plcslaverunning") @@ -370,6 +374,10 @@ class RevPiPyLoad(): lambda: os.system(proginit.picontrolreset), "resetpicontrol" ) + self.xsrv.register_function( + 3, self.xml_mqttstart, "mqttstart") + self.xsrv.register_function( + 3, self.xml_mqttstop, "mqttstop") self.xsrv.register_function( 3, self.xml_plcslavestart, "plcslavestart") self.xsrv.register_function( @@ -555,11 +563,11 @@ class RevPiPyLoad(): self.xsrv.start() # MQTT Uebertragung starten - if self.mqtt and self.th_mqtt is not None: + if self.th_mqtt is not None: self.th_mqtt.start() - if self.plcslave: - # Slaveausfuehrung übergeben + # Slaveausfuehrung übergeben + if self.th_plcslave is not None: self.th_plcslave.start() # PLC Programm automatisch starten @@ -682,6 +690,17 @@ class RevPiPyLoad(): dc["zeroonerror"] = self.zeroonerror dc["zeroonexit"] = self.zeroonexit + # MQTT Sektion + dc["mqtt"] = self.mqtt + dc["basetopic"] = self.mqttbasetopic + dc["sendinterval"] = self.mqttsendinterval + dc["host"] = self.mqtthost + dc["port"] = self.mqttport + dc["tls_set"] = self.mqtttls_set + dc["username"] = self.mqttusername + dc["password"] = self.mqttpassword + dc["client_id"] = self.mqttclient_id + # PLCSLAVE Sektion dc["plcslave"] = self.plcslave dc["plcslaveacl"] = self.plcslaveacl.acl @@ -722,6 +741,42 @@ class RevPiPyLoad(): buff = fh.read() return Binary(buff) + def xml_mqttrunning(self): + """Prueft ob MQTT Uebertragung noch lauft. + @return True, wenn MQTT Uebertragung noch lauft""" + proginit.logger.debug("xmlrpc call mqttrunning") + return False if self.th_mqtt is None \ + else self.th_mqtt.is_alive() + + def xml_mqttstart(self): + """Startet die MQTT Uebertragung. + + @return Statuscode: + 0: erfolgreich gestartet + -1: Nicht aktiv in Konfiguration + -2: Laeuft bereits + + """ + if self.th_mqtt is not None and self.th_mqtt.is_alive(): + return -2 + else: + self.th_mqtt = self._plcmqtt() + if self.th_mqtt is None: + return -1 + else: + self.th_mqtt.start() + return 0 + + def xml_mqttstop(self): + """Stoppt die MQTT Uebertragung. + @return True, wenn stop erfolgreich""" + if self.th_mqtt is not None: + self.stop_plcmqtt() + self.th_mqtt = None + return True + else: + return False + def xml_plcdownload(self, mode="tar", pictory=False): """Uebertraegt ein Archiv vom plcworkdir. @@ -868,6 +923,17 @@ class RevPiPyLoad(): "zeroonerror": "[01]", "zeroonexit": "[01]", }, + "MQTT": { + "mqtt": "[01]", + "mqttbasetopic": ".*", + "mqttsendinterval": "[0-9]+", + "mqtthost": ".+", + "mqttport": "[0-9]+", + "mqtttls_set": "[01]", + "mqttusername": ".*", + "mqttpassword": ".*", + "mqttclient_id": ".+", + }, "PLCSLAVE": { "plcslave": "[01]", "plcslaveacl": self.plcslaveacl.regex_acl, diff --git a/setup.py b/setup.py index 8307359..251798b 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ setup( license="LGPLv3", name="revpipyload", - version="0.6.5", + version="0.7.0", scripts=["data/revpipyload"], From 5560cfb1821482b4eb0d6e6792bd51deece286fb Mon Sep 17 00:00:00 2001 From: NaruX Date: Tue, 1 May 2018 17:50:16 +0200 Subject: [PATCH 4/4] mqtt publisher automatisch starten, wenn Einstellungen neu geladen werden Fehlerabfang beim Prozessabbild mqtt Client Verbindet sich async ohne Fehler, wenn Broker noch nicht da ist --- doc/mqttserver.html | 22 +++++++++++++-- eric-revpipyload.api | 2 ++ revpipyload/mqttserver.py | 58 +++++++++++++++++++++++++++----------- revpipyload/revpipyload.py | 5 +++- 4 files changed, 68 insertions(+), 19 deletions(-) diff --git a/doc/mqttserver.html b/doc/mqttserver.html index b40c3af..aa97c35 100644 --- a/doc/mqttserver.html +++ b/doc/mqttserver.html @@ -54,14 +54,20 @@ Methods Init MqttServer class. _get_procimglength -Ermittelt aus piCtory Konfiguraiton die laenge. +Ermittelt aus piCtory Konfiguration die laenge. _on_connect Verbindung zu MQTT Broker. +_on_disconnect +Wertet Verbindungsabbruch aus. + _on_message Sendet piCtory Konfiguration. +_send_pictory_conf +Sendet piCtory Konfiguration. + newlogfile Konfiguriert die FileHandler auf neue Logdatei. @@ -117,7 +123,7 @@ MQTT ClientID, wenn leer automatisch random erzeugung MqttServer._get_procimglength _get_procimglength()

-Ermittelt aus piCtory Konfiguraiton die laenge. +Ermittelt aus piCtory Konfiguration die laenge.

Returns:
@@ -129,12 +135,24 @@ MqttServer._on_connect _on_connect(client, userdata, flags, rc)

Verbindung zu MQTT Broker. +

+

+MqttServer._on_disconnect

+_on_disconnect(client, userdata, rc) +

+Wertet Verbindungsabbruch aus.

MqttServer._on_message

_on_message(client, userdata, msg)

Sendet piCtory Konfiguration. +

+

+MqttServer._send_pictory_conf

+_send_pictory_conf() +

+Sendet piCtory Konfiguration.

MqttServer.newlogfile

diff --git a/eric-revpipyload.api b/eric-revpipyload.api index 77d540a..7e7e323 100644 --- a/eric-revpipyload.api +++ b/eric-revpipyload.api @@ -14,7 +14,9 @@ logsystem.PipeLogwriter.stop?4() logsystem.PipeLogwriter?1(logfilename) mqttserver.MqttServer._get_procimglength?5() mqttserver.MqttServer._on_connect?5(client, userdata, flags, rc) +mqttserver.MqttServer._on_disconnect?5(client, userdata, rc) mqttserver.MqttServer._on_message?5(client, userdata, msg) +mqttserver.MqttServer._send_pictory_conf?5() mqttserver.MqttServer.newlogfile?4() mqttserver.MqttServer.run?4() mqttserver.MqttServer.stop?4() diff --git a/revpipyload/mqttserver.py b/revpipyload/mqttserver.py index 3d5dbcd..c198e4f 100644 --- a/revpipyload/mqttserver.py +++ b/revpipyload/mqttserver.py @@ -62,7 +62,7 @@ class MqttServer(Thread): self._mq.on_message = self._on_message def _get_procimglength(self): - """Ermittelt aus piCtory Konfiguraiton die laenge. + """Ermittelt aus piCtory Konfiguration die laenge. @return Laenge des Prozessabbilds """ try: with open(proginit.pargs.configrsc, "r") as fh: @@ -101,25 +101,36 @@ class MqttServer(Thread): def _on_connect(self, client, userdata, flags, rc): """Verbindung zu MQTT Broker.""" if rc > 0: - self.__mqttend = True - raise RuntimeError("can not connect to mqtt server") + proginit.warning("can not connect to mqtt broker - will retry") + else: + # piCtory übertragen um alle RevPiMqttIO zu benachrichtigen + self._send_pictory_conf() - # piCtory übertragen um alle RevPiMqttIO zu benachrichtigen - self._on_message(client, userdata, None) + # Subscribe piCtory Anforderung + client.subscribe(self._mqtt_sendpictory) - # Subscribe piCtory Anforderung - client.subscribe(self._mqtt_sendpictory) + def _on_disconnect(self, client, userdata, rc): + """Wertet Verbindungsabbruch aus.""" + if rc != 0: + proginit.warning( + "unexpected disconnection from mqtt broker - " + "will try to reconnect" + ) def _on_message(self, client, userdata, msg): """Sendet piCtory Konfiguration.""" # piCtory Konfiguration senden - with open(proginit.pargs.configrsc, "rb") as fh: - client.publish(self._mqtt_pictory, fh.read()) + self._send_pictory_conf() # Prozessabbild senden self._evt_data.set() + def _send_pictory_conf(self): + """Sendet piCtory Konfiguration.""" + with open(proginit.pargs.configrsc, "rb") as fh: + self._mq.publish(self._mqtt_pictory, fh.read()) + def newlogfile(self): """Konfiguriert die FileHandler auf neue Logdatei.""" pass @@ -139,19 +150,34 @@ class MqttServer(Thread): ) # MQTT verbinden - self._mq.connect(self._host, self._port, keepalive=60) + self._mq.connect_async(self._host, self._port, keepalive=60) self._mq.loop_start() # mainloop + buff = b'' + err_count = 0 while not self.__exit: self._evt_data.clear() - # Prozessabbild mit Daten übertragen - self._mq.publish( - self._mqtt_picontrol, - fh_proc.read(self._procimglength) - ) - fh_proc.seek(0) + # Prozessabbild lesen + try: + fh_proc.seek(0) + buff = fh_proc.read(self._procimglength) + if err_count > 0: + proginit.warning( + "resume mqtt publishing after {} errors on " + "processimage".format(err_count) + ) + err_count = 0 + except IOError: + if err_count == 0: + proginit.logger.error( + "could not read process image for mqtt publishing" + ) + err_count += 1 + else: + # Prozessabbild übertragen + self._mq.publish(self._mqtt_picontrol, buff) self._evt_data.wait(self._sendinterval) diff --git a/revpipyload/revpipyload.py b/revpipyload/revpipyload.py index a9806f8..b4b06ee 100755 --- a/revpipyload/revpipyload.py +++ b/revpipyload/revpipyload.py @@ -267,6 +267,9 @@ class RevPiPyLoad(): # MQTT konfigurieren self.th_mqtt = self._plcmqtt() + if self.th_mqtt is not None and not self._exit: + proginit.logger.info("start mqtt publisher") + self.th_mqtt.start() # PLC Programm konfigurieren if restart_plcprogram: @@ -629,7 +632,7 @@ class RevPiPyLoad(): proginit.logger.debug("enter RevPiPyLoad.stop_plcmqtt()") if self.th_mqtt is not None and self.th_mqtt.is_alive(): - proginit.logger.info("stopping revpiplc thread") + proginit.logger.info("stopping mqtt thread") self.th_mqtt.stop() self.th_mqtt.join() proginit.logger.debug("mqtt thread successfully closed")