Improved device search function and create run_net_plc shortcut

This commit is contained in:
2023-01-20 12:31:59 +01:00
parent c7b2e59063
commit 1b7397e608
4 changed files with 87 additions and 54 deletions

View File

@@ -15,7 +15,7 @@ fuehrt das Modul bei Datenaenderung aus.
__all__ = [
"IOEvent",
"RevPiModIO", "RevPiModIODriver", "RevPiModIOSelected", "run_plc",
"RevPiNetIO", "RevPiNetIODriver", "RevPiNetIOSelected",
"RevPiNetIO", "RevPiNetIODriver", "RevPiNetIOSelected", "run_net_plc",
"Cycletools", "EventCallback",
"ProductType", "AIO", "COMPACT", "DI", "DO", "DIO", "FLAT", "MIO",
]
@@ -29,5 +29,5 @@ from ._internal import *
from .helper import Cycletools, EventCallback
from .io import IOEvent
from .modio import RevPiModIO, RevPiModIODriver, RevPiModIOSelected, run_plc
from .netio import RevPiNetIO, RevPiNetIODriver, RevPiNetIOSelected
from .netio import RevPiNetIO, RevPiNetIODriver, RevPiNetIOSelected, run_net_plc
from .pictory import ProductType, AIO, COMPACT, DI, DO, DIO, FLAT, MIO

View File

@@ -27,7 +27,7 @@ from .io import IOList
from .io import StructIO
from .pictory import DeviceType, ProductType
DevSelect = namedtuple("DevSelect", ["type", "key", "values"])
DevSelect = namedtuple("DevSelect", ["type", "other_device_key", "values"])
"""Leave type, key empty for auto search name and position depending on type in values."""
@@ -104,7 +104,7 @@ class RevPiModIO(object):
self.__cleanupfunc = None
self._buffedwrite = False
self._debug = 1
self._devselect = DevSelect("", "", ())
self._devselect = DevSelect(DeviceType.IGNORED, "", ())
self._exit = Event()
self._exit_level = 0
self._imgwriter = None
@@ -212,19 +212,28 @@ class RevPiModIO(object):
# Apply device filter
if self._devselect.values:
# Check for supported types in values
for dev in self._devselect.values:
if type(dev) not in (int, str):
raise ValueError(
"need device position as <class 'int'> or "
"device name as <class 'str'>"
)
lst_devices = []
for dev in jconfigrsc["Devices"]:
if self._devselect.type and self._devselect.type != dev["type"]:
continue
if self._devselect.key:
if str(dev[self._devselect.key]) not in self._devselect.values:
if self._devselect.other_device_key:
key_value = str(dev[self._devselect.other_device_key])
if key_value not in self._devselect.values:
# The list is always filled with <class 'str'>
continue
else:
# Auto search depending of value item type
if dev["name"] not in self._devselect.values \
and not (dev["position"].isdigit()
and int(dev["position"]) in self._devselect.values):
if not (dev["name"] in self._devselect.values
or int(dev["position"]) in self._devselect.values):
continue
lst_devices.append(dev)
@@ -1326,7 +1335,7 @@ class RevPiModIOSelected(RevPiModIO):
Klasse fuer die Verwaltung einzelner Devices aus piCtory.
Diese Klasse uebernimmt nur angegebene Devices der piCtory Konfiguration
und laedt sie inkl. IOs. Sie uebernimmt die exklusive Verwaltung des
und bildet sie inkl. IOs ab. Sie uebernimmt die exklusive Verwaltung des
Adressbereichs im Prozessabbild an dem sich die angegebenen Devices
befinden und stellt sicher, dass die Daten synchron sind.
"""
@@ -1355,22 +1364,11 @@ class RevPiModIOSelected(RevPiModIO):
if type(deviceselection) is not DevSelect:
# Convert to tuple
if type(deviceselection) in (int, str):
if type(deviceselection) not in (list, tuple):
deviceselection = (deviceselection,)
# Check supported types
for dev in deviceselection:
if type(dev) not in (int, str):
raise ValueError(
"need device position as <class 'int'> or "
"device name as <class 'str'>"
)
# Automatic search for name and position depends on type int / str
self._devselect = DevSelect(
"VIRTUAL" if type(self) is RevPiModIODriver else "", "",
tuple(deviceselection),
)
self._devselect = DevSelect(DeviceType.IGNORED, "", deviceselection)
else:
self._devselect = deviceselection
@@ -1378,23 +1376,25 @@ class RevPiModIOSelected(RevPiModIO):
self._configure(self.get_jconfigrsc())
if len(self.device) == 0:
if type(self) == RevPiModIODriver:
if self._devselect.type:
raise DeviceNotFoundError(
"could not find any given VIRTUAL devices in config"
"could not find ANY given {0} devices in config"
"".format(self._devselect.type)
)
else:
raise DeviceNotFoundError(
"could not find any given devices in config"
"could not find ANY given devices in config"
)
elif not self._devselect.key \
elif not self._devselect.other_device_key \
and len(self.device) != len(self._devselect.values):
if type(self) == RevPiModIODriver:
if self._devselect.type:
raise DeviceNotFoundError(
"could not find all given VIRTUAL devices in config"
"could not find ALL given {0} devices in config"
"".format(self._devselect.type)
)
else:
raise DeviceNotFoundError(
"could not find all given devices in config"
"could not find ALL given devices in config"
)
@@ -1422,11 +1422,13 @@ class RevPiModIODriver(RevPiModIOSelected):
:param virtdev: Virtuelles Device oder mehrere als <class 'list'>
:ref: :func:`RevPiModIO.__init__()`
"""
# Parent mit monitoring=False und simulator=True laden
if type(virtdev) not in (list, tuple):
virtdev = (virtdev,)
dev_select = DevSelect(DeviceType.VIRTUAL, "", virtdev)
super().__init__(
virtdev, autorefresh, False, syncoutputs, procimg, configrsc,
dev_select, autorefresh, False, syncoutputs, procimg, configrsc,
True, debug, replace_io_file, shared_procimg, direct_output
)

