# microblog.pub/poussetaches.py
2019-04-05 04:35:48 -05:00
import base64
import json
import os
2019-04-22 02:58:11 -05:00
from datetime import datetime
2019-04-05 04:35:48 -05:00
from typing import Any
2019-04-22 02:58:11 -05:00
from typing import Dict
2019-04-05 14:36:56 -05:00
from typing import List
2019-04-22 02:58:11 -05:00
2019-04-05 04:35:48 -05:00
import flask
import requests
2019-04-22 02:58:11 -05:00
from dataclasses import dataclass
2019-04-05 04:35:48 -05:00
# Shared secret compared against the "Poussetaches-Auth-Key" request header.
# Read once at import time; None when the environment variable is not set.
POUSSETACHES_AUTH_KEY = os.environ.get("POUSSETACHES_AUTH_KEY")
@dataclass
class Task:
    """A task decoded from an incoming request envelope.

    Attributes:
        req_id: Value of the envelope's ``req_id`` field.
        tries: Value of the envelope's ``tries`` field (presumably the
            number of delivery attempts so far -- confirm with the server).
        payload: The JSON-decoded task payload.
    """

    req_id: str
    tries: int
    payload: Any
2019-04-05 14:36:56 -05:00
@dataclass
class GetTask:
    """A task as returned by the task-listing API endpoints.

    Attributes:
        payload: Decoded task payload (parsed JSON, or plain text when the
            payload is not JSON).
        expected: The task's ``expected`` field (presumably the HTTP status
            the target URL must answer with -- confirm with the server).
        schedule: The task's ``schedule`` field.
        task_id: The task's ``id`` field.
        next_run: When the task is due to run next.
        tries: The task's ``tries`` field.
        url: The URL the task is delivered to.
        last_error_status_code: The task's ``last_error_status_code`` field.
        last_error_body: Decoded body of the last error response, if any.
    """

    payload: Any
    expected: int
    schedule: str
    task_id: str
    next_run: datetime
    tries: int
    url: str
    last_error_status_code: int
    last_error_body: str
