Odjezdova-tabule-MHD/server/lora.py

118 lines
3.1 KiB
Python

import requests
from requests.structures import CaseInsensitiveDict
import json
import threading
from operator import itemgetter
from base64 import b64encode
from datetime import datetime
from random import randint, shuffle
from time import sleep
from departures import Departure, Filter
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class LoraController:
def __init__(self, main):
self.main = main
self.devices = []
self.token = ""
def generate_token(self):
url = "https://lora.plzen.eu/api/v2/login"
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/json"
data = {
"email": self.main.config["lora_api"]["username"],
"password": self.main.config["lora_api"]["password"]
}
data = requests.post(url, verify=False, headers=headers, data=json.dumps(data)).json()
self.token = data["token"]
def new(self, id: int, stop_id: str, enabled: bool, filter: Filter):
self.devices.append(LoraDevice(self, id, stop_id, enabled, filter))
def json(self):
resp = []
for d in self.devices:
resp.append({'id': f"{d.id:0>16x}", 'stop_id': d.stop_id, 'enabled': d.enabled, 'filter': d.filter.name if d.filter else None})
return resp
class LoraDevice:
def __init__(self, controller: LoraController, deveui: int, stop_id: str, enabled: bool, filter: Filter):
self.controller = controller
self.id = deveui
self.stop_id = stop_id
self.enabled = enabled
self.filter = filter
self.message_pool = []
self.thread = None
self.port = 1
self.clear()
def set_stop(self, stop_id: str):
self.clear()
self.stop_id = stop_id
def set_filter(self, name: str):
self.filter = Filter.get(name)
def get_updated_departures(self):
if not self.enabled:
return
self.send_time()
departures = Departure.get(self.stop_id)
if self.filter:
departures = self.filter.filter(departures)
for d in departures:
if self.id not in d.updated or d.updated[self.id] > 0:
self.send(d.data)
def clear(self):
self.send(lambda id: "CLEAR")
def toggle(self, state: bool):
self.enabled = state
if not state:
self.message_pool = []
def send_time(self):
self.send(lambda id: f"TIME|{(datetime.now() - datetime(1970, 1, 1)).total_seconds():.0f}")
def send(self, msg):
self.message_pool.append(msg)
if not self.thread or not self.thread.is_alive():
self.thread = threading.Thread(target=self.thread_sending)
self.thread.start()
def thread_sending(self):
while not self.controller.main.ended:
if len(self.message_pool) == 0:
break
message = self.message_pool.pop(0)(self.id)
if not message:
continue
url = f"https://lora.plzen.eu/api/v2/nodes/{self.id:0>16x}/queue"
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/json"
headers["Authorization"] = f"Bearer {self.controller.token}"
payload = {
"confirmed": True,
"data": b64encode(message.encode("utf-8")).decode("ascii"),
"fPort": self.port,
"reference": "string"
}
print(f"{self.id:0>16x} > {message}")
requests.post(url, verify=False, headers=headers, data=json.dumps(payload))
sleep(.5)
self.port += 1
if self.port > 3:
self.port = 1
sleep(5)