More poussetaches integrations

This commit is contained in:
Thomas Sileo 2019-04-05 21:36:56 +02:00
parent 8c3eedac7d
commit 5d8fa38d5e
4 changed files with 110 additions and 62 deletions

102
app.py
View file

@ -62,7 +62,7 @@ from werkzeug.utils import secure_filename
import activitypub
import config
import tasks
import tasks # noqa: here just for the migration # FIXME(tsileo): remove me
from activitypub import Box
from activitypub import embed_collection
from config import USER_AGENT
@ -2210,6 +2210,9 @@ def token_endpoint():
)
#################
# Feeds
@app.route("/feed.json")
def json_feed():
return Response(
@ -2234,22 +2237,48 @@ def rss_feed():
)
@app.route("/task/t1")
def task_t1():
p.push(
"https://mastodon.cloud/@iulius/101852467780804071/activity",
"/task/cache_object",
)
return "ok"
###########
# Tasks
class Tasks:
@staticmethod
def cache_object(iri: str) -> None:
p.push(iri, "/task/cache_object")
@app.route("/task/t2", methods=["POST"])
def task_t2():
print(request)
print(request.headers)
task = p.parse(request)
print(task)
return "yay"
@staticmethod
def cache_actor(iri: str, also_cache_attachments: bool = True) -> None:
p.push(
{"iri": iri, "also_cache_attachments": also_cache_attachments},
"/task/cache_actor",
)
@staticmethod
def post_to_remote_inbox(payload: str, recp: str) -> None:
p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox")
@staticmethod
def forward_activity(iri: str) -> None:
p.push(iri, "/task/forward_activity")
@staticmethod
def fetch_og_meta(iri: str) -> None:
p.push(iri, "/task/fetch_og_meta")
@staticmethod
def process_new_activity(iri: str) -> None:
p.push(iri, "/task/process_new_activity")
@staticmethod
def cache_attachments(iri: str) -> None:
p.push(iri, "/task/cache_attachments")
@staticmethod
def finish_post_to_inbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_inbox")
@staticmethod
def finish_post_to_outbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_outbox")
@app.route("/task/fetch_og_meta", methods=["POST"])
@ -2321,48 +2350,6 @@ def task_cache_object():
abort(500)
return ""
class Tasks:
@staticmethod
def cache_object(iri: str) -> None:
p.push(iri, "/task/cache_object")
@staticmethod
def cache_actor(iri: str, also_cache_attachments: bool = True) -> None:
p.push(
{"iri": iri, "also_cache_attachments": also_cache_attachments},
"/task/cache_actor",
)
@staticmethod
def post_to_remote_inbox(payload: str, recp: str) -> None:
p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox")
@staticmethod
def forward_activity(iri: str) -> None:
p.push(iri, "/task/forward_activity")
@staticmethod
def fetch_og_meta(iri: str) -> None:
p.push(iri, "/task/fetch_og_meta")
@staticmethod
def process_new_activity(iri: str) -> None:
p.push(iri, "/task/process_new_activity")
@staticmethod
def cache_attachments(iri: str) -> None:
p.push(iri, "/task/cache_attachments")
@staticmethod
def finish_post_to_inbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_inbox")
@staticmethod
def finish_post_to_outbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_outbox")
@app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901
def task_finish_post_to_outbox():
task = p.parse(request)
@ -2748,6 +2735,7 @@ def task_forward_activity():
@app.route("/task/post_to_remote_inbox", methods=["POST"])
def task_post_to_remote_inbox():
"""Post an activity to a remote inbox."""
task = p.parse(request)
app.logger.info(f"task={task!r}")
payload, to = task.payload["payload"], task.payload["to"]

View file

@ -33,7 +33,6 @@ services:
- RABBITMQ_NODENAME=rabbit@my-rabbit
poussetaches:
image: "poussetaches:latest"
ports:
- '7991'
environment:
- POUSSETACHES_AUTH_KEY=123

View file

@ -14,7 +14,7 @@ services:
environment:
- MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq//
- MICROBLOGPUB_MONGODB_HOST=mongo:27017
- POUSSETACHES_AUTH_KEY=123
- POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY}
celery:
image: 'microblogpub:latest'
links:
@ -40,7 +40,7 @@ services:
- "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq"
poussetaches:
image: "poussetaches:latest"
ports:
- '7991'
volumes:
- "${DATA_DIR}/poussetaches:/app/poussetaches/poussetaches_data"
environment:
- POUSSETACHES_AUTH_KEY=123
- POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY}

View file

@ -1,10 +1,13 @@
import base64
import json
import os
from typing import Dict
from typing import Any
from typing import List
from dataclasses import dataclass
import flask
import requests
from datetime import datetime
POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY")
@ -17,6 +20,18 @@ class Task:
payload: Any
@dataclass
class GetTask:
payload: Any
expected: int
task_id: str
next_run: datetime
tries: int
url: str
last_error_status_code: int
last_error_body: str
class PousseTaches:
def __init__(self, api_url: str, base_url: str) -> None:
self.api_url = api_url
@ -46,3 +61,49 @@ class PousseTaches:
payload = json.loads(base64.b64decode(envelope["payload"]))
return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) # type: ignore
@staticmethod
def _expand_task(t: Dict[str, Any]) -> None:
try:
t["payload"] = json.loads(base64.b64decode(t["payload"]))
except json.JSONDecodeError:
t["payload"] = base64.b64decode(t["payload"]).decode()
if t["last_error_body"]:
t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode()
t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9))
if t["last_run"]:
t["last_run"] = datetime.fromtimestamp(float(t["last__run"] / 1e9))
else:
del t["last_run"]
def _get(self, where: str) -> List[GetTask]:
out = []
resp = requests.get(self.api_url + f"/{where}")
resp.raise_for_status()
dat = resp.json()
for t in dat["tasks"]:
self._expand_task(t)
out.append(GetTask(
task_id=t["id"],
payload=t["payload"],
expected=t["expected"],
tries=t["tries"],
url=t["url"],
last_error_status_code=t["last_error_status_code"],
last_error_body=t["last_error_body"],
next_run=t["next_run"],
))
return out
def get_success(self) -> List[GetTask]:
return self._get("success")
def get_waiting(self) -> List[GetTask]:
return self._get("waiting")
def get_dead(self) -> List[GetTask]:
return self._get("dead")