microblog.pub/core/activitypub.py
2019-08-19 23:31:57 +02:00

732 lines
24 KiB
Python

import binascii
import hashlib
import logging
import os
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from urllib.parse import urljoin
from urllib.parse import urlparse
from bson.objectid import ObjectId
from flask import url_for
from little_boxes import activitypub as ap
from little_boxes import strtobool
from little_boxes.activitypub import _to_list
from little_boxes.activitypub import clean_activity
from little_boxes.activitypub import format_datetime
from little_boxes.backend import Backend
from little_boxes.errors import ActivityGoneError
from little_boxes.httpsig import HTTPSigAuth
from config import BASE_URL
from config import DB
from config import DEFAULT_CTX
from config import EXTRA_INBOXES
from config import ID
from config import KEY
from config import ME
from config import USER_AGENT
from core.db import find_one_activity
from core.db import update_many_activities
from core.meta import Box
from core.meta import MetaKey
from core.meta import by_object_id
from core.meta import by_remote_id
from core.meta import by_type
from core.meta import flag
from core.meta import inc
from core.meta import upsert
from core.remote import server
from core.tasks import Tasks
from utils import now
logger = logging.getLogger(__name__)
_NewMeta = Dict[str, Any]
SIG_AUTH = HTTPSigAuth(KEY)
MY_PERSON = ap.Person(**ME)
def _remove_id(doc: ap.ObjectType) -> ap.ObjectType:
"""Helper for removing MongoDB's `_id` field."""
doc = doc.copy()
if "_id" in doc:
del doc["_id"]
return doc
def _answer_key(choice: str) -> str:
h = hashlib.new("sha1")
h.update(choice.encode())
return h.hexdigest()
def _actor_hash(actor: ap.ActivityType) -> str:
"""Used to know when to update the meta actor cache, like an "actor version"."""
h = hashlib.new("sha1")
h.update(actor.id.encode())
h.update((actor.name or "").encode())
h.update((actor.preferredUsername or "").encode())
h.update((actor.summary or "").encode())
h.update((actor.url or "").encode())
key = actor.get_key()
h.update(key.pubkey_pem.encode())
h.update(key.key_id().encode())
if isinstance(actor.icon, dict) and "url" in actor.icon:
h.update(actor.icon["url"].encode())
return h.hexdigest()
def _is_local_reply(create: ap.Create) -> bool:
for dest in _to_list(create.to or []):
if dest.startswith(BASE_URL):
return True
for dest in _to_list(create.cc or []):
if dest.startswith(BASE_URL):
return True
return False
def save(box: Box, activity: ap.BaseActivity) -> None:
"""Custom helper for saving an activity to the DB."""
visibility = ap.get_visibility(activity)
is_public = False
if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]:
is_public = True
object_id = None
try:
object_id = activity.get_object_id()
except Exception: # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object
pass
object_visibility = None
if activity.has_type(
[ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE]
):
object_visibility = ap.get_visibility(activity.get_object()).name
actor_id = activity.get_actor().id
DB.activities.insert_one(
{
"box": box.value,
"activity": activity.to_dict(),
"type": _to_list(activity.type),
"remote_id": activity.id,
"meta": {
"undo": False,
"deleted": False,
"public": is_public,
"server": urlparse(activity.id).netloc,
"visibility": visibility.name,
"actor_id": actor_id,
"object_id": object_id,
"object_visibility": object_visibility,
"poll_answer": False,
},
}
)
def outbox_is_blocked(actor_id: str) -> bool:
return bool(
DB.activities.find_one(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.BLOCK.value,
"activity.object": actor_id,
"meta.undo": False,
}
)
)
def activity_url(item_id: str) -> str:
return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id))
def post_to_inbox(activity: ap.BaseActivity) -> None:
# Check for Block activity
actor = activity.get_actor()
if outbox_is_blocked(actor.id):
logger.info(
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
)
return
# If the message is coming from a Pleroma relay, we process it as a possible reply for a stream activity
if (
actor.has_type(ap.ActivityType.APPLICATION)
and actor.id.endswith("/relay")
and activity.has_type(ap.ActivityType.ANNOUNCE)
and not find_one_activity(
{
**by_object_id(activity.get_object_id()),
**by_type(ap.ActivityType.CREATE),
}
)
and not DB.replies.find_one(by_remote_id(activity.get_object_id()))
):
Tasks.process_reply(activity.get_object_id())
return
# Hubzilla forward activities in a Create, process them as possible replies
if activity.has_type(ap.ActivityType.CREATE) and server(activity.id) != server(
activity.get_object_id()
):
Tasks.process_reply(activity.get_object_id())
return
if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": activity.id}):
# The activity is already in the inbox
logger.info(f"received duplicate activity {activity!r}, dropping it")
return
save(Box.INBOX, activity)
logger.info(f"spawning tasks for {activity!r}")
if not activity.has_type([ap.ActivityType.DELETE, ap.ActivityType.UPDATE]):
Tasks.cache_actor(activity.id)
Tasks.process_new_activity(activity.id)
Tasks.finish_post_to_inbox(activity.id)
def save_reply(activity: ap.BaseActivity, meta: Dict[str, Any] = {}) -> None:
visibility = ap.get_visibility(activity)
is_public = False
if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]:
is_public = True
published = activity.published if activity.published else now()
DB.replies.insert_one(
{
"activity": activity.to_dict(),
"type": _to_list(activity.type),
"remote_id": activity.id,
"meta": {
"undo": False,
"deleted": False,
"public": is_public,
"server": urlparse(activity.id).netloc,
"visibility": visibility.name,
"actor_id": activity.get_actor().id,
MetaKey.PUBLISHED.value: published,
**meta,
},
}
)
def post_to_outbox(activity: ap.BaseActivity) -> str:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
# Assign create a random ID
obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8")
uri = activity_url(obj_id)
activity._data["id"] = uri
if activity.has_type(ap.ActivityType.CREATE):
activity._data["object"]["id"] = urljoin(
BASE_URL, url_for("outbox_activity", item_id=obj_id)
)
activity._data["object"]["url"] = urljoin(
BASE_URL, url_for("note_by_id", note_id=obj_id)
)
activity.reset_object_cache()
save(Box.OUTBOX, activity)
Tasks.cache_actor(activity.id)
Tasks.finish_post_to_outbox(activity.id)
return activity.id
class MicroblogPubBackend(Backend):
"""Implements a Little Boxes backend, backed by MongoDB."""
def base_url(self) -> str:
return BASE_URL
def debug_mode(self) -> bool:
return strtobool(os.getenv("MICROBLOGPUB_DEBUG", "false"))
def user_agent(self) -> str:
"""Setup a custom user agent."""
return USER_AGENT
def extra_inboxes(self) -> List[str]:
return EXTRA_INBOXES
def followers(self) -> List[str]:
q = {
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
return [doc["activity"]["actor"] for doc in DB.activities.find(q)]
def followers_as_recipients(self) -> List[str]:
q = {
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
recipients = []
for doc in DB.activities.find(q):
recipients.append(
doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"]
)
return list(set(recipients))
def following(self) -> List[str]:
q = {
"box": Box.OUTBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
return [doc["activity"]["object"] for doc in DB.activities.find(q)]
def parse_collection(
self, payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None
) -> List[str]:
"""Resolve/fetch a `Collection`/`OrderedCollection`."""
# Resolve internal collections via MongoDB directly
if url == ID + "/followers":
return self.followers()
elif url == ID + "/following":
return self.following()
return super().parse_collection(payload, url)
def _fetch_iri(self, iri: str) -> ap.ObjectType: # noqa: C901
# Shortcut if the instance actor is fetched
if iri == ME["id"]:
return ME
# Internal collecitons handling
# Followers
if iri == MY_PERSON.followers:
followers = []
for data in DB.activities.find(
{
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
):
followers.append(data["meta"]["actor_id"])
return {"type": "Collection", "items": followers}
# Following
if iri == MY_PERSON.following:
following = []
for data in DB.activities.find(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
):
following.append(data["meta"]["object_id"])
return {"type": "Collection", "items": following}
# TODO(tsileo): handle the liked collection too
# Check if the activity is owned by this server
if iri.startswith(BASE_URL):
is_a_note = False
if iri.endswith("/activity"):
iri = iri.replace("/activity", "")
is_a_note = True
data = DB.activities.find_one({"box": Box.OUTBOX.value, "remote_id": iri})
if data and data["meta"]["deleted"]:
raise ActivityGoneError(f"{iri} is gone")
if data and is_a_note:
return data["activity"]["object"]
elif data:
return data["activity"]
else:
# Check if the activity is stored in the inbox
data = DB.activities.find_one({"remote_id": iri})
if data:
if data["meta"]["deleted"]:
raise ActivityGoneError(f"{iri} is gone")
return data["activity"]
# Check if we're looking for an object wrapped in a Create
obj = DB.activities.find_one({"meta.object_id": iri, "type": "Create"})
if obj:
if obj["meta"]["deleted"]:
raise ActivityGoneError(f"{iri} is gone")
return obj["meta"].get("object") or obj["activity"]["object"]
# TODO(tsileo): also check the REPLIES box
# Check if it's cached because it's a follower
# Remove extra info (like the key hash if any)
cleaned_iri = iri.split("#")[0]
actor = DB.activities.find_one(
{"meta.actor_id": cleaned_iri, "meta.actor": {"$exists": True}}
)
# "type" check is here to skip old metadata for "old/buggy" followers
if (
actor
and actor["meta"].get("actor")
and "type" in actor["meta"]["actor"]
):
return actor["meta"]["actor"]
# Check if it's cached because it's a following
actor2 = DB.activities.find_one(
{
"meta.object_id": cleaned_iri,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
)
if (
actor2
and actor2["meta"].get("object")
and "type" in actor2["meta"]["object"]
):
return actor2["meta"]["object"]
reply = DB.replies.find_one(by_remote_id(iri))
if reply:
return reply["activity"]
# Fetch the URL via HTTP
logger.info(f"dereference {iri} via HTTP")
return super().fetch_iri(iri)
def fetch_iri(self, iri: str, **kwargs: Any) -> ap.ObjectType:
if not kwargs.pop("no_cache", False):
# Fetch the activity by checking the local DB first
data = self._fetch_iri(iri)
logger.debug(f"_fetch_iri({iri!r}) == {data!r}")
else:
# Pass the SIG_AUTH to enable "authenticated fetch"
data = super().fetch_iri(iri, auth=SIG_AUTH)
logger.debug(f"fetch_iri({iri!r}) == {data!r}")
return data
def embed_collection(total_items, first_page_id):
"""Helper creating a root OrderedCollection with a link to the first page."""
return {
"type": ap.ActivityType.ORDERED_COLLECTION.value,
"totalItems": total_items,
"first": f"{first_page_id}?page=first",
"id": first_page_id,
}
def simple_build_ordered_collection(col_name, data):
return {
"@context": DEFAULT_CTX,
"id": BASE_URL + "/" + col_name,
"totalItems": len(data),
"type": ap.ActivityType.ORDERED_COLLECTION.value,
"orderedItems": data,
}
def build_ordered_collection(
col, q=None, cursor=None, map_func=None, limit=50, col_name=None, first_page=False
):
"""Helper for building an OrderedCollection from a MongoDB query (with pagination support)."""
col_name = col_name or col.name
if q is None:
q = {}
if cursor:
q["_id"] = {"$lt": ObjectId(cursor)}
data = list(col.find(q, limit=limit).sort("_id", -1))
if not data:
# Returns an empty page if there's a cursor
if cursor:
return {
"@context": ap.COLLECTION_CTX,
"type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
"id": BASE_URL + "/" + col_name + "?cursor=" + cursor,
"partOf": BASE_URL + "/" + col_name,
"totalItems": 0,
"orderedItems": [],
}
return {
"@context": ap.COLLECTION_CTX,
"id": BASE_URL + "/" + col_name,
"totalItems": 0,
"type": ap.ActivityType.ORDERED_COLLECTION.value,
"orderedItems": [],
}
start_cursor = str(data[0]["_id"])
next_page_cursor = str(data[-1]["_id"])
total_items = col.find(q).count()
data = [_remove_id(doc) for doc in data]
if map_func:
data = [map_func(doc) for doc in data]
# No cursor, this is the first page and we return an OrderedCollection
if not cursor:
resp = {
"@context": ap.COLLECTION_CTX,
"id": f"{BASE_URL}/{col_name}",
"totalItems": total_items,
"type": ap.ActivityType.ORDERED_COLLECTION.value,
"first": {
"id": f"{BASE_URL}/{col_name}?cursor={start_cursor}",
"orderedItems": data,
"partOf": f"{BASE_URL}/{col_name}",
"totalItems": total_items,
"type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
},
}
if len(data) == limit:
resp["first"]["next"] = (
BASE_URL + "/" + col_name + "?cursor=" + next_page_cursor
)
if first_page:
return resp["first"]
return resp
# If there's a cursor, then we return an OrderedCollectionPage
resp = {
"@context": ap.COLLECTION_CTX,
"type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
"id": BASE_URL + "/" + col_name + "?cursor=" + start_cursor,
"totalItems": total_items,
"partOf": BASE_URL + "/" + col_name,
"orderedItems": data,
}
if len(data) == limit:
resp["next"] = BASE_URL + "/" + col_name + "?cursor=" + next_page_cursor
if first_page:
return resp["first"]
# XXX(tsileo): implements prev with prev=<first item cursor>?
return resp
def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None:
activity = raw_doc["activity"]
if (
ap._has_type(activity["type"], ap.ActivityType.CREATE)
and "object" in activity
and ap._has_type(activity["object"]["type"], ap.ActivityType.QUESTION)
):
for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")):
choice["replies"] = {
"type": ap.ActivityType.COLLECTION.value,
"totalItems": raw_doc["meta"]
.get("question_answers", {})
.get(_answer_key(choice["name"]), 0),
}
now = datetime.now(timezone.utc)
if format_datetime(now) >= activity["object"]["endTime"]:
activity["object"]["closed"] = activity["object"]["endTime"]
def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]:
if not ap._has_type(raw_doc["activity"]["type"], ap.ActivityType.CREATE.value):
return raw_doc
raw_doc["activity"]["object"]["replies"] = embed_collection(
raw_doc.get("meta", {}).get(MetaKey.COUNT_REPLY.value, 0),
f'{raw_doc["remote_id"]}/replies',
)
raw_doc["activity"]["object"]["likes"] = embed_collection(
raw_doc.get("meta", {}).get(MetaKey.COUNT_LIKE.value, 0),
f'{raw_doc["remote_id"]}/likes',
)
raw_doc["activity"]["object"]["shares"] = embed_collection(
raw_doc.get("meta", {}).get(MetaKey.COUNT_BOOST.value, 0),
f'{raw_doc["remote_id"]}/shares',
)
return raw_doc
def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]:
if "@context" in activity:
del activity["@context"]
return activity
def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, Any]:
raw_doc = add_extra_collection(raw_doc)
activity = clean_activity(raw_doc["activity"])
# Handle Questions
# TODO(tsileo): what about object embedded by ID/URL?
_add_answers_to_question(raw_doc)
if embed:
return remove_context(activity)
return activity
def _cache_actor_icon(actor: ap.BaseActivity) -> None:
if actor.icon:
if isinstance(actor.icon, dict) and "url" in actor.icon:
Tasks.cache_actor_icon(actor.icon["url"], actor.id)
else:
logger.warning(f"failed to parse icon {actor.icon} for {actor!r}")
def update_cached_actor(actor: ap.BaseActivity) -> None:
actor_hash = _actor_hash(actor)
update_many_activities(
{
**flag(MetaKey.ACTOR_ID, actor.id),
**flag(MetaKey.ACTOR_HASH, {"$ne": actor_hash}),
},
upsert(
{MetaKey.ACTOR: actor.to_dict(embed=True), MetaKey.ACTOR_HASH: actor_hash}
),
)
update_many_activities(
{
**flag(MetaKey.OBJECT_ACTOR_ID, actor.id),
**flag(MetaKey.OBJECT_ACTOR_HASH, {"$ne": actor_hash}),
},
upsert(
{
MetaKey.OBJECT_ACTOR: actor.to_dict(embed=True),
MetaKey.OBJECT_ACTOR_HASH: actor_hash,
}
),
)
DB.replies.update_many(
{
**flag(MetaKey.ACTOR_ID, actor.id),
**flag(MetaKey.ACTOR_HASH, {"$ne": actor_hash}),
},
upsert(
{MetaKey.ACTOR: actor.to_dict(embed=True), MetaKey.ACTOR_HASH: actor_hash}
),
)
# TODO(tsileo): Also update following (it's in the object)
# DB.activities.update_many(
# {"meta.object_id": actor.id}, {"$set": {"meta.object": actor.to_dict(embed=True)}}
# )
_cache_actor_icon(actor)
def handle_question_reply(create: ap.Create, question: ap.Question) -> None:
choice = create.get_object().name
# Ensure it's a valid choice
if choice not in [c["name"] for c in question._data.get("oneOf", question.anyOf)]:
logger.info("invalid choice")
return
# Hash the choice/answer (so we can use it as a key)
answer_key = _answer_key(choice)
is_single_choice = bool(question._data.get("oneOf", []))
dup_query = {
"activity.object.actor": create.get_actor().id,
"meta.answer_to": question.id,
**({} if is_single_choice else {"meta.poll_answer_choice": choice}),
}
print(f"dup_q={dup_query}")
# Check for duplicate votes
if DB.activities.find_one(dup_query):
logger.info("duplicate response")
return
# Update the DB
DB.activities.update_one(
{**by_object_id(question.id), **by_type(ap.ActivityType.CREATE)},
{
"$inc": {
"meta.question_replies": 1,
f"meta.question_answers.{answer_key}": 1,
}
},
)
DB.activities.update_one(
by_remote_id(create.id),
{
"$set": {
"meta.answer_to": question.id,
"meta.poll_answer_choice": choice,
"meta.stream": False,
"meta.poll_answer": True,
}
},
)
return None
def handle_replies(create: ap.Create) -> None:
"""Go up to the root reply, store unknown replies in the `threads` DB and set the "meta.thread_root_parent"
key to make it easy to query a whole thread."""
in_reply_to = create.get_object().get_in_reply_to()
if not in_reply_to:
return
reply = ap.fetch_remote_activity(in_reply_to)
if reply.has_type(ap.ActivityType.CREATE):
reply = reply.get_object()
# FIXME(tsileo): can be a 403 too, in this case what to do? not error at least
# Ensure the this is a local reply, of a question, with a direct "to" addressing
if (
reply.id.startswith(BASE_URL)
and reply.has_type(ap.ActivityType.QUESTION.value)
and _is_local_reply(create)
and not create.is_public()
):
return handle_question_reply(create, reply)
elif (
create.id.startswith(BASE_URL)
and reply.has_type(ap.ActivityType.QUESTION.value)
and not create.is_public()
):
# Keep track of our own votes
DB.activities.update_one(
{"activity.object.id": reply.id, "box": "inbox"},
{
"$set": {
f"meta.poll_answers_sent.{_answer_key(create.get_object().name)}": True
}
},
)
return None
# It's a regular reply, try to increment the reply counter
creply = DB.activities.find_one_and_update(
{**by_object_id(in_reply_to), **by_type(ap.ActivityType.CREATE)},
inc(MetaKey.COUNT_REPLY, 1),
)
if not creply:
# Maybe it's the reply of a reply?
DB.replies.find_one_and_update(
by_remote_id(in_reply_to), inc(MetaKey.COUNT_REPLY, 1)
)
# Spawn a task to process it (and determine if it needs to be saved)
Tasks.process_reply(create.get_object().id)