1
0
mirror of https://github.com/naruxde/revpi-revpiday2019.git synced 2025-11-08 15:03:51 +01:00
Files
revpi-revpiday2019/praezipos/praezipos_gui.py
2019-09-28 12:21:06 +02:00

283 lines
8.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""GUI for forklift terminals."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2019 Sven Sager"
__license__ = "GPLv3"
from PyQt5 import QtCore, QtGui, QtWidgets
from revpimodio2 import RevPiNetIODriver
from revpimodio2.netio import ConfigChanged
from ui.main_ui import Ui_frm_main
class PraeziPos(QtWidgets.QMainWindow, Ui_frm_main):
    """Main window of the forklift terminal GUI.

    Builds the form from ``Ui_frm_main`` and starts two worker threads: an
    ``AnimatePosition`` thread that emits vertical animation offsets and a
    ``RevPiManager`` thread that reports IO changes of the Revolution Pi.
    Both are wired to slots of this window through Qt signals.
    """

    def __init__(self):
        """Set up the widgets, their start positions and both worker threads."""
        super(PraeziPos, self).__init__()
        self.setupUi(self)

        # Setup start positions: fork, lift and load start shifted by
        # take_back pixels in y; the animation offset is subtracted again.
        take_back = 550
        self.wid_fork.move(self.wid_fork.x(), self.wid_fork.y() + take_back)
        self.wid_lift.move(self.wid_lift.x(), self.wid_lift.y() + take_back)
        self.wid_load.move(self.wid_load.x(), self.wid_load.y() + take_back)

        # Shifted geometries are the reference for every later move
        self.wid_fork_geo = self.wid_fork.geometry()
        self.wid_lift_geo = self.wid_lift.geometry()
        self.wid_load_geo = self.wid_load.geometry()

        # Animation thread
        self.ani = AnimatePosition(self, take_back)
        self.ani.move_forklift.connect(self.on_ani_move_forklift)
        self.ani.move_load.connect(self.on_ani_move_load)
        self.ani.start()

        # Create revpi manager and connect its signals to the GUI slots
        self.rm = RevPiManager(self)
        self.rm.forklift_changed.connect(self.on_forklift_changed)
        self.rm.load_pos_changed.connect(self.on_load_pos_changed)
        self.rm.program_running_changed.connect(self.on_program_running_changed)
        self.status.showMessage("Verbinde zum Revolution Pi...")
        self.rm.start()

    def closeEvent(self, a0: QtGui.QCloseEvent) -> None:
        """Shutdown threads before closing."""
        self.ani.requestInterruption()
        self.rm.requestInterruption()

        # Wait for BOTH workers. They are children of this window, so a
        # thread that is still running when the window is destroyed would be
        # destroyed while running.
        for worker in (self.rm, self.ani):
            while worker.isRunning():
                worker.wait(100)

    @QtCore.pyqtSlot(int)
    def on_ani_move_forklift(self, y: int):
        """Animate the forklift.

        Fork and lift are only visible for ``y > 0`` and are placed ``y``
        pixels above their start position.
        """
        self.wid_fork.setVisible(y > 0)
        self.wid_lift.setVisible(y > 0)
        self.wid_fork.move(
            self.wid_fork.x(),
            self.wid_fork_geo.y() - y
        )
        self.wid_lift.move(
            self.wid_lift.x(),
            self.wid_lift_geo.y() - y
        )

    @QtCore.pyqtSlot(int)
    def on_ani_move_load(self, y: int):
        """Animate the load of forklift.

        The load is only visible for ``y > 0`` and is placed ``y`` pixels
        above its start position.
        """
        self.wid_load.setVisible(y > 0)
        self.wid_load.move(
            self.wid_load.x(),
            self.wid_load_geo.y() - y
        )

    @QtCore.pyqtSlot(str, bool)
    def on_forklift_changed(self, name: str, exists: bool):
        """Start the come or leave animation when the forklift IO changes.

        :param name: First event argument (presumably the IO name - confirm
            against revpimodio2), not used here
        :param exists: True starts the come animation, False the leave one
        """
        if exists:
            # Forklift is coming: align the lift horizontally with the
            # current load position before it drives in
            self.wid_lift.move(
                self.wid_lift_geo.x() + int(self.rm.rpi.io.load_pos.value / 100),
                self.wid_lift.y()
            )
            self.ani.come()
        else:
            # Forklift is going
            self.ani.leave()

    @QtCore.pyqtSlot(str, int)
    def on_load_pos_changed(self, name: str, position: int):
        """Update the direction icons and the x position of fork and load.

        Does nothing while ``program_running`` is not set. Without a
        forklift both direction icons are hidden.

        :param name: First event argument, not used here
        :param position: New load position; 1/100 of it is used as x offset
        """
        io = self.rm.rpi.io
        if not io.program_running.value:
            return
        if not io.forklift.value:
            self.wid_more_left.setVisible(False)
            self.wid_more_right.setVisible(False)
            return

        # Directions: arrow on the side whose IO is set, "ok" on both sides
        # when neither is set
        status = True
        if io.more_left.value:
            status = False
            self.wid_more_left.setPixmap(QtGui.QPixmap(":/global/png/left.png"))
        if io.more_right.value:
            status = False
            self.wid_more_right.setPixmap(QtGui.QPixmap(":/global/png/right.png"))
        if status:
            self.wid_more_left.setPixmap(QtGui.QPixmap(":/global/png/ok.png"))
            self.wid_more_right.setPixmap(QtGui.QPixmap(":/global/png/ok.png"))
        self.wid_more_left.setVisible(io.more_left.value or status)
        self.wid_more_right.setVisible(io.more_right.value or status)

        # Move load and fork
        self.wid_fork.move(
            self.wid_fork_geo.x() + int(position / 100),
            self.wid_fork.y()
        )
        self.wid_load.move(
            self.wid_load_geo.x() + int(position / 100),
            self.wid_load.y()
        )

    @QtCore.pyqtSlot(str, bool)
    def on_program_running_changed(self, name: str, running: bool):
        """Show the state of the control program in the status bar.

        :param name: First event argument, not used here
        :param running: True shows a message for 2000 ms, False shows a
            message without timeout
        """
        if running:
            self.status.showMessage("Steuerprogramm läuft...", 2000)
        else:
            self.status.showMessage("Steuerprogramm ist nicht aktiv!")

    @QtCore.pyqtSlot()
    def on_btn_fullscreen_clicked(self):
        """Toggle between fullscreen and normal window mode."""
        if self.isFullScreen():
            self.showNormal()
        else:
            self.showFullScreen()
