Files
revpimodio2/docs/advanced.rst
Nicolai Buchwitz 2eac69b7bd docs: add comprehensive documentation structure and API reference
Created topic-based documentation:
- basics.rst: core concepts and fundamental usage
- cyclic_programming.rst: PLC-style programming with Cycletools
- event_programming.rst: event-driven patterns and callbacks
- advanced.rst: gateway IOs, replace_io_file, watchdog management
- installation.rst and quickstart.rst: getting started guides

Added complete API reference in docs/api/:
- All device classes including ModularBaseConnect_4_5 and GatewayMixin
- I/O, helper, and main class documentation

Enhanced Sphinx configuration with RTD theme and improved autodoc settings.
Removed auto-generated modules.rst and revpimodio2.rst.
2026-02-12 14:00:23 +01:00

18 KiB

<?xml version="1.0" encoding="utf-8" ?> <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en"> <head> <style type="text/css"> /* :Author: David Goodger (goodger@python.org) :Id: $Id: html4css1.css 7952 2016-07-26 18:15:59Z milde $ :Copyright: This stylesheet has been placed in the public domain. Default cascading style sheet for the HTML output of Docutils. See http://docutils.sf.net/docs/howto/html-stylesheets.html for how to customize this style sheet. */ /* used to remove borders from tables and images */ .borderless, table.borderless td, table.borderless th { border: 0 } table.borderless td, table.borderless th { /* Override padding for "table.docutils td" with "! important". The right padding separates the table cells. */ padding: 0 0.5em 0 0 ! important } .first { /* Override more specific margin styles with "! important". */ margin-top: 0 ! important } .last, .with-subtitle { margin-bottom: 0 ! important } .hidden { display: none } .subscript { vertical-align: sub; font-size: smaller } .superscript { vertical-align: super; font-size: smaller } a.toc-backref { text-decoration: none ; color: black } blockquote.epigraph { margin: 2em 5em ; } dl.docutils dd { margin-bottom: 0.5em } object[type="image/svg+xml"], object[type="application/x-shockwave-flash"] { overflow: hidden; } /* Uncomment (and remove this text!) to get bold-faced definition list terms dl.docutils dt { font-weight: bold } */ div.abstract { margin: 2em 5em } div.abstract p.topic-title { font-weight: bold ; text-align: center } div.admonition, div.attention, div.caution, div.danger, div.error, div.hint, div.important, div.note, div.tip, div.warning { margin: 2em ; border: medium outset ; padding: 1em } div.admonition p.admonition-title, div.hint p.admonition-title, div.important p.admonition-title, div.note p.admonition-title, div.tip p.admonition-title { font-weight: bold ; font-family: sans-serif } div.attention p.admonition-title, div.caution p.admonition-title, div.danger p.admonition-title, div.error p.admonition-title, div.warning p.admonition-title, .code .error { color: red ; font-weight: bold ; font-family: sans-serif } /* Uncomment (and remove this text!) to get reduced vertical space in compound paragraphs. div.compound .compound-first, div.compound .compound-middle { margin-bottom: 0.5em } div.compound .compound-last, div.compound .compound-middle { margin-top: 0.5em } */ div.dedication { margin: 2em 5em ; text-align: center ; font-style: italic } div.dedication p.topic-title { font-weight: bold ; font-style: normal } div.figure { margin-left: 2em ; margin-right: 2em } div.footer, div.header { clear: both; font-size: smaller } div.line-block { display: block ; margin-top: 1em ; margin-bottom: 1em } div.line-block div.line-block { margin-top: 0 ; margin-bottom: 0 ; margin-left: 1.5em } div.sidebar { margin: 0 0 0.5em 1em ; border: medium outset ; padding: 1em ; background-color: #ffffee ; width: 40% ; float: right ; clear: right } div.sidebar p.rubric { font-family: sans-serif ; font-size: medium } div.system-messages { margin: 5em } div.system-messages h1 { color: red } div.system-message { border: medium outset ; padding: 1em } div.system-message p.system-message-title { color: red ; font-weight: bold } div.topic { margin: 2em } h1.section-subtitle, h2.section-subtitle, h3.section-subtitle, h4.section-subtitle, h5.section-subtitle, h6.section-subtitle { margin-top: 0.4em } h1.title { text-align: center } h2.subtitle { text-align: center } hr.docutils { width: 75% } img.align-left, .figure.align-left, object.align-left, table.align-left { clear: left ; float: left ; margin-right: 1em } img.align-right, .figure.align-right, object.align-right, table.align-right { clear: right ; float: right ; margin-left: 1em } img.align-center, .figure.align-center, object.align-center { display: block; margin-left: auto; margin-right: auto; } table.align-center { margin-left: auto; margin-right: auto; } .align-left { text-align: left } .align-center { clear: both ; text-align: center } .align-right { text-align: right } /* reset inner alignment in figures */ div.align-right { text-align: inherit } /* div.align-center * { */ /* text-align: left } */ .align-top { vertical-align: top } .align-middle { vertical-align: middle } .align-bottom { vertical-align: bottom } ol.simple, ul.simple { margin-bottom: 1em } ol.arabic { list-style: decimal } ol.loweralpha { list-style: lower-alpha } ol.upperalpha { list-style: upper-alpha } ol.lowerroman { list-style: lower-roman } ol.upperroman { list-style: upper-roman } p.attribution { text-align: right ; margin-left: 50% } p.caption { font-style: italic } p.credits { font-style: italic ; font-size: smaller } p.label { white-space: nowrap } p.rubric { font-weight: bold ; font-size: larger ; color: maroon ; text-align: center } p.sidebar-title { font-family: sans-serif ; font-weight: bold ; font-size: larger } p.sidebar-subtitle { font-family: sans-serif ; font-weight: bold } p.topic-title { font-weight: bold } pre.address { margin-bottom: 0 ; margin-top: 0 ; font: inherit } pre.literal-block, pre.doctest-block, pre.math, pre.code { margin-left: 2em ; margin-right: 2em } pre.code .ln { color: grey; } /* line numbers */ pre.code, code { background-color: #eeeeee } pre.code .comment, code .comment { color: #5C6576 } pre.code .keyword, code .keyword { color: #3B0D06; font-weight: bold } pre.code .literal.string, code .literal.string { color: #0C5404 } pre.code .name.builtin, code .name.builtin { color: #352B84 } pre.code .deleted, code .deleted { background-color: #DEB0A1} pre.code .inserted, code .inserted { background-color: #A3D289} span.classifier { font-family: sans-serif ; font-style: oblique } span.classifier-delimiter { font-family: sans-serif ; font-weight: bold } span.interpreted { font-family: sans-serif } span.option { white-space: nowrap } span.pre { white-space: pre } span.problematic { color: red } span.section-subtitle { /* font-size relative to parent (h1..h6 element) */ font-size: 80% } table.citation { border-left: solid 1px gray; margin-left: 1px } table.docinfo { margin: 2em 4em } table.docutils { margin-top: 0.5em ; margin-bottom: 0.5em } table.footnote { border-left: solid 1px black; margin-left: 1px } table.docutils td, table.docutils th, table.docinfo td, table.docinfo th { padding-left: 0.5em ; padding-right: 0.5em ; vertical-align: top } table.docutils th.field-name, table.docinfo th.docinfo-name { font-weight: bold ; text-align: left ; white-space: nowrap ; padding-left: 0 } /* "booktabs" style (no vertical lines) */ table.docutils.booktabs { border: 0px; border-top: 2px solid; border-bottom: 2px solid; border-collapse: collapse; } table.docutils.booktabs * { border: 0px; } table.docutils.booktabs th { border-bottom: thin solid; text-align: left; } h1 tt.docutils, h2 tt.docutils, h3 tt.docutils, h4 tt.docutils, h5 tt.docutils, h6 tt.docutils { font-size: 100% } ul.auto-toc { list-style-type: none } </style> </head>

