The MQTT system sends byte IOs as real byte to the broker

Most IOs are `bool()` or `int()`. In some cases users replaced some IO values
as raw bytes. The MQTT system now also sends these as raw bytes.
This commit is contained in:
2021-09-09 09:16:48 +02:00
parent 3fe69b54cc
commit d8191959c5

View File

@@ -37,17 +37,11 @@ class MqttServer(Thread):
if not isinstance(basetopic, str): if not isinstance(basetopic, str):
raise ValueError("parameter topic must be <class 'str'>") raise ValueError("parameter topic must be <class 'str'>")
if not (isinstance(sendinterval, int) and sendinterval >= 0): if not (isinstance(sendinterval, int) and sendinterval >= 0):
raise ValueError( raise ValueError("parameter sendinterval must be <class 'int'> and >= 0")
"parameter sendinterval must be <class 'int'> and >= 0"
)
if not (isinstance(broker_address, str) and broker_address != ""): if not (isinstance(broker_address, str) and broker_address != ""):
raise ValueError( raise ValueError("parameter broker_address must be <class 'str'> and not empty")
"parameter broker_address must be <class 'str'> and not empty"
)
if not (isinstance(port, int) and 0 < port < 65535): if not (isinstance(port, int) and 0 < port < 65535):
raise ValueError( raise ValueError("parameter sendinterval must be <class 'int'> and 1 - 65535")
"parameter sendinterval must be <class 'int'> and 1 - 65535"
)
if not isinstance(tls_set, bool): if not isinstance(tls_set, bool):
raise ValueError("parameter tls_set must be <class 'bool'>") raise ValueError("parameter tls_set must be <class 'bool'>")
if not isinstance(username, str): if not isinstance(username, str):
@@ -157,17 +151,11 @@ class MqttServer(Thread):
shared_procimg=True, shared_procimg=True,
) )
self._rpi.debug = -1 self._rpi.debug = -1
proginit.logger.warning( proginit.logger.warning("replace_ios_file not loadable for MQTT - using defaults now | {0}".format(e))
"replace_ios_file not loadable for MQTT - using "
"defaults now | {0}".format(e)
)
except Exception as e: except Exception as e:
self._rpi = None self._rpi = None
proginit.logger.error( proginit.logger.error("piCtory configuration not loadable for MQTT | {0}".format(e))
"piCtory configuration not loadable for MQTT | "
"{0}".format(e)
)
raise e raise e
# Exportierte IOs laden # Exportierte IOs laden
@@ -233,10 +221,7 @@ class MqttServer(Thread):
proginit.logger.debug("enter MqttServer._on_disconnect()") proginit.logger.debug("enter MqttServer._on_disconnect()")
if rc != 0: if rc != 0:
proginit.logger.warning( proginit.logger.warning("unexpected disconnection from mqtt broker - will try to reconnect")
"unexpected disconnection from mqtt broker - "
"will try to reconnect"
)
proginit.logger.debug("leave MqttServer._on_disconnect()") proginit.logger.debug("leave MqttServer._on_disconnect()")
@@ -253,9 +238,7 @@ class MqttServer(Thread):
else: else:
lst_topic = msg.topic.split("/") lst_topic = msg.topic.split("/")
if len(lst_topic) < 2: if len(lst_topic) < 2:
proginit.logger.info( proginit.logger.info("wrong topic format - need ./get/ioname or ./set/ioname")
"wrong topic format - need ./get/ioname or ./set/ioname"
)
return return
# Aktion und IO auswerten # Aktion und IO auswerten
@@ -276,17 +259,12 @@ class MqttServer(Thread):
io = self._rpi.io[ioname] io = self._rpi.io[ioname]
io_needbytes = type(io.value) == bytes io_needbytes = type(io.value) == bytes
except Exception: except Exception:
proginit.logger.error( proginit.logger.error("can not find io '{0}' for MQTT".format(ioname))
"can not find io '{0}' for MQTT".format(ioname)
)
return return
# Aktion verarbeiten # Aktion verarbeiten
if not io.export: if not io.export:
proginit.logger.error( proginit.logger.error("io '{0}' is not marked as export in piCtory for MQTT use".format(ioname))
"io '{0}' is not marked as export in piCtory for MQTT use"
"".format(ioname)
)
elif ioget: elif ioget:
# Werte laden, wenn nicht autorefresh # Werte laden, wenn nicht autorefresh
@@ -297,9 +275,7 @@ class MqttServer(Thread):
self._evt_io(io.name, io.value, requested=True) self._evt_io(io.name, io.value, requested=True)
elif ioset and io.type != revpimodio2.OUT: elif ioset and io.type != revpimodio2.OUT:
proginit.logger.error( proginit.logger.error("can not write to inputs with MQTT")
"can not write to inputs with MQTT"
)
elif ioset: elif ioset:
# Convert MQTT Payload to valid Output-Value # Convert MQTT Payload to valid Output-Value
@@ -313,10 +289,7 @@ class MqttServer(Thread):
try: try:
value = value.to_bytes(io.length, io.byteorder) value = value.to_bytes(io.length, io.byteorder)
except OverflowError: except OverflowError:
proginit.logger.error( proginit.logger.error("can not convert value '{0}' to fitting bytes".format(value))
"can not convert value '{0}' to fitting bytes"
"".format(value)
)
return return
elif value == "false" and not io_needbytes: elif value == "false" and not io_needbytes:
@@ -324,20 +297,14 @@ class MqttServer(Thread):
elif value == "true" and not io_needbytes: elif value == "true" and not io_needbytes:
value = 1 value = 1
else: else:
proginit.logger.error( proginit.logger.error("can not convert value '{0}' for output '{1}'".format(value, ioname))
"can not convert value '{0}' for output '{1}'"
"".format(value, ioname)
)
return return
# Write Value to RevPi # Write Value to RevPi
try: try:
io.value = value io.value = value
except Exception: except Exception:
proginit.logger.error( proginit.logger.error("could not write '{0}' to Output '{1}'".format(value, ioname))
"could not write '{0}' to Output '{1}'"
"".format(value, ioname)
)
elif ioreset: elif ioreset:
# Counter zurücksetzen # Counter zurücksetzen
@@ -348,9 +315,7 @@ class MqttServer(Thread):
else: else:
# Aktion nicht erkennbar # Aktion nicht erkennbar
proginit.logger.warning( proginit.logger.warning("can not see get/set in topic '{0}'".format(msg.topic))
"can not see get/set in topic '{0}'".format(msg.topic)
)
def _send_pictory_conf(self): def _send_pictory_conf(self):
"""Sendet piCtory Konfiguration per MQTT.""" """Sendet piCtory Konfiguration per MQTT."""
@@ -359,10 +324,7 @@ class MqttServer(Thread):
self._mq.publish(self._mqtt_pictory, fh.read()) self._mq.publish(self._mqtt_pictory, fh.read())
fh.close() fh.close()
except Exception: except Exception:
proginit.logger.error( proginit.logger.error("can not read and publish piCtory config '{0}'".format(proginit.pargs.configrsc))
"can not read and publish piCtory config '{0}'"
"".format(proginit.pargs.configrsc)
)
def newlogfile(self): def newlogfile(self):
"""Konfiguriert die FileHandler auf neue Logdatei.""" """Konfiguriert die FileHandler auf neue Logdatei."""
@@ -382,16 +344,12 @@ class MqttServer(Thread):
proginit.logger.debug("enter MqttServer.run()") proginit.logger.debug("enter MqttServer.run()")
# MQTT verbinden # MQTT verbinden
proginit.logger.info( proginit.logger.info("connecting to mqtt broker {0}".format(self._broker_address))
"connecting to mqtt broker {0}".format(self._broker_address)
)
try: try:
self._mq.connect(self._broker_address, self._port, keepalive=60) self._mq.connect(self._broker_address, self._port, keepalive=60)
except Exception: except Exception:
self._on_connect(self._mq, None, None, 3) self._on_connect(self._mq, None, None, 3)
self._mq.connect_async( self._mq.connect_async(self._broker_address, self._port, keepalive=60)
self._broker_address, self._port, keepalive=60
)
self._mq.loop_start() self._mq.loop_start()
# Eventüberwachung starten # Eventüberwachung starten
@@ -411,9 +369,7 @@ class MqttServer(Thread):
# Eventüberwachung erneut starten # Eventüberwachung erneut starten
if self._send_events: if self._send_events:
proginit.logger.debug( proginit.logger.debug("start non blocking mainloop of revpimodio")
"start non blocking mainloop of revpimodio"
)
self._rpi.mainloop(blocking=False) self._rpi.mainloop(blocking=False)
if send_cycledata: if send_cycledata:
@@ -424,19 +380,14 @@ class MqttServer(Thread):
# Exportierte IOs übertragen # Exportierte IOs übertragen
for io in self._exported_ios: for io in self._exported_ios:
value = io.value value = io.value
if isinstance(value, bytes): if isinstance(value, bool):
value = int.from_bytes(value, "little") value = int(value) # Convert False/True to 0/1. Publish function would send the string.
self._mq.publish(self._mqtt_io.format(io.name), int(value)) self._mq.publish(self._mqtt_io.format(io.name), value)
self._evt_data.wait( self._evt_data.wait(10 if not send_cycledata else self._sendinterval)
10 if not send_cycledata else self._sendinterval
)
# MQTT trennen # MQTT trennen
proginit.logger.info( proginit.logger.info("disconnecting from mqtt broker {0}".format(self._broker_address))
"disconnecting from mqtt broker {0}".format(self._broker_address)
)
# NOTE: dies gab dead-locks: self._mq.loop_stop()
self._mq.disconnect() self._mq.disconnect()
# RevPiModIO aufräumen # RevPiModIO aufräumen