2019-08-04 09:34:30 -05:00
|
|
|
import logging
|
|
|
|
from functools import singledispatch
|
|
|
|
from typing import Any
|
|
|
|
from typing import Dict
|
|
|
|
|
|
|
|
from little_boxes import activitypub as ap
|
|
|
|
from little_boxes.errors import NotAnActivityError
|
|
|
|
|
|
|
|
import config
|
|
|
|
from core.activitypub import _answer_key
|
2019-08-18 05:39:19 -05:00
|
|
|
from core.activitypub import handle_replies
|
2019-08-04 13:08:47 -05:00
|
|
|
from core.activitypub import post_to_outbox
|
2019-08-11 04:41:16 -05:00
|
|
|
from core.activitypub import update_cached_actor
|
2019-08-04 09:34:30 -05:00
|
|
|
from core.db import DB
|
2019-08-04 13:08:47 -05:00
|
|
|
from core.db import update_one_activity
|
2019-09-01 03:26:25 -05:00
|
|
|
from core.meta import FollowStatus
|
2019-08-04 13:08:47 -05:00
|
|
|
from core.meta import MetaKey
|
|
|
|
from core.meta import by_object_id
|
|
|
|
from core.meta import by_remote_id
|
|
|
|
from core.meta import by_type
|
|
|
|
from core.meta import in_inbox
|
|
|
|
from core.meta import inc
|
|
|
|
from core.meta import upsert
|
2019-08-04 09:34:30 -05:00
|
|
|
from core.tasks import Tasks
|
|
|
|
from utils import now
|
|
|
|
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
_NewMeta = Dict[str, Any]
|
|
|
|
|
|
|
|
|
|
|
|
@singledispatch
def process_inbox(activity: ap.BaseActivity, new_meta: _NewMeta) -> None:
    """Fallback handler for inbox activities with no registered processor.

    Registered specializations (Delete, Update, Create, …) override this;
    anything else is logged and dropped.
    """
    _logger.warning(f"skipping {activity!r}")
    return None
|
|
|
|
|
|
|
|
|
|
|
|
@process_inbox.register
def _delete_process_inbox(delete: ap.Delete, new_meta: _NewMeta) -> None:
    """Handle an incoming Delete.

    Best-effort: decrement reply counters for the deleted object's parent,
    then flag the local copy as deleted and undo related activities.
    """
    _logger.info(f"process_inbox activity={delete!r}")
    obj_id = delete.get_object_id()
    _logger.debug(f"delete object={obj_id}")
    try:
        # FIXME(tsileo): call the DB here instead? like for the outbox
        obj = ap.fetch_remote_activity(obj_id)
        _logger.info(f"inbox_delete handle_replies obj={obj!r}")
        in_reply_to = obj.get_in_reply_to() if obj.inReplyTo else None
        if obj.has_type(ap.CREATE_TYPES):
            # For a Create, the reply target lives on the wrapped object stored locally
            post_query = {**by_object_id(obj_id), **by_type(ap.ActivityType.CREATE)}
            in_reply_to = ap._get_id(
                DB.activities.find_one(post_query)["activity"]["object"].get(
                    "inReplyTo"
                )
            )
        if in_reply_to:
            # Decrement the reply counter on the parent post and on its cached
            # entry in the replies collection
            DB.activities.update_one(
                {**by_object_id(in_reply_to), **by_type(ap.ActivityType.CREATE)},
                inc(MetaKey.COUNT_REPLY, -1),
            )
            DB.replies.update_one(
                by_remote_id(in_reply_to), inc(MetaKey.COUNT_REPLY, -1)
            )
    except Exception:
        # Best-effort: a failure here must not prevent flagging the deletion below
        _logger.exception(f"failed to handle delete replies for {obj_id}")

    update_one_activity(
        {**by_object_id(obj_id), **by_type(ap.ActivityType.CREATE)},
        upsert({MetaKey.DELETED: True}),
    )

    # Force undo other related activities.
    # Fix: `Collection.update` is deprecated (removed in PyMongo 4.x) and only
    # updated a single document by default; the intent here is every matching
    # activity, so use `update_many`.
    DB.activities.update_many(by_object_id(obj_id), upsert({MetaKey.UNDO: True}))
