diff --git a/doc/mqttserver.html b/doc/mqttserver.html
index b40c3af..aa97c35 100644
--- a/doc/mqttserver.html
+++ b/doc/mqttserver.html
@@ -54,14 +54,20 @@ Methods
Init MqttServer class. |
| _get_procimglength |
-Ermittelt aus piCtory Konfiguraiton die laenge. |
+Ermittelt aus piCtory Konfiguration die laenge. |
| _on_connect |
Verbindung zu MQTT Broker. |
+| _on_disconnect |
+Wertet Verbindungsabbruch aus. |
+
| _on_message |
Sendet piCtory Konfiguration. |
+| _send_pictory_conf |
+Sendet piCtory Konfiguration. |
+
| newlogfile |
Konfiguriert die FileHandler auf neue Logdatei. |
@@ -117,7 +123,7 @@ MQTT ClientID, wenn leer automatisch random erzeugung
MqttServer._get_procimglength
_get_procimglength()
-Ermittelt aus piCtory Konfiguraiton die laenge.
+Ermittelt aus piCtory Konfiguration die laenge.
- Returns:
-
@@ -129,12 +135,24 @@ MqttServer._on_connect
_on_connect(client, userdata, flags, rc)
Verbindung zu MQTT Broker.
+
+
+MqttServer._on_disconnect
+_on_disconnect(client, userdata, rc)
+
+Wertet Verbindungsabbruch aus.
MqttServer._on_message
_on_message(client, userdata, msg)
Sendet piCtory Konfiguration.
+
+
+MqttServer._send_pictory_conf
+_send_pictory_conf()
+
+Sendet piCtory Konfiguration.
MqttServer.newlogfile
diff --git a/eric-revpipyload.api b/eric-revpipyload.api
index 77d540a..7e7e323 100644
--- a/eric-revpipyload.api
+++ b/eric-revpipyload.api
@@ -14,7 +14,9 @@ logsystem.PipeLogwriter.stop?4()
logsystem.PipeLogwriter?1(logfilename)
mqttserver.MqttServer._get_procimglength?5()
mqttserver.MqttServer._on_connect?5(client, userdata, flags, rc)
+mqttserver.MqttServer._on_disconnect?5(client, userdata, rc)
mqttserver.MqttServer._on_message?5(client, userdata, msg)
+mqttserver.MqttServer._send_pictory_conf?5()
mqttserver.MqttServer.newlogfile?4()
mqttserver.MqttServer.run?4()
mqttserver.MqttServer.stop?4()
diff --git a/revpipyload/mqttserver.py b/revpipyload/mqttserver.py
index 3d5dbcd..c198e4f 100644
--- a/revpipyload/mqttserver.py
+++ b/revpipyload/mqttserver.py
@@ -62,7 +62,7 @@ class MqttServer(Thread):
self._mq.on_message = self._on_message
def _get_procimglength(self):
- """Ermittelt aus piCtory Konfiguraiton die laenge.
+ """Ermittelt aus piCtory Konfiguration die laenge.
@return Laenge des Prozessabbilds """
try:
with open(proginit.pargs.configrsc, "r") as fh:
@@ -101,25 +101,36 @@ class MqttServer(Thread):
def _on_connect(self, client, userdata, flags, rc):
"""Verbindung zu MQTT Broker."""
if rc > 0:
- self.__mqttend = True
- raise RuntimeError("can not connect to mqtt server")
+ proginit.warning("can not connect to mqtt broker - will retry")
+ else:
+ # piCtory übertragen um alle RevPiMqttIO zu benachrichtigen
+ self._send_pictory_conf()
- # piCtory übertragen um alle RevPiMqttIO zu benachrichtigen
- self._on_message(client, userdata, None)
+ # Subscribe piCtory Anforderung
+ client.subscribe(self._mqtt_sendpictory)
- # Subscribe piCtory Anforderung
- client.subscribe(self._mqtt_sendpictory)
+ def _on_disconnect(self, client, userdata, rc):
+ """Wertet Verbindungsabbruch aus."""
+ if rc != 0:
+ proginit.warning(
+ "unexpected disconnection from mqtt broker - "
+ "will try to reconnect"
+ )
def _on_message(self, client, userdata, msg):
"""Sendet piCtory Konfiguration."""
# piCtory Konfiguration senden
- with open(proginit.pargs.configrsc, "rb") as fh:
- client.publish(self._mqtt_pictory, fh.read())
+ self._send_pictory_conf()
# Prozessabbild senden
self._evt_data.set()
+ def _send_pictory_conf(self):
+ """Sendet piCtory Konfiguration."""
+ with open(proginit.pargs.configrsc, "rb") as fh:
+ self._mq.publish(self._mqtt_pictory, fh.read())
+
def newlogfile(self):
"""Konfiguriert die FileHandler auf neue Logdatei."""
pass
@@ -139,19 +150,34 @@ class MqttServer(Thread):
)
# MQTT verbinden
- self._mq.connect(self._host, self._port, keepalive=60)
+ self._mq.connect_async(self._host, self._port, keepalive=60)
self._mq.loop_start()
# mainloop
+ buff = b''
+ err_count = 0
while not self.__exit:
self._evt_data.clear()
- # Prozessabbild mit Daten übertragen
- self._mq.publish(
- self._mqtt_picontrol,
- fh_proc.read(self._procimglength)
- )
- fh_proc.seek(0)
+ # Prozessabbild lesen
+ try:
+ fh_proc.seek(0)
+ buff = fh_proc.read(self._procimglength)
+ if err_count > 0:
+ proginit.warning(
+ "resume mqtt publishing after {} errors on "
+ "processimage".format(err_count)
+ )
+ err_count = 0
+ except IOError:
+ if err_count == 0:
+ proginit.logger.error(
+ "could not read process image for mqtt publishing"
+ )
+ err_count += 1
+ else:
+ # Prozessabbild übertragen
+ self._mq.publish(self._mqtt_picontrol, buff)
self._evt_data.wait(self._sendinterval)
diff --git a/revpipyload/revpipyload.py b/revpipyload/revpipyload.py
index a9806f8..b4b06ee 100755
--- a/revpipyload/revpipyload.py
+++ b/revpipyload/revpipyload.py
@@ -267,6 +267,9 @@ class RevPiPyLoad():
# MQTT konfigurieren
self.th_mqtt = self._plcmqtt()
+ if self.th_mqtt is not None and not self._exit:
+ proginit.logger.info("start mqtt publisher")
+ self.th_mqtt.start()
# PLC Programm konfigurieren
if restart_plcprogram:
@@ -629,7 +632,7 @@ class RevPiPyLoad():
proginit.logger.debug("enter RevPiPyLoad.stop_plcmqtt()")
if self.th_mqtt is not None and self.th_mqtt.is_alive():
- proginit.logger.info("stopping revpiplc thread")
+ proginit.logger.info("stopping mqtt thread")
self.th_mqtt.stop()
self.th_mqtt.join()
proginit.logger.debug("mqtt thread successfully closed")