IOs des Core als einzelne Werte behandeln

Topic base/get/ioname sendet den angegebenen IO sofort per MQTT base/got/ioname
Topic base/get sendet Zyklische Daten sofort
Funktion SaveXMLRPCServer.isAlive in SaveXMLRPCServer.is_alive() umbenannt
MqttServer läd RevPiModIO sofort neu bei reload_revpimodio
sendinterval=0 deaktiviert zyklische Übertragung
This commit is contained in:
2018-09-20 11:58:04 +02:00
parent 22cc824694
commit 50f406570c
8 changed files with 117 additions and 37 deletions

View File

@@ -22,7 +22,7 @@ class MqttServer(Thread):
"""Init MqttServer class.
@param basetopic Basis-Topic fuer Datenaustausch
@param sendinterval Prozessabbild alle n Sekunden senden
@param sendinterval Prozessabbild alle n Sekunden senden / 0 = aus
@param broker_address Adresse <class 'str'> des MQTT-Servers
@param port Portnummer <class 'int'> des MQTT-Servers
@param tls_set TLS fuer Verbindung zum MQTT-Server verwenden
@@ -35,9 +35,9 @@ class MqttServer(Thread):
"""
if not isinstance(basetopic, str):
raise ValueError("parameter topic must be <class 'str'>")
if not (isinstance(sendinterval, int) and sendinterval > 0):
if not (isinstance(sendinterval, int) and sendinterval >= 0):
raise ValueError(
"parameter sendinterval must be <class 'int'> and > 0"
"parameter sendinterval must be <class 'int'> and >= 0"
)
if not (isinstance(broker_address, str) and broker_address != ""):
raise ValueError(
@@ -80,9 +80,12 @@ class MqttServer(Thread):
# Topics konfigurieren
self._mqtt_evt_io = join(basetopic, "event/{0}")
self._mqtt_got_io = join(basetopic, "got/{0}")
self._mqtt_io = join(basetopic, "io/{0}")
self._mqtt_ioget = join(basetopic, "get/#")
self._mqtt_ioset = join(basetopic, "set/#")
self._mqtt_pictory = join(basetopic, "pictory")
self._mqtt_senddata = join(basetopic, "get")
self._mqtt_sendpictory = join(basetopic, "needpictory")
self._mq = Client(client_id)
@@ -96,13 +99,22 @@ class MqttServer(Thread):
self._mq.on_connect = self._on_connect
self._mq.on_message = self._on_message
def _evt_io(self, name, value):
def _evt_io(self, name, value, requested=False):
"""Sendet Daten aus Events.
@param name IO-Name
@param value IO-Value"""
@param value IO-Value
@param requested Wenn True, wird 'got' Topic verwendet
"""
if requested:
topic = self._mqtt_got_io.format(name)
else:
topic = self._mqtt_evt_io.format(name)
if isinstance(value, bytes):
value = int.from_bytes(value, "little")
self._mq.publish(self._mqtt_evt_io.format(name), int(value))
self._mq.publish(topic, int(value))
def _loadrevpimodio(self):
"""Instantiiert das RevPiModIO Modul.
@@ -147,6 +159,26 @@ class MqttServer(Thread):
io.reg_event(self._evt_io)
self._exported_ios.append(io)
# RevPiLED Ausgang zerlegen und exportieren
if self._rpi.core._ioled.export:
lst_coreio = [
self._rpi.core.a1green, self._rpi.core.a1red,
self._rpi.core.a2green, self._rpi.core.a2red,
]
# Connect-IOs anhängen
if type(self._rpi.core) == revpimodio2.device.Connect:
lst_coreio += [
self._rpi.core.a3green, self._rpi.core.a3red,
self._rpi.core.wd,
self._rpi.core.x2in, self._rpi.core.x2out,
]
# Events registrieren
for io in lst_coreio:
io.reg_event(self._evt_io)
self._exported_ios.append(io)
proginit.logger.debug("created revpimodio2 object")
def _on_connect(self, client, userdata, flags, rc):
@@ -158,6 +190,8 @@ class MqttServer(Thread):
)
else:
# Subscribe piCtory Anforderung
client.subscribe(self._mqtt_ioget)
client.subscribe(self._mqtt_senddata)
client.subscribe(self._mqtt_sendpictory)
if self._write_outputs:
client.subscribe(self._mqtt_ioset)
@@ -176,12 +210,33 @@ class MqttServer(Thread):
# piCtory Konfiguration senden
self._send_pictory_conf()
elif msg.topic == self._mqtt_senddata:
# Alle zyklischen Daten senden
self._evt_data.set()
else:
lst_topic = msg.topic.split("/")
if len(lst_topic) < 2:
proginit.logger.info(
"wrong topic format - need ./get/ioname or ./set/ioname"
)
return
# Aktion und IO auswerten
ioget = lst_topic[-2].lower() == "get"
ioset = lst_topic[-2].lower() == "set"
ioname = lst_topic[-1]
coreio = ioname.find(".") != -1
try:
io = self._rpi_write.io[ioname]
# IO holen
if coreio:
coreio = ioname.split(".")[-1]
io = getattr(self._rpi_write.core, coreio)
if not isinstance(io, revpimodio2.io.IOBase):
raise RuntimeError()
else:
io = self._rpi_write.io[ioname]
io_needbytes = type(io.value) == bytes
except Exception:
proginit.logger.error(
@@ -189,17 +244,28 @@ class MqttServer(Thread):
)
return
# Check Output exists and is an Output
if io.type != revpimodio2.OUT:
proginit.logger.error(
"can not write to inputs with MQTT"
)
elif not io.export:
# Aktion verarbeiten
if not io.export:
proginit.logger.error(
"io '{0}' is not marked as export in piCtory for MQTT use"
"".format(ioname)
)
else:
elif ioget:
# Daten je nach IO Type aus Prozessabbild laden
if io.type == revpimodio2.OUT:
io._parentdevice.syncoutputs()
else:
io._parentdevice.readprocimg()
# Publish Wert von IO
self._evt_io(io.name, io.value, requested=True)
elif ioset and io.type != revpimodio2.OUT:
proginit.logger.error(
"can not write to inputs with MQTT"
)
elif ioset:
# Convert MQTT Payload to valid Output-Value
value = msg.payload.decode("utf8")
@@ -241,6 +307,12 @@ class MqttServer(Thread):
else:
io._parentdevice.writeprocimg()
else:
# Aktion nicht erkennbar
proginit.logger.warning(
"can not see get/set in topic '{0}'".format(msg.topic)
)
def _send_pictory_conf(self):
"""Sendet piCtory Konfiguration per MQTT."""
try:
@@ -260,6 +332,7 @@ class MqttServer(Thread):
def reload_revpimodio(self):
"""Fuehrt im naechsten Zyklus zum Reload."""
self._reloadmodio = True
self._evt_data.set()
def run(self):
"""Startet die Uebertragung per MQTT."""
@@ -280,6 +353,7 @@ class MqttServer(Thread):
self._rpi.mainloop(blocking=False)
# mainloop
send_cycledata = self._sendinterval > 0
while not self.__exit:
self._evt_data.clear()
@@ -291,18 +365,21 @@ class MqttServer(Thread):
if self._send_events:
self._rpi.mainloop(blocking=False)
# Werte laden, wenn nicht autorefresh
if not self._send_events:
self._rpi.readprocimg()
if send_cycledata:
# Werte laden, wenn nicht autorefresh
if not self._send_events:
self._rpi.readprocimg()
# Exportierte IOs übertragen
for io in self._exported_ios:
value = io.value
if isinstance(value, bytes):
value = int.from_bytes(value, "little")
self._mq.publish(self._mqtt_io.format(io.name), int(value))
# Exportierte IOs übertragen
for io in self._exported_ios:
value = io.value
if isinstance(value, bytes):
value = int.from_bytes(value, "little")
self._mq.publish(self._mqtt_io.format(io.name), int(value))
self._evt_data.wait(self._sendinterval)
self._evt_data.wait(
10 if not send_cycledata else self._sendinterval
)
# MQTT trennen
self._mq.loop_stop()

View File

@@ -387,7 +387,7 @@ class RevPiPyLoad():
self.xml_ps = None
proginit.logger.warning(
"can not load revpimodio2 module. maybe its not installed "
"or an old version (required at least 2.2.3). if you "
"or an old version (required at least 2.2.4). if you "
"like to use the process monitor feature, update/install "
"revpimodio2: 'apt-get install python3-revpimodio2'"
)
@@ -453,7 +453,7 @@ class RevPiPyLoad():
except Exception:
proginit.logger.warning(
"can not load revpimodio2 module. maybe its not installed "
"or an old version (required at least 2.2.3). if you "
"or an old version (required at least 2.2.4). if you "
"like to use the mqtt feature, update/install "
"revpimodio2: 'apt-get install python3-revpimodio2'"
)

View File

@@ -62,7 +62,7 @@ class SaveXMLRPCServer(SimpleXMLRPCServer):
return super()._dispatch(method, params)
def isAlive(self):
def is_alive(self):
"""Prueft ob der XML RPC Server laeuft.
@return True, wenn Server noch laeuft"""
return False if self.fut is None else self.fut.running()