Advanced

Advanced features, patterns, and best practices for RevPiModIO.

Custom IOs (Gateway Modules)

Gateway modules (ModbusTCP, Profinet, etc.) allow defining custom IOs dynamically.

Understanding Gateway IOs

Gateway modules provide raw memory regions that you can map to custom IOs with specific data types and addresses.

Defining Custom IOs

Use the :py:meth:`~revpimodio2.io.MemIO.replace_io` method to define custom IOs on gateway modules.

System Message: ERROR/3 (<stdin>, line 24); backlink

Unknown interpreted text role "py:meth".

Gateway modules provide generic IOs (like Input_1, Output_1, etc.) that you can replace with custom definitions:

import revpimodio2

rpi = revpimodio2.RevPiModIO(autorefresh=True)

# Replace a gateway IO with custom definition
# Gateway IOs have default names like Input_1, Output_1, etc.
rpi.io.Input_1.replace_io(
    "temperature",     # New IO name
    "h",              # struct format: signed short
    defaultvalue=0    # Default value
)

# Use the custom IO by its new name
temp = rpi.io.temperature.value / 10.0  # Scale to degrees
print(f"Temperature: {temp}°C")

rpi.exit()

Parameters:

  • name - Name for the new IO (will be accessible via rpi.io.name)
  • frm - Struct format character (see format codes below)
  • defaultvalue - Optional: Default value for the IO
  • byteorder - Optional: Byte order ('little' or 'big'), default is 'little'
  • bit - Optional: Bit position for boolean IOs (0-7)
  • event - Optional: Register event callback on creation
  • delay - Optional: Event debounce delay in milliseconds
  • edge - Optional: Event edge trigger (RISING, FALLING, or BOTH)

