Lots of improvements and bugfixes
Better caching, and more on visibility too
parent 8d12cf9571
commit 0b75ee7324
4 changed files with 210 additions and 82 deletions
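A note on the visibility side of this change: the backend now stores a per-activity `object_visibility` derived from `ap.get_visibility()` (see the first `activitypub.py` hunks below), and the web handlers build their `to`/`cc` audiences from that visibility. The following is a minimal, self-contained sketch of the underlying ActivityStreams convention; the `Visibility` enum and `get_visibility` helper here are illustrative stand-ins, not the project's `ap` module (presumably little_boxes).

from enum import Enum
from typing import Any, Dict

AS_PUBLIC = "https://www.w3.org/ns/activitystreams#Public"


class Visibility(Enum):
    PUBLIC = "public"
    UNLISTED = "unlisted"
    DIRECT = "direct"


def get_visibility(obj: Dict[str, Any]) -> Visibility:
    # Rough ActivityStreams convention: Public in `to` => public,
    # Public only in `cc` => unlisted, anything else treated as direct/limited.
    to = obj.get("to") or []
    cc = obj.get("cc") or []
    if AS_PUBLIC in to:
        return Visibility.PUBLIC
    if AS_PUBLIC in cc:
        return Visibility.UNLISTED
    return Visibility.DIRECT


note = {"type": "Note", "to": [AS_PUBLIC], "cc": ["https://example.com/followers"]}
meta = {"object_visibility": get_visibility(note).name}  # {"object_visibility": "PUBLIC"}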
activitypub.py (143 changes)
@@ -35,6 +35,7 @@ logger = logging.getLogger(__name__)

ACTORS_CACHE = LRUCache(maxsize=256)
MY_PERSON = ap.Person(**ME)


def _actor_to_meta(actor: ap.BaseActivity, with_inbox=False) -> Dict[str, Any]:
@@ -125,7 +126,7 @@ class MicroblogPubBackend(Backend):
        except ValueError:
            pass
        object_visibility = None
        if object_id:
        if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]):
            object_visibility = ap.get_visibility(activity.get_object()).name

        actor_id = activity.get_actor().id
@@ -205,10 +206,40 @@ class MicroblogPubBackend(Backend):
            )
        )

    def _fetch_iri(self, iri: str) -> ap.ObjectType:
    def _fetch_iri(self, iri: str) -> ap.ObjectType:  # noqa: C901
        # Shortcut if the instance actor is fetched
        if iri == ME["id"]:
            return ME

        # Internal collections handling
        # Followers
        if iri == MY_PERSON.followers:
            followers = []
            for data in DB.activities.find(
                {
                    "box": Box.INBOX.value,
                    "type": ap.ActivityType.FOLLOW.value,
                    "meta.undo": False,
                }
            ):
                followers.append(data["meta"]["actor_id"])
            return {"type": "Collection", "items": followers}

        # Following
        if iri == MY_PERSON.following:
            following = []
            for data in DB.activities.find(
                {
                    "box": Box.OUTBOX.value,
                    "type": ap.ActivityType.FOLLOW.value,
                    "meta.undo": False,
                }
            ):
                following.append(data["meta"]["object_id"])
            return {"type": "Collection", "items": following}

        # TODO(tsileo): handle the liked collection too

        # Check if the activity is owned by this server
        if iri.startswith(BASE_URL):
            is_a_note = False
@@ -229,40 +260,48 @@ class MicroblogPubBackend(Backend):
                if data["meta"]["deleted"]:
                    raise ActivityGoneError(f"{iri} is gone")
                return data["activity"]
            obj = DB.activities.find_one({"meta.object_id": iri, "type": "Create"})
            if obj:
                if obj["meta"]["deleted"]:
                    raise ActivityGoneError(f"{iri} is gone")
                return obj["meta"].get("object") or obj["activity"]["object"]

        # Check if it's cached because it's a follower
        # Remove extra info (like the key hash if any)
        cleaned_iri = iri.split("#")[0]
        actor = DB.activities.find_one(
            {
                "meta.actor_id": cleaned_iri,
                "type": ap.ActivityType.FOLLOW.value,
                "meta.undo": False,
            }
        )
        if actor and actor["meta"].get("actor"):
            return actor["meta"]["actor"]

        # Check if it's cached because it's a following
        actor2 = DB.activities.find_one(
            {
                "meta.object_id": cleaned_iri,
                "type": ap.ActivityType.FOLLOW.value,
                "meta.undo": False,
            }
        )
        if actor2 and actor2["meta"].get("object"):
            return actor2["meta"]["object"]

        # Fetch the URL via HTTP
        logger.info(f"dereference {iri} via HTTP")
        return super().fetch_iri(iri)

    def fetch_iri(self, iri: str, no_cache=False) -> ap.ObjectType:
        if iri == ME["id"]:
            return ME

        if iri in ACTORS_CACHE:
            logger.info(f"{iri} found in cache")
            return ACTORS_CACHE[iri]

        # data = DB.actors.find_one({"remote_id": iri})
        # if data:
        # if ap._has_type(data["type"], ap.ACTOR_TYPES):
        # logger.info(f"{iri} found in DB cache")
        # ACTORS_CACHE[iri] = data["data"]
        # return data["data"]
        if not no_cache:
            # Fetch the activity by checking the local DB first
            data = self._fetch_iri(iri)
        else:
            return super().fetch_iri(iri)
            data = super().fetch_iri(iri)

        logger.debug(f"_fetch_iri({iri!r}) == {data!r}")
        if ap._has_type(data["type"], ap.ACTOR_TYPES):
            logger.debug(f"caching actor {iri}")
            # Cache the actor
            DB.actors.update_one(
                {"remote_id": iri},
                {"$set": {"remote_id": iri, "data": data}},
                upsert=True,
            )
            ACTORS_CACHE[iri] = data

        return data
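The `fetch_iri` override above layers caches: the in-memory `ACTORS_CACHE` LRU, the MongoDB `actors` collection (upserted on every successful actor fetch; the read path is still commented out in this commit), and finally an HTTP dereference. A condensed sketch of that intended lookup order, assuming `LRUCache` comes from `cachetools` and with `db` / `http_get` as hypothetical placeholders rather than microblog.pub APIs:

from typing import Any, Callable, Dict

from cachetools import LRUCache  # assumed source of LRUCache(maxsize=256)

ACTORS_CACHE: LRUCache = LRUCache(maxsize=256)


def cached_fetch(iri: str, db, http_get: Callable[[str], Dict[str, Any]]) -> Dict[str, Any]:
    # 1. in-memory LRU, cheapest hit
    if iri in ACTORS_CACHE:
        return ACTORS_CACHE[iri]

    # 2. persistent cache keyed on the remote ID (mirrors DB.actors in the diff)
    doc = db.actors.find_one({"remote_id": iri})
    if doc:
        ACTORS_CACHE[iri] = doc["data"]
        return doc["data"]

    # 3. actual dereference over HTTP, then populate both cache layers
    data = http_get(iri)
    db.actors.update_one(
        {"remote_id": iri},
        {"$set": {"remote_id": iri, "data": data}},
        upsert=True,
    )
    ACTORS_CACHE[iri] = data
    return data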
@@ -395,37 +434,36 @@ class MicroblogPubBackend(Backend):

    @ensure_it_is_me
    def inbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
        obj = delete.get_object()
        logger.debug("delete object={obj!r}")
        obj_id = delete.get_object_id()
        logger.debug("delete object={obj_id}")
        try:
            obj = delete.get_object()
            logger.info(f"inbox_delete handle_replies obj={obj!r}")
            in_reply_to = obj.get_in_reply_to() if obj.inReplyTo else None
            if obj.has_type(ap.CREATE_TYPES):
                in_reply_to = ap._get_id(
                    DB.activities.find_one(
                        {"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value}
                    )["activity"]["object"].get("inReplyTo")
                )
                if in_reply_to:
                    self._handle_replies_delete(as_actor, in_reply_to)
        except Exception:
            logger.exception(f"failed to handle delete replies for {obj_id}")

        DB.activities.update_one(
            {"activity.object.id": obj.id}, {"$set": {"meta.deleted": True}}
            {"meta.object_id": obj_id, "type": "Create"},
            {"$set": {"meta.deleted": True}},
        )

        logger.info(f"inbox_delete handle_replies obj={obj!r}")
        in_reply_to = obj.get_in_reply_to() if obj.inReplyTo else None
        if delete.get_object().ACTIVITY_TYPE != ap.ActivityType.NOTE:
            in_reply_to = ap._get_id(
                DB.activities.find_one(
                    {
                        "activity.object.id": delete.get_object().id,
                        "type": ap.ActivityType.CREATE.value,
                    }
                )["activity"]["object"].get("inReplyTo")
            )

        # Fake an Undo so any related Like/Announce doesn't appear on the web UI
        DB.activities.update(
            {"meta.object.id": obj.id},
            {"$set": {"meta.undo": True, "meta.extra": "object deleted"}},
        )
        if in_reply_to:
            self._handle_replies_delete(as_actor, in_reply_to)
        # Force undo other related activities
        DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}})

    @ensure_it_is_me
    def outbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
        DB.activities.update_one(
            {"activity.object.id": delete.get_object().id},
            {"$set": {"meta.deleted": True}},
        DB.activities.update(
            {"meta.object_id": delete.get_object_id()},
            {"$set": {"meta.deleted": True, "meta.undo": True}},
        )
        obj = delete.get_object()
        if delete.get_object().ACTIVITY_TYPE != ap.ActivityType.NOTE:
@@ -438,11 +476,6 @@ class MicroblogPubBackend(Backend):
                )["activity"]
            ).get_object()

        DB.activities.update(
            {"meta.object.id": obj.id},
            {"$set": {"meta.undo": True, "meta.exta": "object deleted"}},
        )

        self._handle_replies_delete(as_actor, obj.get_in_reply_to())

    @ensure_it_is_me
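Both delete handlers above follow the same soft-delete pattern: nothing is removed from MongoDB, the handlers only flip `meta` flags (`meta.deleted`, `meta.undo`) keyed on `meta.object_id`, and the rest of the app filters on those flags. A hedged, standalone sketch of that pattern, where `db` is a placeholder pymongo database rather than the project's `DB` object:

def soft_delete(db, object_id: str) -> None:
    # Mark the Create as deleted so it is no longer served or rendered.
    db.activities.update_one(
        {"meta.object_id": object_id, "type": "Create"},
        {"$set": {"meta.deleted": True}},
    )
    # "Undo" every related activity (Like/Announce) so it drops out of the UI too.
    db.activities.update_many(
        {"meta.object_id": object_id},
        {"$set": {"meta.undo": True}},
    )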
app.py (102 changes)
@@ -249,7 +249,10 @@ def _get_file_url(url, size, kind):

@app.template_filter()
def visibility(v: str) -> str:
    return ap.Visibility[v].value.lower()
    try:
        return ap.Visibility[v].value.lower()
    except Exception:
        return v


@app.template_filter()
@@ -771,7 +774,13 @@ def authorize_follow():
    if DB.activities.count(q) > 0:
        return redirect("/following")

    follow = ap.Follow(actor=MY_PERSON.id, object=actor)
    follow = ap.Follow(
        actor=MY_PERSON.id,
        object=actor,
        to=[actor],
        cc=[ap.AS_PUBLIC],
        published=ap.format_datetime(datetime.now(timezone.utc)),
    )
    post_to_outbox(follow)

    return redirect("/following")
@@ -1689,7 +1698,14 @@ def api_delete():
    """API endpoint to delete a Note activity."""
    note = _user_api_get_note(from_outbox=True)

    delete = ap.Delete(actor=ID, object=ap.Tombstone(id=note.id).to_dict(embed=True))
    # Create the delete, same audience as the Create object
    delete = ap.Delete(
        actor=ID,
        object=ap.Tombstone(id=note.id).to_dict(embed=True),
        to=note.to,
        cc=note.cc,
        published=ap.format_datetime(datetime.now(timezone.utc)),
    )

    delete_id = post_to_outbox(delete)
@@ -1702,10 +1718,16 @@ def api_boost():
    note = _user_api_get_note()

    # Ensure the note visibility allows us to build an Announce (with respect to the post visibility)
    if ap.get_visibility(note) not in [ap.Visibility.PUBLIC, ap.VISIBILITY.UNLISTED]:
    if ap.get_visibility(note) not in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]:
        abort(400)

    announce = note.build_announce(MY_PERSON)
    announce = ap.Announce(
        actor=MY_PERSON.id,
        object=note.id,
        to=[MY_PERSON.followers, note.attributedTo],
        cc=[ap.AS_PUBLIC],
        published=ap.format_datetime(datetime.now(timezone.utc)),
    )
    announce_id = post_to_outbox(announce)

    return _user_api_response(activity=announce_id)
@@ -1741,7 +1763,28 @@ def api_vote():
def api_like():
    note = _user_api_get_note()

    like = note.build_like(MY_PERSON)
    to = []
    cc = []

    note_visibility = ap.get_visibility(note)

    if note_visibility == ap.Visibility.PUBLIC:
        to = [ap.AS_PUBLIC]
        cc = [ID + "/followers", note.get_actor().id]
    elif note_visibility == ap.Visibility.UNLISTED:
        to = [ID + "/followers", note.get_actor().id]
        cc = [ap.AS_PUBLIC]
    else:
        to = [note.get_actor().id]

    like = ap.Like(
        object=note.id,
        actor=MY_PERSON.id,
        to=to,
        cc=cc,
        published=ap.format_datetime(datetime.now(timezone.utc)),
    )

    like_id = post_to_outbox(like)

    return _user_api_response(activity=like_id)
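The new `api_like` builds the Like's audience from the liked note's visibility instead of relying on `note.build_like()`. The rule it encodes, written as a small standalone function; `instance_id` and `note_author` are hypothetical parameter names standing in for `ID` and `note.get_actor().id` in the diff:

from typing import List, Tuple

AS_PUBLIC = "https://www.w3.org/ns/activitystreams#Public"


def like_audience(visibility: str, instance_id: str, note_author: str) -> Tuple[List[str], List[str]]:
    """Return (to, cc) for a Like, mirroring the liked note's visibility."""
    if visibility == "PUBLIC":
        return [AS_PUBLIC], [instance_id + "/followers", note_author]
    if visibility == "UNLISTED":
        return [instance_id + "/followers", note_author], [AS_PUBLIC]
    # Anything narrower is addressed to the note's author only.
    return [note_author], []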
@@ -1806,8 +1849,16 @@ def api_undo():
        raise ActivityNotFoundError(f"cannot found {oid}")

    obj = ap.parse_activity(doc.get("activity"))

    undo = ap.Undo(
        actor=MY_PERSON.id,
        object=obj.to_dict(embed=True, embed_object_id_only=True),
        published=ap.format_datetime(datetime.now(timezone.utc)),
        to=obj.to,
        cc=obj.cc,
    )

    # FIXME(tsileo): detect already undo-ed and make this API call idempotent
    undo = obj.build_undo()
    undo_id = post_to_outbox(undo)

    return _user_api_response(activity=undo_id)
@@ -2191,7 +2242,13 @@ def api_follow():
    if existing:
        return _user_api_response(activity=existing["activity"]["id"])

    follow = ap.Follow(actor=MY_PERSON.id, object=actor)
    follow = ap.Follow(
        actor=MY_PERSON.id,
        object=actor,
        to=[actor],
        cc=[ap.AS_PUBLIC],
        published=ap.format_datetime(datetime.now(timezone.utc)),
    )
    follow_id = post_to_outbox(follow)

    return _user_api_response(activity=follow_id)
@@ -2728,7 +2785,13 @@ def task_finish_post_to_inbox():
            back.inbox_like(MY_PERSON, activity)
        elif activity.has_type(ap.ActivityType.FOLLOW):
            # Reply to a Follow with an Accept
            accept = ap.Accept(actor=ID, object=activity.to_dict(embed=True))
            accept = ap.Accept(
                actor=ID,
                object=activity.to_dict(),
                to=[activity.get_actor().id],
                cc=[ap.AS_PUBLIC],
                published=ap.format_datetime(datetime.now(timezone.utc)),
            )
            post_to_outbox(accept)
        elif activity.has_type(ap.ActivityType.UNDO):
            obj = activity.get_object()
@@ -2881,34 +2944,21 @@ def task_cache_actor() -> str:

        actor = activity.get_actor()

        cache_actor_with_inbox = False
        if activity.has_type(ap.ActivityType.FOLLOW):
            if actor.id != ID:
                # It's a Follow from the Inbox
                cache_actor_with_inbox = True
            else:
            if actor.id == ID:
                # It's a new following, cache the "object" (which is the actor we follow)
                DB.activities.update_one(
                    {"remote_id": iri},
                    {
                        "$set": {
                            "meta.object": activitypub._actor_to_meta(
                                activity.get_object()
                            )
                            "meta.object": activity.get_object().to_dict(embed=True)
                        }
                    },
                )

        # Cache the actor info
        DB.activities.update_one(
            {"remote_id": iri},
            {
                "$set": {
                    "meta.actor": activitypub._actor_to_meta(
                        actor, cache_actor_with_inbox
                    )
                }
            },
            {"remote_id": iri}, {"$set": {"meta.actor": actor.to_dict(embed=True)}}
        )

        app.logger.info(f"actor cached for {iri}")
@@ -3013,7 +3063,7 @@ def task_process_new_activity():

        elif activity.has_type(ap.ActivityType.DELETE):
            note = DB.activities.find_one(
                {"activity.object.id": activity.get_object().id}
                {"activity.object.id": activity.get_object_id()}
            )
            if note and note["meta"].get("forwarded", False):
                # If the activity was originally forwarded, forward the delete too
@@ -9,9 +9,14 @@ from utils.migrations import DB
from utils.migrations import Migration
from utils.migrations import logger
from utils.migrations import perform  # noqa: just here for export
from config import ID
import activitypub

back = activitypub.MicroblogPubBackend()
ap.use_backend(back)


class _1_MetaMigrationt(Migration):
class _1_MetaMigration(Migration):
    """Add new metadata to simplify querying."""

    def __guess_visibility(self, data: Dict[str, Any]) -> ap.Visibility:
@@ -106,3 +111,42 @@ class _1_MetaMigrationt(Migration):
            logger.info(f"meta={set_meta}\n")
            if set_meta:
                DB.activities.update_one({"_id": data["_id"]}, {"$set": set_meta})


class _2_FollowMigration(Migration):
    """Add new metadata to update the cached actor in Follow activities."""

    def migrate(self) -> None:
        actor_cache: Dict[str, Dict[str, Any]] = {}
        for data in DB.activities.find({"type": ap.ActivityType.FOLLOW.value}):
            if data["meta"]["actor_id"] == ID:
                # It's a "following"
                actor = actor_cache.get(data["meta"]["object_id"])
                if not actor:
                    actor = ap.parse_activity(
                        ap.get_backend().fetch_iri(
                            data["meta"]["object_id"], no_cache=True
                        )
                    ).to_dict(embed=True)
                    if not actor:
                        raise ValueError(f"missing actor {data!r}")
                    actor_cache[actor["id"]] = actor
                DB.activities.update_one(
                    {"_id": data["_id"]}, {"$set": {"meta.object": actor}}
                )

            else:
                # It's a "followers"
                actor = actor_cache.get(data["meta"]["actor_id"])
                if not actor:
                    actor = ap.parse_activity(
                        ap.get_backend().fetch_iri(
                            data["meta"]["actor_id"], no_cache=True
                        )
                    ).to_dict(embed=True)
                    if not actor:
                        raise ValueError(f"missing actor {data!r}")
                    actor_cache[actor["id"]] = actor
                DB.activities.update_one(
                    {"_id": data["_id"]}, {"$set": {"meta.actor": actor}}
                )
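`_2_FollowMigration` backfills `meta.actor` / `meta.object` with the full cached actor document, memoising each dereferenced actor in `actor_cache` so a large set of Follow activities triggers at most one fetch per remote actor. The memoisation pattern in isolation; the `fetch` callable is a stand-in for `ap.get_backend().fetch_iri(..., no_cache=True)`:

from typing import Any, Callable, Dict


def memoised_actor(
    cache: Dict[str, Dict[str, Any]],
    fetch: Callable[[str], Dict[str, Any]],
    iri: str,
) -> Dict[str, Any]:
    # Dereference each actor at most once per migration run.
    actor = cache.get(iri)
    if actor is None:
        actor = fetch(iri)
        cache[iri] = actor
    return actor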
@@ -8,6 +8,7 @@

<div id="notes">
  {% for item in inbox_data %}
  {{ item }}
  {% if 'actor' in item.meta %}
  {% if item | has_type('Create') %}
    {{ utils.display_note(item.activity.object, ui=True, meta=item.meta) }}