Prepare SSH tunnel server and password GUI

This commit is contained in:
2023-01-06 17:10:06 +01:00
parent da5d0a4a59
commit 2f595f66aa
5 changed files with 383 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
"""Package of ssh tunnel connections."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "GPLv3"

View File

@@ -0,0 +1,173 @@
# -*- coding: utf-8 -*-
"""
Connect to a remote host and tunnel a port.
This was crated on base of the paramiko library demo file forward.py, see on
GitHub https://github.com/paramiko/paramiko/blob/main/demos/forward.py
"""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "GPLv3"
import select
from socketserver import BaseRequestHandler, ThreadingTCPServer
from threading import Thread
from paramiko.client import SSHClient, WarningPolicy
from paramiko.rsakey import RSAKey
from paramiko.ssh_exception import PasswordRequiredException
from paramiko.transport import Transport
class ForwardServer(ThreadingTCPServer):
daemon_threads = True
allow_reuse_address = True
class Handler(BaseRequestHandler):
def handle(self):
try:
chan = self.ssh_transport.open_channel(
"direct-tcpip",
(self.chain_host, self.chain_port),
self.request.getpeername(),
)
except Exception as e:
return
if chan is None:
return
while True:
r, w, x = select.select([self.request, chan], [], [], 5.0)
if self.request in r:
data = self.request.recv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
self.request.send(data)
chan.close()
self.request.close()
class SSHLocalTunnel:
def __init__(self, remote_tunnel_port: int, ssh_host: str, ssh_port: int = 22):
"""
Connect to a ssh remote host and tunnel a port to your host.
:param remote_tunnel_port: Port on the remote host to tunnel through ssh
:param ssh_host: ssh remote host address
:param ssh_port: ssh remote host port
"""
self._remote_tunnel_port = remote_tunnel_port
self._ssh_host = ssh_host
self._ssh_port = ssh_port
self._th_server = Thread()
self._ssh_client = SSHClient()
self._ssh_client.load_system_host_keys()
self._ssh_client.set_missing_host_key_policy(WarningPolicy())
self._ssh_transport = None # type: Transport
self._forward_server = None # type: ThreadingTCPServer
self._local_tunnel_port = None # type: int
def __th_target(self):
"""Server thread for socket mirror."""
self._forward_server.serve_forever()
def _configure_forward_server(self) -> int:
"""
Configure forward server for port mirror.
:return: Local port on wich the remote port is connected
"""
self._ssh_transport = self._ssh_client.get_transport()
class SubHandler(Handler):
chain_host = "127.0.0.1"
chain_port = self._remote_tunnel_port
ssh_transport = self._ssh_transport
self._forward_server = ForwardServer(("127.0.0.1", 0), SubHandler)
self._local_tunnel_port = self._forward_server.socket.getsockname()[1]
self._th_server = Thread(target=self.__th_target)
self._th_server.start()
return self._local_tunnel_port
def connect_by_credentials(self, username: str, password: str) -> int:
"""
Connect to a ssh remote host and tunnel specified port of localhost.
:return: Local port on wich the remote port is connected
"""
if self._th_server.is_alive():
raise RuntimeError("Already connected")
self._ssh_client.connect(
hostname=self._ssh_host,
port=self._ssh_port,
username=username,
password=password,
)
return self._configure_forward_server()
def connect_by_keyfile(self, username: str, key_file: str, key_password: str = None) -> int:
"""
Connect to a ssh remote host and tunnel specified port of localhost.
:return: Local port on wich the remote port is connected
"""
if self._th_server.is_alive():
raise RuntimeError("Already connected")
if self.key_file_password_protected(key_file):
private_key = RSAKey.from_private_key_file(key_file, key_password)
else:
private_key = RSAKey.from_private_key_file(key_file)
self._ssh_client.connect(
hostname=self._ssh_host,
port=self._ssh_port,
username=username,
pkey=private_key,
look_for_keys=True,
)
return self._configure_forward_server()
def disconnect(self):
"""Close SSH tunnel connection."""
self._local_tunnel_port = None
if self._forward_server:
self._forward_server.shutdown()
self._forward_server.server_close()
if self._ssh_transport:
self._ssh_transport.close()
self._ssh_client.close()
@staticmethod
def key_file_password_protected(key_file: str) -> bool:
try:
RSAKey.from_private_key_file(key_file)
except PasswordRequiredException:
return True
return False
@property
def connected(self):
"""Check connection state of ssh tunnel."""
return self._ssh_transport and self._ssh_transport.is_active()
@property
def local_tunnel_port(self) -> int:
return self._local_tunnel_port

View File

@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
"""Authentication dialog for SSH."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2023 Sven Sager"
__license__ = "GPLv3"
from enum import Enum
from PyQt5 import QtWidgets
from revpicommander.ui.sshauth_ui import Ui_diag_sshauth
class SSHAuthType(Enum):
PASS = "pass"
KEYS = "keys"
class SSHAuth(QtWidgets.QDialog, Ui_diag_sshauth):
"""Version information window."""
def __init__(self, auth_type: SSHAuthType, parent=None):
super(SSHAuth, self).__init__(parent)
self.setupUi(self)
self.wid_password.setVisible(auth_type is SSHAuthType.PASS)
self.wid_keys.setVisible(auth_type is SSHAuthType.KEYS)
@property
def password(self) -> str:
return self.txt_password.text()
@property
def username(self) -> str:
return self.txt_username.text()
@username.setter
def username(self, value: str):
self.txt_username.setText(value)

View File

@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'sshauth.ui'
#
# Created by: PyQt5 UI code generator 5.15.7
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_diag_sshauth(object):
def setupUi(self, diag_sshauth):
diag_sshauth.setObjectName("diag_sshauth")
diag_sshauth.setWindowModality(QtCore.Qt.ApplicationModal)
diag_sshauth.resize(275, 170)
self.verticalLayout = QtWidgets.QVBoxLayout(diag_sshauth)
self.verticalLayout.setObjectName("verticalLayout")
self.wid_password = QtWidgets.QWidget(diag_sshauth)
self.wid_password.setObjectName("wid_password")
self.formLayout = QtWidgets.QFormLayout(self.wid_password)
self.formLayout.setObjectName("formLayout")
self.lbl_username = QtWidgets.QLabel(self.wid_password)
self.lbl_username.setObjectName("lbl_username")
self.formLayout.setWidget(0, QtWidgets.QFormLayout.LabelRole, self.lbl_username)
self.lbl_password = QtWidgets.QLabel(self.wid_password)
self.lbl_password.setObjectName("lbl_password")
self.formLayout.setWidget(1, QtWidgets.QFormLayout.LabelRole, self.lbl_password)
self.txt_password = QtWidgets.QLineEdit(self.wid_password)
self.txt_password.setEchoMode(QtWidgets.QLineEdit.Password)
self.txt_password.setObjectName("txt_password")
self.formLayout.setWidget(1, QtWidgets.QFormLayout.FieldRole, self.txt_password)
self.txt_username = QtWidgets.QLineEdit(self.wid_password)
self.txt_username.setObjectName("txt_username")
self.formLayout.setWidget(0, QtWidgets.QFormLayout.FieldRole, self.txt_username)
self.verticalLayout.addWidget(self.wid_password)
self.wid_keys = QtWidgets.QWidget(diag_sshauth)
self.wid_keys.setObjectName("wid_keys")
self.verticalLayout.addWidget(self.wid_keys)
self.buttonBox = QtWidgets.QDialogButtonBox(diag_sshauth)
self.buttonBox.setOrientation(QtCore.Qt.Horizontal)
self.buttonBox.setStandardButtons(QtWidgets.QDialogButtonBox.Cancel|QtWidgets.QDialogButtonBox.Ok)
self.buttonBox.setObjectName("buttonBox")
self.verticalLayout.addWidget(self.buttonBox)
self.retranslateUi(diag_sshauth)
self.buttonBox.accepted.connect(diag_sshauth.accept) # type: ignore
self.buttonBox.rejected.connect(diag_sshauth.reject) # type: ignore
QtCore.QMetaObject.connectSlotsByName(diag_sshauth)
def retranslateUi(self, diag_sshauth):
_translate = QtCore.QCoreApplication.translate
diag_sshauth.setWindowTitle(_translate("diag_sshauth", "SSH authentication"))
self.lbl_username.setText(_translate("diag_sshauth", "SSH username:"))
self.lbl_password.setText(_translate("diag_sshauth", "SSH password:"))
if __name__ == "__main__":
import sys
app = QtWidgets.QApplication(sys.argv)
diag_sshauth = QtWidgets.QDialog()
ui = Ui_diag_sshauth()
ui.setupUi(diag_sshauth)
diag_sshauth.show()
sys.exit(app.exec_())

100
ui_dev/sshauth.ui Normal file
View File

@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>diag_sshauth</class>
<widget class="QDialog" name="diag_sshauth">
<property name="windowModality">
<enum>Qt::ApplicationModal</enum>
</property>
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>275</width>
<height>170</height>
</rect>
</property>
<property name="windowTitle">
<string>SSH authentication</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QWidget" name="wid_password" native="true">
<layout class="QFormLayout" name="formLayout">
<item row="0" column="0">
<widget class="QLabel" name="lbl_username">
<property name="text">
<string>SSH username:</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="lbl_password">
<property name="text">
<string>SSH password:</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="txt_password">
<property name="echoMode">
<enum>QLineEdit::Password</enum>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QLineEdit" name="txt_username"/>
</item>
</layout>
</widget>
</item>
<item>
<widget class="QWidget" name="wid_keys" native="true"/>
</item>
<item>
<widget class="QDialogButtonBox" name="buttonBox">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<property name="standardButtons">
<set>QDialogButtonBox::Cancel|QDialogButtonBox::Ok</set>
</property>
</widget>
</item>
</layout>
</widget>
<resources/>
<connections>
<connection>
<sender>buttonBox</sender>
<signal>accepted()</signal>
<receiver>diag_sshauth</receiver>
<slot>accept()</slot>
<hints>
<hint type="sourcelabel">
<x>248</x>
<y>254</y>
</hint>
<hint type="destinationlabel">
<x>157</x>
<y>274</y>
</hint>
</hints>
</connection>
<connection>
<sender>buttonBox</sender>
<signal>rejected()</signal>
<receiver>diag_sshauth</receiver>
<slot>reject()</slot>
<hints>
<hint type="sourcelabel">
<x>316</x>
<y>260</y>
</hint>
<hint type="destinationlabel">
<x>286</x>
<y>274</y>
</hint>
</hints>
</connection>
</connections>
</ui>