class AnimatePosition(QtCore.QThread):
    """Worker thread that produces the animation offsets of the forklift.

    ``come()`` and ``leave()`` only set flags; the thread's ``run`` loop then
    steps an offset between 0 and ``pixel_way`` and publishes every step via
    the ``move_forklift`` and ``move_load`` signals.
    """

    move_forklift = QtCore.pyqtSignal(int)
    move_load = QtCore.pyqtSignal(int)

    def __init__(self, parent, pixel_way: int):
        """Create the animation thread.

        :param parent: Parent object handed to QThread
        :param pixel_way: Offset at which the come animation is finished
        """
        super(AnimatePosition, self).__init__(parent)
        self._come = False
        self._leave = False
        self._load_arrived = False
        self._step_time = 10  # msleep between two animation steps
        self._step_value = 2  # offset change per animation step
        self._step_wait = 1000  # msleep before the last step of leaving
        self._way = 0  # current offset, kept within 0.._y_way
        self._y_way = pixel_way

    def come(self):
        """Request the come animation and cancel a running leave."""
        self._leave = False
        self._come = True

    def leave(self):
        """Request the leave animation and cancel a running come."""
        self._come = False
        self._leave = True

    def requestInterruption(self) -> None:
        """Cancel any animation so ``run`` drops out of its inner loops."""
        self._come = False
        self._leave = False
        super(AnimatePosition, self).requestInterruption()

    def run(self) -> None:
        """Step the offset while an animation is requested, idle otherwise."""
        while not self.isInterruptionRequested():
            while self._come:
                # Clamp to the full way: a come() at the top, or a step size
                # that does not divide the way, must not overshoot.
                self._way = min(self._way + self._step_value, self._y_way)

                # Animation signals
                self.move_forklift.emit(self._way)
                self.move_load.emit(self._way)

                if self._way < self._y_way:
                    self._load_arrived = False
                    self.msleep(self._step_time)
                else:
                    self._load_arrived = True
                    self._come = False

            while self._leave:
                # Clamp at 0: a leave() while already at rest (e.g. the
                # prefired forklift event at start-up) must not drive the
                # offset negative.
                self._way = max(self._way - self._step_value, 0)

                # Animation signals; an arrived load stays where it is
                self.move_forklift.emit(self._way)
                if not self._load_arrived:
                    self.move_load.emit(self._way)

                if self._way > 0:
                    # Long pause before the very last step, short otherwise
                    self.msleep(
                        self._step_wait
                        if self._way <= self._step_value
                        else self._step_time
                    )
                else:
                    # Forklift is gone, now publish offset 0 for the load too
                    self.move_load.emit(self._way)
                    self._leave = False

            if not (self._come or self._leave):
                # Nothing to animate, do not spin
                self.msleep(self._step_time)
