Merge tag '2.8.1' into pkg/debian

Release version 2.8.1
This commit is contained in:
2026-02-18 15:43:17 +01:00
41 changed files with 4503 additions and 1209 deletions

19
.readthedocs.yaml Normal file
View File

@@ -0,0 +1,19 @@
# Read the Docs configuration file
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
version: 2
build:
os: ubuntu-24.04
tools:
python: "3.13"
sphinx:
configuration: docs/conf.py
python:
install:
- method: pip
path: .
extra_requirements:
- docs

View File

@@ -73,6 +73,8 @@ clean:
rm -rf build dist src/*.egg-info rm -rf build dist src/*.egg-info
# PyInstaller created files # PyInstaller created files
rm -rf *.spec rm -rf *.spec
# Documentation builds
rm -rf docs/_build
distclean: clean distclean: clean
# Virtual environment # Virtual environment

View File

@@ -1,5 +1,11 @@
# RevPiModIO # RevPiModIO
### Documentation:
For a complete reference of all classes, methods, and functions, please see the
official documentation:
[https://revpimodio2.readthedocs.io/](https://revpimodio2.readthedocs.io/)
### Python3 programming for RevolutionPi of KUNBUS GmbH. ### Python3 programming for RevolutionPi of KUNBUS GmbH.
The module provides all devices and IOs from the piCtory configuration in The module provides all devices and IOs from the piCtory configuration in

0
docs/_static/.gitkeep vendored Normal file
View File

549
docs/advanced.rst Normal file
View File

@@ -0,0 +1,549 @@
========
Advanced
========
Advanced features, patterns, and best practices for RevPiModIO.
.. contents:: Contents
:local:
:depth: 2
Custom IOs (Gateway Modules)
=============================
Gateway modules (ModbusTCP, Profinet, etc.) allow defining custom IOs dynamically.
Understanding Gateway IOs
--------------------------
Gateway modules provide raw memory regions that you can map to custom IOs with specific data types and addresses.
Defining Custom IOs
-------------------
Use the :py:meth:`~revpimodio2.io.MemIO.replace_io` method to define custom IOs on gateway modules.
Gateway modules provide generic IOs (like ``Input_1``, ``Output_1``, etc.) that you can replace with custom definitions:
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Replace a gateway IO with custom definition
# Gateway IOs have default names like Input_1, Output_1, etc.
rpi.io.Input_1.replace_io(
"temperature", # New IO name
"h", # struct format: signed short
)
# Use the custom IO by its new name
temp = rpi.io.temperature.value / 10.0 # Scale to degrees
print(f"Temperature: {temp}°C")
rpi.exit()
**Parameters:**
* ``name`` - Name for the new IO (will be accessible via ``rpi.io.name``)
* ``frm`` - Struct format character (see `format codes <https://docs.python.org/3/library/struct.html#format-characters>`_ below)
* ``defaultvalue`` - Optional: Default value for the IO
* ``byteorder`` - Optional: Byte order (``'little'`` or ``'big'``), default is ``'little'``
* ``bit`` - Optional: Bit position for boolean IOs (0-7)
* ``event`` - Optional: Register event callback on creation
* ``delay`` - Optional: Event debounce delay in milliseconds
* ``edge`` - Optional: Event edge trigger (RISING, FALLING, or BOTH)
**Note:** The memory address is inherited from the IO being replaced (e.g., ``Input_1``). The new IO uses the same address in the process image.
Struct Format Codes
-------------------
Common format codes for ``replace_io`` (see `Python struct format characters <https://docs.python.org/3/library/struct.html#format-characters>`_ for complete reference):
* ``'b'`` - signed byte (-128 to 127)
* ``'B'`` - unsigned byte (0 to 255)
* ``'h'`` - signed short (-32768 to 32767)
* ``'H'`` - unsigned short (0 to 65535)
* ``'i'`` - signed int (-2147483648 to 2147483647)
* ``'I'`` - unsigned int (0 to 4294967295)
* ``'f'`` - float (32-bit)
* ``'d'`` - float (64-bit)
Multiple Custom IOs
-------------------
Define multiple custom IOs programmatically by replacing generic gateway IOs:
.. code-block:: python
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Replace multiple gateway IOs with custom definitions
# Assuming a gateway module with Input_1, Input_2, Output_1, Output_2
rpi.io.Input_1.replace_io("temperature", "h")
rpi.io.Input_2.replace_io("humidity", "h")
rpi.io.Output_1.replace_io("setpoint", "h", defaultvalue=700)
rpi.io.Output_2.replace_io("control_word", "H", defaultvalue=0)
# Use all custom IOs by their new names
temp = rpi.io.temperature.value / 10.0
humidity = rpi.io.humidity.value / 10.0
print(f"Temp: {temp}°C, Humidity: {humidity}%")
# Write to output registers
rpi.io.setpoint.value = 750 # 75.0°C
rpi.io.control_word.value = 0x0001 # Enable bit
rpi.exit()
Using Configuration Files
--------------------------
For complex IO configurations, use the ``replace_io_file`` parameter to load custom IOs from a file:
.. code-block:: python
# Load custom IOs from configuration file
rpi = revpimodio2.RevPiModIO(
autorefresh=True,
replace_io_file="replace_ios.conf"
)
# Custom IOs are now available
temp = rpi.io.temperature.value / 10.0
print(f"Temperature: {temp}°C")
rpi.exit()
**Configuration File Format:**
Create an INI-style configuration file (``replace_ios.conf``):
.. code-block:: ini
[temperature]
replace = Input_1
frm = h
[humidity]
replace = Input_2
frm = h
[setpoint]
replace = Output_1
frm = h
defaultvalue = 700
[control_word]
replace = Output_2
frm = H
byteorder = big
**Configuration Parameters:**
* ``replace`` - Name of the gateway IO to replace (required)
* ``frm`` - Struct format character (required)
* ``bit`` - Bit position for boolean IOs (0-7)
* ``byteorder`` - Byte order: ``little`` or ``big`` (default: ``little``)
* ``wordorder`` - Word order for multi-word values
* ``defaultvalue`` - Default value for the IO
* ``bmk`` - Internal designation/bookmark
* ``export`` - Export flag for RevPiPyLoad/RevPiPyControl
**Exporting Configuration:**
Export your current custom IOs to a file:
.. code-block:: python
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Define custom IOs by replacing gateway IOs
rpi.io.Input_1.replace_io("temperature", "h", defaultvalue=0)
rpi.io.Input_2.replace_io("humidity", "h", defaultvalue=0)
# Export to configuration file
rpi.export_replaced_ios("my_config.conf")
rpi.exit()
This is useful for:
* Sharing IO configurations across multiple programs
* Integration with RevPiPyLoad and RevPiPyControl
* Version control of IO definitions
* Declarative IO configuration
Watchdog Management
===================
The hardware watchdog monitors your program and resets the system if it stops responding. If you
use the software watchdog will will only works if you use RevPiPyControl as runtime for your python
program.
How the Watchdog Works
-----------------------
The watchdog requires periodic toggling. If not toggled within the timeout period, the system resets.
**Important:** Only enable the watchdog when your program logic is working correctly.
Cyclic Watchdog Toggle
-----------------------
.. code-block:: python
import revpimodio2
def main_cycle(ct):
# Toggle every 10 cycles (200ms @ 20ms)
if ct.flank10c:
ct.core.wd_toggle()
# Your control logic
ct.io.output.value = ct.io.input.value
revpimodio2.run_plc(main_cycle)
Event-Driven Watchdog Toggle
-----------------------------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def toggle_wd(ioname, iovalue):
"""Toggle watchdog every 500ms."""
rpi.core.wd_toggle()
# Register timer event for watchdog
rpi.core.wd.reg_timerevent(toggle_wd, 500, prefire=True)
# Your event handlers
def on_button(ioname, iovalue):
rpi.io.led.value = iovalue
rpi.io.button.reg_event(on_button)
rpi.handlesignalend()
rpi.mainloop()
Performance Optimization
========================
Keep Cycle Logic Fast
---------------------
Minimize processing time in each cycle:
.. code-block:: python
def optimized_cycle(ct):
# Good: Heavy work only when needed
if ct.flank100c:
expensive_calculation()
# Good: Keep cycle logic minimal
ct.io.output.value = ct.io.input.value
# Bad: Don't do this every cycle
# expensive_calculation() # 100ms processing!
**Guidelines:**
* Keep cycle time ≥20ms for stability
* Avoid blocking operations (network, file I/O)
* Use flank flags for expensive operations
* Profile your cycle function if experiencing timing issues
Choose Appropriate Cycle Time
------------------------------
Match cycle time to application requirements:
.. code-block:: python
# Fast control (motion, high-speed counting)
rpi.cycletime = 20 # 50 Hz
# Normal control (most applications)
rpi.cycletime = 50 # 20 Hz
# Slow monitoring (temperature, status)
rpi.cycletime = 100 # 10 Hz
**Trade-offs:**
* Faster = Higher CPU usage, better responsiveness
* Slower = Lower CPU usage, adequate for most tasks
Minimize Event Callbacks
-------------------------
Keep event callbacks lightweight:
.. code-block:: python
# Good: Fast callback
def good_callback(ioname, iovalue):
rpi.io.output.value = iovalue
# Poor: Slow callback blocks event loop
def poor_callback(ioname, iovalue):
time.sleep(1) # Blocks!
complex_calculation() # Slow!
rpi.io.output.value = iovalue
# Better: Use threaded events for slow work
def threaded_callback(eventcallback):
complex_calculation()
rpi.io.output.value = result
rpi.io.trigger.reg_event(threaded_callback, as_thread=True)
Error Handling
==============
Graceful Error Recovery
-----------------------
Always implement safe failure modes:
.. code-block:: python
def safe_cycle(ct):
try:
value = ct.io.sensor.value
result = process(value)
ct.io.output.value = result
except ValueError as e:
print(f"Sensor error: {e}")
ct.io.output.value = 0 # Safe default
except Exception as e:
print(f"Unexpected error: {e}")
ct.io.output.value = False # Safe state
Resource Cleanup
----------------
Always clean up resources:
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
try:
# Your program logic
rpi.cycleloop(main_cycle)
except KeyboardInterrupt:
print("Interrupted by user")
except Exception as e:
print(f"Error: {e}")
finally:
# Always clean up
rpi.setdefaultvalues() # Reset outputs to defaults
rpi.exit()
Monitor I/O Errors
------------------
Track and handle I/O errors:
.. code-block:: python
maxioerrors = 10 # Exception after 10 errors
def main_cycle(ct):
# Check error count periodically
if ct.flank20c:
if rpi.core.ioerrorcount > maxioerrors:
print(f"Warning: {rpi.core.ioerrorcount} I/O errors detected")
ct.io.warning_led.value = True
# Normal logic
ct.io.output.value = ct.io.input.value
revpimodio2.run_plc(main_cycle)
Best Practices
==============
Naming Conventions
------------------
Use descriptive IO names in piCtory:
.. code-block:: python
# Good - Clear intent
if rpi.io.emergency_stop.value:
rpi.io.motor.value = False
rpi.io.alarm.value = True
# Poor - Generic names
if rpi.io.I_15.value:
rpi.io.O_3.value = False
rpi.io.O_7.value = True
Code Organization
-----------------
Structure your code for maintainability:
.. code-block:: python
import revpimodio2
# Constants
TEMP_HIGH_THRESHOLD = 75
TEMP_LOW_THRESHOLD = 65
# Initialize
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def initialize(ct):
"""Initialize system state."""
ct.var.cooling_active = False
ct.var.alarm_active = False
ct.io.motor.value = False
def monitor_temperature(ct):
"""Temperature monitoring logic."""
temp = ct.io.temperature.value
if temp > TEMP_HIGH_THRESHOLD:
ct.io.cooling.value = True
ct.var.cooling_active = True
if temp < TEMP_LOW_THRESHOLD:
ct.io.cooling.value = False
ct.var.cooling_active = False
def main_cycle(ct):
"""Main control loop."""
if ct.first:
initialize(ct)
monitor_temperature(ct)
if ct.last:
ct.io.cooling.value = False
# Run
try:
rpi.cycleloop(main_cycle)
finally:
rpi.exit()
Documentation
-------------
Document complex logic:
.. code-block:: python
def control_cycle(ct):
"""Control cycle for temperature management.
State machine:
- IDLE: Waiting for start
- HEATING: Active heating to setpoint
- COOLING: Active cooling from overshoot
- ERROR: Fault condition
Hysteresis: ±5°C around setpoint
"""
if ct.first:
ct.var.state = "IDLE"
ct.var.setpoint = 70.0
# State machine implementation
# ...
Logging
-------
Implement proper logging:
.. code-block:: python
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def main_cycle(ct):
if ct.first:
logging.info("System started")
ct.var.error_count = 0
# Log errors
if ct.io.error_sensor.value:
ct.var.error_count += 1
logging.error(f"Error detected: {ct.var.error_count}")
# Log status periodically
if ct.flank100c:
logging.info(f"Temperature: {ct.io.temperature.value}°C")
if ct.last:
logging.info("System stopped")
Security Considerations
=======================
Validate External Input
-----------------------
Always validate external inputs:
.. code-block:: python
def on_setpoint_change(ioname, iovalue):
"""Validate setpoint range."""
if 0 <= iovalue <= 100:
rpi.io.setpoint.value = iovalue
rpi.io.error_led.value = False
else:
print(f"Invalid setpoint: {iovalue}")
rpi.io.error_led.value = True
Fail-Safe Defaults
------------------
Use safe defaults for critical outputs:
.. code-block:: python
def main_cycle(ct):
if ct.first:
# Safe defaults
ct.io.motor.value = False
ct.io.heater.value = False
ct.io.valve.value = False
try:
# Control logic
control_logic(ct)
except Exception as e:
# Revert to safe state on error
ct.io.motor.value = False
ct.io.heater.value = False
See Also
========
* :doc:`basics` - Core concepts and configuration
* :doc:`cyclic_programming` - Cyclic programming patterns
* :doc:`event_programming` - Event-driven programming patterns
* :doc:`api/index` - API reference

172
docs/api/device.rst Normal file
View File

@@ -0,0 +1,172 @@
==============
Device Classes
==============
Classes for managing Revolution Pi devices.
.. currentmodule:: revpimodio2.device
Device
======
.. autoclass:: Device
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Base class for all Revolution Pi devices.
DeviceList
==========
.. autoclass:: DeviceList
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__, __getitem__, __iter__
Container for accessing devices.
**Example:**
.. code-block:: python
# Access device by name
dio_module = rpi.device.DIO_Module_1
# Access device by position
first_device = rpi.device[0]
# Iterate all devices
for device in rpi.device:
print(f"Device: {device.name}")
Base
====
.. autoclass:: Base
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Base class for Revolution Pi base modules.
GatewayMixin
============
.. autoclass:: GatewayMixin
:members:
:undoc-members:
:show-inheritance:
Mixin class providing gateway functionality for piGate modules.
ModularBaseConnect_4_5
======================
.. autoclass:: ModularBaseConnect_4_5
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
:special-members: __init__
Base class for Connect 4 and Connect 5 modules.
Core
====
.. autoclass:: Core
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
:special-members: __init__
Revolution Pi Core module.
Connect
=======
.. autoclass:: Connect
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
:special-members: __init__
Revolution Pi Connect module.
Connect4
========
.. autoclass:: Connect4
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
:special-members: __init__
Revolution Pi Connect 4 module.
Connect5
========
.. autoclass:: Connect5
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
:special-members: __init__
Revolution Pi Connect 5 module.
DioModule
=========
.. autoclass:: DioModule
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
:special-members: __init__
Digital I/O module.
RoModule
========
.. autoclass:: RoModule
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
:special-members: __init__
Relay output module.
Gateway
=======
.. autoclass:: Gateway
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
:special-members: __init__
Gateway module (ModbusTCP, Profinet, etc.).
Virtual
=======
.. autoclass:: Virtual
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
:special-members: __init__
Virtual device for custom applications.

194
docs/api/helper.rst Normal file
View File

@@ -0,0 +1,194 @@
==============
Helper Classes
==============
Helper classes for cyclic and event-driven programming.
.. currentmodule:: revpimodio2.helper
Cycletools
==========
.. autoclass:: Cycletools
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Toolkit provided to cyclic functions via ``.cycleloop()``.
This class provides tools for cyclic functions including timing flags
and edge markers. Note that edge markers (flank flags) are all True
during the first cycle!
**Attributes:**
Reference to RevPiModIO core object
Reference to RevPiModIO device object
Reference to RevPiModIO io object
True only on first cycle
True when shutdown signal received
Current function execution time in seconds
Container for cycle-persistent variables
**Toggle Flags** - Alternate between True/False:
1 cycle True, 1 cycle False
2 cycles True, 2 cycles False
5 cycles True, 5 cycles False
10 cycles True, 10 cycles False
20 cycles True, 20 cycles False
**Flank Flags** - True every nth cycle:
True every 5 cycles
True every 10 cycles
True every 15 cycles
True every 20 cycles
**Example:**
.. code-block:: python
def main(ct: revpimodio2.Cycletools):
if ct.first:
# Initialize
ct.var.counter = 0
# Main logic
if ct.changed(ct.io.sensor):
ct.var.counter += 1
# Blink LED using timing flag
ct.io.led.value = ct.flag5c
if ct.last:
# Cleanup
print(f"Final: {ct.var.counter}")
Change Detection
----------------
Timer Functions
---------------
On-Delay Timers
~~~~~~~~~~~~~~~
Off-Delay Timers
~~~~~~~~~~~~~~~~
Pulse Timers
~~~~~~~~~~~~
EventCallback
=============
.. autoclass:: EventCallback
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Thread for internal event function calls.
This class is passed to threaded event handlers registered with
``as_thread=True``. The event function receives this thread object
as a parameter to access event information and control execution.
**Attributes:**
Name of IO that triggered the event
Value of IO when event was triggered
Threading event for abort conditions
**Example:**
.. code-block:: python
def threaded_handler(eventcallback: revpimodio2.EventCallback):
print(f"{eventcallback.ioname} = {eventcallback.iovalue}")
# Interruptible wait (3 seconds)
if eventcallback.exit.wait(3):
print("Wait interrupted!")
return
# Check if stop was called
if eventcallback.exit.is_set():
return
# Register as threaded event
rpi.io.button.reg_event(threaded_handler, as_thread=True)
Methods
-------
ProcimgWriter
=============
.. autoclass:: ProcimgWriter
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Internal thread for process image writing and event management.

148
docs/api/index.rst Normal file
View File

@@ -0,0 +1,148 @@
.. _api_reference:
=============
API Reference
=============
Complete API reference for RevPiModIO2 Python library.
.. contents:: Table of Contents
:local:
:depth: 2
Overview
========
RevPiModIO provides several main classes for programming Revolution Pi hardware:
* :class:`~revpimodio2.modio.RevPiModIO` - Main class for managing all devices and IOs
* :class:`~revpimodio2.modio.RevPiModIOSelected` - Manage specific devices only
* :class:`~revpimodio2.modio.RevPiModIODriver` - Write data to virtual device inputs
* :class:`~revpimodio2.io.IOList` - Container for accessing IOs
* :class:`~revpimodio2.io.IOBase` - Base class for all IO objects
* :class:`~revpimodio2.helper.Cycletools` - Toolkit for cyclic programming
* :class:`~revpimodio2.helper.EventCallback` - Event handler class
Quick Examples
==============
Basic Usage
-----------
.. code-block:: python
import revpimodio2
# Initialize
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Read input and control output
if rpi.io.button.value:
rpi.io.led.value = True
# Cleanup
rpi.exit()
Cyclic Programming
------------------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def main_cycle(ct: revpimodio2.Cycletools):
if ct.first:
ct.var.counter = 0
if ct.changed(ct.io.sensor):
ct.var.counter += 1
rpi.cycleloop(main_cycle)
Event-Driven Programming
------------------------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def on_change(ioname, iovalue):
print(f"{ioname} = {iovalue}")
rpi.io.button.reg_event(on_change)
rpi.handlesignalend()
rpi.mainloop()
Constants
=========
Edge Detection
--------------
.. py:data:: revpimodio2.RISING
:type: int
Detect low-to-high transitions
.. py:data:: revpimodio2.FALLING
:type: int
Detect high-to-low transitions
.. py:data:: revpimodio2.BOTH
:type: int
Detect any transition
LED Colors
----------
.. py:data:: revpimodio2.OFF
:type: int
LED off
.. py:data:: revpimodio2.GREEN
:type: int
Green LED
.. py:data:: revpimodio2.RED
:type: int
Red LED
IO Types
--------
.. py:data:: INP
:type: int
:value: 300
Input type
.. py:data:: OUT
:type: int
:value: 301
Output type
.. py:data:: MEM
:type: int
:value: 302
Memory type
See Also
========
* :doc:`../installation` - Installation guide
* :doc:`../quickstart` - Quick start guide
* :doc:`../basics` - Core concepts
* :doc:`../cyclic_programming` - Cyclic programming patterns
* :doc:`../event_programming` - Event-driven programming patterns
* :doc:`../advanced` - Advanced topics

212
docs/api/io.rst Normal file
View File

@@ -0,0 +1,212 @@
====================
IO Classes and Types
====================
Classes for managing Revolution Pi inputs and outputs.
.. currentmodule:: revpimodio2.io
IOList
======
.. autoclass:: IOList
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__, __getitem__, __contains__, __iter__
Container for accessing all IO objects.
The IOList provides multiple ways to access IOs:
* **Direct attribute access**: ``rpi.io.button.value``
* **String-based access**: ``rpi.io["button"].value``
* **Iteration**: ``for io in rpi.io: ...``
**Example:**
.. code-block:: python
# Direct access
value = rpi.io.I_1.value
rpi.io.O_1.value = True
# String-based access
value = rpi.io["I_1"].value
# Check if IO exists
if "sensor" in rpi.io:
print(rpi.io.sensor.value)
# Iterate all IOs
for io in rpi.io:
print(f"{io.name}: {io.value}")
IOBase
======
.. autoclass:: IOBase
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Base class for all IO objects.
**Properties:**
IO name from piCtory configuration
Current IO value (read/write)
Byte address in process image
Byte length (0 for single bits)
IO type: 300=INPUT, 301=OUTPUT, 302=MEMORY
Whether value is signed
"little" or "big" endian
Configured default value from piCtory
Comment/description from piCtory
Export flag status
Event Registration Methods
---------------------------
Value Manipulation Methods
---------------------------
IntIO
=====
.. autoclass:: IntIO
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
IO objects with integer value access.
**Example:**
.. code-block:: python
# Get integer value
temp = rpi.io.temperature.get_intvalue()
# Set integer value
rpi.io.setpoint.set_intvalue(1500)
Integer Value Methods
---------------------
IntIOCounter
============
.. autoclass:: IntIOCounter
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Counter input objects with reset capability.
**Example:**
.. code-block:: python
# Read counter
count = rpi.io.counter.value
# Reset counter
rpi.io.counter.reset()
StructIO
========
.. autoclass:: StructIO
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Structured IO with format strings for complex data types.
**Example:**
.. code-block:: python
# Get structured value
value = rpi.io.sensor_data.get_structvalue()
Structured Value Methods
------------------------
MemIO
=====
.. autoclass:: MemIO
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Memory IO with variant value access (string or integer).
RelaisOutput
============
.. autoclass:: RelaisOutput
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Relay output with switching cycle counter.
**Example:**
.. code-block:: python
# Get number of switching cycles
cycles = rpi.io.relay.get_switching_cycles()
IOEvent
=======
.. autoclass:: IOEvent
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Internal class for IO event management.

141
docs/api/revpimodio.rst Normal file
View File

@@ -0,0 +1,141 @@
==================
RevPiModIO Classes
==================
Main classes for managing Revolution Pi hardware.
.. currentmodule:: revpimodio2.modio
RevPiModIO
==========
.. autoclass:: RevPiModIO
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Main class for managing all devices and IOs from the piCtory configuration.
This class manages the complete piCtory configuration and loads all devices
and IOs. It handles exclusive management of the process image and ensures
data synchronization.
**Constructor Parameters:**
:param autorefresh: Automatically sync process image (recommended: True)
:type autorefresh: bool
:param monitoring: Read-only mode for supervision (no writes)
:type monitoring: bool
:param syncoutputs: Load current output values on initialization
:type syncoutputs: bool
:param debug: Enable detailed error messages and logging
:type debug: bool
**Key Attributes:**
Access to all configured inputs/outputs
Access to RevPi Core values (LEDs, status)
Access to specific devices by name
Update frequency in milliseconds
Threading event for clean shutdown
Count of read/write failures
Exception threshold (0 = disabled)
**Example:**
.. code-block:: python
import revpimodio2
# Initialize with auto-refresh
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Access IOs
if rpi.io.button.value:
rpi.io.led.value = True
# Clean shutdown
rpi.exit()
Loop Execution Methods
----------------------
Data Synchronization Methods
-----------------------------
Utility Methods
---------------
RevPiModIOSelected
==================
.. autoclass:: RevPiModIOSelected
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Manage only specific devices from the piCtory configuration.
Use this class when you only need to control specific devices instead of
loading the entire configuration.
**Example:**
.. code-block:: python
# Manage only specific devices
rpi = revpimodio2.RevPiModIOSelected("DIO_Module_1", "AIO_Module_1")
RevPiModIODriver
================
.. autoclass:: RevPiModIODriver
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Write data to virtual device inputs for driver development.
**Example:**
.. code-block:: python
# Create driver for virtual device
driver = revpimodio2.RevPiModIODriver("VirtualDevice")
DevSelect
=========
.. autoclass:: DevSelect
:members:
:undoc-members:
:show-inheritance:
:special-members: __init__
Customized search filter for RevPiModIOSelected.

332
docs/basics.rst Normal file
View File

@@ -0,0 +1,332 @@
======
Basics
======
Core concepts and fundamental usage of RevPiModIO.
.. contents:: Contents
:local:
:depth: 2
Programming Paradigms
=====================
RevPiModIO supports two complementary programming approaches:
**Cyclic Programming** - Execute a function at regular intervals, similar to PLC programming.
* Best for deterministic timing, state machines, and time-critical control
* Runs your function every cycle (typically 20-50ms)
* See :doc:`cyclic_programming` for details
**Event-Driven Programming** - Register callbacks triggered by hardware state changes.
* Best for user interactions, sporadic events, and system integration
* Consumes CPU only when events occur
* See :doc:`event_programming` for details
Both approaches can be combined in a single application. See :doc:`advanced` for examples.
Getting Started
===============
Basic Instantiation
-------------------
Create a RevPiModIO instance to access your hardware:
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Your code here
rpi.exit()
Configuration Parameters
------------------------
.. code-block:: python
rpi = revpimodio2.RevPiModIO(
autorefresh=True, # Auto-sync process image (recommended)
monitoring=False, # Read-only mode
syncoutputs=True, # Load output values on init
debug=False # Enable debug messages
)
**autorefresh** - Automatically reads inputs and writes outputs. Set to ``True`` for most applications.
**monitoring** - Read-only mode. Use when monitoring without controlling hardware.
**syncoutputs** - Load current output values on startup. Prevents outputs from resetting.
**debug** - Enable debug logging for troubleshooting.
Cycle Timing
------------
Default update rates depend on your hardware:
* **Core 1**: 40ms (25Hz)
* **Core3/Connect**: 20ms (50Hz)
* **NetIO**: 50ms (20Hz)
Adjust cycle time to match your needs:
.. code-block:: python
rpi = revpimodio2.RevPiModIO()
rpi.cycletime = 100 # Set to 100ms
rpi.autorefresh_all()
**Important:** Faster cycle times consume more CPU. Choose the slowest cycle time that meets your requirements. Default values will fit most needs.
Error Handling
--------------
Configure I/O error threshold:
.. code-block:: python
maxioerrors = 10 # Raise exception after 10 errors
# Check error count
if rpi.core.ioerrorcount > maxioerrors:
print("Warning: I/O errors detected")
Core Objects
============
rpi.io - Input/Output Access
-----------------------------
Access all configured IOs from piCtory:
.. code-block:: python
# Direct attribute access
value = rpi.io.button.value
rpi.io.led.value = True
# String-based access
rpi.io["button"].value
# Check existence
if "sensor" in rpi.io:
print(rpi.io.sensor.value)
# Iterate all IOs
for io in rpi.io:
print(f"{io.name}: {io.value}")
IO Properties
~~~~~~~~~~~~~
Each IO object has these properties:
* ``.name`` - IO name from piCtory
* ``.value`` - Current value (read/write)
* ``.address`` - Byte address in process image
* ``.type`` - IO type (INPUT=300, OUTPUT=301, MEMORY=302)
* ``.defaultvalue`` - Default value from piCtory
rpi.core - System Control
--------------------------
Access Revolution Pi system features:
LED Control
~~~~~~~~~~~
.. code-block:: python
# Using constants
rpi.core.A1 = revpimodio2.GREEN
rpi.core.A2 = revpimodio2.RED
rpi.core.A3 = revpimodio2.OFF
# Individual colors
rpi.core.a1green.value = True
rpi.core.a1red.value = False
System Status
~~~~~~~~~~~~~
.. code-block:: python
# CPU information
temp = rpi.core.temperature.value
freq = rpi.core.frequency.value
# piBridge status
cycle_time = rpi.core.iocycle.value
errors = rpi.core.ioerrorcount.value
Watchdog
~~~~~~~~
.. code-block:: python
# Toggle watchdog
rpi.core.wd_toggle()
# Watchdog IO object
rpi.core.wd.value = True
See :doc:`advanced` for complete watchdog management examples.
rpi.device - Device Access
---------------------------
Access specific hardware devices:
.. code-block:: python
# By name
dio = rpi.device.DIO_Module_1
# By position
first = rpi.device[0]
# Iterate
for device in rpi.device:
print(device.name)
Signal Handling
===============
Graceful Shutdown
-----------------
Handle SIGINT and SIGTERM for clean program termination:
.. code-block:: python
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Enable signal handling
rpi.handlesignalend()
# Run main loop
rpi.mainloop()
Custom Signal Handler
---------------------
Implement custom cleanup logic:
.. code-block:: python
def cleanup(signum, frame):
print("Shutting down...")
rpi.setdefaultvalues()
rpi.exit()
rpi.handlesignalend(cleanup)
rpi.mainloop()
Simple Examples
===============
Read Input, Control Output
---------------------------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Read input and control output
if rpi.io.button.value:
rpi.io.led.value = True
else:
rpi.io.led.value = False
rpi.exit()
LED Control
-----------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Control status LEDs
rpi.core.A1 = revpimodio2.GREEN
rpi.core.A2 = revpimodio2.RED
rpi.exit()
Iterate All IOs
---------------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Print all IOs and their values
for io in rpi.io:
print(f"{io.name}: {io.value}")
rpi.exit()
Best Practices
==============
Use Descriptive IO Names
-------------------------
Configure descriptive names in piCtory:
.. code-block:: python
# Good - Clear intent
if rpi.io.emergency_stop.value:
rpi.io.motor.value = False
# Poor - Generic names
if rpi.io.I_15.value:
rpi.io.O_3.value = False
Always Clean Up
---------------
Always call ``rpi.exit()`` to clean up resources:
.. code-block:: python
rpi = revpimodio2.RevPiModIO(autorefresh=True)
try:
# Your code here
pass
finally:
rpi.exit()
Check IO Existence
------------------
Verify IOs exist before accessing:
.. code-block:: python
if "optional_sensor" in rpi.io:
value = rpi.io.optional_sensor.value
else:
print("Sensor not configured")
See Also
========
* :doc:`cyclic_programming` - Cyclic programming patterns
* :doc:`event_programming` - Event-driven programming patterns
* :doc:`advanced` - Advanced topics and best practices
* :doc:`api/index` - API reference

View File

@@ -15,19 +15,74 @@ project = 'revpimodio2'
copyright = '2023, Sven Sager' copyright = '2023, Sven Sager'
author = 'Sven Sager' author = 'Sven Sager'
version = __version__ version = __version__
release = __version__
# -- General configuration --------------------------------------------------- # -- General configuration ---------------------------------------------------
extensions = [ extensions = [
'sphinx.ext.autodoc', 'sphinx.ext.autodoc',
'sphinx.ext.napoleon',
'sphinx.ext.viewcode',
'sphinx.ext.intersphinx',
'sphinx.ext.autosummary',
'sphinx.ext.todo', 'sphinx.ext.todo',
'sphinx.ext.viewcode'
] ]
# Autodoc configuration
autodoc_default_options = {
'members': True,
'member-order': 'bysource',
'special-members': '__init__',
'undoc-members': True,
'inherited-members': True,
'exclude-members': '__weakref__'
}
# Napoleon settings (for NumPy and Google style docstrings)
napoleon_google_docstring = True
napoleon_numpy_docstring = True
napoleon_include_init_with_doc = True
napoleon_include_private_with_doc = False
napoleon_include_special_with_doc = True
napoleon_use_admonition_for_examples = True
napoleon_use_admonition_for_notes = True
napoleon_use_admonition_for_references = True
napoleon_use_ivar = False
napoleon_use_param = True
napoleon_use_rtype = True
napoleon_preprocess_types = True
napoleon_type_aliases = None
napoleon_attr_annotations = True
# Autosummary settings
autosummary_generate = True
# Intersphinx configuration
intersphinx_mapping = {
'python': ('https://docs.python.org/3', None),
}
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates'] templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# -- Options for HTML output ------------------------------------------------- # -- Options for HTML output -------------------------------------------------
html_theme = 'alabaster' html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static'] html_static_path = ['_static']
# Theme options
html_theme_options = {
'navigation_depth': 4,
'collapse_navigation': False,
'sticky_navigation': True,
'includehidden': True,
'titles_only': False
}
# -- Options for todo extension ----------------------------------------------
todo_include_todos = True

590
docs/cyclic_programming.rst Normal file
View File

@@ -0,0 +1,590 @@
==================
Cyclic Programming
==================
Cyclic programming executes a function at regular intervals, similar to PLC programming.
.. contents:: Contents
:local:
:depth: 2
When to Use Cyclic Programming
===============================
**Cyclic programming is ideal for:**
* Deterministic timing requirements
* Traditional PLC-style logic
* State machines
* Time-critical control
* Continuous monitoring and control
**Advantages:**
* Predictable timing
* Simple mental model
* Easy to reason about program flow
* Natural for control systems
**Considerations:**
* Consumes CPU even when idle
* Cycle time affects responsiveness
* Must keep cycle logic fast
Basic Structure
===============
Simple Cycle Loop
-----------------
.. code-block:: python
import revpimodio2
def main_cycle(ct: revpimodio2.Cycletools):
"""Execute each cycle."""
if ct.io.start_button.value:
ct.io.motor.value = True
if ct.io.stop_button.value:
ct.io.motor.value = False
revpimodio2.run_plc(main_cycle)
# .run_plc is a shortcut for:
# rpi = revpimodio2.RevPiModIO(autorefresh=True)
# rpi.handlesignalend()
# rpi.cycleloop(main_cycle)
The ``main_cycle`` function is called repeatedly at the configured cycle time (typically 20-50ms).
**Info:** ``rpi.handlesignalend()``
Understanding Cycle Time
-------------------------
The cycle time determines execution frequency:
* **Core 1**: 40ms (25 Hz)
* **Core3/Connect**: 20ms (50 Hz)
* **NetIO**: 50ms (20 Hz)
Adjust cycle time to match your needs:
.. code-block:: python
revpimodio2.run_plc(main_cycle, cycletime=100) # 100ms = 10 Hz
**Important:** Faster cycle times consume more CPU but will detect fast changes of input values.
Cycletools Object
=================
The ``Cycletools`` object is passed to your cycle function, providing access to:
* ``ct.io`` - All IOs
* ``ct.core`` - System control
* ``ct.device`` - Device access
* ``ct.var`` - Persistent variables
* Lifecycle flags (``first``, ``last``)
* Timing flags (``flag5c``, ``flank10c``, etc.)
* Timer functions (``set_tonc``, ``get_tofc``, etc.)
* Change detection (``changed``)
Initialization and Cleanup
===========================
Use ``ct.first`` and ``ct.last`` for setup and teardown:
.. code-block:: python
def main_cycle(ct: revpimodio2.Cycletools):
if ct.first:
# Initialize on first cycle
ct.var.counter = 0
ct.var.state = "IDLE"
print("System started")
# Main logic runs every cycle
ct.var.counter += 1
if ct.last:
# Cleanup before exit
ct.io.motor.value = False
print(f"Total cycles: {ct.var.counter}")
revpimodio2.run_plc(main_cycle)
Persistent Variables
====================
Use ``ct.var`` to store variables that persist between cycles:
.. code-block:: python
def main_cycle(ct):
if ct.first:
ct.var.counter = 0
ct.var.state = "IDLE"
ct.var.accumulator = 0.0
# Variables persist between cycles
ct.var.counter += 1
ct.var.accumulator += ct.io.sensor.value
# Access variables later
average = ct.var.accumulator / ct.var.counter
Variables defined in ``ct.var`` maintain their values across all cycle executions.
Change Detection
================
Detect input changes efficiently without storing previous values:
.. code-block:: python
def main_cycle(ct: revpimodio2.Cycletools):
# Detect any change
if ct.changed(ct.io.sensor):
print(f"Sensor changed to: {ct.io.sensor.value}")
# Detect rising edge (button press)
if ct.changed(ct.io.button, edge=revpimodio2.RISING):
print("Button pressed!")
# Detect falling edge (button release)
if ct.changed(ct.io.button, edge=revpimodio2.FALLING):
print("Button released!")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
rpi.handlesignalend()
rpi.cycleloop(main_cycle)
Edge types:
* ``revpimodio2.RISING`` - False to True transition
* ``revpimodio2.FALLING`` - True to False transition
* ``revpimodio2.BOTH`` - Any change (default)
Timing Flags
============
Built-in timing flags provide periodic execution without manual counting.
Toggle Flags
------------
Toggle flags alternate between True/False at regular intervals:
.. code-block:: python
def main_cycle(ct):
# Blink LED - flag5c alternates every cycle
ct.io.blink_led.value = ct.flag1c
# Different blink rates
ct.io.fast_blink.value = ct.flag5c # Every 5 cycles
ct.io.slow_blink.value = ct.flag20c # Every 20 cycles
**Available toggle flags:**
* ``ct.flag1c`` - Every cycle
* ``ct.flag5c`` - Every 5 cycles
* ``ct.flag10c`` - Every 10 cycles
* ``ct.flag20c`` - Every 20 cycles
Flank Flags
-----------
Flank flags are True for exactly one cycle at regular intervals:
.. code-block:: python
def main_cycle(ct):
# Execute task every 10 cycles
if ct.flank10c:
print(f"Runtime: {ct.runtime:.3f}s")
# Execute task every 20 cycles
if ct.flank20c:
temp = ct.io.temperature.value
print(f"Temperature: {temp}°C")
**Available flank flags:**
* ``ct.flank5c`` - True every 5 cycles
* ``ct.flank10c`` - True every 10 cycles
* ``ct.flank15c`` - True every 15 cycles
* ``ct.flank20c`` - True every 20 cycles
Timers
======
RevPiModIO provides three timer types based on PLC standards. All timers are specified in cycle counts or milliseconds.
On-Delay Timer (TON/TONC)
--------------------------
Output becomes True only after input is continuously True for specified cycles (use ton with milliseconds value instead of cycles):
.. code-block:: python
def main_cycle(ct):
# Input: sensor value
ct.set_tonc("delay", 10)
# Output goes high after input is high for 10 cycles
if ct.get_tonc("delay"):
ct.io.output.value = True
else:
ct.io.output.value = False
**How it works:**
1. Input goes True
2. Timer starts counting
3. If input stays True for 10 cycles, output goes True
4. If input goes False before 10 cycles, timer resets
**Use cases:**
* Button debouncing
* Startup delays
* Confirming sustained conditions
Off-Delay Timer (TOF/TOFC)
---------------------------
Output stays True for specified cycles or milliseconds after input goes False (use tof with milliseconds value instead of cycles):
.. code-block:: python
def main_cycle(ct):
# Input: button value
ct.set_tofc("motor_coast", 20)
# Motor continues for 20 cycles after button release
ct.io.motor.value = ct.get_tofc("motor_coast")
**How it works:**
1. Input is True, output is True
2. Input goes False
3. Output stays True for 20 more cycles
4. After 20 cycles, output goes False
**Use cases:**
* Motor coast-down
* Relay hold-in
* Graceful shutdowns
Pulse Timer (TP/TPC)
--------------------
Generates a one-shot pulse of specified duration (use tp with milliseconds value instead of cycles):
.. code-block:: python
def main_cycle(ct):
# Trigger pulse on button press
if ct.changed(ct.io.trigger, edge=revpimodio2.RISING):
ct.set_tpc("pulse", 5)
# Output is True for 5 cycles
ct.io.pulse_output.value = ct.get_tpc("pulse")
**How it works:**
1. Call ``set_tpc`` to trigger pulse
2. Output is True for 5 cycles
3. After 5 cycles, output goes False
4. Additional triggers during pulse are ignored
**Use cases:**
* One-shot operations
* Acknowledgment pulses
* Retriggerable delays
State Machines
==============
State machines implement complex control logic with distinct operational modes.
Simple State Machine
---------------------
.. code-block:: python
def traffic_light(ct: revpimodio2.Cycletools):
"""Traffic light controller."""
if ct.first:
ct.var.state = "GREEN"
if ct.var.state == "GREEN":
ct.io.green_led.value = True
ct.io.yellow_led.value = False
ct.io.red_led.value = False
# After 2 seconds, go to yellow
ct.set_ton("green_time", 2000)
if ct.get_ton("green_time"):
ct.var.state = "YELLOW"
elif ct.var.state == "YELLOW":
ct.io.green_led.value = False
ct.io.yellow_led.value = True
ct.set_ton("yellow_time", 500)
if ct.get_ton("yellow_time"):
ct.var.state = "RED"
elif ct.var.state == "RED":
ct.io.yellow_led.value = False
ct.io.red_led.value = True
ct.set_ton("red_time", 3000)
if ct.get_ton("red_time"):
ct.var.state = "GREEN"
rpi = revpimodio2.RevPiModIO(autorefresh=True)
rpi.handlesignalend()
rpi.cycleloop(traffic_light)
Complex State Machine
----------------------
.. code-block:: python
def machine_controller(ct: revpimodio2.Cycletools):
"""Multi-state machine controller."""
if ct.first:
ct.var.state = "IDLE"
ct.var.production_count = 0
# State: IDLE - Ready to start
if ct.var.state == "IDLE":
ct.io.motor.value = False
ct.io.green_led.value = True
ct.io.red_led.value = False
if ct.changed(ct.io.start_button, edge=revpimodio2.RISING):
ct.var.state = "STARTING"
print("Starting...")
# State: STARTING - Startup sequence
elif ct.var.state == "STARTING":
ct.io.yellow_led.value = True
# 2-second startup delay
ct.set_ton("startup", 2000)
if ct.get_ton("startup"):
ct.var.state = "RUNNING"
print("Running")
# State: RUNNING - Normal operation
elif ct.var.state == "RUNNING":
ct.io.motor.value = True
ct.io.yellow_led.value = False
ct.io.green_led.value = ct.flag5c # Blink
# Count production
if ct.changed(ct.io.sensor, edge=revpimodio2.RISING):
ct.var.production_count += 1
# Check for stop
if ct.io.stop_button.value:
ct.var.state = "STOPPING"
# Check for error
if ct.io.error_sensor.value:
ct.var.state = "ERROR"
# State: STOPPING - Controlled shutdown
elif ct.var.state == "STOPPING":
# Coast motor for 1 second
ct.set_tof("coast", 1000)
ct.io.motor.value = ct.get_tof("coast")
if not ct.io.motor.value:
ct.var.state = "IDLE"
print("Stopped")
# State: ERROR - Fault condition
elif ct.var.state == "ERROR":
ct.io.motor.value = False
ct.io.red_led.value = ct.flag5c # Blink red
if ct.changed(ct.io.ack_button, edge=revpimodio2.RISING):
if not ct.io.error_sensor.value:
ct.var.state = "IDLE"
print("Error cleared")
if ct.last:
print(f"Total production: {ct.var.production_count}")
revpimodio.run_plc(machine_controller)
Practical Examples
==================
Temperature Control
-------------------
Temperature monitoring with hysteresis control:
.. code-block:: python
def temperature_monitor(ct: revpimodio2.Cycletools):
"""Monitor temperature and control cooling."""
if ct.first:
ct.var.cooling_active = False
temp = ct.io.temperature.value
# Hysteresis: ON at 75°C, OFF at 65°C
if temp > 75 and not ct.var.cooling_active:
ct.io.cooling_fan.value = True
ct.var.cooling_active = True
print(f"Cooling ON: {temp}°C")
elif temp < 65 and ct.var.cooling_active:
ct.io.cooling_fan.value = False
ct.var.cooling_active = False
print(f"Cooling OFF: {temp}°C")
# Warning if too hot
if temp > 85:
ct.core.a1green.value = False
ct.core.a1red.value = ct.flag5c # Blink
else:
ct.core.a1green.value = ct.flag5c # Blink
ct.core.a1red.value = False
# Emergency shutdown
if temp > 95:
ct.io.emergency_shutdown.value = True
revpimodio2.run_plc(temperature_monitor)
Production Counter
------------------
Count production items with start/stop control:
.. code-block:: python
def production_counter(ct: revpimodio2.Cycletools):
"""Track production count."""
if ct.first:
ct.var.total_count = 0
ct.var.running = False
# Start/stop control
if ct.changed(ct.io.start_button, edge=revpimodio2.RISING):
ct.var.running = True
if ct.changed(ct.io.stop_button, edge=revpimodio2.RISING):
ct.var.running = False
# Count items
if ct.var.running:
if ct.changed(ct.io.item_sensor, edge=revpimodio2.RISING):
ct.var.total_count += 1
ct.set_tpc("count_pulse", 5) # Pulse LED
print(f"Item #{ct.var.total_count}")
ct.io.count_led.value = ct.get_tpc("count_pulse")
# Reset counter
if ct.changed(ct.io.reset_button, edge=revpimodio2.RISING):
print(f"Final count: {ct.var.total_count}")
ct.var.total_count = 0
revpimodio2.run_plc(production_counter)
Best Practices
==============
Keep Cycle Logic Fast
----------------------
Minimize processing time in each cycle:
.. code-block:: python
def optimized_cycle(ct):
# Heavy work only when needed
if ct.flank100c:
heavy_calculation()
# Keep cycle logic minimal
ct.io.output.value = ct.io.input.value
**Guidelines:**
* Avoid blocking operations (network, file I/O)
* Use flank flags for expensive operations or even Threads
* Keep cycle time ≥20ms for stability
Use Appropriate Cycle Time
---------------------------
Match cycle time to application requirements:
.. code-block:: python
# Fast control (motion, high-speed counting)
rpi.cycletime = 20 # 50 Hz
# Normal control (most applications)
rpi.cycletime = 50 # 20 Hz
# Slow monitoring (temperature, status)
rpi.cycletime = 100 # 10 Hz
Handle Errors Safely
--------------------
Always implement safe failure modes:
.. code-block:: python
def safe_cycle(ct):
try:
value = ct.io.sensor.value
ct.io.output.value = process(value)
except Exception as e:
print(f"Error: {e}")
ct.io.output.value = False # Safe state
Initialize Properly
-------------------
Use ``ct.first`` for all initialization:
.. code-block:: python
def main_cycle(ct):
if ct.first:
# Initialize all variables
ct.var.counter = 0
ct.var.state = "IDLE"
ct.var.last_value = 0
# Set initial outputs
ct.io.motor.value = False
See Also
========
* :doc:`basics` - Core concepts and configuration
* :doc:`event_programming` - Event-driven programming
* :doc:`advanced` - Advanced topics and examples
* :doc:`api/helper` - Cycletools API reference

516
docs/event_programming.rst Normal file
View File

@@ -0,0 +1,516 @@
====================
Event Programming
====================
Event-driven programming uses callbacks triggered by hardware state changes.
.. contents:: Contents
:local:
:depth: 2
When to Use Event-Driven Programming
=====================================
**Event-driven programming is ideal for:**
* Handling user interactions
* Processing occasional events
* Background tasks
* System integration
* Low CPU usage requirements
**Advantages:**
* Consumes CPU only when events occur
* Natural for user interfaces
* Simple asynchronous operation
* Efficient for sporadic events
**Considerations:**
* Non-deterministic timing
* Must handle concurrent events carefully
* Less intuitive for continuous control
Basic Structure
===============
Simple Event Handler
--------------------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def on_button_change(ioname, iovalue):
"""Called when button changes."""
print(f"{ioname} = {iovalue}")
rpi.io.led.value = iovalue
# Register event
rpi.io.button.reg_event(on_button_change)
# Run main loop
rpi.handlesignalend()
rpi.mainloop()
The callback function receives:
* ``ioname`` - Name of the IO that changed
* ``iovalue`` - New value of the IO
Event Registration
==================
Value Change Events
-------------------
Register callbacks for IO value changes:
.. code-block:: python
def on_change(ioname, iovalue):
print(f"{ioname} changed to {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Any change
rpi.io.sensor.reg_event(on_change)
# Rising edge only
rpi.io.button.reg_event(on_change, edge=revpimodio2.RISING)
# Falling edge only
rpi.io.button.reg_event(on_change, edge=revpimodio2.FALLING)
rpi.handlesignalend()
rpi.mainloop()
**Edge types:**
* ``revpimodio2.RISING`` - False to True transition
* ``revpimodio2.FALLING`` - True to False transition
* ``revpimodio2.BOTH`` - Any change (default)
Multiple Events
---------------
Register multiple callbacks on one IO:
.. code-block:: python
def on_press(ioname, iovalue):
print("Pressed!")
def on_release(ioname, iovalue):
print("Released!")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Different callbacks for different edges
rpi.io.button.reg_event(on_press, edge=revpimodio2.RISING)
rpi.io.button.reg_event(on_release, edge=revpimodio2.FALLING)
rpi.handlesignalend()
rpi.mainloop()
Or register one callback on multiple IOs:
.. code-block:: python
def any_sensor_changed(ioname, iovalue):
print(f"{ioname} changed to {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Same callback for multiple sensors
for sensor in ["sensor1", "sensor2", "sensor3"]:
if sensor in rpi.io:
rpi.io[sensor].reg_event(any_sensor_changed)
rpi.handlesignalend()
rpi.mainloop()
Debouncing
==========
Add debounce delays to filter noise and false triggers:
.. code-block:: python
def on_stable_press(ioname, iovalue):
"""Called only after button is stable for 50ms."""
print(f"Confirmed: {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# 50ms debounce delay
rpi.io.noisy_button.reg_event(on_stable_press, delay=50)
rpi.handlesignalend()
rpi.mainloop()
**How debouncing works:**
1. IO value changes
2. RevPiModIO waits for ``delay`` milliseconds
3. If value is still changed, callback is triggered
4. If value changed back, callback is not triggered
**Typical debounce times:**
* Mechanical switches: 20-50ms
* Relays: 10-20ms
* Analog sensors: 100-500ms
Debouncing with Edge Detection
-------------------------------
.. code-block:: python
def on_confirmed_press(ioname, iovalue):
"""Called only for stable button press."""
print("Confirmed press")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Rising edge with 30ms debounce
rpi.io.button.reg_event(
on_confirmed_press,
edge=revpimodio2.RISING,
delay=30
)
rpi.handlesignalend()
rpi.mainloop()
Timer Events
============
The timer is started when the IO value changes and executes the passed
function - even if the IO value has changed in the meantime. If the
timer has not expired and the condition is met again, the timer is NOT
reset to the delay value or started a second time.
.. code-block:: python
def periodic_task(ioname, iovalue):
"""Called after 500ms."""
print(f"Periodic task: {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Execute every 500ms
rpi.io.dummy.reg_timerevent(periodic_task, 500)
rpi.handlesignalend()
rpi.mainloop()
Timer Event Parameters
----------------------
.. code-block:: python
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def blink_led(ioname, iovalue):
"""Toggle LED every 500ms."""
rpi.io.blink_led.value = not rpi.io.blink_led.value
def log_temperature(ioname, iovalue):
"""Log temperature every 5 seconds."""
temp = rpi.io.temperature.value
print(f"Temperature: {temp}°C")
# Blink every 500ms, trigger immediately
rpi.io.blink_led.reg_timerevent(blink_led, 500, prefire=True)
# Log every 5 seconds, don't trigger immediately
rpi.io.temperature.reg_timerevent(log_temperature, 5000, prefire=False)
rpi.handlesignalend()
rpi.mainloop()
**Parameters:**
* ``interval`` - Delay in milliseconds.
* ``prefire`` - If True, trigger immediately after starting the mainloop.
Threaded Events
===============
Use threaded events for long-running operations that would block the main loop:
.. code-block:: python
def long_task(eventcallback: revpimodio2.EventCallback):
"""Threaded handler for time-consuming tasks."""
print(f"Starting task for {eventcallback.ioname}")
for i in range(10):
# Check if stop requested
if eventcallback.exit.is_set():
print("Task cancelled")
return
# Interruptible wait (1 second)
eventcallback.exit.wait(1)
print(f"Progress: {(i+1)*10}%")
print("Task complete")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Register as threaded event
rpi.io.trigger.reg_event(
long_task,
as_thread=True,
edge=revpimodio2.RISING
)
rpi.handlesignalend()
rpi.mainloop()
EventCallback Object
--------------------
Threaded callbacks receive an ``EventCallback`` object with:
* ``eventcallback.ioname`` - Name of the IO
* ``eventcallback.iovalue`` - Value that triggered event
* ``eventcallback.exit`` - ``threading.Event`` for cancellation
**Important:** Always check ``eventcallback.exit.is_set()`` periodically to allow graceful shutdown.
Interruptible Sleep
-------------------
Use ``eventcallback.exit.wait()`` instead of ``time.sleep()`` for interruptible delays:
.. code-block:: python
def background_task(eventcallback: revpimodio2.EventCallback):
"""Long task with interruptible waits."""
while not eventcallback.exit.is_set():
# Do some work
process_data()
# Wait 5 seconds or until exit requested
if eventcallback.exit.wait(5):
break # Exit was requested
rpi = revpimodio2.RevPiModIO(autorefresh=True)
rpi.io.trigger.reg_event(background_task, as_thread=True)
rpi.handlesignalend()
rpi.mainloop()
Unregistering Events
====================
Remove event callbacks when no longer needed:
.. code-block:: python
def my_callback(ioname, iovalue):
print(f"{ioname} = {iovalue}")
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Register event
rpi.io.button.reg_event(my_callback)
# Unregister specific callback
rpi.io.button.unreg_event(my_callback)
# Unregister all events for this IO
rpi.io.button.unreg_event()
Practical Examples
==================
LED Toggle on Button Press
---------------------------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def toggle_led(ioname, iovalue):
"""Toggle LED on button press."""
rpi.io.led.value = not rpi.io.led.value
print(f"LED: {rpi.io.led.value}")
rpi.io.button.reg_event(toggle_led, edge=revpimodio2.RISING)
rpi.handlesignalend()
rpi.mainloop()
Multiple Button Handler
------------------------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def on_start(ioname, iovalue):
print("Starting motor...")
rpi.io.motor.value = True
rpi.core.A1 = revpimodio2.GREEN
def on_stop(ioname, iovalue):
print("Stopping motor...")
rpi.io.motor.value = False
rpi.core.A1 = revpimodio2.RED
def on_emergency(ioname, iovalue):
print("EMERGENCY STOP!")
rpi.io.motor.value = False
rpi.io.alarm.value = True
rpi.core.A1 = revpimodio2.RED
# Register different buttons
rpi.io.start_button.reg_event(on_start, edge=revpimodio2.RISING)
rpi.io.stop_button.reg_event(on_stop, edge=revpimodio2.RISING)
rpi.io.emergency_stop.reg_event(on_emergency, edge=revpimodio2.RISING)
rpi.handlesignalend()
rpi.mainloop()
Sensor Logging
--------------
.. code-block:: python
import revpimodio2
from datetime import datetime
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def log_sensor_change(ioname, iovalue):
"""Log sensor changes with timestamp."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"{timestamp} - {ioname}: {iovalue}")
# Log all sensor changes
for io_name in ["sensor1", "sensor2", "temperature"]:
if io_name in rpi.io:
rpi.io[io_name].reg_event(log_sensor_change)
rpi.handlesignalend()
rpi.mainloop()
Threaded Data Processing
-------------------------
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def process_batch(eventcallback: revpimodio2.EventCallback):
"""Process data batch in background thread."""
print(f"Starting batch processing...")
batch_size = 100
for i in range(batch_size):
if eventcallback.exit.is_set():
print("Processing cancelled")
return
# Simulate processing
eventcallback.exit.wait(0.1)
if i % 10 == 0:
print(f"Progress: {i}/{batch_size}")
print("Batch processing complete")
rpi.io.done_led.value = True
# Trigger on button press
rpi.io.start_batch.reg_event(
process_batch,
as_thread=True,
edge=revpimodio2.RISING
)
rpi.handlesignalend()
rpi.mainloop()
Best Practices
==============
Keep Callbacks Fast
-------------------
Event callbacks should complete quickly:
.. code-block:: python
# Good - Fast callback
def good_callback(ioname, iovalue):
rpi.io.output.value = iovalue
# Poor - Blocking callback
def poor_callback(ioname, iovalue):
time.sleep(5) # Blocks event loop!
rpi.io.output.value = iovalue
For slow operations, use threaded events:
.. code-block:: python
def slow_task(eventcallback):
# Long operation in separate thread
process_data()
rpi.io.trigger.reg_event(slow_task, as_thread=True)
Handle Errors Gracefully
-------------------------
Protect callbacks from exceptions:
.. code-block:: python
def safe_callback(ioname, iovalue):
try:
result = risky_operation(iovalue)
rpi.io.output.value = result
except Exception as e:
print(f"Error in callback: {e}")
rpi.io.output.value = False # Safe state
Clean Up Threads
----------------
Threaded events are automatically cleaned up on exit, but you can manually unregister:
.. code-block:: python
def long_task(eventcallback):
while not eventcallback.exit.is_set():
work()
eventcallback.exit.wait(1)
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Register
rpi.io.trigger.reg_event(long_task, as_thread=True)
# Later: unregister to stop thread
rpi.io.trigger.unreg_event(long_task)
See Also
========
* :doc:`basics` - Core concepts and configuration
* :doc:`cyclic_programming` - Cyclic programming patterns
* :doc:`advanced` - Combining paradigms and advanced topics
* :doc:`api/helper` - EventCallback API reference

View File

@@ -1,18 +1,86 @@
.. revpimodio2 documentation main file, created by ====================================
sphinx-quickstart on Sun Jan 22 17:49:41 2023. Welcome to RevPiModIO Documentation!
You can adapt this file completely to your liking, but it should at least ====================================
contain the root `toctree` directive.
Welcome to revpimodio2's documentation! RevPiModIO is a Python3 module for programming Revolution Pi hardware from KUNBUS GmbH.
======================================= It provides easy access to all devices and IOs from the piCtory configuration, supporting
both cyclic (PLC-style) and event-driven programming paradigms.
.. note::
**New to RevPiModIO?** Start with :doc:`installation` and :doc:`quickstart`.
Key Features
============
* **Dual Programming Models**: Cyclic (PLC-style) and event-driven approaches
* **Direct Hardware Access**: Simple Python interface to all I/O devices
* **Automatic Configuration**: Loads piCtory device configuration
* **Event System**: Callbacks for value changes and timer events
* **Open Source**: LGPLv2 license, no licensing fees
Quick Example
=============
**Cyclic Programming**::
import revpimodio2
def main(ct):
if ct.io.button.value:
ct.io.led.value = True
revpimodio2.run_plc(main)
**Event-Driven Programming**::
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def on_change(ioname, iovalue):
print(f"{ioname} = {iovalue}")
rpi.io.button.reg_event(on_change)
rpi.handlesignalend()
rpi.mainloop()
Documentation
=============
.. toctree:: .. toctree::
:maxdepth: 2 :maxdepth: 2
:caption: Contents: :caption: Getting Started
installation
quickstart
.. toctree::
:maxdepth: 2
:caption: User Guide
Indices and tables basics
cyclic_programming
event_programming
advanced
.. toctree::
:maxdepth: 3
:caption: API Reference
api/index
api/revpimodio
api/io
api/helper
api/device
External Resources
==================
* **Official Website**: `revpimodio.org <https://revpimodio.org>`_
* **GitHub Repository**: `github.com/naruxde/ <https://github.com/naruxde/>`_
* **Discussion Forum**: `revpimodio.org/diskussionsforum/ <https://revpimodio.org/diskussionsforum/>`_
* **Revolution Pi Hardware**: `revolution.kunbus.com <https://revolution.kunbus.com>`_
Indices and Tables
================== ==================
* :ref:`genindex` * :ref:`genindex`

88
docs/installation.rst Normal file
View File

@@ -0,0 +1,88 @@
============
Installation
============
System Requirements
===================
* Python 3.2 or higher
* Revolution Pi hardware (Core, Core3, Connect, Compact, Flat)
* piCtory configuration tool
Prerequisites
=============
User Permissions
----------------
On Bookworm images, users must belong to the ``picontrol`` group::
sudo usermod -a -G picontrol username
Log out and log back in for the group change to take effect.
Installing RevPiModIO
=====================
RevPiModIO is preinstalled on your Revolution Pi. It is distributed as debian package and will be
updated by `apt`.
Using pip
---------
Install from PyPI::
pip install revpimodio2
From Source
-----------
Clone the repository and install::
git clone https://github.com/naruxde/revpimodio2.git
cd revpimodio2
pip install .
Verify Installation
===================
Test the installation::
python3 -c "import revpimodio2; print(revpimodio2.__version__)"
Optional Components
===================
RevPiPyLoad
-----------
For advanced features like XML-RPC server and MQTT integration::
sudo apt-get update
sudo apt-get install revpipyload
Configure XML-RPC Server
~~~~~~~~~~~~~~~~~~~~~~~~~
Edit ``/etc/revpipyload/revpipyload.conf``::
[XMLRPC]
xmlrpc = 1
Configure access permissions in ``/etc/revpipyload/aclxmlrpc.conf``, then restart::
sudo service revpipyload restart
RevPi Commander
---------------
RevPi Commander provides a GUI for testing I/O without programming:
1. Download from `revpimodio.org <https://revpimodio.org/quellen/revpicommander/>`_
2. Configure connection via File → Connections with your RevPi's IP address (port: 55123)
3. Use "PLC watch mode" to monitor sensors and control outputs
Next Steps
==========
After installation, proceed to :doc:`quickstart` to write your first program.

View File

@@ -1,7 +0,0 @@
src
===
.. toctree::
:maxdepth: 4
revpimodio2

205
docs/quickstart.rst Normal file
View File

@@ -0,0 +1,205 @@
==========
Quick Start
==========
This guide will help you write your first RevPiModIO program.
Basic Concepts
==============
RevPiModIO provides two main programming paradigms:
* **Cyclic Programming** - Execute a function at regular intervals (PLC-style)
* **Event-Driven Programming** - Register callbacks triggered by hardware changes
Both approaches use the same core objects:
* ``rpi.io`` - Access inputs and outputs by name
* ``rpi.core`` - Control LEDs, watchdog, and system status
* ``rpi.device`` - Access specific hardware devices
Hardware Configuration
======================
Before programming, configure your hardware using piCtory:
1. Access piCtory web interface on your RevPi Core module
2. Add and configure your I/O modules
3. Assign symbolic names to inputs and outputs
* Example: ``button``, ``led``, ``temperature``
* Good names make your code readable
4. Save configuration and activate
Your First Program
==================
Simple Input to Output
----------------------
The simplest program reads an input and controls an output:
.. code-block:: python
import revpimodio2
# Initialize with auto-refresh
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Read input and control output
if rpi.io.button.value:
rpi.io.led.value = True
else:
rpi.io.led.value = False
# Clean up
rpi.exit()
Cyclic Program
--------------
For continuous operation, use a cyclic loop:
.. code-block:: python
import revpimodio2
def main_cycle(ct: revpimodio2.Cycletools):
"""Called every cycle (default: 20-50ms)."""
if ct.first:
# Initialize on first cycle
ct.var.counter = 0
print("Program started")
# Main logic
if ct.io.button.value:
ct.io.led.value = True
else:
ct.io.led.value = False
# Count button presses
if ct.changed(ct.io.button, edge=revpimodio2.RISING):
ct.var.counter += 1
print(f"Button pressed {ct.var.counter} times")
if ct.last:
# Cleanup on exit
print("Program stopped")
# Run cyclic loop
revpimodio2.run_plc(main_cycle)
Event-Driven Program
--------------------
For event-based operation, use callbacks:
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
def on_button_press(ioname, iovalue):
"""Called when button changes."""
print(f"Button is now: {iovalue}")
rpi.io.led.value = iovalue
# Register event callback
rpi.io.button.reg_event(on_button_press)
# Handle shutdown signals
rpi.handlesignalend()
# Start event loop
rpi.mainloop()
LED Control
===========
Control the RevPi status LEDs:
.. code-block:: python
import revpimodio2
rpi = revpimodio2.RevPiModIO(autorefresh=True)
# Set LED colors using constants
rpi.core.A1 = revpimodio2.GREEN # Success
rpi.core.A2 = revpimodio2.RED # Error
rpi.core.A3 = revpimodio2.OFF # Off
# Or control individual colors
rpi.core.a1green.value = True
rpi.core.a1red.value = False
rpi.exit()
Common Patterns
===============
Initialize and Cleanup
----------------------
Always initialize variables and clean up resources:
.. code-block:: python
def main_cycle(ct):
if ct.first:
# Initialize
ct.var.state = "IDLE"
ct.var.error_count = 0
# Main logic here...
if ct.last:
# Cleanup
ct.io.motor.value = False
print(f"Errors: {ct.var.error_count}")
Edge Detection
--------------
Detect rising or falling edges:
.. code-block:: python
def main_cycle(ct):
# Detect button press (rising edge)
if ct.changed(ct.io.button, edge=revpimodio2.RISING):
print("Button pressed!")
# Detect button release (falling edge)
if ct.changed(ct.io.button, edge=revpimodio2.FALLING):
print("Button released!")
Timers
------
Use built-in cycle-based timers:
.. code-block:: python
def main_cycle(ct):
# On-delay: Input must be True for 10 cycles
ct.set_tonc("startup", 10)
if ct.get_tonc("startup"):
ct.io.motor.value = True
# Pulse: Generate 5-cycle pulse
if ct.io.trigger.value:
ct.set_tpc("pulse", 5)
ct.io.pulse_output.value = ct.get_tpc("pulse")
Next Steps
==========
* :doc:`basics` - Core concepts and configuration
* :doc:`cyclic_programming` - Cyclic programming patterns
* :doc:`event_programming` - Event-driven programming patterns
* :doc:`advanced` - Advanced topics and best practices
* :doc:`api/index` - API reference

View File

@@ -1,85 +0,0 @@
revpimodio2 package
===================
Submodules
----------
revpimodio2.app module
----------------------
.. automodule:: revpimodio2.app
:members:
:undoc-members:
:show-inheritance:
revpimodio2.device module
-------------------------
.. automodule:: revpimodio2.device
:members:
:undoc-members:
:show-inheritance:
revpimodio2.errors module
-------------------------
.. automodule:: revpimodio2.errors
:members:
:undoc-members:
:show-inheritance:
revpimodio2.helper module
-------------------------
.. automodule:: revpimodio2.helper
:members:
:undoc-members:
:show-inheritance:
revpimodio2.io module
---------------------
.. automodule:: revpimodio2.io
:members:
:undoc-members:
:show-inheritance:
revpimodio2.modio module
------------------------
.. automodule:: revpimodio2.modio
:members:
:undoc-members:
:show-inheritance:
revpimodio2.netio module
------------------------
.. automodule:: revpimodio2.netio
:members:
:undoc-members:
:show-inheritance:
revpimodio2.pictory module
--------------------------
.. automodule:: revpimodio2.pictory
:members:
:undoc-members:
:show-inheritance:
revpimodio2.summary module
--------------------------
.. automodule:: revpimodio2.summary
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: revpimodio2
:members:
:undoc-members:
:show-inheritance:

View File

@@ -2,6 +2,7 @@
pytest-cov pytest-cov
setuptools setuptools
sphinx sphinx
sphinx-rtd-theme
wheel wheel
# Runtime dependencies, must match install_requires in setup.py # Runtime dependencies, must match install_requires in setup.py

View File

@@ -16,18 +16,24 @@ with open("README.md") as fh:
setup( setup(
name="revpimodio2", name="revpimodio2",
version=__version__, version=__version__,
packages=find_namespace_packages("src"), packages=find_namespace_packages("src"),
package_dir={'': 'src'}, package_dir={"": "src"},
include_package_data=True, include_package_data=True,
python_requires=">= 3.2", python_requires=">= 3.2",
install_requires=[], install_requires=[],
extras_require={
"docs": [
"sphinx",
"sphinx_rtd_theme",
],
},
entry_points={}, entry_points={},
platforms=["all"], platforms=["all"],
url="https://revpimodio.org/", url="https://revpimodio.org/",
project_urls={
"Documentation": "https://revpimodio2.readthedocs.io/",
"Source": "https://github.com/naruxde/revpimodio2",
},
license="LGPLv2", license="LGPLv2",
author="Sven Sager", author="Sven Sager",
author_email="akira@narux.de", author_email="akira@narux.de",
@@ -41,12 +47,11 @@ setup(
"Development Status :: 5 - Production/Stable", "Development Status :: 5 - Production/Stable",
"Environment :: Console", "Environment :: Console",
"Intended Audience :: Developers", "Intended Audience :: Developers",
"License :: OSI Approved :: " "License :: OSI Approved :: " "GNU Lesser General Public License v2 (LGPLv2)",
"GNU Lesser General Public License v2 (LGPLv2)",
"Operating System :: MacOS :: MacOS X", "Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows", "Operating System :: Microsoft :: Windows",
"Operating System :: POSIX", "Operating System :: POSIX",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Topic :: Software Development :: Libraries :: Python Modules" "Topic :: Software Development :: Libraries :: Python Modules",
], ],
) )

View File

@@ -3,4 +3,4 @@
__author__ = "Sven Sager <akira@revpimodio.org>" __author__ = "Sven Sager <akira@revpimodio.org>"
__copyright__ = "Copyright (C) 2023 Sven Sager" __copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2" __license__ = "LGPLv2"
__version__ = "2.8.0" __version__ = "2.8.1"

View File

@@ -1,16 +1,16 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Stellt alle Klassen fuer den RevolutionPi zur Verfuegung. Provides all classes for the RevolutionPi.
Webpage: https://revpimodio.org/ Webpage: https://revpimodio.org/
Stellt Klassen fuer die einfache Verwendung des Revolution Pis der Provides classes for easy use of the Revolution Pi from
KUNBUS GmbH (https://revolution.kunbus.de/) zur Verfuegung. Alle I/Os werden KUNBUS GmbH (https://revolutionpi.com/) . All I/Os are
aus der piCtory Konfiguration eingelesen und mit deren Namen direkt zugreifbar read from the piCtory configuration and made directly accessible by their names.
gemacht. Fuer Gateways sind eigene IOs ueber mehrere Bytes konfigurierbar For gateways, custom IOs can be configured across multiple bytes.
Mit den definierten Namen greift man direkt auf die gewuenschten Daten zu. With the defined names, the desired data is accessed directly.
Auf alle IOs kann der Benutzer Funktionen als Events registrieren. Diese The user can register functions as events for all IOs. The module
fuehrt das Modul bei Datenaenderung aus. executes these when data changes.
""" """
__all__ = [ __all__ = [
"IOEvent", "IOEvent",

View File

@@ -45,12 +45,12 @@ def acheck(check_type, **kwargs) -> None:
def consttostr(value) -> str: def consttostr(value) -> str:
""" """
Gibt <class 'str'> fuer Konstanten zurueck. Returns <class 'str'> for constants.
Diese Funktion ist erforderlich, da enum in Python 3.2 nicht existiert. This function is required because enum does not exist in Python 3.2.
:param value: Konstantenwert :param value: Constant value
:return: <class 'str'> Name der Konstanten :return: <class 'str'> Name of the constant
""" """
if value == 0: if value == 0:
return "OFF" return "OFF"

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Bildet die App Sektion von piCtory ab.""" """Maps the App section from piCtory."""
__author__ = "Sven Sager" __author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager" __copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2" __license__ = "LGPLv2"
@@ -8,15 +8,15 @@ from time import gmtime, strptime
class App: class App:
"""Bildet die App Sektion der config.rsc ab.""" """Maps the App section of config.rsc."""
__slots__ = "name", "version", "language", "layout", "savets" __slots__ = "name", "version", "language", "layout", "savets"
def __init__(self, app: dict): def __init__(self, app: dict):
""" """
Instantiiert die App-Klasse. Instantiates the App class.
:param app: piCtory Appinformationen :param app: piCtory app information
""" """
self.name = app.get("name", "") self.name = app.get("name", "")
"""Name of creating app""" """Name of creating app"""
@@ -28,7 +28,7 @@ class App:
"""Language of creating app""" """Language of creating app"""
self.savets = app.get("saveTS", None) self.savets = app.get("saveTS", None)
"""Timestamp of configuraiton""" """Timestamp of configuration"""
if self.savets is not None: if self.savets is not None:
try: try:
@@ -36,5 +36,5 @@ class App:
except Exception: except Exception:
self.savets = gmtime(0) self.savets = gmtime(0)
# TODO: Layout untersuchen und anders abbilden # TODO: Examine layout and map differently
self.layout = app.get("layout", {}) self.layout = app.get("layout", {})

File diff suppressed because it is too large Load Diff

View File

@@ -6,8 +6,12 @@ __license__ = "LGPLv2"
class RevPiModIOError(Exception): class RevPiModIOError(Exception):
"""Base exception class for RevPiModIO errors."""
pass pass
class DeviceNotFoundError(RevPiModIOError): class DeviceNotFoundError(RevPiModIOError):
"""Raised when a requested device cannot be found in the configuration."""
pass pass

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""RevPiModIO Helperklassen und Tools.""" """RevPiModIO helper classes and tools."""
__author__ = "Sven Sager" __author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager" __copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2" __license__ = "LGPLv2"
@@ -16,25 +16,25 @@ from .io import IOBase
class EventCallback(Thread): class EventCallback(Thread):
"""Thread fuer das interne Aufrufen von Event-Funktionen. """Thread for internal calling of event functions.
Der Eventfunktion, welche dieser Thread aufruft, wird der Thread selber The event function that this thread calls will receive the thread itself
als Parameter uebergeben. Darauf muss bei der definition der Funktion as a parameter. This must be considered when defining the function, e.g.,
geachtet werden z.B. "def event(th):". Bei umfangreichen Funktionen kann "def event(th):". For extensive functions, this can be evaluated to
dieser ausgewertet werden um z.B. doppeltes Starten zu verhindern. prevent duplicate starts.
Ueber EventCallback.ioname kann der Name des IO-Objekts abgerufen werden, The name of the IO object can be retrieved via EventCallback.ioname,
welches das Event ausgeloest hast. EventCallback.iovalue gibt den Wert des which triggered the event. EventCallback.iovalue returns the value of
IO-Objekts zum Ausloesezeitpunkt zurueck. the IO object at the time of triggering.
Der Thread stellt das EventCallback.exit Event als Abbruchbedingung fuer The thread provides the EventCallback.exit event as an abort condition
die aufgerufene Funktion zur Verfuegung. for the called function.
Durch Aufruf der Funktion EventCallback.stop() wird das exit-Event gesetzt By calling the EventCallback.stop() function, the exit event is set
und kann bei Schleifen zum Abbrechen verwendet werden. and can be used to abort loops.
Mit dem .exit() Event auch eine Wartefunktion realisiert A wait function can also be implemented with the .exit() event:
werden: "th.exit.wait(0.5)" - Wartet 500ms oder bricht sofort ab, wenn "th.exit.wait(0.5)" - waits 500ms or aborts immediately if .stop()
fuer den Thread .stop() aufgerufen wird. is called on the thread.
while not th.exit.is_set(): while not th.exit.is_set():
# IO-Arbeiten # Work with IOs
th.exit.wait(0.5) th.exit.wait(0.5)
""" """
@@ -44,9 +44,9 @@ class EventCallback(Thread):
""" """
Init EventCallback class. Init EventCallback class.
:param func: Funktion die beim Start aufgerufen werden soll :param func: Function that should be called at startup
:param name: IO-Name :param name: IO name
:param value: IO-Value zum Zeitpunkt des Events :param value: IO value at the time of the event
""" """
super().__init__() super().__init__()
self.daemon = True self.daemon = True
@@ -56,35 +56,33 @@ class EventCallback(Thread):
self.iovalue = value self.iovalue = value
def run(self): def run(self):
"""Ruft die registrierte Funktion auf.""" """Calls the registered function."""
self.func(self) self.func(self)
def stop(self): def stop(self):
"""Setzt das exit-Event mit dem die Funktion beendet werden kann.""" """Sets the exit event that can be used to terminate the function."""
self.exit.set() self.exit.set()
class Cycletools: class Cycletools:
""" """
Werkzeugkasten fuer Cycleloop-Funktion. Toolbox for cycle loop function.
Diese Klasse enthaelt Werkzeuge fuer Zyklusfunktionen, wie Taktmerker This class contains tools for cycle functions, such as clock flags and edge flags.
und Flankenmerker. Note that all edge flags have the value True on the first cycle! The Cycletools.first
Zu beachten ist, dass die Flankenmerker beim ersten Zyklus alle den Wert flag can be used to determine if it is the first cycle.
True haben! Ueber den Merker Cycletools.first kann ermittelt werden,
ob es sich um den ersten Zyklus handelt.
Taktmerker flag1c, flag5c, flag10c, usw. haben den als Zahl angegebenen Clock flags flag1c, flag5c, flag10c, etc. have the numerically specified
Wert an Zyklen jeweils False und True. value for the specified number of cycles, alternating between False and True.
Beispiel: flag5c hat 5 Zyklen den Wert False und in den naechsten 5 Zyklen
den Wert True.
Flankenmerker flank5c, flank10c, usw. haben immer im, als Zahl angebenen Example: flag5c has the value False for 5 cycles and True for the next 5 cycles.
Zyklus fuer einen Zyklusdurchlauf den Wert True, sonst False.
Beispiel: flank5c hat immer alle 5 Zyklen den Wert True.
Diese Merker koennen z.B. verwendet werden um, an Outputs angeschlossene, Edge flags flank5c, flank10c, etc. always have the value True for one cycle
Lampen synchron blinken zu lassen. at the numerically specified cycle, otherwise False.
Example: flank5c always has the value True every 5 cycles.
These flags can be used, for example, to make lamps connected to outputs blink synchronously.
""" """
__slots__ = ( __slots__ = (
@@ -129,7 +127,7 @@ class Cycletools:
self.device = revpi_object.device self.device = revpi_object.device
self.io = revpi_object.io self.io = revpi_object.io
# Taktmerker # Clock flags
self.first = True self.first = True
self.flag1c = False self.flag1c = False
self.flag5c = False self.flag5c = False
@@ -138,28 +136,28 @@ class Cycletools:
self.flag20c = False self.flag20c = False
self.last = False self.last = False
# Flankenmerker # Edge flags
self.flank5c = True self.flank5c = True
self.flank10c = True self.flank10c = True
self.flank15c = True self.flank15c = True
self.flank20c = True self.flank20c = True
# Benutzerdaten # User data
class Var: class Var:
"""Hier remanente Variablen anfuegen.""" """Add remanent variables here."""
pass pass
self.var = Var() self.var = Var()
def _docycle(self) -> None: def _docycle(self) -> None:
"""Zyklusarbeiten.""" """Cycle operations."""
# Einschaltverzoegerung # Turn-off delay
for tof in self.__dict_tof: for tof in self.__dict_tof:
if self.__dict_tof[tof] > 0: if self.__dict_tof[tof] > 0:
self.__dict_tof[tof] -= 1 self.__dict_tof[tof] -= 1
# Ausschaltverzoegerung # Turn-on delay
for ton in self.__dict_ton: for ton in self.__dict_ton:
if self.__dict_ton[ton][1]: if self.__dict_ton[ton][1]:
if self.__dict_ton[ton][0] > 0: if self.__dict_ton[ton][0] > 0:
@@ -168,7 +166,7 @@ class Cycletools:
else: else:
self.__dict_ton[ton][0] = -1 self.__dict_ton[ton][0] = -1
# Impuls # Pulse
for tp in self.__dict_tp: for tp in self.__dict_tp:
if self.__dict_tp[tp][1]: if self.__dict_tp[tp][1]:
if self.__dict_tp[tp][0] > 0: if self.__dict_tp[tp][0] > 0:
@@ -178,7 +176,7 @@ class Cycletools:
else: else:
self.__dict_tp[tp][0] = -1 self.__dict_tp[tp][0] = -1
# Flankenmerker # Edge flags
self.flank5c = False self.flank5c = False
self.flank10c = False self.flank10c = False
self.flank15c = False self.flank15c = False
@@ -188,7 +186,7 @@ class Cycletools:
self.first = False self.first = False
self.flag1c = not self.flag1c self.flag1c = not self.flag1c
# Berechnete Flags # Calculated flags
self.__cycle += 1 self.__cycle += 1
if self.__cycle == 5: if self.__cycle == 5:
self.__ucycle += 1 self.__ucycle += 1
@@ -239,64 +237,64 @@ class Cycletools:
def get_tof(self, name: str) -> bool: def get_tof(self, name: str) -> bool:
""" """
Wert der Ausschaltverzoegerung. Value of the off-delay.
:param name: Eindeutiger Name des Timers :param name: Unique name of the timer
:return: Wert <class 'bool'> der Ausschaltverzoegerung :return: Value <class 'bool'> of the off-delay
""" """
return self.__dict_tof.get(name, 0) > 0 return self.__dict_tof.get(name, 0) > 0
def get_tofc(self, name: str) -> bool: def get_tofc(self, name: str) -> bool:
""" """
Wert der Ausschaltverzoegerung. Value of the off-delay.
:param name: Eindeutiger Name des Timers :param name: Unique name of the timer
:return: Wert <class 'bool'> der Ausschaltverzoegerung :return: Value <class 'bool'> of the off-delay
""" """
return self.__dict_tof.get(name, 0) > 0 return self.__dict_tof.get(name, 0) > 0
def set_tof(self, name: str, milliseconds: int) -> None: def set_tof(self, name: str, milliseconds: int) -> None:
""" """
Startet bei Aufruf einen ausschaltverzoegerten Timer. Starts an off-delay timer when called.
:param name: Eindeutiger Name fuer Zugriff auf Timer :param name: Unique name for accessing the timer
:param milliseconds: Verzoegerung in Millisekunden :param milliseconds: Delay in milliseconds
""" """
self.__dict_tof[name] = ceil(milliseconds / self.__cycletime) self.__dict_tof[name] = ceil(milliseconds / self.__cycletime)
def set_tofc(self, name: str, cycles: int) -> None: def set_tofc(self, name: str, cycles: int) -> None:
""" """
Startet bei Aufruf einen ausschaltverzoegerten Timer. Starts an off-delay timer when called.
:param name: Eindeutiger Name fuer Zugriff auf Timer :param name: Unique name for accessing the timer
:param cycles: Zyklusanzahl, der Verzoegerung wenn nicht neu gestartet :param cycles: Number of cycles for the delay if not restarted
""" """
self.__dict_tof[name] = cycles self.__dict_tof[name] = cycles
def get_ton(self, name: str) -> bool: def get_ton(self, name: str) -> bool:
""" """
Einschaltverzoegerung. On-delay.
:param name: Eindeutiger Name des Timers :param name: Unique name of the timer
:return: Wert <class 'bool'> der Einschaltverzoegerung :return: Value <class 'bool'> of the on-delay
""" """
return self.__dict_ton.get(name, [-1])[0] == 0 return self.__dict_ton.get(name, [-1])[0] == 0
def get_tonc(self, name: str) -> bool: def get_tonc(self, name: str) -> bool:
""" """
Einschaltverzoegerung. On-delay.
:param name: Eindeutiger Name des Timers :param name: Unique name of the timer
:return: Wert <class 'bool'> der Einschaltverzoegerung :return: Value <class 'bool'> of the on-delay
""" """
return self.__dict_ton.get(name, [-1])[0] == 0 return self.__dict_ton.get(name, [-1])[0] == 0
def set_ton(self, name: str, milliseconds: int) -> None: def set_ton(self, name: str, milliseconds: int) -> None:
""" """
Startet einen einschaltverzoegerten Timer. Starts an on-delay timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer :param name: Unique name for accessing the timer
:param milliseconds: Millisekunden, der Verzoegerung wenn neu gestartet :param milliseconds: Milliseconds for the delay if restarted
""" """
if self.__dict_ton.get(name, [-1])[0] == -1: if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = [ceil(milliseconds / self.__cycletime), True] self.__dict_ton[name] = [ceil(milliseconds / self.__cycletime), True]
@@ -305,10 +303,10 @@ class Cycletools:
def set_tonc(self, name: str, cycles: int) -> None: def set_tonc(self, name: str, cycles: int) -> None:
""" """
Startet einen einschaltverzoegerten Timer. Starts an on-delay timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer :param name: Unique name for accessing the timer
:param cycles: Zyklusanzahl, der Verzoegerung wenn neu gestartet :param cycles: Number of cycles for the delay if restarted
""" """
if self.__dict_ton.get(name, [-1])[0] == -1: if self.__dict_ton.get(name, [-1])[0] == -1:
self.__dict_ton[name] = [cycles, True] self.__dict_ton[name] = [cycles, True]
@@ -317,28 +315,28 @@ class Cycletools:
def get_tp(self, name: str) -> bool: def get_tp(self, name: str) -> bool:
""" """
Impulstimer. Pulse timer.
:param name: Eindeutiger Name des Timers :param name: Unique name of the timer
:return: Wert <class 'bool'> des Impulses :return: Value <class 'bool'> of the pulse
""" """
return self.__dict_tp.get(name, [-1])[0] > 0 return self.__dict_tp.get(name, [-1])[0] > 0
def get_tpc(self, name: str) -> bool: def get_tpc(self, name: str) -> bool:
""" """
Impulstimer. Pulse timer.
:param name: Eindeutiger Name des Timers :param name: Unique name of the timer
:return: Wert <class 'bool'> des Impulses :return: Value <class 'bool'> of the pulse
""" """
return self.__dict_tp.get(name, [-1])[0] > 0 return self.__dict_tp.get(name, [-1])[0] > 0
def set_tp(self, name: str, milliseconds: int) -> None: def set_tp(self, name: str, milliseconds: int) -> None:
""" """
Startet einen Impuls Timer. Starts a pulse timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer :param name: Unique name for accessing the timer
:param milliseconds: Millisekunden, die der Impuls anstehen soll :param milliseconds: Milliseconds the pulse should be active
""" """
if self.__dict_tp.get(name, [-1])[0] == -1: if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = [ceil(milliseconds / self.__cycletime), True] self.__dict_tp[name] = [ceil(milliseconds / self.__cycletime), True]
@@ -347,10 +345,10 @@ class Cycletools:
def set_tpc(self, name: str, cycles: int) -> None: def set_tpc(self, name: str, cycles: int) -> None:
""" """
Startet einen Impuls Timer. Starts a pulse timer.
:param name: Eindeutiger Name fuer Zugriff auf Timer :param name: Unique name for accessing the timer
:param cycles: Zyklusanzahl, die der Impuls anstehen soll :param cycles: Number of cycles the pulse should be active
""" """
if self.__dict_tp.get(name, [-1])[0] == -1: if self.__dict_tp.get(name, [-1])[0] == -1:
self.__dict_tp[name] = [cycles, True] self.__dict_tp[name] = [cycles, True]
@@ -371,11 +369,11 @@ class Cycletools:
class ProcimgWriter(Thread): class ProcimgWriter(Thread):
""" """
Klasse fuer Synchroniseriungs-Thread. Class for synchronization thread.
Diese Klasse wird als Thread gestartet, wenn das Prozessabbild zyklisch This class is started as a thread if the process image should be
synchronisiert werden soll. Diese Funktion wird hauptsaechlich fuer das synchronized cyclically. This function is mainly used for event
Event-Handling verwendet. handling.
""" """
__slots__ = ( __slots__ = (
@@ -409,7 +407,7 @@ class ProcimgWriter(Thread):
self.newdata = Event() self.newdata = Event()
def __check_change(self, dev) -> None: def __check_change(self, dev) -> None:
"""Findet Aenderungen fuer die Eventueberwachung.""" """Finds changes for event monitoring."""
for io_event in dev._dict_events: for io_event in dev._dict_events:
if dev._ba_datacp[io_event._slc_address] == dev._ba_devdata[io_event._slc_address]: if dev._ba_datacp[io_event._slc_address] == dev._ba_devdata[io_event._slc_address]:
continue continue
@@ -435,7 +433,7 @@ class ProcimgWriter(Thread):
else: else:
self._eventq.put((regfunc, io_event._name, io_event.value), False) self._eventq.put((regfunc, io_event._name, io_event.value), False)
else: else:
# Verzögertes Event in dict einfügen # Insert delayed event into dict
tup_fire = ( tup_fire = (
regfunc, regfunc,
io_event._name, io_event._name,
@@ -454,7 +452,7 @@ class ProcimgWriter(Thread):
else: else:
self._eventq.put((regfunc, io_event._name, io_event.value), False) self._eventq.put((regfunc, io_event._name, io_event.value), False)
else: else:
# Verzögertes Event in dict einfügen # Insert delayed event into dict
tup_fire = ( tup_fire = (
regfunc, regfunc,
io_event._name, io_event._name,
@@ -464,11 +462,11 @@ class ProcimgWriter(Thread):
if regfunc.overwrite or tup_fire not in self.__dict_delay: if regfunc.overwrite or tup_fire not in self.__dict_delay:
self.__dict_delay[tup_fire] = ceil(regfunc.delay / 1000 / self._refresh) self.__dict_delay[tup_fire] = ceil(regfunc.delay / 1000 / self._refresh)
# Nach Verarbeitung aller IOs die Bytes kopieren (Lock ist noch drauf) # Copy the bytes after processing all IOs (lock is still active)
dev._ba_datacp = dev._ba_devdata[:] dev._ba_datacp = dev._ba_devdata[:]
def __exec_th(self) -> None: def __exec_th(self) -> None:
"""Laeuft als Thread, der Events als Thread startet.""" """Runs as thread that starts events as threads."""
while self.__eventwork: while self.__eventwork:
try: try:
tup_fireth = self._eventqth.get(timeout=1) tup_fireth = self._eventqth.get(timeout=1)
@@ -480,15 +478,15 @@ class ProcimgWriter(Thread):
def _collect_events(self, value: bool) -> bool: def _collect_events(self, value: bool) -> bool:
""" """
Aktiviert oder Deaktiviert die Eventueberwachung. Enables or disables event monitoring.
:param value: True aktiviert / False deaktiviert :param value: True activates / False deactivates
:return: True, wenn Anforderung erfolgreich war :return: True, if request was successful
""" """
if type(value) != bool: if type(value) != bool:
raise TypeError("value must be <class 'bool'>") raise TypeError("value must be <class 'bool'>")
# Nur starten, wenn System läuft # Only start if system is running
if not self.is_alive(): if not self.is_alive():
self.__eventwork = False self.__eventwork = False
return False return False
@@ -497,12 +495,12 @@ class ProcimgWriter(Thread):
with self.lck_refresh: with self.lck_refresh:
self.__eventwork = value self.__eventwork = value
if not value: if not value:
# Nur leeren beim deaktivieren # Only empty when deactivating
self._eventqth = queue.Queue() self._eventqth = queue.Queue()
self._eventq = queue.Queue() self._eventq = queue.Queue()
self.__dict_delay = {} self.__dict_delay = {}
# Threadmanagement # Thread management
if value and not self.__eventth.is_alive(): if value and not self.__eventth.is_alive():
self.__eventth = Thread(target=self.__exec_th) self.__eventth = Thread(target=self.__exec_th)
self.__eventth.daemon = True self.__eventth.daemon = True
@@ -512,14 +510,14 @@ class ProcimgWriter(Thread):
def get_refresh(self) -> int: def get_refresh(self) -> int:
""" """
Gibt Zykluszeit zurueck. Returns cycle time.
:return: <class 'int'> Zykluszeit in Millisekunden :return: <class 'int'> cycle time in milliseconds
""" """
return int(self._refresh * 1000) return int(self._refresh * 1000)
def run(self): def run(self):
"""Startet die automatische Prozessabbildsynchronisierung.""" """Starts automatic process image synchronization."""
fh = self._modio._create_myfh() fh = self._modio._create_myfh()
mrk_delay = self._refresh mrk_delay = self._refresh
@@ -537,7 +535,7 @@ class ProcimgWriter(Thread):
RuntimeWarning, RuntimeWarning,
) )
mrk_delay = self._refresh mrk_delay = self._refresh
# Nur durch cycleloop erreichbar - keine verzögerten Events # Only reachable through cycleloop - no delayed events
continue continue
try: try:
@@ -558,7 +556,7 @@ class ProcimgWriter(Thread):
bytesbuff[dev._slc_devoff] = fh.read(len(dev._ba_devdata)) bytesbuff[dev._slc_devoff] = fh.read(len(dev._ba_devdata))
if self._modio._monitoring or dev._shared_procimg: if self._modio._monitoring or dev._shared_procimg:
# Inputs und Outputs in Puffer # Inputs and outputs in buffer
dev._ba_devdata[:] = bytesbuff[dev._slc_devoff] dev._ba_devdata[:] = bytesbuff[dev._slc_devoff]
if ( if (
self.__eventwork self.__eventwork
@@ -567,7 +565,7 @@ class ProcimgWriter(Thread):
): ):
self.__check_change(dev) self.__check_change(dev)
else: else:
# Inputs in Puffer, Outputs in Prozessabbild # Inputs in buffer, outputs in process image
dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff] dev._ba_devdata[dev._slc_inp] = bytesbuff[dev._slc_inpoff]
if ( if (
self.__eventwork self.__eventwork
@@ -601,12 +599,12 @@ class ProcimgWriter(Thread):
) )
mrk_warn = True mrk_warn = True
# Alle aufwecken # Wake all
self.lck_refresh.release() self.lck_refresh.release()
self.newdata.set() self.newdata.set()
finally: finally:
# Verzögerte Events prüfen # Check delayed events
if self.__eventwork: if self.__eventwork:
for tup_fire in tuple(self.__dict_delay.keys()): for tup_fire in tuple(self.__dict_delay.keys()):
if tup_fire[0].overwrite and tup_fire[3].value != tup_fire[2]: if tup_fire[0].overwrite and tup_fire[3].value != tup_fire[2]:
@@ -614,7 +612,7 @@ class ProcimgWriter(Thread):
else: else:
self.__dict_delay[tup_fire] -= 1 self.__dict_delay[tup_fire] -= 1
if self.__dict_delay[tup_fire] <= 0: if self.__dict_delay[tup_fire] <= 0:
# Verzögertes Event übernehmen und löschen # Put event to queue and delete delayed event
if tup_fire[0].as_thread: if tup_fire[0].as_thread:
self._eventqth.put(tup_fire, False) self._eventqth.put(tup_fire, False)
else: else:
@@ -633,18 +631,18 @@ class ProcimgWriter(Thread):
# Sleep and not .wait (.wait uses system clock) # Sleep and not .wait (.wait uses system clock)
sleep(self._refresh - mrk_delay) sleep(self._refresh - mrk_delay)
# Alle am Ende erneut aufwecken # Wake all again at the end
self._collect_events(False) self._collect_events(False)
self.newdata.set() self.newdata.set()
fh.close() fh.close()
def stop(self): def stop(self):
"""Beendet die automatische Prozessabbildsynchronisierung.""" """Terminates automatic process image synchronization."""
self._work.set() self._work.set()
def set_refresh(self, value): def set_refresh(self, value):
"""Setzt die Zykluszeit in Millisekunden. """Sets the cycle time in milliseconds.
@param value <class 'int'> Millisekunden""" @param value <class 'int'> Milliseconds"""
if type(value) == int and 5 <= value <= 2000: if type(value) == int and 5 <= value <= 2000:
self._refresh = value / 1000 self._refresh = value / 1000
else: else:

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""RevPiModIO Hauptklasse fuer Netzwerkzugriff."""
"""RevPiModIO main class for network access."""
__author__ = "Sven Sager" __author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager" __copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2" __license__ = "LGPLv2"
@@ -17,16 +18,16 @@ from .errors import DeviceNotFoundError
from .modio import DevSelect, RevPiModIO as _RevPiModIO from .modio import DevSelect, RevPiModIO as _RevPiModIO
from .pictory import DeviceType from .pictory import DeviceType
# Synchronisierungsbefehl # Synchronization command
_syssync = b"\x01\x06\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" _syssync = b"\x01\x06\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
# Disconnectbefehl # Disconnectbefehl
_sysexit = b"\x01EX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" _sysexit = b"\x01EX\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
# DirtyBytes von Server entfernen # Remove DirtyBytes from server
_sysdeldirty = b"\x01EY\x00\x00\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x17" _sysdeldirty = b"\x01EY\x00\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x17"
# piCtory Konfiguration laden # Load piCtory configuration
_syspictory = b"\x01PI\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" _syspictory = b"\x01PI\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
_syspictoryh = b"\x01PH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" _syspictoryh = b"\x01PH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
# ReplaceIO Konfiguration laden # Load ReplaceIO configuration
_sysreplaceio = b"\x01RP\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" _sysreplaceio = b"\x01RP\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
_sysreplaceioh = b"\x01RH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17" _sysreplaceioh = b"\x01RH\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17"
# Hashvalues # Hashvalues
@@ -37,24 +38,23 @@ HEADER_STOP = b"\x17"
class AclException(Exception): class AclException(Exception):
"""Probleme mit Berechtigungen.""" """Problems with permissions."""
pass pass
class ConfigChanged(Exception): class ConfigChanged(Exception):
"""Aenderung der piCtory oder replace_ios Datei.""" """Change to the piCtory or replace_ios file."""
pass pass
class NetFH(Thread): class NetFH(Thread):
""" """
Netzwerk File Handler fuer das Prozessabbild. Network file handler for the process image.
Dieses FileObject-like Object verwaltet das Lesen und Schriben des This file-object-like object manages reading and writing of the
Prozessabbilds ueber das Netzwerk. Ein entfernter Revolution Pi kann process image via the network. A remote Revolution Pi can be controlled this way.
so gesteuert werden.
""" """
__slots__ = ( __slots__ = (
@@ -84,9 +84,9 @@ class NetFH(Thread):
""" """
Init NetFH-class. Init NetFH-class.
:param address: IP Adresse, Port des RevPi als <class 'tuple'> :param address: IP address, port of the RevPi as <class 'tuple'>
:param check_replace_ios: Prueft auf Veraenderungen der Datei :param check_replace_ios: Checks for changes to the file
:param timeout: Timeout in Millisekunden der Verbindung :param timeout: Timeout in milliseconds for the connection
""" """
super().__init__() super().__init__()
self.daemon = True self.daemon = True
@@ -109,38 +109,38 @@ class NetFH(Thread):
self._address = address self._address = address
self._serversock = None # type: socket.socket self._serversock = None # type: socket.socket
# Parameterprüfung # Parameter validation
if not isinstance(address, tuple): if not isinstance(address, tuple):
raise TypeError("parameter address must be <class 'tuple'> ('IP', PORT)") raise TypeError("parameter address must be <class 'tuple'> ('IP', PORT)")
if not isinstance(timeout, int): if not isinstance(timeout, int):
raise TypeError("parameter timeout must be <class 'int'>") raise TypeError("parameter timeout must be <class 'int'>")
# Verbindung herstellen # Establish connection
self.__set_systimeout(timeout) self.__set_systimeout(timeout)
self._connect() self._connect()
if self._serversock is None: if self._serversock is None:
raise FileNotFoundError("can not connect to revpi server") raise FileNotFoundError("can not connect to revpi server")
# NetFH konfigurieren # Configure NetFH
self.__position = 0 self.__position = 0
self.start() self.start()
def __del__(self): def __del__(self):
"""NetworkFileHandler beenden.""" """Terminate NetworkFileHandler."""
self.close() self.close()
def __check_acl(self, bytecode: bytes) -> None: def __check_acl(self, bytecode: bytes) -> None:
""" """
Pueft ob ACL auf RevPi den Vorgang erlaubt. Checks if ACL allows the operation on RevPi.
Ist der Vorgang nicht zulässig, wird der Socket sofort geschlossen If the operation is not permitted, the socket is immediately closed
und eine Exception geworfen. and an exception is thrown.
:param bytecode: Antwort, die geprueft werden solll :param bytecode: Response to be checked
""" """
if bytecode == b"\x18": if bytecode == b"\x18":
# Alles beenden, wenn nicht erlaubt # Terminate everything if not permitted
self.__sockend.set() self.__sockend.set()
self.__sockerr.set() self.__sockerr.set()
self._serversock.close() self._serversock.close()
@@ -152,39 +152,39 @@ class NetFH(Thread):
def __set_systimeout(self, value: int) -> None: def __set_systimeout(self, value: int) -> None:
""" """
Systemfunktion fuer Timeoutberechnung. System function for timeout calculation.
:param value: Timeout in Millisekunden 100 - 60000 :param value: Timeout in milliseconds 100 - 60000
""" """
if isinstance(value, int) and (100 <= value <= 60000): if isinstance(value, int) and (100 <= value <= 60000):
self.__timeout = value / 1000 self.__timeout = value / 1000
# Timeouts in Socket setzen # Set timeouts in socket
if self._serversock is not None: if self._serversock is not None:
self._serversock.settimeout(self.__timeout) self._serversock.settimeout(self.__timeout)
# 45 Prozent vom Timeout für Synctimer verwenden # Use 45 percent of timeout for sync timer
self.__waitsync = self.__timeout / 100 * 45 self.__waitsync = self.__timeout / 100 * 45
else: else:
raise ValueError("value must between 10 and 60000 milliseconds") raise ValueError("value must between 10 and 60000 milliseconds")
def _connect(self) -> None: def _connect(self) -> None:
"""Stellt die Verbindung zu einem RevPiPlcServer her.""" """Establishes the connection to a RevPiPlcServer."""
# Neuen Socket aufbauen # Build new socket
so = socket.socket(socket.AF_INET, socket.SOCK_STREAM) so = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: try:
so.connect(self._address) so.connect(self._address)
so.settimeout(self.__timeout) so.settimeout(self.__timeout)
# Hashwerte anfordern # Request hash values
recv_len = 16 recv_len = 16
so.sendall(_syspictoryh) so.sendall(_syspictoryh)
if self.__check_replace_ios: if self.__check_replace_ios:
so.sendall(_sysreplaceioh) so.sendall(_sysreplaceioh)
recv_len += 16 recv_len += 16
# Hashwerte empfangen mit eigenen Puffern, da nicht gelocked # Receive hash values with own buffers, as not locked
buff_recv = bytearray(recv_len) buff_recv = bytearray(recv_len)
while recv_len > 0: while recv_len > 0:
block = so.recv(recv_len) block = so.recv(recv_len)
@@ -193,7 +193,7 @@ class NetFH(Thread):
buff_recv += block buff_recv += block
recv_len -= len(block) recv_len -= len(block)
# Änderung an piCtory prüfen # Check for changes to piCtory
if self.__pictory_h and buff_recv[:16] != self.__pictory_h: if self.__pictory_h and buff_recv[:16] != self.__pictory_h:
self.__config_changed = True self.__config_changed = True
self.close() self.close()
@@ -201,7 +201,7 @@ class NetFH(Thread):
else: else:
self.__pictory_h = buff_recv[:16] self.__pictory_h = buff_recv[:16]
# Änderung an replace_ios prüfen # Check for changes to replace_ios
if ( if (
self.__check_replace_ios self.__check_replace_ios
and self.__replace_ios_h and self.__replace_ios_h
@@ -218,7 +218,7 @@ class NetFH(Thread):
except Exception: except Exception:
so.close() so.close()
else: else:
# Alten Socket trennen # Disconnect old socket
with self.__socklock: with self.__socklock:
if self._serversock is not None: if self._serversock is not None:
self._serversock.close() self._serversock.close()
@@ -226,10 +226,10 @@ class NetFH(Thread):
self._serversock = so self._serversock = so
self.__sockerr.clear() self.__sockerr.clear()
# Timeout setzen # Set timeout
self.set_timeout(int(self.__timeout * 1000)) self.set_timeout(int(self.__timeout * 1000))
# DirtyBytes übertragen # Transfer dirty bytes
for pos in self.__dictdirty: for pos in self.__dictdirty:
self.set_dirtybytes(pos, self.__dictdirty[pos]) self.set_dirtybytes(pos, self.__dictdirty[pos])
@@ -285,19 +285,19 @@ class NetFH(Thread):
def clear_dirtybytes(self, position=None) -> None: def clear_dirtybytes(self, position=None) -> None:
""" """
Entfernt die konfigurierten Dirtybytes vom RevPi Server. Removes the configured dirty bytes from the RevPi server.
Diese Funktion wirft keine Exception bei einem uebertragungsfehler, This function does not throw an exception on transmission error,
veranlasst aber eine Neuverbindung. but triggers a reconnection.
:param position: Startposition der Dirtybytes :param position: Start position of the dirty bytes
""" """
if self.__config_changed: if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed") raise ConfigChanged("configuration on revolution pi was changed")
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("I/O operation on closed file") raise ValueError("I/O operation on closed file")
# Daten immer übernehmen # Always accept data
if position is None: if position is None:
self.__dictdirty.clear() self.__dictdirty.clear()
elif position in self.__dictdirty: elif position in self.__dictdirty:
@@ -305,10 +305,10 @@ class NetFH(Thread):
try: try:
if position is None: if position is None:
# Alle Dirtybytes löschen # Clear all dirty bytes
buff = self._direct_sr(_sysdeldirty, 1) buff = self._direct_sr(_sysdeldirty, 1)
else: else:
# Nur bestimmte Dirtybytes löschen # Only clear specific dirty bytes
# b CM ii xx c0000000 b = 16 # b CM ii xx c0000000 b = 16
buff = self._direct_sr( buff = self._direct_sr(
pack( pack(
@@ -322,7 +322,7 @@ class NetFH(Thread):
1, 1,
) )
if buff != b"\x1e": if buff != b"\x1e":
# ACL prüfen und ggf Fehler werfen # Check ACL and throw error if necessary
self.__check_acl(buff) self.__check_acl(buff)
raise IOError("clear dirtybytes error on network") raise IOError("clear dirtybytes error on network")
@@ -333,14 +333,14 @@ class NetFH(Thread):
self.__sockerr.set() self.__sockerr.set()
def close(self) -> None: def close(self) -> None:
"""Verbindung trennen.""" """Disconnect connection."""
if self.__sockend.is_set(): if self.__sockend.is_set():
return return
self.__sockend.set() self.__sockend.set()
self.__sockerr.set() self.__sockerr.set()
# Vom Socket sauber trennen # Cleanly disconnect from socket
if self._serversock is not None: if self._serversock is not None:
try: try:
self.__socklock.acquire() self.__socklock.acquire()
@@ -354,7 +354,7 @@ class NetFH(Thread):
self._serversock.close() self._serversock.close()
def flush(self) -> None: def flush(self) -> None:
"""Schreibpuffer senden.""" """Send write buffer."""
if self.__config_changed: if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed") raise ConfigChanged("configuration on revolution pi was changed")
if self.__sockend.is_set(): if self.__sockend.is_set():
@@ -380,12 +380,12 @@ class NetFH(Thread):
except Exception: except Exception:
raise raise
finally: finally:
# Puffer immer leeren # Always clear buffer
self.__int_buff = 0 self.__int_buff = 0
self.__by_buff.clear() self.__by_buff.clear()
if buff != b"\x1e": if buff != b"\x1e":
# ACL prüfen und ggf Fehler werfen # Check ACL and throw error if necessary
self.__check_acl(buff) self.__check_acl(buff)
self.__sockerr.set() self.__sockerr.set()
@@ -393,23 +393,23 @@ class NetFH(Thread):
def get_closed(self) -> bool: def get_closed(self) -> bool:
""" """
Pruefen ob Verbindung geschlossen ist. Check if connection is closed.
:return: True, wenn Verbindung geschlossen ist :return: True if connection is closed
""" """
return self.__sockend.is_set() return self.__sockend.is_set()
def get_config_changed(self) -> bool: def get_config_changed(self) -> bool:
""" """
Pruefen ob RevPi Konfiguration geaendert wurde. Check if RevPi configuration was changed.
:return: True, wenn RevPi Konfiguration geaendert ist :return: True if RevPi configuration was changed
""" """
return self.__config_changed return self.__config_changed
def get_name(self) -> str: def get_name(self) -> str:
""" """
Verbindugnsnamen zurueckgeben. Return connection name.
:return: <class 'str'> IP:PORT :return: <class 'str'> IP:PORT
""" """
@@ -417,23 +417,23 @@ class NetFH(Thread):
def get_reconnecting(self) -> bool: def get_reconnecting(self) -> bool:
""" """
Interner reconnect aktiv wegen Netzwerkfehlern. Internal reconnect active due to network errors.
:return: True, wenn reconnect aktiv :return: True if reconnect is active
""" """
return self.__sockerr.is_set() return self.__sockerr.is_set()
def get_timeout(self) -> int: def get_timeout(self) -> int:
""" """
Gibt aktuellen Timeout zurueck. Returns current timeout.
:return: <class 'int'> in Millisekunden :return: <class 'int'> in milliseconds
""" """
return int(self.__timeout * 1000) return int(self.__timeout * 1000)
def ioctl(self, request: int, arg=b"") -> None: def ioctl(self, request: int, arg=b"") -> None:
""" """
IOCTL Befehle ueber das Netzwerk senden. Send IOCTL commands via the network.
:param request: Request as <class 'int'> :param request: Request as <class 'int'>
:param arg: Argument as <class 'byte'> :param arg: Argument as <class 'byte'>
@@ -451,7 +451,7 @@ class NetFH(Thread):
pack("=c2s2xHI4xc", HEADER_START, b"IC", len(arg), request, HEADER_STOP) + arg, 1 pack("=c2s2xHI4xc", HEADER_START, b"IC", len(arg), request, HEADER_STOP) + arg, 1
) )
if buff != b"\x1e": if buff != b"\x1e":
# ACL prüfen und ggf Fehler werfen # Check ACL and throw error if necessary
self.__check_acl(buff) self.__check_acl(buff)
self.__sockerr.set() self.__sockerr.set()
@@ -459,10 +459,10 @@ class NetFH(Thread):
def read(self, length: int) -> bytes: def read(self, length: int) -> bytes:
""" """
Daten ueber das Netzwerk lesen. Read data via the network.
:param length: Anzahl der Bytes :param length: Number of bytes
:return: Gelesene <class 'bytes'> :return: Read <class 'bytes'>
""" """
if self.__config_changed: if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed") raise ConfigChanged("configuration on revolution pi was changed")
@@ -501,9 +501,9 @@ class NetFH(Thread):
def readpictory(self) -> bytes: def readpictory(self) -> bytes:
""" """
Ruft die piCtory Konfiguration ab. Retrieves the piCtory configuration.
:return: <class 'bytes'> piCtory Datei :return: <class 'bytes'> piCtory file
""" """
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("read of closed file") raise ValueError("read of closed file")
@@ -517,7 +517,7 @@ class NetFH(Thread):
def readreplaceio(self) -> bytes: def readreplaceio(self) -> bytes:
""" """
Ruft die replace_io Konfiguration ab. Retrieves the replace_io configuration.
:return: <class 'bytes'> replace_io_file :return: <class 'bytes'> replace_io_file
""" """
@@ -532,24 +532,24 @@ class NetFH(Thread):
return self._direct_sr(b"", recv_length) return self._direct_sr(b"", recv_length)
def run(self) -> None: def run(self) -> None:
"""Handler fuer Synchronisierung.""" """Handler for synchronization."""
state_reconnect = False state_reconnect = False
while not self.__sockend.is_set(): while not self.__sockend.is_set():
# Bei Fehlermeldung neu verbinden # Reconnect on error message
if self.__sockerr.is_set(): if self.__sockerr.is_set():
if not state_reconnect: if not state_reconnect:
state_reconnect = True state_reconnect = True
warnings.warn("got a network error and try to reconnect", RuntimeWarning) warnings.warn("got a network error and try to reconnect", RuntimeWarning)
self._connect() self._connect()
if self.__sockerr.is_set(): if self.__sockerr.is_set():
# Verhindert beim Scheitern 100% CPU last # Prevents 100% CPU load on failure
self.__sockend.wait(self.__waitsync) self.__sockend.wait(self.__waitsync)
continue continue
else: else:
state_reconnect = False state_reconnect = False
warnings.warn("successfully reconnected after network error", RuntimeWarning) warnings.warn("successfully reconnected after network error", RuntimeWarning)
# Kein Fehler aufgetreten, sync durchführen wenn socket frei # No error occurred, perform sync if socket is free
if self.__socklock.acquire(blocking=False): if self.__socklock.acquire(blocking=False):
try: try:
self._serversock.sendall(_syssync) self._serversock.sendall(_syssync)
@@ -573,12 +573,12 @@ class NetFH(Thread):
finally: finally:
self.__socklock.release() self.__socklock.release()
# Warten nach Sync damit Instantiierung funktioniert # Wait after sync so instantiation works
self.__sockerr.wait(self.__waitsync) self.__sockerr.wait(self.__waitsync)
def seek(self, position: int) -> None: def seek(self, position: int) -> None:
"""Springt an angegebene Position. """Jump to specified position.
@param position An diese Position springen""" @param position Jump to this position"""
if self.__config_changed: if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed") raise ConfigChanged("configuration on revolution pi was changed")
if self.__sockend.is_set(): if self.__sockend.is_set():
@@ -587,20 +587,20 @@ class NetFH(Thread):
def set_dirtybytes(self, position: int, dirtybytes: bytes) -> None: def set_dirtybytes(self, position: int, dirtybytes: bytes) -> None:
""" """
Konfiguriert Dirtybytes fuer Prozessabbild bei Verbindungsfehler. Configures dirty bytes for process image on connection error.
Diese Funktion wirft keine Exception bei einem uebertragungsfehler, This function does not throw an exception on transmission error,
veranlasst aber eine Neuverbindung. but triggers a reconnection.
:param position: Startposition zum Schreiben :param position: Start position for writing
:param dirtybytes: <class 'bytes'> die geschrieben werden sollen :param dirtybytes: <class 'bytes'> to be written
""" """
if self.__config_changed: if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed") raise ConfigChanged("configuration on revolution pi was changed")
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("I/O operation on closed file") raise ValueError("I/O operation on closed file")
# Daten immer übernehmen # Always accept data
self.__dictdirty[position] = dirtybytes self.__dictdirty[position] = dirtybytes
try: try:
@@ -612,7 +612,7 @@ class NetFH(Thread):
) )
if buff != b"\x1e": if buff != b"\x1e":
# ACL prüfen und ggf Fehler werfen # Check ACL and throw error if necessary
self.__check_acl(buff) self.__check_acl(buff)
raise IOError("set dirtybytes error on network") raise IOError("set dirtybytes error on network")
@@ -625,14 +625,14 @@ class NetFH(Thread):
def set_timeout(self, value: int) -> None: def set_timeout(self, value: int) -> None:
""" """
Setzt Timeoutwert fuer Verbindung. Sets timeout value for connection.
:param value: Timeout in Millisekunden :param value: Timeout in milliseconds
""" """
if self.__sockend.is_set(): if self.__sockend.is_set():
raise ValueError("I/O operation on closed file") raise ValueError("I/O operation on closed file")
# Timeoutwert verarbeiten (könnte Exception auslösen) # Process timeout value (could throw exception)
self.__set_systimeout(value) self.__set_systimeout(value)
try: try:
@@ -645,9 +645,9 @@ class NetFH(Thread):
def tell(self) -> int: def tell(self) -> int:
""" """
Gibt aktuelle Position zurueck. Returns actual position in file.
:return: Aktuelle Position :return: Actual position in file
""" """
if self.__config_changed: if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed") raise ConfigChanged("configuration on revolution pi was changed")
@@ -657,10 +657,10 @@ class NetFH(Thread):
def write(self, bytebuff: bytes) -> int: def write(self, bytebuff: bytes) -> int:
""" """
Daten ueber das Netzwerk schreiben. Write data via the network.
:param bytebuff: Bytes zum schreiben :param bytebuff: Bytes to write
:return: <class 'int'> Anzahl geschriebener bytes :return: <class 'int'> Number of written bytes
""" """
if self.__config_changed: if self.__config_changed:
raise ConfigChanged("configuration on revolution pi was changed") raise ConfigChanged("configuration on revolution pi was changed")
@@ -672,14 +672,14 @@ class NetFH(Thread):
with self.__socklock: with self.__socklock:
self.__int_buff += 1 self.__int_buff += 1
# Datenblock mit Position und Länge in Puffer ablegen # Store data block with position and length in buffer
self.__by_buff += ( self.__by_buff += (
self.__position.to_bytes(length=2, byteorder="little") self.__position.to_bytes(length=2, byteorder="little")
+ len(bytebuff).to_bytes(length=2, byteorder="little") + len(bytebuff).to_bytes(length=2, byteorder="little")
+ bytebuff + bytebuff
) )
# TODO: Bufferlänge und dann flushen? # TODO: Buffer length and then flush?
return len(bytebuff) return len(bytebuff)
@@ -692,14 +692,13 @@ class NetFH(Thread):
class RevPiNetIO(_RevPiModIO): class RevPiNetIO(_RevPiModIO):
""" """
Klasse fuer die Verwaltung der piCtory Konfiguration ueber das Netzwerk. Class for managing the piCtory configuration via the network.
Diese Klasse uebernimmt die gesamte Konfiguration aus piCtory und bilded This class takes over the entire configuration from piCtory and maps
die Devices und IOs ab. Sie uebernimmt die exklusive Verwaltung des the devices and IOs. It takes over exclusive management of the
Prozessabbilds und stellt sicher, dass die Daten synchron sind. process image and ensures that the data is synchronized.
Sollten nur einzelne Devices gesteuert werden, verwendet man If only individual devices should be controlled, use
RevPiModIOSelected() und uebergibt bei Instantiierung eine Liste mit RevPiNetIOSelected() and pass a list with device positions or device names during instantiation.
Device Positionen oder Device Namen.
""" """
__slots__ = "_address" __slots__ = "_address"
@@ -716,26 +715,26 @@ class RevPiNetIO(_RevPiModIO):
shared_procimg=False, shared_procimg=False,
): ):
""" """
Instantiiert die Grundfunktionen. Instantiates the basic functions.
:param address: IP-Adresse <class 'str'> / (IP, Port) <class 'tuple'> :param address: IP address <class 'str'> / (IP, Port) <class 'tuple'>
:param autorefresh: Wenn True, alle Devices zu autorefresh hinzufuegen :param autorefresh: If True, add all devices to autorefresh
:param monitoring: In- und Outputs werden gelesen, niemals geschrieben :param monitoring: Inputs and outputs are read, never written
:param syncoutputs: Aktuell gesetzte Outputs vom Prozessabbild einlesen :param syncoutputs: Read currently set outputs from process image
:param simulator: Laedt das Modul als Simulator und vertauscht IOs :param simulator: Loads the module as simulator and swaps IOs
:param debug: Gibt bei allen Fehlern komplette Meldungen aus :param debug: Output complete messages for all errors
:param replace_io_file: Replace IO Konfiguration aus Datei laden :param replace_io_file: Load replace IO configuration from file
:param shared_procimg: Share process image with other processes, this :param shared_procimg: Share process image with other processes, this
could be insecure for automation could be insecure for automation
""" """
check_ip = compile(r"^(25[0-5]|(2[0-4]|[01]?\d|)\d)(\.(25[0-5]|(2[0-4]|[01]?\d|)\d)){3}$") check_ip = compile(r"^(25[0-5]|(2[0-4]|[01]?\d|)\d)(\.(25[0-5]|(2[0-4]|[01]?\d|)\d)){3}$")
# Adresse verarbeiten # Process address
if isinstance(address, str): if isinstance(address, str):
self._address = (address, 55234) self._address = (address, 55234)
elif isinstance(address, tuple): elif isinstance(address, tuple):
if len(address) == 2 and isinstance(address[0], str) and isinstance(address[1], int): if len(address) == 2 and isinstance(address[0], str) and isinstance(address[1], int):
# Werte prüfen # Check values
if not 0 < address[1] <= 65535: if not 0 < address[1] <= 65535:
raise ValueError("port number out of range 1 - 65535") raise ValueError("port number out of range 1 - 65535")
@@ -748,7 +747,7 @@ class RevPiNetIO(_RevPiModIO):
"like (<class 'str'>, <class 'int'>)" "like (<class 'str'>, <class 'int'>)"
) )
# IP-Adresse prüfen und ggf. auflösen # Check IP address and resolve if necessary
if check_ip.match(self._address[0]) is None: if check_ip.match(self._address[0]) is None:
try: try:
ipv4 = socket.gethostbyname(self._address[0]) ipv4 = socket.gethostbyname(self._address[0])
@@ -772,16 +771,16 @@ class RevPiNetIO(_RevPiModIO):
) )
self._set_device_based_cycle_time = False self._set_device_based_cycle_time = False
# Netzwerkfilehandler anlegen # Create network file handler
self._myfh = self._create_myfh() self._myfh = self._create_myfh()
# Nur Konfigurieren, wenn nicht vererbt # Only configure if not inherited
if type(self) == RevPiNetIO: if type(self) == RevPiNetIO:
self._configure(self.get_jconfigrsc()) self._configure(self.get_jconfigrsc())
def _create_myfh(self): def _create_myfh(self):
""" """
Erstellt NetworkFileObject. Creates NetworkFileObject.
:return: FileObject :return: FileObject
""" """
@@ -790,15 +789,15 @@ class RevPiNetIO(_RevPiModIO):
def _get_cpreplaceio(self) -> ConfigParser: def _get_cpreplaceio(self) -> ConfigParser:
""" """
Laed die replace_io Konfiguration ueber das Netzwerk. Loads the replace_io configuration via the network.
:return: <class 'ConfigParser'> der replace io daten :return: <class 'ConfigParser'> of the replace io data
""" """
# Normale Verwendung über Elternklasse erledigen # Handle normal usage via parent class
if self._replace_io_file != ":network:": if self._replace_io_file != ":network:":
return super()._get_cpreplaceio() return super()._get_cpreplaceio()
# Replace IO Daten über das Netzwerk beziehen # Obtain replace IO data via the network
byte_buff = self._myfh.readreplaceio() byte_buff = self._myfh.readreplaceio()
cp = ConfigParser() cp = ConfigParser()
@@ -809,12 +808,12 @@ class RevPiNetIO(_RevPiModIO):
return cp return cp
def disconnect(self) -> None: def disconnect(self) -> None:
"""Trennt Verbindungen und beendet autorefresh inkl. alle Threads.""" """Disconnects connections and terminates autorefresh including all threads."""
self.cleanup() self.cleanup()
def exit(self, full=True) -> None: def exit(self, full=True) -> None:
""" """
Beendet mainloop() und optional autorefresh. Terminates mainloop() and optionally autorefresh.
:ref: :func:`RevPiModIO.exit()` :ref: :func:`RevPiModIO.exit()`
""" """
@@ -825,20 +824,20 @@ class RevPiNetIO(_RevPiModIO):
def get_config_changed(self) -> bool: def get_config_changed(self) -> bool:
""" """
Pruefen ob RevPi Konfiguration geaendert wurde. Check if RevPi configuration was changed.
In diesem Fall ist die Verbindung geschlossen und RevPiNetIO muss In this case, the connection is closed and RevPiNetIO must be
neu instanziert werden. reinstantiated.
:return: True, wenn RevPi Konfiguration geaendert ist :return: True if RevPi configuration was changed
""" """
return self._myfh.config_changed return self._myfh.config_changed
def get_jconfigrsc(self) -> dict: def get_jconfigrsc(self) -> dict:
""" """
Laedt die piCotry Konfiguration und erstellt ein <class 'dict'>. Loads the piCtory configuration and creates a <class 'dict'>.
:return: <class 'dict'> der piCtory Konfiguration :return: <class 'dict'> of the piCtory configuration
""" """
mynh = NetFH(self._address, False) mynh = NetFH(self._address, False)
byte_buff = mynh.readpictory() byte_buff = mynh.readpictory()
@@ -847,20 +846,20 @@ class RevPiNetIO(_RevPiModIO):
def get_reconnecting(self) -> bool: def get_reconnecting(self) -> bool:
""" """
Interner reconnect aktiv wegen Netzwerkfehlern. Internal reconnect active due to network errors.
Das Modul versucht intern die Verbindung neu herzustellen. Es ist The module tries internally to reestablish the connection. No
kein weiteres Zutun noetig. further action is needed.
:return: True, wenn reconnect aktiv :return: True if reconnect is active
""" """
return self._myfh.reconnecting return self._myfh.reconnecting
def net_cleardefaultvalues(self, device=None) -> None: def net_cleardefaultvalues(self, device=None) -> None:
""" """
Loescht Defaultwerte vom PLC Server. Clears default values from the PLC server.
:param device: nur auf einzelnes Device anwenden, sonst auf Alle :param device: Only apply to single device, otherwise to all
""" """
if self.monitoring: if self.monitoring:
raise RuntimeError("can not send default values, while system is in monitoring mode") raise RuntimeError("can not send default values, while system is in monitoring mode")
@@ -876,12 +875,12 @@ class RevPiNetIO(_RevPiModIO):
def net_setdefaultvalues(self, device=None) -> None: def net_setdefaultvalues(self, device=None) -> None:
""" """
Konfiguriert den PLC Server mit den piCtory Defaultwerten. Configures the PLC server with the piCtory default values.
Diese Werte werden auf dem RevPi gesetzt, wenn die Verbindung These values are set on the RevPi if the connection is
unerwartet (Netzwerkfehler) unterbrochen wird. unexpectedly interrupted (network error).
:param device: nur auf einzelnes Device anwenden, sonst auf Alle :param device: Only apply to single device, otherwise to all
""" """
if self.monitoring: if self.monitoring:
raise RuntimeError("can not send default values, while system is in monitoring mode") raise RuntimeError("can not send default values, while system is in monitoring mode")
@@ -898,25 +897,25 @@ class RevPiNetIO(_RevPiModIO):
listlen = len(lst_io) listlen = len(lst_io)
if listlen == 1: if listlen == 1:
# Byteorientierte Outputs direkt übernehmen # Take byte-oriented outputs directly
dirtybytes += lst_io[0]._defaultvalue dirtybytes += lst_io[0]._defaultvalue
elif listlen > 1: elif listlen > 1:
# Bitorientierte Outputs in ein Byte zusammenfassen # Combine bit-oriented outputs into one byte
int_byte = 0 int_byte = 0
lstbyte = lst_io.copy() lstbyte = lst_io.copy()
lstbyte.reverse() lstbyte.reverse()
for bitio in lstbyte: for bitio in lstbyte:
# Von hinten die bits nach vorne schieben # Shift the bits from back to front
int_byte <<= 1 int_byte <<= 1
if bitio is not None: if bitio is not None:
int_byte += 1 if bitio._defaultvalue else 0 int_byte += 1 if bitio._defaultvalue else 0
# Errechneten Int-Wert in ein Byte umwandeln # Convert calculated int value to a byte
dirtybytes += int_byte.to_bytes(length=1, byteorder="little") dirtybytes += int_byte.to_bytes(length=1, byteorder="little")
# Dirtybytes an PLC Server senden # Send dirtybytes to PLC server
self._myfh.set_dirtybytes(dev._offset + dev._slc_out.start, dirtybytes) self._myfh.set_dirtybytes(dev._offset + dev._slc_out.start, dirtybytes)
config_changed = property(get_config_changed) config_changed = property(get_config_changed)
@@ -925,12 +924,12 @@ class RevPiNetIO(_RevPiModIO):
class RevPiNetIOSelected(RevPiNetIO): class RevPiNetIOSelected(RevPiNetIO):
""" """
Klasse fuer die Verwaltung einzelner Devices aus piCtory. Class for managing individual devices from piCtory.
Diese Klasse uebernimmt nur angegebene Devices der piCtory Konfiguration This class only takes over specified devices from the piCtory configuration
und bildet sie inkl. IOs ab. Sie uebernimmt die exklusive Verwaltung des and maps them including IOs. It takes over exclusive management of the
Adressbereichs im Prozessabbild an dem sich die angegebenen Devices address range in the process image where the specified devices are located
befinden und stellt sicher, dass die Daten synchron sind. and ensures that the data is synchronized.
""" """
__slots__ = () __slots__ = ()
@@ -948,14 +947,14 @@ class RevPiNetIOSelected(RevPiNetIO):
shared_procimg=False, shared_procimg=False,
): ):
""" """
Instantiiert nur fuer angegebene Devices die Grundfunktionen. Instantiates the basic functions only for specified devices.
Der Parameter deviceselection kann eine einzelne The parameter deviceselection can be a single
Device Position / einzelner Device Name sein oder eine Liste mit device position / single device name or a list with
mehreren Positionen / Namen multiple positions / names
:param address: IP-Adresse <class 'str'> / (IP, Port) <class 'tuple'> :param address: IP address <class 'str'> / (IP, Port) <class 'tuple'>
:param deviceselection: Positionsnummer oder Devicename :param deviceselection: Position number or device name
:ref: :func:`RevPiNetIO.__init__()` :ref: :func:`RevPiNetIO.__init__()`
""" """
super().__init__( super().__init__(
@@ -1002,12 +1001,11 @@ class RevPiNetIOSelected(RevPiNetIO):
class RevPiNetIODriver(RevPiNetIOSelected): class RevPiNetIODriver(RevPiNetIOSelected):
""" """
Klasse um eigene Treiber fuer die virtuellen Devices zu erstellen. Class to create custom drivers for virtual devices.
Mit dieser Klasse werden nur angegebene Virtuelle Devices mit RevPiModIO With this class, only specified virtual devices are managed with RevPiModIO.
verwaltet. Bei Instantiierung werden automatisch die Inputs und Outputs During instantiation, inputs and outputs are automatically swapped to allow
verdreht, um das Schreiben der Inputs zu ermoeglichen. Die Daten koennen writing of inputs. The data can then be retrieved from the devices via logiCAD.
dann ueber logiCAD an den Devices abgerufen werden.
""" """
__slots__ = () __slots__ = ()
@@ -1023,16 +1021,16 @@ class RevPiNetIODriver(RevPiNetIOSelected):
shared_procimg=False, shared_procimg=False,
): ):
""" """
Instantiiert die Grundfunktionen. Instantiates the basic functions.
Parameter 'monitoring' und 'simulator' stehen hier nicht zur Parameters 'monitoring' and 'simulator' are not available here,
Verfuegung, da diese automatisch gesetzt werden. as these are set automatically.
:param address: IP-Adresse <class 'str'> / (IP, Port) <class 'tuple'> :param address: IP address <class 'str'> / (IP, Port) <class 'tuple'>
:param virtdev: Virtuelles Device oder mehrere als <class 'list'> :param virtdev: Virtual device or multiple as <class 'list'>
:ref: :func:`RevPiModIO.__init__()` :ref: :func:`RevPiModIO.__init__()`
""" """
# Parent mit monitoring=False und simulator=True laden # Load parent with monitoring=False and simulator=True
if type(virtdev) not in (list, tuple): if type(virtdev) not in (list, tuple):
virtdev = (virtdev,) virtdev = (virtdev,)
dev_select = DevSelect(DeviceType.VIRTUAL, "", virtdev) dev_select = DevSelect(DeviceType.VIRTUAL, "", virtdev)
@@ -1062,7 +1060,7 @@ def run_net_plc(address, func, cycletime=50, replace_io_file=None, debug=True):
rpi.handlesignalend() rpi.handlesignalend()
return rpi.cycleloop(func, cycletime) return rpi.cycleloop(func, cycletime)
:param address: IP-Adresse <class 'str'> / (IP, Port) <class 'tuple'> :param address: IP address <class 'str'> / (IP, Port) <class 'tuple'>
:param func: Function to run every set milliseconds :param func: Function to run every set milliseconds
:param cycletime: Cycle time in milliseconds :param cycletime: Cycle time in milliseconds
:param replace_io_file: Load replace IO configuration from file :param replace_io_file: Load replace IO configuration from file

View File

@@ -15,6 +15,8 @@ __license__ = "LGPLv2"
class ProductType: class ProductType:
"""Product type constants for Revolution Pi devices and modules."""
CON_BT = 111 CON_BT = 111
CON_CAN = 109 CON_CAN = 109
CON_MBUS = 110 CON_MBUS = 110

View File

@@ -1,20 +1,20 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Bildet die Summary-Sektion von piCtory ab.""" """Maps the Summary section from piCtory."""
__author__ = "Sven Sager" __author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager" __copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "LGPLv2" __license__ = "LGPLv2"
class Summary: class Summary:
"""Bildet die Summary-Sektion der config.rsc ab.""" """Maps the Summary section of config.rsc."""
__slots__ = "inptotal", "outtotal" __slots__ = "inptotal", "outtotal"
def __init__(self, summary: dict): def __init__(self, summary: dict):
""" """
Instantiiert die RevPiSummary-Klasse. Instantiates the RevPiSummary class.
:param summary: piCtory Summaryinformationen :param summary: piCtory summary information
""" """
self.inptotal = summary.get("inpTotal", -1) self.inptotal = summary.get("inpTotal", -1)
self.outtotal = summary.get("outTotal", -1) self.outtotal = summary.get("outTotal", -1)

View File

@@ -96,7 +96,7 @@ class TestDevicesModule(TestRevPiModIO):
self.assertEqual(33 in rpi.device.virt01, False) self.assertEqual(33 in rpi.device.virt01, False)
self.assertEqual(552 in rpi.device.virt01, True) self.assertEqual(552 in rpi.device.virt01, True)
# Löschen # Delete
del rpi.device.virt01 del rpi.device.virt01
with self.assertRaises(AttributeError): with self.assertRaises(AttributeError):
rpi.device.virt01 rpi.device.virt01

View File

@@ -93,10 +93,10 @@ class TestInitModio(TestRevPiModIO):
self.assertEqual(2, len(rpi.device)) self.assertEqual(2, len(rpi.device))
del rpi del rpi
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError): with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
# Liste mit einem ungültigen Device als <class 'list'> # List with an invalid device as <class 'list'>
rpi = revpimodio2.RevPiModIOSelected([32, 10], **defaultkwargs) rpi = revpimodio2.RevPiModIOSelected([32, 10], **defaultkwargs)
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError): with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
# Ungültiges Device als <class 'int'> # Invalid device as <class 'int'>
rpi = revpimodio2.RevPiModIOSelected(100, **defaultkwargs) rpi = revpimodio2.RevPiModIOSelected(100, **defaultkwargs)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
# Ungültiger Devicetype # Ungültiger Devicetype
@@ -116,10 +116,10 @@ class TestInitModio(TestRevPiModIO):
# RevPiModIODriver # RevPiModIODriver
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError): with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
# Liste mit einem ungültigen Device als <class 'list'> # List with an invalid device as <class 'list'>
rpi = revpimodio2.RevPiModIODriver([64, 100], **defaultkwargs) rpi = revpimodio2.RevPiModIODriver([64, 100], **defaultkwargs)
with self.assertRaises(revpimodio2.errors.DeviceNotFoundError): with self.assertRaises(revpimodio2.errors.DeviceNotFoundError):
# Ungültiges Device als <class 'int'> # Invalid device as <class 'int'>
rpi = revpimodio2.RevPiModIODriver([100], **defaultkwargs) rpi = revpimodio2.RevPiModIODriver([100], **defaultkwargs)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
# Ungültiger Devicetype # Ungültiger Devicetype
@@ -132,7 +132,7 @@ class TestInitModio(TestRevPiModIO):
self.assertEqual(1, len(rpi.device)) self.assertEqual(1, len(rpi.device))
del rpi del rpi
# Core ios als bits # Core ios as bits
rpi = self.modio(configrsc="config_core_bits.json") rpi = self.modio(configrsc="config_core_bits.json")
del rpi del rpi

View File

@@ -27,7 +27,7 @@ class TestModioClassBasics(TestRevPiModIO):
self.assertEqual(rpi.app.savets.tm_hour, 12) self.assertEqual(rpi.app.savets.tm_hour, 12)
del rpi del rpi
# Alte config ohne saveTS # Old config without saveTS
with self.assertWarnsRegex(Warning, r"equal device name '.*' in pictory configuration."): with self.assertWarnsRegex(Warning, r"equal device name '.*' in pictory configuration."):
rpi = self.modio(configrsc="config_old.rsc") rpi = self.modio(configrsc="config_old.rsc")
self.assertIsNone(rpi.app.savets) self.assertIsNone(rpi.app.savets)
@@ -79,7 +79,7 @@ class TestModioClassBasics(TestRevPiModIO):
"""Test interaction with process image.""" """Test interaction with process image."""
rpi = self.modio() rpi = self.modio()
# Procimg IO alle # Procimg IO all
self.assertIsNone(rpi.setdefaultvalues()) self.assertIsNone(rpi.setdefaultvalues())
self.assertEqual(rpi.writeprocimg(), True) self.assertEqual(rpi.writeprocimg(), True)
self.assertEqual(rpi.syncoutputs(), True) self.assertEqual(rpi.syncoutputs(), True)

View File

@@ -19,7 +19,7 @@ class TestCompact(TestRevPiModIO):
self.assertIsInstance(rpi.core, revpimodio2.device.Compact) self.assertIsInstance(rpi.core, revpimodio2.device.Compact)
# COMPACT LEDs prüfen # Check COMPACT LEDs
self.assertEqual(rpi.io.RevPiLED.get_value(), b"\x00") self.assertEqual(rpi.io.RevPiLED.get_value(), b"\x00")
rpi.core.A1 = revpimodio2.OFF rpi.core.A1 = revpimodio2.OFF
self.assertEqual(rpi.core.A1, 0) self.assertEqual(rpi.core.A1, 0)
@@ -44,12 +44,12 @@ class TestCompact(TestRevPiModIO):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
rpi.core.A2 = 5 rpi.core.A2 = 5
# Spezielle Werte aufrufen # Call special values
self.assertIsInstance(rpi.core.temperature, int) self.assertIsInstance(rpi.core.temperature, int)
self.assertIsInstance(rpi.core.frequency, int) self.assertIsInstance(rpi.core.frequency, int)
rpi.core.wd_toggle() rpi.core.wd_toggle()
# Directzuweisung nicht erlaubt # Direct assignment not allowed
with self.assertRaisesRegex(AttributeError, r"direct assignment is not supported"): with self.assertRaisesRegex(AttributeError, r"direct assignment is not supported"):
rpi.core.a1green = True rpi.core.a1green = True

View File

@@ -20,7 +20,7 @@ class TestFlat(TestRevPiModIO):
self.assertIsInstance(rpi.core, revpimodio2.device.Flat) self.assertIsInstance(rpi.core, revpimodio2.device.Flat)
# FLAT LEDs prüfen # Check FLAT LEDs
rpi.core.A1 = revpimodio2.OFF rpi.core.A1 = revpimodio2.OFF
self.assertEqual(rpi.io.RevPiLED.get_value(), b"\x00\x00") self.assertEqual(rpi.io.RevPiLED.get_value(), b"\x00\x00")
self.assertEqual(rpi.core.A1, 0) self.assertEqual(rpi.core.A1, 0)
@@ -77,12 +77,12 @@ class TestFlat(TestRevPiModIO):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
rpi.core.A5 = 5 rpi.core.A5 = 5
# Spezielle Werte aufrufen # Call special values
self.assertIsInstance(rpi.core.temperature, int) self.assertIsInstance(rpi.core.temperature, int)
self.assertIsInstance(rpi.core.frequency, int) self.assertIsInstance(rpi.core.frequency, int)
rpi.core.wd_toggle() rpi.core.wd_toggle()
# Directzuweisung nicht erlaubt # Direct assignment not allowed
with self.assertRaisesRegex(AttributeError, r"direct assignment is not supported"): with self.assertRaisesRegex(AttributeError, r"direct assignment is not supported"):
rpi.core.a1green = True rpi.core.a1green = True

View File

@@ -84,7 +84,7 @@ class TestIos(TestRevPiModIO):
"""Testet mehrbittige IOs.""" """Testet mehrbittige IOs."""
rpi = self.modio(configrsc="config_supervirt.rsc") rpi = self.modio(configrsc="config_supervirt.rsc")
# Adressen und Längen prüfen # Check addresses and lengths
self.assertEqual(rpi.device[65]._offset, 75) self.assertEqual(rpi.device[65]._offset, 75)
self.assertEqual(rpi.io.InBit_1.length, 1) self.assertEqual(rpi.io.InBit_1.length, 1)

View File

@@ -48,7 +48,7 @@ class TestRevPiConnect(TestRevPiModIO):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
rpi.core.A3 = BLUE rpi.core.A3 = BLUE
# Direkte Zuweisung darf nicht funktionieren # Direct assignment must not work
with self.assertRaises(AttributeError): with self.assertRaises(AttributeError):
rpi.core.a3green = True rpi.core.a3green = True
with self.assertRaises(AttributeError): with self.assertRaises(AttributeError):

View File

@@ -97,7 +97,7 @@ class TestRevPiCore(TestRevPiModIO):
with self.assertWarnsRegex(Warning, r"equal device name '.*' in pictory configuration."): with self.assertWarnsRegex(Warning, r"equal device name '.*' in pictory configuration."):
rpi = self.modio(configrsc="config_old.rsc") rpi = self.modio(configrsc="config_old.rsc")
# Errorlimits testen, die es nicht gibt (damals None, jetzt -1) # Test error limits that don't exist (formerly None, now -1)
self.assertEqual(rpi.core.errorlimit1, -1) self.assertEqual(rpi.core.errorlimit1, -1)
self.assertEqual(rpi.core.errorlimit2, -1) self.assertEqual(rpi.core.errorlimit2, -1)