# -*- coding: utf-8 -*-
"""LoRaWAN module for RN 2483."""
from queue import Empty, Queue
from threading import Thread

from serial import Serial

__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2019 Sven Sager"
__license__ = "GPLv3"


class RN2483:
    """
    Line based access to a RN2483 module over a serial port.

    A background thread sends the commands queued by send(...) and puts
    every non-empty line read from the serial port into a result queue,
    which is read with get_result(...).
    """

    # Line terminator appended to every command written to the device
    CRLF = b'\r\n'

    def __init__(self, port, baud=57600):
        """
        Open the serial port and start the reader thread.

        :param port: Serial port passed to serial.Serial
        :param baud: Baud rate passed to serial.Serial
        """
        self._exit = False
        # The read timeout keeps the reader loop from blocking forever, so
        # it can pick up new commands and notice the exit flag.
        self._serial = Serial(port, baud, timeout=0.5)
        self._read_q = Queue()
        self._write_q = Queue()

        # TODO: Clear buffer

        self.reader = Thread(target=self._reader, daemon=True)
        self.reader.start()

    def _reader(self) -> None:
        """
        Reader Thread to manage queues for the device.

        Each pass writes at most one queued command, then reads one line
        (or runs into the serial read timeout) and queues it if not empty.
        """
        while not self._exit:
            try:
                cmd = self._write_q.get_nowait()
            except Empty:
                pass
            else:
                self._serial.write(cmd + self.CRLF)

            buff = self._serial.read_until()
            # Replace undecodable bytes instead of raising: an exception
            # here would end this thread and no result would ever arrive.
            buff = buff.decode("ASCII", errors="replace").strip()
            if buff:
                self._read_q.put_nowait(buff)

    def close(self) -> None:
        """
        Disconnect from Device.

        Stops the reader thread, waits for it and closes the serial port.
        """
        self._exit = True
        self.reader.join()
        self._serial.close()

    def get_result(self, timeout=None) -> str:
        """
        Get a result from the device queue.

        Some commands return more than one line. You can call this function
        to get the extra lines from the queue.

        :param timeout: Timeout in seconds to wait for a result of the
            device, None waits until a result is available
        :return: Result from device or empty string on timeout
        """
        try:
            result = self._read_q.get(timeout=timeout)
        except Empty:
            result = ""
        return result

    def send(self, cmd: str, get_result=True, timeout=None) -> str:
        """
        Send a command to Device.

        This function can return the result from the device of the sent
        command. If you set get_result to False, you have to use
        get_result(...) to get the answers of the device.

        :param cmd: Command to send (ASCII only, without line ending)
        :param get_result: If true, function will return result
        :param timeout: Timeout to wait for result of device
        :return: Result from device or empty string
        :raises UnicodeEncodeError: If cmd contains non ASCII characters
        """
        self._write_q.put(cmd.encode("ASCII"))
        if get_result:
            return self.get_result(timeout)
        # Keep the documented "str" contract when no result is requested
        return ""


if __name__ == '__main__':
    from configparser import ConfigParser
    from time import sleep

    conf = ConfigParser()
    conf.read("lora.conf")

    root = RN2483(conf.get("DEFAULT", "port"))

    def first_init() -> None:
        """Init our RN2483 after firmware startup."""
        # Configuration: ["cmd", number of result lines to read]
        cmd_list = [
            ["sys get ver", 1],
            ["mac reset 868", 1],
            ["sys get hweui", 1],

            # Set TTN configurations from .conf file
            ["mac set appskey {0}".format(conf.get("DEFAULT", "appskey")), 1],
            ["mac set nwkskey {0}".format(conf.get("DEFAULT", "nwkskey")), 1],
            ["mac set devaddr {0}".format(conf.get("DEFAULT", "devaddr")), 1],

            ["mac join abp", 2],
            ["mac set ar on", 1],
        ]

        for do_cmd in cmd_list:
            print("<-", do_cmd[0])
            print("->", root.send(do_cmd[0]))

            # Get awaited extra results
            for _ in range(do_cmd[1] - 1):
                print("  ", root.get_result())

    # Make sure the serial port is released, even if the session ends with
    # Ctrl+C, EOF on stdin or an unexpected error.
    try:
        first_init()

        # Command mode
        while True:
            results = 1
            cmd_input = input("<- ")

            if cmd_input == "exit":
                break

            elif cmd_input == "reset":
                root.send("sys reset")
                sleep(3)
                first_init()
                continue

            # Shortcuts to make life easier: "send <hexdata> [port]"
            # Only match at the start of the line, so other commands which
            # just contain "send " are passed to the device unchanged.
            elif cmd_input.startswith("send "):
                lst = cmd_input.split()
                cmd_input = "mac tx uncnf {port} {bytes}".format(
                    port=lst[2] if len(lst) > 2 else 1,
                    bytes=lst[1] if len(lst) > 1 else "",
                )
                results = 2

            # Always send command
            if cmd_input:
                print("->", root.send(cmd_input))

                # Get awaited extra results
                for _ in range(results - 1):
                    print("  ", root.get_result())
    finally:
        root.close()