Erster Release

This commit is contained in:
2019-09-21 23:13:32 +02:00
parent c44ccfa374
commit 9587a53506
11 changed files with 224 additions and 0 deletions

1
.gitignore vendored
View File

@@ -114,3 +114,4 @@ dmypy.json
# Pyre type checker # Pyre type checker
.pyre/ .pyre/
/test/lora.conf

17
.idea/$CACHE_FILE$ generated Normal file
View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectInspectionProfilesVisibleTreeState">
<entry key="Project Default">
<profile-state>
<expanded-state>
<State />
</expanded-state>
<selected-state>
<State>
<id>Angular</id>
</State>
</selected-state>
</profile-state>
</entry>
</component>
</project>

2
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,2 @@
# Default ignored files
/workspace.xml

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="JavaScriptSettings">
<option name="languageLevel" value="ES6" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.6" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/rn2483lora.iml" filepath="$PROJECT_DIR$/.idea/rn2483lora.iml" />
</modules>
</component>
</project>

14
.idea/rn2483lora.iml generated Normal file
View File

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/test" />
</content>
<orderEntry type="jdk" jdkName="Python 3.6" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

5
__init__.py Normal file
View File

@@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
"""Module for LoRaWAN chip RN2483."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2019 Sven Sager"
__license__ = "GPLv3"

5
lora.conf Normal file
View File

@@ -0,0 +1,5 @@
[DEFAULT]
port = /dev/ttyUSB0
appskey =
nwkskey =
devaddr =

153
rn2483lora.py Normal file
View File

@@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
"""LoRaWAN module for RN 2483."""
from queue import Empty, Queue
from threading import Thread
from serial import Serial
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2019 Sven Sager"
__license__ = "GPLv3"
class RN2483:
CRLF = b'\r\n'
def __init__(self, port, baud=57600):
self._exit = False
self._serial = Serial(port, baud, timeout=0.5)
self._read_q = Queue()
self._write_q = Queue()
# TODO: Clear buffer
self.reader = Thread(target=self._reader, daemon=True)
self.reader.start()
def _reader(self) -> None:
"""
Reader Thread to manage queues for the device.
"""
while not self._exit:
try:
cmd = self._write_q.get_nowait()
except Empty:
pass
else:
self._serial.write(cmd + self.CRLF)
buff = self._serial.read_until()
buff = buff.decode("ASCII").strip()
if buff:
self._read_q.put_nowait(buff)
def close(self) -> None:
"""
Disconnect from Device.
"""
self._exit = True
self.reader.join()
self._serial.close()
def get_result(self, timeout=None) -> str:
"""
Get a result fromMo device queue.
Some commands returns more than one line. You can call this function
to get the extra lines from the queue.
:param timeout: Timeout to wait for result of device
:return: Result from device or empty string
"""
try:
result = self._read_q.get(timeout=timeout)
except Empty:
result = ""
return result
def send(self, cmd: str, get_result=True, timeout=None) -> str:
"""
Send a command to Device.
This function can return the result from the device of the send
command. If you set get_result to False, you have to use
get_result(...) to get the answers of the device.
:param cmd: Command to send
:param get_result: If true, function will return result
:param timeout: Timeout to wait for result of device
:return: Result from device or empty string
"""
cmd = cmd.encode("ASCII")
self._write_q.put(cmd)
if get_result:
return self.get_result(timeout)
if __name__ == '__main__':
from configparser import ConfigParser
from time import sleep
conf = ConfigParser()
conf.read("lora.conf")
root = RN2483(conf.get("DEFAULT", "port"))
def first_init() -> None:
"""Init our RN2483 after firmware startup."""
# Configuration: ["cmd", results]
cmd_list = [
["sys get ver", 1],
["mac reset 868", 1],
["sys get hweui", 1],
# Set TTN configurations form .conf file
["mac set appskey {0}".format(conf.get("DEFAULT", "appskey")), 1],
["mac set nwkskey {0}".format(conf.get("DEFAULT", "nwkskey")), 1],
["mac set devaddr {0}".format(conf.get("DEFAULT", "devaddr")), 1],
["mac join abp", 2],
["mac set ar on", 1],
]
for do_cmd in cmd_list:
print("<-", do_cmd[0])
print("->", root.send(do_cmd[0]))
# Get awaited extra results results
for i in range(do_cmd[1] - 1):
print(" ", root.get_result())
first_init()
# Command mode
while True:
results = 1
cmd_input = input("<- ")
if cmd_input == "exit":
break
elif cmd_input == "reset":
root.send("sys reset")
sleep(3)
first_init()
continue
# Functions to make live more easier
elif cmd_input.find("send ") > -1:
lst = cmd_input.split()
cmd_input = "mac tx uncnf {port} {bytes}".format(
port=lst[2] if len(lst) > 2 else 1,
bytes=lst[1] if len(lst) > 1 else "",
)
results = 2
# Always send command
if cmd_input:
print("->", root.send(cmd_input))
# Get awaited extra results results
for i in range(results - 1):
print(" ", root.get_result())
root.close()