From 3f28f0ae489b915f5c5ec4cbed66f870bf5ec070 Mon Sep 17 00:00:00 2001 From: Sven Sager Date: Sun, 8 Mar 2020 14:53:28 +0100 Subject: [PATCH] Implement bytebuffer and length check to NetIO Compatible to PyLoad before 0.9 --- revpimodio2/netio.py | 239 +++++++++++++++++++++++-------------------- 1 file changed, 126 insertions(+), 113 deletions(-) diff --git a/revpimodio2/netio.py b/revpimodio2/netio.py index d56b2e8..fb5f17d 100644 --- a/revpimodio2/netio.py +++ b/revpimodio2/netio.py @@ -30,7 +30,7 @@ _sysreplaceioh = b'\x01RH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17' # Übertragene Bytes schreiben _sysflush = b'\x01SD\x00\x00\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x17' # Hashvalues -HASH_FAIL = b'\xff' * 16 +HASH_FAIL = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' class AclException(Exception): @@ -54,10 +54,11 @@ class NetFH(Thread): so gesteuert werden. """ - __slots__ = "__by_buff", "__check_replace_ios", "__config_changed", \ + __slots__ = "__buff_size", "__buff_block", "__buff_recv", \ + "__by_buff", "__check_replace_ios", "__config_changed", \ "__int_buff", "__dictdirty", "__flusherr", "__replace_ios_h", \ - "__pictory_h", "__position", "__sockact", "__sockerr", "__sockend", \ - "__socklock", "__timeout", "__trigger", "__waitsync", "_address", \ + "__pictory_h", "__position", "__sockerr", "__sockend", \ + "__socklock", "__timeout", "__waitsync", "_address", \ "_slavesock", "daemon" def __init__(self, address: tuple, check_replace_ios: bool, timeout=500): @@ -71,7 +72,10 @@ class NetFH(Thread): super().__init__() self.daemon = True - self.__by_buff = b'' + self.__buff_size = 2048 # Values up to 32 are static in code! + self.__buff_block = bytearray(self.__buff_size) + self.__buff_recv = bytearray() + self.__by_buff = bytearray() self.__check_replace_ios = check_replace_ios self.__config_changed = False self.__int_buff = 0 @@ -79,15 +83,13 @@ class NetFH(Thread): self.__flusherr = False self.__replace_ios_h = b'' self.__pictory_h = b'' - self.__sockact = False self.__sockerr = Event() self.__sockend = Event() self.__socklock = Lock() self.__timeout = None - self.__trigger = False self.__waitsync = None self._address = address - self._slavesock = None + self._slavesock = None # type: socket.socket # Parameterprüfung if not isinstance(address, tuple): @@ -114,7 +116,10 @@ class NetFH(Thread): def __check_acl(self, bytecode: bytes) -> None: """ - Pueft ob ACL auf RevPi den Vorgang erlaubt oder wirft exception. + Pueft ob ACL auf RevPi den Vorgang erlaubt. + + Ist der Vorgang nicht zulässig, wird der Socket sofort geschlossen + und eine Exception geworfen. :param bytecode: Antwort, die geprueft werden solll """ @@ -138,14 +143,12 @@ class NetFH(Thread): if isinstance(value, int) and (100 <= value <= 60000): self.__timeout = value / 1000 - socket.setdefaulttimeout(self.__timeout) - # Timeouts in Socket setzen if self._slavesock is not None: self._slavesock.settimeout(self.__timeout) # 45 Prozent vom Timeout für Synctimer verwenden - self.__waitsync = self.__timeout / 10 * 4.5 + self.__waitsync = self.__timeout / 100 * 45 else: raise ValueError("value must between 10 and 60000 milliseconds") @@ -156,6 +159,7 @@ class NetFH(Thread): so = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: so.connect(self._address) + so.settimeout(self.__timeout) # Hashwerte anfordern recv_len = 16 @@ -164,35 +168,33 @@ class NetFH(Thread): so.sendall(_sysreplaceioh) recv_len += 16 - # Hashwerte empfangen - byte_buff = bytearray() - zero_byte = 0 - while not self.__sockend.is_set() and len(byte_buff) < recv_len: - data = so.recv(recv_len) - if data == b'': - zero_byte += 1 - if zero_byte == 100: - raise OSError("too many zero bytes on hash load") - byte_buff += data + # Hashwerte empfangen mit eigenen Puffern, da nicht gelocked + buff_recv = bytearray(recv_len) + while recv_len > 0: + block = so.recv(recv_len) + if block == b'': + raise OSError("lost connection on hash receive") + buff_recv += block + recv_len -= len(block) # Änderung an piCtory prüfen - if self.__pictory_h and byte_buff[:16] != self.__pictory_h: + if self.__pictory_h and buff_recv[:16] != self.__pictory_h: self.__config_changed = True self.close() raise ConfigChanged( "configuration on revolution pi was changed") else: - self.__pictory_h = byte_buff[:16] + self.__pictory_h = buff_recv[:16] # Änderung an replace_ios prüfen if self.__check_replace_ios and self.__replace_ios_h \ - and byte_buff[16:] != self.__replace_ios_h: + and buff_recv[16:] != self.__replace_ios_h: self.__config_changed = True self.close() raise ConfigChanged( "configuration on revolution pi was changed") else: - self.__replace_ios_h = byte_buff[16:] + self.__replace_ios_h = buff_recv[16:] except ConfigChanged: so.close() raise @@ -215,23 +217,40 @@ class NetFH(Thread): for pos in self.__dictdirty: self.set_dirtybytes(pos, self.__dictdirty[pos]) - def _direct_send(self, send_bytes: bytes, recv_count: int) -> bytes: + def _direct_send(self, send_bytes: bytes, recv_len: int) -> bytes: """ - Fuer debugging direktes Senden von Daten. + Sicherer Zugriff auf send / recv Schnittstelle mit Laengenpruefung. :param send_bytes: Bytes, die gesendet werden sollen - :param recv_count: Anzahl der Empfangsbytes + :param recv_len: Anzahl der die empfangen werden sollen :return: Empfangende Bytes """ if self.__sockend.is_set(): raise ValueError("I/O operation on closed file") with self.__socklock: - self._slavesock.sendall(send_bytes) - # FIXME: Schleife bis Daten empfangen sind einbauen - recv = self._slavesock.recv(recv_count) - self.__trigger = True - return recv + counter = 0 + send_len = len(send_bytes) + while counter < send_len: + # Send loop to trigger timeout of socket on each send + sent = self._slavesock.send(send_bytes[counter:]) + if sent == 0: + self.__sockerr.set() + raise IOError("lost network connection") + counter += sent + + self.__buff_recv.clear() + while recv_len > 0: + count = self._slavesock.recv_into( + self.__buff_block, min(recv_len, self.__buff_size) + ) + if count == 0: + self.__sockerr.set() + raise IOError("lost network connection") + self.__buff_recv += self.__buff_block[:count] + recv_len -= count + + return bytes(self.__buff_recv) def clear_dirtybytes(self, position=None) -> None: """ @@ -282,8 +301,6 @@ class NetFH(Thread): # Fehler nach übernahme der Daten auslösen um diese zu setzen self.__sockerr.set() - self.__trigger = True - def close(self) -> None: """Verbindung trennen.""" if self.__sockend.is_set(): @@ -296,10 +313,8 @@ class NetFH(Thread): if self._slavesock is not None: try: self.__socklock.acquire() - self._slavesock.send(_sysexit) - - # NOTE: Wird das benötigt? - self._slavesock.shutdown(socket.SHUT_RDWR) + self._slavesock.sendall(_sysexit) + self._slavesock.shutdown(socket.SHUT_WR) except Exception: pass finally: @@ -315,19 +330,16 @@ class NetFH(Thread): raise ValueError("flush of closed file") with self.__socklock: - self._slavesock.sendall( - self.__by_buff + _sysflush - ) - - # Rückmeldebyte auswerten - blockok = self._slavesock.recv(1) + self.__by_buff += _sysflush + self._slavesock.sendall(self.__by_buff) # Puffer immer leeren self.__int_buff = 0 - self.__by_buff = b'' + self.__by_buff.clear() + # Rückmeldebyte auswerten + blockok = self._slavesock.recv(1) if blockok != b'\x1e': - # ACL prüfen und ggf Fehler werfen self.__check_acl(blockok) @@ -337,8 +349,6 @@ class NetFH(Thread): else: self.__flusherr = False - self.__trigger = True - def get_closed(self) -> bool: """ Pruefen ob Verbindung geschlossen ist. @@ -395,25 +405,23 @@ class NetFH(Thread): raise TypeError("arg must be ") with self.__socklock: - self._slavesock.send( + self._slavesock.sendall( b'\x01IC' + request.to_bytes(length=4, byteorder="little") + len(arg).to_bytes(length=2, byteorder="little") + - b'\x00\x00\x00\x00\x00\x00\x17' + b'\x00\x00\x00\x00\x00\x00\x17' + + arg ) - self._slavesock.sendall(arg) # Rückmeldebyte auswerten - check = self._slavesock.recv(1) - if check != b'\x1e': + blockok = self._slavesock.recv(1) + if blockok != b'\x1e': # ACL prüfen und ggf Fehler werfen - self.__check_acl(check) + self.__check_acl(blockok) self.__sockerr.set() raise IOError("ioctl error on network") - self.__trigger = True - def read(self, length: int) -> bytes: """ Daten ueber das Netzwerk lesen. @@ -427,26 +435,28 @@ class NetFH(Thread): raise ValueError("read of closed file") with self.__socklock: - self._slavesock.send( + self._slavesock.sendall( b'\x01DA' + self.__position.to_bytes(length=2, byteorder="little") + length.to_bytes(length=2, byteorder="little") + b'\x00\x00\x00\x00\x00\x00\x00\x00\x17' ) - bytesbuff = bytearray() - while not self.__sockend.is_set() and len(bytesbuff) < length: - rbytes = self._slavesock.recv(256) + self.__position += length - if rbytes == b'': + self.__buff_recv.clear() + while length > 0: + count = self._slavesock.recv_into( + self.__buff_block, + min(length, self.__buff_size), + ) + if count == 0: self.__sockerr.set() raise IOError("read error on network") - bytesbuff += rbytes + self.__buff_recv += self.__buff_block[:count] + length -= count - self.__position += length - self.__trigger = True - - return bytes(bytesbuff) + return bytes(self.__buff_recv) def readpictory(self) -> bytes: """ @@ -463,24 +473,21 @@ class NetFH(Thread): ) with self.__socklock: - self._slavesock.send(_syspictory) + self._slavesock.sendall(_syspictory) - byte_buff = bytearray() - zero_byte = 0 - while not self.__sockend.is_set() and zero_byte < 100: - data = self._slavesock.recv(128) - if data == b'': - zero_byte += 1 + self.__buff_recv.clear() + while not self.__sockend.is_set(): + count = self._slavesock.recv_into(self.__buff_block, self.__buff_size) + if count == 0: + self.__sockerr.set() + raise IOError("readpictory error on network") + self.__buff_recv += self.__buff_block[:count] - byte_buff += data - if data.find(b'\x04') >= 0: - self.__trigger = True + if b'\x04' in self.__buff_recv: + # Found EndOfText byte + break - # NOTE: Nur suchen oder Ende prüfen? - return bytes(byte_buff[:-1]) - - self.__sockerr.set() - raise IOError("readpictory error on network") + return bytes(self.__buff_recv[:-1]) def readreplaceio(self) -> bytes: """ @@ -497,24 +504,21 @@ class NetFH(Thread): ) with self.__socklock: - self._slavesock.send(_sysreplaceio) + self._slavesock.sendall(_sysreplaceio) - byte_buff = bytearray() - zero_byte = 0 - while not self.__sockend.is_set() and zero_byte < 100: - data = self._slavesock.recv(128) - if data == b'': - zero_byte += 1 + self.__buff_recv.clear() + while not self.__sockend.is_set(): + count = self._slavesock.recv_into(self.__buff_block, self.__buff_size) + if count == 0: + self.__sockerr.set() + raise IOError("readreplaceio error on network") + self.__buff_recv += self.__buff_block[:count] - byte_buff += data - if data.find(b'\x04') >= 0: - self.__trigger = True + if b'\x04' in self.__buff_recv: + # Found EndOfText byte + break - # NOTE: Nur suchen oder Ende prüfen? - return bytes(byte_buff[:-1]) - - self.__sockerr.set() - raise IOError("readreplaceio error on network") + return bytes(self.__buff_recv[:-1]) def run(self) -> None: """Handler fuer Synchronisierung.""" @@ -531,8 +535,9 @@ class NetFH(Thread): ) self._connect() if self.__sockerr.is_set(): - # Verhindert bei Scheitern 100% CPU last + # Verhindert beim Scheitern 100% CPU last self.__sockend.wait(self.__waitsync) + continue else: state_reconnect = False warnings.warn( @@ -541,24 +546,33 @@ class NetFH(Thread): ) # Kein Fehler aufgetreten, sync durchführen wenn socket frei - if not self.__trigger and \ - self.__socklock.acquire(blocking=False): + if self.__socklock.acquire(blocking=False): try: - self._slavesock.send(_syssync) - data = self._slavesock.recv(2) + self._slavesock.sendall(_syssync) + + self.__buff_recv.clear() + recv_lenght = 2 + while recv_lenght > 0: + count = self._slavesock.recv_into( + self.__buff_block, recv_lenght + ) + if count == 0: + raise IOError("lost network connection") + self.__buff_recv += self.__buff_block[:count] + recv_lenght -= count + except IOError: self.__sockerr.set() else: - if data != b'\x06\x16': + if self.__buff_recv != b'\x06\x16': warnings.warn( "data error on network sync", RuntimeWarning ) self.__sockerr.set() - - self.__socklock.release() - - self.__trigger = False + continue + finally: + self.__socklock.release() # Warten nach Sync damit Instantiierung funktioniert self.__sockerr.wait(self.__waitsync) @@ -616,8 +630,6 @@ class NetFH(Thread): # Fehler nach übernahme der Daten auslösen um diese zu setzen self.__sockerr.set() - self.__trigger = True - def set_timeout(self, value: int) -> None: """ Setzt Timeoutwert fuer Verbindung. @@ -633,7 +645,7 @@ class NetFH(Thread): try: self.__socklock.acquire() - self._slavesock.send( + self._slavesock.sendall( b'\x01CF' + value.to_bytes(length=2, byteorder="little") + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17' @@ -646,8 +658,6 @@ class NetFH(Thread): finally: self.__socklock.release() - self.__trigger = True - def tell(self) -> int: """ Gibt aktuelle Position zurueck. @@ -891,6 +901,9 @@ class RevPiNetIO(_RevPiModIO): """ Konfiguriert den PLC Slave mit den piCtory Defaultwerten. + Diese Werte werden auf dem RevPi gesetzt, wenn die Verbindung + unerwartet (Netzwerkfehler) unterbrochen wird. + :param device: nur auf einzelnes Device anwenden, sonst auf Alle """ if self.monitoring: