# -*- coding: utf-8 -*-
|
|
"""LoRaWAN module for RN 2483."""
|
|
from queue import Empty, Queue
|
|
from threading import Thread
|
|
|
|
from serial import Serial
|
|
|
|
__author__ = "Sven Sager"
|
|
__copyright__ = "Copyright (C) 2019 Sven Sager"
|
|
__license__ = "GPLv3"
|
|
|
|
|
|
class RN2483:
    """
    Queue based serial driver for the RN 2483 LoRaWAN module.

    Commands are handed to a background thread through a write queue; that
    thread sends them to the serial port and puts every non-empty line it
    reads from the device into a read queue. Use send(...) to issue a
    command and get_result(...) to fetch answer lines.
    """

    # Line terminator appended to every command written to the device
    CRLF = b'\r\n'

    def __init__(self, port, baud=57600):
        """
        Open the serial port and start the reader thread.

        :param port: Serial port, passed on to serial.Serial
        :param baud: Baud rate, passed on to serial.Serial
        """
        self._exit = False
        # The read timeout keeps the reader loop from blocking forever, so
        # it can pick up queued commands and notice the exit flag.
        self._serial = Serial(port, baud, timeout=0.5)
        self._read_q = Queue()
        self._write_q = Queue()

        # TODO: Clear buffer
        self.reader = Thread(target=self._reader, daemon=True)
        self.reader.start()

    def _reader(self) -> None:
        """
        Reader Thread to manage queues for the device.

        Each pass writes at most one queued command (with CRLF appended) to
        the serial port, then reads one line and queues it as a stripped
        string if it is not empty. The loop ends once close() sets the exit
        flag.
        """
        while not self._exit:
            try:
                cmd = self._write_q.get_nowait()
            except Empty:
                pass
            else:
                self._serial.write(cmd + self.CRLF)

            buff = self._serial.read_until()
            # Line noise must not raise UnicodeDecodeError here: that would
            # end this thread and leave get_result() callers waiting for
            # answers that never arrive. Undecodable bytes become U+FFFD.
            buff = buff.decode("ASCII", errors="replace").strip()
            if buff:
                self._read_q.put_nowait(buff)

    def close(self) -> None:
        """
        Disconnect from Device.

        Stops the reader thread, waits for it to finish and closes the
        serial port.
        """
        self._exit = True
        self.reader.join()
        self._serial.close()

    def get_result(self, timeout=None) -> str:
        """
        Get a result from the device queue.

        Some commands return more than one line. You can call this function
        to get the extra lines from the queue.

        :param timeout: Timeout to wait for result of device, None waits
            until a result is available
        :return: Result from device or empty string
        """
        try:
            result = self._read_q.get(timeout=timeout)
        except Empty:
            result = ""
        return result

    def send(self, cmd: str, get_result=True, timeout=None) -> str:
        """
        Send a command to Device.

        This function can return the result from the device of the sent
        command. If you set get_result to False, you have to use
        get_result(...) to get the answers of the device.

        :param cmd: Command to send (ASCII only, without line ending)
        :param get_result: If true, function will return result
        :param timeout: Timeout to wait for result of device
        :return: Result from device or empty string
        """
        self._write_q.put(cmd.encode("ASCII"))
        if get_result:
            return self.get_result(timeout)
        # Honour the documented "or empty string" contract instead of
        # implicitly returning None.
        return ""
|
|
|
|
|
|
if __name__ == '__main__':
    # Interactive console: configure the module from lora.conf, then forward
    # typed commands to the device and print its answers.
    from configparser import ConfigParser
    from time import sleep

    conf = ConfigParser()
    conf.read("lora.conf")

    root = RN2483(
        conf.get("DEFAULT", "port"),
        # getint: hand the driver a real integer and fail early on a
        # non-numeric value in the config file.
        conf.getint("DEFAULT", "baud", fallback=57600),
    )

    def first_init() -> None:
        """Init our RN2483 after firmware startup."""

        # Configuration: ("cmd", number of result lines to expect)
        cmd_list = [
            ("sys get ver", 1),
            ("mac reset 868", 1),
            ("sys get hweui", 1),

            # Set TTN configuration from .conf file
            ("mac set appskey {0}".format(conf.get("DEFAULT", "appskey")), 1),
            ("mac set nwkskey {0}".format(conf.get("DEFAULT", "nwkskey")), 1),
            ("mac set devaddr {0}".format(conf.get("DEFAULT", "devaddr")), 1),

            ("mac join abp", 2),
            ("mac set ar on", 1),
        ]
        for command, expected in cmd_list:
            print("<-", command)
            print("->", root.send(command))

            # Get the awaited extra results
            for _ in range(expected - 1):
                print("  ", root.get_result())

    try:
        first_init()

        # Command mode
        while True:
            results = 1

            cmd_input = input("<- ")
            words = cmd_input.split()
            if cmd_input == "exit":
                break
            elif cmd_input == "reset":
                root.send("sys reset")
                # Give the firmware time to restart before re-configuring
                sleep(3)
                first_init()
                continue

            # Shortcuts to make life easier:
            # "send <hex> [port]" -> "mac tx uncnf <port> <hex>"
            elif words and words[0] == "send":
                cmd_input = "mac tx uncnf {port} {bytes}".format(
                    port=words[2] if len(words) > 2 else 1,
                    bytes=words[1] if len(words) > 1 else "",
                )
                # The loop below fetches one extra answer line for a
                # transmission.
                results = 2

            # Always send command
            if cmd_input:
                print("->", root.send(cmd_input))

                # Get the awaited extra results
                for _ in range(results - 1):
                    print("  ", root.get_result())
    except (EOFError, KeyboardInterrupt):
        # Ctrl-D / Ctrl-C end the session like "exit"
        print()
    finally:
        # Always stop the reader thread and release the serial port
        root.close()
|