diff --git a/blueprints/tasks.py b/blueprints/tasks.py
index 686d72a..7350181 100644
--- a/blueprints/tasks.py
+++ b/blueprints/tasks.py
@@ -17,11 +17,16 @@ import config
 from config import DB
 from core import gc
 from core.activitypub import Box
+from core.activitypub import _actor_hash
 from core.activitypub import _add_answers_to_question
+from core.activitypub import no_cache
 from core.activitypub import post_to_outbox
+from core.db import update_one_activity
 from core.inbox import process_inbox
 from core.meta import MetaKey
-from core.meta import _meta
+from core.meta import by_remote_id
+from core.meta import flag
+from core.meta import upsert
 from core.notifications import set_inbox_flags
 from core.outbox import process_outbox
 from core.remote import track_failed_send
@@ -135,16 +140,19 @@ def task_cache_object() -> _Response:
         activity = ap.fetch_remote_activity(iri)
         app.logger.info(f"activity={activity!r}")
         obj = activity.get_object()
-        DB.activities.update_one(
-            {"remote_id": activity.id},
-            {
-                "$set": {
-                    "meta.object": obj.to_dict(embed=True),
-                    # FIXME(tsileo): set object actor only if different from actor?
-                    "meta.object_actor": obj.get_actor().to_dict(embed=True),
-                }
-            },
-        )
+        obj_actor = obj.get_actor()
+        obj_actor_hash = _actor_hash(obj_actor)
+
+        cache = {MetaKey.OBJECT: obj.to_dict(embed=True)}
+
+        if activity.get_actor().id != obj_actor.id:
+            cache[MetaKey.OBJECT_ACTOR] = obj_actor.to_dict(embed=True)
+            cache[MetaKey.OBJECT_ACTOR_ID] = obj_actor.id
+            cache[MetaKey.OBJECT_ACTOR_HASH] = obj_actor_hash
+
+        # FIXME(tsileo): set OBJECT_ACTOR_HASH (like in "cache actor" and do an update_many even for ACTOR (not only
+        # OBJECT_ACTOR) ; a migration for OBJECT_ACTOR_ID/OBJECT_ACTOR_HASH needed?
+        update_one_activity(by_remote_id(activity.id), upsert(cache))
     except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
         DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
         app.logger.exception(f"flagging activity {iri} as deleted, no object caching")
@@ -243,11 +251,16 @@ def task_cache_actor() -> _Response:
         activity = ap.fetch_remote_activity(iri)
         app.logger.info(f"activity={activity!r}")
 
+        # Reload the actor without caching (in case it got updated)
+        with no_cache():
+            actor = ap.fetch_remote_activity(activity.get_actor().id)
+
+        actor_hash = _actor_hash(actor)
+
         # Fetch the Open Grah metadata if it's a `Create`
         if activity.has_type(ap.ActivityType.CREATE):
             Tasks.fetch_og_meta(iri)
 
-        actor = activity.get_actor()
         if actor.icon:
             if isinstance(actor.icon, dict) and "url" in actor.icon:
                 config.MEDIA_CACHE.cache_actor_icon(actor.icon["url"])
@@ -258,19 +271,24 @@ def task_cache_actor() -> _Response:
             if actor.id == config.ID:
                 # It's a new following, cache the "object" (which is the actor we follow)
                 DB.activities.update_one(
-                    {"remote_id": iri},
-                    {
-                        "$set": {
-                            "meta.object": activity.get_object().to_dict(embed=True)
-                        }
-                    },
+                    by_remote_id(iri),
+                    upsert({MetaKey.OBJECT: activity.get_object().to_dict(embed=True)}),
                 )
 
         # Cache the actor info
-        DB.activities.update_one(
-            {"remote_id": iri}, {"$set": {"meta.actor": actor.to_dict(embed=True)}}
+        DB.activities.update_many(
+            {
+                **flag(MetaKey.ACTOR_ID, actor.id),
+                **flag(MetaKey.ACTOR_HASH, {"$ne": actor_hash}),
+            },
+            upsert({MetaKey.ACTOR: actor.to_dict(embed=True)}),
         )
+        # TODO(tsileo): Also update following (it's in the object)
+        # DB.activities.update_many(
+        #     {"meta.object_id": actor.id}, {"$set": {"meta.object": actor.to_dict(embed=True)}}
+        # )
+
         app.logger.info(f"actor cached for {iri}")
         if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]):
             Tasks.cache_attachments(iri)
@@ -435,9 +453,9 @@ def task_process_new_activity() -> _Response:
 
         flags = {}
         if not activity.published:
-            flags[_meta(MetaKey.PUBLISHED)] = now()
+            flags.update(flag(MetaKey.PUBLISHED, now()))
         else:
-            flags[_meta(MetaKey.PUBLISHED)] = activity.published
+            flags.update(flag(MetaKey.PUBLISHED, activity.published))
 
         set_inbox_flags(activity, flags)
         app.logger.info(f"a={activity}, flags={flags!r}")
diff --git a/core/activitypub.py b/core/activitypub.py
index a548d4e..52826c0 100644
--- a/core/activitypub.py
+++ b/core/activitypub.py
@@ -7,9 +7,9 @@ from datetime import datetime
 from datetime import timezone
 from typing import Any
 from typing import Dict
+from typing import Iterator
 from typing import List
 from typing import Optional
-from typing import Iterator
 from urllib.parse import urljoin
 from urllib.parse import urlparse
@@ -311,21 +311,19 @@ class MicroblogPubBackend(Backend):
             if data["meta"]["deleted"]:
                 raise ActivityGoneError(f"{iri} is gone")
             return data["activity"]
 
+        # Check if we're looking for an object wrapped in a Create
         obj = DB.activities.find_one({"meta.object_id": iri, "type": "Create"})
         if obj:
             if obj["meta"]["deleted"]:
                 raise ActivityGoneError(f"{iri} is gone")
             return obj["meta"].get("object") or obj["activity"]["object"]
+        # TODO(tsileo): also check the REPLIES box
         # Check if it's cached because it's a follower
         # Remove extra info (like the key hash if any)
         cleaned_iri = iri.split("#")[0]
         actor = DB.activities.find_one(
-            {
-                "meta.actor_id": cleaned_iri,
-                "type": ap.ActivityType.FOLLOW.value,
-                "meta.undo": False,
-            }
+            {"meta.actor_id": cleaned_iri, "meta.actor": {"$exists": True}}
         )
 
         # "type" check is here to skip old metadata for "old/buggy" followers
diff --git a/core/meta.py b/core/meta.py
index 95d7840..fcab821 100644
--- a/core/meta.py
+++ b/core/meta.py
@@ -23,12 +23,16 @@ class MetaKey(Enum):
     POLL_ANSWER = "poll_answer"
     STREAM = "stream"
     ACTOR_ID = "actor_id"
+    ACTOR = "actor"
+    ACTOR_HASH = "actor_hash"
     UNDO = "undo"
     PUBLISHED = "published"
     GC_KEEP = "gc_keep"
     OBJECT = "object"
     OBJECT_ID = "object_id"
     OBJECT_ACTOR = "object_actor"
+    OBJECT_ACTOR_ID = "object_actor_id"
+    OBJECT_ACTOR_HASH = "object_actor_hash"
     PUBLIC = "public"
     DELETED = "deleted"
 
@@ -86,3 +90,7 @@ def upsert(data: Dict[MetaKey, Any]) -> _SubQuery:
         sq[_meta(mk)] = val
 
     return {"$set": sq}
+
+
+def flag(mk: MetaKey, val: Any) -> _SubQuery:
+    return {_meta(mk): val}
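For reviewers unfamiliar with the `core.meta` helpers, here is a minimal, standalone sketch of how the new `flag()` helper and the existing `upsert()` compose into the MongoDB filter and update documents used by the refactored `task_cache_actor()` call. It assumes `_meta()` simply prefixes the enum value with `meta.` (consistent with the `meta.actor_id` keys the old code used) and that `by_remote_id()` builds a `{"remote_id": ...}` filter; the actor IRI, hash, and dict values are made up for illustration.

```python
# Sketch of the query helpers introduced/used in core/meta.py (not the actual module).
from enum import Enum
from typing import Any, Dict

_SubQuery = Dict[str, Any]


class MetaKey(Enum):
    ACTOR = "actor"
    ACTOR_ID = "actor_id"
    ACTOR_HASH = "actor_hash"


def _meta(mk: MetaKey) -> str:
    # Assumed behaviour: MetaKey.ACTOR_ID -> "meta.actor_id" document field.
    return f"meta.{mk.value}"


def flag(mk: MetaKey, val: Any) -> _SubQuery:
    # New helper from the patch: a single {field: value} fragment usable in a filter.
    return {_meta(mk): val}


def upsert(data: Dict[MetaKey, Any]) -> _SubQuery:
    # Wrap the given fields in a MongoDB "$set" update document.
    return {"$set": {_meta(mk): val for mk, val in data.items()}}


# Equivalent of the refactored update_many() call in task_cache_actor():
actor_id = "https://remote.example/users/alice"  # hypothetical actor IRI
actor_hash = "deadbeef"                          # hypothetical _actor_hash() output
actor_doc = {"id": actor_id, "name": "Alice"}    # hypothetical actor.to_dict(embed=True)

query = {
    **flag(MetaKey.ACTOR_ID, actor_id),
    **flag(MetaKey.ACTOR_HASH, {"$ne": actor_hash}),
}
update = upsert({MetaKey.ACTOR: actor_doc})

assert query == {"meta.actor_id": actor_id, "meta.actor_hash": {"$ne": actor_hash}}
assert update == {"$set": {"meta.actor": actor_doc}}
```

The `$ne` filter on the actor hash means the cached `meta.actor` copies are only rewritten when the remote actor actually changed, and switching to `update_many` refreshes every activity referencing that actor in a single pass.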