Revised various sections across multiple documentation files to reflect updated methods (`run_plc` replacing manual setup with `cycleloop`) and adjust for new default parameters (e.g., `autorefresh`). Enhanced descriptions for timers, Cycletools usage, and new method explanations. Removed outdated or redundant examples and updated system requirements. Signed-off-by: Sven Sager <akira@narux.de>
12 KiB
Event Programming
Event-driven programming uses callbacks triggered by hardware state changes.
Contents
When to Use Event-Driven Programming
Event-driven programming is ideal for:
- Handling user interactions
- Processing occasional events
- Background tasks
- System integration
- Low CPU usage requirements
Advantages:
- Consumes CPU only when events occur
- Natural for user interfaces
- Simple asynchronous operation
- Efficient for sporadic events
Considerations:
- Non-deterministic timing
- Must handle concurrent events carefully
- Less intuitive for continuous control
Basic Structure
Simple Event Handler
import revpimodio2 rpi = revpimodio2.RevPiModIO(autorefresh=True) def on_button_change(ioname, iovalue): """Called when button changes.""" print(f"{ioname} = {iovalue}") rpi.io.led.value = iovalue # Register event rpi.io.button.reg_event(on_button_change) # Run main loop rpi.handlesignalend() rpi.mainloop()
The callback function receives:
- ioname - Name of the IO that changed
- iovalue - New value of the IO
Event Registration
Value Change Events
Register callbacks for IO value changes:
def on_change(ioname, iovalue): print(f"{ioname} changed to {iovalue}") rpi = revpimodio2.RevPiModIO(autorefresh=True) # Any change rpi.io.sensor.reg_event(on_change) # Rising edge only rpi.io.button.reg_event(on_change, edge=revpimodio2.RISING) # Falling edge only rpi.io.button.reg_event(on_change, edge=revpimodio2.FALLING) rpi.handlesignalend() rpi.mainloop()
Edge types:
- revpimodio2.RISING - False to True transition
- revpimodio2.FALLING - True to False transition
- revpimodio2.BOTH - Any change (default)
Multiple Events
Register multiple callbacks on one IO:
def on_press(ioname, iovalue): print("Pressed!") def on_release(ioname, iovalue): print("Released!") rpi = revpimodio2.RevPiModIO(autorefresh=True) # Different callbacks for different edges rpi.io.button.reg_event(on_press, edge=revpimodio2.RISING) rpi.io.button.reg_event(on_release, edge=revpimodio2.FALLING) rpi.handlesignalend() rpi.mainloop()
Or register one callback on multiple IOs:
def any_sensor_changed(ioname, iovalue): print(f"{ioname} changed to {iovalue}") rpi = revpimodio2.RevPiModIO(autorefresh=True) # Same callback for multiple sensors for sensor in ["sensor1", "sensor2", "sensor3"]: if sensor in rpi.io: rpi.io[sensor].reg_event(any_sensor_changed) rpi.handlesignalend() rpi.mainloop()
Debouncing
Add debounce delays to filter noise and false triggers:
def on_stable_press(ioname, iovalue): """Called only after button is stable for 50ms.""" print(f"Confirmed: {iovalue}") rpi = revpimodio2.RevPiModIO(autorefresh=True) # 50ms debounce delay rpi.io.noisy_button.reg_event(on_stable_press, delay=50) rpi.handlesignalend() rpi.mainloop()
How debouncing works:
- IO value changes
- RevPiModIO waits for delay milliseconds
- If value is still changed, callback is triggered
- If value changed back, callback is not triggered
Typical debounce times:
- Mechanical switches: 20-50ms
- Relays: 10-20ms
- Analog sensors: 100-500ms
Debouncing with Edge Detection
def on_confirmed_press(ioname, iovalue): """Called only for stable button press.""" print("Confirmed press") rpi = revpimodio2.RevPiModIO(autorefresh=True) # Rising edge with 30ms debounce rpi.io.button.reg_event( on_confirmed_press, edge=revpimodio2.RISING, delay=30 ) rpi.handlesignalend() rpi.mainloop()
Timer Events
The timer is started when the IO value changes and executes the passed function - even if the IO value has changed in the meantime. If the timer has not expired and the condition is met again, the timer is NOT reset to the delay value or started a second time.
def periodic_task(ioname, iovalue): """Called after 500ms.""" print(f"Periodic task: {iovalue}") rpi = revpimodio2.RevPiModIO(autorefresh=True) # Execute every 500ms rpi.io.dummy.reg_timerevent(periodic_task, 500) rpi.handlesignalend() rpi.mainloop()
Timer Event Parameters
rpi = revpimodio2.RevPiModIO(autorefresh=True) def blink_led(ioname, iovalue): """Toggle LED every 500ms.""" rpi.io.blink_led.value = not rpi.io.blink_led.value def log_temperature(ioname, iovalue): """Log temperature every 5 seconds.""" temp = rpi.io.temperature.value print(f"Temperature: {temp}°C") # Blink every 500ms, trigger immediately rpi.io.blink_led.reg_timerevent(blink_led, 500, prefire=True) # Log every 5 seconds, don't trigger immediately rpi.io.temperature.reg_timerevent(log_temperature, 5000, prefire=False) rpi.handlesignalend() rpi.mainloop()
Parameters:
- interval - Delay in milliseconds.
- prefire - If True, trigger immediately after starting the mainloop.
Threaded Events
Use threaded events for long-running operations that would block the main loop:
def long_task(eventcallback: revpimodio2.EventCallback): """Threaded handler for time-consuming tasks.""" print(f"Starting task for {eventcallback.ioname}") for i in range(10): # Check if stop requested if eventcallback.exit.is_set(): print("Task cancelled") return # Interruptible wait (1 second) eventcallback.exit.wait(1) print(f"Progress: {(i+1)*10}%") print("Task complete") rpi = revpimodio2.RevPiModIO(autorefresh=True) # Register as threaded event rpi.io.trigger.reg_event( long_task, as_thread=True, edge=revpimodio2.RISING ) rpi.handlesignalend() rpi.mainloop()
EventCallback Object
Threaded callbacks receive an EventCallback object with:
- eventcallback.ioname - Name of the IO
- eventcallback.iovalue - Value that triggered event
- eventcallback.exit - threading.Event for cancellation
Important: Always check eventcallback.exit.is_set() periodically to allow graceful shutdown.
Interruptible Sleep
Use eventcallback.exit.wait() instead of time.sleep() for interruptible delays:
def background_task(eventcallback: revpimodio2.EventCallback): """Long task with interruptible waits.""" while not eventcallback.exit.is_set(): # Do some work process_data() # Wait 5 seconds or until exit requested if eventcallback.exit.wait(5): break # Exit was requested rpi = revpimodio2.RevPiModIO(autorefresh=True) rpi.io.trigger.reg_event(background_task, as_thread=True) rpi.handlesignalend() rpi.mainloop()
Unregistering Events
Remove event callbacks when no longer needed:
def my_callback(ioname, iovalue): print(f"{ioname} = {iovalue}") rpi = revpimodio2.RevPiModIO(autorefresh=True) # Register event rpi.io.button.reg_event(my_callback) # Unregister specific callback rpi.io.button.unreg_event(my_callback) # Unregister all events for this IO rpi.io.button.unreg_event()
Practical Examples
Sensor Logging
import revpimodio2 from datetime import datetime rpi = revpimodio2.RevPiModIO(autorefresh=True) def log_sensor_change(ioname, iovalue): """Log sensor changes with timestamp.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"{timestamp} - {ioname}: {iovalue}") # Log all sensor changes for io_name in ["sensor1", "sensor2", "temperature"]: if io_name in rpi.io: rpi.io[io_name].reg_event(log_sensor_change) rpi.handlesignalend() rpi.mainloop()
Threaded Data Processing
import revpimodio2 rpi = revpimodio2.RevPiModIO(autorefresh=True) def process_batch(eventcallback: revpimodio2.EventCallback): """Process data batch in background thread.""" print(f"Starting batch processing...") batch_size = 100 for i in range(batch_size): if eventcallback.exit.is_set(): print("Processing cancelled") return # Simulate processing eventcallback.exit.wait(0.1) if i % 10 == 0: print(f"Progress: {i}/{batch_size}") print("Batch processing complete") rpi.io.done_led.value = True # Trigger on button press rpi.io.start_batch.reg_event( process_batch, as_thread=True, edge=revpimodio2.RISING ) rpi.handlesignalend() rpi.mainloop()
Best Practices
Keep Callbacks Fast
Event callbacks should complete quickly:
# Good - Fast callback def good_callback(ioname, iovalue): rpi.io.output.value = iovalue # Poor - Blocking callback def poor_callback(ioname, iovalue): time.sleep(5) # Blocks event loop! rpi.io.output.value = iovalue
For slow operations, use threaded events:
def slow_task(eventcallback): # Long operation in separate thread process_data() rpi.io.trigger.reg_event(slow_task, as_thread=True)
Handle Errors Gracefully
Protect callbacks from exceptions:
def safe_callback(ioname, iovalue): try: result = risky_operation(iovalue) rpi.io.output.value = result except Exception as e: print(f"Error in callback: {e}") rpi.io.output.value = False # Safe state
Clean Up Threads
Threaded events are automatically cleaned up on exit, but you can manually unregister:
def long_task(eventcallback): while not eventcallback.exit.is_set(): work() eventcallback.exit.wait(1) rpi = revpimodio2.RevPiModIO(autorefresh=True) # Register rpi.io.trigger.reg_event(long_task, as_thread=True) # Later: unregister to stop thread rpi.io.trigger.unreg_event(long_task)
See Also
:doc:`basics` - Core concepts and configuration
System Message: ERROR/3 (<stdin>, line 513); backlink
Unknown interpreted text role "doc".
:doc:`cyclic_programming` - Cyclic programming patterns
System Message: ERROR/3 (<stdin>, line 514); backlink
Unknown interpreted text role "doc".
:doc:`advanced` - Combining paradigms and advanced topics
System Message: ERROR/3 (<stdin>, line 515); backlink
Unknown interpreted text role "doc".
:doc:`api/helper` - EventCallback API reference
System Message: ERROR/3 (<stdin>, line 516); backlink
Unknown interpreted text role "doc".