Bei Cores werden Adressen von IOs statisch festgelegt unabhängig von IO-Anzahl

Unterstützung mehrere IO Arten der Cores
Fehler bei Exportflag von Connect xin beseitigt
This commit is contained in:
2018-09-30 19:02:03 +02:00
parent 2b601eac5c
commit 40409a6f0f
6 changed files with 147 additions and 104 deletions

View File

@@ -22,7 +22,7 @@ __author__ = "Sven Sager <akira@revpimodio.org>"
__copyright__ = "Copyright (C) 2018 Sven Sager"
__license__ = "LGPLv3"
__name__ = "revpimodio2"
__version__ = "2.2.4"
__version__ = "2.2.5"
# Global package values
OFF = 0

View File

@@ -449,90 +449,96 @@ class Core(Device):
"""
__slots__ = "_iocycle", "_ioerrorcnt", "_iostatusbyte", "_iotemperature", \
"_ioerrorlimit1", "_ioerrorlimit2", "_iofrequency", "_ioled", \
"a1green", "a1red", "a2green", "a2red"
__slots__ = "_slc_cycle", "_slc_errorcnt", "_slc_statusbyte", \
"_slc_temperature", "_slc_errorlimit1", "_slc_errorlimit2", \
"_slc_frequency", "_slc_led", "a1green", "a1red", "a2green", "a2red"
def _devconfigure(self):
"""Core-Klasse vorbereiten."""
# Eigene IO-Liste aufbauen
lst_io = [x for x in self.__iter__()]
self._iostatusbyte = lst_io[0]
self._iocycle = None
self._iotemperature = None
self._iofrequency = None
self._ioerrorcnt = None
self._ioled = lst_io[1]
self._ioerrorlimit1 = None
self._ioerrorlimit2 = None
# Statische IO Verknüpfungen je nach Core-Variante
# 2 Byte = Core1.0
self._slc_statusbyte = slice(0, 1)
self._slc_led = slice(1, 2)
int_lenio = len(lst_io)
if int_lenio == 6:
# Core 1.1
self._iocycle = lst_io[1]
self._ioerrorcnt = lst_io[2]
self._ioled = lst_io[3]
self._ioerrorlimit1 = lst_io[4]
self._ioerrorlimit2 = lst_io[5]
elif int_lenio == 8:
# Core 1.2
self._iocycle = lst_io[1]
self._ioerrorcnt = lst_io[2]
self._iotemperature = lst_io[3]
self._iofrequency = lst_io[4]
self._ioled = lst_io[5]
self._ioerrorlimit1 = lst_io[6]
self._ioerrorlimit2 = lst_io[7]
self._slc_cycle = None
self._slc_temperature = None
self._slc_frequency = None
self._slc_errorcnt = None
self._slc_errorlimit1 = None
self._slc_errorlimit2 = None
if self._length == 9:
# 9 Byte = Core1.1
self._slc_cycle = slice(1, 2)
self._slc_errorcnt = slice(2, 4)
self._slc_led = slice(4, 5)
self._slc_errorlimit1 = slice(5, 7)
self._slc_errorlimit2 = slice(7, 9)
elif self._length == 11:
# 11 Byte = Core1.2 / Connect
self._slc_cycle = slice(1, 2)
self._slc_errorcnt = slice(2, 4)
self._slc_temperature = slice(4, 5)
self._slc_frequency = slice(5, 6)
self._slc_led = slice(6, 7)
self._slc_errorlimit1 = slice(7, 9)
self._slc_errorlimit2 = slice(9, 11)
# Exportflags prüfen (Byte oder Bit)
lst_led = self._modio.io[self._slc_devoff][self._slc_led.start]
if len(lst_led) == 8:
exp_a1green = lst_led[0].export
exp_a1red = lst_led[1].export
exp_a2green = lst_led[2].export
exp_a2red = lst_led[3].export
else:
exp_a1green = lst_led[0].export
exp_a1red = exp_a1green
exp_a2green = exp_a1green
exp_a2red = exp_a1green
# Echte IOs erzeugen
self.a1green = IOBase(self, [
"core.a1green", 0, 1, self._ioled._slc_address.start,
self._ioled.export, None, "LED_A1_GREEN", "0"
"core.a1green", 0, 1, self._slc_led.start,
exp_a1green, None, "LED_A1_GREEN", "0"
], OUT, "little", False)
self.a1red = IOBase(self, [
"core.a1red", 0, 1, self._ioled._slc_address.start,
self._ioled.export, None, "LED_A1_RED", "1"
"core.a1red", 0, 1, self._slc_led.start,
exp_a1red, None, "LED_A1_RED", "1"
], OUT, "little", False)
self.a2green = IOBase(self, [
"core.a2green", 0, 1, self._ioled._slc_address.start,
self._ioled.export, None, "LED_A2_GREEN", "2"
"core.a2green", 0, 1, self._slc_led.start,
exp_a2green, None, "LED_A2_GREEN", "2"
], OUT, "little", False)
self.a2red = IOBase(self, [
"core.a2red", 0, 1, self._ioled._slc_address.start,
self._ioled.export, None, "LED_A2_RED", "3"
"core.a2red", 0, 1, self._slc_led.start,
exp_a2red, None, "LED_A2_RED", "3"
], OUT, "little", False)
def __errorlimit(self, io, errorlimit):
"""Verwaltet das Lesen und Schreiben der ErrorLimits.
@param io IOs Objekt fuer ErrorLimit
def __errorlimit(self, slc_io, errorlimit):
"""Verwaltet das Schreiben der ErrorLimits.
@param slc_io Byte Slice vom ErrorLimit
@return Aktuellen ErrorLimit oder None wenn nicht verfuegbar"""
if errorlimit is None:
return None if io is None else int.from_bytes(
io.get_value(), byteorder="little"
)
if 0 <= errorlimit <= 65535:
self._ba_devdata[slc_io] = \
errorlimit.to_bytes(2, byteorder="little")
else:
if 0 <= errorlimit <= 65535:
io.set_value(
errorlimit.to_bytes(2, byteorder="little")
)
else:
raise ValueError(
"errorlimit value must be between 0 and 65535"
)
raise ValueError(
"errorlimit value must be between 0 and 65535"
)
def _get_status(self):
"""Gibt den RevPi Core Status zurueck.
@return Status als <class 'int'>"""
return int.from_bytes(
self._iostatusbyte.get_value(), byteorder="little"
self._ba_devdata[self._slc_statusbyte], byteorder="little"
)
def _get_leda1(self):
"""Gibt den Zustand der LED A1 vom Core zurueck.
@return 0=aus, 1=gruen, 2=rot"""
int_led = int.from_bytes(
self._ioled.get_value(), byteorder="little"
self._ba_devdata[self._slc_led], byteorder="little"
)
led = int_led & 1
led += int_led & 2
@@ -542,7 +548,7 @@ class Core(Device):
"""Gibt den Zustand der LED A2 vom Core zurueck.
@return 0=aus, 1=gruen, 2=rot"""
int_led = int.from_bytes(
self._ioled.get_value(), byteorder="little"
self._ba_devdata[self._slc_led], byteorder="little"
) >> 2
led = int_led & 1
led += int_led & 2
@@ -554,7 +560,7 @@ class Core(Device):
@param shifed_value Bits vergleichen"""
# Byte als int holen
int_led = int.from_bytes(
self._ioled.get_value(), byteorder="little"
self._ba_devdata[self._slc_led], byteorder="little"
)
for int_bit in addresslist:
@@ -567,7 +573,8 @@ class Core(Device):
int_led -= int_bit
# Zurückschreiben wenn verändert
self._ioled.set_value(int_led.to_bytes(length=1, byteorder="little"))
self._ba_devdata[self._slc_led] = \
int_led.to_bytes(length=1, byteorder="little")
def _set_leda1(self, value):
"""Setzt den Zustand der LED A1 vom Core.
@@ -594,7 +601,7 @@ class Core(Device):
"""Statusbit fuer piControl-Treiber laeuft.
@return True, wenn Treiber laeuft"""
return bool(int.from_bytes(
self._iostatusbyte.get_value(), byteorder="little"
self._ba_devdata[self._slc_statusbyte], byteorder="little"
) & 1)
@property
@@ -602,7 +609,7 @@ class Core(Device):
"""Statusbit fuer ein IO-Modul nicht mit PiCtory konfiguriert.
@return True, wenn IO Modul nicht konfiguriert"""
return bool(int.from_bytes(
self._iostatusbyte.get_value(), byteorder="little"
self._ba_devdata[self._slc_statusbyte], byteorder="little"
) & 2)
@property
@@ -610,7 +617,7 @@ class Core(Device):
"""Statusbit fuer ein IO-Modul fehlt oder piGate konfiguriert.
@return True, wenn IO-Modul fehlt oder piGate konfiguriert"""
return bool(int.from_bytes(
self._iostatusbyte.get_value(), byteorder="little"
self._ba_devdata[self._slc_statusbyte], byteorder="little"
) & 4)
@property
@@ -618,7 +625,7 @@ class Core(Device):
"""Statusbit Modul belegt mehr oder weniger Speicher als konfiguriert.
@return True, wenn falscher Speicher belegt ist"""
return bool(int.from_bytes(
self._iostatusbyte.get_value(), byteorder="little"
self._ba_devdata[self._slc_statusbyte], byteorder="little"
) & 8)
@property
@@ -626,7 +633,7 @@ class Core(Device):
"""Statusbit links vom RevPi ist ein piGate Modul angeschlossen.
@return True, wenn piGate links existiert"""
return bool(int.from_bytes(
self._iostatusbyte.get_value(), byteorder="little"
self._ba_devdata[self._slc_statusbyte], byteorder="little"
) & 16)
@property
@@ -634,64 +641,78 @@ class Core(Device):
"""Statusbit rechts vom RevPi ist ein piGate Modul angeschlossen.
@return True, wenn piGate rechts existiert"""
return bool(int.from_bytes(
self._iostatusbyte.get_value(), byteorder="little"
self._ba_devdata[self._slc_statusbyte], byteorder="little"
) & 32)
@property
def iocycle(self):
"""Gibt Zykluszeit der Prozessabbildsynchronisierung zurueck.
@return Zykluszeit in ms"""
return None if self._iocycle is None else int.from_bytes(
self._iocycle.get_value(), byteorder="little"
return None if self._slc_cycle is None else int.from_bytes(
self._ba_devdata[self._slc_cycle], byteorder="little"
)
@property
def temperature(self):
"""Gibt CPU-Temperatur zurueck.
@return CPU-Temperatur in Celsius"""
return None if self._iotemperature is None else int.from_bytes(
self._iotemperature.get_value(), byteorder="little"
return None if self._slc_temperature is None else int.from_bytes(
self._ba_devdata[self._slc_temperature], byteorder="little"
)
@property
def frequency(self):
"""Gibt CPU Taktfrequenz zurueck.
@return CPU Taktfrequenz in MHz"""
return None if self._iofrequency is None else int.from_bytes(
self._iofrequency.get_value(), byteorder="little"
return None if self._slc_frequency is None else int.from_bytes(
self._ba_devdata[self._slc_frequency], byteorder="little"
) * 10
@property
def ioerrorcount(self):
"""Gibt Fehleranzahl auf RS485 piBridge Bus zurueck.
@return Fehleranzahl der piBridge"""
return None if self._ioerrorcnt is None else int.from_bytes(
self._ioerrorcnt.get_value(), byteorder="little"
return None if self._slc_errorcnt is None else int.from_bytes(
self._ba_devdata[self._slc_errorcnt], byteorder="little"
)
@property
def errorlimit1(self):
"""Gibt RS485 ErrorLimit1 Wert zurueck.
@return Aktueller Wert fuer ErrorLimit1"""
return self.__errorlimit(self._ioerrorlimit1, None)
return None if self._slc_errorlimit1 is None else int.from_bytes(
self._ba_devdata[self._slc_errorlimit1], byteorder="little"
)
@errorlimit1.setter
def errorlimit1(self, value):
"""Setzt RS485 ErrorLimit1 auf neuen Wert.
@param value Neuer ErrorLimit1 Wert"""
self.__errorlimit(self._ioerrorlimit1, value)
if self._slc_errorlimit1 is None:
raise RuntimeError(
"selected core item in piCtory does not support errorlimit1"
)
else:
self.__errorlimit(self._slc_errorlimit1, value)
@property
def errorlimit2(self):
"""Gibt RS485 ErrorLimit2 Wert zurueck.
@return Aktueller Wert fuer ErrorLimit2"""
return self.__errorlimit(self._ioerrorlimit2, None)
return None if self._slc_errorlimit2 is None else int.from_bytes(
self._ba_devdata[self._slc_errorlimit2], byteorder="little"
)
@errorlimit2.setter
def errorlimit2(self, value):
"""Setzt RS485 ErrorLimit2 auf neuen Wert.
@param value Neuer ErrorLimit2 Wert"""
self.__errorlimit(self._ioerrorlimit2, value)
if self._slc_errorlimit2 is None:
raise RuntimeError(
"selected core item in piCtory does not support errorlimit2"
)
else:
self.__errorlimit(self._slc_errorlimit2, value)
class Connect(Core):
@@ -713,38 +734,58 @@ class Connect(Core):
def _devconfigure(self):
"""Connect-Klasse vorbereiten."""
super()._devconfigure()
self.__evt_wdtoggle = Event()
self.__th_wdtoggle = None
# Exportflags prüfen (Byte oder Bit)
lst_myios = self._modio.io[self._slc_devoff]
lst_led = lst_myios[self._slc_led.start]
if len(lst_led) == 8:
exp_a3green = lst_led[4].export
exp_a3red = lst_led[5].export
exp_x2out = lst_led[6].export
exp_wd = lst_led[7].export
else:
exp_a3green = lst_led[0].export
exp_a3red = exp_a3green
exp_x2out = exp_a3green
exp_wd = exp_a3green
lst_status = lst_myios[self._slc_statusbyte.start]
if len(lst_led) == 8:
exp_x2in = lst_status[6].export
else:
exp_x2in = lst_status[0].export
# Echte IOs erzeugen
self.a3green = IOBase(self, [
"core.a3green", 0, 1, self._ioled._slc_address.start,
self._ioled.export, None, "LED_A3_GREEN", "4"
"core.a3green", 0, 1, self._slc_led.start,
exp_a3green, None, "LED_A3_GREEN", "4"
], OUT, "little", False)
self.a3red = IOBase(self, [
"core.a3red", 0, 1, self._ioled._slc_address.start,
self._ioled.export, None, "LED_A3_RED", "5"
"core.a3red", 0, 1, self._slc_led.start,
exp_a3red, None, "LED_A3_RED", "5"
], OUT, "little", False)
# IO Objekte für WD und X2 in/out erzeugen
self.wd = IOBase(self, [
"core.wd", 0, 1, self._ioled._slc_address.start,
self._ioled.export, None, "Connect_WatchDog", "7"
"core.wd", 0, 1, self._slc_led.start,
exp_wd, None, "Connect_WatchDog", "7"
], OUT, "little", False)
self.x2in = IOBase(self, [
"core.x2in", 0, 1, self._iostatusbyte._slc_address.start,
self._ioled.export, None, "Connect_X2_IN", "6"
"core.x2in", 0, 1, self._slc_statusbyte.start,
exp_x2in, None, "Connect_X2_IN", "6"
], INP, "little", False)
self.x2out = IOBase(self, [
"core.x2out", 0, 1, self._ioled._slc_address.start,
self._ioled.export, None, "Connect_X2_OUT", "6"
"core.x2out", 0, 1, self._slc_led.start,
exp_x2out, None, "Connect_X2_OUT", "6"
], OUT, "little", False)
def _get_leda3(self):
"""Gibt den Zustand der LED A3 vom Connect zurueck.
@return 0=aus, 1=gruen, 2=rot"""
int_led = int.from_bytes(
self._ioled.get_value(), byteorder="little"
self._ba_devdata[self._slc_led], byteorder="little"
) >> 4
led = int_led & 1
led += int_led & 2

View File

@@ -236,14 +236,16 @@ class RevPiModIO(object):
# Für RS485 errors am core defaults laden sollte procimg NULL sein
if not (self.core is None or self._monitoring or self._simulator):
if self.core._ioerrorlimit1 is not None:
self.core._ioerrorlimit1.set_value(
self.core._ioerrorlimit1._defaultvalue
)
if self.core._ioerrorlimit2 is not None:
self.core._ioerrorlimit2.set_value(
self.core._ioerrorlimit2._defaultvalue
)
if self.core._slc_errorlimit1 is not None:
io = self.io[
self.core.offset + self.core._slc_errorlimit1.start
][0]
io.set_value(io._defaultvalue)
if self.core._slc_errorlimit2 is not None:
io = self.io[
self.core.offset + self.core._slc_errorlimit2.start
][0]
io.set_value(io._defaultvalue)
# RS485 errors schreiben
self.writeprocimg(self.core)