|
2019-08-04 09:34:30 -05:00
|
|
|
|
|
|
|
|
|
|
|
@process_inbox.register
def _update_process_inbox(update: ap.Update, new_meta: _NewMeta) -> None:
    """Handle an incoming Update by refreshing the local copy of its object.

    Supports three object kinds: Note (replace the stored object), Question
    (recompute answer counters and refresh cached copies) and actors (refetch
    and update the cached profile). Anything else raises ValueError.
    """
    _logger.info(f"process_inbox activity={update!r}")
    obj = update.get_object()
    # NOTE(review): this branch compares the class-level ACTIVITY_TYPE while the
    # branches below use has_type(); for a plain Note both agree.
    if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE:
        # Replace the embedded object in the stored Create wrapping this Note
        update_one_activity(
            {"activity.object.id": obj.id}, {"$set": {"activity.object": obj.to_dict()}}
        )
    elif obj.has_type(ap.ActivityType.QUESTION):
        # A poll is either single-choice ("oneOf") or multi-choice ("anyOf")
        choices = obj._data.get("oneOf", obj.anyOf)
        total_replies = 0
        _set = {}  # Mongo $set document built incrementally below
        for choice in choices:
            answer_key = _answer_key(choice["name"])
            # Per-choice vote count as reported by the remote instance
            cnt = choice["replies"]["totalItems"]
            total_replies += cnt
            _set[f"meta.question_answers.{answer_key}"] = cnt

        _set["meta.question_replies"] = total_replies

        update_one_activity({**in_inbox(), **by_object_id(obj.id)}, {"$set": _set})
        # Also update the cached copies of the question (like Announce and Like)
        DB.activities.update_many(
            by_object_id(obj.id), upsert({MetaKey.OBJECT: obj.to_dict()})
        )

    elif obj.has_type(ap.ACTOR_TYPES):
        # Bypass the cache to get the freshest profile data before re-caching
        actor = ap.fetch_remote_activity(obj.id, no_cache=True)
        update_cached_actor(actor)

    else:
        raise ValueError(f"don't know how to update {obj!r}")
|
|
|
|
|
2019-08-04 09:34:30 -05:00
|
|
|
|
|
|
|
@process_inbox.register
def _create_process_inbox(create: ap.Create, new_meta: _NewMeta) -> None:
    """Handle an incoming Create: schedule follow-up tasks and process replies."""
    _logger.info(f"process_inbox activity={create!r}")
    inner = create.get_object()
    # A `Question` gets an async refresh task so the local copy can pick up
    # poll results later (by fetching the remote object again)
    if inner.has_type(ap.ActivityType.QUESTION):
        Tasks.fetch_remote_question(inner)

    Tasks.cache_emojis(inner)

    handle_replies(create)
|
2019-08-04 09:34:30 -05:00
|
|
|
|
|
|
|
|
|
|
|
@process_inbox.register
def _announce_process_inbox(announce: ap.Announce, new_meta: _NewMeta) -> None:
    """Handle an incoming Announce (boost): cache the object, bump the counter."""
    _logger.info(f"process_inbox activity={announce!r}")
    # TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else
    # or remove it?
    try:
        obj = announce.get_object()
    except NotAnActivityError:
        # Fixed typo in the log message: "Annouce" -> "Announce"
        _logger.exception(
            f'received an Announce referencing an OStatus notice ({announce._data["object"]}), dropping the message'
        )
        return

    if obj.has_type(ap.ActivityType.QUESTION):
        # Questions get an async refresh task to pick up poll results later
        Tasks.fetch_remote_question(obj)

    # Cache the announced object
    Tasks.cache_object(announce.id)

    # Bump the boost counter on the local Create wrapping the announced object
    update_one_activity(
        {**by_type(ap.ActivityType.CREATE), **by_object_id(obj.id)},
        inc(MetaKey.COUNT_BOOST, 1),
    )
