diff --git a/src/revpimodio2/__init__.py b/src/revpimodio2/__init__.py index 4f28048..3dcbbd1 100644 --- a/src/revpimodio2/__init__.py +++ b/src/revpimodio2/__init__.py @@ -14,10 +14,25 @@ fuehrt das Modul bei Datenaenderung aus. """ __all__ = [ "IOEvent", - "RevPiModIO", "RevPiModIODriver", "RevPiModIOSelected", "run_plc", - "RevPiNetIO", "RevPiNetIODriver", "RevPiNetIOSelected", "run_net_plc", - "Cycletools", "EventCallback", - "ProductType", "DeviceType", "AIO", "COMPACT", "DI", "DO", "DIO", "FLAT", "MIO", + "RevPiModIO", + "RevPiModIODriver", + "RevPiModIOSelected", + "run_plc", + "RevPiNetIO", + "RevPiNetIODriver", + "RevPiNetIOSelected", + "run_net_plc", + "Cycletools", + "EventCallback", + "ProductType", + "DeviceType", + "AIO", + "COMPACT", + "DI", + "DO", + "DIO", + "FLAT", + "MIO", ] __author__ = "Sven Sager " __copyright__ = "Copyright (C) 2023 Sven Sager" diff --git a/src/revpimodio2/_internal.py b/src/revpimodio2/_internal.py index d8d3ca1..1ae18ff 100644 --- a/src/revpimodio2/_internal.py +++ b/src/revpimodio2/_internal.py @@ -29,11 +29,11 @@ def acheck(check_type, **kwargs) -> None: for var_name in kwargs: none_okay = var_name.endswith("_noneok") - if not (isinstance(kwargs[var_name], check_type) or - none_okay and kwargs[var_name] is None): + if not (isinstance(kwargs[var_name], check_type) or none_okay and kwargs[var_name] is None): msg = "Argument '{0}' must be {1}{2}".format( - var_name.rstrip("_noneok"), str(check_type), - " or " if none_okay else "" + var_name.rstrip("_noneok"), + str(check_type), + " or " if none_okay else "", ) raise TypeError(msg) diff --git a/src/revpimodio2/device.py b/src/revpimodio2/device.py index 63a690b..5e65de1 100644 --- a/src/revpimodio2/device.py +++ b/src/revpimodio2/device.py @@ -94,9 +94,7 @@ class DeviceList(object): :return: aller Devices """ - for dev in sorted( - self.__dict_position, - key=lambda key: self.__dict_position[key]._offset): + for dev in sorted(self.__dict_position, key=lambda key: self.__dict_position[key]._offset): yield self.__dict_position[dev] def __len__(self): @@ -129,14 +127,37 @@ class Device(object): ihren Prozessabbildpuffer und sorgt fuer die Aktualisierung der IO-Werte. """ - __slots__ = "__my_io_list", "_ba_devdata", "_ba_datacp", \ - "_dict_events", "_filelock", "_modio", "_name", \ - "_offset", "_position", "_producttype", "_selfupdate", \ - "_slc_devoff", "_slc_inp", "_slc_inpoff", "_slc_mem", \ - "_slc_memoff", "_slc_out", "_slc_outoff", "_shared_procimg", \ - "_shared_write", \ - "bmk", "catalognr", "comment", "extend", \ - "guid", "id", "inpvariant", "outvariant", "type" + __slots__ = ( + "__my_io_list", + "_ba_devdata", + "_ba_datacp", + "_dict_events", + "_filelock", + "_modio", + "_name", + "_offset", + "_position", + "_producttype", + "_selfupdate", + "_slc_devoff", + "_slc_inp", + "_slc_inpoff", + "_slc_mem", + "_slc_memoff", + "_slc_out", + "_slc_outoff", + "_shared_procimg", + "_shared_write", + "bmk", + "catalognr", + "comment", + "extend", + "guid", + "id", + "inpvariant", + "outvariant", + "type", + ) def __init__(self, parentmodio, dict_device, simulator=False): """ @@ -170,36 +191,30 @@ class Device(object): "must be {1} but is {2} - Overlapping devices overwrite the " "same memory, which has unpredictable effects!!!" "".format(self._name, parentmodio.length, self._offset), - Warning + Warning, ) # IOM-Objekte erstellen und Adressen in SLCs speichern if simulator: - self._slc_inp = self._buildio( - dict_device.get("out"), INP) - self._slc_out = self._buildio( - dict_device.get("inp"), OUT) + self._slc_inp = self._buildio(dict_device.get("out"), INP) + self._slc_out = self._buildio(dict_device.get("inp"), OUT) else: - self._slc_inp = self._buildio( - dict_device.get("inp"), INP) - self._slc_out = self._buildio( - dict_device.get("out"), OUT) - self._slc_mem = self._buildio( - dict_device.get("mem"), MEM - ) + self._slc_inp = self._buildio(dict_device.get("inp"), INP) + self._slc_out = self._buildio(dict_device.get("out"), OUT) + self._slc_mem = self._buildio(dict_device.get("mem"), MEM) # SLCs mit offset berechnen self._slc_devoff = slice(self._offset, self._offset + self.length) self._slc_inpoff = slice( self._slc_inp.start + self._offset, - self._slc_inp.stop + self._offset + self._slc_inp.stop + self._offset, ) self._slc_outoff = slice( self._slc_out.start + self._offset, - self._slc_out.stop + self._offset + self._slc_out.stop + self._offset, ) self._slc_memoff = slice( self._slc_mem.start + self._offset, - self._slc_mem.stop + self._offset + self._slc_mem.stop + self._offset, ) # Alle restlichen attribute an Klasse anhängen @@ -245,8 +260,7 @@ class Device(object): return True return False else: - return key in self._modio.io \ - and getattr(self._modio.io, key)._parentdevice == self + return key in self._modio.io and getattr(self._modio.io, key)._parentdevice == self def __getitem__(self, key): """ @@ -315,53 +329,40 @@ class Device(object): int_min, int_max = PROCESS_IMAGE_SIZE, 0 for key in sorted(dict_io, key=lambda x: int(x)): - # Neuen IO anlegen if iotype == MEM: # Memory setting - io_new = MemIO( - self, dict_io[key], - iotype, - "little", - False - ) + io_new = MemIO(self, dict_io[key], iotype, "little", False) elif bool(dict_io[key][7]): # Bei Bitwerten IOBase verwenden - io_new = IOBase( - self, dict_io[key], iotype, "little", False - ) - elif isinstance(self, DioModule) and \ - dict_io[key][3] in self._lst_counter: + io_new = IOBase(self, dict_io[key], iotype, "little", False) + elif isinstance(self, DioModule) and dict_io[key][3] in self._lst_counter: # Counter IO auf einem DI oder DIO io_new = IntIOCounter( self._lst_counter.index(dict_io[key][3]), - self, dict_io[key], + self, + dict_io[key], iotype, "little", - False + False, ) elif isinstance(self, Gateway): # Ersetzbare IOs erzeugen - io_new = IntIOReplaceable( - self, dict_io[key], - iotype, - "little", - False - ) + io_new = IntIOReplaceable(self, dict_io[key], iotype, "little", False) else: io_new = IntIO( - self, dict_io[key], + self, + dict_io[key], iotype, "little", # Bei AIO (103) signed auf True setzen - self._producttype == ProductType.AIO + self._producttype == ProductType.AIO, ) if io_new.address < self._modio.length: warnings.warn( - "IO {0} is not in the device offset and will be ignored" - "".format(io_new.name), - Warning + "IO {0} is not in the device offset and will be ignored".format(io_new.name), + Warning, ) else: # IO registrieren @@ -407,7 +408,6 @@ class Device(object): :param activate: Default True fuegt Device zur Synchronisierung hinzu """ if activate and self not in self._modio._lst_refresh: - # Daten bei Aufnahme direkt einlesen! self._modio.readprocimg(self) @@ -457,9 +457,7 @@ class Device(object): :param export: Nur In-/Outputs mit angegebenen 'Export' Wert in piCtory :return: Input und Output, keine MEMs """ - return list(self.__getioiter( - slice(self._slc_inpoff.start, self._slc_outoff.stop), export - )) + return list(self.__getioiter(slice(self._slc_inpoff.start, self._slc_outoff.stop), export)) def get_inputs(self, export=None) -> list: """ @@ -565,7 +563,6 @@ class Base(Device): class GatewayMixin: - @property def leftgate(self) -> bool: """ @@ -573,9 +570,7 @@ class GatewayMixin: :return: True, wenn piGate links existiert """ - return bool(int.from_bytes( - self._ba_devdata[self._slc_statusbyte], byteorder="little" - ) & 16) + return bool(int.from_bytes(self._ba_devdata[self._slc_statusbyte], byteorder="little") & 16) @property def rightgate(self) -> bool: @@ -584,9 +579,7 @@ class GatewayMixin: :return: True, wenn piGate rechts existiert """ - return bool(int.from_bytes( - self._ba_devdata[self._slc_statusbyte], byteorder="little" - ) & 32) + return bool(int.from_bytes(self._ba_devdata[self._slc_statusbyte], byteorder="little") & 32) class ModularBase(Base): @@ -596,9 +589,16 @@ class ModularBase(Base): Stellt Funktionen fuer den Status zur Verfuegung. """ - __slots__ = "_slc_cycle", "_slc_errorcnt", "_slc_statusbyte", \ - "_slc_temperature", "_slc_errorlimit1", "_slc_errorlimit2", \ - "_slc_frequency", "_slc_led" + __slots__ = ( + "_slc_cycle", + "_slc_errorcnt", + "_slc_statusbyte", + "_slc_temperature", + "_slc_errorlimit1", + "_slc_errorlimit2", + "_slc_frequency", + "_slc_led", + ) def __errorlimit(self, slc_io: slice, errorlimit: int) -> None: """ @@ -608,12 +608,9 @@ class ModularBase(Base): :return: Aktuellen ErrorLimit oder None wenn nicht verfuegbar """ if 0 <= errorlimit <= 65535: - self._ba_devdata[slc_io] = \ - errorlimit.to_bytes(2, byteorder="little") + self._ba_devdata[slc_io] = errorlimit.to_bytes(2, byteorder="little") else: - raise ValueError( - "errorlimit value must be between 0 and 65535" - ) + raise ValueError("errorlimit value must be between 0 and 65535") def _get_status(self) -> int: """ @@ -621,9 +618,7 @@ class ModularBase(Base): :return: Status als """ - return int.from_bytes( - self._ba_devdata[self._slc_statusbyte], byteorder="little" - ) + return int.from_bytes(self._ba_devdata[self._slc_statusbyte], byteorder="little") @property def picontrolrunning(self) -> bool: @@ -632,9 +627,7 @@ class ModularBase(Base): :return: True, wenn Treiber laeuft """ - return bool(int.from_bytes( - self._ba_devdata[self._slc_statusbyte], byteorder="little" - ) & 1) + return bool(int.from_bytes(self._ba_devdata[self._slc_statusbyte], byteorder="little") & 1) @property def unconfdevice(self) -> bool: @@ -643,9 +636,7 @@ class ModularBase(Base): :return: True, wenn IO Modul nicht konfiguriert """ - return bool(int.from_bytes( - self._ba_devdata[self._slc_statusbyte], byteorder="little" - ) & 2) + return bool(int.from_bytes(self._ba_devdata[self._slc_statusbyte], byteorder="little") & 2) @property def missingdeviceorgate(self) -> bool: @@ -654,9 +645,7 @@ class ModularBase(Base): :return: True, wenn IO-Modul fehlt oder piGate konfiguriert """ - return bool(int.from_bytes( - self._ba_devdata[self._slc_statusbyte], byteorder="little" - ) & 4) + return bool(int.from_bytes(self._ba_devdata[self._slc_statusbyte], byteorder="little") & 4) @property def overunderflow(self) -> bool: @@ -665,9 +654,7 @@ class ModularBase(Base): :return: True, wenn falscher Speicher belegt ist """ - return bool(int.from_bytes( - self._ba_devdata[self._slc_statusbyte], byteorder="little" - ) & 8) + return bool(int.from_bytes(self._ba_devdata[self._slc_statusbyte], byteorder="little") & 8) @property def iocycle(self) -> int: @@ -676,8 +663,10 @@ class ModularBase(Base): :return: Zykluszeit in ms ( -1 wenn nicht verfuegbar) """ - return -1 if self._slc_cycle is None else int.from_bytes( - self._ba_devdata[self._slc_cycle], byteorder="little" + return ( + -1 + if self._slc_cycle is None + else int.from_bytes(self._ba_devdata[self._slc_cycle], byteorder="little") ) @property @@ -687,8 +676,10 @@ class ModularBase(Base): :return: CPU-Temperatur in Celsius (-273 wenn nich verfuegbar) """ - return -273 if self._slc_temperature is None else int.from_bytes( - self._ba_devdata[self._slc_temperature], byteorder="little" + return ( + -273 + if self._slc_temperature is None + else int.from_bytes(self._ba_devdata[self._slc_temperature], byteorder="little") ) @property @@ -698,9 +689,11 @@ class ModularBase(Base): :return: CPU Taktfrequenz in MHz (-1 wenn nicht verfuegbar) """ - return -1 if self._slc_frequency is None else int.from_bytes( - self._ba_devdata[self._slc_frequency], byteorder="little" - ) * 10 + return ( + -1 + if self._slc_frequency is None + else int.from_bytes(self._ba_devdata[self._slc_frequency], byteorder="little") * 10 + ) @property def ioerrorcount(self) -> int: @@ -709,8 +702,10 @@ class ModularBase(Base): :return: Fehleranzahl der piBridge (-1 wenn nicht verfuegbar) """ - return -1 if self._slc_errorcnt is None else int.from_bytes( - self._ba_devdata[self._slc_errorcnt], byteorder="little" + return ( + -1 + if self._slc_errorcnt is None + else int.from_bytes(self._ba_devdata[self._slc_errorcnt], byteorder="little") ) @property @@ -720,8 +715,10 @@ class ModularBase(Base): :return: Aktueller Wert fuer ErrorLimit1 (-1 wenn nicht verfuegbar) """ - return -1 if self._slc_errorlimit1 is None else int.from_bytes( - self._ba_devdata[self._slc_errorlimit1], byteorder="little" + return ( + -1 + if self._slc_errorlimit1 is None + else int.from_bytes(self._ba_devdata[self._slc_errorlimit1], byteorder="little") ) @errorlimit1.setter @@ -732,9 +729,7 @@ class ModularBase(Base): :param value: Neuer ErrorLimit1 Wert """ if self._slc_errorlimit1 is None: - raise RuntimeError( - "selected core item in piCtory does not support errorlimit1" - ) + raise RuntimeError("selected core item in piCtory does not support errorlimit1") else: self.__errorlimit(self._slc_errorlimit1, value) @@ -745,8 +740,10 @@ class ModularBase(Base): :return: Aktueller Wert fuer ErrorLimit2 (-1 wenn nicht verfuegbar) """ - return -1 if self._slc_errorlimit2 is None else int.from_bytes( - self._ba_devdata[self._slc_errorlimit2], byteorder="little" + return ( + -1 + if self._slc_errorlimit2 is None + else int.from_bytes(self._ba_devdata[self._slc_errorlimit2], byteorder="little") ) @errorlimit2.setter @@ -757,9 +754,7 @@ class ModularBase(Base): :param value: Neuer ErrorLimit2 Wert """ if self._slc_errorlimit2 is None: - raise RuntimeError( - "selected core item in piCtory does not support errorlimit2" - ) + raise RuntimeError("selected core item in piCtory does not support errorlimit2") else: self.__errorlimit(self._slc_errorlimit2, value) @@ -777,11 +772,8 @@ class Core(ModularBase, GatewayMixin): def __setattr__(self, key, value): """Verhindert Ueberschreibung der LEDs.""" - if hasattr(self, key) and key in ( - "a1green", "a1red", "a2green", "a2red", "wd"): - raise AttributeError( - "direct assignment is not supported - use .value Attribute" - ) + if hasattr(self, key) and key in ("a1green", "a1red", "a2green", "a2red", "wd"): + raise AttributeError("direct assignment is not supported - use .value Attribute") else: object.__setattr__(self, key, value) @@ -831,28 +823,43 @@ class Core(ModularBase, GatewayMixin): exp_a2red = exp_a1green # Echte IOs erzeugen - self.a1green = IOBase(self, [ - "core.a1green", 0, 1, self._slc_led.start, - exp_a1green, None, "LED_A1_GREEN", "0" - ], OUT, "little", False) - self.a1red = IOBase(self, [ - "core.a1red", 0, 1, self._slc_led.start, - exp_a1red, None, "LED_A1_RED", "1" - ], OUT, "little", False) - self.a2green = IOBase(self, [ - "core.a2green", 0, 1, self._slc_led.start, - exp_a2green, None, "LED_A2_GREEN", "2" - ], OUT, "little", False) - self.a2red = IOBase(self, [ - "core.a2red", 0, 1, self._slc_led.start, - exp_a2red, None, "LED_A2_RED", "3" - ], OUT, "little", False) + self.a1green = IOBase( + self, + ["core.a1green", 0, 1, self._slc_led.start, exp_a1green, None, "LED_A1_GREEN", "0"], + OUT, + "little", + False, + ) + self.a1red = IOBase( + self, + ["core.a1red", 0, 1, self._slc_led.start, exp_a1red, None, "LED_A1_RED", "1"], + OUT, + "little", + False, + ) + self.a2green = IOBase( + self, + ["core.a2green", 0, 1, self._slc_led.start, exp_a2green, None, "LED_A2_GREEN", "2"], + OUT, + "little", + False, + ) + self.a2red = IOBase( + self, + ["core.a2red", 0, 1, self._slc_led.start, exp_a2red, None, "LED_A2_RED", "3"], + OUT, + "little", + False, + ) # Watchdog einrichten (Core=soft / Connect=soft/hard) - self.wd = IOBase(self, [ - "core.wd", 0, 1, self._slc_led.start, - False, None, "WatchDog", "7" - ], OUT, "little", False) + self.wd = IOBase( + self, + ["core.wd", 0, 1, self._slc_led.start, False, None, "WatchDog", "7"], + OUT, + "little", + False, + ) def _get_leda1(self) -> int: """ @@ -910,16 +917,12 @@ class Connect(Core): Stellt Funktionen fuer die LEDs, Watchdog und den Status zur Verfuegung. """ - __slots__ = "__evt_wdtoggle", "__th_wdtoggle", "a3green", "a3red", \ - "x2in", "x2out" + __slots__ = "__evt_wdtoggle", "__th_wdtoggle", "a3green", "a3red", "x2in", "x2out" def __setattr__(self, key, value): """Verhindert Ueberschreibung der speziellen IOs.""" - if hasattr(self, key) and key in ( - "a3green", "a3red", "x2in", "x2out"): - raise AttributeError( - "direct assignment is not supported - use .value Attribute" - ) + if hasattr(self, key) and key in ("a3green", "a3red", "x2in", "x2out"): + raise AttributeError("direct assignment is not supported - use .value Attribute") super(Connect, self).__setattr__(key, value) def __wdtoggle(self) -> None: @@ -954,24 +957,36 @@ class Connect(Core): exp_x2in = lst_status[0].export # Echte IOs erzeugen - self.a3green = IOBase(self, [ - "core.a3green", 0, 1, self._slc_led.start, - exp_a3green, None, "LED_A3_GREEN", "4" - ], OUT, "little", False) - self.a3red = IOBase(self, [ - "core.a3red", 0, 1, self._slc_led.start, - exp_a3red, None, "LED_A3_RED", "5" - ], OUT, "little", False) + self.a3green = IOBase( + self, + ["core.a3green", 0, 1, self._slc_led.start, exp_a3green, None, "LED_A3_GREEN", "4"], + OUT, + "little", + False, + ) + self.a3red = IOBase( + self, + ["core.a3red", 0, 1, self._slc_led.start, exp_a3red, None, "LED_A3_RED", "5"], + OUT, + "little", + False, + ) # IO Objekte für WD und X2 in/out erzeugen - self.x2in = IOBase(self, [ - "core.x2in", 0, 1, self._slc_statusbyte.start, - exp_x2in, None, "Connect_X2_IN", "6" - ], INP, "little", False) - self.x2out = IOBase(self, [ - "core.x2out", 0, 1, self._slc_led.start, - exp_x2out, None, "Connect_X2_OUT", "6" - ], OUT, "little", False) + self.x2in = IOBase( + self, + ["core.x2in", 0, 1, self._slc_statusbyte.start, exp_x2in, None, "Connect_X2_IN", "6"], + INP, + "little", + False, + ) + self.x2out = IOBase( + self, + ["core.x2out", 0, 1, self._slc_led.start, exp_x2out, None, "Connect_X2_OUT", "6"], + OUT, + "little", + False, + ) # Export hardware watchdog to use it with other systems self.wd._export = int(exp_wd) # Do this without mrk for export! @@ -1021,13 +1036,9 @@ class Connect(Core): :param value: True zum aktivieren, False zum beenden """ if self._modio._monitoring: - raise RuntimeError( - "can not toggle watchdog, while system is in monitoring mode" - ) + raise RuntimeError("can not toggle watchdog, while system is in monitoring mode") if self._modio._simulator: - raise RuntimeError( - "can not toggle watchdog, while system is in simulator mode" - ) + raise RuntimeError("can not toggle watchdog, while system is in simulator mode") if not value: self.__evt_wdtoggle.set() @@ -1050,26 +1061,47 @@ class Connect4(ModularBase): __slots__ = ( "_slc_output", - "a1red", "a1green", "a1blue", - "a2red", "a2green", "a2blue", - "a3red", "a3green", "a3blue", - "a4red", "a4green", "a4blue", - "a5red", "a5green", "a5blue", - "x2in", "x2out" + "a1red", + "a1green", + "a1blue", + "a2red", + "a2green", + "a2blue", + "a3red", + "a3green", + "a3blue", + "a4red", + "a4green", + "a4blue", + "a5red", + "a5green", + "a5blue", + "x2in", + "x2out", ) def __setattr__(self, key, value): """Verhindert Ueberschreibung der speziellen IOs.""" if hasattr(self, key) and key in ( - "a1red", "a1green", "a1blue", - "a2red", "a2green", "a2blue", - "a3red", "a3green", "a3blue", - "a4red", "a4green", "a4blue", - "a5red", "a5green", "a5blue", - "x2in", "x2out"): - raise AttributeError( - "direct assignment is not supported - use .value Attribute" - ) + "a1red", + "a1green", + "a1blue", + "a2red", + "a2green", + "a2blue", + "a3red", + "a3green", + "a3blue", + "a4red", + "a4green", + "a4blue", + "a5red", + "a5green", + "a5blue", + "x2in", + "x2out", + ): + raise AttributeError("direct assignment is not supported - use .value Attribute") super(Connect4, self).__setattr__(key, value) def _devconfigure(self) -> None: @@ -1137,80 +1169,131 @@ class Connect4(ModularBase): exp_x2in = lst_status[0].export # Echte IOs erzeugen - self.a1red = IOBase(self, [ - "core.a1red", 0, 1, self._slc_led.start, - exp_a1red, None, "LED_A1_RED", "0" - ], OUT, "little", False) - self.a1green = IOBase(self, [ - "core.a1green", 0, 1, self._slc_led.start, - exp_a1green, None, "LED_A1_GREEN", "1" - ], OUT, "little", False) - self.a1blue = IOBase(self, [ - "core.a1blue", 0, 1, self._slc_led.start, - exp_a1blue, None, "LED_A1_BLUE", "2" - ], OUT, "little", False) + self.a1red = IOBase( + self, + ["core.a1red", 0, 1, self._slc_led.start, exp_a1red, None, "LED_A1_RED", "0"], + OUT, + "little", + False, + ) + self.a1green = IOBase( + self, + ["core.a1green", 0, 1, self._slc_led.start, exp_a1green, None, "LED_A1_GREEN", "1"], + OUT, + "little", + False, + ) + self.a1blue = IOBase( + self, + ["core.a1blue", 0, 1, self._slc_led.start, exp_a1blue, None, "LED_A1_BLUE", "2"], + OUT, + "little", + False, + ) - self.a2red = IOBase(self, [ - "core.a2red", 0, 1, self._slc_led.start, - exp_a2red, None, "LED_A2_RED", "3" - ], OUT, "little", False) - self.a2green = IOBase(self, [ - "core.a2green", 0, 1, self._slc_led.start, - exp_a2green, None, "LED_A2_GREEN", "4" - ], OUT, "little", False) - self.a2blue = IOBase(self, [ - "core.a2blue", 0, 1, self._slc_led.start, - exp_a2blue, None, "LED_A2_BLUE", "5" - ], OUT, "little", False) + self.a2red = IOBase( + self, + ["core.a2red", 0, 1, self._slc_led.start, exp_a2red, None, "LED_A2_RED", "3"], + OUT, + "little", + False, + ) + self.a2green = IOBase( + self, + ["core.a2green", 0, 1, self._slc_led.start, exp_a2green, None, "LED_A2_GREEN", "4"], + OUT, + "little", + False, + ) + self.a2blue = IOBase( + self, + ["core.a2blue", 0, 1, self._slc_led.start, exp_a2blue, None, "LED_A2_BLUE", "5"], + OUT, + "little", + False, + ) - self.a3red = IOBase(self, [ - "core.a3red", 0, 1, self._slc_led.start, - exp_a3red, None, "LED_A3_RED", "6" - ], OUT, "little", False) - self.a3green = IOBase(self, [ - "core.a3green", 0, 1, self._slc_led.start, - exp_a3green, None, "LED_A3_GREEN", "7" - ], OUT, "little", False) - self.a3blue = IOBase(self, [ - "core.a3blue", 0, 1, self._slc_led.start, - exp_a3blue, None, "LED_A3_BLUE", "8" - ], OUT, "little", False) + self.a3red = IOBase( + self, + ["core.a3red", 0, 1, self._slc_led.start, exp_a3red, None, "LED_A3_RED", "6"], + OUT, + "little", + False, + ) + self.a3green = IOBase( + self, + ["core.a3green", 0, 1, self._slc_led.start, exp_a3green, None, "LED_A3_GREEN", "7"], + OUT, + "little", + False, + ) + self.a3blue = IOBase( + self, + ["core.a3blue", 0, 1, self._slc_led.start, exp_a3blue, None, "LED_A3_BLUE", "8"], + OUT, + "little", + False, + ) - self.a4red = IOBase(self, [ - "core.a4red", 0, 1, self._slc_led.start, - exp_a4red, None, "LED_A4_RED", "9" - ], OUT, "little", False) - self.a4green = IOBase(self, [ - "core.a4green", 0, 1, self._slc_led.start, - exp_a4green, None, "LED_A4_GREEN", "10" - ], OUT, "little", False) - self.a4blue = IOBase(self, [ - "core.a4blue", 0, 1, self._slc_led.start, - exp_a4blue, None, "LED_A4_BLUE", "11" - ], OUT, "little", False) + self.a4red = IOBase( + self, + ["core.a4red", 0, 1, self._slc_led.start, exp_a4red, None, "LED_A4_RED", "9"], + OUT, + "little", + False, + ) + self.a4green = IOBase( + self, + ["core.a4green", 0, 1, self._slc_led.start, exp_a4green, None, "LED_A4_GREEN", "10"], + OUT, + "little", + False, + ) + self.a4blue = IOBase( + self, + ["core.a4blue", 0, 1, self._slc_led.start, exp_a4blue, None, "LED_A4_BLUE", "11"], + OUT, + "little", + False, + ) - self.a5red = IOBase(self, [ - "core.a5red", 0, 1, self._slc_led.start, - exp_a5red, None, "LED_A5_RED", "12" - ], OUT, "little", False) - self.a5green = IOBase(self, [ - "core.a5green", 0, 1, self._slc_led.start, - exp_a5green, None, "LED_A5_GREEN", "13" - ], OUT, "little", False) - self.a5blue = IOBase(self, [ - "core.a5blue", 0, 1, self._slc_led.start, - exp_a5blue, None, "LED_A5_BLUE", "14" - ], OUT, "little", False) + self.a5red = IOBase( + self, + ["core.a5red", 0, 1, self._slc_led.start, exp_a5red, None, "LED_A5_RED", "12"], + OUT, + "little", + False, + ) + self.a5green = IOBase( + self, + ["core.a5green", 0, 1, self._slc_led.start, exp_a5green, None, "LED_A5_GREEN", "13"], + OUT, + "little", + False, + ) + self.a5blue = IOBase( + self, + ["core.a5blue", 0, 1, self._slc_led.start, exp_a5blue, None, "LED_A5_BLUE", "14"], + OUT, + "little", + False, + ) # IO Objekte für WD und X2 in/out erzeugen - self.x2in = IOBase(self, [ - "core.x2in", 0, 1, self._slc_statusbyte.start, - exp_x2in, None, "Connect_X2_IN", "6" - ], INP, "little", False) - self.x2out = IOBase(self, [ - "core.x2out", 0, 1, self._slc_led.start, - exp_x2out, None, "Connect_X2_OUT", "6" - ], OUT, "little", False) + self.x2in = IOBase( + self, + ["core.x2in", 0, 1, self._slc_statusbyte.start, exp_x2in, None, "Connect_X2_IN", "6"], + INP, + "little", + False, + ) + self.x2out = IOBase( + self, + ["core.x2out", 0, 1, self._slc_led.start, exp_x2out, None, "Connect_X2_OUT", "6"], + OUT, + "little", + False, + ) def _get_leda1(self) -> int: """ @@ -1333,16 +1416,21 @@ class Compact(Base): Objekt zugegriffen. """ - __slots__ = "_slc_temperature", "_slc_frequency", "_slc_led", \ - "a1green", "a1red", "a2green", "a2red", "wd" + __slots__ = ( + "_slc_temperature", + "_slc_frequency", + "_slc_led", + "a1green", + "a1red", + "a2green", + "a2red", + "wd", + ) def __setattr__(self, key, value): """Verhindert Ueberschreibung der LEDs.""" - if hasattr(self, key) and key in ( - "a1green", "a1red", "a2green", "a2red", "wd"): - raise AttributeError( - "direct assignment is not supported - use .value Attribute" - ) + if hasattr(self, key) and key in ("a1green", "a1red", "a2green", "a2red", "wd"): + raise AttributeError("direct assignment is not supported - use .value Attribute") else: object.__setattr__(self, key, value) @@ -1369,28 +1457,43 @@ class Compact(Base): exp_a2red = exp_a1green # Echte IOs erzeugen - self.a1green = IOBase(self, [ - "core.a1green", 0, 1, self._slc_led.start, - exp_a1green, None, "LED_A1_GREEN", "0" - ], OUT, "little", False) - self.a1red = IOBase(self, [ - "core.a1red", 0, 1, self._slc_led.start, - exp_a1red, None, "LED_A1_RED", "1" - ], OUT, "little", False) - self.a2green = IOBase(self, [ - "core.a2green", 0, 1, self._slc_led.start, - exp_a2green, None, "LED_A2_GREEN", "2" - ], OUT, "little", False) - self.a2red = IOBase(self, [ - "core.a2red", 0, 1, self._slc_led.start, - exp_a2red, None, "LED_A2_RED", "3" - ], OUT, "little", False) + self.a1green = IOBase( + self, + ["core.a1green", 0, 1, self._slc_led.start, exp_a1green, None, "LED_A1_GREEN", "0"], + OUT, + "little", + False, + ) + self.a1red = IOBase( + self, + ["core.a1red", 0, 1, self._slc_led.start, exp_a1red, None, "LED_A1_RED", "1"], + OUT, + "little", + False, + ) + self.a2green = IOBase( + self, + ["core.a2green", 0, 1, self._slc_led.start, exp_a2green, None, "LED_A2_GREEN", "2"], + OUT, + "little", + False, + ) + self.a2red = IOBase( + self, + ["core.a2red", 0, 1, self._slc_led.start, exp_a2red, None, "LED_A2_RED", "3"], + OUT, + "little", + False, + ) # Software watchdog einrichten - self.wd = IOBase(self, [ - "core.wd", 0, 1, self._slc_led.start, - False, None, "WatchDog", "7" - ], OUT, "little", False) + self.wd = IOBase( + self, + ["core.wd", 0, 1, self._slc_led.start, False, None, "WatchDog", "7"], + OUT, + "little", + False, + ) def _get_leda1(self) -> int: """ @@ -1448,8 +1551,10 @@ class Compact(Base): :return: CPU-Temperatur in Celsius (-273 wenn nich verfuegbar) """ - return -273 if self._slc_temperature is None else int.from_bytes( - self._ba_devdata[self._slc_temperature], byteorder="little" + return ( + -273 + if self._slc_temperature is None + else int.from_bytes(self._ba_devdata[self._slc_temperature], byteorder="little") ) @property @@ -1459,9 +1564,11 @@ class Compact(Base): :return: CPU Taktfrequenz in MHz (-1 wenn nicht verfuegbar) """ - return -1 if self._slc_frequency is None else int.from_bytes( - self._ba_devdata[self._slc_frequency], byteorder="little" - ) * 10 + return ( + -1 + if self._slc_frequency is None + else int.from_bytes(self._ba_devdata[self._slc_frequency], byteorder="little") * 10 + ) class Flat(Base): @@ -1472,21 +1579,45 @@ class Flat(Base): Objekt zugegriffen. """ - __slots__ = "_slc_temperature", "_slc_frequency", "_slc_led", \ - "_slc_switch", "_slc_dout", \ - "a1green", "a1red", "a2green", "a2red", \ - "a3green", "a3red", "a4green", "a4red", \ - "a5green", "a5red", "relais", "switch", "wd" + __slots__ = ( + "_slc_temperature", + "_slc_frequency", + "_slc_led", + "_slc_switch", + "_slc_dout", + "a1green", + "a1red", + "a2green", + "a2red", + "a3green", + "a3red", + "a4green", + "a4red", + "a5green", + "a5red", + "relais", + "switch", + "wd", + ) def __setattr__(self, key, value): """Verhindert Ueberschreibung der LEDs.""" if hasattr(self, key) and key in ( - "a1green", "a1red", "a2green", "a2red", - "a3green", "a3red", "a4green", "a4red", - "a5green", "a5red", "relais", "switch", "wd"): - raise AttributeError( - "direct assignment is not supported - use .value Attribute" - ) + "a1green", + "a1red", + "a2green", + "a2red", + "a3green", + "a3red", + "a4green", + "a4red", + "a5green", + "a5red", + "relais", + "switch", + "wd", + ): + raise AttributeError("direct assignment is not supported - use .value Attribute") else: object.__setattr__(self, key, value) @@ -1530,68 +1661,107 @@ class Flat(Base): exp_a5red = exp_a1green # Echte IOs erzeugen - self.a1green = IOBase(self, [ - "core.a1green", 0, 1, self._slc_led.start, - exp_a1green, None, "LED_A1_GREEN", "0" - ], OUT, "little", False) - self.a1red = IOBase(self, [ - "core.a1red", 0, 1, self._slc_led.start, - exp_a1red, None, "LED_A1_RED", "1" - ], OUT, "little", False) - self.a2green = IOBase(self, [ - "core.a2green", 0, 1, self._slc_led.start, - exp_a2green, None, "LED_A2_GREEN", "2" - ], OUT, "little", False) - self.a2red = IOBase(self, [ - "core.a2red", 0, 1, self._slc_led.start, - exp_a2red, None, "LED_A2_RED", "3" - ], OUT, "little", False) - self.a3green = IOBase(self, [ - "core.a3green", 0, 1, self._slc_led.start, - exp_a3green, None, "LED_A3_GREEN", "4" - ], OUT, "little", False) - self.a3red = IOBase(self, [ - "core.a3red", 0, 1, self._slc_led.start, - exp_a3red, None, "LED_A3_RED", "5" - ], OUT, "little", False) - self.a4green = IOBase(self, [ - "core.a4green", 0, 1, self._slc_led.start, - exp_a4green, None, "LED_A4_GREEN", "6" - ], OUT, "little", False) - self.a4red = IOBase(self, [ - "core.a4red", 0, 1, self._slc_led.start, - exp_a4red, None, "LED_A4_RED", "7" - ], OUT, "little", False) - self.a5green = IOBase(self, [ - "core.a5green", 0, 1, self._slc_led.start, - exp_a5green, None, "LED_A5_GREEN", "8" - ], OUT, "little", False) - self.a5red = IOBase(self, [ - "core.a5red", 0, 1, self._slc_led.start, - exp_a5red, None, "LED_A5_RED", "9" - ], OUT, "little", False) + self.a1green = IOBase( + self, + ["core.a1green", 0, 1, self._slc_led.start, exp_a1green, None, "LED_A1_GREEN", "0"], + OUT, + "little", + False, + ) + self.a1red = IOBase( + self, + ["core.a1red", 0, 1, self._slc_led.start, exp_a1red, None, "LED_A1_RED", "1"], + OUT, + "little", + False, + ) + self.a2green = IOBase( + self, + ["core.a2green", 0, 1, self._slc_led.start, exp_a2green, None, "LED_A2_GREEN", "2"], + OUT, + "little", + False, + ) + self.a2red = IOBase( + self, + ["core.a2red", 0, 1, self._slc_led.start, exp_a2red, None, "LED_A2_RED", "3"], + OUT, + "little", + False, + ) + self.a3green = IOBase( + self, + ["core.a3green", 0, 1, self._slc_led.start, exp_a3green, None, "LED_A3_GREEN", "4"], + OUT, + "little", + False, + ) + self.a3red = IOBase( + self, + ["core.a3red", 0, 1, self._slc_led.start, exp_a3red, None, "LED_A3_RED", "5"], + OUT, + "little", + False, + ) + self.a4green = IOBase( + self, + ["core.a4green", 0, 1, self._slc_led.start, exp_a4green, None, "LED_A4_GREEN", "6"], + OUT, + "little", + False, + ) + self.a4red = IOBase( + self, + ["core.a4red", 0, 1, self._slc_led.start, exp_a4red, None, "LED_A4_RED", "7"], + OUT, + "little", + False, + ) + self.a5green = IOBase( + self, + ["core.a5green", 0, 1, self._slc_led.start, exp_a5green, None, "LED_A5_GREEN", "8"], + OUT, + "little", + False, + ) + self.a5red = IOBase( + self, + ["core.a5red", 0, 1, self._slc_led.start, exp_a5red, None, "LED_A5_RED", "9"], + OUT, + "little", + False, + ) # Real IO for switch lst_io = self._modio.io[self._slc_devoff][self._slc_switch.start] exp_io = lst_io[0].export - self.switch = IOBase(self, [ - "flat.switch", 0, 1, self._slc_switch.start, - exp_io, None, "Flat_Switch", "0" - ], INP, "little", False) + self.switch = IOBase( + self, + ["flat.switch", 0, 1, self._slc_switch.start, exp_io, None, "Flat_Switch", "0"], + INP, + "little", + False, + ) # Real IO for relais lst_io = self._modio.io[self._slc_devoff][self._slc_dout.start] exp_io = lst_io[0].export - self.relais = IOBase(self, [ - "flat.relais", 0, 1, self._slc_dout.start, - exp_io, None, "Flat_Relais", "0" - ], OUT, "little", False) + self.relais = IOBase( + self, + ["flat.relais", 0, 1, self._slc_dout.start, exp_io, None, "Flat_Relais", "0"], + OUT, + "little", + False, + ) # Software watchdog einrichten - self.wd = IOBase(self, [ - "core.wd", 0, 1, self._slc_led.start, - False, None, "WatchDog", "15" - ], OUT, "little", False) + self.wd = IOBase( + self, + ["core.wd", 0, 1, self._slc_led.start, False, None, "WatchDog", "15"], + OUT, + "little", + False, + ) def _get_leda1(self) -> int: """ @@ -1710,8 +1880,10 @@ class Flat(Base): :return: CPU-Temperatur in Celsius (-273 wenn nich verfuegbar) """ - return -273 if self._slc_temperature is None else int.from_bytes( - self._ba_devdata[self._slc_temperature], byteorder="little" + return ( + -273 + if self._slc_temperature is None + else int.from_bytes(self._ba_devdata[self._slc_temperature], byteorder="little") ) @property @@ -1721,9 +1893,11 @@ class Flat(Base): :return: CPU Taktfrequenz in MHz (-1 wenn nicht verfuegbar) """ - return -1 if self._slc_frequency is None else int.from_bytes( - self._ba_devdata[self._slc_frequency], byteorder="little" - ) * 10 + return ( + -1 + if self._slc_frequency is None + else int.from_bytes(self._ba_devdata[self._slc_frequency], byteorder="little") * 10 + ) class DioModule(Device): @@ -1770,7 +1944,7 @@ class Gateway(Device): self._dict_slc = { INP: self._slc_inp, OUT: self._slc_out, - MEM: self._slc_mem + MEM: self._slc_mem, } def get_rawbytes(self) -> bytes: @@ -1809,10 +1983,7 @@ class Virtual(Gateway): :return: True, wenn Arbeiten am virtuellen Device erfolgreich waren """ if self._modio._monitoring: - raise RuntimeError( - "can not write process image, while system is in monitoring " - "mode" - ) + raise RuntimeError("can not write process image, while system is in monitoring mode") workokay = True self._filelock.acquire() diff --git a/src/revpimodio2/helper.py b/src/revpimodio2/helper.py index 9ad4ad0..0938cef 100644 --- a/src/revpimodio2/helper.py +++ b/src/revpimodio2/helper.py @@ -87,12 +87,31 @@ class Cycletools: Lampen synchron blinken zu lassen. """ - __slots__ = "__cycle", "__cycletime", "__ucycle", "__dict_ton", \ - "__dict_tof", "__dict_tp", "__dict_change", \ - "_start_timer", "core", "device", \ - "first", "io", "last", "var", \ - "flag1c", "flag5c", "flag10c", "flag15c", "flag20c", \ - "flank5c", "flank10c", "flank15c", "flank20c" + __slots__ = ( + "__cycle", + "__cycletime", + "__ucycle", + "__dict_ton", + "__dict_tof", + "__dict_tp", + "__dict_change", + "_start_timer", + "core", + "device", + "first", + "io", + "last", + "var", + "flag1c", + "flag5c", + "flag10c", + "flag15c", + "flag20c", + "flank5c", + "flank10c", + "flank15c", + "flank20c", + ) def __init__(self, cycletime, revpi_object): """Init Cycletools class.""" @@ -208,16 +227,13 @@ class Cycletools: else: value = io.get_value() return self.__dict_change[io] != value and ( - value and edge == RISING or - not value and edge == FALLING + value and edge == RISING or not value and edge == FALLING ) else: if not isinstance(io, IOBase): raise TypeError("parameter 'io' must be an io object") if not (edge == BOTH or type(io.value) == bool): - raise ValueError( - "parameter 'edge' can be used with bit io objects only" - ) + raise ValueError("parameter 'edge' can be used with bit io objects only") self.__dict_change[io] = None return False @@ -283,8 +299,7 @@ class Cycletools: :param milliseconds: Millisekunden, der Verzoegerung wenn neu gestartet """ if self.__dict_ton.get(name, [-1])[0] == -1: - self.__dict_ton[name] = \ - [ceil(milliseconds / self.__cycletime), True] + self.__dict_ton[name] = [ceil(milliseconds / self.__cycletime), True] else: self.__dict_ton[name][1] = True @@ -326,8 +341,7 @@ class Cycletools: :param milliseconds: Millisekunden, die der Impuls anstehen soll """ if self.__dict_tp.get(name, [-1])[0] == -1: - self.__dict_tp[name] = \ - [ceil(milliseconds / self.__cycletime), True] + self.__dict_tp[name] = [ceil(milliseconds / self.__cycletime), True] else: self.__dict_tp[name][1] = True @@ -364,9 +378,19 @@ class ProcimgWriter(Thread): Event-Handling verwendet. """ - __slots__ = "__dict_delay", "__eventth", "_eventqth", "__eventwork", \ - "_eventq", "_modio", \ - "_refresh", "_work", "daemon", "lck_refresh", "newdata" + __slots__ = ( + "__dict_delay", + "__eventth", + "_eventqth", + "__eventwork", + "_eventq", + "_modio", + "_refresh", + "_work", + "daemon", + "lck_refresh", + "newdata", + ) def __init__(self, parentmodio): """Init ProcimgWriter class.""" @@ -387,43 +411,38 @@ class ProcimgWriter(Thread): def __check_change(self, dev) -> None: """Findet Aenderungen fuer die Eventueberwachung.""" for io_event in dev._dict_events: - - if dev._ba_datacp[io_event._slc_address] == \ - dev._ba_devdata[io_event._slc_address]: + if dev._ba_datacp[io_event._slc_address] == dev._ba_devdata[io_event._slc_address]: continue if io_event._bitshift: - boolcp = dev._ba_datacp[io_event._slc_address.start] \ - & io_event._bitshift - boolor = dev._ba_devdata[io_event._slc_address.start] \ - & io_event._bitshift + boolcp = dev._ba_datacp[io_event._slc_address.start] & io_event._bitshift + boolor = dev._ba_devdata[io_event._slc_address.start] & io_event._bitshift if boolor == boolcp: continue for regfunc in dev._dict_events[io_event]: - if regfunc.edge == BOTH \ - or regfunc.edge == RISING and boolor \ - or regfunc.edge == FALLING and not boolor: + if ( + regfunc.edge == BOTH + or regfunc.edge == RISING + and boolor + or regfunc.edge == FALLING + and not boolor + ): if regfunc.delay == 0: if regfunc.as_thread: - self._eventqth.put( - (regfunc, io_event._name, io_event.value), - False - ) + self._eventqth.put((regfunc, io_event._name, io_event.value), False) else: - self._eventq.put( - (regfunc, io_event._name, io_event.value), - False - ) + self._eventq.put((regfunc, io_event._name, io_event.value), False) else: # Verzögertes Event in dict einfügen tup_fire = ( - regfunc, io_event._name, io_event.value, + regfunc, + io_event._name, + io_event.value, io_event, ) - if regfunc.overwrite \ - or tup_fire not in self.__dict_delay: + if regfunc.overwrite or tup_fire not in self.__dict_delay: self.__dict_delay[tup_fire] = ceil( regfunc.delay / 1000 / self._refresh ) @@ -431,26 +450,19 @@ class ProcimgWriter(Thread): for regfunc in dev._dict_events[io_event]: if regfunc.delay == 0: if regfunc.as_thread: - self._eventqth.put( - (regfunc, io_event._name, io_event.value), - False - ) + self._eventqth.put((regfunc, io_event._name, io_event.value), False) else: - self._eventq.put( - (regfunc, io_event._name, io_event.value), - False - ) + self._eventq.put((regfunc, io_event._name, io_event.value), False) else: # Verzögertes Event in dict einfügen tup_fire = ( - regfunc, io_event._name, io_event.value, + regfunc, + io_event._name, + io_event.value, io_event, ) - if regfunc.overwrite \ - or tup_fire not in self.__dict_delay: - self.__dict_delay[tup_fire] = ceil( - regfunc.delay / 1000 / self._refresh - ) + if regfunc.overwrite or tup_fire not in self.__dict_delay: + self.__dict_delay[tup_fire] = ceil(regfunc.delay / 1000 / self._refresh) # Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf) dev._ba_datacp = dev._ba_devdata[:] @@ -460,9 +472,7 @@ class ProcimgWriter(Thread): while self.__eventwork: try: tup_fireth = self._eventqth.get(timeout=1) - th = EventCallback( - tup_fireth[0].func, tup_fireth[1], tup_fireth[2] - ) + th = EventCallback(tup_fireth[0].func, tup_fireth[1], tup_fireth[2]) th.start() self._eventqth.task_done() except queue.Empty: @@ -524,7 +534,7 @@ class ProcimgWriter(Thread): warnings.warn( "cycle time of {0} ms exceeded in your cycle function" "".format(int(self._refresh * 1000)), - RuntimeWarning + RuntimeWarning, ) mrk_delay = self._refresh # Nur durch cycleloop erreichbar - keine verzögerten Events @@ -545,23 +555,25 @@ class ProcimgWriter(Thread): # Read all device bytes, because it is shared fh.seek(dev.offset) - bytesbuff[dev._slc_devoff] = \ - fh.read(len(dev._ba_devdata)) + bytesbuff[dev._slc_devoff] = fh.read(len(dev._ba_devdata)) if self._modio._monitoring or dev._shared_procimg: # Inputs und Outputs in Puffer dev._ba_devdata[:] = bytesbuff[dev._slc_devoff] - if self.__eventwork \ - and len(dev._dict_events) > 0 \ - and dev._ba_datacp != dev._ba_devdata: + if ( + self.__eventwork + and len(dev._dict_events) > 0 + and dev._ba_datacp != dev._ba_devdata + ): self.__check_change(dev) else: # Inputs in Puffer, Outputs in Prozessabbild - dev._ba_devdata[dev._slc_inp] = \ - bytesbuff[dev._slc_inpoff] - if self.__eventwork \ - and len(dev._dict_events) > 0 \ - and dev._ba_datacp != dev._ba_devdata: + dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff] + if ( + self.__eventwork + and len(dev._dict_events) > 0 + and dev._ba_datacp != dev._ba_devdata + ): self.__check_change(dev) fh.seek(dev._slc_outoff.start) @@ -579,16 +591,13 @@ class ProcimgWriter(Thread): else: if not mrk_warn: if self._modio._debug == 0: - warnings.warn( - "recover from io errors on process image", - RuntimeWarning - ) + warnings.warn("recover from io errors on process image", RuntimeWarning) else: warnings.warn( "recover from io errors on process image - total " "count of {0} errors now" "".format(self._modio._ioerror), - RuntimeWarning + RuntimeWarning, ) mrk_warn = True @@ -600,8 +609,7 @@ class ProcimgWriter(Thread): # Verzögerte Events prüfen if self.__eventwork: for tup_fire in tuple(self.__dict_delay.keys()): - if tup_fire[0].overwrite and \ - tup_fire[3].value != tup_fire[2]: + if tup_fire[0].overwrite and tup_fire[3].value != tup_fire[2]: del self.__dict_delay[tup_fire] else: self.__dict_delay[tup_fire] -= 1 @@ -617,9 +625,8 @@ class ProcimgWriter(Thread): # Second default_timer call include calculation time from above if default_timer() - ot > self._refresh: warnings.warn( - "io refresh time of {0} ms exceeded!" - "".format(int(self._refresh * 1000)), - RuntimeWarning + "io refresh time of {0} ms exceeded!".format(int(self._refresh * 1000)), + RuntimeWarning, ) mrk_delay = 0.0 else: @@ -641,8 +648,6 @@ class ProcimgWriter(Thread): if type(value) == int and 5 <= value <= 2000: self._refresh = value / 1000 else: - raise ValueError( - "refresh time must be 5 to 2000 milliseconds" - ) + raise ValueError("refresh time must be 5 to 2000 milliseconds") refresh = property(get_refresh, set_refresh) diff --git a/src/revpimodio2/io.py b/src/revpimodio2/io.py index ceafcb3..fc33ae1 100644 --- a/src/revpimodio2/io.py +++ b/src/revpimodio2/io.py @@ -8,8 +8,7 @@ import struct from re import match as rematch from threading import Event -from ._internal import consttostr, RISING, FALLING, BOTH, INP, OUT, \ - MEM, PROCESS_IMAGE_SIZE +from ._internal import consttostr, RISING, FALLING, BOTH, INP, OUT, MEM, PROCESS_IMAGE_SIZE try: # Funktioniert nur auf Unix @@ -69,8 +68,16 @@ class IOList(object): self.__dict_iobyte[io_del.address][io_del._bitaddress] = None # Do not use any() because we want to know None, not 0 - if self.__dict_iobyte[io_del.address] == \ - [None, None, None, None, None, None, None, None]: + if self.__dict_iobyte[io_del.address] == [ + None, + None, + None, + None, + None, + None, + None, + None, + ]: self.__dict_iobyte[io_del.address] = [] else: self.__dict_iobyte[io_del.address].remove(io_del) @@ -110,9 +117,7 @@ class IOList(object): elif type(key) == slice: return [ self.__dict_iobyte[int_io] - for int_io in range( - key.start, key.stop, 1 if key.step is None else key.step - ) + for int_io in range(key.start, key.stop, 1 if key.step is None else key.step) ] else: return getattr(self, key) @@ -143,15 +148,10 @@ class IOList(object): def __setattr__(self, key, value): """Verbietet aus Leistungsguenden das direkte Setzen von Attributen.""" - if key in ( - "_IOList__dict_iobyte", - "_IOList__dict_iorefname" - ): + if key in ("_IOList__dict_iobyte", "_IOList__dict_iorefname"): object.__setattr__(self, key, value) else: - raise AttributeError( - "direct assignment is not supported - use .value Attribute" - ) + raise AttributeError("direct assignment is not supported - use .value Attribute") def __private_replace_oldio_with_newio(self, io) -> None: """ @@ -168,16 +168,17 @@ class IOList(object): scan_stop = scan_start + (1 if io._length == 0 else io._length) # Defaultvalue über mehrere Bytes sammeln - calc_defaultvalue = b'' + calc_defaultvalue = b"" for i in range(scan_start, scan_stop): for oldio in self.__dict_iobyte[i]: - if type(oldio) == StructIO: # Hier gibt es schon einen neuen IO if oldio._bitshift: - if io._bitshift == oldio._bitshift \ - and io._slc_address == oldio._slc_address: + if ( + io._bitshift == oldio._bitshift + and io._slc_address == oldio._slc_address + ): raise MemoryError( "bit {0} already assigned to '{1}'".format( io._bitaddress, oldio._name @@ -186,9 +187,7 @@ class IOList(object): else: # Bereits überschriebene bytes sind ungültig raise MemoryError( - "new io '{0}' overlaps memory of '{1}'".format( - io._name, oldio._name - ) + "new io '{0}' overlaps memory of '{1}'".format(io._name, oldio._name) ) elif oldio is not None: # IOs im Speicherbereich des neuen IO merken @@ -201,8 +200,7 @@ class IOList(object): if io._byteorder == "little": calc_defaultvalue += oldio._defaultvalue else: - calc_defaultvalue = \ - oldio._defaultvalue + calc_defaultvalue + calc_defaultvalue = oldio._defaultvalue + calc_defaultvalue # ios aus listen entfernen delattr(self, oldio._name) @@ -211,9 +209,7 @@ class IOList(object): # Nur bei StructIO und keiner gegebenen defaultvalue übernehmen if io._bitshift: io_byte_address = io._parentio_address - io.address - io._defaultvalue = bool( - io._parentio_defaultvalue[io_byte_address] & io._bitshift - ) + io._defaultvalue = bool(io._parentio_defaultvalue[io_byte_address] & io._bitshift) else: io._defaultvalue = calc_defaultvalue @@ -226,8 +222,7 @@ class IOList(object): if isinstance(new_io, IOBase): if hasattr(self, new_io._name): raise AttributeError( - "attribute {0} already exists - can not set io" - "".format(new_io._name) + "attribute {0} already exists - can not set io".format(new_io._name) ) if type(new_io) is StructIO: @@ -239,8 +234,16 @@ class IOList(object): if new_io._bitshift: if len(self.__dict_iobyte[new_io.address]) != 8: # "schnell" 8 Einträge erstellen da es BIT IOs sind - self.__dict_iobyte[new_io.address] += \ - [None, None, None, None, None, None, None, None] + self.__dict_iobyte[new_io.address] += [ + None, + None, + None, + None, + None, + None, + None, + None, + ] self.__dict_iobyte[new_io.address][new_io._bitaddress] = new_io else: self.__dict_iobyte[new_io.address].append(new_io) @@ -289,13 +292,26 @@ class IOBase(object): auch als verwendet werden koennen. """ - __slots__ = "__bit_ioctl_off", "__bit_ioctl_on", "_bitaddress", \ - "_bitshift", "_bitlength", "_byteorder", "_defaultvalue", \ - "_export", "_iotype", "_length", "_name", "_parentdevice", \ - "_read_only_io", "_signed", "_slc_address", "bmk" + __slots__ = ( + "__bit_ioctl_off", + "__bit_ioctl_on", + "_bitaddress", + "_bitshift", + "_bitlength", + "_byteorder", + "_defaultvalue", + "_export", + "_iotype", + "_length", + "_name", + "_parentdevice", + "_read_only_io", + "_signed", + "_slc_address", + "bmk", + ) - def __init__(self, parentdevice, valuelist: list, iotype: int, - byteorder: str, signed: bool): + def __init__(self, parentdevice, valuelist: list, iotype: int, byteorder: str, signed: bool): """ Instantiierung der IOBase-Klasse. @@ -312,8 +328,7 @@ class IOBase(object): # Bitadressen auf Bytes aufbrechen und umrechnen self._bitaddress = -1 if valuelist[7] == "" else int(valuelist[7]) % 8 - self._bitshift = None if self._bitaddress == -1 \ - else 1 << self._bitaddress + self._bitshift = None if self._bitaddress == -1 else 1 << self._bitaddress # Längenberechnung self._bitlength = int(valuelist[2]) @@ -333,9 +348,7 @@ class IOBase(object): if self._bitshift: # Höhere Bits als 7 auf nächste Bytes umbrechen int_startaddress += int(int(valuelist[7]) / 8) - self._slc_address = slice( - int_startaddress, int_startaddress + 1 - ) + self._slc_address = slice(int_startaddress, int_startaddress + 1) # Defaultvalue ermitteln, sonst False if valuelist[1] is None and type(self) == StructIO: @@ -347,14 +360,10 @@ class IOBase(object): self._defaultvalue = False # Ioctl für Bitsetzung setzen - self.__bit_ioctl_off = struct.pack( - " Nur False wenn False oder 0 sonst True """ if self._bitshift: - return bool( - self._parentdevice._ba_devdata[self._slc_address.start] - & self._bitshift - ) + return bool(self._parentdevice._ba_devdata[self._slc_address.start] & self._bitshift) else: return any(self._parentdevice._ba_devdata[self._slc_address]) @@ -406,8 +411,7 @@ class IOBase(object): # Inline get_value() if self._bitshift: return bool( - self._parentdevice._ba_devdata[self._slc_address.start] - & self._bitshift + self._parentdevice._ba_devdata[self._slc_address.start] & self._bitshift ) else: return bytes(self._parentdevice._ba_devdata[self._slc_address]) @@ -430,8 +434,9 @@ class IOBase(object): """ return self._name - def __reg_xevent(self, func, delay: int, edge: int, as_thread: bool, - overwrite: bool, prefire: bool) -> None: + def __reg_xevent( + self, func, delay: int, edge: int, as_thread: bool, overwrite: bool, prefire: bool + ) -> None: """ Verwaltet reg_event und reg_timerevent. @@ -444,26 +449,19 @@ class IOBase(object): """ # Prüfen ob Funktion callable ist if not callable(func): - raise ValueError( - "registered function '{0}' is not callable".format(func) - ) + raise ValueError("registered function '{0}' is not callable".format(func)) if type(delay) != int or delay < 0: - raise ValueError( - "'delay' must be and greater or equal 0" - ) + raise ValueError("'delay' must be and greater or equal 0") if edge != BOTH and not self._bitshift: - raise ValueError( - "parameter 'edge' can be used with bit io objects only" - ) + raise ValueError("parameter 'edge' can be used with bit io objects only") if prefire and self._parentdevice._modio._looprunning: - raise RuntimeError( - "prefire can not be used if mainloop is running" - ) + raise RuntimeError("prefire can not be used if mainloop is running") if self not in self._parentdevice._dict_events: with self._parentdevice._filelock: - self._parentdevice._dict_events[self] = \ - [IOEvent(func, edge, as_thread, delay, overwrite, prefire)] + self._parentdevice._dict_events[self] = [ + IOEvent(func, edge, as_thread, delay, overwrite, prefire) + ] else: # Prüfen ob Funktion schon registriert ist for regfunc in self._parentdevice._dict_events[self]: @@ -476,10 +474,7 @@ class IOBase(object): raise RuntimeError( "io '{0}' with function '{1}' already in list " "with edge '{2}' - edge '{3}' not allowed anymore" - "".format( - self._name, func, - consttostr(regfunc.edge), consttostr(edge) - ) + "".format(self._name, func, consttostr(regfunc.edge), consttostr(edge)) ) else: raise RuntimeError( @@ -490,9 +485,7 @@ class IOBase(object): elif regfunc.edge == edge: raise RuntimeError( "io '{0}' with function '{1}' for given edge '{2}' " - "already in list".format( - self._name, func, consttostr(edge) - ) + "already in list".format(self._name, func, consttostr(edge)) ) # Eventfunktion einfügen @@ -548,9 +541,7 @@ class IOBase(object): if self._bitshift: # Write single bit to process image - value = \ - self._parentdevice._ba_devdata[self._slc_address.start] & \ - self._bitshift + value = self._parentdevice._ba_devdata[self._slc_address.start] & self._bitshift if self._parentdevice._modio._run_on_pi: # IOCTL auf dem RevPi with self._parentdevice._modio._myfh_lck: @@ -559,8 +550,7 @@ class IOBase(object): ioctl( self._parentdevice._modio._myfh, 19216, - self.__bit_ioctl_on if value - else self.__bit_ioctl_off + self.__bit_ioctl_on if value else self.__bit_ioctl_off, ) except Exception as e: self._parentdevice._modio._gotioerror("ioset", e) @@ -572,12 +562,10 @@ class IOBase(object): try: self._parentdevice._modio._myfh.ioctl( 19216, - self.__bit_ioctl_on if value - else self.__bit_ioctl_off + self.__bit_ioctl_on if value else self.__bit_ioctl_off, ) except Exception as e: - self._parentdevice._modio._gotioerror( - "net_ioset", e) + self._parentdevice._modio._gotioerror("net_ioset", e) return False else: @@ -586,8 +574,7 @@ class IOBase(object): # Set value durchführen (Funktion K+16) self._parentdevice._modio._simulate_ioctl( 19216, - self.__bit_ioctl_on if value - else self.__bit_ioctl_off + self.__bit_ioctl_on if value else self.__bit_ioctl_off, ) except Exception as e: self._parentdevice._modio._gotioerror("file_ioset", e) @@ -598,9 +585,7 @@ class IOBase(object): value = bytes(self._parentdevice._ba_devdata[self._slc_address]) with self._parentdevice._modio._myfh_lck: try: - self._parentdevice._modio._myfh.seek( - self._get_address() - ) + self._parentdevice._modio._myfh.seek(self._get_address()) self._parentdevice._modio._myfh.write(value) if self._parentdevice._modio._buffedwrite: self._parentdevice._modio._myfh.flush() @@ -625,15 +610,11 @@ class IOBase(object): :return: IO-Wert als oder """ if self._bitshift: - return bool( - self._parentdevice._ba_devdata[self._slc_address.start] - & self._bitshift - ) + return bool(self._parentdevice._ba_devdata[self._slc_address.start] & self._bitshift) else: return bytes(self._parentdevice._ba_devdata[self._slc_address]) - def reg_event( - self, func, delay=0, edge=BOTH, as_thread=False, prefire=False): + def reg_event(self, func, delay=0, edge=BOTH, as_thread=False, prefire=False): """ Registriert fuer IO ein Event bei der Eventueberwachung. @@ -652,8 +633,7 @@ class IOBase(object): """ self.__reg_xevent(func, delay, edge, as_thread, True, prefire) - def reg_timerevent( - self, func, delay, edge=BOTH, as_thread=False, prefire=False): + def reg_timerevent(self, func, delay, edge=BOTH, as_thread=False, prefire=False): """ Registriert fuer IO einen Timer, welcher nach delay func ausfuehrt. @@ -685,20 +665,13 @@ class IOBase(object): if self._iotype == INP: if self._parentdevice._modio._simulator: raise RuntimeError( - "can not write to output '{0}' in simulator mode" - "".format(self._name) + "can not write to output '{0}' in simulator mode".format(self._name) ) else: - raise RuntimeError( - "can not write to input '{0}'".format(self._name) - ) + raise RuntimeError("can not write to input '{0}'".format(self._name)) elif self._iotype == MEM: - raise RuntimeError( - "can not write to memory '{0}'".format(self._name) - ) - raise RuntimeError( - "the io object '{0}' is read only".format(self._name) - ) + raise RuntimeError("can not write to memory '{0}'".format(self._name)) + raise RuntimeError("the io object '{0}' is read only".format(self._name)) if self._bitshift: # Versuchen egal welchen Typ in Bool zu konvertieren @@ -722,8 +695,7 @@ class IOBase(object): int_byte -= self._bitshift # Zurückschreiben wenn verändert - self._parentdevice._ba_devdata[self._slc_address.start] = \ - int_byte + self._parentdevice._ba_devdata[self._slc_address.start] = int_byte self._parentdevice._filelock.release() @@ -738,9 +710,7 @@ class IOBase(object): if self._length != len(value): raise ValueError( "'{0}' requires a object of " - "length {1}, but {2} was given".format( - self._name, self._length, len(value) - ) + "length {1}, but {2} was given".format(self._name, self._length, len(value)) ) if self._parentdevice._shared_procimg: @@ -764,8 +734,7 @@ class IOBase(object): else: newlist = [] for regfunc in self._parentdevice._dict_events[self]: - if regfunc.func != func or edge is not None \ - and regfunc.edge != edge: + if regfunc.func != func or edge is not None and regfunc.edge != edge: newlist.append(regfunc) # Wenn Funktionen übrig bleiben, diese übernehmen @@ -832,17 +801,11 @@ class IOBase(object): "revpimodio2.FALLING or revpimodio2.BOTH" ) if not (exitevent is None or type(exitevent) == Event): - raise TypeError( - "parameter 'exitevent' must be " - ) + raise TypeError("parameter 'exitevent' must be ") if type(timeout) != int or timeout < 0: - raise ValueError( - "parameter 'timeout' must be and greater than 0" - ) + raise ValueError("parameter 'timeout' must be and greater than 0") if edge != BOTH and not self._bitshift: - raise ValueError( - "parameter 'edge' can be used with bit Inputs only" - ) + raise ValueError("parameter 'edge' can be used with bit Inputs only") # Abbruchwert prüfen if okvalue == self.value: @@ -858,23 +821,27 @@ class IOBase(object): exitevent = Event() flt_timecount = 0 if bool_timecount else -1 - while not self._parentdevice._modio._waitexit.is_set() \ - and not exitevent.is_set() \ - and flt_timecount < timeout: - + while ( + not self._parentdevice._modio._waitexit.is_set() + and not exitevent.is_set() + and flt_timecount < timeout + ): if self._parentdevice._modio._imgwriter.newdata.wait(2.5): self._parentdevice._modio._imgwriter.newdata.clear() if val_start != self.value: - if edge == BOTH \ - or edge == RISING and not val_start \ - or edge == FALLING and val_start: + if ( + edge == BOTH + or edge == RISING + and not val_start + or edge == FALLING + and val_start + ): return 0 else: val_start = not val_start if bool_timecount: - flt_timecount += \ - self._parentdevice._modio._imgwriter._refresh + flt_timecount += self._parentdevice._modio._imgwriter._refresh elif bool_timecount: flt_timecount += 2.5 @@ -922,7 +889,7 @@ class IntIO(IOBase): return int.from_bytes( self._parentdevice._ba_devdata[self._slc_address], byteorder=self._byteorder, - signed=self._signed + signed=self._signed, ) def __call__(self, value=None): @@ -931,16 +898,18 @@ class IntIO(IOBase): return int.from_bytes( self._parentdevice._ba_devdata[self._slc_address], byteorder=self._byteorder, - signed=self._signed + signed=self._signed, ) else: # Inline from set_intvalue() if type(value) == int: - self.set_value(value.to_bytes( - self._length, - byteorder=self._byteorder, - signed=self._signed - )) + self.set_value( + value.to_bytes( + self._length, + byteorder=self._byteorder, + signed=self._signed, + ) + ) else: raise TypeError( "'{0}' need a value, but {1} was given" @@ -983,9 +952,7 @@ class IntIO(IOBase): :return: Defaultvalue """ - return int.from_bytes( - self._defaultvalue, byteorder=self._byteorder, signed=self._signed - ) + return int.from_bytes(self._defaultvalue, byteorder=self._byteorder, signed=self._signed) def get_intvalue(self) -> int: """ @@ -996,7 +963,7 @@ class IntIO(IOBase): return int.from_bytes( self._parentdevice._ba_devdata[self._slc_address], byteorder=self._byteorder, - signed=self._signed + signed=self._signed, ) def set_intvalue(self, value: int) -> None: @@ -1006,11 +973,13 @@ class IntIO(IOBase): :param value: Wert """ if type(value) == int: - self.set_value(value.to_bytes( - self._length, - byteorder=self._byteorder, - signed=self._signed - )) + self.set_value( + value.to_bytes( + self._length, + byteorder=self._byteorder, + signed=self._signed, + ) + ) else: raise TypeError( "'{0}' need a value, but {1} was given" @@ -1028,9 +997,7 @@ class IntIOCounter(IntIO): __slots__ = ("__ioctl_arg",) - def __init__( - self, counter_id, - parentdevice, valuelist, iotype, byteorder, signed): + def __init__(self, counter_id, parentdevice, valuelist, iotype, byteorder, signed): """ Instantiierung der IntIOCounter-Klasse. @@ -1044,10 +1011,11 @@ class IntIOCounter(IntIO): # Deviceposition + leer + Counter_ID # ID-Bits: 7|6|5|4|3|2|1|0|15|14|13|12|11|10|9|8 - self.__ioctl_arg = \ - parentdevice._position.to_bytes(1, "little") \ - + b'\x00' \ + self.__ioctl_arg = ( + parentdevice._position.to_bytes(1, "little") + + b"\x00" + (1 << counter_id).to_bytes(2, "little") + ) """ IOCTL fuellt dieses struct, welches durch padding im Speicher nach @@ -1067,23 +1035,16 @@ class IntIOCounter(IntIO): def reset(self) -> None: """Setzt den Counter des Inputs zurueck.""" if self._parentdevice._modio._monitoring: - raise RuntimeError( - "can not reset counter, while system is in monitoring mode" - ) + raise RuntimeError("can not reset counter, while system is in monitoring mode") if self._parentdevice._modio._simulator: - raise RuntimeError( - "can not reset counter, while system is in simulator mode" - ) + raise RuntimeError("can not reset counter, while system is in simulator mode") if self._parentdevice._modio._run_on_pi: # IOCTL auf dem RevPi with self._parentdevice._modio._myfh_lck: try: # Counter reset durchführen (Funktion K+20) - ioctl( - self._parentdevice._modio._myfh, - 19220, self.__ioctl_arg - ) + ioctl(self._parentdevice._modio._myfh, 19220, self.__ioctl_arg) except Exception as e: self._parentdevice._modio._gotioerror("iorst", e) @@ -1091,9 +1052,7 @@ class IntIOCounter(IntIO): # IOCTL über Netzwerk with self._parentdevice._modio._myfh_lck: try: - self._parentdevice._modio._myfh.ioctl( - 19220, self.__ioctl_arg - ) + self._parentdevice._modio._myfh.ioctl(19220, self.__ioctl_arg) except Exception as e: self._parentdevice._modio._gotioerror("net_iorst", e) @@ -1101,9 +1060,7 @@ class IntIOCounter(IntIO): # IOCTL in Datei simulieren try: # Set value durchführen (Funktion K+20) - self._parentdevice._modio._simulate_ioctl( - 19220, self.__ioctl_arg - ) + self._parentdevice._modio._simulate_ioctl(19220, self.__ioctl_arg) except Exception as e: self._parentdevice._modio._gotioerror("file_iorst", e) @@ -1153,12 +1110,7 @@ class IntIOReplaceable(IntIO): ``_ """ # StructIO erzeugen - io_new = StructIO( - self, - name, - frm, - **kwargs - ) + io_new = StructIO(self, name, frm, **kwargs) # StructIO in IO-Liste einfügen self._parentdevice._modio.io._private_register_new_io_object(io_new) @@ -1170,7 +1122,7 @@ class IntIOReplaceable(IntIO): reg_event, kwargs.get("delay", 0), kwargs.get("edge", BOTH), - kwargs.get("as_thread", False) + kwargs.get("as_thread", False), ) @@ -1182,8 +1134,14 @@ class StructIO(IOBase): bereit. Der struct-Formatwert wird bei der Instantiierung festgelegt. """ - __slots__ = "__frm", "_parentio_address", "_parentio_defaultvalue", \ - "_parentio_length", "_parentio_name", "_wordorder" + __slots__ = ( + "__frm", + "_parentio_address", + "_parentio_defaultvalue", + "_parentio_length", + "_parentio_name", + "_wordorder", + ) def __init__(self, parentio, name: str, frm: str, **kwargs): """ @@ -1215,15 +1173,12 @@ class StructIO(IOBase): if frm == "?": if self._wordorder: - raise ValueError( - "you can not use wordorder for bit based ios" - ) + raise ValueError("you can not use wordorder for bit based ios") bitaddress = kwargs.get("bit", 0) max_bits = parentio._length * 8 if not (0 <= bitaddress < max_bits): raise ValueError( - "bitaddress must be a value between 0 and {0}" - "".format(max_bits - 1) + "bitaddress must be a value between 0 and {0}".format(max_bits - 1) ) bitlength = 1 @@ -1246,8 +1201,7 @@ class StructIO(IOBase): raise ValueError("wordorder must be 'little' or 'big'") if byte_length % 2 != 0: raise ValueError( - "the byte length of new io must must be even to " - "use wordorder" + "the byte length of new io must must be even to use wordorder" ) # [name,default,anzbits,adressbyte,export,adressid,bmk,bitaddress] @@ -1260,7 +1214,7 @@ class StructIO(IOBase): False, str(parentio._slc_address.start).rjust(4, "0"), kwargs.get("bmk", ""), - bitaddress + bitaddress, ] else: raise ValueError( @@ -1270,11 +1224,7 @@ class StructIO(IOBase): # Basisklasse instantiieren super().__init__( - parentio._parentdevice, - valuelist, - parentio._iotype, - byteorder, - frm == frm.lower() + parentio._parentdevice, valuelist, parentio._iotype, byteorder, frm == frm.lower() ) self.__frm = bofrm + frm if "export" in kwargs: @@ -1286,13 +1236,11 @@ class StructIO(IOBase): self._export = parentio._export # Platz für neuen IO prüfen - if not (self._slc_address.start >= - parentio._parentdevice._dict_slc[parentio._iotype].start and - self._slc_address.stop <= - parentio._parentdevice._dict_slc[parentio._iotype].stop): - raise BufferError( - "registered value does not fit process image scope" - ) + if not ( + self._slc_address.start >= parentio._parentdevice._dict_slc[parentio._iotype].start + and self._slc_address.stop <= parentio._parentdevice._dict_slc[parentio._iotype].stop + ): + raise BufferError("registered value does not fit process image scope") def __call__(self, value=None): if value is None: @@ -1310,9 +1258,7 @@ class StructIO(IOBase): if self._bitshift: self.set_value(value) elif self._wordorder == "little" and self._length > 2: - self.set_value( - self._swap_word_order(struct.pack(self.__frm, value)) - ) + self.set_value(self._swap_word_order(struct.pack(self.__frm, value))) else: self.set_value(struct.pack(self.__frm, value)) @@ -1343,8 +1289,10 @@ class StructIO(IOBase): array_length = len(bytes_to_swap) swap_array = bytearray(bytes_to_swap) for i in range(0, array_length // 2, 2): - swap_array[-i - 2:array_length - i], swap_array[i:i + 2] = \ - swap_array[i:i + 2], swap_array[-i - 2:array_length - i] + swap_array[-i - 2 : array_length - i], swap_array[i : i + 2] = ( + swap_array[i : i + 2], + swap_array[-i - 2 : array_length - i], + ) return bytes(swap_array) def get_structdefaultvalue(self): @@ -1394,9 +1342,7 @@ class StructIO(IOBase): if self._bitshift: self.set_value(value) elif self._wordorder == "little" and self._length > 2: - self.set_value( - self._swap_word_order(struct.pack(self.__frm, value)) - ) + self.set_value(self._swap_word_order(struct.pack(self.__frm, value))) else: self.set_value(struct.pack(self.__frm, value)) @@ -1422,7 +1368,7 @@ class MemIO(IOBase): if self._bitlength > 64: # STRING try: - val = val.strip(b'\x00').decode() + val = val.strip(b"\x00").decode() except Exception: pass return val diff --git a/src/revpimodio2/modio.py b/src/revpimodio2/modio.py index fbe7bfb..2484a0f 100644 --- a/src/revpimodio2/modio.py +++ b/src/revpimodio2/modio.py @@ -31,10 +31,10 @@ class DevSelect: __slots__ = "type", "other_device_key", "values" def __init__( - self, - device_type=DeviceType.IGNORED, - search_key: str = None, - search_values=(), + self, + device_type=DeviceType.IGNORED, + search_key: str = None, + search_values=(), ): """ Create a customized search filter for RevPiModIOSelected search. @@ -65,20 +65,55 @@ class RevPiModIO(object): Device Positionen oder Device Namen. """ - __slots__ = "__cleanupfunc", \ - "_autorefresh", "_buffedwrite", "_configrsc", "_debug", "_devselect", \ - "_exit", "_exit_level", "_imgwriter", "_ioerror", \ - "_length", "_looprunning", "_lst_devselect", "_lst_refresh", \ - "_maxioerrors", "_monitoring", "_myfh", "_myfh_lck", \ - "_procimg", "_replace_io_file", "_run_on_pi", \ - "_set_device_based_cycle_time", "_simulator", "_init_shared_procimg", \ - "_syncoutputs", "_th_mainloop", "_waitexit", \ - "app", "core", "device", "exitsignal", "io", "summary" + __slots__ = ( + "__cleanupfunc", + "_autorefresh", + "_buffedwrite", + "_configrsc", + "_debug", + "_devselect", + "_exit", + "_exit_level", + "_imgwriter", + "_ioerror", + "_length", + "_looprunning", + "_lst_devselect", + "_lst_refresh", + "_maxioerrors", + "_monitoring", + "_myfh", + "_myfh_lck", + "_procimg", + "_replace_io_file", + "_run_on_pi", + "_set_device_based_cycle_time", + "_simulator", + "_init_shared_procimg", + "_syncoutputs", + "_th_mainloop", + "_waitexit", + "app", + "core", + "device", + "exitsignal", + "io", + "summary", + ) def __init__( - self, autorefresh=False, monitoring=False, syncoutputs=True, - procimg=None, configrsc=None, simulator=False, debug=True, - replace_io_file=None, shared_procimg=False, direct_output=False): + self, + autorefresh=False, + monitoring=False, + syncoutputs=True, + procimg=None, + configrsc=None, + simulator=False, + debug=True, + replace_io_file=None, + shared_procimg=False, + direct_output=False, + ): """ Instantiiert die Grundfunktionen. @@ -96,20 +131,27 @@ class RevPiModIO(object): """ # Parameterprüfung acheck( - bool, autorefresh=autorefresh, monitoring=monitoring, - syncoutputs=syncoutputs, simulator=simulator, debug=debug, - shared_procimg=shared_procimg, direct_output=direct_output + bool, + autorefresh=autorefresh, + monitoring=monitoring, + syncoutputs=syncoutputs, + simulator=simulator, + debug=debug, + shared_procimg=shared_procimg, + direct_output=direct_output, ) acheck( - str, procimg_noneok=procimg, configrsc_noneok=configrsc, - replace_io_file_noneok=replace_io_file + str, + procimg_noneok=procimg, + configrsc_noneok=configrsc, + replace_io_file_noneok=replace_io_file, ) # TODO: Remove in next release if direct_output: - warnings.warn(DeprecationWarning( - "direct_output is deprecated - use shared_procimg instead!" - )) + warnings.warn( + DeprecationWarning("direct_output is deprecated - use shared_procimg instead!") + ) self._autorefresh = autorefresh self._configrsc = configrsc @@ -233,13 +275,11 @@ class RevPiModIO(object): # Apply device filter if self._devselect.values: - # Check for supported types in values for dev in self._devselect.values: if type(dev) not in (int, str): raise ValueError( - "need device position as or " - "device name as " + "need device position as or device name as " ) lst_devices = [] @@ -253,8 +293,10 @@ class RevPiModIO(object): continue else: # Auto search depending of value item type - if not (dev["name"] in self._devselect.values - or int(dev["position"]) in self._devselect.values): + if not ( + dev["name"] in self._devselect.values + or int(dev["position"]) in self._devselect.values + ): continue lst_devices.append(dev) @@ -269,7 +311,6 @@ class RevPiModIO(object): # Devices initialisieren err_names_check = {} for device in sorted(lst_devices, key=lambda x: x["offset"]): - # VDev alter piCtory Versionen auf KUNBUS-Standard ändern if device["position"] == "adap.": device["position"] = 64 @@ -281,64 +322,42 @@ class RevPiModIO(object): pt = int(device["productType"]) if pt == ProductType.REVPI_CORE: # RevPi Core - dev_new = devicemodule.Core( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.Core(self, device, simulator=self._simulator) self.core = dev_new elif pt == ProductType.REVPI_CONNECT: # RevPi Connect - dev_new = devicemodule.Connect( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.Connect(self, device, simulator=self._simulator) self.core = dev_new elif pt == ProductType.REVPI_CONNECT_4: # RevPi Connect 4 - dev_new = devicemodule.Connect4( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.Connect4(self, device, simulator=self._simulator) self.core = dev_new elif pt == ProductType.REVPI_COMPACT: # RevPi Compact - dev_new = devicemodule.Compact( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.Compact(self, device, simulator=self._simulator) self.core = dev_new elif pt == ProductType.REVPI_FLAT: # RevPi Flat - dev_new = devicemodule.Flat( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.Flat(self, device, simulator=self._simulator) self.core = dev_new else: # Base immer als Fallback verwenden - dev_new = devicemodule.Base( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.Base(self, device, simulator=self._simulator) elif device["type"] == DeviceType.LEFT_RIGHT: # IOs pt = int(device["productType"]) - if pt == ProductType.DIO \ - or pt == ProductType.DI \ - or pt == ProductType.DO: + if pt == ProductType.DIO or pt == ProductType.DI or pt == ProductType.DO: # DIO / DI / DO - dev_new = devicemodule.DioModule( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.DioModule(self, device, simulator=self._simulator) else: # Alle anderen IO-Devices - dev_new = devicemodule.Device( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.Device(self, device, simulator=self._simulator) elif device["type"] == DeviceType.VIRTUAL: # Virtuals - dev_new = devicemodule.Virtual( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.Virtual(self, device, simulator=self._simulator) elif device["type"] == DeviceType.EDGE: # Gateways - dev_new = devicemodule.Gateway( - self, device, simulator=self._simulator - ) + dev_new = devicemodule.Gateway(self, device, simulator=self._simulator) elif device["type"] == DeviceType.RIGHT: # Connectdevice dev_new = None @@ -347,7 +366,7 @@ class RevPiModIO(object): warnings.warn( "device type '{0}' on position {1} unknown" "".format(device["type"], device["position"]), - Warning + Warning, ) dev_new = None @@ -378,7 +397,7 @@ class RevPiModIO(object): "equal device name '{0}' in pictory configuration. you can " "access this devices by position number .device[{1}] only!" "".format(check_dev, "|".join(err_names_check[check_dev])), - Warning + Warning, ) # ImgWriter erstellen @@ -393,17 +412,12 @@ class RevPiModIO(object): self.syncoutputs() # Für RS485 errors am core defaults laden sollte procimg NULL sein - if isinstance(self.core, devicemodule.Core) and \ - not (self._monitoring or self._simulator): + if isinstance(self.core, devicemodule.Core) and not (self._monitoring or self._simulator): if self.core._slc_errorlimit1 is not None: - io = self.io[ - self.core.offset + self.core._slc_errorlimit1.start - ][0] + io = self.io[self.core.offset + self.core._slc_errorlimit1.start][0] io.set_value(io._defaultvalue) if self.core._slc_errorlimit2 is not None: - io = self.io[ - self.core.offset + self.core._slc_errorlimit2.start - ][0] + io = self.io[self.core.offset + self.core._slc_errorlimit2.start][0] io.set_value(io._defaultvalue) # RS485 errors schreiben @@ -471,8 +485,7 @@ class RevPiModIO(object): if "defaultvalue" in creplaceio[io]: if dict_replace["frm"] == "?": try: - dict_replace["defaultvalue"] = \ - creplaceio[io].getboolean("defaultvalue") + dict_replace["defaultvalue"] = creplaceio[io].getboolean("defaultvalue") except Exception: raise ValueError( "replace_io_file: could not convert '{0}' " @@ -494,8 +507,7 @@ class RevPiModIO(object): ) else: try: - dict_replace["defaultvalue"] = \ - creplaceio[io].getint("defaultvalue") + dict_replace["defaultvalue"] = creplaceio[io].getint("defaultvalue") except Exception: raise ValueError( "replace_io_file: could not convert '{0}' " @@ -634,23 +646,19 @@ class RevPiModIO(object): self._ioerror += 1 if self._maxioerrors != 0 and self._ioerror >= self._maxioerrors: raise RuntimeError( - "reach max io error count {0} on process image" - "".format(self._maxioerrors) + "reach max io error count {0} on process image".format(self._maxioerrors) ) if not show_warn or self._debug == -1: return if self._debug == 0: - warnings.warn( - "got io error on process image", - RuntimeWarning - ) + warnings.warn("got io error on process image", RuntimeWarning) else: warnings.warn( "got io error during '{0}' and count {1} errors now | {2}" "".format(action, self._ioerror, str(e)), - RuntimeWarning + RuntimeWarning, ) def _set_cycletime(self, milliseconds: int) -> None: @@ -660,10 +668,7 @@ class RevPiModIO(object): :param milliseconds: in Millisekunden """ if self._looprunning: - raise RuntimeError( - "can not change cycletime when cycleloop or mainloop is " - "running" - ) + raise RuntimeError("can not change cycletime when cycleloop or mainloop is running") else: self._imgwriter.refresh = milliseconds @@ -701,7 +706,7 @@ class RevPiModIO(object): else: raise ValueError("value must be 0 or a positive integer") - def _simulate_ioctl(self, request: int, arg=b'') -> None: + def _simulate_ioctl(self, request: int, arg=b"") -> None: """ Simuliert IOCTL Funktionen auf procimg Datei. @@ -717,9 +722,7 @@ class RevPiModIO(object): # Simulatonsmodus schreibt direkt in Datei with self._myfh_lck: self._myfh.seek(byte_address) - int_byte = int.from_bytes( - self._myfh.read(1), byteorder="little" - ) + int_byte = int.from_bytes(self._myfh.read(1), byteorder="little") int_bit = 1 << bit_address if not bool(int_byte & int_bit) == new_value: @@ -741,8 +744,9 @@ class RevPiModIO(object): for i in range(16): if bool(bit_field & 1 << i): - io_byte = self.device[dev_position].offset \ - + int(self.device[dev_position]._lst_counter[i]) + io_byte = self.device[dev_position].offset + int( + self.device[dev_position]._lst_counter[i] + ) break if io_byte == -1: @@ -750,7 +754,7 @@ class RevPiModIO(object): with self._myfh_lck: self._myfh.seek(io_byte) - self._myfh.write(b'\x00\x00\x00\x00') + self._myfh.write(b"\x00\x00\x00\x00") if self._buffedwrite: self._myfh.flush() @@ -796,9 +800,7 @@ class RevPiModIO(object): """ # Prüfen ob ein Loop bereits läuft if self._looprunning: - raise RuntimeError( - "can not start multiple loops mainloop/cycleloop" - ) + raise RuntimeError("can not start multiple loops mainloop/cycleloop") # Prüfen ob Devices in autorefresh sind if len(self._lst_refresh) == 0: @@ -809,16 +811,14 @@ class RevPiModIO(object): # Prüfen ob Funktion callable ist if not callable(func): - raise RuntimeError( - "registered function '{0}' ist not callable".format(func) - ) + raise RuntimeError("registered function '{0}' ist not callable".format(func)) # Thread erstellen, wenn nicht blockieren soll if not blocking: self._th_mainloop = Thread( target=self.cycleloop, args=(func,), - kwargs={"cycletime": cycletime, "blocking": True} + kwargs={"cycletime": cycletime, "blocking": True}, ) self._th_mainloop.start() return @@ -851,9 +851,9 @@ class RevPiModIO(object): break # Just warn, user has to use maxioerrors to kill program - warnings.warn(RuntimeWarning( - "no new io data in cycle loop for 2500 milliseconds" - )) + warnings.warn( + RuntimeWarning("no new io data in cycle loop for 2500 milliseconds") + ) cycleinfo.last = self._exit.is_set() continue @@ -942,7 +942,6 @@ class RevPiModIO(object): cp = ConfigParser() for io in self.io: if isinstance(io, StructIO): - # Required values cp.add_section(io.name) cp[io.name]["replace"] = io._parentio_name @@ -958,8 +957,7 @@ class RevPiModIO(object): if type(io.defaultvalue) is bytes: if any(io.defaultvalue): # Convert each byte to an integer - cp[io.name]["defaultvalue"] = \ - " ".join(map(str, io.defaultvalue)) + cp[io.name]["defaultvalue"] = " ".join(map(str, io.defaultvalue)) elif io.defaultvalue != 0: cp[io.name]["defaultvalue"] = str(io.defaultvalue) if io.bmk != "": @@ -971,10 +969,7 @@ class RevPiModIO(object): with open(filename, "w") as fh: cp.write(fh) except Exception as e: - raise RuntimeError( - "could not write export file '{0}' | {1}" - "".format(filename, e) - ) + raise RuntimeError("could not write export file '{0}' | {1}".format(filename, e)) def get_jconfigrsc(self) -> dict: """ @@ -986,8 +981,8 @@ class RevPiModIO(object): if self._configrsc is not None: if not access(self._configrsc, F_OK | R_OK): raise RuntimeError( - "can not access pictory configuration at {0}".format( - self._configrsc)) + "can not access pictory configuration at {0}".format(self._configrsc) + ) else: # piCtory Konfiguration an bekannten Stellen prüfen lst_rsc = ["/etc/revpi/config.rsc", "/opt/KUNBUS/config.rsc"] @@ -1035,10 +1030,7 @@ class RevPiModIO(object): """ # Prüfen ob Funktion callable ist if not (cleanupfunc is None or callable(cleanupfunc)): - raise RuntimeError( - "registered function '{0}' ist not callable" - "".format(cleanupfunc) - ) + raise RuntimeError("registered function '{0}' ist not callable".format(cleanupfunc)) self.__cleanupfunc = cleanupfunc signal(SIGINT, self.__evt_exit) signal(SIGTERM, self.__evt_exit) @@ -1063,9 +1055,7 @@ class RevPiModIO(object): """ # Prüfen ob ein Loop bereits läuft if self._looprunning: - raise RuntimeError( - "can not start multiple loops mainloop/cycleloop" - ) + raise RuntimeError("can not start multiple loops mainloop/cycleloop") # Prüfen ob Devices in autorefresh sind if len(self._lst_refresh) == 0: @@ -1076,9 +1066,7 @@ class RevPiModIO(object): # Thread erstellen, wenn nicht blockieren soll if not blocking: - self._th_mainloop = Thread( - target=self.mainloop, kwargs={"blocking": True} - ) + self._th_mainloop = Thread(target=self.mainloop, kwargs={"blocking": True}) self._th_mainloop.start() return @@ -1100,17 +1088,17 @@ class RevPiModIO(object): if not regfunc.prefire: continue - if regfunc.edge == BOTH \ - or regfunc.edge == RISING and io.value \ - or regfunc.edge == FALLING and not io.value: + if ( + regfunc.edge == BOTH + or regfunc.edge == RISING + and io.value + or regfunc.edge == FALLING + and not io.value + ): if regfunc.as_thread: - self._imgwriter._eventqth.put( - (regfunc, io._name, io.value), False - ) + self._imgwriter._eventqth.put((regfunc, io._name, io.value), False) else: - self._imgwriter._eventq.put( - (regfunc, io._name, io.value), False - ) + self._imgwriter._eventq.put((regfunc, io._name, io.value), False) # ImgWriter mit Eventüberwachung aktivieren self._imgwriter._collect_events(True) @@ -1118,7 +1106,6 @@ class RevPiModIO(object): runtime = -1 if self._debug == -1 else 0 while not self._exit.is_set(): - # Laufzeit der Eventqueue auf 0 setzen if self._imgwriter._eventq.qsize() == 0: runtime = -1 if self._debug == -1 else 0 @@ -1135,13 +1122,12 @@ class RevPiModIO(object): self._imgwriter._eventq.task_done() # Laufzeitprüfung - if runtime != -1 and \ - default_timer() - runtime > self._imgwriter._refresh: + if runtime != -1 and default_timer() - runtime > self._imgwriter._refresh: runtime = -1 warnings.warn( "can not execute all event functions in one cycle - " "optimize your event functions or rise .cycletime", - RuntimeWarning + RuntimeWarning, ) except Empty: if not self._exit.is_set() and not self._imgwriter.is_alive(): @@ -1177,8 +1163,11 @@ class RevPiModIO(object): if device is None: mylist = self.device else: - dev = device if isinstance(device, devicemodule.Device) \ + dev = ( + device + if isinstance(device, devicemodule.Device) else self.device.__getitem__(device) + ) if dev._selfupdate: raise RuntimeError( @@ -1200,7 +1189,6 @@ class RevPiModIO(object): for dev in mylist: if not dev._selfupdate: - # FileHandler sperren dev._filelock.acquire() @@ -1226,16 +1214,16 @@ class RevPiModIO(object): :param device: nur auf einzelnes Device anwenden """ if self._monitoring: - raise RuntimeError( - "can not set default values, while system is in monitoring " - "mode" - ) + raise RuntimeError("can not set default values, while system is in monitoring mode") if device is None: mylist = self.device else: - dev = device if isinstance(device, devicemodule.Device) \ + dev = ( + device + if isinstance(device, devicemodule.Device) else self.device.__getitem__(device) + ) mylist = [dev] for dev in mylist: @@ -1254,8 +1242,11 @@ class RevPiModIO(object): if device is None: mylist = self.device else: - dev = device if isinstance(device, devicemodule.Device) \ + dev = ( + device + if isinstance(device, devicemodule.Device) else self.device.__getitem__(device) + ) if dev._selfupdate: raise RuntimeError( @@ -1292,16 +1283,16 @@ class RevPiModIO(object): :return: True, wenn Arbeiten an allen Devices erfolgreich waren """ if self._monitoring: - raise RuntimeError( - "can not write process image, while system is in monitoring " - "mode" - ) + raise RuntimeError("can not write process image, while system is in monitoring mode") if device is None: mylist = self.device else: - dev = device if isinstance(device, devicemodule.Device) \ + dev = ( + device + if isinstance(device, devicemodule.Device) else self.device.__getitem__(device) + ) if dev._selfupdate: raise RuntimeError( @@ -1321,9 +1312,7 @@ class RevPiModIO(object): if dev._shared_procimg: for io in dev._shared_write: if not io._write_to_procimg(): - global_ex = IOError( - "error on shared procimg while write" - ) + global_ex = IOError("error on shared procimg while write") dev._shared_write.clear() else: # Outpus auf Bus schreiben @@ -1375,10 +1364,19 @@ class RevPiModIOSelected(RevPiModIO): __slots__ = () def __init__( - self, deviceselection, autorefresh=False, monitoring=False, - syncoutputs=True, procimg=None, configrsc=None, - simulator=False, debug=True, replace_io_file=None, - shared_procimg=False, direct_output=False): + self, + deviceselection, + autorefresh=False, + monitoring=False, + syncoutputs=True, + procimg=None, + configrsc=None, + simulator=False, + debug=True, + replace_io_file=None, + shared_procimg=False, + direct_output=False, + ): """ Instantiiert nur fuer angegebene Devices die Grundfunktionen. @@ -1390,8 +1388,16 @@ class RevPiModIOSelected(RevPiModIO): :ref: :func:`RevPiModIO.__init__(...)` """ super().__init__( - autorefresh, monitoring, syncoutputs, procimg, configrsc, - simulator, debug, replace_io_file, shared_procimg, direct_output + autorefresh, + monitoring, + syncoutputs, + procimg, + configrsc, + simulator, + debug, + replace_io_file, + shared_procimg, + direct_output, ) if type(deviceselection) is not DevSelect: @@ -1410,24 +1416,19 @@ class RevPiModIOSelected(RevPiModIO): if len(self.device) == 0: if self._devselect.type: raise DeviceNotFoundError( - "could not find ANY given {0} devices in config" - "".format(self._devselect.type) + "could not find ANY given {0} devices in config".format(self._devselect.type) ) else: - raise DeviceNotFoundError( - "could not find ANY given devices in config" - ) - elif not self._devselect.other_device_key \ - and len(self.device) != len(self._devselect.values): + raise DeviceNotFoundError("could not find ANY given devices in config") + elif not self._devselect.other_device_key and len(self.device) != len( + self._devselect.values + ): if self._devselect.type: raise DeviceNotFoundError( - "could not find ALL given {0} devices in config" - "".format(self._devselect.type) + "could not find ALL given {0} devices in config".format(self._devselect.type) ) else: - raise DeviceNotFoundError( - "could not find ALL given devices in config" - ) + raise DeviceNotFoundError("could not find ALL given devices in config") class RevPiModIODriver(RevPiModIOSelected): @@ -1443,9 +1444,17 @@ class RevPiModIODriver(RevPiModIOSelected): __slots__ = () def __init__( - self, virtdev, autorefresh=False, - syncoutputs=True, procimg=None, configrsc=None, debug=True, - replace_io_file=None, shared_procimg=False, direct_output=False): + self, + virtdev, + autorefresh=False, + syncoutputs=True, + procimg=None, + configrsc=None, + debug=True, + replace_io_file=None, + shared_procimg=False, + direct_output=False, + ): """ Instantiiert die Grundfunktionen. @@ -1460,14 +1469,21 @@ class RevPiModIODriver(RevPiModIOSelected): virtdev = (virtdev,) dev_select = DevSelect(DeviceType.VIRTUAL, "", virtdev) super().__init__( - dev_select, autorefresh, False, syncoutputs, procimg, configrsc, - True, debug, replace_io_file, shared_procimg, direct_output + dev_select, + autorefresh, + False, + syncoutputs, + procimg, + configrsc, + True, + debug, + replace_io_file, + shared_procimg, + direct_output, ) -def run_plc( - func, cycletime=50, replace_io_file=None, debug=True, - procimg=None, configrsc=None): +def run_plc(func, cycletime=50, replace_io_file=None, debug=True, procimg=None, configrsc=None): """ Run Revoluton Pi as real plc with cycle loop and exclusive IO access. diff --git a/src/revpimodio2/netio.py b/src/revpimodio2/netio.py index 1007a40..2314ac7 100644 --- a/src/revpimodio2/netio.py +++ b/src/revpimodio2/netio.py @@ -18,22 +18,22 @@ from .modio import DevSelect, RevPiModIO as _RevPiModIO from .pictory import DeviceType # Synchronisierungsbefehl -_syssync = b'\x01\x06\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17' +_syssync = b"\x01\x06\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" # Disconnectbefehl -_sysexit = b'\x01EX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17' +_sysexit = b"\x01EX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" # DirtyBytes von Server entfernen -_sysdeldirty = b'\x01EY\x00\x00\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x17' +_sysdeldirty = b"\x01EY\x00\x00\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x17" # piCtory Konfiguration laden -_syspictory = b'\x01PI\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17' -_syspictoryh = b'\x01PH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17' +_syspictory = b"\x01PI\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" +_syspictoryh = b"\x01PH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" # ReplaceIO Konfiguration laden -_sysreplaceio = b'\x01RP\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17' -_sysreplaceioh = b'\x01RH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17' +_sysreplaceio = b"\x01RP\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" +_sysreplaceioh = b"\x01RH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" # Hashvalues -HASH_FAIL = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' +HASH_FAIL = b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff" # Header start/stop -HEADER_START = b'\x01' -HEADER_STOP = b'\x17' +HEADER_START = b"\x01" +HEADER_STOP = b"\x17" class AclException(Exception): @@ -57,12 +57,28 @@ class NetFH(Thread): so gesteuert werden. """ - __slots__ = "__buff_size", "__buff_block", "__buff_recv", \ - "__by_buff", "__check_replace_ios", "__config_changed", \ - "__int_buff", "__dictdirty", "__flusherr", "__replace_ios_h", \ - "__pictory_h", "__position", "__sockerr", "__sockend", \ - "__socklock", "__timeout", "__waitsync", "_address", \ - "_serversock", "daemon" + __slots__ = ( + "__buff_size", + "__buff_block", + "__buff_recv", + "__by_buff", + "__check_replace_ios", + "__config_changed", + "__int_buff", + "__dictdirty", + "__flusherr", + "__replace_ios_h", + "__pictory_h", + "__position", + "__sockerr", + "__sockend", + "__socklock", + "__timeout", + "__waitsync", + "_address", + "_serversock", + "daemon", + ) def __init__(self, address: tuple, check_replace_ios: bool, timeout=500): """ @@ -83,8 +99,8 @@ class NetFH(Thread): self.__config_changed = False self.__int_buff = 0 self.__dictdirty = {} - self.__replace_ios_h = b'' - self.__pictory_h = b'' + self.__replace_ios_h = b"" + self.__pictory_h = b"" self.__sockerr = Event() self.__sockend = Event() self.__socklock = Lock() @@ -95,9 +111,7 @@ class NetFH(Thread): # Parameterprüfung if not isinstance(address, tuple): - raise TypeError( - "parameter address must be ('IP', PORT)" - ) + raise TypeError("parameter address must be ('IP', PORT)") if not isinstance(timeout, int): raise TypeError("parameter timeout must be ") @@ -125,7 +139,7 @@ class NetFH(Thread): :param bytecode: Antwort, die geprueft werden solll """ - if bytecode == b'\x18': + if bytecode == b"\x18": # Alles beenden, wenn nicht erlaubt self.__sockend.set() self.__sockerr.set() @@ -174,7 +188,7 @@ class NetFH(Thread): buff_recv = bytearray(recv_len) while recv_len > 0: block = so.recv(recv_len) - if block == b'': + if block == b"": raise OSError("lost connection on hash receive") buff_recv += block recv_len -= len(block) @@ -183,18 +197,19 @@ class NetFH(Thread): if self.__pictory_h and buff_recv[:16] != self.__pictory_h: self.__config_changed = True self.close() - raise ConfigChanged( - "configuration on revolution pi was changed") + raise ConfigChanged("configuration on revolution pi was changed") else: self.__pictory_h = buff_recv[:16] # Änderung an replace_ios prüfen - if self.__check_replace_ios and self.__replace_ios_h \ - and buff_recv[16:] != self.__replace_ios_h: + if ( + self.__check_replace_ios + and self.__replace_ios_h + and buff_recv[16:] != self.__replace_ios_h + ): self.__config_changed = True self.close() - raise ConfigChanged( - "configuration on revolution pi was changed") + raise ConfigChanged("configuration on revolution pi was changed") else: self.__replace_ios_h = buff_recv[16:] except ConfigChanged: @@ -295,11 +310,18 @@ class NetFH(Thread): else: # Nur bestimmte Dirtybytes löschen # b CM ii xx c0000000 b = 16 - buff = self._direct_sr(pack( - "=c2sH2xc7xc", - HEADER_START, b'EY', position, b'\xfe', HEADER_STOP - ), 1) - if buff != b'\x1e': + buff = self._direct_sr( + pack( + "=c2sH2xc7xc", + HEADER_START, + b"EY", + position, + b"\xfe", + HEADER_STOP, + ), + 1, + ) + if buff != b"\x1e": # ACL prüfen und ggf Fehler werfen self.__check_acl(buff) @@ -343,12 +365,18 @@ class NetFH(Thread): try: # b CM ii ii 00000000 b = 16 - buff = self._direct_sr(pack( - "=c2sHH8xc", - HEADER_START, - b'FD', self.__int_buff, len(self.__by_buff), - HEADER_STOP - ) + self.__by_buff, 1) + buff = self._direct_sr( + pack( + "=c2sHH8xc", + HEADER_START, + b"FD", + self.__int_buff, + len(self.__by_buff), + HEADER_STOP, + ) + + self.__by_buff, + 1, + ) except Exception: raise finally: @@ -356,7 +384,7 @@ class NetFH(Thread): self.__int_buff = 0 self.__by_buff.clear() - if buff != b'\x1e': + if buff != b"\x1e": # ACL prüfen und ggf Fehler werfen self.__check_acl(buff) @@ -403,7 +431,7 @@ class NetFH(Thread): """ return int(self.__timeout * 1000) - def ioctl(self, request: int, arg=b'') -> None: + def ioctl(self, request: int, arg=b"") -> None: """ IOCTL Befehle ueber das Netzwerk senden. @@ -419,11 +447,10 @@ class NetFH(Thread): raise TypeError("arg must be ") # b CM xx ii iiii0000 b = 16 - buff = self._direct_sr(pack( - "=c2s2xHI4xc", - HEADER_START, b'IC', len(arg), request, HEADER_STOP - ) + arg, 1) - if buff != b'\x1e': + buff = self._direct_sr( + pack("=c2s2xHI4xc", HEADER_START, b"IC", len(arg), request, HEADER_STOP) + arg, 1 + ) + if buff != b"\x1e": # ACL prüfen und ggf Fehler werfen self.__check_acl(buff) @@ -443,10 +470,9 @@ class NetFH(Thread): raise ValueError("read of closed file") # b CM ii ii 00000000 b = 16 - buff = self._direct_sr(pack( - "=c2sHH8xc", - HEADER_START, b'DA', self.__position, length, HEADER_STOP - ), length) + buff = self._direct_sr( + pack("=c2sHH8xc", HEADER_START, b"DA", self.__position, length, HEADER_STOP), length + ) self.__position += length return buff @@ -466,10 +492,9 @@ class NetFH(Thread): length = len(buffer) # b CM ii ii 00000000 b = 16 - buff = self._direct_sr(pack( - "=c2sHH8xc", - HEADER_START, b'DA', self.__position, length, HEADER_STOP - ), length) + buff = self._direct_sr( + pack("=c2sHH8xc", HEADER_START, b"DA", self.__position, length, HEADER_STOP), length + ) buffer[:] = buff return len(buffer) @@ -484,13 +509,11 @@ class NetFH(Thread): raise ValueError("read of closed file") if self.__pictory_h == HASH_FAIL: - raise RuntimeError( - "could not read/parse piCtory configuration over network" - ) + raise RuntimeError("could not read/parse piCtory configuration over network") buff = self._direct_sr(_syspictory, 4) - recv_length, = unpack("=I", buff) - return self._direct_sr(b'', recv_length) + (recv_length,) = unpack("=I", buff) + return self._direct_sr(b"", recv_length) def readreplaceio(self) -> bytes: """ @@ -502,27 +525,21 @@ class NetFH(Thread): raise ValueError("read of closed file") if self.__replace_ios_h == HASH_FAIL: - raise RuntimeError( - "replace_io_file: could not read/parse over network" - ) + raise RuntimeError("replace_io_file: could not read/parse over network") buff = self._direct_sr(_sysreplaceio, 4) - recv_length, = unpack("=I", buff) - return self._direct_sr(b'', recv_length) + (recv_length,) = unpack("=I", buff) + return self._direct_sr(b"", recv_length) def run(self) -> None: """Handler fuer Synchronisierung.""" state_reconnect = False while not self.__sockend.is_set(): - # Bei Fehlermeldung neu verbinden if self.__sockerr.is_set(): if not state_reconnect: state_reconnect = True - warnings.warn( - "got a network error and try to reconnect", - RuntimeWarning - ) + warnings.warn("got a network error and try to reconnect", RuntimeWarning) self._connect() if self.__sockerr.is_set(): # Verhindert beim Scheitern 100% CPU last @@ -530,10 +547,7 @@ class NetFH(Thread): continue else: state_reconnect = False - warnings.warn( - "successfully reconnected after network error", - RuntimeWarning - ) + warnings.warn("successfully reconnected after network error", RuntimeWarning) # Kein Fehler aufgetreten, sync durchführen wenn socket frei if self.__socklock.acquire(blocking=False): @@ -543,9 +557,7 @@ class NetFH(Thread): self.__buff_recv.clear() recv_lenght = 2 while recv_lenght > 0: - count = self._serversock.recv_into( - self.__buff_block, recv_lenght - ) + count = self._serversock.recv_into(self.__buff_block, recv_lenght) if count == 0: raise IOError("lost network connection on sync") self.__buff_recv += self.__buff_block[:count] @@ -554,11 +566,8 @@ class NetFH(Thread): except IOError: self.__sockerr.set() else: - if self.__buff_recv != b'\x06\x16': - warnings.warn( - "data error on network sync", - RuntimeWarning - ) + if self.__buff_recv != b"\x06\x16": + warnings.warn("data error on network sync", RuntimeWarning) self.__sockerr.set() continue finally: @@ -596,12 +605,13 @@ class NetFH(Thread): try: # b CM ii ii 00000000 b = 16 - buff = self._direct_sr(pack( - "=c2sHH8xc", - HEADER_START, b'EY', position, len(dirtybytes), HEADER_STOP - ) + dirtybytes, 1) + buff = self._direct_sr( + pack("=c2sHH8xc", HEADER_START, b"EY", position, len(dirtybytes), HEADER_STOP) + + dirtybytes, + 1, + ) - if buff != b'\x1e': + if buff != b"\x1e": # ACL prüfen und ggf Fehler werfen self.__check_acl(buff) @@ -627,11 +637,8 @@ class NetFH(Thread): try: # b CM ii xx 00000000 b = 16 - buff = self._direct_sr(pack( - "=c2sH10xc", - HEADER_START, b'CF', value, HEADER_STOP - ), 1) - if buff != b'\x1e': + buff = self._direct_sr(pack("=c2sH10xc", HEADER_START, b"CF", value, HEADER_STOP), 1) + if buff != b"\x1e": raise IOError("set timeout error on network") except Exception: self.__sockerr.set() @@ -666,10 +673,11 @@ class NetFH(Thread): self.__int_buff += 1 # Datenblock mit Position und Länge in Puffer ablegen - self.__by_buff += \ - self.__position.to_bytes(length=2, byteorder="little") \ - + len(bytebuff).to_bytes(length=2, byteorder="little") \ + self.__by_buff += ( + self.__position.to_bytes(length=2, byteorder="little") + + len(bytebuff).to_bytes(length=2, byteorder="little") + bytebuff + ) # TODO: Bufferlänge und dann flushen? @@ -697,9 +705,17 @@ class RevPiNetIO(_RevPiModIO): __slots__ = "_address" def __init__( - self, address, autorefresh=False, monitoring=False, - syncoutputs=True, simulator=False, debug=True, - replace_io_file=None, shared_procimg=False, direct_output=False): + self, + address, + autorefresh=False, + monitoring=False, + syncoutputs=True, + simulator=False, + debug=True, + replace_io_file=None, + shared_procimg=False, + direct_output=False, + ): """ Instantiiert die Grundfunktionen. @@ -714,28 +730,20 @@ class RevPiNetIO(_RevPiModIO): could be insecure for automation :param direct_output: Deprecated, use shared_procimg """ - check_ip = compile( - r"^(25[0-5]|(2[0-4]|[01]?\d|)\d)" - r"(\.(25[0-5]|(2[0-4]|[01]?\d|)\d)){3}$" - ) + check_ip = compile(r"^(25[0-5]|(2[0-4]|[01]?\d|)\d)(\.(25[0-5]|(2[0-4]|[01]?\d|)\d)){3}$") # Adresse verarbeiten if isinstance(address, str): self._address = (address, 55234) elif isinstance(address, tuple): - if len(address) == 2 \ - and isinstance(address[0], str) \ - and isinstance(address[1], int): - + if len(address) == 2 and isinstance(address[0], str) and isinstance(address[1], int): # Werte prüfen if not 0 < address[1] <= 65535: raise ValueError("port number out of range 1 - 65535") self._address = address else: - raise TypeError( - "address tuple must be (, )" - ) + raise TypeError("address tuple must be (, )") else: raise TypeError( "parameter address must be or " @@ -749,8 +757,7 @@ class RevPiNetIO(_RevPiModIO): self._address = (ipv4, self._address[1]) except Exception: raise ValueError( - "can not resolve ip address for hostname '{0}'" - "".format(self._address[0]) + "can not resolve ip address for hostname '{0}'".format(self._address[0]) ) # Vererben @@ -801,10 +808,7 @@ class RevPiNetIO(_RevPiModIO): try: cp.read_string(byte_buff.decode("utf-8")) except Exception as e: - raise RuntimeError( - "replace_io_file: could not read/parse network data | {0}" - "".format(e) - ) + raise RuntimeError("replace_io_file: could not read/parse network data | {0}".format(e)) return cp def disconnect(self) -> None: @@ -862,16 +866,12 @@ class RevPiNetIO(_RevPiModIO): :param device: nur auf einzelnes Device anwenden, sonst auf Alle """ if self.monitoring: - raise RuntimeError( - "can not send default values, while system is in " - "monitoring mode" - ) + raise RuntimeError("can not send default values, while system is in monitoring mode") if device is None: self._myfh.clear_dirtybytes() else: - dev = device if isinstance(device, Device) \ - else self.device.__getitem__(device) + dev = device if isinstance(device, Device) else self.device.__getitem__(device) mylist = [dev] for dev in mylist: @@ -887,16 +887,12 @@ class RevPiNetIO(_RevPiModIO): :param device: nur auf einzelnes Device anwenden, sonst auf Alle """ if self.monitoring: - raise RuntimeError( - "can not send default values, while system is in " - "monitoring mode" - ) + raise RuntimeError("can not send default values, while system is in monitoring mode") if device is None: mylist = self.device else: - dev = device if isinstance(device, Device) \ - else self.device.__getitem__(device) + dev = device if isinstance(device, Device) else self.device.__getitem__(device) mylist = [dev] for dev in mylist: @@ -921,13 +917,10 @@ class RevPiNetIO(_RevPiModIO): int_byte += 1 if bitio._defaultvalue else 0 # Errechneten Int-Wert in ein Byte umwandeln - dirtybytes += \ - int_byte.to_bytes(length=1, byteorder="little") + dirtybytes += int_byte.to_bytes(length=1, byteorder="little") # Dirtybytes an PLC Server senden - self._myfh.set_dirtybytes( - dev._offset + dev._slc_out.start, dirtybytes - ) + self._myfh.set_dirtybytes(dev._offset + dev._slc_out.start, dirtybytes) config_changed = property(get_config_changed) reconnecting = property(get_reconnecting) @@ -946,9 +939,18 @@ class RevPiNetIOSelected(RevPiNetIO): __slots__ = () def __init__( - self, address, deviceselection, autorefresh=False, - monitoring=False, syncoutputs=True, simulator=False, debug=True, - replace_io_file=None, shared_procimg=False, direct_output=False): + self, + address, + deviceselection, + autorefresh=False, + monitoring=False, + syncoutputs=True, + simulator=False, + debug=True, + replace_io_file=None, + shared_procimg=False, + direct_output=False, + ): """ Instantiiert nur fuer angegebene Devices die Grundfunktionen. @@ -961,8 +963,15 @@ class RevPiNetIOSelected(RevPiNetIO): :ref: :func:`RevPiNetIO.__init__()` """ super().__init__( - address, autorefresh, monitoring, syncoutputs, simulator, debug, - replace_io_file, shared_procimg, direct_output + address, + autorefresh, + monitoring, + syncoutputs, + simulator, + debug, + replace_io_file, + shared_procimg, + direct_output, ) if type(deviceselection) is not DevSelect: @@ -981,24 +990,19 @@ class RevPiNetIOSelected(RevPiNetIO): if len(self.device) == 0: if self._devselect.type: raise DeviceNotFoundError( - "could not find ANY given {0} devices in config" - "".format(self._devselect.type) + "could not find ANY given {0} devices in config".format(self._devselect.type) ) else: - raise DeviceNotFoundError( - "could not find ANY given devices in config" - ) - elif not self._devselect.other_device_key \ - and len(self.device) != len(self._devselect.values): + raise DeviceNotFoundError("could not find ANY given devices in config") + elif not self._devselect.other_device_key and len(self.device) != len( + self._devselect.values + ): if self._devselect.type: raise DeviceNotFoundError( - "could not find ALL given {0} devices in config" - "".format(self._devselect.type) + "could not find ALL given {0} devices in config".format(self._devselect.type) ) else: - raise DeviceNotFoundError( - "could not find ALL given devices in config" - ) + raise DeviceNotFoundError("could not find ALL given devices in config") class RevPiNetIODriver(RevPiNetIOSelected): @@ -1014,9 +1018,16 @@ class RevPiNetIODriver(RevPiNetIOSelected): __slots__ = () def __init__( - self, address, virtdev, autorefresh=False, - syncoutputs=True, debug=True, replace_io_file=None, - shared_procimg=False, direct_output=False): + self, + address, + virtdev, + autorefresh=False, + syncoutputs=True, + debug=True, + replace_io_file=None, + shared_procimg=False, + direct_output=False, + ): """ Instantiiert die Grundfunktionen. @@ -1032,13 +1043,20 @@ class RevPiNetIODriver(RevPiNetIOSelected): virtdev = (virtdev,) dev_select = DevSelect(DeviceType.VIRTUAL, "", virtdev) super().__init__( - address, dev_select, autorefresh, False, syncoutputs, True, debug, - replace_io_file, shared_procimg, direct_output + address, + dev_select, + autorefresh, + False, + syncoutputs, + True, + debug, + replace_io_file, + shared_procimg, + direct_output, ) -def run_net_plc( - address, func, cycletime=50, replace_io_file=None, debug=True): +def run_net_plc(address, func, cycletime=50, replace_io_file=None, debug=True): """ Run Revoluton Pi as real plc with cycle loop and exclusive IO access. diff --git a/src/revpimodio2/pictory.py b/src/revpimodio2/pictory.py index f57be64..01dcdbc 100644 --- a/src/revpimodio2/pictory.py +++ b/src/revpimodio2/pictory.py @@ -13,6 +13,7 @@ __license__ = "LGPLv2" # - RevPiConCan_20180425_1_0.rap # - RevPiGateCANopen_20161102_1_0.rap + class ProductType: CON_BT = 111 CON_CAN = 109 @@ -65,6 +66,7 @@ class ProductType: class DeviceType: """Module key "type" in piCtory file.""" + IGNORED = "" BASE = "BASE" # Core devices EDGE = "EDGE" # Gateways @@ -75,6 +77,7 @@ class DeviceType: class AIO: """Memory value mappings for RevPi AIO 1.0 (RevPiAIO_20170301_1_0.rap).""" + OUT_RANGE_OFF = 0 # Off OUT_RANGE_0_5V = 1 # 0 - 5V OUT_RANGE_0_10V = 2 # 0 - 10V @@ -148,6 +151,7 @@ class AIO: class DI: """Memory value mappings for RevPi DI 1.0 (RevPiDI_20160818_1_0.rap).""" + IN_MODE_DIRECT = 0 # Direct IN_MODE_COUNT_RISING = 1 # Counter, rising edge IN_MODE_COUNT_FALLING = 2 # Counter, falling edge @@ -161,6 +165,7 @@ class DI: class DO: """Memory value mappings for RevPi DO 1.0 (RevPiDO_20160818_1_0.rap).""" + OUT_PWM_FREQ_40HZ = 1 # 40Hz 1% OUT_PWM_FREQ_80HZ = 2 # 80Hz 2% OUT_PWM_FREQ_160HZ = 4 # 160Hz 4% @@ -170,11 +175,13 @@ class DO: class DIO(DI, DO): """Memory value mappings for RevPi DIO 1.0 (RevPiDIO_20160818_1_0.rap).""" + pass class MIO: """Memory value mappings for RevPi MIO 1.0 (RevPiMIO_20200901_1_0.rap).""" + ENCODER_MODE_DISABLED = 0 ENCODER_MODE_ENABLED = 1 @@ -193,6 +200,7 @@ class MIO: class COMPACT: """Memory value mappings for RevPi Compact 1.0 (RevPiCompact_20171023_1_0.rap).""" + DIN_DEBOUNCE_OFF = 0 # Off DIN_DEBOUNCE_25US = 1 # 25us DIN_DEBOUNCE_750US = 2 # 750us @@ -206,5 +214,6 @@ class COMPACT: class FLAT: """Memory value mappings for RevPi Flat 1.0 (RevPiFlat_20200921_1_0.rap).""" + IN_RANGE_0_10V = 0 IN_RANGE_4_20MA = 1