Note: The memory address is inherited from the IO being replaced (e.g., Input_1). The new IO uses the same address in the process image.

Struct Format Codes

Common format codes for replace_io (see Python struct format characters for complete reference):

  • 'b' - signed byte (-128 to 127)
  • 'B' - unsigned byte (0 to 255)
  • 'h' - signed short (-32768 to 32767)
  • 'H' - unsigned short (0 to 65535)
  • 'i' - signed int (-2147483648 to 2147483647)
  • 'I' - unsigned int (0 to 4294967295)
  • 'f' - float (32-bit)

Multiple Custom IOs

Define multiple custom IOs programmatically by replacing generic gateway IOs:

rpi = revpimodio2.RevPiModIO(autorefresh=True)

# Replace multiple gateway IOs with custom definitions
# Assuming a gateway module with Input_1, Input_2, Output_1, Output_2
rpi.io.Input_1.replace_io("temperature", "h", defaultvalue=0)
rpi.io.Input_2.replace_io("humidity", "h", defaultvalue=0)
rpi.io.Output_1.replace_io("setpoint", "h", defaultvalue=700)
rpi.io.Output_2.replace_io("control_word", "H", defaultvalue=0)

# Use all custom IOs by their new names
temp = rpi.io.temperature.value / 10.0
humidity = rpi.io.humidity.value / 10.0
print(f"Temp: {temp}°C, Humidity: {humidity}%")

# Write to output registers
rpi.io.setpoint.value = 750  # 75.0°C
rpi.io.control_word.value = 0x0001  # Enable bit

rpi.exit()

Using Configuration Files

For complex IO configurations, use the replace_io_file parameter to load custom IOs from a file:

# Load custom IOs from configuration file
rpi = revpimodio2.RevPiModIO(
    autorefresh=True,
    replace_io_file="replace_ios.conf"
)

# Custom IOs are now available
temp = rpi.io.temperature.value / 10.0
print(f"Temperature: {temp}°C")

rpi.exit()

Configuration File Format:

Create an INI-style configuration file (replace_ios.conf):

[temperature]
replace = Input_1
frm = h
defaultvalue = 0

[humidity]
replace = Input_2
frm = h
defaultvalue = 0

[setpoint]
replace = Output_1
frm = h
defaultvalue = 700

[control_word]
replace = Output_2
frm = H
byteorder = big

Configuration Parameters:

  • replace - Name of the gateway IO to replace (required)
  • frm - Struct format character (required)
  • bit - Bit position for boolean IOs (0-7)
  • byteorder - Byte order: little or big (default: little)
  • wordorder - Word order for multi-word values
  • defaultvalue - Default value for the IO
  • bmk - Internal designation/bookmark
  • export - Export flag for RevPiPyLoad/RevPiPyControl

Exporting Configuration:

Export your current custom IOs to a file:

