diff --git a/revpimodio2/__init__.py b/revpimodio2/__init__.py index 1135477..31a4c4f 100644 --- a/revpimodio2/__init__.py +++ b/revpimodio2/__init__.py @@ -22,7 +22,7 @@ __author__ = "Sven Sager " __copyright__ = "Copyright (C) 2020 Sven Sager" __license__ = "LGPLv3" __name__ = "revpimodio2" -__version__ = "2.5.3" +__version__ = "2.5.3a" # Global package values OFF = 0 diff --git a/revpimodio2/netio.py b/revpimodio2/netio.py index 8b823e1..4189ec5 100644 --- a/revpimodio2/netio.py +++ b/revpimodio2/netio.py @@ -82,7 +82,6 @@ class NetFH(Thread): self.__config_changed = False self.__int_buff = 0 self.__dictdirty = {} - self.__flusherr = False self.__replace_ios_h = b'' self.__pictory_h = b'' self.__sockerr = Event() @@ -210,7 +209,6 @@ class NetFH(Thread): self._slavesock = so self.__sockerr.clear() - self.__flusherr = False # Timeout setzen self.set_timeout(int(self.__timeout * 1000)) @@ -219,18 +217,25 @@ class NetFH(Thread): for pos in self.__dictdirty: self.set_dirtybytes(pos, self.__dictdirty[pos]) - def _direct_send(self, send_bytes: bytes, recv_len: int) -> bytes: + def _direct_sr(self, send_bytes: bytes, recv_len: int) -> bytes: """ - Sicherer Zugriff auf send / recv Schnittstelle mit Laengenpruefung. + Secure send and receive function for network handler. - :param send_bytes: Bytes, die gesendet werden sollen - :param recv_len: Anzahl der die empfangen werden sollen - :return: Empfangende Bytes + Will raise exception on closed network handler or network errors and + set the sockerr flag. + + :param send_bytes: Bytes to send or empty + :param recv_len: Amount of bytes to receive + :return: Received bytes """ if self.__sockend.is_set(): raise ValueError("I/O operation on closed file") + if self.__sockerr.is_set(): + raise IOError("not allowed while reconnect") + + try: + self.__socklock.acquire() - with self.__socklock: counter = 0 send_len = len(send_bytes) while counter < send_len: @@ -238,7 +243,7 @@ class NetFH(Thread): sent = self._slavesock.send(send_bytes[counter:]) if sent == 0: self.__sockerr.set() - raise IOError("lost network connection") + raise IOError("lost network connection while send") counter += sent self.__buff_recv.clear() @@ -247,12 +252,20 @@ class NetFH(Thread): self.__buff_block, min(recv_len, self.__buff_size) ) if count == 0: - self.__sockerr.set() - raise IOError("lost network connection") + raise IOError("lost network connection while receive") self.__buff_recv += self.__buff_block[:count] recv_len -= count - return bytes(self.__buff_recv) + # Create copy in socklock environment + return_buffer = bytes(self.__buff_recv) + except Exception: + self.__sockerr.set() + raise + + finally: + self.__socklock.release() + + return return_buffer def clear_dirtybytes(self, position=None) -> None: """ @@ -268,42 +281,32 @@ class NetFH(Thread): if self.__sockend.is_set(): raise ValueError("I/O operation on closed file") - error = False - try: - self.__socklock.acquire() - - if position is None: - # Alle Dirtybytes löschen - self._slavesock.sendall(_sysdeldirty) - else: - # Nur bestimmte Dirtybytes löschen - # b CM ii xx c0000000 b = 16 - self._slavesock.sendall(pack( - "=c2sH2xc7xc", - HEADER_START, b'EY', position, b'\xfe', HEADER_STOP - )) - - check = self._slavesock.recv(1) - if check != b'\x1e': - # ACL prüfen und ggf Fehler werfen - self.__check_acl(check) - - raise IOError("clear dirtybytes error on network") - except AclException: - raise - except Exception: - error = True - finally: - self.__socklock.release() - # Daten immer übernehmen if position is None: - self.__dictdirty = {} + self.__dictdirty.clear() elif position in self.__dictdirty: del self.__dictdirty[position] - if error: - # Fehler nach übernahme der Daten auslösen um diese zu setzen + try: + if position is None: + # Alle Dirtybytes löschen + buff = self._direct_sr(_sysdeldirty, 1) + else: + # Nur bestimmte Dirtybytes löschen + # b CM ii xx c0000000 b = 16 + buff = self._direct_sr(pack( + "=c2sH2xc7xc", + HEADER_START, b'EY', position, b'\xfe', HEADER_STOP + ), 1) + if buff != b'\x1e': + # ACL prüfen und ggf Fehler werfen + self.__check_acl(buff) + + raise IOError("clear dirtybytes error on network") + except AclException: + self.__dictdirty.clear() + raise + except Exception: self.__sockerr.set() def close(self) -> None: @@ -334,32 +337,25 @@ class NetFH(Thread): if self.__sockend.is_set(): raise ValueError("flush of closed file") - with self.__socklock: + try: # b CM ii ii 00000000 b = 16 - try: - self._slavesock.sendall(pack( - "=c2sHH8xc", - HEADER_START, b'FD', self.__int_buff, len(self.__by_buff), HEADER_STOP - ) + self.__by_buff) - except Exception: - self.__flusherr = True - raise - finally: - # Puffer immer leeren - self.__int_buff = 0 - self.__by_buff.clear() + buff = self._direct_sr(pack( + "=c2sHH8xc", + HEADER_START, b'FD', self.__int_buff, len(self.__by_buff), HEADER_STOP + ) + self.__by_buff, 1) + except Exception: + raise + finally: + # Puffer immer leeren + self.__int_buff = 0 + self.__by_buff.clear() - # Rückmeldebyte auswerten - blockok = self._slavesock.recv(1) - if blockok != b'\x1e': - # ACL prüfen und ggf Fehler werfen - self.__check_acl(blockok) + if buff != b'\x1e': + # ACL prüfen und ggf Fehler werfen + self.__check_acl(buff) - self.__flusherr = True - self.__sockerr.set() - raise IOError("flush error on network") - else: - self.__flusherr = False + self.__sockerr.set() + raise IOError("flush error on network") def get_closed(self) -> bool: """ @@ -416,21 +412,17 @@ class NetFH(Thread): if not (isinstance(arg, bytes) and len(arg) <= 1024): raise TypeError("arg must be ") - with self.__socklock: - # b CM xx ii iiii0000 b = 16 - self._slavesock.sendall(pack( - "=c2s2xHI4xc", - HEADER_START, b'IC', len(arg), request, HEADER_STOP - ) + arg) + # b CM xx ii iiii0000 b = 16 + buff = self._direct_sr(pack( + "=c2s2xHI4xc", + HEADER_START, b'IC', len(arg), request, HEADER_STOP + ) + arg, 1) + if buff != b'\x1e': + # ACL prüfen und ggf Fehler werfen + self.__check_acl(buff) - # Rückmeldebyte auswerten - blockok = self._slavesock.recv(1) - if blockok != b'\x1e': - # ACL prüfen und ggf Fehler werfen - self.__check_acl(blockok) - - self.__sockerr.set() - raise IOError("ioctl error on network") + self.__sockerr.set() + raise IOError("ioctl error on network") def read(self, length: int) -> bytes: """ @@ -444,28 +436,14 @@ class NetFH(Thread): if self.__sockend.is_set(): raise ValueError("read of closed file") - with self.__socklock: - # b CM ii ii 00000000 b = 16 - self._slavesock.sendall(pack( - "=c2sHH8xc", - HEADER_START, b'DA', self.__position, length, HEADER_STOP - )) + # b CM ii ii 00000000 b = 16 + buff = self._direct_sr(pack( + "=c2sHH8xc", + HEADER_START, b'DA', self.__position, length, HEADER_STOP + ), length) - self.__position += length - - self.__buff_recv.clear() - while length > 0: - count = self._slavesock.recv_into( - self.__buff_block, - min(length, self.__buff_size), - ) - if count == 0: - self.__sockerr.set() - raise IOError("read error on network") - self.__buff_recv += self.__buff_block[:count] - length -= count - - return bytes(self.__buff_recv) + self.__position += length + return buff def readinto(self, buffer: bytearray) -> int: """ @@ -480,29 +458,14 @@ class NetFH(Thread): raise ValueError("read of closed file") length = len(buffer) - with self.__socklock: - # b CM ii ii 00000000 b = 16 - self._slavesock.sendall(pack( - "=c2sHH8xc", - HEADER_START, b'DA', self.__position, length, HEADER_STOP - )) - net_buffer = b'' - while length > 0: - count = self._slavesock.recv_into( - self.__buff_block, - min(length, self.__buff_size), - ) - if count == 0: - self.__sockerr.set() - raise IOError("read error on network") - net_buffer += self.__buff_block[:count] - length -= count - - # We received the correct amount of bytes here - self.__position += length - buffer[:] = net_buffer + # b CM ii ii 00000000 b = 16 + buff = self._direct_sr(pack( + "=c2sHH8xc", + HEADER_START, b'DA', self.__position, length, HEADER_STOP + ), length) + buffer[:] = buff return len(buffer) def readpictory(self) -> bytes: @@ -519,31 +482,9 @@ class NetFH(Thread): "could not read/parse piCtory configuration over network" ) - with self.__socklock: - self._slavesock.sendall(_syspictory) - - self.__buff_recv.clear() - recv_lenght = 4 - while recv_lenght > 0: - count = self._slavesock.recv_into( - self.__buff_block, recv_lenght - ) - if count == 0: - raise IOError("readpictory length error on network") - self.__buff_recv += self.__buff_block[:count] - recv_lenght -= count - - recv_lenght, = unpack("=I", self.__buff_recv) - while recv_lenght > 0: - count = self._slavesock.recv_into( - self.__buff_block, min(recv_lenght, self.__buff_size) - ) - if count == 0: - raise IOError("readpictory file error on network") - self.__buff_recv += self.__buff_block[:count] - recv_lenght -= count - - return bytes(self.__buff_recv[4:]) + buff = self._direct_sr(_syspictory, 4) + recv_length, = unpack("=I", buff) + return self._direct_sr(b'', recv_length) def readreplaceio(self) -> bytes: """ @@ -559,31 +500,9 @@ class NetFH(Thread): "replace_io_file: could not read/parse over network" ) - with self.__socklock: - self._slavesock.sendall(_sysreplaceio) - - self.__buff_recv.clear() - recv_lenght = 4 - while recv_lenght > 0: - count = self._slavesock.recv_into( - self.__buff_block, recv_lenght - ) - if count == 0: - raise IOError("readreplaceio length error on network") - self.__buff_recv += self.__buff_block[:count] - recv_lenght -= count - - recv_lenght, = unpack("=I", self.__buff_recv) - while recv_lenght > 0: - count = self._slavesock.recv_into( - self.__buff_block, min(recv_lenght, self.__buff_size) - ) - if count == 0: - raise IOError("readreplaceio file error on network") - self.__buff_recv += self.__buff_block[:count] - recv_lenght -= count - - return bytes(self.__buff_recv[4:]) + buff = self._direct_sr(_sysreplaceio, 4) + recv_length, = unpack("=I", buff) + return self._direct_sr(b'', recv_length) def run(self) -> None: """Handler fuer Synchronisierung.""" @@ -622,7 +541,7 @@ class NetFH(Thread): self.__buff_block, recv_lenght ) if count == 0: - raise IOError("lost network connection") + raise IOError("lost network connection on sync") self.__buff_recv += self.__buff_block[:count] recv_lenght -= count @@ -666,34 +585,26 @@ class NetFH(Thread): if self.__sockend.is_set(): raise ValueError("I/O operation on closed file") - error = False - try: - self.__socklock.acquire() - - # b CM ii ii 00000000 b = 16 - self._slavesock.sendall(pack( - "=c2sHH8xc", - HEADER_START, b'EY', position, len(dirtybytes), HEADER_STOP - ) + dirtybytes) - - check = self._slavesock.recv(1) - if check != b'\x1e': - # ACL prüfen und ggf Fehler werfen - self.__check_acl(check) - - raise IOError("set dirtybytes error on network") - except AclException: - raise - except Exception: - error = True - finally: - self.__socklock.release() - # Daten immer übernehmen self.__dictdirty[position] = dirtybytes - if error: - # Fehler nach übernahme der Daten auslösen um diese zu setzen + try: + # b CM ii ii 00000000 b = 16 + buff = self._direct_sr(pack( + "=c2sHH8xc", + HEADER_START, b'EY', position, len(dirtybytes), HEADER_STOP + ) + dirtybytes, 1) + + if buff != b'\x1e': + # ACL prüfen und ggf Fehler werfen + self.__check_acl(buff) + + raise IOError("set dirtybytes error on network") + except AclException: + # Not allowed, clear for reconnect + self.__dictdirty.clear() + raise + except Exception: self.__sockerr.set() def set_timeout(self, value: int) -> None: @@ -709,20 +620,15 @@ class NetFH(Thread): self.__set_systimeout(value) try: - self.__socklock.acquire() - # b CM ii xx 00000000 b = 16 - self._slavesock.sendall(pack( + buff = self._direct_sr(pack( "=c2sH10xc", HEADER_START, b'CF', value, HEADER_STOP - )) - check = self._slavesock.recv(1) - if check != b'\x1e': + ), 1) + if buff != b'\x1e': raise IOError("set timeout error on network") except Exception: self.__sockerr.set() - finally: - self.__socklock.release() def tell(self) -> int: """ @@ -747,9 +653,8 @@ class NetFH(Thread): raise ConfigChanged("configuration on revolution pi was changed") if self.__sockend.is_set(): raise ValueError("write to closed file") - - if self.__flusherr: - raise IOError("I/O error since last flush") + if self.__sockerr.is_set(): + raise IOError("not allowed while reconnect") with self.__socklock: self.__int_buff += 1 diff --git a/setup.py b/setup.py index 2fd9026..57af0c4 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ setup( license="LGPLv3", name="revpimodio2", - version="2.5.3", + version="2.5.3a", packages=["revpimodio2"], python_requires="~=3.2",