Prozessabbildübertragung per MQTT begonnen

This commit is contained in:
2018-04-09 13:56:11 +02:00
parent f937767479
commit 7f712aaf63
9 changed files with 392 additions and 2 deletions

129
revpipyload/mqttserver.py Normal file
View File

@@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
#
# RevPiPyLoad
#
# Webpage: https://revpimodio.org/revpipyplc/
# (c) Sven Sager, License: LGPLv3
#
"""Stellt die MQTT Uebertragung fuer IoT-Zwecke bereit."""
import proginit
from ssl import CERT_NONE
from paho.mqtt.client import Client
from threading import Thread, Event
class MqttServer(Thread):
"""Server fuer die Uebertragung des Prozessabbilds per MQTT."""
def __init__(
self, basetopic, sendinterval, host, port=1883,
tls_set=False, username="", password=None, client_id=""):
"""Init MqttServer class.
@param basetopic Basis-Topic fuer Datenaustausch
@param sendinterval Prozessabbild alle n Sekunden senden
@param host Adresse <class 'str'> des MQTT-Servers
@param port Portnummer <class 'int'> des MQTT-Servers
@param keepalive MQTT Ping bei leerlauf
@param tls_set TLS fuer Verbindung zum MQTT-Server verwenden
@param username Optional Benutzername fuer MQTT-Server
@param password Optional Password fuer MQTT-Server
@param client_id MQTT ClientID, wenn leer automatisch random erzeugung
"""
# TODO: Parameterprüfung
super().__init__()
# Klassenvariablen
self.__exit = False
self._evt_data = Event()
self._sendinterval = sendinterval
self._host = host
self._port = port
# Topics konfigurieren
self._mqtt_picontrol = "{}/picontrol".format(basetopic)
self._mqtt_pictory = "{}/pictory".format(basetopic)
self._mqtt_sendpictory = "{}/needpictory".format(basetopic)
self._mq = Client(client_id)
if username != "":
self._mq.username_pw_set(username, password)
if tls_set:
self._mq.tls_set(cert_reqs=CERT_NONE)
self._mq.tls_insecure_set(True)
# Handler konfigurieren
self._mq.on_connect = self._on_connect
self._mq.on_message = self._on_message
# TODO: self._mq.on_disconnect = self._on_disconnect
def _on_connect(self, client, userdata, flags, rc):
"""Verbindung zu MQTT Broker."""
print("Connect rc:", rc)
if rc > 0:
self.__mqttend = True
raise RuntimeError("can not connect to mqtt server")
# Subscribe piCtory Anforderung
client.subscribe(self._mqtt_sendpictory)
def _on_message(self, client, userdata, msg):
"""Sendet piCtory Konfiguration."""
with open(proginit.pargs.configrsc, "rb") as fh:
client.publish(self._mqtt_pictory, fh.read())
# Prozessabbild senden
self._evt_data.set()
def newlogfile(self):
"""Konfiguriert die FileHandler auf neue Logdatei."""
pass
def run(self):
"""Startet die Uebertragung per MQTT."""
proginit.logger.debug("enter MqttServer.start()")
# Prozessabbild öffnen
try:
fh_proc = open(proginit.pargs.procimg, "r+b", 0)
except:
fh_proc = None
self.__exit = True
proginit.logger.error(
"can not open process image {}".format(proginit.pargs.procimg)
)
# MQTT verbinden
self._mq.connect(self._host, self._port, keepalive=60)
self._mq.loop_start()
# mainloop
while not self.__exit:
self._evt_data.clear()
# FIXME: Ganzes Prozessabbild übertragen
self._mq.publish(self._mqtt_picontrol, fh_proc.read(4096))
fh_proc.seek(0)
self._evt_data.wait(self._sendinterval)
# MQTT trennen
self._mq.loop_stop()
self._mq.disconnect()
# FileHandler schließen
if fh_proc is not None:
fh_proc.close()
proginit.logger.debug("leave MqttServer.start()")
def stop(self):
"""Stoppt die Uebertragung per MQTT."""
proginit.logger.debug("enter MqttServer.stop()")
self.__exit = True
self._evt_data.set()
proginit.logger.debug("leave MqttServer.stop()")

View File

