mirror of
https://github.com/naruxde/revpimodio2.git
synced 2026-05-16 08:27:24 +02:00
Merge tag 'unstable/2.8.0_rc1' into pkg/debian_rc
This commit is contained in:
@@ -3,4 +3,4 @@
|
||||
__author__ = "Sven Sager <akira@revpimodio.org>"
|
||||
__copyright__ = "Copyright (C) 2023 Sven Sager"
|
||||
__license__ = "LGPLv2"
|
||||
__version__ = "2.7.2rc1"
|
||||
__version__ = "2.8.0rc1"
|
||||
|
||||
@@ -8,7 +8,11 @@ __license__ = "LGPLv2"
|
||||
OFF = 0
|
||||
GREEN = 1
|
||||
RED = 2
|
||||
ORANGE = 3
|
||||
BLUE = 4
|
||||
CYAN = 5
|
||||
MAGENTA = 6
|
||||
WHITE = 7
|
||||
RISING = 31
|
||||
FALLING = 32
|
||||
BOTH = 33
|
||||
@@ -54,8 +58,16 @@ def consttostr(value) -> str:
|
||||
return "GREEN"
|
||||
elif value == 2:
|
||||
return "RED"
|
||||
elif value == 3:
|
||||
return "ORANGE"
|
||||
elif value == 4:
|
||||
return "BLUE"
|
||||
elif value == 5:
|
||||
return "CYAN"
|
||||
elif value == 6:
|
||||
return "MAGENTA"
|
||||
elif value == 7:
|
||||
return "WHITE"
|
||||
elif value == 31:
|
||||
return "RISING"
|
||||
elif value == 32:
|
||||
@@ -68,5 +80,7 @@ def consttostr(value) -> str:
|
||||
return "OUT"
|
||||
elif value == 302:
|
||||
return "MEM"
|
||||
elif value == 4096:
|
||||
return "PROCESS_IMAGE_SIZE"
|
||||
else:
|
||||
return ""
|
||||
|
||||
+176
-138
@@ -1062,11 +1062,8 @@ class Connect(Core):
|
||||
wdautotoggle = property(_get_wdtoggle, _set_wdtoggle)
|
||||
|
||||
|
||||
class Connect4(ModularBase):
|
||||
"""Klasse fuer den RevPi Connect 4.
|
||||
|
||||
Stellt Funktionen fuer die LEDs und den Status zur Verfuegung.
|
||||
"""
|
||||
class ModularBaseConnect_4_5(ModularBase):
|
||||
"""Class for overlapping functions of Connect 4/5."""
|
||||
|
||||
__slots__ = (
|
||||
"_slc_output",
|
||||
@@ -1085,8 +1082,6 @@ class Connect4(ModularBase):
|
||||
"a5red",
|
||||
"a5green",
|
||||
"a5blue",
|
||||
"x2in",
|
||||
"x2out",
|
||||
)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
@@ -1107,17 +1102,15 @@ class Connect4(ModularBase):
|
||||
"a5red",
|
||||
"a5green",
|
||||
"a5blue",
|
||||
"x2in",
|
||||
"x2out",
|
||||
):
|
||||
raise AttributeError("direct assignment is not supported - use .value Attribute")
|
||||
super(Connect4, self).__setattr__(key, value)
|
||||
super().__setattr__(key, value)
|
||||
|
||||
def __led_calculator(self, led_value: int) -> int:
|
||||
"""
|
||||
Calculate the LED value of Connect 4.
|
||||
Calculate the LED value of Connect 4/5.
|
||||
|
||||
Only the Connect 4 has swapped LED colors red and green. We have to recalculate that
|
||||
Only the Connect 4/5 have swapped LED colors red and green. We have to recalculate that
|
||||
values to match our values for GREEN, RED and BLUE.
|
||||
"""
|
||||
led_calculated = led_value & 0b001
|
||||
@@ -1127,7 +1120,7 @@ class Connect4(ModularBase):
|
||||
return led_calculated
|
||||
|
||||
def _devconfigure(self) -> None:
|
||||
"""Connect4-Klasse vorbereiten."""
|
||||
"""Connect 4/5-Klasse vorbereiten."""
|
||||
super()._devconfigure()
|
||||
|
||||
self._slc_statusbyte = slice(0, 1)
|
||||
@@ -1178,18 +1171,6 @@ class Connect4(ModularBase):
|
||||
exp_a5green = exp_a1red
|
||||
exp_a5blue = exp_a1red
|
||||
|
||||
if len(lst_output) == 8:
|
||||
# prepared for future extension with wdtoggle
|
||||
exp_x2out = lst_output[0].export
|
||||
else:
|
||||
exp_x2out = lst_output[0].export
|
||||
|
||||
lst_status = lst_myios[self._slc_statusbyte.start]
|
||||
if len(lst_status) == 8:
|
||||
exp_x2in = lst_status[6].export
|
||||
else:
|
||||
exp_x2in = lst_status[0].export
|
||||
|
||||
# Echte IOs erzeugen
|
||||
self.a1red = IOBase(
|
||||
self,
|
||||
@@ -1301,7 +1282,176 @@ class Connect4(ModularBase):
|
||||
False,
|
||||
)
|
||||
|
||||
# IO Objekte für WD und X2 in/out erzeugen
|
||||
def _get_leda1(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A1 vom Connect zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau, mixed RGB colors
|
||||
"""
|
||||
return self.__led_calculator(self._ba_devdata[self._slc_led.start] & 0b00000111)
|
||||
|
||||
def _get_leda2(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A2 vom Core zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau, mixed RGB colors
|
||||
"""
|
||||
return self.__led_calculator((self._ba_devdata[self._slc_led.start] & 0b00111000) >> 3)
|
||||
|
||||
def _get_leda3(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A3 vom Core zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau, mixed RGB colors
|
||||
"""
|
||||
word_led = self._ba_devdata[self._slc_led]
|
||||
return self.__led_calculator((unpack("<H", word_led)[0] & 0b0000000111000000) >> 6)
|
||||
|
||||
def _get_leda4(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A4 vom Core zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau, mixed RGB colors
|
||||
"""
|
||||
return self.__led_calculator((self._ba_devdata[self._slc_led.start + 1] & 0b00001110) >> 1)
|
||||
|
||||
def _get_leda5(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A5 vom Core zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau, mixed RGB colors
|
||||
"""
|
||||
return self.__led_calculator((self._ba_devdata[self._slc_led.start + 1] & 0b01110000) >> 4)
|
||||
|
||||
def _set_leda1(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A1 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue, mixed RGB colors
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a1red(bool(value & 2))
|
||||
self.a1green(bool(value & 1))
|
||||
self.a1blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
def _set_leda2(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A2 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue, mixed RGB colors
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a2red(bool(value & 2))
|
||||
self.a2green(bool(value & 1))
|
||||
self.a2blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
def _set_leda3(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A3 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue, mixed RGB colors
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a3red(bool(value & 2))
|
||||
self.a3green(bool(value & 1))
|
||||
self.a3blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
def _set_leda4(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A4 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue, mixed RGB colors
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a4red(bool(value & 2))
|
||||
self.a4green(bool(value & 1))
|
||||
self.a4blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
def _set_leda5(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A5 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue, mixed RGB colors
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a5red(bool(value & 2))
|
||||
self.a5green(bool(value & 1))
|
||||
self.a5blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
def wd_toggle(self):
|
||||
"""Toggle watchdog bit to prevent a timeout."""
|
||||
raise NotImplementedError(
|
||||
"On the Connect 4/5, the hardware watchdog was removed from the process image by "
|
||||
"KUNBUS. This function is no longer available on Connect 4/5 devices."
|
||||
)
|
||||
|
||||
A1 = property(_get_leda1, _set_leda1)
|
||||
A2 = property(_get_leda2, _set_leda2)
|
||||
A3 = property(_get_leda3, _set_leda3)
|
||||
A4 = property(_get_leda4, _set_leda4)
|
||||
A5 = property(_get_leda5, _set_leda5)
|
||||
|
||||
|
||||
class Connect5(ModularBaseConnect_4_5, GatewayMixin):
|
||||
"""Klasse fuer den RevPi Connect 5.
|
||||
|
||||
Stellt Funktionen fuer die LEDs und den Status zur Verfuegung.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class Connect4(ModularBaseConnect_4_5):
|
||||
"""Klasse fuer den RevPi Connect 4.
|
||||
|
||||
Stellt Funktionen fuer die LEDs und den Status zur Verfuegung.
|
||||
"""
|
||||
|
||||
__slots__ = (
|
||||
"x2in",
|
||||
"x2out",
|
||||
)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
"""Verhindert Ueberschreibung der speziellen IOs."""
|
||||
if hasattr(self, key) and key in (
|
||||
"x2in",
|
||||
"x2out",
|
||||
):
|
||||
raise AttributeError("direct assignment is not supported - use .value Attribute")
|
||||
super().__setattr__(key, value)
|
||||
|
||||
def _devconfigure(self) -> None:
|
||||
"""Connect4-Klasse vorbereiten."""
|
||||
super()._devconfigure()
|
||||
|
||||
# Exportflags prüfen (Byte oder Bit)
|
||||
lst_myios = self._modio.io[self._slc_devoff]
|
||||
lst_output = lst_myios[self._slc_output.start]
|
||||
|
||||
if len(lst_output) == 8:
|
||||
# prepared for future extension with wdtoggle
|
||||
exp_x2out = lst_output[0].export
|
||||
else:
|
||||
exp_x2out = lst_output[0].export
|
||||
|
||||
lst_status = lst_myios[self._slc_statusbyte.start]
|
||||
if len(lst_status) == 8:
|
||||
exp_x2in = lst_status[6].export
|
||||
else:
|
||||
exp_x2in = lst_status[0].export
|
||||
|
||||
# IO Objekte für X2 in/out erzeugen
|
||||
self.x2in = IOBase(
|
||||
self,
|
||||
["core.x2in", 0, 1, self._slc_statusbyte.start, exp_x2in, None, "Connect_X2_IN", "6"],
|
||||
@@ -1317,118 +1467,6 @@ class Connect4(ModularBase):
|
||||
False,
|
||||
)
|
||||
|
||||
def _get_leda1(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A1 vom Connect zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau
|
||||
"""
|
||||
return self.__led_calculator(self._ba_devdata[self._slc_led.start] & 0b00000111)
|
||||
|
||||
def _get_leda2(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A2 vom Core zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau
|
||||
"""
|
||||
return self.__led_calculator((self._ba_devdata[self._slc_led.start] & 0b00111000) >> 3)
|
||||
|
||||
def _get_leda3(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A3 vom Core zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau
|
||||
"""
|
||||
word_led = self._ba_devdata[self._slc_led]
|
||||
return self.__led_calculator((unpack("<H", word_led)[0] & 0b0000000111000000) >> 6)
|
||||
|
||||
def _get_leda4(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A4 vom Core zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau
|
||||
"""
|
||||
return self.__led_calculator((self._ba_devdata[self._slc_led.start + 1] & 0b00001110) >> 1)
|
||||
|
||||
def _get_leda5(self) -> int:
|
||||
"""
|
||||
Gibt den Zustand der LED A5 vom Core zurueck.
|
||||
|
||||
:return: 0=aus, 1=gruen, 2=root, 4=blau
|
||||
"""
|
||||
return self.__led_calculator((self._ba_devdata[self._slc_led.start + 1] & 0b01110000) >> 4)
|
||||
|
||||
def _set_leda1(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A1 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a1red(bool(value & 2))
|
||||
self.a1green(bool(value & 1))
|
||||
self.a1blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
def _set_leda2(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A2 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a2red(bool(value & 2))
|
||||
self.a2green(bool(value & 1))
|
||||
self.a2blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
def _set_leda3(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A3 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a3red(bool(value & 2))
|
||||
self.a3green(bool(value & 1))
|
||||
self.a3blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
def _set_leda4(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A4 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a4red(bool(value & 2))
|
||||
self.a4green(bool(value & 1))
|
||||
self.a4blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
def _set_leda5(self, value: int) -> None:
|
||||
"""
|
||||
Setzt den Zustand der LED A5 vom Connect.
|
||||
|
||||
:param: value 0=aus, 1=gruen, 2=rot, 4=blue
|
||||
"""
|
||||
if 0 <= value <= 7:
|
||||
self.a5red(bool(value & 2))
|
||||
self.a5green(bool(value & 1))
|
||||
self.a5blue(bool(value & 4))
|
||||
else:
|
||||
raise ValueError("led status must be between 0 and 7")
|
||||
|
||||
A1 = property(_get_leda1, _set_leda1)
|
||||
A2 = property(_get_leda2, _set_leda2)
|
||||
A3 = property(_get_leda3, _set_leda3)
|
||||
A4 = property(_get_leda4, _set_leda4)
|
||||
A5 = property(_get_leda5, _set_leda5)
|
||||
|
||||
|
||||
class Compact(Base):
|
||||
"""
|
||||
|
||||
@@ -208,6 +208,19 @@ class RevPiModIO(object):
|
||||
self._myfh.close()
|
||||
|
||||
def __enter__(self):
|
||||
# todo: Remove this context manager in future
|
||||
warnings.warn(
|
||||
"This context manager is deprecated and will be removed!\n\n"
|
||||
"You should use the context manager of the IO object `with revpi.io:` "
|
||||
"or with a single device `with revpi.device.my_device:`.\n\n"
|
||||
"This deprecated context manager can be reproduced as follows:\n"
|
||||
"```"
|
||||
"revpi = revpimodio2.RevPiModIO()"
|
||||
"with revpi.io:"
|
||||
" ..."
|
||||
"```",
|
||||
DeprecationWarning,
|
||||
)
|
||||
if self._context_manager:
|
||||
raise RuntimeError("can not use multiple context managers of same instance")
|
||||
if self._looprunning:
|
||||
@@ -355,6 +368,10 @@ class RevPiModIO(object):
|
||||
# RevPi Connect 4
|
||||
dev_new = devicemodule.Connect4(self, device, simulator=self._simulator)
|
||||
self.core = dev_new
|
||||
elif pt == ProductType.REVPI_CONNECT_5:
|
||||
# RevPi Connect 5
|
||||
dev_new = devicemodule.Connect5(self, device, simulator=self._simulator)
|
||||
self.core = dev_new
|
||||
elif pt == ProductType.REVPI_COMPACT:
|
||||
# RevPi Compact
|
||||
dev_new = devicemodule.Compact(self, device, simulator=self._simulator)
|
||||
|
||||
@@ -50,6 +50,7 @@ class ProductType:
|
||||
REVPI_CONNECT = 105
|
||||
REVPI_FLAT = 135
|
||||
REVPI_CONNECT_4 = 136
|
||||
REVPI_CONNECT_5 = 138
|
||||
|
||||
VIRTUAL_CLOUD = 24584
|
||||
VIRTUAL_MODBUS_TCP_SERVER = 24577
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -0,0 +1,309 @@
|
||||
{
|
||||
"App": {
|
||||
"name": "PiCtory",
|
||||
"version": "2.1.2",
|
||||
"saveTS": "20231031130813",
|
||||
"language": "en",
|
||||
"layout": {
|
||||
"north": {
|
||||
"size": 70,
|
||||
"initClosed": false,
|
||||
"initHidden": false
|
||||
},
|
||||
"south": {
|
||||
"size": 257,
|
||||
"initClosed": false,
|
||||
"initHidden": false,
|
||||
"children": {
|
||||
"layout1": {
|
||||
"east": {
|
||||
"size": 500,
|
||||
"initClosed": false,
|
||||
"initHidden": false
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"east": {
|
||||
"size": 70,
|
||||
"initClosed": true,
|
||||
"initHidden": false,
|
||||
"children": {}
|
||||
},
|
||||
"west": {
|
||||
"size": 200,
|
||||
"initClosed": false,
|
||||
"initHidden": false,
|
||||
"children": {
|
||||
"layout1": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"Summary": {
|
||||
"inpTotal": 77,
|
||||
"outTotal": 26
|
||||
},
|
||||
"Devices": [
|
||||
{
|
||||
"GUID": "8012178a-c632-ac37-6d81-98aae223e268",
|
||||
"id": "device_RevPiConnect4_20230409_1_0_001",
|
||||
"type": "BASE",
|
||||
"productType": "136",
|
||||
"position": "0",
|
||||
"name": "RevPi Connect 4",
|
||||
"bmk": "RevPi Connect 4",
|
||||
"inpVariant": 0,
|
||||
"outVariant": 0,
|
||||
"comment": "This is a RevPi Connect 4 Device",
|
||||
"offset": 0,
|
||||
"inp": {
|
||||
"0": [
|
||||
"RevPiStatus",
|
||||
"0",
|
||||
"8",
|
||||
"0",
|
||||
true,
|
||||
"0000",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"RevPiIOCycle",
|
||||
"0",
|
||||
"8",
|
||||
"1",
|
||||
true,
|
||||
"0001",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"RS485ErrorCnt",
|
||||
"0",
|
||||
"16",
|
||||
"2",
|
||||
false,
|
||||
"0002",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"3": [
|
||||
"Core_Temperature",
|
||||
"0",
|
||||
"8",
|
||||
"4",
|
||||
false,
|
||||
"0003",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"4": [
|
||||
"Core_Frequency",
|
||||
"0",
|
||||
"8",
|
||||
"5",
|
||||
false,
|
||||
"0004",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"out": {
|
||||
"0": [
|
||||
"RevPiOutput",
|
||||
"0",
|
||||
"8",
|
||||
"6",
|
||||
true,
|
||||
"0005",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"RS485ErrorLimit1",
|
||||
"10",
|
||||
"16",
|
||||
"7",
|
||||
false,
|
||||
"0006",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"RS485ErrorLimit2",
|
||||
"1000",
|
||||
"16",
|
||||
"9",
|
||||
false,
|
||||
"0007",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"3": [
|
||||
"RevPiLED",
|
||||
"0",
|
||||
"16",
|
||||
"11",
|
||||
true,
|
||||
"0008",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"mem": {},
|
||||
"extend": {}
|
||||
},
|
||||
{
|
||||
"GUID": "fa804e6e-8e80-0ca5-3b96-4ec06bf99ed0",
|
||||
"id": "device_RevPiRO_20231018_1_0_001",
|
||||
"type": "LEFT_RIGHT",
|
||||
"productType": "137",
|
||||
"position": "32",
|
||||
"name": "RevPi RO",
|
||||
"bmk": "RevPi RO",
|
||||
"inpVariant": 0,
|
||||
"outVariant": 0,
|
||||
"comment": "",
|
||||
"offset": 13,
|
||||
"inp": {
|
||||
"0": [
|
||||
"Status",
|
||||
"0",
|
||||
"8",
|
||||
"0",
|
||||
false,
|
||||
"0000",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"out": {
|
||||
"0": [
|
||||
"RelayOutput_1",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
true,
|
||||
"0001",
|
||||
"",
|
||||
"0"
|
||||
],
|
||||
"1": [
|
||||
"RelayOutput_2",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
true,
|
||||
"0002",
|
||||
"",
|
||||
"1"
|
||||
],
|
||||
"2": [
|
||||
"RelayOutput_3",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
true,
|
||||
"0003",
|
||||
"",
|
||||
"2"
|
||||
],
|
||||
"3": [
|
||||
"RelayOutput_4",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
true,
|
||||
"0004",
|
||||
"",
|
||||
"3"
|
||||
],
|
||||
"4": [
|
||||
"RelayOutputPadding_1",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
false,
|
||||
"0009",
|
||||
"",
|
||||
"0"
|
||||
],
|
||||
"5": [
|
||||
"RelayOutputPadding_2",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
false,
|
||||
"0010",
|
||||
"",
|
||||
"1"
|
||||
],
|
||||
"6": [
|
||||
"RelayOutputPadding_3",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
false,
|
||||
"0011",
|
||||
"",
|
||||
"2"
|
||||
],
|
||||
"7": [
|
||||
"RelayOutputPadding_4",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
false,
|
||||
"0012",
|
||||
"",
|
||||
"3"
|
||||
]
|
||||
},
|
||||
"mem": {
|
||||
"0": [
|
||||
"RelayCycleWarningThreshold_1",
|
||||
"0",
|
||||
"32",
|
||||
"2",
|
||||
false,
|
||||
"0005",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"RelayCycleWarningThreshold_2",
|
||||
"0",
|
||||
"32",
|
||||
"6",
|
||||
false,
|
||||
"0006",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"RelayCycleWarningThreshold_3",
|
||||
"0",
|
||||
"32",
|
||||
"10",
|
||||
false,
|
||||
"0007",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"3": [
|
||||
"RelayCycleWarningThreshold_4",
|
||||
"0",
|
||||
"32",
|
||||
"14",
|
||||
false,
|
||||
"0008",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"extend": {}
|
||||
}
|
||||
],
|
||||
"Connections": []
|
||||
}
|
||||
@@ -0,0 +1,269 @@
|
||||
{
|
||||
"App": {
|
||||
"name": "PiCtory",
|
||||
"version": "2.1.2",
|
||||
"saveTS": "20231031130813",
|
||||
"language": "en",
|
||||
"layout": {
|
||||
"north": {
|
||||
"size": 70,
|
||||
"initClosed": false,
|
||||
"initHidden": false
|
||||
},
|
||||
"south": {
|
||||
"size": 257,
|
||||
"initClosed": false,
|
||||
"initHidden": false,
|
||||
"children": {
|
||||
"layout1": {
|
||||
"east": {
|
||||
"size": 500,
|
||||
"initClosed": false,
|
||||
"initHidden": false
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"east": {
|
||||
"size": 70,
|
||||
"initClosed": true,
|
||||
"initHidden": false,
|
||||
"children": {}
|
||||
},
|
||||
"west": {
|
||||
"size": 200,
|
||||
"initClosed": false,
|
||||
"initHidden": false,
|
||||
"children": {
|
||||
"layout1": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"Summary": {
|
||||
"inpTotal": 77,
|
||||
"outTotal": 26
|
||||
},
|
||||
"Devices": [
|
||||
{
|
||||
"GUID": "8012178a-c632-ac37-6d81-98aae223e268",
|
||||
"id": "device_RevPiConnect4_20230409_1_0_001",
|
||||
"type": "BASE",
|
||||
"productType": "136",
|
||||
"position": "0",
|
||||
"name": "RevPi Connect 4",
|
||||
"bmk": "RevPi Connect 4",
|
||||
"inpVariant": 0,
|
||||
"outVariant": 0,
|
||||
"comment": "This is a RevPi Connect 4 Device",
|
||||
"offset": 0,
|
||||
"inp": {
|
||||
"0": [
|
||||
"RevPiStatus",
|
||||
"0",
|
||||
"8",
|
||||
"0",
|
||||
true,
|
||||
"0000",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"RevPiIOCycle",
|
||||
"0",
|
||||
"8",
|
||||
"1",
|
||||
true,
|
||||
"0001",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"RS485ErrorCnt",
|
||||
"0",
|
||||
"16",
|
||||
"2",
|
||||
false,
|
||||
"0002",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"3": [
|
||||
"Core_Temperature",
|
||||
"0",
|
||||
"8",
|
||||
"4",
|
||||
false,
|
||||
"0003",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"4": [
|
||||
"Core_Frequency",
|
||||
"0",
|
||||
"8",
|
||||
"5",
|
||||
false,
|
||||
"0004",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"out": {
|
||||
"0": [
|
||||
"RevPiOutput",
|
||||
"0",
|
||||
"8",
|
||||
"6",
|
||||
true,
|
||||
"0005",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"RS485ErrorLimit1",
|
||||
"10",
|
||||
"16",
|
||||
"7",
|
||||
false,
|
||||
"0006",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"RS485ErrorLimit2",
|
||||
"1000",
|
||||
"16",
|
||||
"9",
|
||||
false,
|
||||
"0007",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"3": [
|
||||
"RevPiLED",
|
||||
"0",
|
||||
"16",
|
||||
"11",
|
||||
true,
|
||||
"0008",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"mem": {},
|
||||
"extend": {}
|
||||
},
|
||||
{
|
||||
"GUID": "fa804e6e-8e80-0ca5-3b96-4ec06bf99ed0",
|
||||
"id": "device_RevPiRO_20231018_1_0_001",
|
||||
"type": "LEFT_RIGHT",
|
||||
"productType": "137",
|
||||
"position": "32",
|
||||
"name": "RevPi RO",
|
||||
"bmk": "RevPi RO",
|
||||
"inpVariant": 0,
|
||||
"outVariant": 0,
|
||||
"comment": "",
|
||||
"offset": 13,
|
||||
"inp": {
|
||||
"0": [
|
||||
"Status",
|
||||
"0",
|
||||
"8",
|
||||
"0",
|
||||
false,
|
||||
"0000",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"out": {
|
||||
"0": [
|
||||
"RelayOutput_1",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
true,
|
||||
"0001",
|
||||
"",
|
||||
"0"
|
||||
],
|
||||
"1": [
|
||||
"RelayOutput_2",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
true,
|
||||
"0002",
|
||||
"",
|
||||
"1"
|
||||
],
|
||||
"2": [
|
||||
"RelayOutput_3",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
true,
|
||||
"0003",
|
||||
"",
|
||||
"2"
|
||||
],
|
||||
"3": [
|
||||
"RelayOutput_4",
|
||||
"0",
|
||||
"1",
|
||||
"1",
|
||||
true,
|
||||
"0004",
|
||||
"",
|
||||
"3"
|
||||
]
|
||||
},
|
||||
"mem": {
|
||||
"0": [
|
||||
"RelayCycleWarningThreshold_1",
|
||||
"0",
|
||||
"32",
|
||||
"2",
|
||||
false,
|
||||
"0005",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"RelayCycleWarningThreshold_2",
|
||||
"0",
|
||||
"32",
|
||||
"6",
|
||||
false,
|
||||
"0006",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"RelayCycleWarningThreshold_3",
|
||||
"0",
|
||||
"32",
|
||||
"10",
|
||||
false,
|
||||
"0007",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"3": [
|
||||
"RelayCycleWarningThreshold_4",
|
||||
"0",
|
||||
"32",
|
||||
"13",
|
||||
false,
|
||||
"0008",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"extend": {}
|
||||
}
|
||||
],
|
||||
"Connections": []
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -0,0 +1,25 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Test errors in config.rsc"""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestConfigRscBugs(TestRevPiModIO):
|
||||
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_overlapping(self):
|
||||
with self.assertWarnsRegex(Warning, r"RelayOutputPadding_[1-4]"):
|
||||
self.modio(configrsc="config_overlapping_bits.rsc")
|
||||
|
||||
with self.assertWarnsRegex(Warning, r"RelayCycleWarningThreshold_4"):
|
||||
self.modio(configrsc="config_overlapping_bytes.rsc")
|
||||
|
||||
def test_floating_offsets(self):
|
||||
with self.assertWarnsRegex(Warning, r"Offset value 31.5"):
|
||||
self.modio(configrsc="config_floating_offset.rsc")
|
||||
@@ -0,0 +1,116 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Tests instantiation all local classes."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
|
||||
from revpimodio2 import OUT, MEM, INP
|
||||
from revpimodio2.device import Virtual, Base
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestDevicesModule(TestRevPiModIO):
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_device(self):
|
||||
"""Test device attributes."""
|
||||
rpi = self.modio()
|
||||
|
||||
self.assertEqual(rpi.device[64].name, "virt01")
|
||||
self.assertEqual(rpi.device.virt01.length, 64)
|
||||
self.assertEqual(rpi.device.virt01.name, "virt01")
|
||||
self.assertIsInstance(rpi.device.virt01.offset, int)
|
||||
self.assertEqual(rpi.device.virt01.position, 64)
|
||||
self.assertEqual(rpi.device.virt01.producttype, 32768)
|
||||
|
||||
# Magic
|
||||
self.assertEqual("virt01" in rpi.device, True)
|
||||
self.assertEqual("nixnix" in rpi.device, False)
|
||||
self.assertEqual(64 in rpi.device, True)
|
||||
self.assertEqual(128 in rpi.device, False)
|
||||
self.assertEqual(rpi.device.virt01 in rpi.device, True)
|
||||
self.assertIsInstance(bytes(rpi.device.virt01), bytes)
|
||||
|
||||
# We have 7 devices in config.rsc file
|
||||
self.assertEqual(len(rpi.device), 7)
|
||||
|
||||
def test_devs_and_ios(self):
|
||||
"""Test IO grouping of devices."""
|
||||
rpi = self.modio()
|
||||
|
||||
self.assertEqual(len(rpi.device.virt01), 64)
|
||||
|
||||
# getIOs
|
||||
self.assertIsInstance(rpi.device.aio01.get_inputs(), list)
|
||||
self.assertIsInstance(rpi.device.aio01.get_outputs(), list)
|
||||
self.assertIsInstance(rpi.device.aio01.get_memories(), list)
|
||||
int_inputs = len(rpi.device.aio01.get_inputs())
|
||||
int_output = len(rpi.device.aio01.get_outputs())
|
||||
|
||||
self.assertIsInstance(rpi.device.aio01.get_allios(), list)
|
||||
self.assertEqual(len(rpi.device.aio01.get_allios()), int_inputs + int_output)
|
||||
|
||||
# IO Byte vergleichen
|
||||
int_byte = 0
|
||||
for devio in [rpi.device.aio01.get_allios(), rpi.device.aio01.get_memories()]:
|
||||
for io in devio:
|
||||
int_byte += io.length
|
||||
self.assertEqual(len(rpi.device.aio01), int_byte)
|
||||
|
||||
# Test the types of IOs
|
||||
len_end = 0
|
||||
len_start = len_end
|
||||
for io in rpi.device.aio01.get_inputs():
|
||||
self.assertEqual(io.type, INP)
|
||||
len_end += io.length
|
||||
self.assertEqual(len_start, rpi.device.aio01._slc_inp.start)
|
||||
self.assertEqual(len_end, rpi.device.aio01._slc_inp.stop)
|
||||
|
||||
len_start = len_end
|
||||
for io in rpi.device.aio01.get_outputs():
|
||||
self.assertEqual(io.type, OUT)
|
||||
len_end += io.length
|
||||
self.assertEqual(len_start, rpi.device.aio01._slc_out.start)
|
||||
self.assertEqual(len_end, rpi.device.aio01._slc_out.stop)
|
||||
|
||||
len_start = len_end
|
||||
for io in rpi.device.aio01.get_memories():
|
||||
self.assertEqual(io.type, MEM)
|
||||
len_end += io.length
|
||||
self.assertEqual(len_start, rpi.device.aio01._slc_mem.start)
|
||||
self.assertEqual(len_end, rpi.device.aio01._slc_mem.stop)
|
||||
|
||||
def test_device_modifications(self):
|
||||
"""Test device modifications."""
|
||||
rpi = self.modio()
|
||||
|
||||
# Zugriffe
|
||||
self.assertIsInstance(rpi.device.virt01, Virtual)
|
||||
self.assertIsInstance(rpi.device["virt01"], Virtual)
|
||||
|
||||
# IO-Abfragen
|
||||
self.assertEqual("pbit0_7" in rpi.device.virt01, True)
|
||||
self.assertEqual(rpi.io.pbit0_7 in rpi.device.virt01, True)
|
||||
self.assertEqual(33 in rpi.device.virt01, False)
|
||||
self.assertEqual(552 in rpi.device.virt01, True)
|
||||
|
||||
# Löschen
|
||||
del rpi.device.virt01
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.device.virt01
|
||||
self.assertEqual(rpi.device[0].name, "picore01")
|
||||
del rpi.device[0]
|
||||
with self.assertRaises(IndexError):
|
||||
rpi.device[0]
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.device.picore01
|
||||
|
||||
del rpi.device[rpi.device.di01]
|
||||
|
||||
def test_new_basedevice(self):
|
||||
"""Test unknown (new) base device."""
|
||||
rpi = self.modio(configrsc="config_new_base.rsc")
|
||||
self.assertEqual(type(rpi.device[0]), Base)
|
||||
del rpi
|
||||
@@ -0,0 +1,139 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Tests instantiation all local classes."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os import remove
|
||||
from os.path import join, dirname
|
||||
from shutil import copyfile
|
||||
|
||||
import revpimodio2
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestInitModio(TestRevPiModIO):
|
||||
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_init_classes(self):
|
||||
"""Tests instantiation."""
|
||||
with self.assertRaises(RuntimeError):
|
||||
revpimodio2.RevPiModIO(
|
||||
procimg=self.fh_procimg.name,
|
||||
configrsc="/opt/KUNBUS/config_lock.rsc",
|
||||
)
|
||||
|
||||
# Prepare default args for direct ModIO classes
|
||||
defaultkwargs = {
|
||||
"procimg": self.fh_procimg.name,
|
||||
"configrsc": join(self.data_dir, "config.rsc"),
|
||||
}
|
||||
|
||||
# Datei an richtigen Ort kopieren und löschen
|
||||
copyfile(defaultkwargs["configrsc"], "/opt/KUNBUS/config.rsc")
|
||||
rpi = revpimodio2.RevPiModIO(procimg=self.fh_procimg.name)
|
||||
del rpi
|
||||
remove("/opt/KUNBUS/config.rsc")
|
||||
|
||||
# RevPiModIO
|
||||
rpi = self.modio()
|
||||
del rpi
|
||||
rpi = self.modio(autorefresh=True)
|
||||
rpi.cleanup()
|
||||
del rpi
|
||||
rpi = self.modio(monitoring=True)
|
||||
del rpi
|
||||
rpi = self.modio(syncoutputs=False)
|
||||
del rpi
|
||||
rpi = self.modio(simulator=True)
|
||||
del rpi
|
||||
|
||||
# Init with old config.rsc and same device names
|
||||
with self.assertWarnsRegex(Warning, r"equal device name '.*' in pictory configuration."):
|
||||
rpi = self.modio(configrsc="config_old.rsc")
|
||||
self.assertEqual(rpi.device.virt01.position, 64)
|
||||
self.assertEqual(rpi.device["virt01"].position, 64)
|
||||
self.assertEqual(len(rpi.device), 6)
|
||||
del rpi
|
||||
|
||||
# Init with unknown DeviceType
|
||||
with self.assertWarnsRegex(Warning, r"device type 'XXX' on position 64 unknown"):
|
||||
rpi = self.modio(configrsc="config_unknown.rsc")
|
||||
self.assertEqual(len(rpi.device), 5)
|
||||
del rpi
|
||||
|
||||
# Init with empty config
|
||||
with self.assertRaises(RuntimeError):
|
||||
self.modio(configrsc="config_empty.rsc")
|
||||
|
||||
# Init with RevPi 1.1
|
||||
rpi = self.modio(configrsc="config_rpi11.rsc")
|
||||
self.assertEqual(len(rpi.device), 4)
|
||||
del rpi
|
||||
|
||||
# Init with 'null' JSON
|
||||
rpi = self.modio(configrsc="config_null.rsc")
|
||||
# notaus_ok null
|
||||
# motorschutz_ok "null"
|
||||
self.assertFalse(rpi.io.notaus_ok._defaultvalue)
|
||||
self.assertFalse(rpi.io.motorschutz_ok._defaultvalue)
|
||||
# self.assertEqual(len(rpi.device), 4)
|
||||
del rpi
|
||||
|
||||
# RevPiModIOSelected
|
||||
rpi = revpimodio2.RevPiModIOSelected([32, 33], **defaultkwargs)
|
||||
self.assertEqual(2, len(rpi.device))
|
||||
del rpi
|
||||
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
|
||||
# Liste mit einem ungültigen Device als <class 'list'>
|
||||
rpi = revpimodio2.RevPiModIOSelected([32, 10], **defaultkwargs)
|
||||
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
|
||||
# Ungültiges Device als <class 'int'>
|
||||
rpi = revpimodio2.RevPiModIOSelected(100, **defaultkwargs)
|
||||
with self.assertRaises(ValueError):
|
||||
# Ungültiger Devicetype
|
||||
rpi = revpimodio2.RevPiModIOSelected([True], **defaultkwargs)
|
||||
|
||||
ds = revpimodio2.modio.DevSelect(
|
||||
"", "productType", (str(revpimodio2.pictory.ProductType.DI),)
|
||||
)
|
||||
rpi = revpimodio2.RevPiModIOSelected(ds, **defaultkwargs)
|
||||
self.assertEqual(len(rpi.device), 2)
|
||||
del rpi
|
||||
|
||||
ds = revpimodio2.modio.DevSelect("", "bmk", ("RevPi DO",))
|
||||
rpi = revpimodio2.RevPiModIOSelected(ds, **defaultkwargs)
|
||||
self.assertEqual(len(rpi.device), 2)
|
||||
del rpi
|
||||
|
||||
# RevPiModIODriver
|
||||
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
|
||||
# Liste mit einem ungültigen Device als <class 'list'>
|
||||
rpi = revpimodio2.RevPiModIODriver([64, 100], **defaultkwargs)
|
||||
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
|
||||
# Ungültiges Device als <class 'int'>
|
||||
rpi = revpimodio2.RevPiModIODriver([100], **defaultkwargs)
|
||||
with self.assertRaises(ValueError):
|
||||
# Ungültiger Devicetype
|
||||
rpi = revpimodio2.RevPiModIODriver([True], **defaultkwargs)
|
||||
|
||||
rpi = revpimodio2.RevPiModIODriver(64, **defaultkwargs)
|
||||
self.assertEqual(1, len(rpi.device))
|
||||
del rpi
|
||||
rpi = revpimodio2.RevPiModIODriver("virt01", **defaultkwargs)
|
||||
self.assertEqual(1, len(rpi.device))
|
||||
del rpi
|
||||
|
||||
# Core ios als bits
|
||||
rpi = self.modio(configrsc="config_core_bits.json")
|
||||
del rpi
|
||||
|
||||
# Bad offset
|
||||
with self.assertWarnsRegex(
|
||||
Warning,
|
||||
r"(Device offset ERROR in piCtory configuration!|"
|
||||
r"is not in the device offset and will be ignored)",
|
||||
):
|
||||
rpi = self.modio(configrsc="config_bad_offset.rsc")
|
||||
del rpi
|
||||
@@ -0,0 +1,92 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Tests instantiation all local classes."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
import os
|
||||
from os.path import join, dirname
|
||||
from signal import SIGINT
|
||||
from threading import Event
|
||||
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestModioClassBasics(TestRevPiModIO):
|
||||
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_appclass(self):
|
||||
"""Test the .app class."""
|
||||
rpi = self.modio()
|
||||
self.assertEqual(rpi.app.language, "en")
|
||||
self.assertEqual(rpi.app.name, "PiCtory")
|
||||
self.assertEqual(rpi.app.version, "1.2.3")
|
||||
|
||||
self.assertEqual(rpi.app.savets.tm_year, 2017)
|
||||
self.assertEqual(rpi.app.savets.tm_hour, 12)
|
||||
del rpi
|
||||
|
||||
# Alte config ohne saveTS
|
||||
with self.assertWarnsRegex(Warning, r"equal device name '.*' in pictory configuration."):
|
||||
rpi = self.modio(configrsc="config_old.rsc")
|
||||
self.assertIsNone(rpi.app.savets)
|
||||
del rpi
|
||||
|
||||
rpi = self.modio(configrsc="config_wrong_tstime.rsc")
|
||||
self.assertEqual(rpi.app.savets.tm_year, 1970)
|
||||
del rpi
|
||||
|
||||
def test_modio_attributes(self):
|
||||
"""Test class attributs of RevPiModIO."""
|
||||
rpi = self.modio()
|
||||
|
||||
self.assertEqual(rpi.configrsc, join(self.data_dir, "config.rsc"))
|
||||
self.assertEqual(rpi.cycletime, 20)
|
||||
rpi.cycletime = 60
|
||||
self.assertEqual(rpi.cycletime, 60)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.cycletime = 4
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.cycletime = 2001
|
||||
|
||||
self.assertEqual(rpi.ioerrors, 0)
|
||||
self.assertIs(type(rpi.length), int)
|
||||
self.assertEqual(rpi.maxioerrors, 0)
|
||||
rpi.maxioerrors = 200
|
||||
self.assertEqual(rpi.maxioerrors, 200)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.maxioerrors = -5
|
||||
self.assertEqual(rpi.monitoring, False)
|
||||
self.assertEqual(rpi.procimg, self.fh_procimg.name)
|
||||
self.assertEqual(rpi.simulator, False)
|
||||
self.assertIsNone(rpi.resetioerrors())
|
||||
|
||||
# Exitevent
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.handlesignalend(False)
|
||||
evt_cleanup = Event()
|
||||
|
||||
def test_cleanup_function():
|
||||
# Test dummy for cleanup function
|
||||
evt_cleanup.set()
|
||||
|
||||
rpi.handlesignalend(test_cleanup_function)
|
||||
os.kill(os.getpid(), SIGINT)
|
||||
self.assertTrue(evt_cleanup.is_set())
|
||||
|
||||
def test_procimg(self):
|
||||
"""Test interaction with process image."""
|
||||
rpi = self.modio()
|
||||
|
||||
# Procimg IO alle
|
||||
self.assertIsNone(rpi.setdefaultvalues())
|
||||
self.assertEqual(rpi.writeprocimg(), True)
|
||||
self.assertEqual(rpi.syncoutputs(), True)
|
||||
self.assertEqual(rpi.readprocimg(), True)
|
||||
|
||||
# Procimg IO device
|
||||
self.assertIsNone(rpi.device.virt01.setdefaultvalues())
|
||||
self.assertEqual(rpi.device.virt01.writeprocimg(), True)
|
||||
self.assertEqual(rpi.device.virt01.syncoutputs(), True)
|
||||
self.assertEqual(rpi.device.virt01.readprocimg(), True)
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
@@ -4,10 +4,10 @@ __author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import join, dirname
|
||||
from os.path import dirname
|
||||
|
||||
import revpimodio2
|
||||
from tests import TestRevPiModIO
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestCompact(TestRevPiModIO):
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,129 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Test cycle loop functions."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname, join
|
||||
from time import sleep
|
||||
|
||||
import revpimodio2
|
||||
from .. import TestRevPiModIO
|
||||
from ..helper import ExitThread
|
||||
|
||||
event_data = (None, None)
|
||||
|
||||
|
||||
def xxx(name, value):
|
||||
"""Test event function."""
|
||||
global event_data
|
||||
event_data = (name, value)
|
||||
|
||||
|
||||
def xxx_thread(th):
|
||||
"""Test event function with thread."""
|
||||
global event_data
|
||||
event_data = (th.ioname, th.iovalue)
|
||||
th.stop()
|
||||
|
||||
|
||||
def xxx_timeout(name, value):
|
||||
"""Test event with long timeout."""
|
||||
sleep(0.1)
|
||||
|
||||
|
||||
class TestCycleloop(TestRevPiModIO):
|
||||
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def setUp(self):
|
||||
global event_data
|
||||
event_data = (None, None)
|
||||
super().setUp()
|
||||
|
||||
def test_cycleloop(self):
|
||||
"""Testet Cycleloop-Funktion."""
|
||||
rpi = self.modio()
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.cycleloop(zyklus, 51)
|
||||
|
||||
rpi.autorefresh_all()
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.cycleloop(False, 51)
|
||||
rpi.cycleloop(zyklus, 51)
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
rpi.cycleloop(lambda: None)
|
||||
|
||||
rpi.exit()
|
||||
|
||||
rpi.autorefresh_all()
|
||||
sleep(0.1)
|
||||
rpi._imgwriter.stop()
|
||||
sleep(0.1)
|
||||
with self.assertRaisesRegex(RuntimeError, r"autorefresh thread not running"):
|
||||
rpi.cycleloop(zyklus)
|
||||
|
||||
rpi.exit()
|
||||
|
||||
def test_cycleloop_longtime(self):
|
||||
"""Testet no data."""
|
||||
rpi = self.modio(autorefresh=True)
|
||||
rpi.debug = -1
|
||||
rpi._imgwriter.lck_refresh.acquire()
|
||||
th_ende = ExitThread(rpi, 4)
|
||||
th_ende.start()
|
||||
|
||||
with self.assertWarnsRegex(
|
||||
RuntimeWarning, r"no new io data in cycle loop for 2500 milliseconds"
|
||||
):
|
||||
rpi.cycleloop(zyklus)
|
||||
|
||||
rpi.exit()
|
||||
|
||||
def test_cycletools(self):
|
||||
rpi = self.modio()
|
||||
ct = revpimodio2.Cycletools(50, rpi)
|
||||
with self.assertRaises(TypeError):
|
||||
ct.changed("bad_value")
|
||||
with self.assertRaises(ValueError):
|
||||
ct.changed(rpi.io.magazin1, edge=revpimodio2._internal.RISING)
|
||||
del rpi
|
||||
|
||||
def test_run_plc(self):
|
||||
self.assertEqual(
|
||||
revpimodio2.run_plc(
|
||||
zyklus,
|
||||
cycletime=30,
|
||||
procimg=self.fh_procimg.name,
|
||||
configrsc=join(self.data_dir, "config.rsc"),
|
||||
),
|
||||
1,
|
||||
)
|
||||
|
||||
|
||||
def zyklus(ct):
|
||||
"""Cycle program for testing the cycle loop."""
|
||||
if ct.flag10c:
|
||||
ct.set_ton("test", 100)
|
||||
ct.set_tof("test", 100)
|
||||
ct.set_tp("test", 100)
|
||||
ct.set_tonc("testc", 3)
|
||||
ct.set_tofc("testc", 3)
|
||||
ct.set_tpc("testc", 3)
|
||||
|
||||
ct.get_ton("test")
|
||||
ct.get_tof("test")
|
||||
ct.get_tp("test")
|
||||
ct.get_tonc("testc")
|
||||
ct.get_tofc("testc")
|
||||
ct.get_tpc("testc")
|
||||
|
||||
t = ct.runtime
|
||||
|
||||
# Check change
|
||||
ct.changed(ct.io.v_druck, edge=revpimodio2._internal.RISING)
|
||||
ct.changed(ct.io.magazin1)
|
||||
|
||||
if ct.flag20c:
|
||||
return 1
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,135 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Test events."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
from threading import Event
|
||||
from time import sleep
|
||||
|
||||
from revpimodio2 import RISING, FALLING
|
||||
from .. import TestRevPiModIO
|
||||
from ..helper import ChangeThread
|
||||
|
||||
event_data = (None, None)
|
||||
|
||||
|
||||
def xxx(name, value):
|
||||
"""Test event function."""
|
||||
global event_data
|
||||
event_data = (name, value)
|
||||
|
||||
|
||||
class TestEvents(TestRevPiModIO):
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def setUp(self):
|
||||
global event_data
|
||||
event_data = (None, None)
|
||||
super().setUp()
|
||||
|
||||
def test_events(self):
|
||||
"""Test event functions."""
|
||||
rpi = self.modio()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.magazin1.reg_event(xxx, edge=RISING)
|
||||
rpi.io.magazin1.reg_event(xxx)
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.magazin1.reg_event(xxx)
|
||||
self.assertTrue(rpi.io.magazin1 in rpi.device.virt01._dict_events)
|
||||
rpi.io.magazin1.unreg_event()
|
||||
self.assertFalse(rpi.io.magazin1 in rpi.device.virt01._dict_events)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.v_druck.reg_event(None)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.v_druck.reg_event(xxx, delay=-100)
|
||||
rpi.io.v_druck.reg_event(xxx, delay=100, edge=RISING)
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.v_druck.reg_event(xxx)
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.v_druck.reg_event(xxx, edge=RISING)
|
||||
rpi.io.v_druck.reg_event(xxx, edge=FALLING, as_thread=True)
|
||||
rpi.io.v_druck.reg_event(lambda name, value: None, edge=FALLING, as_thread=True)
|
||||
self.assertEqual(len(rpi.device.do01._dict_events[rpi.io.v_druck]), 3)
|
||||
|
||||
rpi.io.v_druck.unreg_event(xxx, RISING)
|
||||
|
||||
def test_dd_timer_events(self):
|
||||
"""Testet timer events."""
|
||||
rpi = self.modio()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.magazin1.reg_timerevent(xxx, 200, edge=RISING)
|
||||
rpi.io.magazin1.reg_timerevent(xxx, 200)
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.magazin1.reg_timerevent(xxx, 200)
|
||||
rpi.io.magazin1.reg_timerevent(lambda name, value: None, 200)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.v_druck.reg_timerevent(None, 200)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.v_druck.reg_timerevent(xxx, -100)
|
||||
rpi.io.v_druck.reg_timerevent(xxx, 100, edge=RISING)
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.v_druck.reg_timerevent(xxx, 200)
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.v_druck.reg_timerevent(xxx, 200, edge=RISING)
|
||||
rpi.io.v_druck.reg_timerevent(xxx, 200, edge=FALLING, as_thread=True)
|
||||
|
||||
self.assertEqual(len(rpi.device.do01._dict_events[rpi.io.v_druck]), 2)
|
||||
rpi.io.v_druck.unreg_event(xxx, RISING)
|
||||
self.assertEqual(len(rpi.device.do01._dict_events[rpi.io.v_druck]), 1)
|
||||
rpi.io.v_druck.unreg_event(xxx, FALLING)
|
||||
self.assertFalse(rpi.io.v_druck in rpi.device.do01._dict_events)
|
||||
rpi.io.magazin1.unreg_event()
|
||||
|
||||
def test_dh_wait(self):
|
||||
"""Test .wait functions of IOs."""
|
||||
rpi = self.modio()
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.v_druck.wait()
|
||||
|
||||
rpi.autorefresh_all()
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.v_druck.wait(edge=30)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.v_druck.wait(edge=34)
|
||||
with self.assertRaises(TypeError):
|
||||
rpi.io.v_druck.wait(exitevent=True)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.v_druck.wait(timeout=-1)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.meldung0_7.wait(edge=RISING)
|
||||
|
||||
self.assertEqual(rpi.io.v_druck.wait(okvalue=False), -1)
|
||||
self.assertEqual(rpi.io.v_druck.wait(timeout=100), 2)
|
||||
self.assertEqual(rpi.io.v_druck.wait(edge=RISING, timeout=100), 2)
|
||||
|
||||
# Exit event caught
|
||||
x = Event()
|
||||
x.set()
|
||||
self.assertEqual(rpi.io.v_druck.wait(exitevent=x), 1)
|
||||
|
||||
# Successful waiting
|
||||
th = ChangeThread(rpi, "fu_lahm", True, 0.5)
|
||||
th.start()
|
||||
|
||||
self.assertEqual(rpi.io.fu_lahm.wait(), 0)
|
||||
|
||||
th = ChangeThread(rpi, "fu_lahm", False, 0.3)
|
||||
th.start()
|
||||
th = ChangeThread(rpi, "fu_lahm", True, 0.6)
|
||||
th.start()
|
||||
|
||||
self.assertEqual(rpi.io.fu_lahm.wait(edge=RISING), 0)
|
||||
|
||||
# Error while refreshing IO data
|
||||
rpi._imgwriter.stop()
|
||||
self.assertEqual(rpi.io.v_druck.wait(timeout=100), 2)
|
||||
|
||||
rpi.io.fu_lahm.value = False
|
||||
sleep(0.1)
|
||||
rpi.exit()
|
||||
@@ -0,0 +1,29 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Test signals."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
|
||||
from .. import TestRevPiModIO
|
||||
from ..helper import ExitSignal
|
||||
|
||||
|
||||
class TestSignals(TestRevPiModIO):
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_handle_signal_end(self):
|
||||
rpi = self.modio(autorefresh=True)
|
||||
rpi.io.v_druck.value = True
|
||||
|
||||
def ende():
|
||||
rpi.io.v_druck.value = False
|
||||
|
||||
rpi.handlesignalend(ende)
|
||||
|
||||
th_ende = ExitSignal(1)
|
||||
th_ende.start()
|
||||
rpi.mainloop()
|
||||
|
||||
self.assertFalse(rpi.io.v_druck.value)
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
@@ -7,7 +7,7 @@ __license__ = "GPLv2"
|
||||
from os.path import dirname
|
||||
|
||||
import revpimodio2
|
||||
from tests import TestRevPiModIO
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestFlat(TestRevPiModIO):
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Helper functions for all tests."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
import os
|
||||
from signal import SIGINT
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
|
||||
|
||||
class ChangeThread(Thread):
|
||||
"""Thread to change IO values in background."""
|
||||
|
||||
def __init__(self, revpi, ioname, iovalue, time):
|
||||
"""Init ChangeThread-class."""
|
||||
super().__init__()
|
||||
self.revpi = revpi
|
||||
self.ioname = ioname
|
||||
self.time = time
|
||||
self.iovalue = iovalue
|
||||
|
||||
def run(self):
|
||||
"""Thread starten."""
|
||||
sleep(self.time)
|
||||
self.revpi.io[self.ioname].value = self.iovalue
|
||||
|
||||
|
||||
class ExitSignal(Thread):
|
||||
"""Call SIGINT after given time."""
|
||||
|
||||
def __init__(self, time):
|
||||
"""Signal SIGINT after given time."""
|
||||
super().__init__()
|
||||
self.time = time
|
||||
|
||||
def run(self):
|
||||
sleep(self.time)
|
||||
os.kill(os.getpid(), SIGINT)
|
||||
|
||||
|
||||
class ExitThread(Thread):
|
||||
"""Call .exit() of ModIO after given time."""
|
||||
|
||||
def __init__(self, revpi, time):
|
||||
""""""
|
||||
super().__init__()
|
||||
self.revpi = revpi
|
||||
self.time = time
|
||||
|
||||
def run(self):
|
||||
sleep(self.time)
|
||||
self.revpi.exit()
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,97 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Tests instantiation all local classes."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
|
||||
from revpimodio2 import OUT
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestIoFunctions(TestRevPiModIO):
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_io_base(self):
|
||||
"""Test io attributes."""
|
||||
rpi = self.modio()
|
||||
|
||||
# Transformations
|
||||
self.assertEqual(rpi.io.v_druck.address, 307)
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.io.v_druck.address = 10
|
||||
self.assertEqual(rpi.io.v_druck.byteorder, "little")
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.io.v_druck.byteorder = "big"
|
||||
self.assertIsInstance(rpi.io.v_druck.defaultvalue, bool)
|
||||
self.assertEqual(rpi.io.v_druck.defaultvalue, 0)
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.io.v_druck.defaultvalue = 255
|
||||
self.assertEqual(rpi.io.v_druck.length, 0)
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.io.v_druck.length = 2
|
||||
self.assertEqual(rpi.io.v_druck.name, "v_druck")
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.io.v_druck.name = "test"
|
||||
self.assertEqual(rpi.io.v_druck.type, OUT)
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.io.v_druck.type = 399
|
||||
self.assertFalse(rpi.io.v_druck.value)
|
||||
|
||||
self.assertFalse(rpi.io.v_druck._read_only_io)
|
||||
self.assertTrue(rpi.io.t_stop._read_only_io)
|
||||
|
||||
rpi.io.v_druck(True)
|
||||
self.assertTrue(rpi.io.v_druck.value)
|
||||
rpi.io.v_druck.value = False
|
||||
self.assertFalse(rpi.io.v_druck())
|
||||
|
||||
# Magic-function __call__
|
||||
self.assertEqual(rpi.io.pbit0_7(), 0)
|
||||
self.assertFalse(bool(rpi.io.v_druck))
|
||||
self.assertFalse(bool(rpi.io.magazin1))
|
||||
rpi.io.magazin1(129)
|
||||
self.assertEqual(int(rpi.io.magazin1), 129)
|
||||
self.assertEqual(rpi.io.magazin1(), 129)
|
||||
rpi.io.magazin1.value = 128
|
||||
self.assertTrue(bool(rpi.io.magazin1))
|
||||
self.assertEqual(int(rpi.io.magazin1), 128)
|
||||
self.assertEqual(rpi.io.magazin1(), 128)
|
||||
with self.assertRaises(TypeError):
|
||||
rpi.io.magazin1(b"\x00")
|
||||
|
||||
rpi.io.meldung0_7.replace_io("test4", frm="?", bit=4)
|
||||
rpi.io.test4(True)
|
||||
rpi.io.test4(False)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.magazin1.byteorder = 0
|
||||
rpi.io.magazin1.byteorder = "big"
|
||||
self.assertIsInstance(rpi.io.magazin1.defaultvalue, int)
|
||||
|
||||
# Signed and unsigned change
|
||||
self.assertEqual(rpi.io.magazin1.value, 128)
|
||||
self.assertEqual(rpi.io.magazin1.signed, False)
|
||||
with self.assertRaises(TypeError):
|
||||
rpi.io.magazin1.signed = 0
|
||||
rpi.io.magazin1.signed = True
|
||||
self.assertEqual(rpi.io.magazin1.signed, True)
|
||||
self.assertEqual(rpi.io.magazin1.value, -128)
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
rpi.io.magazin1.value = "test"
|
||||
|
||||
# Cound IOs
|
||||
int_len = len(rpi.io)
|
||||
int_iter = 0
|
||||
for myio in rpi.io:
|
||||
int_iter += 1
|
||||
self.assertEqual(int_len, int_iter)
|
||||
|
||||
self.assertEqual(rpi.io["v_druck"].name, "v_druck")
|
||||
with self.assertRaises(IndexError):
|
||||
rpi.io[8192]
|
||||
with self.assertRaises(AttributeError):
|
||||
# Prevent input assignment
|
||||
rpi.io.v_druck = True
|
||||
@@ -0,0 +1,107 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Tests instantiation all local classes."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
|
||||
from revpimodio2.io import IntIOCounter
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestIos(TestRevPiModIO):
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_ios(self):
|
||||
"""Test values of IOs."""
|
||||
rpi = self.modio()
|
||||
|
||||
# Change values
|
||||
rpi.io.magazin1.value = 255
|
||||
self.assertEqual(rpi.io.magazin1.value, 255)
|
||||
rpi.device.virt01.setdefaultvalues()
|
||||
self.assertEqual(rpi.io.magazin1.value, 0)
|
||||
|
||||
# Use __call__ function
|
||||
with self.assertRaises(TypeError):
|
||||
rpi.io.magazin1.set_value(44)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.magazin1.set_value(b"\x01\x01")
|
||||
rpi.io.magazin1.set_value(b"\x01")
|
||||
self.assertEqual(rpi.io.magazin1.value, 1)
|
||||
|
||||
# Inputs and Mems
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.magazin1_max.set_value(b"\x01")
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.InputMode_1.set_value(b"\x01")
|
||||
|
||||
rpi.io.magazin1_max._iotype = 303
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.magazin1_max.set_value(b"\x01")
|
||||
|
||||
def test_counter_io(self):
|
||||
"""Test counter inputs."""
|
||||
rpi = self.modio()
|
||||
|
||||
# Just for testing buffered mode
|
||||
rpi._buffedwrite = True
|
||||
|
||||
# Counter vorbereiten
|
||||
self.fh_procimg.seek(rpi.io.Counter_1.address)
|
||||
self.fh_procimg.write(b"\x00\x01")
|
||||
rpi.readprocimg()
|
||||
|
||||
self.assertEqual(type(rpi.io.Counter_1), IntIOCounter)
|
||||
self.assertEqual(rpi.io.Counter_1.value, 256)
|
||||
rpi.io.Counter_1.reset()
|
||||
rpi.readprocimg()
|
||||
self.assertEqual(rpi.io.Counter_1.value, 0)
|
||||
|
||||
# This will use ioctl calls
|
||||
rpi._run_on_pi = True
|
||||
|
||||
with self.assertWarnsRegex(RuntimeWarning, r"'iorst' and count \d"):
|
||||
rpi.io.Counter_1.reset()
|
||||
self.assertEqual(rpi.ioerrors, 1)
|
||||
|
||||
del rpi
|
||||
|
||||
rpi = self.modio(monitoring=True)
|
||||
self.assertEqual(type(rpi.io.Counter_2), IntIOCounter)
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.Counter_2.reset()
|
||||
del rpi
|
||||
|
||||
rpi = self.modio(simulator=True)
|
||||
self.assertEqual(type(rpi.io.Counter_3), IntIOCounter)
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.io.Counter_3.reset()
|
||||
del rpi
|
||||
|
||||
def test_superio(self):
|
||||
"""Testet mehrbittige IOs."""
|
||||
rpi = self.modio(configrsc="config_supervirt.rsc")
|
||||
|
||||
# Adressen und Längen prüfen
|
||||
self.assertEqual(rpi.device[65]._offset, 75)
|
||||
|
||||
self.assertEqual(rpi.io.InBit_1.length, 1)
|
||||
self.assertEqual(rpi.io.InBit_2.length, 0)
|
||||
self.assertEqual(rpi.io.InBit_6.address, 75)
|
||||
self.assertEqual(rpi.io.InBit_48.address, 80)
|
||||
self.assertEqual(rpi.io.InDword_1.address, 99)
|
||||
self.assertEqual(rpi.io.OutBit_1.length, 1)
|
||||
self.assertEqual(rpi.io.OutBit_2.length, 0)
|
||||
self.assertEqual(rpi.io.OutBit_8.address, 107)
|
||||
self.assertEqual(rpi.io.OutBit_9.address, 108)
|
||||
|
||||
self.assertEqual(len(rpi.device[65]._ba_devdata), 64)
|
||||
|
||||
# Inputs setzen
|
||||
rpi.io.OutBit_6.value = True
|
||||
self.assertTrue(rpi.io.OutBit_6.value)
|
||||
self.assertEqual(rpi.device[65]._ba_devdata[32:38], b"\x20\x00\x00\x00\x00\x00")
|
||||
rpi.io.OutBit_48.value = True
|
||||
self.assertEqual(rpi.device[65]._ba_devdata[32:38], b"\x20\x00\x00\x00\x00\x80")
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,228 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Test mainloop functions."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
from time import sleep
|
||||
|
||||
import revpimodio2
|
||||
from .. import TestRevPiModIO
|
||||
from ..helper import ExitThread, ChangeThread
|
||||
|
||||
event_data = (None, None)
|
||||
|
||||
|
||||
def xxx(name, value):
|
||||
"""Test event function."""
|
||||
global event_data
|
||||
event_data = (name, value)
|
||||
|
||||
|
||||
def xxx_thread(th):
|
||||
"""Test event function with thread."""
|
||||
global event_data
|
||||
event_data = (th.ioname, th.iovalue)
|
||||
th.stop()
|
||||
|
||||
|
||||
def xxx_timeout(name, value):
|
||||
"""Test event with long timeout."""
|
||||
sleep(0.1)
|
||||
|
||||
|
||||
class TestMainloop(TestRevPiModIO):
|
||||
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def setUp(self):
|
||||
global event_data
|
||||
event_data = (None, None)
|
||||
super().setUp()
|
||||
|
||||
def test_mainloop(self):
|
||||
"""Test basic mainloop functions."""
|
||||
rpi = self.modio(debug=False)
|
||||
|
||||
with self.assertRaises(RuntimeError):
|
||||
# Auto refresh not running
|
||||
rpi.mainloop()
|
||||
|
||||
# Too long refresh time
|
||||
with self.assertRaises(ValueError):
|
||||
rpi._imgwriter.refresh = 4
|
||||
|
||||
# Create events
|
||||
rpi.io.meldung0_7.reg_event(xxx)
|
||||
rpi.io.meldung8_15.reg_event(xxx_thread, as_thread=True)
|
||||
|
||||
# Start mainloop
|
||||
rpi.autorefresh_all()
|
||||
rpi.mainloop(blocking=False)
|
||||
|
||||
sleep(0.1)
|
||||
rpi.io.meldung0_7.value = 100
|
||||
sleep(0.06)
|
||||
self.assertEqual(event_data, ("meldung0_7", 100))
|
||||
rpi.io.meldung8_15.value = 200
|
||||
sleep(0.06)
|
||||
self.assertEqual(event_data, ("meldung8_15", 200))
|
||||
|
||||
self.assertEqual(rpi.ioerrors, 0)
|
||||
|
||||
rpi.exit()
|
||||
rpi.setdefaultvalues()
|
||||
sleep(0.05)
|
||||
|
||||
# Remember old IO before replacing things for tests
|
||||
io_old = rpi.io.meldung0_7
|
||||
self.assertTrue(io_old in io_old._parentdevice._dict_events)
|
||||
self.assertTrue(io_old in rpi.device.virt01)
|
||||
|
||||
rpi.io.meldung0_7.replace_io("test1", "?", event=xxx)
|
||||
rpi.io.meldung0_7.replace_io("test2", "?", bit=1, event=xxx)
|
||||
rpi.io.meldung0_7.replace_io("test3", "?", bit=2)
|
||||
rpi.io.meldung0_7.replace_io("test4", "?", bit=3, event=xxx, delay=300)
|
||||
rpi.io.meldung0_7.replace_io("test5", "?", bit=4, event=xxx_thread, as_thread=True)
|
||||
rpi.io.meldung0_7.replace_io(
|
||||
"test6", "?", bit=5, event=xxx_thread, as_thread=True, delay=200
|
||||
)
|
||||
|
||||
rpi.io.magazin1.reg_timerevent(xxx, 200)
|
||||
rpi.io.test3.reg_timerevent(xxx, 200, edge=revpimodio2._internal.RISING)
|
||||
|
||||
self.assertFalse(io_old in io_old._parentdevice._dict_events)
|
||||
self.assertFalse(io_old in rpi.device.virt01)
|
||||
|
||||
rpi.autorefresh_all()
|
||||
rpi.mainloop(blocking=False)
|
||||
sleep(0.1)
|
||||
|
||||
# Direct events
|
||||
rpi.io.test1.value = True
|
||||
sleep(0.06)
|
||||
self.assertEqual(event_data, ("test1", True))
|
||||
rpi.io.test2.value = True
|
||||
sleep(0.06)
|
||||
self.assertEqual(event_data, ("test2", True))
|
||||
|
||||
# Timer events
|
||||
rpi.io.test3.value = True
|
||||
sleep(0.1)
|
||||
self.assertEqual(event_data, ("test2", True))
|
||||
rpi.io.test3.value = False
|
||||
sleep(0.15)
|
||||
self.assertEqual(event_data, ("test3", True))
|
||||
|
||||
rpi.io.magazin1.value = 1
|
||||
rpi.io.test4.value = True
|
||||
sleep(0.1)
|
||||
self.assertEqual(event_data, ("test3", True))
|
||||
rpi.io.test4.value = False
|
||||
sleep(0.15)
|
||||
self.assertEqual(event_data, ("magazin1", 1))
|
||||
rpi.io.test4.value = True
|
||||
sleep(1)
|
||||
self.assertEqual(event_data, ("test4", True))
|
||||
|
||||
rpi.io.test5.value = True
|
||||
rpi.io.test6.value = True
|
||||
sleep(0.1)
|
||||
self.assertEqual(event_data, ("test5", True))
|
||||
sleep(0.15)
|
||||
self.assertEqual(event_data, ("test6", True))
|
||||
|
||||
self.assertFalse(rpi.exitsignal.is_set())
|
||||
|
||||
rpi.exit(full=False)
|
||||
|
||||
self.assertTrue(rpi.exitsignal.is_set())
|
||||
|
||||
rpi.io.test1.unreg_event()
|
||||
rpi.io.test1.reg_event(xxx_timeout)
|
||||
|
||||
sleep(0.3)
|
||||
|
||||
# Exceed cylcle time in main loop
|
||||
with self.assertWarnsRegex(RuntimeWarning, r"io refresh time of 0 ms exceeded!"):
|
||||
rpi._imgwriter._refresh = 0.0001
|
||||
sleep(0.1)
|
||||
rpi.exit()
|
||||
|
||||
del rpi
|
||||
|
||||
def test_mainloop_bad_things(self):
|
||||
"""Tests incorrect use of the mainloop."""
|
||||
rpi = self.modio(autorefresh=True)
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
rpi._imgwriter._collect_events(1)
|
||||
|
||||
# Bad event function without needed arguments
|
||||
rpi.io.meldung0_7.replace_io("test5", "?", bit=4, event=lambda: None)
|
||||
|
||||
th = ChangeThread(rpi, "test5", True, 0.3)
|
||||
th.start()
|
||||
with self.assertRaises(TypeError):
|
||||
rpi.mainloop()
|
||||
|
||||
sleep(0.1)
|
||||
|
||||
rpi.io.meldung0_7.replace_io("test1", "?", event=xxx_timeout)
|
||||
th_ende = ExitThread(rpi, 1)
|
||||
th = ChangeThread(rpi, "test1", True, 0.3)
|
||||
th_ende.start()
|
||||
th.start()
|
||||
with self.assertWarnsRegex(
|
||||
RuntimeWarning, r"can not execute all event functions in one cycle"
|
||||
):
|
||||
rpi.mainloop()
|
||||
|
||||
rpi.autorefresh_all()
|
||||
rpi.mainloop(blocking=False)
|
||||
# Change cycletime while running a loop
|
||||
with self.assertRaisesRegex(
|
||||
RuntimeError, r"can not change cycletime when cycleloop or mainloop is"
|
||||
):
|
||||
rpi.cycletime = 60
|
||||
|
||||
# Start second loop
|
||||
with self.assertRaisesRegex(RuntimeError, r"can not start multiple loops mainloop"):
|
||||
rpi.cycleloop(lambda x: None)
|
||||
rpi.exit()
|
||||
|
||||
# Test imgwriter monitoring
|
||||
rpi.autorefresh_all()
|
||||
sleep(0.2)
|
||||
rpi._imgwriter.stop()
|
||||
sleep(0.1)
|
||||
with self.assertRaisesRegex(RuntimeError, r"autorefresh thread not running"):
|
||||
rpi.mainloop()
|
||||
|
||||
rpi.exit()
|
||||
|
||||
def test_prefire(self):
|
||||
"""Test reg_event with prefire parameter."""
|
||||
rpi = self.modio(autorefresh=True)
|
||||
|
||||
rpi.io.fu_lahm.reg_event(xxx, prefire=True)
|
||||
self.assertFalse(rpi.io.fu_lahm.value)
|
||||
rpi.mainloop(blocking=False)
|
||||
sleep(0.1)
|
||||
|
||||
# Registration without prefire is allowed while running mainloop
|
||||
rpi.io.fu_schnell.reg_event(xxx)
|
||||
with self.assertRaises(RuntimeError):
|
||||
# Registration with prefire ist not allowed while running mainloop
|
||||
rpi.io.Counter_1.reg_event(xxx, prefire=True)
|
||||
|
||||
self.assertEqual(event_data, ("fu_lahm", False))
|
||||
rpi.cleanup()
|
||||
|
||||
rpi = self.modio(autorefresh=True)
|
||||
rpi.io.Input_32.reg_event(xxx_thread, as_thread=True, prefire=True)
|
||||
rpi.mainloop(blocking=False)
|
||||
sleep(0.1)
|
||||
self.assertEqual(event_data, ("Input_32", False))
|
||||
rpi.cleanup()
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
@@ -6,13 +6,115 @@ __license__ = "GPLv2"
|
||||
|
||||
from os.path import join, dirname
|
||||
|
||||
from tests import TestRevPiModIO
|
||||
from revpimodio2.io import IntIOReplaceable
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestReplaceIO(TestRevPiModIO):
|
||||
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_replacing(self):
|
||||
"""Test replacing IOs."""
|
||||
rpi = self.modio()
|
||||
|
||||
# Test type of IOs on an virtual device
|
||||
for io in rpi.device.virt01:
|
||||
self.assertIsInstance(io, IntIOReplaceable)
|
||||
|
||||
# Try to replace hardware IO
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.io.v_druck.replace_io("test2", frm="?")
|
||||
|
||||
rpi.io.pbit0_7.replace_io("test4", frm="?", bit=4)
|
||||
rpi.io.pbit0_7.replace_io("test5", frm="?", bit=5, byteorder="big")
|
||||
self.assertFalse(rpi.io.test4())
|
||||
self.assertFalse(rpi.io.test4.value)
|
||||
with self.assertRaises(MemoryError):
|
||||
rpi.io.pbit0_7.replace_io("test4_2", frm="?", bit=4)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.meldung0_7.replace_io("outtest", "?", bit=100)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.meldung0_7.replace_io("outtest", "?", byteorder="test")
|
||||
|
||||
# Work with default values
|
||||
rpi.io.meldung0_7.replace_io("outtest", "?", defaultvalue=True)
|
||||
self.assertTrue(rpi.io.outtest.defaultvalue)
|
||||
self.assertFalse(rpi.io.outtest.value)
|
||||
rpi.io.outtest.value = True
|
||||
self.assertTrue(rpi.io.outtest.value)
|
||||
rpi.io.outtest.value = False
|
||||
|
||||
# Apply given default values
|
||||
rpi.setdefaultvalues()
|
||||
self.assertTrue(rpi.io.outtest.value)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.pbit8_15.replace_io("test2", frm="hf")
|
||||
|
||||
rpi.io.pbit8_15.replace_io("test2", frm="h")
|
||||
rpi.io.meldung8_15.replace_io(
|
||||
"testmeldung1",
|
||||
frm="h",
|
||||
byteorder="big",
|
||||
event=lambda io_name, io_value: None,
|
||||
)
|
||||
with self.assertRaises(MemoryError):
|
||||
rpi.io.meldung0_7.replace_io("testmeldung2", frm="h", byteorder="big")
|
||||
with self.assertRaises(TypeError):
|
||||
rpi.io._private_register_new_io_object(None)
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.io.testmeldung1.replace_io("testx", frm="?")
|
||||
|
||||
self.assertEqual(rpi.io.testmeldung1.defaultvalue, 0)
|
||||
self.assertEqual(rpi.io.testmeldung1.frm, "h")
|
||||
self.assertTrue(rpi.io.testmeldung1.signed)
|
||||
self.assertEqual(rpi.io.testmeldung1.value, 0)
|
||||
|
||||
# Set value
|
||||
rpi.io.testmeldung1.value = 200
|
||||
self.assertEqual(rpi.io.testmeldung1(), 200)
|
||||
rpi.io.testmeldung1(100)
|
||||
self.assertEqual(rpi.io.testmeldung1.value, 100)
|
||||
|
||||
with self.assertRaises(BufferError):
|
||||
rpi.io.Output_32.replace_io("test", "h")
|
||||
|
||||
# Byte value with default value
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.Output_9.replace_io("drehzahl", "H", defaultvalue=b"\x00\x00\x00")
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.Output_9.replace_io("drehzahl", "H", defaultvalue=b"\x00")
|
||||
rpi.io.Output_9.replace_io("drehzahl", "H", defaultvalue=b"\xff\xff")
|
||||
self.assertEqual(rpi.io.drehzahl.frm, "H")
|
||||
self.assertFalse(rpi.io.drehzahl.signed)
|
||||
self.assertEqual(rpi.io.drehzahl.defaultvalue, 65535)
|
||||
self.assertEqual(rpi.io.drehzahl.value, 0)
|
||||
rpi.setdefaultvalues()
|
||||
self.assertEqual(rpi.io.drehzahl.value, 65535)
|
||||
|
||||
# Bit value with defaultvalue
|
||||
rpi.io.Output_11._defaultvalue = b"\x02"
|
||||
rpi.io.Output_11.replace_io("bitwert0", "?", bit=0)
|
||||
rpi.io.Output_11.replace_io("bitwert1", "?", bit=1)
|
||||
self.assertFalse(rpi.io.bitwert0.defaultvalue)
|
||||
self.assertTrue(rpi.io.bitwert1.defaultvalue)
|
||||
|
||||
# Multi bytes
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.io.Output_11.replace_io("mehrbyte", "ss")
|
||||
rpi.io.Output_11.replace_io("mehrbyte", "4s")
|
||||
self.assertEqual(rpi.io.mehrbyte.length, 4)
|
||||
self.assertEqual(rpi.io.mehrbyte.frm, "4s")
|
||||
self.assertEqual(rpi.io.mehrbyte.value, b"\x00\x00\x00\x00")
|
||||
rpi.io.mehrbyte.value = b"\xff\xff\xff\xff"
|
||||
self.assertEqual(rpi.io.mehrbyte.value, b"\xff\xff\xff\xff")
|
||||
|
||||
# String defaultvalue (Encoding erros are filled with \x00)
|
||||
rpi.io.Output_15.replace_io("string", "4s", defaultvalue="t\xffst")
|
||||
self.assertEqual(rpi.io.string.value, b"\x00\x00\x00\x00")
|
||||
|
||||
def test_replace_io_file(self):
|
||||
replace_io_file = join(self.data_dir, "replace_io.conf")
|
||||
rpi = self.modio(replace_io_file=replace_io_file)
|
||||
@@ -44,22 +146,18 @@ class TestReplaceIO(TestRevPiModIO):
|
||||
self.assertTrue(rpi.io.byte_test.export)
|
||||
self.assertEqual(rpi.io.byte_test.defaultvalue, b"\xff\x00\x80")
|
||||
|
||||
def test_fb_replace_io_fail(self):
|
||||
def test_replace_io_file_fail(self):
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi = self.modio(replace_io_file=join(self.data_dir, "replace_io_fail.conf"))
|
||||
self.modio(replace_io_file=join(self.data_dir, "replace_io_fail.conf"))
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi = self.modio(replace_io_file="no_file_nonono")
|
||||
self.modio(replace_io_file="no_file_nonono")
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi = self.modio(replace_io_file=join(self.data_dir, "replace_io_failformat.conf"))
|
||||
self.modio(replace_io_file=join(self.data_dir, "replace_io_failformat.conf"))
|
||||
with self.assertRaises(ValueError):
|
||||
rpi = self.modio(
|
||||
replace_io_file=join(self.data_dir, "replace_io_faildefaultvalue_bool.conf")
|
||||
)
|
||||
self.modio(replace_io_file=join(self.data_dir, "replace_io_faildefaultvalue_bool.conf"))
|
||||
with self.assertRaises(ValueError):
|
||||
rpi = self.modio(
|
||||
replace_io_file=join(self.data_dir, "replace_io_faildefaultvalue_int.conf")
|
||||
)
|
||||
self.modio(replace_io_file=join(self.data_dir, "replace_io_faildefaultvalue_int.conf"))
|
||||
with self.assertRaises(ValueError):
|
||||
rpi = self.modio(replace_io_file=join(self.data_dir, "replace_io_failbit_int.conf"))
|
||||
self.modio(replace_io_file=join(self.data_dir, "replace_io_failbit_int.conf"))
|
||||
with self.assertRaisesRegex(ValueError, r"defaultvalue to bytes"):
|
||||
rpi = self.modio(replace_io_file=join(self.data_dir, "replace_io_bytes_fail.conf"))
|
||||
self.modio(replace_io_file=join(self.data_dir, "replace_io_bytes_fail.conf"))
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
@@ -0,0 +1,825 @@
|
||||
{
|
||||
"App": {
|
||||
"name": "PiCtory",
|
||||
"version": "1.3.10",
|
||||
"saveTS": "20180731225026",
|
||||
"language": "en",
|
||||
"layout": {
|
||||
"north": {
|
||||
"size": 70,
|
||||
"initClosed": false,
|
||||
"initHidden": false
|
||||
},
|
||||
"south": {
|
||||
"size": 420,
|
||||
"initClosed": false,
|
||||
"initHidden": false,
|
||||
"children": {
|
||||
"layout1": {
|
||||
"east": {
|
||||
"size": 500,
|
||||
"initClosed": false,
|
||||
"initHidden": false
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"east": {
|
||||
"size": 70,
|
||||
"initClosed": true,
|
||||
"initHidden": false,
|
||||
"children": {}
|
||||
},
|
||||
"west": {
|
||||
"size": 200,
|
||||
"initClosed": false,
|
||||
"initHidden": false,
|
||||
"children": {
|
||||
"layout1": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"Summary": {
|
||||
"inpTotal": 38,
|
||||
"outTotal": 37
|
||||
},
|
||||
"Devices": [
|
||||
{
|
||||
"GUID": "6ad3c1a4-6870-3bf1-6d55-b9d991ba9dc0",
|
||||
"id": "device_RevPiConnect_20171023_1_0_001",
|
||||
"type": "BASE",
|
||||
"productType": "105",
|
||||
"position": "0",
|
||||
"name": "RevPi Connect V1.0",
|
||||
"bmk": "RevPi Connect V1.0",
|
||||
"inpVariant": 0,
|
||||
"outVariant": 0,
|
||||
"comment": "This is a RevPi Connect",
|
||||
"offset": 0,
|
||||
"inp": {
|
||||
"0": [
|
||||
"RevPiStatus",
|
||||
"0",
|
||||
"8",
|
||||
"0",
|
||||
true,
|
||||
"0000",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"RevPiIOCycle",
|
||||
"0",
|
||||
"8",
|
||||
"1",
|
||||
true,
|
||||
"0001",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"RS485ErrorCnt",
|
||||
"0",
|
||||
"16",
|
||||
"2",
|
||||
false,
|
||||
"0002",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"3": [
|
||||
"Core_Temperature",
|
||||
"0",
|
||||
"8",
|
||||
"4",
|
||||
false,
|
||||
"0003",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"4": [
|
||||
"Core_Frequency",
|
||||
"0",
|
||||
"8",
|
||||
"5",
|
||||
false,
|
||||
"0004",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"out": {
|
||||
"0": [
|
||||
"RevPiLED",
|
||||
"0",
|
||||
"8",
|
||||
"6",
|
||||
true,
|
||||
"0005",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"RS485ErrorLimit1",
|
||||
"10",
|
||||
"16",
|
||||
"7",
|
||||
false,
|
||||
"0006",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"RS485ErrorLimit2",
|
||||
"1000",
|
||||
"16",
|
||||
"9",
|
||||
false,
|
||||
"0007",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"mem": {},
|
||||
"extend": {}
|
||||
},
|
||||
{
|
||||
"GUID": "437fb6d7-6ef6-8fc8-0bf2-f618576e1aca",
|
||||
"id": "device_RevPiConBT_20180425_1_0_001",
|
||||
"type": "RIGHT",
|
||||
"productType": "111",
|
||||
"position": "32",
|
||||
"name": "Connect Bluetooth",
|
||||
"bmk": "Connect Bluetooth",
|
||||
"inpVariant": 0,
|
||||
"outVariant": 0,
|
||||
"comment": "",
|
||||
"offset": 11,
|
||||
"inp": {},
|
||||
"out": {},
|
||||
"mem": {},
|
||||
"extend": {}
|
||||
},
|
||||
{
|
||||
"GUID": "2e9cd04b-b7e6-715a-4925-82ffbf0ff45e",
|
||||
"id": "device_Virtual01_20160818_1_0_001",
|
||||
"type": "VIRTUAL",
|
||||
"productType": "32768",
|
||||
"position": "64",
|
||||
"name": "Virtual Device 32 Byte",
|
||||
"bmk": "Virtual Device 32 Byte",
|
||||
"inpVariant": 0,
|
||||
"outVariant": 0,
|
||||
"comment": "Virtual Device to reserve space in process image for user applications",
|
||||
"offset": 11,
|
||||
"inp": {
|
||||
"0": [
|
||||
"Input_1",
|
||||
"0",
|
||||
"8",
|
||||
"0",
|
||||
false,
|
||||
"0000",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"Input_2",
|
||||
"0",
|
||||
"8",
|
||||
"1",
|
||||
false,
|
||||
"0001",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"Input_3",
|
||||
"0",
|
||||
"8",
|
||||
"2",
|
||||
false,
|
||||
"0002",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"3": [
|
||||
"Input_4",
|
||||
"0",
|
||||
"8",
|
||||
"3",
|
||||
false,
|
||||
"0003",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"4": [
|
||||
"Input_5",
|
||||
"0",
|
||||
"8",
|
||||
"4",
|
||||
false,
|
||||
"0004",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"5": [
|
||||
"Input_6",
|
||||
"0",
|
||||
"8",
|
||||
"5",
|
||||
false,
|
||||
"0005",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"6": [
|
||||
"Input_7",
|
||||
"0",
|
||||
"8",
|
||||
"6",
|
||||
false,
|
||||
"0006",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"7": [
|
||||
"Input_8",
|
||||
"0",
|
||||
"8",
|
||||
"7",
|
||||
false,
|
||||
"0007",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"8": [
|
||||
"Input_9",
|
||||
"0",
|
||||
"8",
|
||||
"8",
|
||||
false,
|
||||
"0008",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"9": [
|
||||
"Input_10",
|
||||
"0",
|
||||
"8",
|
||||
"9",
|
||||
false,
|
||||
"0009",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"10": [
|
||||
"Input_11",
|
||||
"0",
|
||||
"8",
|
||||
"10",
|
||||
false,
|
||||
"0010",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"11": [
|
||||
"Input_12",
|
||||
"0",
|
||||
"8",
|
||||
"11",
|
||||
false,
|
||||
"0011",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"12": [
|
||||
"Input_13",
|
||||
"0",
|
||||
"8",
|
||||
"12",
|
||||
false,
|
||||
"0012",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"13": [
|
||||
"Input_14",
|
||||
"0",
|
||||
"8",
|
||||
"13",
|
||||
false,
|
||||
"0013",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"14": [
|
||||
"Input_15",
|
||||
"0",
|
||||
"8",
|
||||
"14",
|
||||
false,
|
||||
"0014",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"15": [
|
||||
"Input_16",
|
||||
"0",
|
||||
"8",
|
||||
"15",
|
||||
false,
|
||||
"0015",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"16": [
|
||||
"Input_17",
|
||||
"0",
|
||||
"8",
|
||||
"16",
|
||||
false,
|
||||
"0016",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"17": [
|
||||
"Input_18",
|
||||
"0",
|
||||
"8",
|
||||
"17",
|
||||
false,
|
||||
"0017",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"18": [
|
||||
"Input_19",
|
||||
"0",
|
||||
"8",
|
||||
"18",
|
||||
false,
|
||||
"0018",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"19": [
|
||||
"Input_20",
|
||||
"0",
|
||||
"8",
|
||||
"19",
|
||||
false,
|
||||
"0019",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"20": [
|
||||
"Input_21",
|
||||
"0",
|
||||
"8",
|
||||
"20",
|
||||
false,
|
||||
"0020",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"21": [
|
||||
"Input_22",
|
||||
"0",
|
||||
"8",
|
||||
"21",
|
||||
false,
|
||||
"0021",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"22": [
|
||||
"Input_23",
|
||||
"0",
|
||||
"8",
|
||||
"22",
|
||||
false,
|
||||
"0022",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"23": [
|
||||
"Input_24",
|
||||
"0",
|
||||
"8",
|
||||
"23",
|
||||
false,
|
||||
"0023",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"24": [
|
||||
"Input_25",
|
||||
"0",
|
||||
"8",
|
||||
"24",
|
||||
false,
|
||||
"0024",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"25": [
|
||||
"Input_26",
|
||||
"0",
|
||||
"8",
|
||||
"25",
|
||||
false,
|
||||
"0025",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"26": [
|
||||
"Input_27",
|
||||
"0",
|
||||
"8",
|
||||
"26",
|
||||
false,
|
||||
"0026",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"27": [
|
||||
"Input_28",
|
||||
"0",
|
||||
"8",
|
||||
"27",
|
||||
false,
|
||||
"0027",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"28": [
|
||||
"Input_29",
|
||||
"0",
|
||||
"8",
|
||||
"28",
|
||||
false,
|
||||
"0028",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"29": [
|
||||
"Input_30",
|
||||
"0",
|
||||
"8",
|
||||
"29",
|
||||
false,
|
||||
"0029",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"30": [
|
||||
"Input_31",
|
||||
"0",
|
||||
"8",
|
||||
"30",
|
||||
false,
|
||||
"0030",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"31": [
|
||||
"Input_32",
|
||||
"0",
|
||||
"8",
|
||||
"31",
|
||||
false,
|
||||
"0031",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"out": {
|
||||
"0": [
|
||||
"Output_1",
|
||||
"0",
|
||||
"8",
|
||||
"32",
|
||||
false,
|
||||
"0032",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"1": [
|
||||
"Output_2",
|
||||
"0",
|
||||
"8",
|
||||
"33",
|
||||
false,
|
||||
"0033",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"2": [
|
||||
"Output_3",
|
||||
"0",
|
||||
"8",
|
||||
"34",
|
||||
false,
|
||||
"0034",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"3": [
|
||||
"Output_4",
|
||||
"0",
|
||||
"8",
|
||||
"35",
|
||||
false,
|
||||
"0035",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"4": [
|
||||
"Output_5",
|
||||
"0",
|
||||
"8",
|
||||
"36",
|
||||
false,
|
||||
"0036",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"5": [
|
||||
"Output_6",
|
||||
"0",
|
||||
"8",
|
||||
"37",
|
||||
false,
|
||||
"0037",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"6": [
|
||||
"Output_7",
|
||||
"0",
|
||||
"8",
|
||||
"38",
|
||||
false,
|
||||
"0038",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"7": [
|
||||
"Output_8",
|
||||
"0",
|
||||
"8",
|
||||
"39",
|
||||
false,
|
||||
"0039",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"8": [
|
||||
"Output_9",
|
||||
"0",
|
||||
"8",
|
||||
"40",
|
||||
false,
|
||||
"0040",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"9": [
|
||||
"Output_10",
|
||||
"0",
|
||||
"8",
|
||||
"41",
|
||||
false,
|
||||
"0041",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"10": [
|
||||
"Output_11",
|
||||
"0",
|
||||
"8",
|
||||
"42",
|
||||
false,
|
||||
"0042",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"11": [
|
||||
"Output_12",
|
||||
"0",
|
||||
"8",
|
||||
"43",
|
||||
false,
|
||||
"0043",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"12": [
|
||||
"Output_13",
|
||||
"0",
|
||||
"8",
|
||||
"44",
|
||||
false,
|
||||
"0044",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"13": [
|
||||
"Output_14",
|
||||
"0",
|
||||
"8",
|
||||
"45",
|
||||
false,
|
||||
"0045",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"14": [
|
||||
"Output_15",
|
||||
"0",
|
||||
"8",
|
||||
"46",
|
||||
false,
|
||||
"0046",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"15": [
|
||||
"Output_16",
|
||||
"0",
|
||||
"8",
|
||||
"47",
|
||||
false,
|
||||
"0047",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"16": [
|
||||
"Output_17",
|
||||
"0",
|
||||
"8",
|
||||
"48",
|
||||
false,
|
||||
"0048",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"17": [
|
||||
"Output_18",
|
||||
"0",
|
||||
"8",
|
||||
"49",
|
||||
false,
|
||||
"0049",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"18": [
|
||||
"Output_19",
|
||||
"0",
|
||||
"8",
|
||||
"50",
|
||||
false,
|
||||
"0050",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"19": [
|
||||
"Output_20",
|
||||
"0",
|
||||
"8",
|
||||
"51",
|
||||
false,
|
||||
"0051",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"20": [
|
||||
"Output_21",
|
||||
"0",
|
||||
"8",
|
||||
"52",
|
||||
false,
|
||||
"0052",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"21": [
|
||||
"Output_22",
|
||||
"0",
|
||||
"8",
|
||||
"53",
|
||||
false,
|
||||
"0053",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"22": [
|
||||
"Output_23",
|
||||
"0",
|
||||
"8",
|
||||
"54",
|
||||
false,
|
||||
"0054",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"23": [
|
||||
"Output_24",
|
||||
"0",
|
||||
"8",
|
||||
"55",
|
||||
false,
|
||||
"0055",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"24": [
|
||||
"Output_25",
|
||||
"0",
|
||||
"8",
|
||||
"56",
|
||||
false,
|
||||
"0056",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"25": [
|
||||
"Output_26",
|
||||
"0",
|
||||
"8",
|
||||
"57",
|
||||
false,
|
||||
"0057",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"26": [
|
||||
"Output_27",
|
||||
"0",
|
||||
"8",
|
||||
"58",
|
||||
false,
|
||||
"0058",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"27": [
|
||||
"Output_28",
|
||||
"0",
|
||||
"8",
|
||||
"59",
|
||||
false,
|
||||
"0059",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"28": [
|
||||
"Output_29",
|
||||
"0",
|
||||
"8",
|
||||
"60",
|
||||
false,
|
||||
"0060",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"29": [
|
||||
"Output_30",
|
||||
"0",
|
||||
"8",
|
||||
"61",
|
||||
false,
|
||||
"0061",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"30": [
|
||||
"Output_31",
|
||||
"0",
|
||||
"8",
|
||||
"62",
|
||||
false,
|
||||
"0062",
|
||||
"",
|
||||
""
|
||||
],
|
||||
"31": [
|
||||
"Output_32",
|
||||
"0",
|
||||
"8",
|
||||
"63",
|
||||
false,
|
||||
"0063",
|
||||
"",
|
||||
""
|
||||
]
|
||||
},
|
||||
"mem": {},
|
||||
"extend": {}
|
||||
}
|
||||
],
|
||||
"Connections": []
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because one or more lines are too long
@@ -0,0 +1,83 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Tests for RevPi 4 devices."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
|
||||
from revpimodio2 import BLUE
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestRevPiConnect(TestRevPiModIO):
|
||||
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_connect(self):
|
||||
"""Test Connect functions."""
|
||||
for conf in ["config_connect.rsc", "config_connect_left.rsc"]:
|
||||
rpi = self.modio(configrsc=conf)
|
||||
|
||||
def get_led_byte():
|
||||
self.fh_procimg.seek(6 if conf == "config_connect.rsc" else 119)
|
||||
return self.fh_procimg.read(1)
|
||||
|
||||
# A3 am Connect testen
|
||||
rpi.core.A3 = 0
|
||||
self.assertEqual(rpi.core.A3, 0)
|
||||
rpi.core.A3 = 1
|
||||
self.assertEqual(rpi.io.RevPiLED.get_value(), b"\x10")
|
||||
self.assertEqual(rpi.core.A3, 1)
|
||||
rpi.writeprocimg()
|
||||
self.assertEqual(get_led_byte(), b"\x10")
|
||||
|
||||
rpi.core.A2 = 0
|
||||
rpi.core.A2 = 1
|
||||
rpi.core.A3 = 2
|
||||
self.assertEqual(rpi.io.RevPiLED.get_value(), b"\x24")
|
||||
self.assertEqual(rpi.core.A3, 2)
|
||||
rpi.writeprocimg()
|
||||
self.assertEqual(get_led_byte(), b"\x24")
|
||||
|
||||
rpi.core.A1 = 0
|
||||
rpi.core.A1 = 2
|
||||
self.assertEqual(rpi.io.RevPiLED.get_value(), b"\x26")
|
||||
rpi.writeprocimg()
|
||||
self.assertEqual(get_led_byte(), b"\x26")
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.core.A3 = BLUE
|
||||
|
||||
# Direkte Zuweisung darf nicht funktionieren
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.a3green = True
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.a3green = True
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.wd = True
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.x2out = True
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.x2in = True
|
||||
|
||||
# Test Hardware watchdog
|
||||
rpi.core.wd.value = True
|
||||
# Value: A1 = RED, A2 = GREEN, A3=RED + Bit 7
|
||||
self.assertEqual(rpi.io.RevPiLED.get_value(), b"\xa6")
|
||||
|
||||
# Test output on connector X2 (Bit 6 on RevPiLED)
|
||||
self.assertFalse(rpi.core.x2out.value)
|
||||
rpi.core.x2out.value = True
|
||||
# Value: A1 = RED, A2 = GREEN, A3=RED + WD=True + Bit 6
|
||||
self.assertEqual(rpi.io.RevPiLED.get_value(), b"\xe6")
|
||||
rpi.writeprocimg()
|
||||
self.assertEqual(get_led_byte(), b"\xe6")
|
||||
self.assertTrue(rpi.core.x2out.value)
|
||||
|
||||
# Test Input on connector X2 (Bit 6 on RevPiStatus)
|
||||
rpi.readprocimg()
|
||||
self.assertFalse(rpi.core.x2in.value)
|
||||
self.fh_procimg.seek(0 if conf == "config_connect.rsc" else 113)
|
||||
self.fh_procimg.write(b"\x40")
|
||||
rpi.readprocimg()
|
||||
self.assertTrue(rpi.core.x2in.value)
|
||||
@@ -0,0 +1,109 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Tests for RevPi Core 1/3 devices."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
|
||||
from revpimodio2 import RED, GREEN, OFF, BLUE
|
||||
from revpimodio2.io import IOBase, IntIO
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestRevPiCore(TestRevPiModIO):
|
||||
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_core(self):
|
||||
"""Test Core device."""
|
||||
rpi = self.modio(configrsc="config_core.rsc")
|
||||
|
||||
# Test IOs of core device
|
||||
for io in rpi.core:
|
||||
self.assertIsInstance(io, IntIO)
|
||||
self.assertEqual(type(io.value), int)
|
||||
|
||||
# Test CORE LEDs
|
||||
def get_led_byte():
|
||||
self.fh_procimg.seek(6)
|
||||
return self.fh_procimg.read(1)
|
||||
|
||||
lst_test_led = [
|
||||
(rpi.core._get_leda1, rpi.core._set_leda1, GREEN, b"\x01"),
|
||||
(rpi.core._get_leda1, rpi.core._set_leda1, OFF, b"\x00"),
|
||||
(rpi.core._get_leda1, rpi.core._set_leda1, RED, b"\x02"),
|
||||
(rpi.core._get_leda2, rpi.core._set_leda2, GREEN, b"\x06"),
|
||||
(rpi.core._get_leda2, rpi.core._set_leda2, OFF, b"\x02"),
|
||||
(rpi.core._get_leda2, rpi.core._set_leda2, RED, b"\x0a"),
|
||||
]
|
||||
for get_led, set_led, value, expected in lst_test_led:
|
||||
with rpi.io:
|
||||
set_led(value)
|
||||
self.assertEqual(rpi.io.RevPiLED.get_value(), expected)
|
||||
self.assertEqual(get_led_byte(), expected)
|
||||
self.assertEqual(get_led(), value)
|
||||
with self.assertRaises(ValueError):
|
||||
set_led(BLUE)
|
||||
|
||||
# LED IOs after previews tests both read leds are on
|
||||
self.assertIsInstance(rpi.core.a1green, IOBase)
|
||||
self.assertIsInstance(rpi.core.a1red, IOBase)
|
||||
self.assertIsInstance(rpi.core.a2green, IOBase)
|
||||
self.assertIsInstance(rpi.core.a1red, IOBase)
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.a1green = True
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.a1red = True
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.a2green = True
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.a2red = True
|
||||
with rpi.io:
|
||||
self.assertTrue(rpi.core.a1red())
|
||||
self.assertFalse(rpi.core.a1green())
|
||||
self.assertTrue(rpi.core.a2red())
|
||||
self.assertFalse(rpi.core.a2green())
|
||||
|
||||
# Software watchdog (same bit as hardware watchdog on connect 3)
|
||||
self.assertFalse(rpi.core.wd.value)
|
||||
rpi.core.wd_toggle()
|
||||
self.assertTrue(rpi.core.wd.value)
|
||||
|
||||
self.assertIsInstance(rpi.core.status, int)
|
||||
self.assertIsInstance(rpi.core.picontrolrunning, bool)
|
||||
self.assertIsInstance(rpi.core.unconfdevice, bool)
|
||||
self.assertIsInstance(rpi.core.missingdeviceorgate, bool)
|
||||
self.assertIsInstance(rpi.core.overunderflow, bool)
|
||||
self.assertIsInstance(rpi.core.leftgate, bool)
|
||||
self.assertIsInstance(rpi.core.rightgate, bool)
|
||||
self.assertIsInstance(rpi.core.iocycle, int)
|
||||
self.assertIsInstance(rpi.core.temperature, int)
|
||||
self.assertIsInstance(rpi.core.frequency, int)
|
||||
self.assertIsInstance(rpi.core.ioerrorcount, int)
|
||||
self.assertIsInstance(rpi.core.errorlimit1, int)
|
||||
rpi.core.errorlimit1 = 10
|
||||
self.assertEqual(rpi.core.errorlimit1, 10)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.core.errorlimit1 = -1
|
||||
self.assertIsInstance(rpi.core.errorlimit2, int)
|
||||
rpi.core.errorlimit2 = 1100
|
||||
self.assertEqual(rpi.core.errorlimit2, 1100)
|
||||
with self.assertRaises(ValueError):
|
||||
rpi.core.errorlimit2 = 65999
|
||||
|
||||
def test_core_old_errorlimits(self):
|
||||
"""Test non-existing error limits of first core rap file."""
|
||||
with self.assertWarnsRegex(Warning, r"equal device name '.*' in pictory configuration."):
|
||||
rpi = self.modio(configrsc="config_old.rsc")
|
||||
|
||||
# Errorlimits testen, die es nicht gibt (damals None, jetzt -1)
|
||||
self.assertEqual(rpi.core.errorlimit1, -1)
|
||||
self.assertEqual(rpi.core.errorlimit2, -1)
|
||||
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.core.errorlimit1 = 100
|
||||
with self.assertRaises(RuntimeError):
|
||||
rpi.core.errorlimit2 = 100
|
||||
|
||||
del rpi
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
@@ -4,10 +4,10 @@ __author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import join, dirname
|
||||
from os.path import dirname
|
||||
|
||||
import revpimodio2
|
||||
from tests import TestRevPiModIO
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestRevPi4(TestRevPiModIO):
|
||||
@@ -49,7 +49,7 @@ class TestRevPi4(TestRevPiModIO):
|
||||
self.assertIsInstance(rpi.core.temperature, int)
|
||||
self.assertIsInstance(rpi.core.frequency, int)
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
with self.assertRaises(NotImplementedError):
|
||||
rpi.core.wd_toggle()
|
||||
|
||||
with self.assertRaisesRegex(AttributeError, r"direct assignment is not supported"):
|
||||
@@ -57,3 +57,23 @@ class TestRevPi4(TestRevPiModIO):
|
||||
|
||||
rpi.exit()
|
||||
del rpi
|
||||
|
||||
def test_connect4_ios(self):
|
||||
rpi = self.modio(configrsc="config_connect4.rsc")
|
||||
rpi.setdefaultvalues()
|
||||
|
||||
# Test X2 output
|
||||
self.assertEqual(rpi.io.RevPiOutput.value, 0)
|
||||
rpi.core.x2out.value = True
|
||||
self.assertEqual(rpi.io.RevPiOutput.value, 1)
|
||||
|
||||
# Test X2 input
|
||||
self.assertEqual(rpi.io.RevPiStatus.value, 0)
|
||||
self.assertFalse(rpi.core.x2in.value)
|
||||
|
||||
# Modify process image: Bit 6 of status is the input (int 64 -> hex 40)
|
||||
self.fh_procimg.write(b"\x40")
|
||||
|
||||
rpi.readprocimg()
|
||||
self.assertEqual(rpi.io.RevPiStatus.value, 64)
|
||||
self.assertTrue(rpi.core.x2in.value)
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Init file for test group."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
@@ -0,0 +1 @@
|
||||
{"App":{"name": "PiCtory", "version": "2.10.0","saveTS": "20241108090523","language": "en","layout": {"north":{"size":70,"initClosed":false,"initHidden":false},"south":{"size":540,"initClosed":false,"initHidden":false,"children":{"layout1":{"east":{"size":500,"initClosed":false,"initHidden":false}}}},"east":{"size":70,"initClosed":true,"initHidden":false,"children":{}},"west":{"size":259,"initClosed":false,"initHidden":false,"children":{"layout1":{}}}}},"Summary":{"inpTotal": 6,"outTotal": 7},"Devices":[{"GUID": "df5a907c-3d82-1d32-4e5f-4c33fb41a559","id": "device_RevPiConnect5_20240315_1_0_001","type": "BASE","productType": "138","position": "0","name": "connect5","bmk": "RevPi Connect 5","inpVariant": 0,"outVariant": 0,"comment": "This is a RevPi Connect 5 Device","offset": 0,"inp": {"0": ["RevPiStatus","0","8","0",true,"0000", "",""],"1": ["RevPiIOCycle","0","8","1",true,"0001", "",""],"2": ["RS485ErrorCnt","0","16","2",false,"0002", "",""],"3": ["Core_Temperature","0","8","4",false,"0003", "",""],"4": ["Core_Frequency","0","8","5",false,"0004", "",""]},"out": {"0": ["RevPiReservedByte","","8","6",false,"0005", "",""],"1": ["RS485ErrorLimit1","10","16","7",false,"0006", "",""],"2": ["RS485ErrorLimit2","1000","16","9",false,"0007", "",""],"3": ["RevPiLED","0","16","11",true,"0008", "",""]},"mem": {},"extend": {}}],"Connections":[]}
|
||||
@@ -0,0 +1,69 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Tests for RevPi 5 devices."""
|
||||
__author__ = "Sven Sager"
|
||||
__copyright__ = "Copyright (C) 2024 Sven Sager"
|
||||
__license__ = "GPLv2"
|
||||
|
||||
from os.path import dirname
|
||||
|
||||
import revpimodio2
|
||||
from .. import TestRevPiModIO
|
||||
|
||||
|
||||
class TestRevPi5(TestRevPiModIO):
|
||||
|
||||
data_dir = dirname(__file__)
|
||||
|
||||
def test_connect5(self):
|
||||
rpi = self.modio(configrsc="config_connect5.rsc")
|
||||
rpi.setdefaultvalues()
|
||||
|
||||
self.assertIsInstance(rpi.core, revpimodio2.device.Connect5)
|
||||
|
||||
# Test all LED (A1 - A5) with all colors
|
||||
lst_led_test = [
|
||||
(rpi.core._get_leda1, rpi.core._set_leda1),
|
||||
(rpi.core._get_leda2, rpi.core._set_leda2),
|
||||
(rpi.core._get_leda3, rpi.core._set_leda3),
|
||||
(rpi.core._get_leda4, rpi.core._set_leda4),
|
||||
(rpi.core._get_leda5, rpi.core._set_leda5),
|
||||
]
|
||||
for i in range(len(lst_led_test)):
|
||||
get_led = lst_led_test[i][0]
|
||||
set_led = lst_led_test[i][1]
|
||||
for k in (
|
||||
(revpimodio2.GREEN, 2),
|
||||
(revpimodio2.RED, 1),
|
||||
(revpimodio2.BLUE, 4),
|
||||
(revpimodio2.ORANGE, 3),
|
||||
(revpimodio2.MAGENTA, 5), # Switched GR bit
|
||||
(revpimodio2.WHITE, 7),
|
||||
(revpimodio2.CYAN, 6), # Switched GR bit
|
||||
(revpimodio2.OFF, 0),
|
||||
):
|
||||
set_led(k[0])
|
||||
self.assertEqual(
|
||||
rpi.io.RevPiLED.get_value(),
|
||||
(k[1] << (i * 3)).to_bytes(2, "little"),
|
||||
)
|
||||
self.assertEqual(get_led(), k[0])
|
||||
with self.assertRaises(ValueError):
|
||||
set_led(8)
|
||||
|
||||
self.assertIsInstance(rpi.core.temperature, int)
|
||||
self.assertIsInstance(rpi.core.frequency, int)
|
||||
|
||||
with self.assertRaises(NotImplementedError):
|
||||
rpi.core.wd_toggle()
|
||||
|
||||
with self.assertRaisesRegex(AttributeError, r"direct assignment is not supported"):
|
||||
rpi.core.a5green = True
|
||||
|
||||
# Connect 5 has no IOs build in
|
||||
with self.assertRaises(AttributeError):
|
||||
output = rpi.core.x2out.value
|
||||
with self.assertRaises(AttributeError):
|
||||
rpi.core.x2in.value = True
|
||||
|
||||
rpi.exit()
|
||||
del rpi
|
||||
@@ -12,3 +12,27 @@ class ModuleImport(unittest.TestCase):
|
||||
import revpimodio2
|
||||
|
||||
self.assertEqual(type(revpimodio2.__version__), str)
|
||||
|
||||
def test_lib_constants(self):
|
||||
"""Tests constants of _internal module."""
|
||||
import revpimodio2
|
||||
|
||||
self.assertEqual(revpimodio2._internal.consttostr(999), "")
|
||||
|
||||
lst_const = [0, 1, 2, 3, 4, 5, 6, 7, 31, 32, 33, 300, 301, 302, 4096]
|
||||
internal_dict = revpimodio2._internal.__dict__ # type: dict
|
||||
for key in internal_dict:
|
||||
if type(internal_dict[key]) is int:
|
||||
const_value = internal_dict[key]
|
||||
self.assertEqual(revpimodio2._internal.consttostr(const_value), key)
|
||||
self.assertTrue(const_value in lst_const)
|
||||
|
||||
# Test argument checker
|
||||
revpimodio2._internal.acheck(bool, arg01=True, arg02_noneok=None)
|
||||
revpimodio2._internal.acheck(int, arg01=0, arg02_noneok=10)
|
||||
revpimodio2._internal.acheck(str, arg01="", arg02_noneok="ja")
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
revpimodio2._internal.acheck(str, arg01=None, arg02_noneok="test")
|
||||
with self.assertRaises(TypeError):
|
||||
revpimodio2._internal.acheck(bool, arg01=True, arg02=None)
|
||||
|
||||
Reference in New Issue
Block a user