From 55ff15ff86340d291af00128177b90734bef0de2 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Sun, 29 Jul 2018 16:07:27 +0200 Subject: [PATCH] Move more DB stuff to celery --- activitypub.py | 40 +++++++++--------- app.py | 46 +++++++++------------ tasks.py | 108 ++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 145 insertions(+), 49 deletions(-) diff --git a/activitypub.py b/activitypub.py index a5fff31..470dd84 100644 --- a/activitypub.py +++ b/activitypub.py @@ -1,5 +1,6 @@ import logging import os +import json from datetime import datetime from enum import Enum from typing import Any @@ -120,10 +121,6 @@ class MicroblogPubBackend(Backend): def set_save_cb(self, cb): self.save_cb = cb - @ensure_it_is_me - def outbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None: - self.save(Box.OUTBOX, activity) - def followers(self) -> List[str]: q = { "box": Box.INBOX.value, @@ -241,21 +238,9 @@ class MicroblogPubBackend(Backend): def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool: return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri})) - @ensure_it_is_me - def inbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None: - self.save(Box.INBOX, activity) - def set_post_to_remote_inbox(self, cb): self.post_to_remote_inbox_cb = cb - @ensure_it_is_me - def post_to_remote_inbox(self, as_actor: ap.Person, payload: str, to: str) -> None: - self.post_to_remote_inbox_cb(payload, to) - - @ensure_it_is_me - def new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None: - pass - @ensure_it_is_me def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None: DB.activities.update_one( @@ -268,10 +253,6 @@ class MicroblogPubBackend(Backend): {"remote_id": follow.id}, {"$set": {"meta.undo": True}} ) - @ensure_it_is_me - def new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None: - pass - @ensure_it_is_me def inbox_like(self, as_actor: ap.Person, like: ap.Like) -> None: obj = like.get_object() @@ -527,6 +508,25 @@ class MicroblogPubBackend(Backend): {"$set": {"meta.thread_root_parent": root_reply}}, ) + def post_to_outbox(self, activity: ap.BaseActivity) -> None: + if activity.has_type(ap.CREATE_TYPES): + activity = activity.build_create() + + self.save(Box.OUTBOX, activity) + + # Assign create a random ID + obj_id = self.random_object_id() + activity.set_id(self.activity_url(obj_id), obj_id) + + recipients = activity.recipients() + logger.info(f"recipients={recipients}") + activity = ap.clean_activity(activity.to_dict()) + + payload = json.dumps(activity) + for recp in recipients: + logger.debug(f"posting to {recp}") + self.post_to_remote_inbox(self.get_actor(), payload, recp) + def gen_feed(): fg = FeedGenerator() diff --git a/app.py b/app.py index a2fbd3b..af78472 100644 --- a/app.py +++ b/app.py @@ -74,7 +74,6 @@ from config import PASS from config import USERNAME from config import VERSION from config import _drop_db -from config import custom_cache_purge_hook from utils.key import get_secret_key from utils.lookup import lookup from utils.media import Kind @@ -91,9 +90,6 @@ def save_cb(box: Box, iri: str) -> None: back.set_save_cb(save_cb) - -back.set_post_to_remote_inbox(tasks.post_to_inbox.delay) - ap.use_backend(back) MY_PERSON = ap.Person(**ME) @@ -117,9 +113,6 @@ else: SIG_AUTH = HTTPSigAuth(KEY) -OUTBOX = ap.Outbox(MY_PERSON) -INBOX = ap.Inbox(MY_PERSON) - def verify_pass(pwd): return bcrypt.verify(pwd, PASS) @@ -615,7 +608,7 @@ def authorize_follow(): return redirect("/following") follow = ap.Follow(actor=MY_PERSON.id, object=actor) - OUTBOX.post(follow) + tasks.post_to_outbox(follow) return redirect("/following") @@ -1121,12 +1114,9 @@ def outbox(): data = request.get_json(force=True) print(data) activity = ap.parse_activity(data) - OUTBOX.post(activity) + activity_id = tasks.post_to_outbox(activity) - # Purge the cache if a custom hook is set, as new content was published - custom_cache_purge_hook() - - return Response(status=201, headers={"Location": activity.id}) + return Response(status=201, headers={"Location": activity_id}) @app.route("/outbox/") @@ -1465,9 +1455,9 @@ def api_delete(): note = _user_api_get_note(from_outbox=True) delete = note.build_delete() - OUTBOX.post(delete) + delete_id = tasks.post_to_outbox(delete) - return _user_api_response(activity=delete.id) + return _user_api_response(activity=delete_id) @app.route("/api/boost", methods=["POST"]) @@ -1476,9 +1466,9 @@ def api_boost(): note = _user_api_get_note() announce = note.build_announce(MY_PERSON) - OUTBOX.post(announce) + announce_id = tasks.post_to_outbox(announce) - return _user_api_response(activity=announce.id) + return _user_api_response(activity=announce_id) @app.route("/api/like", methods=["POST"]) @@ -1487,9 +1477,9 @@ def api_like(): note = _user_api_get_note() like = note.build_like(MY_PERSON) - OUTBOX.post(like) + like_id = tasks.post_to_outbox(like) - return _user_api_response(activity=like.id) + return _user_api_response(activity=like_id) @app.route("/api/note/pin", methods=["POST"]) @@ -1534,9 +1524,9 @@ def api_undo(): obj = ap.parse_activity(doc.get("activity")) # FIXME(tsileo): detect already undo-ed and make this API call idempotent undo = obj.build_undo() - OUTBOX.post(undo) + undo_id = tasks.post_to_outbox(undo) - return _user_api_response(activity=undo.id) + return _user_api_response(activity=undo_id) @app.route("/admin/stream") @@ -1605,7 +1595,7 @@ def inbox(): ) activity = ap.parse_activity(data) logger.debug(f"inbox activity={activity}/{data}") - INBOX.post(activity) + tasks.post_to_inbox(activity) return Response(status=201) @@ -1691,9 +1681,9 @@ def api_new_note(): note = ap.Note(**raw_note) create = note.build_create() - OUTBOX.post(create) + create_id = tasks.post_to_outbox(create) - return _user_api_response(activity=create.id) + return _user_api_response(activity=create_id) @app.route("/api/stream") @@ -1724,9 +1714,9 @@ def api_block(): return _user_api_response(activity=existing["activity"]["id"]) block = ap.Block(actor=MY_PERSON.id, object=actor) - OUTBOX.post(block) + block_id = tasks.post_to_outbox(block) - return _user_api_response(activity=block.id) + return _user_api_response(activity=block_id) @app.route("/api/follow", methods=["POST"]) @@ -1746,9 +1736,9 @@ def api_follow(): return _user_api_response(activity=existing["activity"]["id"]) follow = ap.Follow(actor=MY_PERSON.id, object=actor) - OUTBOX.post(follow) + follow_id = tasks.post_to_outbox(follow) - return _user_api_response(activity=follow.id) + return _user_api_response(activity=follow_id) @app.route("/followers") diff --git a/tasks.py b/tasks.py index 5bcce31..6cad65b 100644 --- a/tasks.py +++ b/tasks.py @@ -14,8 +14,10 @@ from little_boxes.linked_data_sig import generate_signature from requests.exceptions import HTTPError import activitypub +from activitypub import Box from config import DB from config import HEADERS +from config import ME from config import ID from config import KEY from config import MEDIA_CACHE @@ -33,6 +35,8 @@ SigAuth = HTTPSigAuth(KEY) back = activitypub.MicroblogPubBackend() ap.use_backend(back) +MY_PERSON = ap.Person(**ME) + @app.task(bind=True, max_retries=12) # noqa: C901 def process_new_activity(self, iri: str) -> None: @@ -253,8 +257,110 @@ def cache_attachments(self, iri: str) -> None: self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries)) +def post_to_inbox(activity: ap.BaseActivity) -> None: + # Check for Block activity + actor = activity.get_actor() + if back.outbox_is_blocked(MY_PERSON, actor.id): + log.info( + f"actor {actor!r} is blocked, dropping the received activity {activity!r}" + ) + return + + if back.inbox_check_duplicate(MY_PERSON, activity.id): + # The activity is already in the inbox + log.info(f"received duplicate activity {activity!r}, dropping it") + + back.save(Box.INBOX, activity) + finish_post_to_inbox.delay(activity.id) + + +@app.task(bind=True, max_retries=12) # noqa: C901 +def finish_post_to_inbox(self, iri: str) -> None: + try: + activity = ap.fetch_remote_activity(iri) + log.info(f"activity={activity!r}") + + if activity.has_type(ap.ActivityType.DELETE): + back.inbox_delete(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UPDATE): + back.inbox_update(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.CREATE): + back.inbox_create(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.ANNOUNCE): + back.inbox_announce(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.LIKE): + back.inbox_like(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.FOLLOW): + # Reply to a Follow with an Accept + accept = ap.Accept(actor=ID, object=activity.to_dict(embed=True)) + post_to_outbox(accept) + elif activity.has_type(ap.ActivityType.UNDO): + obj = activity.get_object() + if obj.has_type(ap.ActivityType.LIKE): + back.inbox_undo_like(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.ANNOUNCE): + back.inbox_undo_announce(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.FOLLOW): + back.undo_new_follower(MY_PERSON, obj) + + except Exception as err: + log.exception(f"failed to cache attachments for {iri}") + self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries)) + + +def post_to_outbox(activity: ap.BaseActivity) -> str: + if activity.has_type(ap.CREATE_TYPES): + activity = activity.build_create() + + # Assign create a random ID + obj_id = back.random_object_id() + activity.set_id(back.activity_url(obj_id), obj_id) + + back.save(Box.OUTBOX, activity) + finish_post_to_outbox.delay(activity.id) + return activity.id + + +@app.task(bind=True, max_retries=12) # noqa:C901 +def finish_post_to_outbox(self, iri: str) -> None: + try: + activity = ap.fetch_remote_activity(iri) + log.info(f"activity={activity!r}") + + if activity.has_type(ap.activitytype.delete): + back.outbox_delete(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UPDATE): + back.outbox_update(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.CREATE): + back.outbox_create(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.ANNOUNCE): + back.outbox_announce(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.LIKE): + back.outbox_like(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UNDO): + obj = activity.get_object() + if obj.has_type(ap.ActivityType.LIKE): + back.outbox_undo_like(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.ANNOUNCE): + back.outbox_undo_announce(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.FOLLOW): + back.undo_new_following(MY_PERSON, obj) + + recipients = activity.recipients() + log.info(f"recipients={recipients}") + activity = ap.clean_activity(activity.to_dict()) + + payload = json.dumps(activity) + for recp in recipients: + log.debug(f"posting to {recp}") + post_to_remote_inbox.delay(payload, recp) + except Exception as err: + log.exception(f"failed to cache attachments for {iri}") + self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries)) + + @app.task(bind=True, max_retries=12) -def post_to_inbox(self, payload: str, to: str) -> None: +def post_to_remote_inbox(self, payload: str, to: str) -> None: try: log.info("payload=%s", payload) log.info("generating sig")