Redesign netio.py to prevent errors and provide clean source code

This commit is contained in:
2020-08-30 10:41:52 +02:00
parent ff0f84bb03
commit 963e173dc2
3 changed files with 116 additions and 211 deletions

View File

@@ -22,7 +22,7 @@ __author__ = "Sven Sager <akira@revpimodio.org>"
__copyright__ = "Copyright (C) 2020 Sven Sager" __copyright__ = "Copyright (C) 2020 Sven Sager"
__license__ = "LGPLv3" __license__ = "LGPLv3"
__name__ = "revpimodio2" __name__ = "revpimodio2"
__version__ = "2.5.3" __version__ = "2.5.3a"
# Global package values # Global package values
OFF = 0 OFF = 0

View File

@@ -82,7 +82,6 @@ class NetFH(Thread):
self.__config_changed = False self.__config_changed = False
self.__int_buff = 0 self.__int_buff = 0
self.__dictdirty = {} self.__dictdirty = {}
self.__flusherr = False
self.__replace_ios_h = b'' self.__replace_ios_h = b''
self.__pictory_h = b'' self.__pictory_h = b''
self.__sockerr = Event() self.__sockerr = Event()
@@ -210,7 +209,6 @@ class NetFH(Thread):
self._slavesock = so self._slavesock = so
self.__sockerr.clear() self.__sockerr.clear()
self.__flusherr = False
# Timeout setzen # Timeout setzen
self.set_timeout(int(self.__timeout * 1000)) self.set_timeout(int(self.__timeout * 1000))
@@ -219,18 +217,25 @@ class NetFH(Thread):
for pos in self.__dictdirty: for pos in self.__dictdirty:
self.set_dirtybytes(pos, self.__dictdirty[pos]) self.set_dirtybytes(pos, self.__dictdirty[pos])
def _direct_send(self, send_bytes: bytes, recv_len: int) -> bytes: def _direct_sr(self, send_bytes: bytes, recv_len: int) -> bytes:
""" """
Sicherer Zugriff auf send / recv Schnittstelle mit Laengenpruefung. Secure send and receive function for network handler.
:param send_bytes: Bytes, die gesendet werden sollen Will raise exception on closed network handler or network errors and
:param recv_len: Anzahl der die empfangen werden sollen set the sockerr flag.
:return: Empfangende Bytes
:param send_bytes: Bytes to send or empty
:param recv_len: Amount of bytes to receive
:return: Received bytes
""" """
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("I/O operation on closed file") raise ValueError("I/O operation on closed file")
if self.__sockerr.is_set():
raise IOError("not allowed while reconnect")
try:
self.__socklock.acquire()
with self.__socklock:
counter = 0 counter = 0
send_len = len(send_bytes) send_len = len(send_bytes)
while counter < send_len: while counter < send_len:
@@ -238,7 +243,7 @@ class NetFH(Thread):
sent = self._slavesock.send(send_bytes[counter:]) sent = self._slavesock.send(send_bytes[counter:])
if sent == 0: if sent == 0:
self.__sockerr.set() self.__sockerr.set()
raise IOError("lost network connection") raise IOError("lost network connection while send")
counter += sent counter += sent
self.__buff_recv.clear() self.__buff_recv.clear()
@@ -247,12 +252,20 @@ class NetFH(Thread):
self.__buff_block, min(recv_len, self.__buff_size) self.__buff_block, min(recv_len, self.__buff_size)
) )
if count == 0: if count == 0:
self.__sockerr.set() raise IOError("lost network connection while receive")
raise IOError("lost network connection")
self.__buff_recv += self.__buff_block[:count] self.__buff_recv += self.__buff_block[:count]
recv_len -= count recv_len -= count
return bytes(self.__buff_recv) # Create copy in socklock environment
return_buffer = bytes(self.__buff_recv)
except Exception:
self.__sockerr.set()
raise
finally:
self.__socklock.release()
return return_buffer
def clear_dirtybytes(self, position=None) -> None: def clear_dirtybytes(self, position=None) -> None:
""" """
@@ -268,42 +281,32 @@ class NetFH(Thread):
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("I/O operation on closed file") raise ValueError("I/O operation on closed file")
error = False
try:
self.__socklock.acquire()
if position is None:
# Alle Dirtybytes löschen
self._slavesock.sendall(_sysdeldirty)
else:
# Nur bestimmte Dirtybytes löschen
# b CM ii xx c0000000 b = 16
self._slavesock.sendall(pack(
"=c2sH2xc7xc",
HEADER_START, b'EY', position, b'\xfe', HEADER_STOP
))
check = self._slavesock.recv(1)
if check != b'\x1e':
# ACL prüfen und ggf Fehler werfen
self.__check_acl(check)
raise IOError("clear dirtybytes error on network")
except AclException:
raise
except Exception:
error = True
finally:
self.__socklock.release()
# Daten immer übernehmen # Daten immer übernehmen
if position is None: if position is None:
self.__dictdirty = {} self.__dictdirty.clear()
elif position in self.__dictdirty: elif position in self.__dictdirty:
del self.__dictdirty[position] del self.__dictdirty[position]
if error: try:
# Fehler nach übernahme der Daten auslösen um diese zu setzen if position is None:
# Alle Dirtybytes löschen
buff = self._direct_sr(_sysdeldirty, 1)
else:
# Nur bestimmte Dirtybytes löschen
# b CM ii xx c0000000 b = 16
buff = self._direct_sr(pack(
"=c2sH2xc7xc",
HEADER_START, b'EY', position, b'\xfe', HEADER_STOP
), 1)
if buff != b'\x1e':
# ACL prüfen und ggf Fehler werfen
self.__check_acl(buff)
raise IOError("clear dirtybytes error on network")
except AclException:
self.__dictdirty.clear()
raise
except Exception:
self.__sockerr.set() self.__sockerr.set()
def close(self) -> None: def close(self) -> None:
@@ -334,32 +337,25 @@ class NetFH(Thread):
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("flush of closed file") raise ValueError("flush of closed file")
with self.__socklock: try:
# b CM ii ii 00000000 b = 16 # b CM ii ii 00000000 b = 16
try: buff = self._direct_sr(pack(
self._slavesock.sendall(pack( "=c2sHH8xc",
"=c2sHH8xc", HEADER_START, b'FD', self.__int_buff, len(self.__by_buff), HEADER_STOP
HEADER_START, b'FD', self.__int_buff, len(self.__by_buff), HEADER_STOP ) + self.__by_buff, 1)
) + self.__by_buff) except Exception:
except Exception: raise
self.__flusherr = True finally:
raise # Puffer immer leeren
finally: self.__int_buff = 0
# Puffer immer leeren self.__by_buff.clear()
self.__int_buff = 0
self.__by_buff.clear()
# Rückmeldebyte auswerten if buff != b'\x1e':
blockok = self._slavesock.recv(1) # ACL prüfen und ggf Fehler werfen
if blockok != b'\x1e': self.__check_acl(buff)
# ACL prüfen und ggf Fehler werfen
self.__check_acl(blockok)
self.__flusherr = True self.__sockerr.set()
self.__sockerr.set() raise IOError("flush error on network")
raise IOError("flush error on network")
else:
self.__flusherr = False
def get_closed(self) -> bool: def get_closed(self) -> bool:
""" """
@@ -416,21 +412,17 @@ class NetFH(Thread):
if not (isinstance(arg, bytes) and len(arg) <= 1024): if not (isinstance(arg, bytes) and len(arg) <= 1024):
raise TypeError("arg must be <class 'bytes'>") raise TypeError("arg must be <class 'bytes'>")
with self.__socklock: # b CM xx ii iiii0000 b = 16
# b CM xx ii iiii0000 b = 16 buff = self._direct_sr(pack(
self._slavesock.sendall(pack( "=c2s2xHI4xc",
"=c2s2xHI4xc", HEADER_START, b'IC', len(arg), request, HEADER_STOP
HEADER_START, b'IC', len(arg), request, HEADER_STOP ) + arg, 1)
) + arg) if buff != b'\x1e':
# ACL prüfen und ggf Fehler werfen
self.__check_acl(buff)
# Rückmeldebyte auswerten self.__sockerr.set()
blockok = self._slavesock.recv(1) raise IOError("ioctl error on network")
if blockok != b'\x1e':
# ACL prüfen und ggf Fehler werfen
self.__check_acl(blockok)
self.__sockerr.set()
raise IOError("ioctl error on network")
def read(self, length: int) -> bytes: def read(self, length: int) -> bytes:
""" """
@@ -444,28 +436,14 @@ class NetFH(Thread):
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("read of closed file") raise ValueError("read of closed file")
with self.__socklock: # b CM ii ii 00000000 b = 16
# b CM ii ii 00000000 b = 16 buff = self._direct_sr(pack(
self._slavesock.sendall(pack( "=c2sHH8xc",
"=c2sHH8xc", HEADER_START, b'DA', self.__position, length, HEADER_STOP
HEADER_START, b'DA', self.__position, length, HEADER_STOP ), length)
))
self.__position += length self.__position += length
return buff
self.__buff_recv.clear()
while length > 0:
count = self._slavesock.recv_into(
self.__buff_block,
min(length, self.__buff_size),
)
if count == 0:
self.__sockerr.set()
raise IOError("read error on network")
self.__buff_recv += self.__buff_block[:count]
length -= count
return bytes(self.__buff_recv)
def readinto(self, buffer: bytearray) -> int: def readinto(self, buffer: bytearray) -> int:
""" """
@@ -480,29 +458,14 @@ class NetFH(Thread):
raise ValueError("read of closed file") raise ValueError("read of closed file")
length = len(buffer) length = len(buffer)
with self.__socklock:
# b CM ii ii 00000000 b = 16
self._slavesock.sendall(pack(
"=c2sHH8xc",
HEADER_START, b'DA', self.__position, length, HEADER_STOP
))
net_buffer = b'' # b CM ii ii 00000000 b = 16
while length > 0: buff = self._direct_sr(pack(
count = self._slavesock.recv_into( "=c2sHH8xc",
self.__buff_block, HEADER_START, b'DA', self.__position, length, HEADER_STOP
min(length, self.__buff_size), ), length)
)
if count == 0:
self.__sockerr.set()
raise IOError("read error on network")
net_buffer += self.__buff_block[:count]
length -= count
# We received the correct amount of bytes here
self.__position += length
buffer[:] = net_buffer
buffer[:] = buff
return len(buffer) return len(buffer)
def readpictory(self) -> bytes: def readpictory(self) -> bytes:
@@ -519,31 +482,9 @@ class NetFH(Thread):
"could not read/parse piCtory configuration over network" "could not read/parse piCtory configuration over network"
) )
with self.__socklock: buff = self._direct_sr(_syspictory, 4)
self._slavesock.sendall(_syspictory) recv_length, = unpack("=I", buff)
return self._direct_sr(b'', recv_length)
self.__buff_recv.clear()
recv_lenght = 4
while recv_lenght > 0:
count = self._slavesock.recv_into(
self.__buff_block, recv_lenght
)
if count == 0:
raise IOError("readpictory length error on network")
self.__buff_recv += self.__buff_block[:count]
recv_lenght -= count
recv_lenght, = unpack("=I", self.__buff_recv)
while recv_lenght > 0:
count = self._slavesock.recv_into(
self.__buff_block, min(recv_lenght, self.__buff_size)
)
if count == 0:
raise IOError("readpictory file error on network")
self.__buff_recv += self.__buff_block[:count]
recv_lenght -= count
return bytes(self.__buff_recv[4:])
def readreplaceio(self) -> bytes: def readreplaceio(self) -> bytes:
""" """
@@ -559,31 +500,9 @@ class NetFH(Thread):
"replace_io_file: could not read/parse over network" "replace_io_file: could not read/parse over network"
) )
with self.__socklock: buff = self._direct_sr(_sysreplaceio, 4)
self._slavesock.sendall(_sysreplaceio) recv_length, = unpack("=I", buff)
return self._direct_sr(b'', recv_length)
self.__buff_recv.clear()
recv_lenght = 4
while recv_lenght > 0:
count = self._slavesock.recv_into(
self.__buff_block, recv_lenght
)
if count == 0:
raise IOError("readreplaceio length error on network")
self.__buff_recv += self.__buff_block[:count]
recv_lenght -= count
recv_lenght, = unpack("=I", self.__buff_recv)
while recv_lenght > 0:
count = self._slavesock.recv_into(
self.__buff_block, min(recv_lenght, self.__buff_size)
)
if count == 0:
raise IOError("readreplaceio file error on network")
self.__buff_recv += self.__buff_block[:count]
recv_lenght -= count
return bytes(self.__buff_recv[4:])
def run(self) -> None: def run(self) -> None:
"""Handler fuer Synchronisierung.""" """Handler fuer Synchronisierung."""
@@ -622,7 +541,7 @@ class NetFH(Thread):
self.__buff_block, recv_lenght self.__buff_block, recv_lenght
) )
if count == 0: if count == 0:
raise IOError("lost network connection") raise IOError("lost network connection on sync")
self.__buff_recv += self.__buff_block[:count] self.__buff_recv += self.__buff_block[:count]
recv_lenght -= count recv_lenght -= count
@@ -666,34 +585,26 @@ class NetFH(Thread):
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("I/O operation on closed file") raise ValueError("I/O operation on closed file")
error = False
try:
self.__socklock.acquire()
# b CM ii ii 00000000 b = 16
self._slavesock.sendall(pack(
"=c2sHH8xc",
HEADER_START, b'EY', position, len(dirtybytes), HEADER_STOP
) + dirtybytes)
check = self._slavesock.recv(1)
if check != b'\x1e':
# ACL prüfen und ggf Fehler werfen
self.__check_acl(check)
raise IOError("set dirtybytes error on network")
except AclException:
raise
except Exception:
error = True
finally:
self.__socklock.release()
# Daten immer übernehmen # Daten immer übernehmen
self.__dictdirty[position] = dirtybytes self.__dictdirty[position] = dirtybytes
if error: try:
# Fehler nach übernahme der Daten auslösen um diese zu setzen # b CM ii ii 00000000 b = 16
buff = self._direct_sr(pack(
"=c2sHH8xc",
HEADER_START, b'EY', position, len(dirtybytes), HEADER_STOP
) + dirtybytes, 1)
if buff != b'\x1e':
# ACL prüfen und ggf Fehler werfen
self.__check_acl(buff)
raise IOError("set dirtybytes error on network")
except AclException:
# Not allowed, clear for reconnect
self.__dictdirty.clear()
raise
except Exception:
self.__sockerr.set() self.__sockerr.set()
def set_timeout(self, value: int) -> None: def set_timeout(self, value: int) -> None:
@@ -709,20 +620,15 @@ class NetFH(Thread):
self.__set_systimeout(value) self.__set_systimeout(value)
try: try:
self.__socklock.acquire()
# b CM ii xx 00000000 b = 16 # b CM ii xx 00000000 b = 16
self._slavesock.sendall(pack( buff = self._direct_sr(pack(
"=c2sH10xc", "=c2sH10xc",
HEADER_START, b'CF', value, HEADER_STOP HEADER_START, b'CF', value, HEADER_STOP
)) ), 1)
check = self._slavesock.recv(1) if buff != b'\x1e':
if check != b'\x1e':
raise IOError("set timeout error on network") raise IOError("set timeout error on network")
except Exception: except Exception:
self.__sockerr.set() self.__sockerr.set()
finally:
self.__socklock.release()
def tell(self) -> int: def tell(self) -> int:
""" """
@@ -747,9 +653,8 @@ class NetFH(Thread):
raise ConfigChanged("configuration on revolution pi was changed") raise ConfigChanged("configuration on revolution pi was changed")
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("write to closed file") raise ValueError("write to closed file")
if self.__sockerr.is_set():
if self.__flusherr: raise IOError("not allowed while reconnect")
raise IOError("I/O error since last flush")
with self.__socklock: with self.__socklock:
self.__int_buff += 1 self.__int_buff += 1

View File

@@ -17,7 +17,7 @@ setup(
license="LGPLv3", license="LGPLv3",
name="revpimodio2", name="revpimodio2",
version="2.5.3", version="2.5.3a",
packages=["revpimodio2"], packages=["revpimodio2"],
python_requires="~=3.2", python_requires="~=3.2",