Odjezdova-tabule-MHD/server/main.py

119 lines
2.6 KiB
Python

from departures import Departure, Filter
from requests.structures import CaseInsensitiveDict
import json
import requests
from time import sleep
from lora import LoraController
import threading
from api import API
from pathlib import Path
import yaml
import sys
class Main:
def __init__(self):
config = Path("config.yml")
try:
self.config = yaml.safe_load(config.read_text(encoding="utf-8-sig"))
except:
print("Invalid config file!")
sys.exit(-1)
self.ended = False
lora_controller = LoraController(self)
self.controller = lora_controller
self.api = API(self)
lora_controller.generate_token()
for name in self.config["filters"]:
Filter(name, self.config["filters"][name])
for d in self.config["devices"]:
if "stop_id" in d:
if "enabled" not in d:
d["enabled"] = True
if "filter" not in d:
d["filter"] = None
lora_controller.new(d["id"], str(d["stop_id"]), d["enabled"], Filter.get(d["filter"]))
self.thread = threading.Thread(target=self.update_loop)
self.thread.start()
def fetch(self, stop_id: str, limit: int):
url = "https://jizdnirady.pmdp.cz/odjezdy/vyhledat"
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/json"
data = {
"Stop": {
"StopId": stop_id
},
"DateAndTime": None,
"MaxResults": limit,
"FullResults": False
}
resp = requests.post(url, headers=headers, data=json.dumps(data), timeout=1000).json()
for c in resp:
did = f"{stop_id}|{abs(c['ConnectionId']['ConnectionId'])}"
if did not in Departure.storage:
Departure(
did,
stop_id,
c["Line"]["TractionType"],
c["Line"]["Name"],
c["LastStopName"],
c["DepartureTime"],
0 if not c["DelayMin"] else c["DelayMin"]
)
else:
Departure.storage[did].update(
0 if not c["DelayMin"] else c["DelayMin"]
)
def update_loop(self):
refetch = 0
regenerate = 1
while True:
if self.ended:
break
if regenerate == 0:
self.controller.generate_token()
if refetch == 0:
for s in self.config["stops"]:
self.fetch(f"{s}", 15)
for d in self.controller.devices:
d.get_updated_departures()
refetch = (refetch + 1) % 3
regenerate = (regenerate + 1) % (6*30)
sleep(.1)
Departure.clear()
for _ in range(10):
sleep(1)
if self.ended:
break
def stop(self):
print("Stopping...")
self.api.server.stop()
self.ended = True
self.controller.message_pool = []
def input_loop(self):
try:
while not self.ended:
command = input()
if command in ["s", "stop"]:
self.stop()
except KeyboardInterrupt:
self.stop()
main = Main()
main.input_loop()