"""Migrations that will be run automatically at startup.""" from typing import Any from typing import Dict from urllib.parse import urlparse from little_boxes import activitypub as ap from config import ID from core import activitypub from core.db import DB from core.db import find_activities from core.db import update_one_activity from core.meta import FollowStatus from core.meta import MetaKey from core.meta import _meta from core.meta import by_actor_id from core.meta import by_actor from core.meta import by_remote_id from core.meta import by_type from core.meta import in_inbox from core.meta import not_undo from core.meta import upsert from utils.migrations import Migration from utils.migrations import logger from utils.migrations import perform # noqa: just here for export back = activitypub.MicroblogPubBackend() ap.use_backend(back) class _1_MetaMigration(Migration): """Add new metadata to simplify querying.""" def __guess_visibility(self, data: Dict[str, Any]) -> ap.Visibility: to = data.get("to", []) cc = data.get("cc", []) if ap.AS_PUBLIC in to: return ap.Visibility.PUBLIC elif ap.AS_PUBLIC in cc: return ap.Visibility.UNLISTED else: # Uses a bit of heuristic here, it's too expensive to fetch the actor, so assume the followers # collection has "/collection" in it (which is true for most software), and at worst, we will # classify it as "DIRECT" which behave the same as "FOLLOWERS_ONLY" (i.e. no Announce) followers_only = False for item in to: if "/followers" in item: followers_only = True break if not followers_only: for item in cc: if "/followers" in item: followers_only = True break if followers_only: return ap.Visibility.FOLLOWERS_ONLY return ap.Visibility.DIRECT def migrate(self) -> None: # noqa: C901 # too complex for data in DB.activities.find(): logger.info(f"before={data}") obj = data["activity"].get("object") set_meta: Dict[str, Any] = {} # Set `meta.object_id` (str) if not data["meta"].get("object_id"): set_meta["meta.object_id"] = None if obj: if isinstance(obj, str): set_meta["meta.object_id"] = data["activity"]["object"] elif isinstance(obj, dict): obj_id = obj.get("id") if obj_id: set_meta["meta.object_id"] = obj_id # Set `meta.object_visibility` (str) if not data["meta"].get("object_visibility"): set_meta["meta.object_visibility"] = None object_id = data["meta"].get("object_id") or set_meta.get( "meta.object_id" ) if object_id: obj = data["meta"].get("object") or data["activity"].get("object") if isinstance(obj, dict): set_meta["meta.object_visibility"] = self.__guess_visibility( obj ).name # Set `meta.actor_id` (str) if not data["meta"].get("actor_id"): set_meta["meta.actor_id"] = None actor = data["activity"].get("actor") if actor: if isinstance(actor, str): set_meta["meta.actor_id"] = data["activity"]["actor"] elif isinstance(actor, dict): actor_id = actor.get("id") if actor_id: set_meta["meta.actor_id"] = actor_id # Set `meta.poll_answer` (bool) if not data["meta"].get("poll_answer"): set_meta["meta.poll_answer"] = False if obj: if isinstance(obj, dict): if ( obj.get("name") and not obj.get("content") and obj.get("inReplyTo") ): set_meta["meta.poll_answer"] = True # Set `meta.visibility` (str) if not data["meta"].get("visibility"): set_meta["meta.visibility"] = self.__guess_visibility( data["activity"] ).name if not data["meta"].get("server"): set_meta["meta.server"] = urlparse(data["remote_id"]).netloc logger.info(f"meta={set_meta}\n") if set_meta: DB.activities.update_one({"_id": data["_id"]}, {"$set": set_meta}) class _2_FollowMigration(Migration): """Add new metadata to update the cached actor in Follow activities.""" def migrate(self) -> None: actor_cache: Dict[str, Dict[str, Any]] = {} for data in DB.activities.find({"type": ap.ActivityType.FOLLOW.value}): try: if data["meta"]["actor_id"] == ID: # It's a "following" actor = actor_cache.get(data["meta"]["object_id"]) if not actor: actor = ap.parse_activity( ap.get_backend().fetch_iri( data["meta"]["object_id"], no_cache=True ) ).to_dict(embed=True) if not actor: raise ValueError(f"missing actor {data!r}") actor_cache[actor["id"]] = actor DB.activities.update_one( {"_id": data["_id"]}, {"$set": {"meta.object": actor}} ) else: # It's a "followers" actor = actor_cache.get(data["meta"]["actor_id"]) if not actor: actor = ap.parse_activity( ap.get_backend().fetch_iri( data["meta"]["actor_id"], no_cache=True ) ).to_dict(embed=True) if not actor: raise ValueError(f"missing actor {data!r}") actor_cache[actor["id"]] = actor DB.activities.update_one( {"_id": data["_id"]}, {"$set": {"meta.actor": actor}} ) except Exception: logger.exception(f"failed to process actor {data!r}") class _20190830_MetaPublishedMigration(Migration): """Add the `meta.published` field to old activities.""" def migrate(self) -> None: for data in find_activities({"meta.published": {"$exists": False}}): try: raw = data["activity"] # If the activity has its own `published` field, we'll use it if "published" in raw: published = raw["published"] else: # Otherwise, we take the date we received the activity as the published time published = ap.format_datetime(data["_id"].generation_time) # Set the field in the DB update_one_activity( {"_id": data["_id"]}, {"$set": {_meta(MetaKey.PUBLISHED): published}}, ) except Exception: logger.exception(f"failed to process activity {data!r}") class _20190830_FollowFollowBackMigration(Migration): """Add the new meta flags for tracking accepted/rejected status and following/follows back info.""" def migrate(self) -> None: for data in find_activities({**by_type(ap.ActivityType.ACCEPT), **in_inbox()}): try: update_one_activity( {**by_type(ap.ActivityType.FOLLOW), **by_remote_id(data["meta"]["object_id"])}, upsert({MetaKey.FOLLOW_STATUS: FollowStatus.ACCEPTED.value}), ) # Check if we are following this actor follow_query = { **in_inbox(), **by_type(ap.ActivityType.FOLLOW), **by_actor_id(data["meta"]["actor_id"]), **not_undo(), } raw_follow = DB.activities.find_one(follow_query) if raw_follow: DB.activities.update_many( follow_query, {"$set": {_meta(MetaKey.NOTIFICATION_FOLLOWS_BACK): True}}, ) except Exception: logger.exception(f"failed to process activity {data!r}") for data in find_activities({**by_type(ap.ActivityType.REJECT), **in_inbox()}): try: update_one_activity( {**by_type(ap.ActivityType.FOLLOW), **by_remote_id(data["meta"]["object_id"])}, upsert({MetaKey.FOLLOW_STATUS: FollowStatus.REJECTED.value}), ) except Exception: logger.exception(f"failed to process activity {data!r}") for data in find_activities({**by_type(ap.ActivityType.FOLLOW), **in_inbox()}): try: accept_query = { **in_inbox(), **by_type(ap.ActivityType.ACCEPT), **by_actor_id(data["meta"]["actor_id"]), **not_undo(), } raw_accept = DB.activities.find_one(accept_query) if raw_accept: DB.activities.update_many( accept_query, {"$set": {_meta(MetaKey.NOTIFICATION_FOLLOWS_BACK): True}}, ) except Exception: logger.exception(f"failed to process activity {data!r}") DB.activities.update_many( { **by_type(ap.ActivityType.FOLLOW), **in_inbox(), "meta.follow_status": {"$exists": False}, }, {"$set": {"meta.follow_status": "waiting"}}, )