mirror of
https://github.com/naruxde/revpipyload.git
synced 2025-11-08 15:13:52 +01:00
mqtt publisher automatisch starten, wenn Einstellungen neu geladen werden
Fehlerabfang beim Prozessabbild mqtt Client Verbindet sich async ohne Fehler, wenn Broker noch nicht da ist
This commit is contained in:
@@ -54,14 +54,20 @@ Methods</h3>
|
|||||||
<td>Init MqttServer class.</td>
|
<td>Init MqttServer class.</td>
|
||||||
</tr><tr>
|
</tr><tr>
|
||||||
<td><a style="color:#0000FF" href="#MqttServer._get_procimglength">_get_procimglength</a></td>
|
<td><a style="color:#0000FF" href="#MqttServer._get_procimglength">_get_procimglength</a></td>
|
||||||
<td>Ermittelt aus piCtory Konfiguraiton die laenge.</td>
|
<td>Ermittelt aus piCtory Konfiguration die laenge.</td>
|
||||||
</tr><tr>
|
</tr><tr>
|
||||||
<td><a style="color:#0000FF" href="#MqttServer._on_connect">_on_connect</a></td>
|
<td><a style="color:#0000FF" href="#MqttServer._on_connect">_on_connect</a></td>
|
||||||
<td>Verbindung zu MQTT Broker.</td>
|
<td>Verbindung zu MQTT Broker.</td>
|
||||||
</tr><tr>
|
</tr><tr>
|
||||||
|
<td><a style="color:#0000FF" href="#MqttServer._on_disconnect">_on_disconnect</a></td>
|
||||||
|
<td>Wertet Verbindungsabbruch aus.</td>
|
||||||
|
</tr><tr>
|
||||||
<td><a style="color:#0000FF" href="#MqttServer._on_message">_on_message</a></td>
|
<td><a style="color:#0000FF" href="#MqttServer._on_message">_on_message</a></td>
|
||||||
<td>Sendet piCtory Konfiguration.</td>
|
<td>Sendet piCtory Konfiguration.</td>
|
||||||
</tr><tr>
|
</tr><tr>
|
||||||
|
<td><a style="color:#0000FF" href="#MqttServer._send_pictory_conf">_send_pictory_conf</a></td>
|
||||||
|
<td>Sendet piCtory Konfiguration.</td>
|
||||||
|
</tr><tr>
|
||||||
<td><a style="color:#0000FF" href="#MqttServer.newlogfile">newlogfile</a></td>
|
<td><a style="color:#0000FF" href="#MqttServer.newlogfile">newlogfile</a></td>
|
||||||
<td>Konfiguriert die FileHandler auf neue Logdatei.</td>
|
<td>Konfiguriert die FileHandler auf neue Logdatei.</td>
|
||||||
</tr><tr>
|
</tr><tr>
|
||||||
@@ -117,7 +123,7 @@ MQTT ClientID, wenn leer automatisch random erzeugung
|
|||||||
MqttServer._get_procimglength</h3>
|
MqttServer._get_procimglength</h3>
|
||||||
<b>_get_procimglength</b>(<i></i>)
|
<b>_get_procimglength</b>(<i></i>)
|
||||||
<p>
|
<p>
|
||||||
Ermittelt aus piCtory Konfiguraiton die laenge.
|
Ermittelt aus piCtory Konfiguration die laenge.
|
||||||
</p><dl>
|
</p><dl>
|
||||||
<dt>Returns:</dt>
|
<dt>Returns:</dt>
|
||||||
<dd>
|
<dd>
|
||||||
@@ -129,12 +135,24 @@ MqttServer._on_connect</h3>
|
|||||||
<b>_on_connect</b>(<i>client, userdata, flags, rc</i>)
|
<b>_on_connect</b>(<i>client, userdata, flags, rc</i>)
|
||||||
<p>
|
<p>
|
||||||
Verbindung zu MQTT Broker.
|
Verbindung zu MQTT Broker.
|
||||||
|
</p><a NAME="MqttServer._on_disconnect" ID="MqttServer._on_disconnect"></a>
|
||||||
|
<h3 style="background-color:#FFFFFF;color:#FF0000">
|
||||||
|
MqttServer._on_disconnect</h3>
|
||||||
|
<b>_on_disconnect</b>(<i>client, userdata, rc</i>)
|
||||||
|
<p>
|
||||||
|
Wertet Verbindungsabbruch aus.
|
||||||
</p><a NAME="MqttServer._on_message" ID="MqttServer._on_message"></a>
|
</p><a NAME="MqttServer._on_message" ID="MqttServer._on_message"></a>
|
||||||
<h3 style="background-color:#FFFFFF;color:#FF0000">
|
<h3 style="background-color:#FFFFFF;color:#FF0000">
|
||||||
MqttServer._on_message</h3>
|
MqttServer._on_message</h3>
|
||||||
<b>_on_message</b>(<i>client, userdata, msg</i>)
|
<b>_on_message</b>(<i>client, userdata, msg</i>)
|
||||||
<p>
|
<p>
|
||||||
Sendet piCtory Konfiguration.
|
Sendet piCtory Konfiguration.
|
||||||
|
</p><a NAME="MqttServer._send_pictory_conf" ID="MqttServer._send_pictory_conf"></a>
|
||||||
|
<h3 style="background-color:#FFFFFF;color:#FF0000">
|
||||||
|
MqttServer._send_pictory_conf</h3>
|
||||||
|
<b>_send_pictory_conf</b>(<i></i>)
|
||||||
|
<p>
|
||||||
|
Sendet piCtory Konfiguration.
|
||||||
</p><a NAME="MqttServer.newlogfile" ID="MqttServer.newlogfile"></a>
|
</p><a NAME="MqttServer.newlogfile" ID="MqttServer.newlogfile"></a>
|
||||||
<h3 style="background-color:#FFFFFF;color:#FF0000">
|
<h3 style="background-color:#FFFFFF;color:#FF0000">
|
||||||
MqttServer.newlogfile</h3>
|
MqttServer.newlogfile</h3>
|
||||||
|
|||||||
@@ -14,7 +14,9 @@ logsystem.PipeLogwriter.stop?4()
|
|||||||
logsystem.PipeLogwriter?1(logfilename)
|
logsystem.PipeLogwriter?1(logfilename)
|
||||||
mqttserver.MqttServer._get_procimglength?5()
|
mqttserver.MqttServer._get_procimglength?5()
|
||||||
mqttserver.MqttServer._on_connect?5(client, userdata, flags, rc)
|
mqttserver.MqttServer._on_connect?5(client, userdata, flags, rc)
|
||||||
|
mqttserver.MqttServer._on_disconnect?5(client, userdata, rc)
|
||||||
mqttserver.MqttServer._on_message?5(client, userdata, msg)
|
mqttserver.MqttServer._on_message?5(client, userdata, msg)
|
||||||
|
mqttserver.MqttServer._send_pictory_conf?5()
|
||||||
mqttserver.MqttServer.newlogfile?4()
|
mqttserver.MqttServer.newlogfile?4()
|
||||||
mqttserver.MqttServer.run?4()
|
mqttserver.MqttServer.run?4()
|
||||||
mqttserver.MqttServer.stop?4()
|
mqttserver.MqttServer.stop?4()
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ class MqttServer(Thread):
|
|||||||
self._mq.on_message = self._on_message
|
self._mq.on_message = self._on_message
|
||||||
|
|
||||||
def _get_procimglength(self):
|
def _get_procimglength(self):
|
||||||
"""Ermittelt aus piCtory Konfiguraiton die laenge.
|
"""Ermittelt aus piCtory Konfiguration die laenge.
|
||||||
@return Laenge des Prozessabbilds <class 'int'>"""
|
@return Laenge des Prozessabbilds <class 'int'>"""
|
||||||
try:
|
try:
|
||||||
with open(proginit.pargs.configrsc, "r") as fh:
|
with open(proginit.pargs.configrsc, "r") as fh:
|
||||||
@@ -101,25 +101,36 @@ class MqttServer(Thread):
|
|||||||
def _on_connect(self, client, userdata, flags, rc):
|
def _on_connect(self, client, userdata, flags, rc):
|
||||||
"""Verbindung zu MQTT Broker."""
|
"""Verbindung zu MQTT Broker."""
|
||||||
if rc > 0:
|
if rc > 0:
|
||||||
self.__mqttend = True
|
proginit.warning("can not connect to mqtt broker - will retry")
|
||||||
raise RuntimeError("can not connect to mqtt server")
|
else:
|
||||||
|
# piCtory übertragen um alle RevPiMqttIO zu benachrichtigen
|
||||||
|
self._send_pictory_conf()
|
||||||
|
|
||||||
# piCtory übertragen um alle RevPiMqttIO zu benachrichtigen
|
# Subscribe piCtory Anforderung
|
||||||
self._on_message(client, userdata, None)
|
client.subscribe(self._mqtt_sendpictory)
|
||||||
|
|
||||||
# Subscribe piCtory Anforderung
|
def _on_disconnect(self, client, userdata, rc):
|
||||||
client.subscribe(self._mqtt_sendpictory)
|
"""Wertet Verbindungsabbruch aus."""
|
||||||
|
if rc != 0:
|
||||||
|
proginit.warning(
|
||||||
|
"unexpected disconnection from mqtt broker - "
|
||||||
|
"will try to reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
def _on_message(self, client, userdata, msg):
|
def _on_message(self, client, userdata, msg):
|
||||||
"""Sendet piCtory Konfiguration."""
|
"""Sendet piCtory Konfiguration."""
|
||||||
|
|
||||||
# piCtory Konfiguration senden
|
# piCtory Konfiguration senden
|
||||||
with open(proginit.pargs.configrsc, "rb") as fh:
|
self._send_pictory_conf()
|
||||||
client.publish(self._mqtt_pictory, fh.read())
|
|
||||||
|
|
||||||
# Prozessabbild senden
|
# Prozessabbild senden
|
||||||
self._evt_data.set()
|
self._evt_data.set()
|
||||||
|
|
||||||
|
def _send_pictory_conf(self):
|
||||||
|
"""Sendet piCtory Konfiguration."""
|
||||||
|
with open(proginit.pargs.configrsc, "rb") as fh:
|
||||||
|
self._mq.publish(self._mqtt_pictory, fh.read())
|
||||||
|
|
||||||
def newlogfile(self):
|
def newlogfile(self):
|
||||||
"""Konfiguriert die FileHandler auf neue Logdatei."""
|
"""Konfiguriert die FileHandler auf neue Logdatei."""
|
||||||
pass
|
pass
|
||||||
@@ -139,19 +150,34 @@ class MqttServer(Thread):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# MQTT verbinden
|
# MQTT verbinden
|
||||||
self._mq.connect(self._host, self._port, keepalive=60)
|
self._mq.connect_async(self._host, self._port, keepalive=60)
|
||||||
self._mq.loop_start()
|
self._mq.loop_start()
|
||||||
|
|
||||||
# mainloop
|
# mainloop
|
||||||
|
buff = b''
|
||||||
|
err_count = 0
|
||||||
while not self.__exit:
|
while not self.__exit:
|
||||||
self._evt_data.clear()
|
self._evt_data.clear()
|
||||||
|
|
||||||
# Prozessabbild mit Daten übertragen
|
# Prozessabbild lesen
|
||||||
self._mq.publish(
|
try:
|
||||||
self._mqtt_picontrol,
|
fh_proc.seek(0)
|
||||||
fh_proc.read(self._procimglength)
|
buff = fh_proc.read(self._procimglength)
|
||||||
)
|
if err_count > 0:
|
||||||
fh_proc.seek(0)
|
proginit.warning(
|
||||||
|
"resume mqtt publishing after {} errors on "
|
||||||
|
"processimage".format(err_count)
|
||||||
|
)
|
||||||
|
err_count = 0
|
||||||
|
except IOError:
|
||||||
|
if err_count == 0:
|
||||||
|
proginit.logger.error(
|
||||||
|
"could not read process image for mqtt publishing"
|
||||||
|
)
|
||||||
|
err_count += 1
|
||||||
|
else:
|
||||||
|
# Prozessabbild übertragen
|
||||||
|
self._mq.publish(self._mqtt_picontrol, buff)
|
||||||
|
|
||||||
self._evt_data.wait(self._sendinterval)
|
self._evt_data.wait(self._sendinterval)
|
||||||
|
|
||||||
|
|||||||
@@ -267,6 +267,9 @@ class RevPiPyLoad():
|
|||||||
|
|
||||||
# MQTT konfigurieren
|
# MQTT konfigurieren
|
||||||
self.th_mqtt = self._plcmqtt()
|
self.th_mqtt = self._plcmqtt()
|
||||||
|
if self.th_mqtt is not None and not self._exit:
|
||||||
|
proginit.logger.info("start mqtt publisher")
|
||||||
|
self.th_mqtt.start()
|
||||||
|
|
||||||
# PLC Programm konfigurieren
|
# PLC Programm konfigurieren
|
||||||
if restart_plcprogram:
|
if restart_plcprogram:
|
||||||
@@ -629,7 +632,7 @@ class RevPiPyLoad():
|
|||||||
proginit.logger.debug("enter RevPiPyLoad.stop_plcmqtt()")
|
proginit.logger.debug("enter RevPiPyLoad.stop_plcmqtt()")
|
||||||
|
|
||||||
if self.th_mqtt is not None and self.th_mqtt.is_alive():
|
if self.th_mqtt is not None and self.th_mqtt.is_alive():
|
||||||
proginit.logger.info("stopping revpiplc thread")
|
proginit.logger.info("stopping mqtt thread")
|
||||||
self.th_mqtt.stop()
|
self.th_mqtt.stop()
|
||||||
self.th_mqtt.join()
|
self.th_mqtt.join()
|
||||||
proginit.logger.debug("mqtt thread successfully closed")
|
proginit.logger.debug("mqtt thread successfully closed")
|
||||||
|
|||||||
Reference in New Issue
Block a user