rpi = revpimodio2.RevPiModIO(autorefresh=True)

# Define custom IOs by replacing gateway IOs
rpi.io.Input_1.replace_io("temperature", "h", defaultvalue=0)
rpi.io.Input_2.replace_io("humidity", "h", defaultvalue=0)

# Export to configuration file
rpi.export_replaced_ios("my_config.conf")

rpi.exit()

This is useful for:

  • Sharing IO configurations across multiple programs
  • Integration with RevPiPyLoad and RevPiPyControl
  • Version control of IO definitions
  • Declarative IO configuration

Watchdog Management

The hardware watchdog monitors your program and resets the system if it stops responding.

How the Watchdog Works

The watchdog requires periodic toggling. If not toggled within the timeout period, the system resets.

Important: Only enable the watchdog when your program logic is working correctly.

Cyclic Watchdog Toggle

import revpimodio2

rpi = revpimodio2.RevPiModIO(autorefresh=True)

def main_cycle(ct):
    # Toggle every 10 cycles (200ms @ 20ms)
    if ct.flank10c:
        ct.core.wd_toggle()

    # Your control logic
    ct.io.output.value = ct.io.input.value

rpi.cycleloop(main_cycle)

Event-Driven Watchdog Toggle

import revpimodio2

rpi = revpimodio2.RevPiModIO(autorefresh=True)

def toggle_wd(ioname, iovalue):
    """Toggle watchdog every 500ms."""
    rpi.core.wd_toggle()

# Register timer event for watchdog
rpi.core.wd.reg_timerevent(toggle_wd, 500, prefire=True)

# Your event handlers
def on_button(ioname, iovalue):
    rpi.io.led.value = iovalue

rpi.io.button.reg_event(on_button)

rpi.handlesignalend()
rpi.mainloop()

Conditional Watchdog

Enable watchdog only when system is operational:

def machine_with_watchdog(ct):
    if ct.first:
        ct.var.state = "IDLE"
        ct.var.watchdog_enabled = False

    # Enable watchdog only in RUNNING state
    if ct.var.state == "RUNNING":
        if not ct.var.watchdog_enabled:
            ct.var.watchdog_enabled = True
            print("Watchdog enabled")

        # Toggle watchdog
        if ct.flank10c:
            ct.core.wd_toggle()

    else:
        ct.var.watchdog_enabled = False

    # State machine logic
    if ct.var.state == "IDLE":
        if ct.io.start_button.value:
            ct.var.state = "RUNNING"

    elif ct.var.state == "RUNNING":
        ct.io.motor.value = True
        if ct.io.stop_button.value:
            ct.var.state = "IDLE"

rpi = revpimodio2.RevPiModIO(autorefresh=True)
rpi.cycleloop(machine_with_watchdog)

Combining Paradigms

Combine cyclic and event-driven programming for optimal results.

Cyclic Control with Event UI

Use cyclic for time-critical control, events for user interface:

import revpimodio2
import threading

rpi = revpimodio2.RevPiModIO(autorefresh=True)

def cyclic_control(ct: revpimodio2.Cycletools):
    """Fast control loop."""
    if ct.first:
        ct.var.setpoint = 50.0
        ct.var.running = False

    if ct.var.running:
        # Fast control logic
        error = ct.var.setpoint - ct.io.sensor.value
        if error > 5:
            ct.io.actuator.value = True
        elif error < -5:
            ct.io.actuator.value = False

def on_setpoint_change(ioname, iovalue):
    """Event handler for user setpoint changes."""
    print(f"New setpoint: {iovalue}")
    # Access ct.var from event requires thread-safe approach
    # In practice, use shared data structure or message queue

def on_start(ioname, iovalue):
    print("System started")

def on_stop(ioname, iovalue):
    print("System stopped")

# Register user events
rpi.io.start_button.reg_event(on_start, edge=revpimodio2.RISING)
rpi.io.stop_button.reg_event(on_stop, edge=revpimodio2.RISING)
rpi.io.setpoint_input.reg_event(on_setpoint_change, delay=100)

# Run cyclic loop in background
threading.Thread(
    target=lambda: rpi.cycleloop(cyclic_control),
    daemon=True
).start()

