microblog.pub/poussetaches.py
2019-04-07 11:01:46 +02:00

109 lines
3 KiB
Python

import base64
import json
import os
from typing import Dict
from typing import Any
from typing import List
from dataclasses import dataclass
import flask
import requests
from datetime import datetime
POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY")
@dataclass
class Task:
req_id: str
tries: int
payload: Any
@dataclass
class GetTask:
payload: Any
expected: int
task_id: str
next_run: datetime
tries: int
url: str
last_error_status_code: int
last_error_body: str
class PousseTaches:
def __init__(self, api_url: str, base_url: str) -> None:
self.api_url = api_url
self.base_url = base_url
def push(self, payload: Any, path: str, expected=200) -> str:
# Encode our payload
p = base64.b64encode(json.dumps(payload).encode()).decode()
# Queue/push it
resp = requests.post(
self.api_url,
json={"url": self.base_url + path, "payload": p, "expected": expected},
)
resp.raise_for_status()
return resp.headers["Poussetaches-Task-ID"]
def parse(self, req: flask.Request) -> Task:
if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY:
raise ValueError("Bad auth key")
# Parse the "envelope"
envelope = json.loads(req.data)
print(req)
print(f"envelope={envelope!r}")
payload = json.loads(base64.b64decode(envelope["payload"]))
return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) # type: ignore
@staticmethod
def _expand_task(t: Dict[str, Any]) -> None:
try:
t["payload"] = json.loads(base64.b64decode(t["payload"]))
except json.JSONDecodeError:
t["payload"] = base64.b64decode(t["payload"]).decode()
if t["last_error_body"]:
t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode()
t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9))
if t["last_run"]:
t["last_run"] = datetime.fromtimestamp(float(t["last__run"] / 1e9))
else:
del t["last_run"]
def _get(self, where: str) -> List[GetTask]:
out = []
resp = requests.get(self.api_url + f"/{where}")
resp.raise_for_status()
dat = resp.json()
for t in dat["tasks"]:
self._expand_task(t)
out.append(GetTask(
task_id=t["id"],
payload=t["payload"],
expected=t["expected"],
tries=t["tries"],
url=t["url"],
last_error_status_code=t["last_error_status_code"],
last_error_body=t["last_error_body"],
next_run=t["next_run"],
))
return out
def get_success(self) -> List[GetTask]:
return self._get("success")
def get_waiting(self) -> List[GetTask]:
return self._get("waiting")
def get_dead(self) -> List[GetTask]:
return self._get("dead")