118 lines
3.1 KiB
Python
118 lines
3.1 KiB
Python
import requests
|
|
from requests.structures import CaseInsensitiveDict
|
|
import json
|
|
import threading
|
|
from operator import itemgetter
|
|
from base64 import b64encode
|
|
from datetime import datetime
|
|
from random import randint, shuffle
|
|
from time import sleep
|
|
from departures import Departure, Filter
|
|
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
class LoraController:
|
|
|
|
def __init__(self, main):
|
|
self.main = main
|
|
self.devices = []
|
|
self.token = ""
|
|
|
|
def generate_token(self):
|
|
|
|
url = "https://lora.plzen.eu/api/v2/login"
|
|
|
|
headers = CaseInsensitiveDict()
|
|
headers["Content-Type"] = "application/json"
|
|
|
|
data = {
|
|
"email": self.main.config["lora_api"]["username"],
|
|
"password": self.main.config["lora_api"]["password"]
|
|
}
|
|
|
|
data = requests.post(url, verify=False, headers=headers, data=json.dumps(data)).json()
|
|
self.token = data["token"]
|
|
|
|
def new(self, id: int, stop_id: str, enabled: bool, filter: Filter):
|
|
self.devices.append(LoraDevice(self, id, stop_id, enabled, filter))
|
|
|
|
def json(self):
|
|
resp = []
|
|
for d in self.devices:
|
|
resp.append({'id': f"{d.id:0>16x}", 'stop_id': d.stop_id, 'enabled': d.enabled, 'filter': d.filter.name if d.filter else None})
|
|
return resp
|
|
|
|
class LoraDevice:
|
|
|
|
def __init__(self, controller: LoraController, deveui: int, stop_id: str, enabled: bool, filter: Filter):
|
|
self.controller = controller
|
|
self.id = deveui
|
|
self.stop_id = stop_id
|
|
self.enabled = enabled
|
|
self.filter = filter
|
|
self.message_pool = []
|
|
self.thread = None
|
|
self.port = 1
|
|
self.clear()
|
|
|
|
def set_stop(self, stop_id: str):
|
|
self.clear()
|
|
self.stop_id = stop_id
|
|
|
|
def set_filter(self, name: str):
|
|
self.filter = Filter.get(name)
|
|
|
|
def get_updated_departures(self):
|
|
if not self.enabled:
|
|
return
|
|
self.send_time()
|
|
departures = Departure.get(self.stop_id)
|
|
if self.filter:
|
|
departures = self.filter.filter(departures)
|
|
for d in departures:
|
|
if self.id not in d.updated or d.updated[self.id] > 0:
|
|
self.send(d.data)
|
|
|
|
def clear(self):
|
|
self.send(lambda id: "CLEAR")
|
|
|
|
def toggle(self, state: bool):
|
|
self.enabled = state
|
|
if not state:
|
|
self.message_pool = []
|
|
|
|
def send_time(self):
|
|
self.send(lambda id: f"TIME|{(datetime.now() - datetime(1970, 1, 1)).total_seconds():.0f}")
|
|
|
|
def send(self, msg):
|
|
self.message_pool.append(msg)
|
|
if not self.thread or not self.thread.is_alive():
|
|
self.thread = threading.Thread(target=self.thread_sending)
|
|
self.thread.start()
|
|
|
|
def thread_sending(self):
|
|
while not self.controller.main.ended:
|
|
if len(self.message_pool) == 0:
|
|
break
|
|
message = self.message_pool.pop(0)(self.id)
|
|
if not message:
|
|
continue
|
|
url = f"https://lora.plzen.eu/api/v2/nodes/{self.id:0>16x}/queue"
|
|
headers = CaseInsensitiveDict()
|
|
headers["Content-Type"] = "application/json"
|
|
headers["Authorization"] = f"Bearer {self.controller.token}"
|
|
payload = {
|
|
"confirmed": True,
|
|
"data": b64encode(message.encode("utf-8")).decode("ascii"),
|
|
"fPort": self.port,
|
|
"reference": "string"
|
|
}
|
|
print(f"{self.id:0>16x} > {message}")
|
|
requests.post(url, verify=False, headers=headers, data=json.dumps(payload))
|
|
sleep(.5)
|
|
self.port += 1
|
|
if self.port > 3:
|
|
self.port = 1
|
|
sleep(5)
|