class RevPiManager(QtCore.QThread):
    """Worker thread that holds the network connection to the Revolution Pi.

    Connects with ``RevPiNetIODriver``, runs its event ``mainloop`` and
    forwards IO events as Qt signals, so they can be connected to GUI slots.
    """

    forklift_changed = QtCore.pyqtSignal(str, bool)
    load_pos_changed = QtCore.pyqtSignal(str, int)
    program_running_changed = QtCore.pyqtSignal(str, bool)

    def __init__(self, parent):
        """Create the manager; the connection is made later in ``run``.

        :param parent: Parent object handed to QThread
        """
        super(RevPiManager, self).__init__(parent)
        self.rpi = None  # type: RevPiNetIODriver

    def _config_rpi(self) -> None:
        """Connect to RevPiPyLoad and register events.

        On success ``self.rpi`` holds the configured driver. If any step
        after creating the driver fails, the driver is disconnected,
        ``self.rpi`` is reset to None and the exception is re-raised, so a
        half-configured driver is never used by ``run``.
        """
        self.rpi = RevPiNetIODriver(
            "192.168.1.2",
            "panel01",
            autorefresh=True,
            replace_io_file=":network:",
        )
        try:
            self.rpi.cycletime = 25
            self.rpi.setdefaultvalues()
            self.rpi.net_setdefaultvalues()

            # Fire events through Qt signals to reach the GUI elements
            self.rpi.io.forklift.reg_event(
                self.forklift_changed.emit, prefire=True
            )
            self.rpi.io.load_pos.reg_event(
                self.load_pos_changed.emit, prefire=True
            )
            self.rpi.io.program_running.reg_event(
                self.program_running_changed.emit, prefire=True
            )
        except Exception:
            broken, self.rpi = self.rpi, None
            broken.disconnect()
            raise

    def run(self) -> None:
        """Mainloop of thread to connect to RevPi and monitor status."""
        while not self.isInterruptionRequested():

            # Connect RevPi
            if self.rpi is None:
                try:
                    self._config_rpi()
                except Exception:
                    # Deliberate best effort: retry until it works or the
                    # thread is interrupted
                    self.msleep(250)
                    continue

            # An interruption requested while connecting could not call
            # exit() on the driver, so do not enter the blocking mainloop.
            if self.isInterruptionRequested():
                break

            # Set values
            self.rpi.io.p_online.value = True

            # Start event system of RevPiModIO
            try:
                self.rpi.mainloop()
            except ConfigChanged:
                # Configuration was changed, try to reconnect
                self.rpi.disconnect()
                self.rpi = None

        # Clean up
        if self.rpi is not None:
            self.rpi.io.p_online.value = False
            self.rpi.disconnect()

    def requestInterruption(self) -> None:
        """Request the thread to stop and leave a blocking mainloop."""
        super(RevPiManager, self).requestInterruption()

        # No driver exists while the thread is still trying to connect
        rpi = self.rpi
        if rpi is not None:
            # Just exit blocking mainloop function.
            rpi.exit(full=False)

    @property
    def error(self):
        """True while there is no driver or the driver is reconnecting."""
        rpi = self.rpi
        return rpi is None or rpi.reconnecting
if __name__ == '__main__':
    import sys

    app = QtWidgets.QApplication([])
    frm_main = PraeziPos()

    # The "-f" command line switch starts the terminal in fullscreen mode
    fullscreen_requested = "-f" in sys.argv
    if fullscreen_requested:
        frm_main.showFullScreen()
    frm_main.show()

    # Run the Qt event loop and hand its return code to the shell
    sys.exit(app.exec_())