feat: Add support for RO device

This adds the RO device as its own class. The RO device has a special
ioctl function, via which the switching cycles of the relays can be
called up. Relay outputs from the RO device get the additional
attribute 'cycles', which returns the number of switching cycles.

The implementation via RevPiNetIO classes cannot currently be
implemented because the server does not support a return value.

Signed-off-by: Sven Sager <akira@narux.de>
This commit is contained in:
2023-11-01 11:12:43 +01:00
parent c87f1cceb6
commit cc689a58eb
3 changed files with 144 additions and 1 deletions

View File

@@ -10,7 +10,7 @@ from threading import Event, Lock, Thread
from ._internal import INP, OUT, MEM, PROCESS_IMAGE_SIZE
from .helper import ProcimgWriter
from .io import IOBase, IntIO, IntIOCounter, IntIOReplaceable, MemIO
from .io import IOBase, IntIO, IntIOCounter, IntIOReplaceable, MemIO, RelaisOutput, IntRelaisOutput
from .pictory import ProductType
@@ -333,6 +333,15 @@ class Device(object):
if iotype == MEM:
# Memory setting
io_new = MemIO(self, dict_io[key], iotype, "little", False)
elif isinstance(self, RoModule) and dict_io[key][3] == "1":
# Relais of RO are on device address "1" and has a cycle counter
if dict_io[key][7]:
# Each relais output has a single bit
io_new = RelaisOutput(self, dict_io[key], iotype, "little", False)
else:
# All relais outputs are in one byte
io_new = IntRelaisOutput(self, dict_io[key], iotype, "little", False)
elif bool(dict_io[key][7]):
# Bei Bitwerten IOBase verwenden
io_new = IOBase(self, dict_io[key], iotype, "little", False)
@@ -1918,6 +1927,18 @@ class DioModule(Device):
super().__init__(parentmodio, dict_device, simulator=simulator)
class RoModule(Device):
"""Relais output (RO) module with"""
def __init__(self, parentmodio, dict_device, simulator=False):
"""
Relais outputs of this device has a cycle counter for the relais.
:rev: :func:`Device.__init__()`
"""
super().__init__(parentmodio, dict_device, simulator=simulator)
class Gateway(Device):
"""
Klasse fuer die RevPi Gateway-Devices.

View File

@@ -1187,6 +1187,125 @@ class IntIOReplaceable(IntIO):
)
class RelaisOutput(IOBase):
"""
Class for relais outputs to access the cycle counters.
This class extends the function of <class 'IOBase'> to the function
'get_cycles' and the property 'cycles' to retrieve the relay cycle
counters.
:ref: :class:`IOBase`
"""
def __init__(self, parentdevice, valuelist, iotype, byteorder, signed):
"""
Extend <class 'IOBase'> with functions to access cycle counters.
:ref: :func:`IOBase.__init__(...)`
"""
super().__init__(parentdevice, valuelist, iotype, byteorder, signed)
"""
typedef struct SROGetCountersStr
{
/* Address of module in current configuration */
uint8_t i8uAddress;
uint32_t counter[REVPI_RO_NUM_RELAY_COUNTERS];
} SROGetCounters;
"""
# Device position + padding + four counter with 4 byte each
self.__ioctl_arg_format = "<BIIII"
self.__ioctl_arg = struct.pack(
self.__ioctl_arg_format,
parentdevice._position,
0,
0,
0,
0,
)
def get_switching_cycles(self):
"""
Get the number of switching cycles from this relay.
If each relay output is represented as BOOL, this function returns a
single integer value. If all relays are displayed as a BYTE, this
function returns a tuple that contains the values of all relay outputs.
The setting is determined by PiCtory and the selected output variant by
the RO device.
This function is only available locally on a Revolution Pi. This
function cannot be used via RevPiNetIO.
:return: Integer of switching cycles as single value or tuple of all
"""
# Using ioctl request K+29 = 19229
if self._parentdevice._modio._run_on_pi:
# IOCTL to piControl on the RevPi
with self._parentdevice._modio._myfh_lck:
try:
ioctl_return_value = ioctl(
self._parentdevice._modio._myfh,
19229,
self.__ioctl_arg,
)
except Exception as e:
# If not implemented, we return the max value and set an error
ioctl_return_value = b"\xff" * struct.calcsize(self.__ioctl_arg_format)
self._parentdevice._modio._gotioerror("rocounter", e)
elif hasattr(self._parentdevice._modio._myfh, "ioctl"):
# IOCTL over network
"""
The ioctl function over the network does not return a value. Only the successful
execution of the ioctl call is checked and reported back. If a new function has been
implemented in RevPiPyLoad, the subsequent source code can be activated.
with self._parentdevice._modio._myfh_lck:
try:
ioctl_return_value = self._parentdevice._modio._myfh.ioctl(
19229, self.__ioctl_arg
)
except Exception as e:
self._parentdevice._modio._gotioerror("net_rocounter", e)
"""
raise RuntimeError("Can not be called over network via RevPiNetIO")
else:
# Simulate IOCTL on a regular file returns the value of relais index
ioctl_return_value = self.__ioctl_arg
if self._bitaddress == -1:
# Return cycle values of all relais as tuple, if this is a BYTE output
# Remove fist element, which is the ioctl request value
return struct.unpack(self.__ioctl_arg_format, ioctl_return_value)[1:]
else:
# Return cycle value of just one relais as int, if this is a BOOL output
# Increase bit-address bei 1 to ignore fist element, which is the ioctl request value
return struct.unpack(self.__ioctl_arg_format, ioctl_return_value)[self._bitaddress + 1]
switching_cycles = property(get_switching_cycles)
class IntRelaisOutput(IntIO, RelaisOutput):
"""
Class for relais outputs to access the cycle counters.
This class combines the function of <class 'IntIO'> and
<class 'RelaisOutput'> to add the function 'get_cycles' and the property
'cycles' to retrieve the relay cycle counters.
Since both classes inherit from BaseIO, both __init__ functions are called
and the logic is combined. In this case, there is only one 'self' object of
IOBase, which of both classes in inheritance is extended with this.
:ref: :class:`IOBase`
"""
pass
class StructIO(IOBase):
"""
Klasse fuer den Zugriff auf Daten ueber ein definierten struct.

View File

@@ -353,6 +353,9 @@ class RevPiModIO(object):
if pt == ProductType.DIO or pt == ProductType.DI or pt == ProductType.DO:
# DIO / DI / DO
dev_new = devicemodule.DioModule(self, device, simulator=self._simulator)
elif pt == ProductType.RO:
# RO
dev_new = devicemodule.RoModule(self, device, simulator=self._simulator)
else:
# Alle anderen IO-Devices
dev_new = devicemodule.Device(self, device, simulator=self._simulator)