# Run event loop in main thread
rpi.handlesignalend()
rpi.mainloop()

Event Triggers with Cyclic Processing

Use events to trigger actions, cyclic for processing:

import revpimodio2

rpi = revpimodio2.RevPiModIO(autorefresh=True)

def cyclic_processor(ct):
    """Process work queue."""
    if ct.first:
        ct.var.work_queue = []

    # Process queued work
    if ct.var.work_queue:
        item = ct.var.work_queue.pop(0)
        process_item(item)

def on_new_item(ioname, iovalue):
    """Queue work from events."""
    # Note: Accessing ct.var from events requires synchronization
    # This is a simplified example
    print(f"New item queued from {ioname}")

rpi.io.trigger1.reg_event(on_new_item, edge=revpimodio2.RISING)
rpi.io.trigger2.reg_event(on_new_item, edge=revpimodio2.RISING)

rpi.cycleloop(cyclic_processor)

Performance Optimization

Keep Cycle Logic Fast

Minimize processing time in each cycle:

def optimized_cycle(ct):
    # Good: Heavy work only when needed
    if ct.flank100c:
        expensive_calculation()

    # Good: Keep cycle logic minimal
    ct.io.output.value = ct.io.input.value

    # Bad: Don't do this every cycle
    # expensive_calculation()  # 100ms processing!

Guidelines:

  • Keep cycle time ≥20ms for stability
  • Avoid blocking operations (network, file I/O)
  • Use flank flags for expensive operations
  • Profile your cycle function if experiencing timing issues

Choose Appropriate Cycle Time

Match cycle time to application requirements:

# Fast control (motion, high-speed counting)
rpi.cycletime = 20  # 50 Hz

# Normal control (most applications)
rpi.cycletime = 50  # 20 Hz

# Slow monitoring (temperature, status)
rpi.cycletime = 100  # 10 Hz

Trade-offs:

  • Faster = Higher CPU usage, better responsiveness
  • Slower = Lower CPU usage, adequate for most tasks

Minimize Event Callbacks

Keep event callbacks lightweight:

# Good: Fast callback
def good_callback(ioname, iovalue):
    rpi.io.output.value = iovalue

# Poor: Slow callback blocks event loop
def poor_callback(ioname, iovalue):
    time.sleep(1)  # Blocks!
    complex_calculation()  # Slow!
    rpi.io.output.value = iovalue

# Better: Use threaded events for slow work
def threaded_callback(eventcallback):
    complex_calculation()
    rpi.io.output.value = result

rpi.io.trigger.reg_event(threaded_callback, as_thread=True)

Error Handling

Graceful Error Recovery

Always implement safe failure modes:

def safe_cycle(ct):
    try:
        value = ct.io.sensor.value
        result = process(value)
        ct.io.output.value = result
    except ValueError as e:
        print(f"Sensor error: {e}")
        ct.io.output.value = 0  # Safe default
    except Exception as e:
        print(f"Unexpected error: {e}")
        ct.io.output.value = False  # Safe state

Resource Cleanup

Always clean up resources:

import revpimodio2

rpi = revpimodio2.RevPiModIO(autorefresh=True)

try:
    # Your program logic
    rpi.cycleloop(main_cycle)
except KeyboardInterrupt:
    print("Interrupted by user")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Always clean up
    rpi.setdefaultvalues()  # Reset outputs to defaults
    rpi.exit()

Monitor I/O Errors

Track and handle I/O errors:

rpi = revpimodio2.RevPiModIO(autorefresh=True)
rpi.maxioerrors = 10  # Exception after 10 errors

def main_cycle(ct):
    # Check error count periodically
    if ct.flank20c:
        if rpi.ioerrors > 5:
            print(f"Warning: {rpi.ioerrors} I/O errors detected")
            ct.io.warning_led.value = True

    # Normal logic
    ct.io.output.value = ct.io.input.value

try:
    rpi.cycleloop(main_cycle)
except RuntimeError as e:
    print(f"I/O error threshold exceeded: {e}")

Best Practices

Naming Conventions

Use descriptive IO names in piCtory:

# Good - Clear intent
if rpi.io.emergency_stop.value:
    rpi.io.motor.value = False
    rpi.io.alarm.value = True

