Improve actor caching
This commit is contained in:
parent
4bf91ffa80
commit
4433a552c0
3 changed files with 52 additions and 28 deletions
|
@ -17,11 +17,16 @@ import config
|
|||
from config import DB
|
||||
from core import gc
|
||||
from core.activitypub import Box
|
||||
from core.activitypub import _actor_hash
|
||||
from core.activitypub import _add_answers_to_question
|
||||
from core.activitypub import no_cache
|
||||
from core.activitypub import post_to_outbox
|
||||
from core.db import update_one_activity
|
||||
from core.inbox import process_inbox
|
||||
from core.meta import MetaKey
|
||||
from core.meta import _meta
|
||||
from core.meta import by_remote_id
|
||||
from core.meta import flag
|
||||
from core.meta import upsert
|
||||
from core.notifications import set_inbox_flags
|
||||
from core.outbox import process_outbox
|
||||
from core.remote import track_failed_send
|
||||
|
@ -135,16 +140,19 @@ def task_cache_object() -> _Response:
|
|||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
obj = activity.get_object()
|
||||
DB.activities.update_one(
|
||||
{"remote_id": activity.id},
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": obj.to_dict(embed=True),
|
||||
# FIXME(tsileo): set object actor only if different from actor?
|
||||
"meta.object_actor": obj.get_actor().to_dict(embed=True),
|
||||
}
|
||||
},
|
||||
)
|
||||
obj_actor = obj.get_actor()
|
||||
obj_actor_hash = _actor_hash(obj_actor)
|
||||
|
||||
cache = {MetaKey.OBJECT: obj.to_dict(embed=True)}
|
||||
|
||||
if activity.get_actor().id != obj_actor.id:
|
||||
cache[MetaKey.OBJECT_ACTOR] = obj_actor.to_dict(embed=True)
|
||||
cache[MetaKey.OBJECT_ACTOR_ID] = obj_actor.id
|
||||
cache[MetaKey.OBJECT_ACTOR_HASH] = obj_actor_hash
|
||||
|
||||
# FIXME(tsileo): set OBJECT_ACTOR_HASH (like in "cache actor" and do an update_many even for ACTOR (not only
|
||||
# OBJECT_ACTOR) ; a migration for OBJECT_ACTOR_ID/OBJECT_ACTOR_HASH needed?
|
||||
update_one_activity(by_remote_id(activity.id), upsert(cache))
|
||||
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
|
||||
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
|
||||
app.logger.exception(f"flagging activity {iri} as deleted, no object caching")
|
||||
|
@ -243,11 +251,16 @@ def task_cache_actor() -> _Response:
|
|||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
# Reload the actor without caching (in case it got upated)
|
||||
with no_cache():
|
||||
actor = ap.fetch_remote_activity(activity.get_actor().id)
|
||||
|
||||
actor_hash = _actor_hash(actor)
|
||||
|
||||
# Fetch the Open Grah metadata if it's a `Create`
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
Tasks.fetch_og_meta(iri)
|
||||
|
||||
actor = activity.get_actor()
|
||||
if actor.icon:
|
||||
if isinstance(actor.icon, dict) and "url" in actor.icon:
|
||||
config.MEDIA_CACHE.cache_actor_icon(actor.icon["url"])
|
||||
|
@ -258,19 +271,24 @@ def task_cache_actor() -> _Response:
|
|||
if actor.id == config.ID:
|
||||
# It's a new following, cache the "object" (which is the actor we follow)
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri},
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": activity.get_object().to_dict(embed=True)
|
||||
}
|
||||
},
|
||||
by_remote_id(iri),
|
||||
upsert({MetaKey.OBJECT: activity.get_object().to_dict(embed=True)}),
|
||||
)
|
||||
|
||||
# Cache the actor info
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri}, {"$set": {"meta.actor": actor.to_dict(embed=True)}}
|
||||
DB.activities.update_many(
|
||||
{
|
||||
**flag(MetaKey.ACTOR_ID, actor.id),
|
||||
**flag(MetaKey.ACTOR_HASH, {"$ne": actor_hash}),
|
||||
},
|
||||
upsert({MetaKey.ACTOR: actor.to_dict(embed=True)}),
|
||||
)
|
||||
|
||||
# TODO(tsileo): Also update following (it's in the object)
|
||||
# DB.activities.update_many(
|
||||
# {"meta.object_id": actor.id}, {"$set": {"meta.object": actor.to_dict(embed=True)}}
|
||||
# )
|
||||
|
||||
app.logger.info(f"actor cached for {iri}")
|
||||
if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]):
|
||||
Tasks.cache_attachments(iri)
|
||||
|
@ -435,9 +453,9 @@ def task_process_new_activity() -> _Response:
|
|||
flags = {}
|
||||
|
||||
if not activity.published:
|
||||
flags[_meta(MetaKey.PUBLISHED)] = now()
|
||||
flags.update(flag(MetaKey.PUBLISHED, now()))
|
||||
else:
|
||||
flags[_meta(MetaKey.PUBLISHED)] = activity.published
|
||||
flags.update(flag(MetaKey.PUBLISHED, activity.published))
|
||||
|
||||
set_inbox_flags(activity, flags)
|
||||
app.logger.info(f"a={activity}, flags={flags!r}")
|
||||
|
|
|
@ -7,9 +7,9 @@ from datetime import datetime
|
|||
from datetime import timezone
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Iterator
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Iterator
|
||||
from urllib.parse import urljoin
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
@ -311,21 +311,19 @@ class MicroblogPubBackend(Backend):
|
|||
if data["meta"]["deleted"]:
|
||||
raise ActivityGoneError(f"{iri} is gone")
|
||||
return data["activity"]
|
||||
# Check if we're looking for an object wrapped in a Create
|
||||
obj = DB.activities.find_one({"meta.object_id": iri, "type": "Create"})
|
||||
if obj:
|
||||
if obj["meta"]["deleted"]:
|
||||
raise ActivityGoneError(f"{iri} is gone")
|
||||
return obj["meta"].get("object") or obj["activity"]["object"]
|
||||
# TODO(tsileo): also check the REPLIES box
|
||||
|
||||
# Check if it's cached because it's a follower
|
||||
# Remove extra info (like the key hash if any)
|
||||
cleaned_iri = iri.split("#")[0]
|
||||
actor = DB.activities.find_one(
|
||||
{
|
||||
"meta.actor_id": cleaned_iri,
|
||||
"type": ap.ActivityType.FOLLOW.value,
|
||||
"meta.undo": False,
|
||||
}
|
||||
{"meta.actor_id": cleaned_iri, "meta.actor": {"$exists": True}}
|
||||
)
|
||||
|
||||
# "type" check is here to skip old metadata for "old/buggy" followers
|
||||
|
|
|
@ -23,12 +23,16 @@ class MetaKey(Enum):
|
|||
POLL_ANSWER = "poll_answer"
|
||||
STREAM = "stream"
|
||||
ACTOR_ID = "actor_id"
|
||||
ACTOR = "actor"
|
||||
ACTOR_HASH = "actor_hash"
|
||||
UNDO = "undo"
|
||||
PUBLISHED = "published"
|
||||
GC_KEEP = "gc_keep"
|
||||
OBJECT = "object"
|
||||
OBJECT_ID = "object_id"
|
||||
OBJECT_ACTOR = "object_actor"
|
||||
OBJECT_ACTOR_ID = "object_actor_id"
|
||||
OBJECT_ACTOR_HASH = "object_actor_hash"
|
||||
PUBLIC = "public"
|
||||
|
||||
DELETED = "deleted"
|
||||
|
@ -86,3 +90,7 @@ def upsert(data: Dict[MetaKey, Any]) -> _SubQuery:
|
|||
sq[_meta(mk)] = val
|
||||
|
||||
return {"$set": sq}
|
||||
|
||||
|
||||
def flag(mk: MetaKey, val: Any) -> _SubQuery:
|
||||
return {_meta(mk): val}
|
||||
|
|
Loading…
Reference in a new issue