diff --git a/data/etc/revpipyload/revpipyload.conf b/data/etc/revpipyload/revpipyload.conf index 8afb9c1..c1fa1eb 100644 --- a/data/etc/revpipyload/revpipyload.conf +++ b/data/etc/revpipyload/revpipyload.conf @@ -22,3 +22,14 @@ port = 55234 xmlrpc = 0 aclfile = /etc/revpipyload/aclxmlrpc.conf bindip = * + +[MQTT] +mqtt = 0 +basetopic = revpi/data +sendinterval = 15 +host = +port = 1883 +tls_set = 0 +username = +password = +client_id = diff --git a/doc/index.html b/doc/index.html index b9be723..2986894 100644 --- a/doc/index.html +++ b/doc/index.html @@ -27,6 +27,9 @@ Modules logsystem Modul fuer die Verwaltung der Logdateien. +mqttserver +Stellt die MQTT Uebertragung fuer IoT-Zwecke bereit. + picontrolserver Modul fuer die Verwaltung der PLC-Slave Funktionen. diff --git a/doc/mqttserver.html b/doc/mqttserver.html new file mode 100644 index 0000000..841d5e1 --- /dev/null +++ b/doc/mqttserver.html @@ -0,0 +1,145 @@ + + +mqttserver + + + +

+mqttserver

+

+Stellt die MQTT Uebertragung fuer IoT-Zwecke bereit. +

+

+Global Attributes

+ + +
None
+

+Classes

+ + + + + +
MqttServerServer fuer die Uebertragung des Prozessabbilds per MQTT.
+

+Functions

+ + +
None
+

+ +

MqttServer

+

+Server fuer die Uebertragung des Prozessabbilds per MQTT. +

+

+Derived from

+Thread +

+Class Attributes

+ + +
None
+

+Class Methods

+ + +
None
+

+Methods

+ + + + + + + + + + + + + + + + + + + + +
MqttServerInit MqttServer class.
_on_connectVerbindung zu MQTT Broker.
_on_messageSendet piCtory Konfiguration.
newlogfileKonfiguriert die FileHandler auf neue Logdatei.
runStartet die Uebertragung per MQTT.
stopStoppt die Uebertragung per MQTT.
+

+Static Methods

+ + +
None
+ +

+MqttServer (Constructor)

+MqttServer(basetopic, sendinterval, host, port=1883, tls_set=False, username="", password=None, client_id="") +

+Init MqttServer class. +

+
basetopic
+
+Basis-Topic fuer Datenaustausch +
sendinterval
+
+Prozessabbild alle n Sekunden senden +
host
+
+Adresse des MQTT-Servers +
port
+
+Portnummer des MQTT-Servers +
keepalive
+
+MQTT Ping bei leerlauf +
tls_set
+
+TLS fuer Verbindung zum MQTT-Server verwenden +
username
+
+Optional Benutzername fuer MQTT-Server +
password
+
+Optional Password fuer MQTT-Server +
client_id
+
+MQTT ClientID, wenn leer automatisch random erzeugung +
+
+

+MqttServer._on_connect

+_on_connect(client, userdata, flags, rc) +

+Verbindung zu MQTT Broker. +

+

+MqttServer._on_message

+_on_message(client, userdata, msg) +

+Sendet piCtory Konfiguration. +

+

+MqttServer.newlogfile

+newlogfile() +

+Konfiguriert die FileHandler auf neue Logdatei. +

+

+MqttServer.run

+run() +

+Startet die Uebertragung per MQTT. +

+

+MqttServer.stop

+stop() +

+Stoppt die Uebertragung per MQTT. +

+
Up
+
+ \ No newline at end of file diff --git a/doc/revpipyload.html b/doc/revpipyload.html index e478cd8..86682ad 100644 --- a/doc/revpipyload.html +++ b/doc/revpipyload.html @@ -87,6 +87,9 @@ Methods _loadconfig Load configuration file and setup modul. +_plcmqtt +Konfiguriert den MQTT-Thread fuer die Ausfuehrung. + _plcslave Erstellt den PlcSlave-Server Thread. @@ -111,6 +114,9 @@ Methods stop Stop revpipyload. +stop_plcmqtt +Beendet MQTT Sender. + stop_plcprogram Beendet PLC Programm. @@ -217,7 +223,18 @@ RevPiPyLoad._loadconfig _loadconfig()