# Poor - Generic names
if rpi.io.I_15.value:
    rpi.io.O_3.value = False
    rpi.io.O_7.value = True

Code Organization

Structure your code for maintainability:

import revpimodio2

# Constants
TEMP_HIGH_THRESHOLD = 75
TEMP_LOW_THRESHOLD = 65

# Initialize
rpi = revpimodio2.RevPiModIO(autorefresh=True)

def initialize(ct):
    """Initialize system state."""
    ct.var.cooling_active = False
    ct.var.alarm_active = False
    ct.io.motor.value = False

def monitor_temperature(ct):
    """Temperature monitoring logic."""
    temp = ct.io.temperature.value

    if temp > TEMP_HIGH_THRESHOLD:
        ct.io.cooling.value = True
        ct.var.cooling_active = True

    if temp < TEMP_LOW_THRESHOLD:
        ct.io.cooling.value = False
        ct.var.cooling_active = False

def main_cycle(ct):
    """Main control loop."""
    if ct.first:
        initialize(ct)

    monitor_temperature(ct)

    if ct.last:
        ct.io.cooling.value = False

# Run
try:
    rpi.cycleloop(main_cycle)
finally:
    rpi.exit()

Documentation

Document complex logic:

def control_cycle(ct):
    """Control cycle for temperature management.

    State machine:
    - IDLE: Waiting for start
    - HEATING: Active heating to setpoint
    - COOLING: Active cooling from overshoot
    - ERROR: Fault condition

    Hysteresis: ±5°C around setpoint
    """
    if ct.first:
        ct.var.state = "IDLE"
        ct.var.setpoint = 70.0

    # State machine implementation
    # ...

Testing

Test your code thoroughly:

def test_temperature_control(ct):
    """Test temperature control logic."""

    if ct.first:
        ct.var.cooling_active = False
        ct.var.test_temp = 20.0

    # Simulate temperature increase
    if ct.var.test_temp < 80:
        ct.var.test_temp += 0.5

    # Test control logic
    temp = ct.var.test_temp

    if temp > 75 and not ct.var.cooling_active:
        assert ct.io.cooling.value == True
        ct.var.cooling_active = True

    if temp < 65 and ct.var.cooling_active:
        assert ct.io.cooling.value == False
        ct.var.cooling_active = False

Logging

Implement proper logging:

import logging
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def main_cycle(ct):
    if ct.first:
        logging.info("System started")
        ct.var.error_count = 0

    # Log errors
    if ct.io.error_sensor.value:
        ct.var.error_count += 1
        logging.error(f"Error detected: {ct.var.error_count}")

    # Log status periodically
    if ct.flank100c:
        logging.info(f"Temperature: {ct.io.temperature.value}°C")

    if ct.last:
        logging.info("System stopped")

Security Considerations

Validate External Input

Always validate external inputs:

def on_setpoint_change(ioname, iovalue):
    """Validate setpoint range."""
    if 0 <= iovalue <= 100:
        rpi.io.setpoint.value = iovalue
    else:
        print(f"Invalid setpoint: {iovalue}")
        rpi.io.error_led.value = True

Fail-Safe Defaults

Use safe defaults for critical outputs:

def main_cycle(ct):
    if ct.first:
        # Safe defaults
        ct.io.motor.value = False
        ct.io.heater.value = False
        ct.io.valve.value = False

    try:
        # Control logic
        control_logic(ct)
    except Exception as e:
        # Revert to safe state on error
        ct.io.motor.value = False
        ct.io.heater.value = False

See Also

  • :doc:`basics` - Core concepts and configuration

    System Message: ERROR/3 (<stdin>, line 708); backlink

    Unknown interpreted text role "doc".

  • :doc:`cyclic_programming` - Cyclic programming patterns

    System Message: ERROR/3 (<stdin>, line 709); backlink

    Unknown interpreted text role "doc".

  • :doc:`event_programming` - Event-driven programming patterns

    System Message: ERROR/3 (<stdin>, line 710); backlink

    Unknown interpreted text role "doc".

  • :doc:`api/index` - API reference

    System Message: ERROR/3 (<stdin>, line 711); backlink

    Unknown interpreted text role "doc".

</html>