diff --git a/app.py b/app.py index 54eeaf3..fe678fc 100644 --- a/app.py +++ b/app.py @@ -722,6 +722,7 @@ def inbox(): data = remote_data activity = ap.parse_activity(data) logger.debug(f"inbox activity={g.request_id}/{activity}/{data}") + post_to_inbox(activity) return Response(status=201) diff --git a/blueprints/tasks.py b/blueprints/tasks.py index 7b3447d..4ffff69 100644 --- a/blueprints/tasks.py +++ b/blueprints/tasks.py @@ -7,6 +7,7 @@ import flask import requests from flask import current_app as app from little_boxes import activitypub as ap +from little_boxes.activitypub import _to_list from little_boxes.errors import ActivityGoneError from little_boxes.errors import ActivityNotFoundError from little_boxes.errors import NotAnActivityError @@ -21,10 +22,13 @@ from core.activitypub import Box from core.activitypub import _actor_hash from core.activitypub import _add_answers_to_question from core.activitypub import post_to_outbox +from core.activitypub import save_reply from core.activitypub import update_cached_actor +from core.db import find_one_activity from core.db import update_one_activity from core.inbox import process_inbox from core.meta import MetaKey +from core.meta import by_object_id from core.meta import by_remote_id from core.meta import flag from core.meta import upsert @@ -310,9 +314,10 @@ def task_cache_actor() -> _Response: if not activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]): return "" - if activity.get_object()._data.get( - "attachment", [] - ) or activity.get_object().has_type(ap.ActivityType.VIDEO): + if activity.has_type(ap.ActivityType.CREATE) and ( + activity.get_object()._data.get("attachment", []) + or activity.get_object().has_type(ap.ActivityType.VIDEO) + ): Tasks.cache_attachments(iri) except (ActivityGoneError, ActivityNotFoundError): @@ -478,6 +483,79 @@ def task_cleanup() -> _Response: return "" +def _is_local_reply(activity: ap.BaseActivity) -> bool: + for dest in _to_list(activity.to or []): + if dest.startswith(config.BASE_URL): + return True + + for dest in _to_list(activity.cc or []): + if dest.startswith(config.BASE_URL): + return True + + return False + + +@blueprint.route("/task/process_reply", methods=["POST"]) +def task_process_reply() -> _Response: + """Process `Announce`d posts from Pleroma relays in order to process replies of activities that are in the inbox.""" + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"checking for reply activity={activity!r}") + + # Some AP server always return Create when requesting an object + if activity.has_type(ap.ActivityType.CREATE): + activity = activity.get_object() + + in_reply_to = activity.get_in_reply_to() + if not in_reply_to: + # If it's not reply, we can drop it + app.logger.info(f"activity={activity!r} is not a reply, dropping it") + return "" + + # new_threads = [] + root_reply = in_reply_to + reply = ap.fetch_remote_activity(root_reply) + if reply.has_type(ap.ActivityType.CREATE): + reply = reply.get_object() + + while reply is not None: + in_reply_to = reply.get_in_reply_to() + if not in_reply_to: + break + root_reply = in_reply_to + reply = ap.fetch_remote_activity(root_reply) + if reply.has_type(ap.ActivityType.CREATE): + reply = reply.get_object() + + app.logger.info(f"root_reply={reply!r} for activity={activity!r}") + + # Ensure the "root reply" is present in the inbox/outbox + if not find_one_activity(by_object_id(root_reply)): + return "" + + actor = activity.get_actor() + + save_reply( + activity, + { + "meta.thread_root_parent": root_reply, + **flag(MetaKey.ACTOR, actor.to_dict(embed=True)), + }, + ) + # FIXME(tsileo): cache actor here, spawn a task to cache attachment if needed + except (ActivityGoneError, ActivityNotFoundError): + app.logger.exception(f"dropping activity {iri}, skip processing") + return "" + except Exception as err: + app.logger.exception(f"failed to process new activity {iri}") + raise TaskError() from err + + return "" + + @blueprint.route("/task/process_new_activity", methods=["POST"]) # noqa:c901 def task_process_new_activity() -> _Response: """Process an activity received in the inbox""" diff --git a/core/activitypub.py b/core/activitypub.py index 196711c..ed3f4cb 100644 --- a/core/activitypub.py +++ b/core/activitypub.py @@ -12,7 +12,6 @@ from urllib.parse import urljoin from urllib.parse import urlparse from bson.objectid import ObjectId -from cachetools import LRUCache from flask import url_for from little_boxes import activitypub as ap from little_boxes import strtobool @@ -43,7 +42,6 @@ _NewMeta = Dict[str, Any] SIG_AUTH = HTTPSigAuth(KEY) -ACTORS_CACHE = LRUCache(maxsize=256) MY_PERSON = ap.Person(**ME) @@ -157,6 +155,16 @@ def post_to_inbox(activity: ap.BaseActivity) -> None: ) return + # If the message is coming from a Pleroma relay, we process it as a possible reply for a stream activity + if ( + actor.has_type(ap.ActivityType.APPLICATION) + and actor.id.endswith("/relay") + and activity.has_type(ap.ActivityType.ANNOUNCE) + and not DB.replies.find_one({"remote_id": activity.id}) + ): + Tasks.process_reply(activity.get_object_id()) + return + if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": activity.id}): # The activity is already in the inbox logger.info(f"received duplicate activity {activity!r}, dropping it") @@ -170,6 +178,30 @@ def post_to_inbox(activity: ap.BaseActivity) -> None: Tasks.finish_post_to_inbox(activity.id) +def save_reply(activity: ap.BaseActivity, meta: Dict[str, Any] = {}) -> None: + visibility = ap.get_visibility(activity) + is_public = False + if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]: + is_public = True + + DB.replies.insert_one( + { + "activity": activity.to_dict(), + "type": _to_list(activity.type), + "remote_id": activity.id, + "meta": { + "undo": False, + "deleted": False, + "public": is_public, + "server": urlparse(activity.id).netloc, + "visibility": visibility.name, + "actor_id": activity.get_actor().id, + **meta, + }, + } + ) + + def post_to_outbox(activity: ap.BaseActivity) -> str: if activity.has_type(ap.CREATE_TYPES): activity = activity.build_create() diff --git a/core/meta.py b/core/meta.py index 683265e..7e9e762 100644 --- a/core/meta.py +++ b/core/meta.py @@ -98,4 +98,4 @@ def upsert(data: Dict[MetaKey, Any]) -> _SubQuery: def published_after(dt: datetime) -> _SubQuery: - return flag(MetaKey.PUBLISHED, {"gt": ap.format(dt)}) + return flag(MetaKey.PUBLISHED, {"gt": ap.format_datetime(dt)}) diff --git a/core/notifications.py b/core/notifications.py index 56a50e9..614b090 100644 --- a/core/notifications.py +++ b/core/notifications.py @@ -140,16 +140,16 @@ def _announce_set_inbox_flags(activity: ap.Announce, new_meta: _NewMeta) -> None _set_flag(new_meta, MetaKey.GC_KEEP) # Dedup boosts (it's annoying to see the same note multipe times on the same page) - # if not find_one_activity( - # { - # **in_inbox(), - # **by_object_id(obj.id), - # **flag(MetaKey.STREAM, True), - # **published_after(datetime.now(timezone.utc) - timedelta(hours=12)), - # } - # ): - # Display it in the stream only it not there already (only looking at the last 12 hours) - _set_flag(new_meta, MetaKey.STREAM) + if not find_one_activity( + { + **in_inbox(), + **by_object_id(obj.id), + **flag(MetaKey.STREAM, True), + **published_after(datetime.now(timezone.utc) - timedelta(hours=12)), + } + ): + # Display it in the stream only it not there already (only looking at the last 12 hours) + _set_flag(new_meta, MetaKey.STREAM) return None diff --git a/core/tasks.py b/core/tasks.py index 131dc60..bb20554 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -44,6 +44,10 @@ class Tasks: def fetch_og_meta(iri: str) -> None: p.push(iri, "/task/fetch_og_meta") + @staticmethod + def process_reply(iri: str) -> None: + p.push(iri, "/task/process_reply") + @staticmethod def process_new_activity(iri: str) -> None: p.push(iri, "/task/process_new_activity") diff --git a/requirements.txt b/requirements.txt index ebdac15..6440d98 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,6 +21,5 @@ git+https://github.com/erikriver/opengraph.git git+https://github.com/tsileo/little-boxes.git@litepub pyyaml pillow -cachetools emoji-unicode html5lib