Alle IOs mit Export-Flag in piCtory werden per MQTT gesendet

Parameter für Event-Senden und Output-Schreiben in Konfig eingefügt
Processabbild wird nicht komplett per MQTT gesendet
piCtory Konfig kann angefordert werden
Ausgänge können gesetzt werden (wenn Export-Flag)
This commit is contained in:
2018-09-03 12:16:43 +02:00
parent 0e27aa1d57
commit c6fe2e93e5
11 changed files with 313 additions and 134 deletions

View File

@@ -1,15 +1,13 @@
# -*- coding: utf-8 -*-
#
# RevPiPyLoad
#
# Webpage: https://revpimodio.org/revpipyplc/
# (c) Sven Sager, License: LGPLv3
#
"""Stellt die MQTT Uebertragung fuer IoT-Zwecke bereit."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2018 Sven Sager"
__license__ = "GPLv3"
import proginit
from json import load as jload
from ssl import CERT_NONE
import revpimodio2
from os.path import join
from paho.mqtt.client import Client
from ssl import CERT_NONE
from threading import Thread, Event
@@ -19,36 +17,71 @@ class MqttServer(Thread):
def __init__(
self, basetopic, sendinterval, host, port=1883,
tls_set=False, username="", password=None, client_id=""):
tls_set=False, username="", password=None, client_id="",
send_events=False, write_outputs=False):
"""Init MqttServer class.
@param basetopic Basis-Topic fuer Datenaustausch
@param sendinterval Prozessabbild alle n Sekunden senden
@param host Adresse <class 'str'> des MQTT-Servers
@param port Portnummer <class 'int'> des MQTT-Servers
@param keepalive MQTT Ping bei leerlauf
@param tls_set TLS fuer Verbindung zum MQTT-Server verwenden
@param username Optional Benutzername fuer MQTT-Server
@param password Optional Password fuer MQTT-Server
@param client_id MQTT ClientID, wenn leer automatisch random erzeugung
@param send_events Sendet Werte bei IO Wertaenderung
@param write_outputs Per MQTT auch Outputs schreiben
"""
# TODO: Parameterprüfung
if not isinstance(basetopic, str):
raise ValueError("parameter topic must be <class 'str'>")
if not (isinstance(sendinterval, int) and sendinterval > 0):
raise ValueError(
"parameter sendinterval must be <class 'int'> and gt 0"
)
if not isinstance(host, str):
raise ValueError("parameter host must be <class 'str'>")
if not (isinstance(port, int) and 0 < port < 65535):
raise ValueError(
"parameter sendinterval must be <class 'int'> and 1 - 65535"
)
if not isinstance(tls_set, bool):
raise ValueError("parameter tls_set must be <class 'bool'>")
if not isinstance(username, str):
raise ValueError("parameter username must be <class 'str'>")
if not (password is None or isinstance(password, str)):
raise ValueError("parameter password must be <class 'str'>")
if not isinstance(client_id, str):
raise ValueError("parameter client_id must be <class 'str'>")
if not isinstance(send_events, bool):
raise ValueError("parameter send_events must be <class 'bool'>")
if not isinstance(write_outputs, bool):
raise ValueError("parameter write_outputs must be <class 'bool'>")
super().__init__()
# Klassenvariablen
self.__exit = False
self._evt_data = Event()
self._exported_ios = []
self._host = host
self._procimglength = self._get_procimglength()
self._port = port
self._reloadmodio = False
self._rpi = None
self._rpi_write = None
self._send_events = send_events
self._sendinterval = sendinterval
self._write_outputs = write_outputs
# RevPiModIO laden oder mit Exception aussteigen
self._loadrevpimodio()
# Topics konfigurieren
self._mqtt_picontrol = "{}/picontrol".format(basetopic)
self._mqtt_pictory = "{}/pictory".format(basetopic)
self._mqtt_sendpictory = "{}/needpictory".format(basetopic)
self._mqtt_evt_io = join(basetopic, "event/{0}")
self._mqtt_io = join(basetopic, "io/{0}")
self._mqtt_ioset = join(basetopic, "set/#")
self._mqtt_pictory = join(basetopic, "pictory")
self._mqtt_sendpictory = join(basetopic, "needpictory")
self._mq = Client(client_id)
if username != "":
@@ -61,123 +94,181 @@ class MqttServer(Thread):
self._mq.on_connect = self._on_connect
self._mq.on_message = self._on_message
def _get_procimglength(self):
"""Ermittelt aus piCtory Konfiguration die laenge.
@return Laenge des Prozessabbilds <class 'int'>"""
def _evt_io(self, name, value):
"""Sendet Daten aus Events.
@param name IO-Name
@param value IO-Value"""
if isinstance(value, bytes):
value = int.from_bytes(value, "little")
self._mq.publish(self._mqtt_evt_io.format(name), int(value))
def _loadrevpimodio(self):
"""Instantiiert das RevPiModIO Modul.
@return None or Exception"""
self._reloadmodio = False
self._exported_ios = []
# RevPiModIO-Modul Instantiieren
if self._rpi is not None:
self._rpi.cleanup()
if self._rpi_write is not None:
self._rpi_write.cleanup()
proginit.logger.debug("create revpimodio2 object for MQTT")
try:
with open(proginit.pargs.configrsc, "r") as fh:
rsc = jload(fh)
except:
return 4096
length = 0
# piCtory Config prüfen
if "Devices" not in rsc:
return 0
# Letzes piCtory Device laden
last_dev = rsc["Devices"].pop()
length += last_dev["offset"]
# bei mem beginnen, weil nur der höchste IO benötigt wird
for type_iom in ["mem", "out", "inp"]:
lst_iom = sorted(
last_dev[type_iom],
key=lambda x: int(x),
reverse=True
# Lesend und Eventüberwachung
self._rpi = revpimodio2.RevPiModIO(
autorefresh=self._send_events,
monitoring=True,
configrsc=proginit.pargs.configrsc,
procimg=proginit.pargs.procimg
)
if len(lst_iom) > 0:
# Daten des letzen IOM auswerten
last_iom = last_dev[type_iom][str(lst_iom[0])]
bitlength = int(last_iom[2])
length += int(last_iom[3])
length += 1 if bitlength == 1 else int(bitlength / 8)
break
# Schreibenen Zugriff
if self._write_outputs:
self._rpi_write = revpimodio2.RevPiModIO(
configrsc=proginit.pargs.configrsc,
procimg=proginit.pargs.procimg
)
return length
except Exception as e:
self._rpi = None
self._rpi_write = None
proginit.logger.error(
"piCtory configuration not loadable for MQTT"
)
raise e
# Exportierte IOs laden
for dev in self._rpi.device:
for io in dev.get_allios(export=True):
io.reg_event(self._evt_io)
self._exported_ios.append(io)
# Eventüberwachung starten
if self._send_events:
self._rpi.mainloop(blocking=False)
proginit.logger.debug("created revpimodio2 object")
def _on_connect(self, client, userdata, flags, rc):
"""Verbindung zu MQTT Broker."""
if rc > 0:
proginit.warning("can not connect to mqtt broker - will retry")
proginit.logger.warning(
"can not connect to mqtt broker - error {0} - will retry"
"".format(rc)
)
else:
# piCtory übertragen um alle RevPiMqttIO zu benachrichtigen
self._send_pictory_conf()
# Subscribe piCtory Anforderung
client.subscribe(self._mqtt_sendpictory)
if self._write_outputs:
client.subscribe(self._mqtt_ioset)
def _on_disconnect(self, client, userdata, rc):
"""Wertet Verbindungsabbruch aus."""
if rc != 0:
proginit.warning(
proginit.logger.warning(
"unexpected disconnection from mqtt broker - "
"will try to reconnect"
)
def _on_message(self, client, userdata, msg):
"""Sendet piCtory Konfiguration."""
if msg.topic == self._mqtt_pictory:
# piCtory Konfiguration senden
self._send_pictory_conf()
# piCtory Konfiguration senden
self._send_pictory_conf()
else:
lst_topic = msg.topic.split("/")
ioname = lst_topic[-1]
# Prozessabbild senden
self._evt_data.set()
try:
io = self._rpi_write.io[ioname]
except Exception:
proginit.logger.error(
"can not find io '{0}' for MQTT".format(ioname)
)
return
# Check Output exists and is an Output
if io.type != revpimodio2.OUT:
proginit.logger.error(
"can not write to inputs with MQTT"
)
elif not io.export:
proginit.logger.error(
"io '{0}' is not marked as export in piCtory for MQTT use"
"".format(ioname)
)
else:
# Convert MQTT Payload to valid Output-Value
value = msg.payload.decode("utf8")
if value.isdecimal():
value = int(value)
elif value == "false":
value = 0
elif value == "true":
value = 1
else:
proginit.logger.error(
"can not convert value '{0}' for output '{1}'"
"".format(value, ioname)
)
return
# Write Value to RevPi
io._parentdevice.syncoutputs()
io.value = value
io._parentdevice.writeprocimg()
def _send_pictory_conf(self):
"""Sendet piCtory Konfiguration."""
with open(proginit.pargs.configrsc, "rb") as fh:
"""Sendet piCtory Konfiguration per MQTT."""
try:
fh = open(proginit.pargs.configrsc, "rb")
self._mq.publish(self._mqtt_pictory, fh.read())
fh.close()
except Exception:
proginit.logger.error(
"can not read and publish piCtory config '{0}'"
"".format(proginit.pargs.configrsc)
)
def newlogfile(self):
"""Konfiguriert die FileHandler auf neue Logdatei."""
pass
def reload_revpimodio(self):
"""Fuehrt im naechsten Zyklus zum Reload."""
self._reloadmodio = True
def run(self):
"""Startet die Uebertragung per MQTT."""
proginit.logger.debug("enter MqttServer.start()")
# Prozessabbild öffnen
try:
fh_proc = open(proginit.pargs.procimg, "r+b", 0)
except:
fh_proc = None
self.__exit = True
proginit.logger.error(
"can not open process image {}".format(proginit.pargs.procimg)
)
proginit.logger.debug("enter MqttServer.run()")
# MQTT verbinden
self._mq.connect_async(self._host, self._port, keepalive=60)
self._mq.loop_start()
# mainloop
buff = b''
err_count = 0
while not self.__exit:
self._evt_data.clear()
# Prozessabbild lesen
try:
fh_proc.seek(0)
buff = fh_proc.read(self._procimglength)
if err_count > 0:
proginit.warning(
"resume mqtt publishing after {} errors on "
"processimage".format(err_count)
)
err_count = 0
except IOError:
if err_count == 0:
proginit.logger.error(
"could not read process image for mqtt publishing"
)
err_count += 1
else:
# Prozessabbild übertragen
self._mq.publish(self._mqtt_picontrol, buff)
# RevPiModIO neu laden
if self._reloadmodio:
self._loadrevpimodio()
# Werte laden, wenn nicht autorefresh
if not self._send_events:
self._rpi.readprocimg()
# Exportierte IOs übertragen
for io in self._exported_ios:
value = io.value
if isinstance(value, bytes):
value = int.from_bytes(value, "little")
self._mq.publish(self._mqtt_io.format(io.name), int(value))
self._evt_data.wait(self._sendinterval)
@@ -185,11 +276,10 @@ class MqttServer(Thread):
self._mq.loop_stop()
self._mq.disconnect()
# FileHandler schließen
if fh_proc is not None:
fh_proc.close()
# RevPiModIO aufräumen
self._rpi.cleanup()
proginit.logger.debug("leave MqttServer.start()")
proginit.logger.debug("leave MqttServer.run()")
def stop(self):
"""Stoppt die Uebertragung per MQTT."""

View File

@@ -45,7 +45,10 @@ class ProcimgServer():
"ps_setvalue": self.setvalue,
}
self.loadrevpimodio()
# RevPiModIO laden oder mit Exception aussteigen
ex = self.loadrevpimodio()
if ex is not None:
raise ex
proginit.logger.debug("leave ProcimgServer.__init__()")
@@ -84,25 +87,28 @@ class ProcimgServer():
def loadrevpimodio(self):
"""Instantiiert das RevPiModIO Modul.
@return True, wenn erfolgreich, sonst False"""
@return None or Exception"""
# RevPiModIO-Modul Instantiieren
if self.rpi is not None:
self.rpi.cleanup()
proginit.logger.debug("create revpimodio2 object")
proginit.logger.debug("create revpimodio2 object for ProcimgServer")
try:
self.rpi = revpimodio2.RevPiModIO(
configrsc=proginit.pargs.configrsc,
procimg=proginit.pargs.procimg
)
except Exception:
except Exception as e:
self.rpi = None
proginit.logger.error("piCtory configuration not loadable")
return False
proginit.logger.error(
"piCtory configuration not loadable for ProcimgServer"
)
return e
# NOTE: Warum das?
self.rpi.syncoutputs(device=0)
proginit.logger.debug("created revpimodio2 object")
return True
def setvalue(self, device, io, value):
"""Setzt einen Wert auf dem RevPi.

View File

@@ -188,23 +188,27 @@ class RevPiPyLoad():
self.mqtt = 0
if "MQTT" in self.globalconfig:
self.mqtt = \
int(self.globalconfig["MQTT"].get("mqtt", 0))
self.globalconfig["MQTT"].getboolean("mqtt", False)
self.mqttbasetopic = \
self.globalconfig["MQTT"].get("basetopic", "")
self.mqttsendinterval = \
int(self.globalconfig["MQTT"].get("sendinterval", 15))
self.globalconfig["MQTT"].getint("sendinterval", 15)
self.mqtthost = \
self.globalconfig["MQTT"].get("host", "")
self.mqttport = \
int(self.globalconfig["MQTT"].get("port", 1883))
self.globalconfig["MQTT"].getint("port", 1883)
self.mqtttls_set = \
int(self.globalconfig["MQTT"].get("tls_set", 0))
self.globalconfig["MQTT"].getboolean("tls_set", False)
self.mqttusername = \
self.globalconfig["MQTT"].get("username", "")
self.mqttpassword = \
self.globalconfig["MQTT"].get("password", "")
self.mqttclient_id = \
self.globalconfig["MQTT"].get("client_id", "")
self.mqttsend_events = \
self.globalconfig["MQTT"].getboolean("send_on_event", False)
self.mqttwrite_outputs = \
self.globalconfig["MQTT"].getboolean("write_outputs", False)
# Konfiguration verarbeiten [PLCSLAVE]
self.plcslave = False
@@ -348,7 +352,7 @@ class RevPiPyLoad():
self.xml_ps = None
proginit.logger.warning(
"can not load revpimodio2 module. maybe its not installed "
"or an old version (required at least 2.1.6). if you "
"or an old version (required at least 2.2.3). if you "
"like to use the process monitor feature, update/install "
"revpimodio2: 'apt-get install python3-revpimodio2'"
)
@@ -419,11 +423,17 @@ class RevPiPyLoad():
self.mqtttls_set,
self.mqttusername,
self.mqttpassword,
self.mqttclient_id
self.mqttclient_id,
self.mqttsend_events,
self.mqttwrite_outputs,
)
except Exception:
proginit.logger.warning(
"can not load revpimodio2 module. maybe its not installed "
"or an old version (required at least 2.2.3). if you "
"like to use the mqtt feature, update/install "
"revpimodio2: 'apt-get install python3-revpimodio2'"
)
except:
# TODO: Fehlermeldung ausgeben bezüglich paho.mqtt
pass
proginit.logger.debug("leave RevPiPyLoad._plcmqtt()")
return th_plc
@@ -701,6 +711,8 @@ class RevPiPyLoad():
dc["username"] = self.mqttusername
dc["password"] = self.mqttpassword
dc["client_id"] = self.mqttclient_id
dc["send_events"] = self.mqttsend_events
dc["write_outputs"] = self.mqttwrite_outputs
# PLCSLAVE Sektion
dc["plcslave"] = self.plcslave
@@ -934,6 +946,8 @@ class RevPiPyLoad():
"mqttusername": ".*",
"mqttpassword": ".*",
"mqttclient_id": ".+",
"mqttsend_events": "[01]",
"mqttwrite_outputs": "[01]",
},
"PLCSLAVE": {
"plcslave": "[01]",