From a49ba89a7e0a950a07982fb061bcb540dfb31e85 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Sun, 1 Sep 2019 10:26:25 +0200 Subject: [PATCH] Improve follower/following management --- .dockerignore | 2 ++ core/activitypub.py | 17 ++++++++++ core/inbox.py | 29 ++++++++++++++++ core/meta.py | 18 ++++++++++ core/migrations.py | 78 +++++++++++++++++++++++++++++++++++++++++-- core/notifications.py | 32 +++++++++--------- 6 files changed, 157 insertions(+), 19 deletions(-) diff --git a/.dockerignore b/.dockerignore index ccc0367..f24c558 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,3 +1,5 @@ +.git/ __pycache__/ data/ +data2/ tests/ diff --git a/core/activitypub.py b/core/activitypub.py index 2f064f2..65e488c 100644 --- a/core/activitypub.py +++ b/core/activitypub.py @@ -33,6 +33,7 @@ from core.db import find_one_activity from core.db import update_many_activities from core.db import update_one_activity from core.meta import Box +from core.meta import FollowStatus from core.meta import MetaKey from core.meta import by_object_id from core.meta import by_remote_id @@ -52,6 +53,16 @@ SIG_AUTH = HTTPSigAuth(KEY) MY_PERSON = ap.Person(**ME) +_LOCAL_NETLOC = urlparse(BASE_URL).netloc + + +def is_from_outbox(activity: ap.BaseActivity) -> bool: + return activity.id.startswith(BASE_URL) + + +def is_local_url(url: str) -> bool: + return urlparse(url).netloc == _LOCAL_NETLOC + def _remove_id(doc: ap.ObjectType) -> ap.ObjectType: """Helper for removing MongoDB's `_id` field.""" @@ -116,6 +127,11 @@ def save(box: Box, activity: ap.BaseActivity) -> None: actor_id = activity.get_actor().id + # Set some "type"-related neta + extra = {} + if box == Box.OUTBOX and activity.has_type(ap.Follow): + extra[MetaKey.FOLLOW_STATUS.value] = FollowStatus.WAITING.value + DB.activities.insert_one( { "box": box.value, @@ -135,6 +151,7 @@ def save(box: Box, activity: ap.BaseActivity) -> None: MetaKey.PUBLISHED.value: activity.published if activity.published else now(), + **extra, }, } ) diff --git a/core/inbox.py b/core/inbox.py index df5740b..939f2f0 100644 --- a/core/inbox.py +++ b/core/inbox.py @@ -9,10 +9,12 @@ from little_boxes.errors import NotAnActivityError import config from core.activitypub import _answer_key from core.activitypub import handle_replies +from core.activitypub import is_from_outbox from core.activitypub import post_to_outbox from core.activitypub import update_cached_actor from core.db import DB from core.db import update_one_activity +from core.meta import FollowStatus from core.meta import MetaKey from core.meta import by_object_id from core.meta import by_remote_id @@ -174,6 +176,33 @@ def _follow_process_inbox(activity: ap.Follow, new_meta: _NewMeta) -> None: post_to_outbox(accept) +def _update_follow_status(follow: ap.BaseActivity, status: FollowStatus) -> None: + if not follow.has_type(ap.Follow) or not is_from_outbox(follow): + _logger.warning( + "received an Accept/Reject from an unexpected activity: {follow!r}" + ) + return None + + update_one_activity( + by_remote_id(follow.id), upsert({MetaKey.FOLLOW_STATUS: status.value}) + ) + + +@process_inbox.register +def _accept_process_inbox(activity: ap.Accept, new_meta: _NewMeta) -> None: + _logger.info(f"process_inbox activity={activity!r}") + # Set a flag on the follow + follow = activity.get_object() + _update_follow_status(follow, FollowStatus.ACCEPTED) + + +@process_inbox.register +def _reject_process_inbox(activity: ap.Reject, new_meta: _NewMeta) -> None: + _logger.info(f"process_inbox activity={activity!r}") + follow = activity.get_object() + _update_follow_status(follow, FollowStatus.REJECTED) + + @process_inbox.register def _undo_process_inbox(activity: ap.Undo, new_meta: _NewMeta) -> None: _logger.info(f"process_inbox activity={activity!r}") diff --git a/core/meta.py b/core/meta.py index f5dde02..5e5a750 100644 --- a/core/meta.py +++ b/core/meta.py @@ -18,6 +18,13 @@ class Box(Enum): REPLIES = "replies" +@unique +class FollowStatus(Enum): + WAITING = "waiting" + ACCEPTED = "accepted" + REJECTED = "rejected" + + @unique class MetaKey(Enum): NOTIFICATION = "notification" @@ -38,6 +45,9 @@ class MetaKey(Enum): OBJECT_ACTOR_ID = "object_actor_id" OBJECT_ACTOR_HASH = "object_actor_hash" PUBLIC = "public" + + FOLLOW_STATUS = "follow_status" + THREAD_ROOT_PARENT = "thread_root_parent" IN_REPLY_TO_SELF = "in_reply_to_self" @@ -83,6 +93,10 @@ def by_type(type_: Union[ap.ActivityType, List[ap.ActivityType]]) -> _SubQuery: return {"type": type_.value} +def follow_request_accepted() -> _SubQuery: + return flag(MetaKey.FOLLOW_STATUS, FollowStatus.ACCEPTED.value) + + def not_undo() -> _SubQuery: return flag(MetaKey.UNDO, False) @@ -95,6 +109,10 @@ def by_actor(actor: ap.BaseActivity) -> _SubQuery: return flag(MetaKey.ACTOR_ID, actor.id) +def by_actor_id(actor_id: str) -> _SubQuery: + return flag(MetaKey.ACTOR_ID, actor_id) + + def by_object_id(object_id: str) -> _SubQuery: return flag(MetaKey.OBJECT_ID, object_id) diff --git a/core/migrations.py b/core/migrations.py index 7c9d727..9f20a9d 100644 --- a/core/migrations.py +++ b/core/migrations.py @@ -10,8 +10,16 @@ from core import activitypub from core.db import DB from core.db import find_activities from core.db import update_one_activity +from core.meta import FollowStatus from core.meta import MetaKey from core.meta import _meta +from core.meta import by_actor_id +from core.meta import by_actor +from core.meta import by_remote_id +from core.meta import by_type +from core.meta import in_inbox +from core.meta import not_undo +from core.meta import upsert from utils.migrations import Migration from utils.migrations import logger from utils.migrations import perform # noqa: just here for export @@ -156,10 +164,10 @@ class _2_FollowMigration(Migration): {"_id": data["_id"]}, {"$set": {"meta.actor": actor}} ) except Exception: - logger.exception("failed to process actor {data!r}") + logger.exception(f"failed to process actor {data!r}") -class _20190808_MetaPublishedMigration(Migration): +class _20190830_MetaPublishedMigration(Migration): """Add the `meta.published` field to old activities.""" def migrate(self) -> None: @@ -180,4 +188,68 @@ class _20190808_MetaPublishedMigration(Migration): ) except Exception: - logger.exception("failed to process activity {data!r}") + logger.exception(f"failed to process activity {data!r}") + + +class _20190830_FollowFollowBackMigration(Migration): + """Add the new meta flags for tracking accepted/rejected status and following/follows back info.""" + + def migrate(self) -> None: + for data in find_activities({**by_type(ap.ActivityType.ACCEPT), **in_inbox()}): + try: + update_one_activity( + {**by_type(ap.ActivityType.FOLLOW), **by_remote_id(data["meta"]["object_id"])}, + upsert({MetaKey.FOLLOW_STATUS: FollowStatus.ACCEPTED.value}), + ) + # Check if we are following this actor + follow_query = { + **in_inbox(), + **by_type(ap.ActivityType.FOLLOW), + **by_actor_id(data["meta"]["actor_id"]), + **not_undo(), + } + raw_follow = DB.activities.find_one(follow_query) + if raw_follow: + DB.activities.update_many( + follow_query, + {"$set": {_meta(MetaKey.NOTIFICATION_FOLLOWS_BACK): True}}, + ) + + except Exception: + logger.exception(f"failed to process activity {data!r}") + + for data in find_activities({**by_type(ap.ActivityType.REJECT), **in_inbox()}): + try: + update_one_activity( + {**by_type(ap.ActivityType.FOLLOW), **by_remote_id(data["meta"]["object_id"])}, + upsert({MetaKey.FOLLOW_STATUS: FollowStatus.REJECTED.value}), + ) + except Exception: + logger.exception(f"failed to process activity {data!r}") + + for data in find_activities({**by_type(ap.ActivityType.FOLLOW), **in_inbox()}): + try: + accept_query = { + **in_inbox(), + **by_type(ap.ActivityType.ACCEPT), + **by_actor_id(data["meta"]["actor_id"]), + **not_undo(), + } + raw_accept = DB.activities.find_one(accept_query) + if raw_accept: + DB.activities.update_many( + accept_query, + {"$set": {_meta(MetaKey.NOTIFICATION_FOLLOWS_BACK): True}}, + ) + + except Exception: + logger.exception(f"failed to process activity {data!r}") + + DB.activities.update_many( + { + **by_type(ap.ActivityType.FOLLOW), + **in_inbox(), + "meta.follow_status": {"$exists": False}, + }, + {"$set": {"meta.follow_status": "waiting"}}, + ) diff --git a/core/notifications.py b/core/notifications.py index 1710a85..dfb6d7d 100644 --- a/core/notifications.py +++ b/core/notifications.py @@ -5,12 +5,12 @@ from datetime import timezone from functools import singledispatch from typing import Any from typing import Dict -from urllib.parse import urlparse from little_boxes import activitypub as ap -from config import BASE_URL from config import DB +from core.activitypub import is_from_outbox +from core.activitypub import is_local_url from core.db import find_one_activity from core.meta import MetaKey from core.meta import _meta @@ -27,16 +27,6 @@ _logger = logging.getLogger(__name__) _NewMeta = Dict[str, Any] -_LOCAL_NETLOC = urlparse(BASE_URL).netloc - - -def _is_from_outbox(activity: ap.BaseActivity) -> bool: - return activity.id.startswith(BASE_URL) - - -def _is_local(url: str) -> bool: - return urlparse(url).netloc == _LOCAL_NETLOC - def _flag_as_notification(activity: ap.BaseActivity, new_meta: _NewMeta) -> None: new_meta.update( @@ -83,6 +73,16 @@ def _accept_set_inbox_flags(activity: ap.Accept, new_meta: _NewMeta) -> None: return None +@set_inbox_flags.register +def _reject_set_inbox_flags(activity: ap.Reject, new_meta: _NewMeta) -> None: + """Handle notifications for "rejected" following requests.""" + _logger.info(f"set_inbox_flags activity={activity!r}") + # This Accept will be a "You started following $actor" notification + _flag_as_notification(activity, new_meta) + _set_flag(new_meta, MetaKey.GC_KEEP) + return None + + @set_inbox_flags.register def _follow_set_inbox_flags(activity: ap.Follow, new_meta: _NewMeta) -> None: """Handle notification for new followers.""" @@ -114,7 +114,7 @@ def _follow_set_inbox_flags(activity: ap.Follow, new_meta: _NewMeta) -> None: def _like_set_inbox_flags(activity: ap.Like, new_meta: _NewMeta) -> None: _logger.info(f"set_inbox_flags activity={activity!r}") # Is it a Like of local acitivty/from the outbox - if _is_from_outbox(activity.get_object()): + if is_from_outbox(activity.get_object()): # Flag it as a notification _flag_as_notification(activity, new_meta) @@ -132,7 +132,7 @@ def _announce_set_inbox_flags(activity: ap.Announce, new_meta: _NewMeta) -> None _logger.info(f"set_inbox_flags activity={activity!r}") obj = activity.get_object() # Is it a Annnounce/boost of local acitivty/from the outbox - if _is_from_outbox(obj): + if is_from_outbox(obj): # Flag it as a notification _flag_as_notification(activity, new_meta) @@ -180,7 +180,7 @@ def _create_set_inbox_flags(activity: ap.Create, new_meta: _NewMeta) -> None: in_reply_to = obj.get_in_reply_to() # Check if it's a local reply - if in_reply_to and _is_local(in_reply_to): + if in_reply_to and is_local_url(in_reply_to): # TODO(tsileo): fetch the reply to check for poll answers more precisely # reply_of = ap.fetch_remote_activity(in_reply_to) @@ -199,7 +199,7 @@ def _create_set_inbox_flags(activity: ap.Create, new_meta: _NewMeta) -> None: # Check for mention for mention in obj.get_mentions(): - if mention.href and _is_local(mention.href): + if mention.href and is_local_url(mention.href): # Flag it as a notification _flag_as_notification(activity, new_meta)