|
|
|
|
|
|
|
|
|
|
|
|
@process_inbox.register
def _like_process_inbox(like: ap.Like, new_meta: _NewMeta) -> None:
    """Handle an incoming Like by bumping the like counter on the local post."""
    _logger.info(f"process_inbox activity={like!r}")
    liked = like.get_object()
    # Update the meta counter if the object is published by the server
    create_query = {**by_type(ap.ActivityType.CREATE), **by_object_id(liked.id)}
    update_one_activity(create_query, inc(MetaKey.COUNT_LIKE, 1))
|
|
|
|
|
|
|
|
|
|
|
|
@process_inbox.register
def _follow_process_inbox(activity: ap.Follow, new_meta: _NewMeta) -> None:
    """Handle an incoming Follow by automatically replying with an Accept."""
    _logger.info(f"process_inbox activity={activity!r}")
    follower_id = activity.get_actor().id
    # Embed a copy of the Follow so the remote side can match the Accept to it
    embedded_follow = {
        "type": "Follow",
        "id": activity.id,
        "object": activity.get_object_id(),
        "actor": follower_id,
    }
    accept = ap.Accept(
        actor=config.ID,
        object=embedded_follow,
        to=[follower_id],
        published=now(),
    )
    post_to_outbox(accept)
|
|
|
|
|
|
|
|
|
2019-09-05 17:24:20 -05:00
|
|
|
def _update_follow_status(follow_id: str, status: FollowStatus) -> None:
    """Record the outcome (accepted/rejected) of an outgoing Follow request."""
    _logger.info(f"{follow_id} is {status}")
    status_meta = upsert({MetaKey.FOLLOW_STATUS: status.value})
    update_one_activity(by_remote_id(follow_id), status_meta)
|
|
|
|
|
|
|
|
|
|
|
|
@process_inbox.register
def _accept_process_inbox(activity: ap.Accept, new_meta: _NewMeta) -> None:
    """Handle an incoming Accept by flagging the matching Follow as accepted."""
    _logger.info(f"process_inbox activity={activity!r}")
    # The Accept's object is (the ID of) our original Follow
    follow_id = activity.get_object_id()
    _update_follow_status(follow_id, FollowStatus.ACCEPTED)
|
|
|
|
|
|
|
|
|
|
|
|
@process_inbox.register
def _reject_process_inbox(activity: ap.Reject, new_meta: _NewMeta) -> None:
    """Handle an incoming Reject by flagging the matching Follow as rejected."""
    _logger.info(f"process_inbox activity={activity!r}")
    # The Reject's object is (the ID of) our original Follow
    follow_id = activity.get_object_id()
    _update_follow_status(follow_id, FollowStatus.REJECTED)
|
|
|
|
|
|
|
|
|
2019-08-04 09:34:30 -05:00
|
|
|
@process_inbox.register
def _undo_process_inbox(activity: ap.Undo, new_meta: _NewMeta) -> None:
    """Handle an incoming Undo: flag the target activity and roll back counters."""
    _logger.info(f"process_inbox activity={activity!r}")
    # Fetch the activity that is being undone
    undone = activity.get_object()

    # Set the undo flag on the mentioned activity
    update_one_activity(by_remote_id(undone.id), upsert({MetaKey.UNDO: True}))

    # Handle cached counters
    if undone.has_type(ap.ActivityType.LIKE):
        # Undo of a Like: decrement the like counter if the object is published
        # by the server
        update_one_activity(
            {**by_object_id(undone.get_object_id()), **by_type(ap.ActivityType.CREATE)},
            inc(MetaKey.COUNT_LIKE, -1),
        )
    elif undone.has_type(ap.ActivityType.ANNOUNCE):
        boosted = undone.get_object()
        # Undo of an Announce: decrement the boost counter if the object is
        # published by the server
        update_one_activity(
            {**by_type(ap.ActivityType.CREATE), **by_object_id(boosted.id)},
            inc(MetaKey.COUNT_BOOST, -1),
        )
|