View File

@@ -15,6 +15,8 @@ from threading import Event, Lock, Thread
from .device import Device
from .errors import DeviceNotFoundError
from .modio import DevSelect, RevPiModIO as _RevPiModIO
from .pictory import DeviceType
# Synchronisierungsbefehl
_syssync = b'\x01\x06\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17'
# Disconnectbefehl
@@ -934,7 +936,7 @@ class RevPiNetIOSelected(RevPiNetIO):
Klasse fuer die Verwaltung einzelner Devices aus piCtory.
Diese Klasse uebernimmt nur angegebene Devices der piCtory Konfiguration
und bilded sie inkl. IOs ab. Sie uebernimmt die exklusive Verwaltung des
und bildet sie inkl. IOs ab. Sie uebernimmt die exklusive Verwaltung des
Adressbereichs im Prozessabbild an dem sich die angegebenen Devices
befinden und stellt sicher, dass die Daten synchron sind.
"""
@@ -963,22 +965,11 @@ class RevPiNetIOSelected(RevPiNetIO):
if type(deviceselection) is not DevSelect:
# Convert to tuple
if type(deviceselection) in (int, str):
if type(deviceselection) not in (list, tuple):
deviceselection = (deviceselection,)
# Check supported types
for dev in deviceselection:
if type(dev) not in (int, str):
raise ValueError(
"need device position as <class 'int'> or "
"device name as <class 'str'>"
)
# Automatic search for name and position depends on type int / str
self._devselect = DevSelect(
"VIRTUAL" if type(self) is RevPiNetIODriver else "", "",
tuple(deviceselection),
)
self._devselect = DevSelect(DeviceType.IGNORED, "", deviceselection)
else:
self._devselect = deviceselection
@@ -986,23 +977,25 @@ class RevPiNetIOSelected(RevPiNetIO):
self._configure(self.get_jconfigrsc())
if len(self.device) == 0:
if type(self) == RevPiNetIODriver:
if self._devselect.type:
raise DeviceNotFoundError(
"could not find any given VIRTUAL devices in config"
"could not find ANY given {0} devices in config"
"".format(self._devselect.type)
)
else:
raise DeviceNotFoundError(
"could not find any given devices in config"
"could not find ANY given devices in config"
)
elif not self._devselect.key \
elif not self._devselect.other_device_key \
and len(self.device) != len(self._devselect.values):
if type(self) == RevPiNetIODriver:
if self._devselect.type:
raise DeviceNotFoundError(
"could not find all given VIRTUAL devices in config"
"could not find ALL given {0} devices in config"
"".format(self._devselect.type)
)
else:
raise DeviceNotFoundError(
"could not find all given devices in config"
"could not find ALL given devices in config"
)
@@ -1033,7 +1026,44 @@ class RevPiNetIODriver(RevPiNetIOSelected):
:ref: :func:`RevPiModIO.__init__()`
"""
# Parent mit monitoring=False und simulator=True laden
if type(virtdev) not in (list, tuple):
virtdev = (virtdev,)
dev_select = DevSelect(DeviceType.VIRTUAL, "", virtdev)
super().__init__(
address, virtdev, autorefresh, False, syncoutputs, True, debug,
address, dev_select, autorefresh, False, syncoutputs, True, debug,
replace_io_file, shared_procimg, direct_output
)
def run_net_plc(
address, func, cycletime=50, replace_io_file=None, debug=True):
"""
Run Revoluton Pi as real plc with cycle loop and exclusive IO access.
This function is just a shortcut to run the module in cycle loop mode and
handle the program exit signal. You will access the .io, .core, .device
via the cycletools in your cycle function.
Shortcut for this source code:
rpi = RevPiModIO(autorefresh=True, replace_io_file=..., debug=...)
rpi.handlesignalend()
return rpi.cycleloop(func, cycletime)
:param address: IP-Adresse <class 'str'> / (IP, Port) <class 'tuple'>
:param func: Function to run every set milliseconds
:param cycletime: Cycle time in milliseconds
:param replace_io_file: Load replace IO configuration from file
:param debug: Print all warnings and detailed error messages
:param procimg: Use different process image
:param configrsc: Use different piCtory configuration
:return: None or the return value of the cycle function
"""
rpi = RevPiNetIO(
address=address,
autorefresh=True,
replace_io_file=replace_io_file,
debug=debug,
)
rpi.handlesignalend()
return rpi.cycleloop(func, cycletime)

View File

@@ -47,6 +47,7 @@ class ProductType:
class DeviceType:
"""Module key "type" in piCtory file."""
IGNORED = ""
BASE = "BASE" # Core devices
EDGE = "EDGE" # Gateways
LEFT_RIGHT = "LEFT_RIGHT" # IOs