82 lines
2.2 KiB
Python
82 lines
2.2 KiB
Python
import requests
|
|
from requests.structures import CaseInsensitiveDict
|
|
import json
|
|
import threading
|
|
from base64 import b64encode
|
|
from datetime import datetime
|
|
from random import randint
|
|
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
class LoraController:
|
|
|
|
def __init__(self, main):
|
|
self.main = main
|
|
self.storage = dict()
|
|
self.devices = []
|
|
self.token = ""
|
|
|
|
def generate_token(self):
|
|
|
|
url = "https://lora.plzen.eu/api/v2/login"
|
|
|
|
headers = CaseInsensitiveDict()
|
|
headers["Content-Type"] = "application/json"
|
|
|
|
data = {
|
|
"email": self.main.config["lora_api"]["username"],
|
|
"password": self.main.config["lora_api"]["password"]
|
|
}
|
|
|
|
data = requests.post(url, verify=False, headers=headers, data=json.dumps(data)).json()
|
|
self.token = data["token"]
|
|
|
|
def new(self, id: int, stop_id: int):
|
|
self.devices.append(LoraDevice(self, id, stop_id))
|
|
|
|
def time(self):
|
|
def to_string():
|
|
return f"TIME|{(datetime.now() - datetime(1970, 1, 1)).total_seconds():.0f}"
|
|
for d in self.devices:
|
|
d.send(to_string)
|
|
|
|
def data(self, data):
|
|
for d in self.devices:
|
|
if d.stop_id == data.stop_id:
|
|
d.send(data.__repr__)
|
|
|
|
class LoraDevice:
|
|
|
|
def __init__(self, controller: LoraController, deveui: int, stop_id: int):
|
|
self.controller = controller
|
|
self.id = deveui
|
|
self.stop_id = stop_id
|
|
self.message_pool = []
|
|
self.thread = None
|
|
self.send(lambda: "CLEAR")
|
|
|
|
def send(self, msg):
|
|
self.message_pool.append(msg)
|
|
if not self.thread or not self.thread.is_alive():
|
|
self.thread = threading.Thread(target=self.thread_sending)
|
|
self.thread.start()
|
|
|
|
def thread_sending(self):
|
|
while True:
|
|
if len(self.message_pool) == 0:
|
|
break
|
|
message = self.message_pool.pop(0)()
|
|
url = f"https://lora.plzen.eu/api/v2/nodes/{self.id:0>16x}/queue"
|
|
headers = CaseInsensitiveDict()
|
|
headers["Content-Type"] = "application/json"
|
|
headers["Authorization"] = f"Bearer {self.controller.token}"
|
|
payload = {
|
|
"confirmed": True,
|
|
"data": b64encode(message.encode("utf-8")).decode("ascii"),
|
|
"fPort": randint(1, 50),
|
|
"reference": "string"
|
|
}
|
|
print(f"{self.id:0>16x} > {message}")
|
|
requests.post(url, verify=False, headers=headers, data=json.dumps(payload))
|