Implement bytebuffer and length check to NetIO

Compatible to PyLoad before 0.9
This commit is contained in:
2020-03-08 14:53:28 +01:00
parent ce0142f48e
commit 3f28f0ae48

View File

@@ -30,7 +30,7 @@ _sysreplaceioh = b'\x01RH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17'
# Übertragene Bytes schreiben # Übertragene Bytes schreiben
_sysflush = b'\x01SD\x00\x00\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x17' _sysflush = b'\x01SD\x00\x00\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x17'
# Hashvalues # Hashvalues
HASH_FAIL = b'\xff' * 16 HASH_FAIL = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
class AclException(Exception): class AclException(Exception):
@@ -54,10 +54,11 @@ class NetFH(Thread):
so gesteuert werden. so gesteuert werden.
""" """
__slots__ = "__by_buff", "__check_replace_ios", "__config_changed", \ __slots__ = "__buff_size", "__buff_block", "__buff_recv", \
"__by_buff", "__check_replace_ios", "__config_changed", \
"__int_buff", "__dictdirty", "__flusherr", "__replace_ios_h", \ "__int_buff", "__dictdirty", "__flusherr", "__replace_ios_h", \
"__pictory_h", "__position", "__sockact", "__sockerr", "__sockend", \ "__pictory_h", "__position", "__sockerr", "__sockend", \
"__socklock", "__timeout", "__trigger", "__waitsync", "_address", \ "__socklock", "__timeout", "__waitsync", "_address", \
"_slavesock", "daemon" "_slavesock", "daemon"
def __init__(self, address: tuple, check_replace_ios: bool, timeout=500): def __init__(self, address: tuple, check_replace_ios: bool, timeout=500):
@@ -71,7 +72,10 @@ class NetFH(Thread):
super().__init__() super().__init__()
self.daemon = True self.daemon = True
self.__by_buff = b'' self.__buff_size = 2048 # Values up to 32 are static in code!
self.__buff_block = bytearray(self.__buff_size)
self.__buff_recv = bytearray()
self.__by_buff = bytearray()
self.__check_replace_ios = check_replace_ios self.__check_replace_ios = check_replace_ios
self.__config_changed = False self.__config_changed = False
self.__int_buff = 0 self.__int_buff = 0
@@ -79,15 +83,13 @@ class NetFH(Thread):
self.__flusherr = False self.__flusherr = False
self.__replace_ios_h = b'' self.__replace_ios_h = b''
self.__pictory_h = b'' self.__pictory_h = b''
self.__sockact = False
self.__sockerr = Event() self.__sockerr = Event()
self.__sockend = Event() self.__sockend = Event()
self.__socklock = Lock() self.__socklock = Lock()
self.__timeout = None self.__timeout = None
self.__trigger = False
self.__waitsync = None self.__waitsync = None
self._address = address self._address = address
self._slavesock = None self._slavesock = None # type: socket.socket
# Parameterprüfung # Parameterprüfung
if not isinstance(address, tuple): if not isinstance(address, tuple):
@@ -114,7 +116,10 @@ class NetFH(Thread):
def __check_acl(self, bytecode: bytes) -> None: def __check_acl(self, bytecode: bytes) -> None:
""" """
Pueft ob ACL auf RevPi den Vorgang erlaubt oder wirft exception. Pueft ob ACL auf RevPi den Vorgang erlaubt.
Ist der Vorgang nicht zulässig, wird der Socket sofort geschlossen
und eine Exception geworfen.
:param bytecode: Antwort, die geprueft werden solll :param bytecode: Antwort, die geprueft werden solll
""" """
@@ -138,14 +143,12 @@ class NetFH(Thread):
if isinstance(value, int) and (100 <= value <= 60000): if isinstance(value, int) and (100 <= value <= 60000):
self.__timeout = value / 1000 self.__timeout = value / 1000
socket.setdefaulttimeout(self.__timeout)
# Timeouts in Socket setzen # Timeouts in Socket setzen
if self._slavesock is not None: if self._slavesock is not None:
self._slavesock.settimeout(self.__timeout) self._slavesock.settimeout(self.__timeout)
# 45 Prozent vom Timeout für Synctimer verwenden # 45 Prozent vom Timeout für Synctimer verwenden
self.__waitsync = self.__timeout / 10 * 4.5 self.__waitsync = self.__timeout / 100 * 45
else: else:
raise ValueError("value must between 10 and 60000 milliseconds") raise ValueError("value must between 10 and 60000 milliseconds")
@@ -156,6 +159,7 @@ class NetFH(Thread):
so = socket.socket(socket.AF_INET, socket.SOCK_STREAM) so = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: try:
so.connect(self._address) so.connect(self._address)
so.settimeout(self.__timeout)
# Hashwerte anfordern # Hashwerte anfordern
recv_len = 16 recv_len = 16
@@ -164,35 +168,33 @@ class NetFH(Thread):
so.sendall(_sysreplaceioh) so.sendall(_sysreplaceioh)
recv_len += 16 recv_len += 16
# Hashwerte empfangen # Hashwerte empfangen mit eigenen Puffern, da nicht gelocked
byte_buff = bytearray() buff_recv = bytearray(recv_len)
zero_byte = 0 while recv_len > 0:
while not self.__sockend.is_set() and len(byte_buff) < recv_len: block = so.recv(recv_len)
data = so.recv(recv_len) if block == b'':
if data == b'': raise OSError("lost connection on hash receive")
zero_byte += 1 buff_recv += block
if zero_byte == 100: recv_len -= len(block)
raise OSError("too many zero bytes on hash load")
byte_buff += data
# Änderung an piCtory prüfen # Änderung an piCtory prüfen
if self.__pictory_h and byte_buff[:16] != self.__pictory_h: if self.__pictory_h and buff_recv[:16] != self.__pictory_h:
self.__config_changed = True self.__config_changed = True
self.close() self.close()
raise ConfigChanged( raise ConfigChanged(
"configuration on revolution pi was changed") "configuration on revolution pi was changed")
else: else:
self.__pictory_h = byte_buff[:16] self.__pictory_h = buff_recv[:16]
# Änderung an replace_ios prüfen # Änderung an replace_ios prüfen
if self.__check_replace_ios and self.__replace_ios_h \ if self.__check_replace_ios and self.__replace_ios_h \
and byte_buff[16:] != self.__replace_ios_h: and buff_recv[16:] != self.__replace_ios_h:
self.__config_changed = True self.__config_changed = True
self.close() self.close()
raise ConfigChanged( raise ConfigChanged(
"configuration on revolution pi was changed") "configuration on revolution pi was changed")
else: else:
self.__replace_ios_h = byte_buff[16:] self.__replace_ios_h = buff_recv[16:]
except ConfigChanged: except ConfigChanged:
so.close() so.close()
raise raise
@@ -215,23 +217,40 @@ class NetFH(Thread):
for pos in self.__dictdirty: for pos in self.__dictdirty:
self.set_dirtybytes(pos, self.__dictdirty[pos]) self.set_dirtybytes(pos, self.__dictdirty[pos])
def _direct_send(self, send_bytes: bytes, recv_count: int) -> bytes: def _direct_send(self, send_bytes: bytes, recv_len: int) -> bytes:
""" """
Fuer debugging direktes Senden von Daten. Sicherer Zugriff auf send / recv Schnittstelle mit Laengenpruefung.
:param send_bytes: Bytes, die gesendet werden sollen :param send_bytes: Bytes, die gesendet werden sollen
:param recv_count: Anzahl der Empfangsbytes :param recv_len: Anzahl der die empfangen werden sollen
:return: Empfangende Bytes :return: Empfangende Bytes
""" """
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("I/O operation on closed file") raise ValueError("I/O operation on closed file")
with self.__socklock: with self.__socklock:
self._slavesock.sendall(send_bytes) counter = 0
# FIXME: Schleife bis Daten empfangen sind einbauen send_len = len(send_bytes)
recv = self._slavesock.recv(recv_count) while counter < send_len:
self.__trigger = True # Send loop to trigger timeout of socket on each send
return recv sent = self._slavesock.send(send_bytes[counter:])
if sent == 0:
self.__sockerr.set()
raise IOError("lost network connection")
counter += sent
self.__buff_recv.clear()
while recv_len > 0:
count = self._slavesock.recv_into(
self.__buff_block, min(recv_len, self.__buff_size)
)
if count == 0:
self.__sockerr.set()
raise IOError("lost network connection")
self.__buff_recv += self.__buff_block[:count]
recv_len -= count
return bytes(self.__buff_recv)
def clear_dirtybytes(self, position=None) -> None: def clear_dirtybytes(self, position=None) -> None:
""" """
@@ -282,8 +301,6 @@ class NetFH(Thread):
# Fehler nach übernahme der Daten auslösen um diese zu setzen # Fehler nach übernahme der Daten auslösen um diese zu setzen
self.__sockerr.set() self.__sockerr.set()
self.__trigger = True
def close(self) -> None: def close(self) -> None:
"""Verbindung trennen.""" """Verbindung trennen."""
if self.__sockend.is_set(): if self.__sockend.is_set():
@@ -296,10 +313,8 @@ class NetFH(Thread):
if self._slavesock is not None: if self._slavesock is not None:
try: try:
self.__socklock.acquire() self.__socklock.acquire()
self._slavesock.send(_sysexit) self._slavesock.sendall(_sysexit)
self._slavesock.shutdown(socket.SHUT_WR)
# NOTE: Wird das benötigt?
self._slavesock.shutdown(socket.SHUT_RDWR)
except Exception: except Exception:
pass pass
finally: finally:
@@ -315,19 +330,16 @@ class NetFH(Thread):
raise ValueError("flush of closed file") raise ValueError("flush of closed file")
with self.__socklock: with self.__socklock:
self._slavesock.sendall( self.__by_buff += _sysflush
self.__by_buff + _sysflush self._slavesock.sendall(self.__by_buff)
)
# Rückmeldebyte auswerten
blockok = self._slavesock.recv(1)
# Puffer immer leeren # Puffer immer leeren
self.__int_buff = 0 self.__int_buff = 0
self.__by_buff = b'' self.__by_buff.clear()
# Rückmeldebyte auswerten
blockok = self._slavesock.recv(1)
if blockok != b'\x1e': if blockok != b'\x1e':
# ACL prüfen und ggf Fehler werfen # ACL prüfen und ggf Fehler werfen
self.__check_acl(blockok) self.__check_acl(blockok)
@@ -337,8 +349,6 @@ class NetFH(Thread):
else: else:
self.__flusherr = False self.__flusherr = False
self.__trigger = True
def get_closed(self) -> bool: def get_closed(self) -> bool:
""" """
Pruefen ob Verbindung geschlossen ist. Pruefen ob Verbindung geschlossen ist.
@@ -395,25 +405,23 @@ class NetFH(Thread):
raise TypeError("arg must be <class 'bytes'>") raise TypeError("arg must be <class 'bytes'>")
with self.__socklock: with self.__socklock:
self._slavesock.send( self._slavesock.sendall(
b'\x01IC' + b'\x01IC' +
request.to_bytes(length=4, byteorder="little") + request.to_bytes(length=4, byteorder="little") +
len(arg).to_bytes(length=2, byteorder="little") + len(arg).to_bytes(length=2, byteorder="little") +
b'\x00\x00\x00\x00\x00\x00\x17' b'\x00\x00\x00\x00\x00\x00\x17' +
arg
) )
self._slavesock.sendall(arg)
# Rückmeldebyte auswerten # Rückmeldebyte auswerten
check = self._slavesock.recv(1) blockok = self._slavesock.recv(1)
if check != b'\x1e': if blockok != b'\x1e':
# ACL prüfen und ggf Fehler werfen # ACL prüfen und ggf Fehler werfen
self.__check_acl(check) self.__check_acl(blockok)
self.__sockerr.set() self.__sockerr.set()
raise IOError("ioctl error on network") raise IOError("ioctl error on network")
self.__trigger = True
def read(self, length: int) -> bytes: def read(self, length: int) -> bytes:
""" """
Daten ueber das Netzwerk lesen. Daten ueber das Netzwerk lesen.
@@ -427,26 +435,28 @@ class NetFH(Thread):
raise ValueError("read of closed file") raise ValueError("read of closed file")
with self.__socklock: with self.__socklock:
self._slavesock.send( self._slavesock.sendall(
b'\x01DA' + b'\x01DA' +
self.__position.to_bytes(length=2, byteorder="little") + self.__position.to_bytes(length=2, byteorder="little") +
length.to_bytes(length=2, byteorder="little") + length.to_bytes(length=2, byteorder="little") +
b'\x00\x00\x00\x00\x00\x00\x00\x00\x17' b'\x00\x00\x00\x00\x00\x00\x00\x00\x17'
) )
bytesbuff = bytearray() self.__position += length
while not self.__sockend.is_set() and len(bytesbuff) < length:
rbytes = self._slavesock.recv(256)
if rbytes == b'': self.__buff_recv.clear()
while length > 0:
count = self._slavesock.recv_into(
self.__buff_block,
min(length, self.__buff_size),
)
if count == 0:
self.__sockerr.set() self.__sockerr.set()
raise IOError("read error on network") raise IOError("read error on network")
bytesbuff += rbytes self.__buff_recv += self.__buff_block[:count]
length -= count
self.__position += length return bytes(self.__buff_recv)
self.__trigger = True
return bytes(bytesbuff)
def readpictory(self) -> bytes: def readpictory(self) -> bytes:
""" """
@@ -463,24 +473,21 @@ class NetFH(Thread):
) )
with self.__socklock: with self.__socklock:
self._slavesock.send(_syspictory) self._slavesock.sendall(_syspictory)
byte_buff = bytearray()
zero_byte = 0
while not self.__sockend.is_set() and zero_byte < 100:
data = self._slavesock.recv(128)
if data == b'':
zero_byte += 1
byte_buff += data
if data.find(b'\x04') >= 0:
self.__trigger = True
# NOTE: Nur suchen oder Ende prüfen?
return bytes(byte_buff[:-1])
self.__buff_recv.clear()
while not self.__sockend.is_set():
count = self._slavesock.recv_into(self.__buff_block, self.__buff_size)
if count == 0:
self.__sockerr.set() self.__sockerr.set()
raise IOError("readpictory error on network") raise IOError("readpictory error on network")
self.__buff_recv += self.__buff_block[:count]
if b'\x04' in self.__buff_recv:
# Found EndOfText byte
break
return bytes(self.__buff_recv[:-1])
def readreplaceio(self) -> bytes: def readreplaceio(self) -> bytes:
""" """
@@ -497,24 +504,21 @@ class NetFH(Thread):
) )
with self.__socklock: with self.__socklock:
self._slavesock.send(_sysreplaceio) self._slavesock.sendall(_sysreplaceio)
byte_buff = bytearray()
zero_byte = 0
while not self.__sockend.is_set() and zero_byte < 100:
data = self._slavesock.recv(128)
if data == b'':
zero_byte += 1
byte_buff += data
if data.find(b'\x04') >= 0:
self.__trigger = True
# NOTE: Nur suchen oder Ende prüfen?
return bytes(byte_buff[:-1])
self.__buff_recv.clear()
while not self.__sockend.is_set():
count = self._slavesock.recv_into(self.__buff_block, self.__buff_size)
if count == 0:
self.__sockerr.set() self.__sockerr.set()
raise IOError("readreplaceio error on network") raise IOError("readreplaceio error on network")
self.__buff_recv += self.__buff_block[:count]
if b'\x04' in self.__buff_recv:
# Found EndOfText byte
break
return bytes(self.__buff_recv[:-1])
def run(self) -> None: def run(self) -> None:
"""Handler fuer Synchronisierung.""" """Handler fuer Synchronisierung."""
@@ -531,8 +535,9 @@ class NetFH(Thread):
) )
self._connect() self._connect()
if self.__sockerr.is_set(): if self.__sockerr.is_set():
# Verhindert bei Scheitern 100% CPU last # Verhindert beim Scheitern 100% CPU last
self.__sockend.wait(self.__waitsync) self.__sockend.wait(self.__waitsync)
continue
else: else:
state_reconnect = False state_reconnect = False
warnings.warn( warnings.warn(
@@ -541,25 +546,34 @@ class NetFH(Thread):
) )
# Kein Fehler aufgetreten, sync durchführen wenn socket frei # Kein Fehler aufgetreten, sync durchführen wenn socket frei
if not self.__trigger and \ if self.__socklock.acquire(blocking=False):
self.__socklock.acquire(blocking=False):
try: try:
self._slavesock.send(_syssync) self._slavesock.sendall(_syssync)
data = self._slavesock.recv(2)
self.__buff_recv.clear()
recv_lenght = 2
while recv_lenght > 0:
count = self._slavesock.recv_into(
self.__buff_block, recv_lenght
)
if count == 0:
raise IOError("lost network connection")
self.__buff_recv += self.__buff_block[:count]
recv_lenght -= count
except IOError: except IOError:
self.__sockerr.set() self.__sockerr.set()
else: else:
if data != b'\x06\x16': if self.__buff_recv != b'\x06\x16':
warnings.warn( warnings.warn(
"data error on network sync", "data error on network sync",
RuntimeWarning RuntimeWarning
) )
self.__sockerr.set() self.__sockerr.set()
continue
finally:
self.__socklock.release() self.__socklock.release()
self.__trigger = False
# Warten nach Sync damit Instantiierung funktioniert # Warten nach Sync damit Instantiierung funktioniert
self.__sockerr.wait(self.__waitsync) self.__sockerr.wait(self.__waitsync)
@@ -616,8 +630,6 @@ class NetFH(Thread):
# Fehler nach übernahme der Daten auslösen um diese zu setzen # Fehler nach übernahme der Daten auslösen um diese zu setzen
self.__sockerr.set() self.__sockerr.set()
self.__trigger = True
def set_timeout(self, value: int) -> None: def set_timeout(self, value: int) -> None:
""" """
Setzt Timeoutwert fuer Verbindung. Setzt Timeoutwert fuer Verbindung.
@@ -633,7 +645,7 @@ class NetFH(Thread):
try: try:
self.__socklock.acquire() self.__socklock.acquire()
self._slavesock.send( self._slavesock.sendall(
b'\x01CF' + b'\x01CF' +
value.to_bytes(length=2, byteorder="little") + value.to_bytes(length=2, byteorder="little") +
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17' b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17'
@@ -646,8 +658,6 @@ class NetFH(Thread):
finally: finally:
self.__socklock.release() self.__socklock.release()
self.__trigger = True
def tell(self) -> int: def tell(self) -> int:
""" """
Gibt aktuelle Position zurueck. Gibt aktuelle Position zurueck.
@@ -891,6 +901,9 @@ class RevPiNetIO(_RevPiModIO):
""" """
Konfiguriert den PLC Slave mit den piCtory Defaultwerten. Konfiguriert den PLC Slave mit den piCtory Defaultwerten.
Diese Werte werden auf dem RevPi gesetzt, wenn die Verbindung
unerwartet (Netzwerkfehler) unterbrochen wird.
:param device: nur auf einzelnes Device anwenden, sonst auf Alle :param device: nur auf einzelnes Device anwenden, sonst auf Alle
""" """
if self.monitoring: if self.monitoring: