microblog.pub/poussetaches.py

110 lines
3 KiB
Python
Raw Normal View History

2019-04-05 04:35:48 -05:00
import base64
import json
import os
2019-04-05 14:36:56 -05:00
from typing import Dict
2019-04-05 04:35:48 -05:00
from typing import Any
2019-04-05 14:36:56 -05:00
from typing import List
2019-04-05 04:35:48 -05:00
from dataclasses import dataclass
import flask
import requests
2019-04-05 14:36:56 -05:00
from datetime import datetime
2019-04-05 04:35:48 -05:00
POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY")
@dataclass
class Task:
req_id: str
tries: int
payload: Any
2019-04-05 14:36:56 -05:00
@dataclass
class GetTask:
payload: Any
expected: int
task_id: str
next_run: datetime
tries: int
url: str
last_error_status_code: int
last_error_body: str
2019-04-05 04:35:48 -05:00
class PousseTaches:
def __init__(self, api_url: str, base_url: str) -> None:
self.api_url = api_url
self.base_url = base_url
def push(self, payload: Any, path: str, expected=200) -> str:
# Encode our payload
p = base64.b64encode(json.dumps(payload).encode()).decode()
# Queue/push it
resp = requests.post(
self.api_url,
json={"url": self.base_url + path, "payload": p, "expected": expected},
)
resp.raise_for_status()
2019-04-05 04:42:14 -05:00
return resp.headers["Poussetaches-Task-ID"]
2019-04-05 04:35:48 -05:00
def parse(self, req: flask.Request) -> Task:
if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY:
raise ValueError("Bad auth key")
# Parse the "envelope"
envelope = json.loads(req.data)
print(req)
print(f"envelope={envelope!r}")
payload = json.loads(base64.b64decode(envelope["payload"]))
2019-04-05 13:03:49 -05:00
return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) # type: ignore
2019-04-05 14:36:56 -05:00
@staticmethod
def _expand_task(t: Dict[str, Any]) -> None:
try:
t["payload"] = json.loads(base64.b64decode(t["payload"]))
except json.JSONDecodeError:
t["payload"] = base64.b64decode(t["payload"]).decode()
if t["last_error_body"]:
t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode()
t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9))
if t["last_run"]:
t["last_run"] = datetime.fromtimestamp(float(t["last__run"] / 1e9))
else:
del t["last_run"]
def _get(self, where: str) -> List[GetTask]:
out = []
resp = requests.get(self.api_url + f"/{where}")
resp.raise_for_status()
dat = resp.json()
for t in dat["tasks"]:
self._expand_task(t)
out.append(GetTask(
task_id=t["id"],
payload=t["payload"],
expected=t["expected"],
tries=t["tries"],
url=t["url"],
last_error_status_code=t["last_error_status_code"],
last_error_body=t["last_error_body"],
next_run=t["next_run"],
))
return out
def get_success(self) -> List[GetTask]:
return self._get("success")
def get_waiting(self) -> List[GetTask]:
return self._get("waiting")
def get_dead(self) -> List[GetTask]:
return self._get("dead")