@@ -32,6 +32,7 @@ begrenzt werden!
"""
import gzip
import logsystem
import mqttserver
import picontrolserver
import plcsystem
import proginit
@@ -185,6 +186,28 @@ class RevPiPyLoad():
self.zeroonexit = \
int(self.globalconfig["DEFAULT"].get("zeroonexit", 1))
# Konfiguration verarbeiten [MQTT]
self.mqtt = 0
if "MQTT" in self.globalconfig:
self.mqtt = \
int(self.globalconfig["MQTT"].get("mqtt", 0))
self.mqttbasetopic = \
self.globalconfig["MQTT"].get("basetopic", "")
self.mqttsendinterval = \
int(self.globalconfig["MQTT"].get("sendinterval", 15))
self.mqtthost = \
self.globalconfig["MQTT"].get("host", "")
self.mqttport = \
int(self.globalconfig["MQTT"].get("port", 1883))
self.mqtttls_set = \
int(self.globalconfig["MQTT"].get("tls_set", 0))
self.mqttusername = \
self.globalconfig["MQTT"].get("username", "")
self.mqttpassword = \
self.globalconfig["MQTT"].get("password", "")
self.mqttclient_id = \
self.globalconfig["MQTT"].get("client_id", "")
# Konfiguration verarbeiten [PLCSLAVE]
self.plcslave = 0
if "PLCSLAVE" in self.globalconfig:
@@ -241,6 +264,9 @@ class RevPiPyLoad():
)
os.chdir(self.plcworkdir)
# MQTT konfigurieren
self.th_mqtt = self._plcmqtt()
# PLC Programm konfigurieren
if restart_plcprogram:
self.stop_plcprogram()
@@ -368,6 +394,30 @@ class RevPiPyLoad():
proginit.logger.debug("leave RevPiPyLoad._loadconfig()")
def _plcmqtt(self):
"""Konfiguriert den MQTT-Thread fuer die Ausfuehrung.
@return MQTT-Thread Object or None"""
proginit.logger.debug("enter RevPiPyLoad._plcmqtt()")
th_plc = None
if self.mqtt:
try:
th_plc = mqttserver.MqttServer(
self.mqttbasetopic,
self.mqttsendinterval,
self.mqtthost,
self.mqttport,
self.mqtttls_set,
self.mqttusername,
self.mqttpassword,
self.mqttclient_id
)
except:
pass
proginit.logger.debug("leave RevPiPyLoad._plcmqtt()")
return th_plc
def _plcthread(self):
"""Konfiguriert den PLC-Thread fuer die Ausfuehrung.
@return PLC-Thread Object or None"""
@@ -503,6 +553,10 @@ class RevPiPyLoad():
proginit.logger.info("start xmlrpc-server")
self.xsrv.start()
if self.mqtt:
# MQTT Uebertragung starten
self.th_mqtt.start()
if self.plcslave:
# Slaveausfuehrung übergeben
self.th_plcslave.start()
@@ -519,6 +573,8 @@ class RevPiPyLoad():
proginit.logger.info("got reqeust to reload config")
self._loadconfig()
# TODO: MQTT prüfen und neu starten
# PLC Server Thread prüfen
if self.plcslave and self.th_plcslave is not None \
and not self.th_plcslave.is_alive():
@@ -543,6 +599,7 @@ class RevPiPyLoad():
proginit.logger.info("stopping revpipyload")
# Alle Sub-Systeme beenden
self.stop_plcmqtt()
self.stop_plcslave()
self.stop_plcprogram()
self.stop_xmlrpcserver()
@@ -558,6 +615,18 @@ class RevPiPyLoad():
self._exit = True
proginit.logger.debug("leave RevPiPyLoad.stop()")
def stop_plcmqtt(self):
"""Beendet MQTT Sender."""
proginit.logger.debug("enter RevPiPyLoad.stop_plcmqtt()")
if self.th_mqtt is not None and self.th_mqtt.is_alive():
proginit.logger.info("stopping revpiplc thread")
self.th_mqtt.stop()
self.th_mqtt.join()
proginit.logger.debug("mqtt thread successfully closed")
proginit.logger.debug("leave RevPiPyLoad.stop_plcmqtt()")
def stop_plcprogram(self):
"""Beendet PLC Programm."""
proginit.logger.debug("enter RevPiPyLoad.stop_plcprogram()")