Odjezdova-tabule-MHD/server/lora.py

89 lines
2.2 KiB
Python

import requests
from requests.structures import CaseInsensitiveDict
import config
import json
import threading
from time import sleep
from base64 import b64encode
from datetime import datetime
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class LoraController:
def __init__(self):
self.storage = dict()
self.devices = []
self.message_pool = []
self.thread = None
def generate_token(self):
url = f"https://lora.plzen.eu/api/v2/login"
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/json"
data = {
"email": config.lora_api_username,
"password": config.lora_api_password
}
data = requests.post(url, verify=False, headers=headers, data=json.dumps(data)).json()
config.lora_api_token = data["token"]
def new(self, id):
self.devices.append(LoraDevice(self, id))
def time(self):
def to_string():
return f"TIME|{(datetime.now() - datetime(1970, 1, 1)).total_seconds():.0f}"
for d in self.devices:
d.send(to_string)
def data(self, data):
for d in self.devices:
d.send(data.__repr__)
def send(self, id, port, msg):
self.message_pool.append((id, port, msg))
if not self.thread or not self.thread.is_alive():
self.thread = threading.Thread(target=self.thread_sending)
self.thread.start()
def thread_sending(self):
while True:
if not len(self.message_pool):
break
data = self.message_pool.pop()
url = f"https://lora.plzen.eu/api/v2/nodes/{data[0]:0>16x}/queue"
string = data[2]()
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/json"
headers["Authorization"] = f"Bearer {config.lora_api_token}"
payload = {
"confirmed": True,
"data": b64encode(string.encode("utf-8")).decode("ascii"),
"fPort": data[1],
"reference": "string"
}
requests.post(url, verify=False, headers=headers, data=json.dumps(payload))
sleep(2)
class LoraDevice:
def __init__(self, controller: LoraController, deveui: int):
self.controller = controller
self.id = deveui
self.port = 0
def send(self, msg):
self.controller.send(self.id, self.port+1, msg)
self.port = (self.port + 1) % 15
lora_controller = LoraController()
lora_controller.new(0xbdea85badeedf1)