diff --git a/Makefile b/Makefile index 1cd4e26..daad832 100644 --- a/Makefile +++ b/Makefile @@ -18,9 +18,6 @@ reload-dev: # docker build . -t microblogpub:latest docker-compose -f docker-compose-dev.yml up -d --force-recreate -update-poussetaches: - git clone https://github.com/tsileo/poussetaches.git tmp_poussetaches && cd tmp_poussetaches && docker build . -t poussetaches:latest && cd - && rm -rf tmp_poussetaches - update: git pull docker build . -t microblogpub:latest diff --git a/activitypub.py b/activitypub.py index 20b5f4c..9185745 100644 --- a/activitypub.py +++ b/activitypub.py @@ -1,5 +1,6 @@ import logging import os +import json from datetime import datetime from enum import Enum from typing import Any @@ -78,52 +79,6 @@ class Box(Enum): REPLIES = "replies" -def save(box: Box, activity: ap.BaseActivity) -> None: - """Custom helper for saving an activity to the DB.""" - DB.activities.insert_one( - { - "box": box.value, - "activity": activity.to_dict(), - "type": _to_list(activity.type), - "remote_id": activity.id, - "meta": {"undo": False, "deleted": False}, - } - ) - - -def followers() -> List[str]: - q = { - "box": Box.INBOX.value, - "type": ap.ActivityType.FOLLOW.value, - "meta.undo": False, - } - return [doc["activity"]["actor"] for doc in DB.activities.find(q)] - - -def following() -> List[str]: - q = { - "box": Box.OUTBOX.value, - "type": ap.ActivityType.FOLLOW.value, - "meta.undo": False, - } - return [doc["activity"]["object"] for doc in DB.activities.find(q)] - - -def followers_as_recipients() -> List[str]: - q = { - "box": Box.INBOX.value, - "type": ap.ActivityType.FOLLOW.value, - "meta.undo": False, - } - recipients = [] - for doc in DB.activities.find(q): - recipients.append( - doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"] - ) - - return list(set(recipients)) - - class MicroblogPubBackend(Backend): """Implements a Little Boxes backend, backed by MongoDB.""" @@ -149,18 +104,73 @@ class MicroblogPubBackend(Backend): """URL for activity link.""" return f"{BASE_URL}/note/{obj_id}" + def save(self, box: Box, activity: ap.BaseActivity) -> None: + """Custom helper for saving an activity to the DB.""" + DB.activities.insert_one( + { + "box": box.value, + "activity": activity.to_dict(), + "type": _to_list(activity.type), + "remote_id": activity.id, + "meta": {"undo": False, "deleted": False}, + } + ) + + def followers(self) -> List[str]: + q = { + "box": Box.INBOX.value, + "type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + } + return [doc["activity"]["actor"] for doc in DB.activities.find(q)] + + def followers_as_recipients(self) -> List[str]: + q = { + "box": Box.INBOX.value, + "type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + } + recipients = [] + for doc in DB.activities.find(q): + recipients.append( + doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"] + ) + + return list(set(recipients)) + + def following(self) -> List[str]: + q = { + "box": Box.OUTBOX.value, + "type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + } + return [doc["activity"]["object"] for doc in DB.activities.find(q)] + def parse_collection( self, payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None ) -> List[str]: """Resolve/fetch a `Collection`/`OrderedCollection`.""" # Resolve internal collections via MongoDB directly if url == ID + "/followers": - return followers() + return self.followers() elif url == ID + "/following": - return following() + return self.following() return super().parse_collection(payload, url) + @ensure_it_is_me + def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool: + return bool( + DB.activities.find_one( + { + "box": Box.OUTBOX.value, + "type": ap.ActivityType.BLOCK.value, + "activity.object": actor_id, + "meta.undo": False, + } + ) + ) + def _fetch_iri(self, iri: str) -> ap.ObjectType: if iri == ME["id"]: return ME @@ -219,6 +229,13 @@ class MicroblogPubBackend(Backend): return data + @ensure_it_is_me + def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool: + return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri})) + + def set_post_to_remote_inbox(self, cb): + self.post_to_remote_inbox_cb = cb + @ensure_it_is_me def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None: DB.activities.update_one( @@ -454,7 +471,7 @@ class MicroblogPubBackend(Backend): ) if not creply: # It means the activity is not in the inbox, and not in the outbox, we want to save it - save(Box.REPLIES, reply) + self.save(Box.REPLIES, reply) new_threads.append(reply.id) while reply is not None: @@ -465,7 +482,7 @@ class MicroblogPubBackend(Backend): reply = ap.fetch_remote_activity(root_reply) q = {"activity.object.id": root_reply} if not DB.activities.count(q): - save(Box.REPLIES, reply) + self.save(Box.REPLIES, reply) new_threads.append(reply.id) DB.activities.update_one( @@ -476,6 +493,25 @@ class MicroblogPubBackend(Backend): {"$set": {"meta.thread_root_parent": root_reply}}, ) + def post_to_outbox(self, activity: ap.BaseActivity) -> None: + if activity.has_type(ap.CREATE_TYPES): + activity = activity.build_create() + + self.save(Box.OUTBOX, activity) + + # Assign create a random ID + obj_id = self.random_object_id() + activity.set_id(self.activity_url(obj_id), obj_id) + + recipients = activity.recipients() + logger.info(f"recipients={recipients}") + activity = ap.clean_activity(activity.to_dict()) + + payload = json.dumps(activity) + for recp in recipients: + logger.debug(f"posting to {recp}") + self.post_to_remote_inbox(self.get_actor(), payload, recp) + def gen_feed(): fg = FeedGenerator() diff --git a/app.py b/app.py index a2b9cfc..00580bc 100644 --- a/app.py +++ b/app.py @@ -66,8 +66,6 @@ import config import tasks # noqa: here just for the migration # FIXME(tsileo): remove me from activitypub import Box from activitypub import embed_collection -from activitypub import save -from activitypub import followers_as_recipients from config import USER_AGENT from config import ADMIN_API_KEY from config import BASE_URL @@ -1645,7 +1643,7 @@ def inbox(): data["object"] ): logger.info(f"received a Delete for an actor {data!r}") - if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": data["id"]}): + if get_backend().inbox_check_duplicate(MY_PERSON, data["id"]): # The activity is already in the inbox logger.info(f"received duplicate activity {data!r}, dropping it") @@ -2297,9 +2295,7 @@ def task_finish_post_to_outbox(): elif obj.has_type(ap.ActivityType.ANNOUNCE): back.outbox_undo_announce(MY_PERSON, obj) elif obj.has_type(ap.ActivityType.FOLLOW): - DB.activities.update_one( - {"remote_id": obj.id}, {"$set": {"meta.undo": True}} - ) + back.undo_new_following(MY_PERSON, obj) app.logger.info(f"recipients={recipients}") activity = ap.clean_activity(activity.to_dict()) @@ -2349,9 +2345,7 @@ def task_finish_post_to_inbox(): elif obj.has_type(ap.ActivityType.ANNOUNCE): back.inbox_undo_announce(MY_PERSON, obj) elif obj.has_type(ap.ActivityType.FOLLOW): - DB.activities.update_one( - {"remote_id": obj.id}, {"$set": {"meta.undo": True}} - ) + back.undo_new_follower(MY_PERSON, obj) try: invalidate_cache(activity) except Exception: @@ -2373,7 +2367,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str: obj_id = back.random_object_id() activity.set_id(back.activity_url(obj_id), obj_id) - save(Box.OUTBOX, activity) + back.save(Box.OUTBOX, activity) Tasks.cache_actor(activity.id) Tasks.finish_post_to_outbox(activity.id) return activity.id @@ -2382,14 +2376,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str: def post_to_inbox(activity: ap.BaseActivity) -> None: # Check for Block activity actor = activity.get_actor() - if DB.activities.find_one( - { - "box": Box.OUTBOX.value, - "type": ap.ActivityType.BLOCK.value, - "activity.object": actor.id, - "meta.undo": False, - } - ): + if back.outbox_is_blocked(MY_PERSON, actor.id): app.logger.info( f"actor {actor!r} is blocked, dropping the received activity {activity!r}" ) @@ -2399,7 +2386,7 @@ def post_to_inbox(activity: ap.BaseActivity) -> None: # The activity is already in the inbox app.logger.info(f"received duplicate activity {activity!r}, dropping it") - save(Box.INBOX, activity) + back.save(Box.INBOX, activity) Tasks.process_new_activity(activity.id) app.logger.info(f"spawning task for {activity!r}") @@ -2667,7 +2654,7 @@ def task_forward_activity(): iri = task.payload try: activity = ap.fetch_remote_activity(iri) - recipients = followers_as_recipients() + recipients = back.followers_as_recipients() app.logger.debug(f"Forwarding {activity!r} to {recipients}") activity = ap.clean_activity(activity.to_dict()) payload = json.dumps(activity) diff --git a/tasks.py b/tasks.py index 519dd48..f34d217 100644 --- a/tasks.py +++ b/tasks.py @@ -312,7 +312,7 @@ def post_to_inbox(activity: ap.BaseActivity) -> None: # The activity is already in the inbox log.info(f"received duplicate activity {activity!r}, dropping it") - activitypub.save(Box.INBOX, activity) + back.save(Box.INBOX, activity) process_new_activity.delay(activity.id) log.info(f"spawning task for {activity!r}") @@ -387,7 +387,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str: obj_id = back.random_object_id() activity.set_id(back.activity_url(obj_id), obj_id) - activitypub.save(Box.OUTBOX, activity) + back.save(Box.OUTBOX, activity) cache_actor.delay(activity.id) finish_post_to_outbox.delay(activity.id) return activity.id @@ -440,7 +440,7 @@ def finish_post_to_outbox(self, iri: str) -> None: def forward_activity(self, iri: str) -> None: try: activity = ap.fetch_remote_activity(iri) - recipients = activitypub.followers_as_recipients() + recipients = back.followers_as_recipients() log.debug(f"Forwarding {activity!r} to {recipients}") activity = ap.clean_activity(activity.to_dict()) payload = json.dumps(activity)