From 5560cfb1821482b4eb0d6e6792bd51deece286fb Mon Sep 17 00:00:00 2001 From: NaruX Date: Tue, 1 May 2018 17:50:16 +0200 Subject: [PATCH] mqtt publisher automatisch starten, wenn Einstellungen neu geladen werden Fehlerabfang beim Prozessabbild mqtt Client Verbindet sich async ohne Fehler, wenn Broker noch nicht da ist --- doc/mqttserver.html | 22 +++++++++++++-- eric-revpipyload.api | 2 ++ revpipyload/mqttserver.py | 58 +++++++++++++++++++++++++++----------- revpipyload/revpipyload.py | 5 +++- 4 files changed, 68 insertions(+), 19 deletions(-) diff --git a/doc/mqttserver.html b/doc/mqttserver.html index b40c3af..aa97c35 100644 --- a/doc/mqttserver.html +++ b/doc/mqttserver.html @@ -54,14 +54,20 @@ Methods Init MqttServer class. _get_procimglength -Ermittelt aus piCtory Konfiguraiton die laenge. +Ermittelt aus piCtory Konfiguration die laenge. _on_connect Verbindung zu MQTT Broker. +_on_disconnect +Wertet Verbindungsabbruch aus. + _on_message Sendet piCtory Konfiguration. +_send_pictory_conf +Sendet piCtory Konfiguration. + newlogfile Konfiguriert die FileHandler auf neue Logdatei. @@ -117,7 +123,7 @@ MQTT ClientID, wenn leer automatisch random erzeugung MqttServer._get_procimglength _get_procimglength()

-Ermittelt aus piCtory Konfiguraiton die laenge. +Ermittelt aus piCtory Konfiguration die laenge.

Returns:
@@ -129,12 +135,24 @@ MqttServer._on_connect _on_connect(client, userdata, flags, rc)

Verbindung zu MQTT Broker. +

+

+MqttServer._on_disconnect

+_on_disconnect(client, userdata, rc) +

+Wertet Verbindungsabbruch aus.

MqttServer._on_message

_on_message(client, userdata, msg)

Sendet piCtory Konfiguration. +

+

+MqttServer._send_pictory_conf

+_send_pictory_conf() +

+Sendet piCtory Konfiguration.

MqttServer.newlogfile

diff --git a/eric-revpipyload.api b/eric-revpipyload.api index 77d540a..7e7e323 100644 --- a/eric-revpipyload.api +++ b/eric-revpipyload.api @@ -14,7 +14,9 @@ logsystem.PipeLogwriter.stop?4() logsystem.PipeLogwriter?1(logfilename) mqttserver.MqttServer._get_procimglength?5() mqttserver.MqttServer._on_connect?5(client, userdata, flags, rc) +mqttserver.MqttServer._on_disconnect?5(client, userdata, rc) mqttserver.MqttServer._on_message?5(client, userdata, msg) +mqttserver.MqttServer._send_pictory_conf?5() mqttserver.MqttServer.newlogfile?4() mqttserver.MqttServer.run?4() mqttserver.MqttServer.stop?4() diff --git a/revpipyload/mqttserver.py b/revpipyload/mqttserver.py index 3d5dbcd..c198e4f 100644 --- a/revpipyload/mqttserver.py +++ b/revpipyload/mqttserver.py @@ -62,7 +62,7 @@ class MqttServer(Thread): self._mq.on_message = self._on_message def _get_procimglength(self): - """Ermittelt aus piCtory Konfiguraiton die laenge. + """Ermittelt aus piCtory Konfiguration die laenge. @return Laenge des Prozessabbilds """ try: with open(proginit.pargs.configrsc, "r") as fh: @@ -101,25 +101,36 @@ class MqttServer(Thread): def _on_connect(self, client, userdata, flags, rc): """Verbindung zu MQTT Broker.""" if rc > 0: - self.__mqttend = True - raise RuntimeError("can not connect to mqtt server") + proginit.warning("can not connect to mqtt broker - will retry") + else: + # piCtory übertragen um alle RevPiMqttIO zu benachrichtigen + self._send_pictory_conf() - # piCtory übertragen um alle RevPiMqttIO zu benachrichtigen - self._on_message(client, userdata, None) + # Subscribe piCtory Anforderung + client.subscribe(self._mqtt_sendpictory) - # Subscribe piCtory Anforderung - client.subscribe(self._mqtt_sendpictory) + def _on_disconnect(self, client, userdata, rc): + """Wertet Verbindungsabbruch aus.""" + if rc != 0: + proginit.warning( + "unexpected disconnection from mqtt broker - " + "will try to reconnect" + ) def _on_message(self, client, userdata, msg): """Sendet piCtory Konfiguration.""" # piCtory Konfiguration senden - with open(proginit.pargs.configrsc, "rb") as fh: - client.publish(self._mqtt_pictory, fh.read()) + self._send_pictory_conf() # Prozessabbild senden self._evt_data.set() + def _send_pictory_conf(self): + """Sendet piCtory Konfiguration.""" + with open(proginit.pargs.configrsc, "rb") as fh: + self._mq.publish(self._mqtt_pictory, fh.read()) + def newlogfile(self): """Konfiguriert die FileHandler auf neue Logdatei.""" pass @@ -139,19 +150,34 @@ class MqttServer(Thread): ) # MQTT verbinden - self._mq.connect(self._host, self._port, keepalive=60) + self._mq.connect_async(self._host, self._port, keepalive=60) self._mq.loop_start() # mainloop + buff = b'' + err_count = 0 while not self.__exit: self._evt_data.clear() - # Prozessabbild mit Daten übertragen - self._mq.publish( - self._mqtt_picontrol, - fh_proc.read(self._procimglength) - ) - fh_proc.seek(0) + # Prozessabbild lesen + try: + fh_proc.seek(0) + buff = fh_proc.read(self._procimglength) + if err_count > 0: + proginit.warning( + "resume mqtt publishing after {} errors on " + "processimage".format(err_count) + ) + err_count = 0 + except IOError: + if err_count == 0: + proginit.logger.error( + "could not read process image for mqtt publishing" + ) + err_count += 1 + else: + # Prozessabbild übertragen + self._mq.publish(self._mqtt_picontrol, buff) self._evt_data.wait(self._sendinterval) diff --git a/revpipyload/revpipyload.py b/revpipyload/revpipyload.py index a9806f8..b4b06ee 100755 --- a/revpipyload/revpipyload.py +++ b/revpipyload/revpipyload.py @@ -267,6 +267,9 @@ class RevPiPyLoad(): # MQTT konfigurieren self.th_mqtt = self._plcmqtt() + if self.th_mqtt is not None and not self._exit: + proginit.logger.info("start mqtt publisher") + self.th_mqtt.start() # PLC Programm konfigurieren if restart_plcprogram: @@ -629,7 +632,7 @@ class RevPiPyLoad(): proginit.logger.debug("enter RevPiPyLoad.stop_plcmqtt()") if self.th_mqtt is not None and self.th_mqtt.is_alive(): - proginit.logger.info("stopping revpiplc thread") + proginit.logger.info("stopping mqtt thread") self.th_mqtt.stop() self.th_mqtt.join() proginit.logger.debug("mqtt thread successfully closed")