Load configuration file and setup modul. -

+

+

+RevPiPyLoad._plcmqtt

+_plcmqtt() +

+Konfiguriert den MQTT-Thread fuer die Ausfuehrung. +

+
Returns:
+
+MQTT-Thread Object or None +
+

RevPiPyLoad._plcslave

_plcslave() @@ -288,6 +305,12 @@ RevPiPyLoad.stop stop()

Stop revpipyload. +

+

+RevPiPyLoad.stop_plcmqtt

+stop_plcmqtt() +

+Beendet MQTT Sender.

RevPiPyLoad.stop_plcprogram

diff --git a/eric-revpipyload.api b/eric-revpipyload.api index 354bb8c..0a28dec 100644 --- a/eric-revpipyload.api +++ b/eric-revpipyload.api @@ -12,6 +12,12 @@ logsystem.PipeLogwriter.newlogfile?4() logsystem.PipeLogwriter.run?4() logsystem.PipeLogwriter.stop?4() logsystem.PipeLogwriter?1(logfilename) +mqttserver.MqttServer._on_connect?5(client, userdata, flags, rc) +mqttserver.MqttServer._on_message?5(client, userdata, msg) +mqttserver.MqttServer.newlogfile?4() +mqttserver.MqttServer.run?4() +mqttserver.MqttServer.stop?4() +mqttserver.MqttServer?1(basetopic, sendinterval, host, port=1883, tls_set=False, username="", password=None, client_id="") picontrolserver.RevPiSlave.check_connectedacl?4() picontrolserver.RevPiSlave.newlogfile?4() picontrolserver.RevPiSlave.run?4() @@ -52,6 +58,7 @@ proginit.startdir?7 revpipyload.RevPiPyLoad._check_mustrestart_plcprogram?5() revpipyload.RevPiPyLoad._check_mustrestart_plcslave?5() revpipyload.RevPiPyLoad._loadconfig?5() +revpipyload.RevPiPyLoad._plcmqtt?5() revpipyload.RevPiPyLoad._plcslave?5() revpipyload.RevPiPyLoad._plcthread?5() revpipyload.RevPiPyLoad._sigexit?5(signum, frame) @@ -61,6 +68,7 @@ revpipyload.RevPiPyLoad.packapp?4(mode="tar", pictory=False) revpipyload.RevPiPyLoad.root?7 revpipyload.RevPiPyLoad.start?4() revpipyload.RevPiPyLoad.stop?4() +revpipyload.RevPiPyLoad.stop_plcmqtt?4() revpipyload.RevPiPyLoad.stop_plcprogram?4() revpipyload.RevPiPyLoad.stop_plcslave?4() revpipyload.RevPiPyLoad.stop_xmlrpcserver?4() diff --git a/eric-revpipyload.bas b/eric-revpipyload.bas index 9333951..0fe2ca4 100644 --- a/eric-revpipyload.bas +++ b/eric-revpipyload.bas @@ -1,3 +1,4 @@ +MqttServer Thread PipeLogwriter Thread RevPiPlc Thread RevPiSlave Thread diff --git a/revpipyload.e4p b/revpipyload.e4p index 2fc0562..d10b018 100644 --- a/revpipyload.e4p +++ b/revpipyload.e4p @@ -1,7 +1,7 @@ - + en_US @@ -25,6 +25,7 @@ revpipyload/xrpcserver.py revpipyload/shared/ipaclmanager.py revpipyload/shared/__init__.py + revpipyload/mqttserver.py diff --git a/revpipyload/mqttserver.py b/revpipyload/mqttserver.py new file mode 100644 index 0000000..ae1c768 --- /dev/null +++ b/revpipyload/mqttserver.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# +# RevPiPyLoad +# +# Webpage: https://revpimodio.org/revpipyplc/ +# (c) Sven Sager, License: LGPLv3 +# +"""Stellt die MQTT Uebertragung fuer IoT-Zwecke bereit.""" +import proginit +from ssl import CERT_NONE +from paho.mqtt.client import Client +from threading import Thread, Event + + +class MqttServer(Thread): + + """Server fuer die Uebertragung des Prozessabbilds per MQTT.""" + + def __init__( + self, basetopic, sendinterval, host, port=1883, + tls_set=False, username="", password=None, client_id=""): + """Init MqttServer class. + + @param basetopic Basis-Topic fuer Datenaustausch + @param sendinterval Prozessabbild alle n Sekunden senden + @param host Adresse des MQTT-Servers + @param port Portnummer des MQTT-Servers + @param keepalive MQTT Ping bei leerlauf + @param tls_set TLS fuer Verbindung zum MQTT-Server verwenden + @param username Optional Benutzername fuer MQTT-Server + @param password Optional Password fuer MQTT-Server + @param client_id MQTT ClientID, wenn leer automatisch random erzeugung + + """ + # TODO: Parameterprüfung + + super().__init__() + + # Klassenvariablen + self.__exit = False + self._evt_data = Event() + self._sendinterval = sendinterval + self._host = host + self._port = port + + # Topics konfigurieren + self._mqtt_picontrol = "{}/picontrol".format(basetopic) + self._mqtt_pictory = "{}/pictory".format(basetopic) + self._mqtt_sendpictory = "{}/needpictory".format(basetopic) + + self._mq = Client(client_id) + if username != "": + self._mq.username_pw_set(username, password) + if tls_set: + self._mq.tls_set(cert_reqs=CERT_NONE) + self._mq.tls_insecure_set(True) + + # Handler konfigurieren + self._mq.on_connect = self._on_connect + self._mq.on_message = self._on_message + # TODO: self._mq.on_disconnect = self._on_disconnect + + def _on_connect(self, client, userdata, flags, rc): + """Verbindung zu MQTT Broker.""" + print("Connect rc:", rc) + if rc > 0: + self.__mqttend = True + raise RuntimeError("can not connect to mqtt server") + + # Subscribe piCtory Anforderung + client.subscribe(self._mqtt_sendpictory) + + def _on_message(self, client, userdata, msg): + """Sendet piCtory Konfiguration.""" + + with open(proginit.pargs.configrsc, "rb") as fh: + client.publish(self._mqtt_pictory, fh.read()) + + # Prozessabbild senden + self._evt_data.set() + + def newlogfile(self): + """Konfiguriert die FileHandler auf neue Logdatei.""" + pass + + def run(self): + """Startet die Uebertragung per MQTT.""" + proginit.logger.debug("enter MqttServer.start()") + + # Prozessabbild öffnen + try: + fh_proc = open(proginit.pargs.procimg, "r+b", 0) + except: + fh_proc = None + self.__exit = True + proginit.logger.error( + "can not open process image {}".format(proginit.pargs.procimg) + ) + + # MQTT verbinden + self._mq.connect(self._host, self._port, keepalive=60) + self._mq.loop_start() + + # mainloop + while not self.__exit: + self._evt_data.clear() + + # FIXME: Ganzes Prozessabbild übertragen + self._mq.publish(self._mqtt_picontrol, fh_proc.read(4096)) + fh_proc.seek(0) + + self._evt_data.wait(self._sendinterval) + + # MQTT trennen + self._mq.loop_stop() + self._mq.disconnect() + + # FileHandler schließen + if fh_proc is not None: + fh_proc.close() + + proginit.logger.debug("leave MqttServer.start()") + + def stop(self): + """Stoppt die Uebertragung per MQTT.""" + proginit.logger.debug("enter MqttServer.stop()") + self.__exit = True + self._evt_data.set() + proginit.logger.debug("leave MqttServer.stop()") diff --git a/revpipyload/revpipyload.py b/revpipyload/revpipyload.py index fc101a8..a2b4628 100755 --- a/revpipyload/revpipyload.py +++ b/revpipyload/revpipyload.py @@ -32,6 +32,7 @@ begrenzt werden! """ import gzip import logsystem +import mqttserver import picontrolserver import plcsystem import proginit @@ -185,6 +186,28 @@ class RevPiPyLoad(): self.zeroonexit = \ int(self.globalconfig["DEFAULT"].get("zeroonexit", 1)) + # Konfiguration verarbeiten [MQTT] + self.mqtt = 0 + if "MQTT" in self.globalconfig: + self.mqtt = \ + int(self.globalconfig["MQTT"].get("mqtt", 0)) + self.mqttbasetopic = \ + self.globalconfig["MQTT"].get("basetopic", "") + self.mqttsendinterval = \ + int(self.globalconfig["MQTT"].get("sendinterval", 15)) + self.mqtthost = \ + self.globalconfig["MQTT"].get("host", "") + self.mqttport = \ + int(self.globalconfig["MQTT"].get("port", 1883)) + self.mqtttls_set = \ + int(self.globalconfig["MQTT"].get("tls_set", 0)) + self.mqttusername = \ + self.globalconfig["MQTT"].get("username", "") + self.mqttpassword = \ + self.globalconfig["MQTT"].get("password", "") + self.mqttclient_id = \ + self.globalconfig["MQTT"].get("client_id", "") + # Konfiguration verarbeiten [PLCSLAVE] self.plcslave = 0 if "PLCSLAVE" in self.globalconfig: @@ -241,6 +264,9 @@ class RevPiPyLoad(): ) os.chdir(self.plcworkdir) + # MQTT konfigurieren + self.th_mqtt = self._plcmqtt() + # PLC Programm konfigurieren if restart_plcprogram: self.stop_plcprogram() @@ -368,6 +394,30 @@ class RevPiPyLoad(): proginit.logger.debug("leave RevPiPyLoad._loadconfig()") + def _plcmqtt(self): + """Konfiguriert den MQTT-Thread fuer die Ausfuehrung. + @return MQTT-Thread Object or None""" + proginit.logger.debug("enter RevPiPyLoad._plcmqtt()") + + th_plc = None + if self.mqtt: + try: + th_plc = mqttserver.MqttServer( + self.mqttbasetopic, + self.mqttsendinterval, + self.mqtthost, + self.mqttport, + self.mqtttls_set, + self.mqttusername, + self.mqttpassword, + self.mqttclient_id + ) + except: + pass + + proginit.logger.debug("leave RevPiPyLoad._plcmqtt()") + return th_plc + def _plcthread(self): """Konfiguriert den PLC-Thread fuer die Ausfuehrung. @return PLC-Thread Object or None""" @@ -503,6 +553,10 @@ class RevPiPyLoad(): proginit.logger.info("start xmlrpc-server") self.xsrv.start() + if self.mqtt: + # MQTT Uebertragung starten + self.th_mqtt.start() + if self.plcslave: # Slaveausfuehrung übergeben self.th_plcslave.start() @@ -519,6 +573,8 @@ class RevPiPyLoad(): proginit.logger.info("got reqeust to reload config") self._loadconfig() + # TODO: MQTT prüfen und neu starten + # PLC Server Thread prüfen if self.plcslave and self.th_plcslave is not None \ and not self.th_plcslave.is_alive(): @@ -543,6 +599,7 @@ class RevPiPyLoad(): proginit.logger.info("stopping revpipyload") # Alle Sub-Systeme beenden + self.stop_plcmqtt() self.stop_plcslave() self.stop_plcprogram() self.stop_xmlrpcserver() @@ -558,6 +615,18 @@ class RevPiPyLoad(): self._exit = True proginit.logger.debug("leave RevPiPyLoad.stop()") + def stop_plcmqtt(self): + """Beendet MQTT Sender.""" + proginit.logger.debug("enter RevPiPyLoad.stop_plcmqtt()") + + if self.th_mqtt is not None and self.th_mqtt.is_alive(): + proginit.logger.info("stopping revpiplc thread") + self.th_mqtt.stop() + self.th_mqtt.join() + proginit.logger.debug("mqtt thread successfully closed") + + proginit.logger.debug("leave RevPiPyLoad.stop_plcmqtt()") + def stop_plcprogram(self): """Beendet PLC Programm.""" proginit.logger.debug("enter RevPiPyLoad.stop_plcprogram()")