2019-04-05 04:35:48 -05:00
class PousseTaches:
    """Client for the poussetaches task-queue HTTP API.

    ``push`` queues a task, ``parse`` authenticates and decodes a task
    delivered to this application, and the ``get_*`` methods list the tasks
    exposed by the API's listing endpoints.
    """

    def __init__(
        self, api_url: str, base_url: str, timeout: float = 10.0
    ) -> None:
        """Remember the endpoints used by the other methods.

        Args:
            api_url: URL tasks are POSTed to; also the prefix of the listing
                endpoints (``<api_url>/cron``, ``<api_url>/waiting``, ...).
            base_url: Prefix prepended to the ``path`` passed to ``push``.
            timeout: Seconds ``requests`` waits on the API before raising,
                so an unresponsive server cannot block the caller forever.
        """
        self.api_url = api_url
        self.base_url = base_url
        self.timeout = timeout

    def push(
        self,
        payload: Any,
        path: str,
        expected: int = 200,
        schedule: str = "",
        delay: int = 0,
    ) -> str:
        """Queue a task and return its ID.

        Args:
            payload: JSON-serialisable value; sent as base64-encoded JSON.
            path: Appended to ``base_url`` to form the task's target URL.
            expected: Sent as the ``expected`` field (presumably the HTTP
                status the target must return -- confirm with the server).
            schedule: Sent as the ``schedule`` field.
            delay: Sent as the ``delay`` field.

        Returns:
            The value of the ``Poussetaches-Task-ID`` response header.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            KeyError: If the response lacks the task ID header.
        """
        # Encode our payload
        encoded = base64.b64encode(json.dumps(payload).encode()).decode()

        # Queue/push it
        resp = requests.post(
            self.api_url,
            json={
                "url": self.base_url + path,
                "payload": encoded,
                "expected": expected,
                "schedule": schedule,
                "delay": delay,
            },
            timeout=self.timeout,
        )
        resp.raise_for_status()

        return resp.headers["Poussetaches-Task-ID"]

    def parse(self, req: "flask.Request") -> "Task":
        """Authenticate an incoming task request and decode its envelope.

        Args:
            req: The request delivering the task. Its body must be a JSON
                envelope with ``req_id``, ``tries`` and a base64-encoded
                JSON ``payload``.

        Returns:
            The decoded task.

        Raises:
            ValueError: If the ``Poussetaches-Auth-Key`` header is missing
                or wrong, or if no auth key is configured at all.
        """
        import hmac  # local import keeps this block self-contained

        supplied = req.headers.get("Poussetaches-Auth-Key")
        # Fail closed: with no configured key, a request that simply omits
        # the header must not be accepted (None == None used to pass).
        # compare_digest avoids leaking the key through response timing.
        if (
            not POUSSETACHES_AUTH_KEY
            or supplied is None
            or not hmac.compare_digest(
                supplied.encode("utf-8"), POUSSETACHES_AUTH_KEY.encode("utf-8")
            )
        ):
            raise ValueError("Bad auth key")

        # Parse the "envelope"
        envelope = json.loads(req.data)
        # Debug output, kept as-is so existing logs do not change.
        print(req)
        print(f"envelope={envelope!r}")
        payload = json.loads(base64.b64decode(envelope["payload"]))

        return Task(
            req_id=envelope["req_id"], tries=envelope["tries"], payload=payload
        )  # type: ignore

    @staticmethod
    def _expand_task(t: Dict[str, Any]) -> None:
        """Decode, in place, the encoded fields of a task dict from the API.

        ``payload`` becomes the parsed JSON value, or text when it is not
        JSON; a non-empty ``last_error_body`` becomes text; ``next_run`` and
        ``last_run`` become ``datetime`` objects. A missing or falsy
        ``last_run`` is removed from the dict.

        Raises:
            KeyError: If ``payload``, ``last_error_body`` or ``next_run`` is
                missing.
        """
        raw_payload = base64.b64decode(t["payload"])
        try:
            t["payload"] = json.loads(raw_payload)
        except ValueError:
            # Covers both invalid JSON and undecodable bytes; fall back to
            # text without letting stray bytes abort the whole listing.
            t["payload"] = raw_payload.decode("utf-8", errors="replace")

        if t["last_error_body"]:
            # Error bodies are arbitrary response content and may not be
            # valid UTF-8, so undecodable bytes are replaced.
            t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode(
                "utf-8", errors="replace"
            )

        # Timestamps are divided by 1e9 to get seconds (i.e. the API values
        # are treated as nanoseconds); the result is a naive local datetime.
        t["next_run"] = datetime.fromtimestamp(t["next_run"] / 1e9)
        if t.get("last_run"):
            t["last_run"] = datetime.fromtimestamp(t["last_run"] / 1e9)
        else:
            t.pop("last_run", None)

    def _get(self, where: str) -> "List[GetTask]":
        """Fetch and decode the tasks listed at ``<api_url>/<where>``.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        resp = requests.get(f"{self.api_url}/{where}", timeout=self.timeout)
        resp.raise_for_status()

        out = []
        # `or []` tolerates a missing or null task list in the response.
        for t in resp.json().get("tasks") or []:
            self._expand_task(t)
            out.append(
                GetTask(
                    task_id=t["id"],
                    payload=t["payload"],
                    expected=t["expected"],
                    schedule=t["schedule"],
                    tries=t["tries"],
                    url=t["url"],
                    last_error_status_code=t["last_error_status_code"],
                    last_error_body=t["last_error_body"],
                    next_run=t["next_run"],
                )
            )

        return out

    def get_cron(self) -> "List[GetTask]":
        """Return the tasks listed by the ``cron`` endpoint."""
        return self._get("cron")

    def get_success(self) -> "List[GetTask]":
        """Return the tasks listed by the ``success`` endpoint."""
        return self._get("success")

    def get_waiting(self) -> "List[GetTask]":
        """Return the tasks listed by the ``waiting`` endpoint."""
        return self._get("waiting")

    def get_dead(self) -> "List[GetTask]":
        """Return the tasks listed by the ``dead`` endpoint."""
        return self._get("dead")