From f902868250a7c5b5e12dc4042c22999b4d660516 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Sun, 4 Aug 2019 20:08:47 +0200 Subject: [PATCH] Big cleanup part 3 (#59) * Remove dead code and re-organize * Switch to new queries helper --- app.py | 165 +----------- blueprints/admin.py | 49 +++- blueprints/api.py | 8 +- blueprints/tasks.py | 4 +- core/activitypub.py | 623 ++++++++++++-------------------------------- core/feed.py | 125 +++++++++ core/inbox.py | 73 +++--- core/indexes.py | 16 +- core/meta.py | 25 ++ core/outbox.py | 58 +++-- core/shared.py | 82 ++---- 11 files changed, 491 insertions(+), 737 deletions(-) create mode 100644 core/feed.py diff --git a/app.py b/app.py index 9981ed3..14604b1 100644 --- a/app.py +++ b/app.py @@ -3,8 +3,6 @@ import logging import os import traceback from datetime import datetime -from typing import Any -from typing import Dict from urllib.parse import urlparse from bson.objectid import ObjectId @@ -25,9 +23,7 @@ from little_boxes.activitypub import get_backend from little_boxes.errors import ActivityGoneError from little_boxes.errors import Error from little_boxes.httpsig import verify_request -from little_boxes.webfinger import get_actor_url from little_boxes.webfinger import get_remote_follow_template -from u2flib_server import u2f import blueprints.admin import blueprints.indieauth @@ -37,13 +33,17 @@ import config from blueprints.api import _api_required from blueprints.tasks import TaskError from config import DB -from config import HEADERS from config import ID from config import ME from config import MEDIA_CACHE from config import VERSION from core import activitypub -from core.activitypub import embed_collection +from core import feed +from core.activitypub import activity_from_doc +from core.activitypub import activity_url +from core.activitypub import post_to_inbox +from core.activitypub import post_to_outbox +from core.activitypub import remove_context from core.db import find_one_activity from core.meta import Box from core.meta import MetaKey @@ -51,19 +51,14 @@ from core.meta import _meta from core.meta import by_remote_id from core.meta import in_outbox from core.meta import is_public -from core.shared import MY_PERSON -from core.shared import _add_answers_to_question from core.shared import _build_thread from core.shared import _get_ip -from core.shared import activity_url -from core.shared import back from core.shared import csrf +from core.shared import is_api_request +from core.shared import jsonify from core.shared import login_required from core.shared import noindex from core.shared import paginated_query -from core.shared import post_to_outbox -from core.tasks import Tasks -from utils import now from utils.key import get_secret_key from utils.template_filters import filters @@ -164,29 +159,6 @@ def set_x_powered_by(response): return response -def jsonify(**data): - if "@context" not in data: - data["@context"] = config.DEFAULT_CTX - return Response( - response=json.dumps(data), - headers={ - "Content-Type": "application/json" - if app.debug - else "application/activity+json" - }, - ) - - -def is_api_request(): - h = request.headers.get("Accept") - if h is None: - return False - h = h.split(",")[0] - if h in HEADERS or h == "application/json": - return True - return False - - @app.errorhandler(ValueError) def handle_value_error(error): logger.error( @@ -271,12 +243,9 @@ def serve_uploads(oid, fname): return resp -####### -# Login - - @app.route("/remote_follow", methods=["GET", "POST"]) def remote_follow(): + """Form to allow 
visitor to perform the remote follow dance.""" if request.method == "GET": return render_template("remote_follow.html") @@ -287,59 +256,8 @@ def remote_follow(): return redirect(get_remote_follow_template(profile).format(uri=ID)) -@app.route("/authorize_follow", methods=["GET", "POST"]) -@login_required -def authorize_follow(): - if request.method == "GET": - return render_template( - "authorize_remote_follow.html", profile=request.args.get("profile") - ) - - actor = get_actor_url(request.form.get("profile")) - if not actor: - abort(500) - - q = { - "box": Box.OUTBOX.value, - "type": ActivityType.FOLLOW.value, - "meta.undo": False, - "activity.object": actor, - } - if DB.activities.count(q) > 0: - return redirect("/following") - - follow = ap.Follow( - actor=MY_PERSON.id, object=actor, to=[actor], cc=[ap.AS_PUBLIC], published=now() - ) - post_to_outbox(follow) - - return redirect("/following") - - -@app.route("/u2f/register", methods=["GET", "POST"]) -@login_required -def u2f_register(): - # TODO(tsileo): ensure no duplicates - if request.method == "GET": - payload = u2f.begin_registration(ID) - session["challenge"] = payload - return render_template("u2f.html", payload=payload) - else: - resp = json.loads(request.form.get("resp")) - device, device_cert = u2f.complete_registration(session["challenge"], resp) - session["challenge"] = None - DB.u2f.insert_one({"device": device, "cert": device_cert}) - session["logged_in"] = False - return redirect("/login") - - ####### # Activity pub routes -@app.route("/drop_cache") -@login_required -def drop_cache(): - DB.actors.drop() - return "Done" @app.route("/") @@ -469,44 +387,6 @@ def note_by_id(note_id): ) -def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]: - if raw_doc["activity"]["type"] != ActivityType.CREATE.value: - return raw_doc - - raw_doc["activity"]["object"]["replies"] = embed_collection( - raw_doc.get("meta", {}).get("count_direct_reply", 0), - f'{raw_doc["remote_id"]}/replies', - ) - - raw_doc["activity"]["object"]["likes"] = embed_collection( - raw_doc.get("meta", {}).get("count_like", 0), f'{raw_doc["remote_id"]}/likes' - ) - - raw_doc["activity"]["object"]["shares"] = embed_collection( - raw_doc.get("meta", {}).get("count_boost", 0), f'{raw_doc["remote_id"]}/shares' - ) - - return raw_doc - - -def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]: - if "@context" in activity: - del activity["@context"] - return activity - - -def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, Any]: - raw_doc = add_extra_collection(raw_doc) - activity = clean_activity(raw_doc["activity"]) - - # Handle Questions - # TODO(tsileo): what about object embedded by ID/URL? 
- _add_answers_to_question(raw_doc) - if embed: - return remove_context(activity) - return activity - - @app.route("/outbox", methods=["GET", "POST"]) def outbox(): if request.method == "GET": @@ -987,7 +867,7 @@ def liked(): @app.route("/feed.json") def json_feed(): return Response( - response=json.dumps(activitypub.json_feed("/feed.json")), + response=json.dumps(feed.json_feed("/feed.json")), headers={"Content-Type": "application/json"}, ) @@ -995,7 +875,7 @@ def json_feed(): @app.route("/feed.atom") def atom_feed(): return Response( - response=activitypub.gen_feed().atom_str(), + response=feed.gen_feed().atom_str(), headers={"Content-Type": "application/atom+xml"}, ) @@ -1003,27 +883,6 @@ def atom_feed(): @app.route("/feed.rss") def rss_feed(): return Response( - response=activitypub.gen_feed().rss_str(), + response=feed.gen_feed().rss_str(), headers={"Content-Type": "application/rss+xml"}, ) - - -def post_to_inbox(activity: ap.BaseActivity) -> None: - # Check for Block activity - actor = activity.get_actor() - if back.outbox_is_blocked(MY_PERSON, actor.id): - app.logger.info( - f"actor {actor!r} is blocked, dropping the received activity {activity!r}" - ) - return - - if back.inbox_check_duplicate(MY_PERSON, activity.id): - # The activity is already in the inbox - app.logger.info(f"received duplicate activity {activity!r}, dropping it") - return - - back.save(Box.INBOX, activity) - Tasks.process_new_activity(activity.id) - - app.logger.info(f"spawning task for {activity!r}") - Tasks.finish_post_to_inbox(activity.id) diff --git a/blueprints/admin.py b/blueprints/admin.py index 94204bf..457ffb3 100644 --- a/blueprints/admin.py +++ b/blueprints/admin.py @@ -15,6 +15,7 @@ from flask import request from flask import session from flask import url_for from little_boxes import activitypub as ap +from little_boxes.webfinger import get_actor_url from passlib.hash import bcrypt from u2flib_server import u2f @@ -23,6 +24,7 @@ from config import DB from config import ID from config import PASS from core.activitypub import Box +from core.activitypub import post_to_outbox from core.shared import MY_PERSON from core.shared import _build_thread from core.shared import _Response @@ -31,7 +33,6 @@ from core.shared import login_required from core.shared import noindex from core.shared import p from core.shared import paginated_query -from core.shared import post_to_outbox from utils import now from utils.lookup import lookup @@ -412,3 +413,49 @@ def admin_bookmarks() -> _Response: return render_template( tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than ) + + +@blueprint.route("/u2f/register", methods=["GET", "POST"]) +@login_required +def u2f_register(): + # TODO(tsileo): ensure no duplicates + if request.method == "GET": + payload = u2f.begin_registration(ID) + session["challenge"] = payload + return render_template("u2f.html", payload=payload) + else: + resp = json.loads(request.form.get("resp")) + device, device_cert = u2f.complete_registration(session["challenge"], resp) + session["challenge"] = None + DB.u2f.insert_one({"device": device, "cert": device_cert}) + session["logged_in"] = False + return redirect("/login") + + +@blueprint.route("/authorize_follow", methods=["GET", "POST"]) +@login_required +def authorize_follow(): + if request.method == "GET": + return render_template( + "authorize_remote_follow.html", profile=request.args.get("profile") + ) + + actor = get_actor_url(request.form.get("profile")) + if not actor: + abort(500) + + q = { + "box": Box.OUTBOX.value, + 
"type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + "activity.object": actor, + } + if DB.activities.count(q) > 0: + return redirect("/following") + + follow = ap.Follow( + actor=MY_PERSON.id, object=actor, to=[actor], cc=[ap.AS_PUBLIC], published=now() + ) + post_to_outbox(follow) + + return redirect("/following") diff --git a/blueprints/api.py b/blueprints/api.py index 4a29aa9..45cc598 100644 --- a/blueprints/api.py +++ b/blueprints/api.py @@ -32,16 +32,16 @@ from config import ID from config import JWT from config import MEDIA_CACHE from config import _drop_db -from core import activitypub +from core import feed +from core.activitypub import activity_url +from core.activitypub import post_to_outbox from core.meta import Box from core.meta import MetaKey from core.meta import _meta from core.shared import MY_PERSON from core.shared import _Response -from core.shared import activity_url from core.shared import csrf from core.shared import login_required -from core.shared import post_to_outbox from core.tasks import Tasks from utils import now @@ -587,7 +587,7 @@ def api_debug() -> _Response: def api_stream() -> _Response: return Response( response=json.dumps( - activitypub.build_inbox_json_feed("/api/stream", request.args.get("cursor")) + feed.build_inbox_json_feed("/api/stream", request.args.get("cursor")) ), headers={"Content-Type": "application/json"}, ) diff --git a/blueprints/tasks.py b/blueprints/tasks.py index 69e1a01..1939d3d 100644 --- a/blueprints/tasks.py +++ b/blueprints/tasks.py @@ -17,17 +17,17 @@ import config from config import DB from core import gc from core.activitypub import Box +from core.activitypub import _add_answers_to_question +from core.activitypub import post_to_outbox from core.inbox import process_inbox from core.meta import MetaKey from core.meta import _meta from core.notifications import set_inbox_flags from core.outbox import process_outbox from core.shared import MY_PERSON -from core.shared import _add_answers_to_question from core.shared import _Response from core.shared import back from core.shared import p -from core.shared import post_to_outbox from core.tasks import Tasks from utils import now from utils import opengraph diff --git a/core/activitypub.py b/core/activitypub.py index 442d461..53dc539 100644 --- a/core/activitypub.py +++ b/core/activitypub.py @@ -1,24 +1,26 @@ +import binascii import hashlib import logging import os from datetime import datetime +from datetime import timezone from typing import Any from typing import Dict from typing import List from typing import Optional +from urllib.parse import urljoin from urllib.parse import urlparse from bson.objectid import ObjectId from cachetools import LRUCache -from feedgen.feed import FeedGenerator -from html2text import html2text +from flask import url_for from little_boxes import activitypub as ap from little_boxes import strtobool from little_boxes.activitypub import _to_list +from little_boxes.activitypub import clean_activity +from little_boxes.activitypub import format_datetime from little_boxes.backend import Backend from little_boxes.errors import ActivityGoneError -from little_boxes.errors import Error -from little_boxes.errors import NotAnActivityError from config import BASE_URL from config import DB @@ -26,7 +28,6 @@ from config import EXTRA_INBOXES from config import ID from config import ME from config import USER_AGENT -from config import USERNAME from core.meta import Box from core.tasks import Tasks @@ -39,26 +40,6 @@ ACTORS_CACHE = LRUCache(maxsize=256) MY_PERSON 
= ap.Person(**ME) -def _actor_to_meta(actor: ap.BaseActivity, with_inbox=False) -> Dict[str, Any]: - meta = { - "id": actor.id, - "url": actor.url, - "icon": actor.icon, - "name": actor.name, - "preferredUsername": actor.preferredUsername, - } - if with_inbox: - meta.update( - { - "inbox": actor.inbox, - "sharedInbox": actor._data.get("endpoints", {}).get("sharedInbox"), - } - ) - logger.debug(f"meta={meta}") - - return meta - - def _remove_id(doc: ap.ObjectType) -> ap.ObjectType: """Helper for removing MongoDB's `_id` field.""" doc = doc.copy() @@ -67,17 +48,6 @@ def _remove_id(doc: ap.ObjectType) -> ap.ObjectType: return doc -def ensure_it_is_me(f): - """Method decorator used to track the events fired during tests.""" - - def wrapper(*args, **kwargs): - if args[1].id != ME["id"]: - raise Error("unexpected actor") - return f(*args, **kwargs) - - return wrapper - - def _answer_key(choice: str) -> str: h = hashlib.new("sha1") h.update(choice.encode()) @@ -96,6 +66,109 @@ def _is_local_reply(create: ap.Create) -> bool: return False +def save(box: Box, activity: ap.BaseActivity) -> None: + """Custom helper for saving an activity to the DB.""" + visibility = ap.get_visibility(activity) + is_public = False + if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]: + is_public = True + + object_id = None + try: + object_id = activity.get_object_id() + except Exception: # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object + pass + + object_visibility = None + if activity.has_type( + [ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE] + ): + object_visibility = ap.get_visibility(activity.get_object()).name + + actor_id = activity.get_actor().id + + DB.activities.insert_one( + { + "box": box.value, + "activity": activity.to_dict(), + "type": _to_list(activity.type), + "remote_id": activity.id, + "meta": { + "undo": False, + "deleted": False, + "public": is_public, + "server": urlparse(activity.id).netloc, + "visibility": visibility.name, + "actor_id": actor_id, + "object_id": object_id, + "object_visibility": object_visibility, + "poll_answer": False, + }, + } + ) + + +def outbox_is_blocked(actor_id: str) -> bool: + return bool( + DB.activities.find_one( + { + "box": Box.OUTBOX.value, + "type": ap.ActivityType.BLOCK.value, + "activity.object": actor_id, + "meta.undo": False, + } + ) + ) + + +def activity_url(item_id: str) -> str: + return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id)) + + +def post_to_inbox(activity: ap.BaseActivity) -> None: + # Check for Block activity + actor = activity.get_actor() + if outbox_is_blocked(actor.id): + logger.info( + f"actor {actor!r} is blocked, dropping the received activity {activity!r}" + ) + return + + if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": activity.id}): + # The activity is already in the inbox + logger.info(f"received duplicate activity {activity!r}, dropping it") + return + + save(Box.INBOX, activity) + Tasks.process_new_activity(activity.id) + + logger.info(f"spawning task for {activity!r}") + Tasks.finish_post_to_inbox(activity.id) + + +def post_to_outbox(activity: ap.BaseActivity) -> str: + if activity.has_type(ap.CREATE_TYPES): + activity = activity.build_create() + + # Assign create a random ID + obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8") + uri = activity_url(obj_id) + activity._data["id"] = uri + if activity.has_type(ap.ActivityType.CREATE): + activity._data["object"]["id"] = urljoin( + BASE_URL, url_for("outbox_activity", item_id=obj_id) + ) + 
activity._data["object"]["url"] = urljoin( + BASE_URL, url_for("note_by_id", note_id=obj_id) + ) + activity.reset_object_cache() + + save(Box.OUTBOX, activity) + Tasks.cache_actor(activity.id) + Tasks.finish_post_to_outbox(activity.id) + return activity.id + + class MicroblogPubBackend(Backend): """Implements a Little Boxes backend, backed by MongoDB.""" @@ -112,47 +185,6 @@ class MicroblogPubBackend(Backend): def extra_inboxes(self) -> List[str]: return EXTRA_INBOXES - def save(self, box: Box, activity: ap.BaseActivity) -> None: - """Custom helper for saving an activity to the DB.""" - visibility = ap.get_visibility(activity) - is_public = False - if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]: - is_public = True - - object_id = None - try: - object_id = activity.get_object_id() - except Exception: # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object - pass - - object_visibility = None - if activity.has_type( - [ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE] - ): - object_visibility = ap.get_visibility(activity.get_object()).name - - actor_id = activity.get_actor().id - - DB.activities.insert_one( - { - "box": box.value, - "activity": activity.to_dict(), - "type": _to_list(activity.type), - "remote_id": activity.id, - "meta": { - "undo": False, - "deleted": False, - "public": is_public, - "server": urlparse(activity.id).netloc, - "visibility": visibility.name, - "actor_id": actor_id, - "object_id": object_id, - "object_visibility": object_visibility, - "poll_answer": False, - }, - } - ) - def followers(self) -> List[str]: q = { "box": Box.INBOX.value, @@ -195,19 +227,6 @@ class MicroblogPubBackend(Backend): return super().parse_collection(payload, url) - @ensure_it_is_me - def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool: - return bool( - DB.activities.find_one( - { - "box": Box.OUTBOX.value, - "type": ap.ActivityType.BLOCK.value, - "activity.object": actor_id, - "meta.undo": False, - } - ) - ) - def _fetch_iri(self, iri: str) -> ap.ObjectType: # noqa: C901 # Shortcut if the instance actor is fetched if iri == ME["id"]: @@ -317,259 +336,9 @@ class MicroblogPubBackend(Backend): return data - @ensure_it_is_me - def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool: - return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri})) - def set_post_to_remote_inbox(self, cb): self.post_to_remote_inbox_cb = cb - @ensure_it_is_me - def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None: - DB.activities.update_one( - {"remote_id": follow.id}, {"$set": {"meta.undo": True}} - ) - - @ensure_it_is_me - def undo_new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None: - DB.activities.update_one( - {"remote_id": follow.id}, {"$set": {"meta.undo": True}} - ) - - @ensure_it_is_me - def inbox_like(self, as_actor: ap.Person, like: ap.Like) -> None: - obj = like.get_object() - # Update the meta counter if the object is published by the server - DB.activities.update_one( - {"box": Box.OUTBOX.value, "activity.object.id": obj.id}, - {"$inc": {"meta.count_like": 1}}, - ) - - @ensure_it_is_me - def inbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None: - obj = like.get_object() - # Update the meta counter if the object is published by the server - DB.activities.update_one( - {"box": Box.OUTBOX.value, "activity.object.id": obj.id}, - {"$inc": {"meta.count_like": -1}}, - ) - DB.activities.update_one({"remote_id": like.id}, {"$set": {"meta.undo": 
True}}) - - @ensure_it_is_me - def outbox_like(self, as_actor: ap.Person, like: ap.Like) -> None: - obj = like.get_object() - if obj.has_type(ap.ActivityType.QUESTION): - Tasks.fetch_remote_question(obj) - - DB.activities.update_one( - {"activity.object.id": obj.id}, - {"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}}, - ) - - @ensure_it_is_me - def outbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None: - obj = like.get_object() - DB.activities.update_one( - {"activity.object.id": obj.id}, - {"$inc": {"meta.count_like": -1}, "$set": {"meta.liked": False}}, - ) - DB.activities.update_one({"remote_id": like.id}, {"$set": {"meta.undo": True}}) - - @ensure_it_is_me - def inbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: - # TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else - # or remove it? - try: - obj = announce.get_object() - except NotAnActivityError: - logger.exception( - f'received an Annouce referencing an OStatus notice ({announce._data["object"]}), dropping the message' - ) - return - - if obj.has_type(ap.ActivityType.QUESTION): - Tasks.fetch_remote_question(obj) - - DB.activities.update_one( - {"remote_id": announce.id}, - { - "$set": { - "meta.object": obj.to_dict(embed=True), - "meta.object_actor": _actor_to_meta(obj.get_actor()), - } - }, - ) - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": 1}} - ) - - @ensure_it_is_me - def inbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: - obj = announce.get_object() - # Update the meta counter if the object is published by the server - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": -1}} - ) - DB.activities.update_one( - {"remote_id": announce.id}, {"$set": {"meta.undo": True}} - ) - - @ensure_it_is_me - def outbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: - obj = announce.get_object() - if obj.has_type(ap.ActivityType.QUESTION): - Tasks.fetch_remote_question(obj) - - DB.activities.update_one( - {"remote_id": announce.id}, - { - "$set": { - "meta.object": obj.to_dict(embed=True), - "meta.object_actor": _actor_to_meta(obj.get_actor()), - } - }, - ) - - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}} - ) - - @ensure_it_is_me - def outbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: - obj = announce.get_object() - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$set": {"meta.boosted": False}} - ) - DB.activities.update_one( - {"remote_id": announce.id}, {"$set": {"meta.undo": True}} - ) - - @ensure_it_is_me - def inbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None: - obj_id = delete.get_object_id() - logger.debug("delete object={obj_id}") - try: - obj = ap.fetch_remote_activity(obj_id) - logger.info(f"inbox_delete handle_replies obj={obj!r}") - in_reply_to = obj.get_in_reply_to() if obj.inReplyTo else None - if obj.has_type(ap.CREATE_TYPES): - in_reply_to = ap._get_id( - DB.activities.find_one( - {"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value} - )["activity"]["object"].get("inReplyTo") - ) - if in_reply_to: - self._handle_replies_delete(as_actor, in_reply_to) - except Exception: - logger.exception(f"failed to handle delete replies for {obj_id}") - - DB.activities.update_one( - {"meta.object_id": obj_id, "type": "Create"}, - {"$set": {"meta.deleted": True}}, - ) - - # Foce undo 
other related activities - DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}}) - - @ensure_it_is_me - def outbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None: - DB.activities.update( - {"meta.object_id": delete.get_object_id()}, - {"$set": {"meta.deleted": True, "meta.undo": True}}, - ) - obj = delete.get_object() - if delete.get_object().ACTIVITY_TYPE != ap.ActivityType.NOTE: - obj = ap.parse_activity( - DB.activities.find_one( - { - "activity.object.id": delete.get_object().id, - "type": ap.ActivityType.CREATE.value, - } - )["activity"] - ).get_object() - - self._handle_replies_delete(as_actor, obj.get_in_reply_to()) - - @ensure_it_is_me - def inbox_update(self, as_actor: ap.Person, update: ap.Update) -> None: - obj = update.get_object() - if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE: - DB.activities.update_one( - {"activity.object.id": obj.id}, - {"$set": {"activity.object": obj.to_dict()}}, - ) - elif obj.has_type(ap.ActivityType.QUESTION): - choices = obj._data.get("oneOf", obj.anyOf) - total_replies = 0 - _set = {} - for choice in choices: - answer_key = _answer_key(choice["name"]) - cnt = choice["replies"]["totalItems"] - total_replies += cnt - _set[f"meta.question_answers.{answer_key}"] = cnt - - _set["meta.question_replies"] = total_replies - - DB.activities.update_one( - {"box": Box.INBOX.value, "activity.object.id": obj.id}, {"$set": _set} - ) - # Also update the cached copies of the question (like Announce and Like) - DB.activities.update_many( - {"meta.object.id": obj.id}, {"$set": {"meta.object": obj.to_dict()}} - ) - - # FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor - - @ensure_it_is_me - def outbox_update(self, as_actor: ap.Person, _update: ap.Update) -> None: - obj = _update._data["object"] - - update_prefix = "activity.object." - update: Dict[str, Any] = {"$set": dict(), "$unset": dict()} - update["$set"][f"{update_prefix}updated"] = ( - datetime.utcnow().replace(microsecond=0).isoformat() + "Z" - ) - for k, v in obj.items(): - if k in ["id", "type"]: - continue - if v is None: - update["$unset"][f"{update_prefix}{k}"] = "" - else: - update["$set"][f"{update_prefix}{k}"] = v - - if len(update["$unset"]) == 0: - del update["$unset"] - - print(f"updating note from outbox {obj!r} {update}") - logger.info(f"updating note from outbox {obj!r} {update}") - DB.activities.update_one({"activity.object.id": obj["id"]}, update) - # FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients - # (create a new Update with the result of the update, and send it without saving it?) 
- - @ensure_it_is_me - def outbox_create(self, as_actor: ap.Person, create: ap.Create) -> None: - obj = create.get_object() - - # Flag the activity as a poll answer if needed - print(f"POLL ANSWER ChECK {obj.get_in_reply_to()} {obj.name} {obj.content}") - if obj.get_in_reply_to() and obj.name and not obj.content: - DB.activities.update_one( - {"remote_id": create.id}, {"$set": {"meta.poll_answer": True}} - ) - - self._handle_replies(as_actor, create) - - @ensure_it_is_me - def inbox_create(self, as_actor: ap.Person, create: ap.Create) -> None: - # If it's a `Quesiion`, trigger an async task for updating it later (by fetching the remote and updating the - # local copy) - question = create.get_object() - if question.has_type(ap.ActivityType.QUESTION): - Tasks.fetch_remote_question(question) - - self._handle_replies(as_actor, create) - - @ensure_it_is_me def _handle_replies_delete( self, as_actor: ap.Person, in_reply_to: Optional[str] ) -> None: @@ -627,7 +396,6 @@ class MicroblogPubBackend(Backend): return None - @ensure_it_is_me def _handle_replies(self, as_actor: ap.Person, create: ap.Create) -> None: """Go up to the root reply, store unknown replies in the `threads` DB and set the "meta.thread_root_parent" key to make it easy to query a whole thread.""" @@ -666,7 +434,7 @@ class MicroblogPubBackend(Backend): ) if not creply: # It means the activity is not in the inbox, and not in the outbox, we want to save it - self.save(Box.REPLIES, reply) + save(Box.REPLIES, reply) new_threads.append(reply.id) # TODO(tsileo): parses the replies collection and import the replies? @@ -678,7 +446,7 @@ class MicroblogPubBackend(Backend): reply = ap.fetch_remote_activity(root_reply) q = {"activity.object.id": root_reply} if not DB.activities.count(q): - self.save(Box.REPLIES, reply) + save(Box.REPLIES, reply) new_threads.append(reply.id) DB.activities.update_one( @@ -690,118 +458,6 @@ class MicroblogPubBackend(Backend): ) -def gen_feed(): - fg = FeedGenerator() - fg.id(f"{ID}") - fg.title(f"{USERNAME} notes") - fg.author({"name": USERNAME, "email": "t@a4.io"}) - fg.link(href=ID, rel="alternate") - fg.description(f"{USERNAME} notes") - fg.logo(ME.get("icon", {}).get("url")) - fg.language("en") - for item in DB.activities.find( - { - "box": Box.OUTBOX.value, - "type": "Create", - "meta.deleted": False, - "meta.public": True, - }, - limit=10, - ).sort("_id", -1): - fe = fg.add_entry() - fe.id(item["activity"]["object"].get("url")) - fe.link(href=item["activity"]["object"].get("url")) - fe.title(item["activity"]["object"]["content"]) - fe.description(item["activity"]["object"]["content"]) - return fg - - -def json_feed(path: str) -> Dict[str, Any]: - """JSON Feed (https://jsonfeed.org/) document.""" - data = [] - for item in DB.activities.find( - { - "box": Box.OUTBOX.value, - "type": "Create", - "meta.deleted": False, - "meta.public": True, - }, - limit=10, - ).sort("_id", -1): - data.append( - { - "id": item["activity"]["id"], - "url": item["activity"]["object"].get("url"), - "content_html": item["activity"]["object"]["content"], - "content_text": html2text(item["activity"]["object"]["content"]), - "date_published": item["activity"]["object"].get("published"), - } - ) - return { - "version": "https://jsonfeed.org/version/1", - "user_comment": ( - "This is a microblog feed. 
You can add this to your feed reader using the following URL: " - + ID - + path - ), - "title": USERNAME, - "home_page_url": ID, - "feed_url": ID + path, - "author": { - "name": USERNAME, - "url": ID, - "avatar": ME.get("icon", {}).get("url"), - }, - "items": data, - } - - -def build_inbox_json_feed( - path: str, request_cursor: Optional[str] = None -) -> Dict[str, Any]: - """Build a JSON feed from the inbox activities.""" - data = [] - cursor = None - - q: Dict[str, Any] = { - "type": "Create", - "meta.deleted": False, - "box": Box.INBOX.value, - } - if request_cursor: - q["_id"] = {"$lt": request_cursor} - - for item in DB.activities.find(q, limit=50).sort("_id", -1): - actor = ap.get_backend().fetch_iri(item["activity"]["actor"]) - data.append( - { - "id": item["activity"]["id"], - "url": item["activity"]["object"].get("url"), - "content_html": item["activity"]["object"]["content"], - "content_text": html2text(item["activity"]["object"]["content"]), - "date_published": item["activity"]["object"].get("published"), - "author": { - "name": actor.get("name", actor.get("preferredUsername")), - "url": actor.get("url"), - "avatar": actor.get("icon", {}).get("url"), - }, - } - ) - cursor = str(item["_id"]) - - resp = { - "version": "https://jsonfeed.org/version/1", - "title": f"{USERNAME}'s stream", - "home_page_url": ID, - "feed_url": ID + path, - "items": data, - } - if cursor and len(data) == 50: - resp["next_url"] = ID + path + "?cursor=" + cursor - - return resp - - def embed_collection(total_items, first_page_id): """Helper creating a root OrderedCollection with a link to the first page.""" return { @@ -905,3 +561,60 @@ def build_ordered_collection( # XXX(tsileo): implements prev with prev=? return resp + + +def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None: + activity = raw_doc["activity"] + if ( + ap._has_type(activity["type"], ap.ActivityType.CREATE) + and "object" in activity + and ap._has_type(activity["object"]["type"], ap.ActivityType.QUESTION) + ): + for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")): + choice["replies"] = { + "type": ap.ActivityType.COLLECTION.value, + "totalItems": raw_doc["meta"] + .get("question_answers", {}) + .get(_answer_key(choice["name"]), 0), + } + now = datetime.now(timezone.utc) + if format_datetime(now) >= activity["object"]["endTime"]: + activity["object"]["closed"] = activity["object"]["endTime"] + + +def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]: + if raw_doc["activity"]["type"] != ap.ActivityType.CREATE.value: + return raw_doc + + raw_doc["activity"]["object"]["replies"] = embed_collection( + raw_doc.get("meta", {}).get("count_direct_reply", 0), + f'{raw_doc["remote_id"]}/replies', + ) + + raw_doc["activity"]["object"]["likes"] = embed_collection( + raw_doc.get("meta", {}).get("count_like", 0), f'{raw_doc["remote_id"]}/likes' + ) + + raw_doc["activity"]["object"]["shares"] = embed_collection( + raw_doc.get("meta", {}).get("count_boost", 0), f'{raw_doc["remote_id"]}/shares' + ) + + return raw_doc + + +def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]: + if "@context" in activity: + del activity["@context"] + return activity + + +def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, Any]: + raw_doc = add_extra_collection(raw_doc) + activity = clean_activity(raw_doc["activity"]) + + # Handle Questions + # TODO(tsileo): what about object embedded by ID/URL? 
+ _add_answers_to_question(raw_doc) + if embed: + return remove_context(activity) + return activity diff --git a/core/feed.py b/core/feed.py new file mode 100644 index 0000000..7082821 --- /dev/null +++ b/core/feed.py @@ -0,0 +1,125 @@ +from typing import Any +from typing import Dict +from typing import Optional + +from feedgen.feed import FeedGenerator +from html2text import html2text +from little_boxes import activitypub as ap + +from config import ID +from config import ME +from config import USERNAME +from core.db import DB +from core.meta import Box + + +def gen_feed(): + fg = FeedGenerator() + fg.id(f"{ID}") + fg.title(f"{USERNAME} notes") + fg.author({"name": USERNAME, "email": "t@a4.io"}) + fg.link(href=ID, rel="alternate") + fg.description(f"{USERNAME} notes") + fg.logo(ME.get("icon", {}).get("url")) + fg.language("en") + for item in DB.activities.find( + { + "box": Box.OUTBOX.value, + "type": "Create", + "meta.deleted": False, + "meta.public": True, + }, + limit=10, + ).sort("_id", -1): + fe = fg.add_entry() + fe.id(item["activity"]["object"].get("url")) + fe.link(href=item["activity"]["object"].get("url")) + fe.title(item["activity"]["object"]["content"]) + fe.description(item["activity"]["object"]["content"]) + return fg + + +def json_feed(path: str) -> Dict[str, Any]: + """JSON Feed (https://jsonfeed.org/) document.""" + data = [] + for item in DB.activities.find( + { + "box": Box.OUTBOX.value, + "type": "Create", + "meta.deleted": False, + "meta.public": True, + }, + limit=10, + ).sort("_id", -1): + data.append( + { + "id": item["activity"]["id"], + "url": item["activity"]["object"].get("url"), + "content_html": item["activity"]["object"]["content"], + "content_text": html2text(item["activity"]["object"]["content"]), + "date_published": item["activity"]["object"].get("published"), + } + ) + return { + "version": "https://jsonfeed.org/version/1", + "user_comment": ( + "This is a microblog feed. 
You can add this to your feed reader using the following URL: " + + ID + + path + ), + "title": USERNAME, + "home_page_url": ID, + "feed_url": ID + path, + "author": { + "name": USERNAME, + "url": ID, + "avatar": ME.get("icon", {}).get("url"), + }, + "items": data, + } + + +def build_inbox_json_feed( + path: str, request_cursor: Optional[str] = None +) -> Dict[str, Any]: + """Build a JSON feed from the inbox activities.""" + data = [] + cursor = None + + q: Dict[str, Any] = { + "type": "Create", + "meta.deleted": False, + "box": Box.INBOX.value, + } + if request_cursor: + q["_id"] = {"$lt": request_cursor} + + for item in DB.activities.find(q, limit=50).sort("_id", -1): + actor = ap.get_backend().fetch_iri(item["activity"]["actor"]) + data.append( + { + "id": item["activity"]["id"], + "url": item["activity"]["object"].get("url"), + "content_html": item["activity"]["object"]["content"], + "content_text": html2text(item["activity"]["object"]["content"]), + "date_published": item["activity"]["object"].get("published"), + "author": { + "name": actor.get("name", actor.get("preferredUsername")), + "url": actor.get("url"), + "avatar": actor.get("icon", {}).get("url"), + }, + } + ) + cursor = str(item["_id"]) + + resp = { + "version": "https://jsonfeed.org/version/1", + "title": f"{USERNAME}'s stream", + "home_page_url": ID, + "feed_url": ID + path, + "items": data, + } + if cursor and len(data) == 50: + resp["next_url"] = ID + path + "?cursor=" + cursor + + return resp diff --git a/core/inbox.py b/core/inbox.py index 376f694..2b17a2a 100644 --- a/core/inbox.py +++ b/core/inbox.py @@ -8,11 +8,18 @@ from little_boxes.errors import NotAnActivityError import config from core.activitypub import _answer_key +from core.activitypub import post_to_outbox from core.db import DB -from core.meta import Box +from core.db import update_one_activity +from core.meta import MetaKey +from core.meta import by_object_id +from core.meta import by_remote_id +from core.meta import by_type +from core.meta import in_inbox +from core.meta import inc +from core.meta import upsert from core.shared import MY_PERSON from core.shared import back -from core.shared import post_to_outbox from core.tasks import Tasks from utils import now @@ -47,12 +54,13 @@ def _delete_process_inbox(delete: ap.Delete, new_meta: _NewMeta) -> None: except Exception: _logger.exception(f"failed to handle delete replies for {obj_id}") - DB.activities.update_one( - {"meta.object_id": obj_id, "type": "Create"}, {"$set": {"meta.deleted": True}} + update_one_activity( + {**by_object_id(obj_id), **by_type(ap.ActivityType.CREATE)}, + upsert({MetaKey.DELETED: True}), ) # Foce undo other related activities - DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}}) + DB.activities.update(by_object_id(obj_id), upsert({MetaKey.UNDO: True})) @process_inbox.register @@ -60,7 +68,7 @@ def _update_process_inbox(update: ap.Update, new_meta: _NewMeta) -> None: _logger.info(f"process_inbox activity={update!r}") obj = update.get_object() if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE: - DB.activities.update_one( + update_one_activity( {"activity.object.id": obj.id}, {"$set": {"activity.object": obj.to_dict()}} ) elif obj.has_type(ap.ActivityType.QUESTION): @@ -75,12 +83,10 @@ def _update_process_inbox(update: ap.Update, new_meta: _NewMeta) -> None: _set["meta.question_replies"] = total_replies - DB.activities.update_one( - {"box": Box.INBOX.value, "activity.object.id": obj.id}, {"$set": _set} - ) + update_one_activity({**in_inbox(), 
**by_object_id(obj.id)}, {"$set": _set}) # Also update the cached copies of the question (like Announce and Like) DB.activities.update_many( - {"meta.object.id": obj.id}, {"$set": {"meta.object": obj.to_dict()}} + by_object_id(obj.id), upsert({MetaKey.OBJECT: obj.to_dict()}) ) # FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor @@ -114,17 +120,18 @@ def _announce_process_inbox(announce: ap.Announce, new_meta: _NewMeta) -> None: if obj.has_type(ap.ActivityType.QUESTION): Tasks.fetch_remote_question(obj) - DB.activities.update_one( - {"remote_id": announce.id}, - { - "$set": { - "meta.object": obj.to_dict(embed=True), - "meta.object_actor": obj.get_actor().to_dict(embed=True), + update_one_activity( + by_remote_id(announce.id), + upsert( + { + MetaKey.OBJECT: obj.to_dict(embed=True), + MetaKey.OBJECT_ACTOR: obj.get_actor().to_dict(embed=True), } - }, + ), ) - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": 1}} + update_one_activity( + {**by_type(ap.ActivityType.CREATE), **by_object_id(obj.id)}, + inc(MetaKey.COUNT_BOOST, 1), ) @@ -133,9 +140,9 @@ def _like_process_inbox(like: ap.Like, new_meta: _NewMeta) -> None: _logger.info(f"process_inbox activity={like!r}") obj = like.get_object() # Update the meta counter if the object is published by the server - DB.activities.update_one( - {"box": Box.OUTBOX.value, "activity.object.id": obj.id}, - {"$inc": {"meta.count_like": 1}}, + update_one_activity( + {**by_type(ap.ActivityType.CREATE), **by_object_id(obj.id)}, + inc(MetaKey.COUNT_LIKE, 1), ) @@ -161,21 +168,23 @@ def _follow_process_inbox(activity: ap.Follow, new_meta: _NewMeta) -> None: @process_inbox.register def _undo_process_inbox(activity: ap.Undo, new_meta: _NewMeta) -> None: _logger.info(f"process_inbox activity={activity!r}") + # Fetch the object that's been undo'ed obj = activity.get_object() - DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}}) + + # Set the undo flag on the mentionned activity + update_one_activity(by_remote_id(obj.id), upsert({MetaKey.UNDO: True})) + + # Handle cached counters if obj.has_type(ap.ActivityType.LIKE): # Update the meta counter if the object is published by the server - DB.activities.update_one( - { - "box": Box.OUTBOX.value, - "meta.object_id": obj.get_object_id(), - "type": ap.ActivityType.CREATE.value, - }, - {"$inc": {"meta.count_like": -1}}, + update_one_activity( + {**by_object_id(obj.get_object_id()), **by_type(ap.ActivityType.CREATE)}, + inc(MetaKey.COUNT_LIKE, -1), ) elif obj.has_type(ap.ActivityType.ANNOUNCE): announced = obj.get_object() # Update the meta counter if the object is published by the server - DB.activities.update_one( - {"activity.object.id": announced.id}, {"$inc": {"meta.count_boost": -1}} + update_one_activity( + {**by_type(ap.ActivityType.CREATE), **by_object_id(announced.id)}, + inc(MetaKey.COUNT_BOOST, -1), ) diff --git a/core/indexes.py b/core/indexes.py index f294cac..9f5a7db 100644 --- a/core/indexes.py +++ b/core/indexes.py @@ -9,13 +9,16 @@ def create_indexes(): if "trash" not in DB.collection_names(): DB.create_collection("trash", capped=True, size=50 << 20) # 50 MB - DB.command("compact", "activities") + if "activities" in DB.collection_names(): + DB.command("compact", "activities") + DB.activities.create_index([(_meta(MetaKey.NOTIFICATION), pymongo.ASCENDING)]) DB.activities.create_index( [(_meta(MetaKey.NOTIFICATION_UNREAD), pymongo.ASCENDING)] ) DB.activities.create_index([("remote_id", pymongo.ASCENDING)]) - 
DB.activities.create_index([("activity.object.id", pymongo.ASCENDING)]) + DB.activities.create_index([("meta.actor_id", pymongo.ASCENDING)]) + DB.activities.create_index([("meta.object_id", pymongo.ASCENDING)]) DB.activities.create_index([("meta.thread_root_parent", pymongo.ASCENDING)]) DB.activities.create_index( [ @@ -26,14 +29,9 @@ def create_indexes(): DB.activities.create_index( [("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING)] ) - DB.cache2.create_index( - [ - ("path", pymongo.ASCENDING), - ("type", pymongo.ASCENDING), - ("arg", pymongo.ASCENDING), - ] + DB.activities.create_index( + [("meta.object_id", pymongo.ASCENDING), ("type", pymongo.ASCENDING)] ) - DB.cache2.create_index("date", expireAfterSeconds=3600 * 12) # Index for the block query DB.activities.create_index( diff --git a/core/meta.py b/core/meta.py index 4fc0922..95d7840 100644 --- a/core/meta.py +++ b/core/meta.py @@ -27,9 +27,17 @@ class MetaKey(Enum): PUBLISHED = "published" GC_KEEP = "gc_keep" OBJECT = "object" + OBJECT_ID = "object_id" OBJECT_ACTOR = "object_actor" PUBLIC = "public" + DELETED = "deleted" + BOOSTED = "boosted" + LIKED = "liked" + + COUNT_LIKE = "count_like" + COUNT_BOOST = "count_boost" + def _meta(mk: MetaKey) -> str: return f"meta.{mk.value}" @@ -59,5 +67,22 @@ def by_actor(actor: ap.BaseActivity) -> _SubQuery: return {_meta(MetaKey.ACTOR_ID): actor.id} +def by_object_id(object_id: str) -> _SubQuery: + return {_meta(MetaKey.OBJECT_ID): object_id} + + def is_public() -> _SubQuery: return {_meta(MetaKey.PUBLIC): True} + + +def inc(mk: MetaKey, val: int) -> _SubQuery: + return {"$inc": {_meta(mk): val}} + + +def upsert(data: Dict[MetaKey, Any]) -> _SubQuery: + sq: Dict[str, Any] = {} + + for mk, val in data.items(): + sq[_meta(mk)] = val + + return {"$set": sq} diff --git a/core/outbox.py b/core/outbox.py index 8b4af72..f29bc37 100644 --- a/core/outbox.py +++ b/core/outbox.py @@ -6,9 +6,15 @@ from typing import Dict from little_boxes import activitypub as ap -from core.db import DB from core.db import find_one_activity from core.db import update_many_activities +from core.db import update_one_activity +from core.meta import MetaKey +from core.meta import by_object_id +from core.meta import by_remote_id +from core.meta import by_type +from core.meta import inc +from core.meta import upsert from core.shared import MY_PERSON from core.shared import back from core.tasks import Tasks @@ -31,13 +37,13 @@ def _delete_process_outbox(delete: ap.Delete, new_meta: _NewMeta) -> None: # Flag everything referencing the deleted object as deleted (except the Delete activity itself) update_many_activities( - {"meta.object_id": obj_id, "remote_id": {"$ne": delete.id}}, - {"$set": {"meta.deleted": True, "meta.undo": True}}, + {**by_object_id(obj_id), "remote_id": {"$ne": delete.id}}, + upsert({MetaKey.DELETED: True, MetaKey.UNDO: True}), ) # If the deleted activity was in DB, decrease some threads-related counter data = find_one_activity( - {"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value} + {**by_object_id(obj_id), **by_type(ap.ActivityType.CREATE)} ) _logger.info(f"found local copy of deleted activity: {data}") if data: @@ -45,8 +51,8 @@ def _delete_process_outbox(delete: ap.Delete, new_meta: _NewMeta) -> None: _logger.info(f"obj={obj!r}") in_reply_to = obj.get_in_reply_to() if in_reply_to: - DB.activities.update_one( - {"activity.object.id": in_reply_to}, + update_one_activity( + {**by_type(ap.ActivityType.CREATE), **by_object_id(in_reply_to)}, {"$inc": 
{"meta.count_reply": -1, "meta.count_direct_reply": -1}}, ) @@ -74,7 +80,7 @@ def _update_process_outbox(update: ap.Update, new_meta: _NewMeta) -> None: del to_update["$unset"] _logger.info(f"updating note from outbox {obj!r} {to_update}") - DB.activities.update_one({"activity.object.id": obj["id"]}, to_update) + update_one_activity({"activity.object.id": obj["id"]}, to_update) # FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients # (create a new Update with the result of the update, and send it without saving it?) @@ -93,18 +99,19 @@ def _announce_process_outbox(announce: ap.Announce, new_meta: _NewMeta) -> None: if obj.has_type(ap.ActivityType.QUESTION): Tasks.fetch_remote_question(obj) - DB.activities.update_one( - {"remote_id": announce.id}, - { - "$set": { - "meta.object": obj.to_dict(embed=True), - "meta.object_actor": obj.get_actor().to_dict(embed=True), + update_one_activity( + by_remote_id(announce.id), + upsert( + { + MetaKey.OBJECT: obj.to_dict(embed=True), + MetaKey.OBJECT_ACTOR: obj.get_actor().to_dict(embed=True), } - }, + ), ) - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}} + update_one_activity( + {**by_object_id(obj.id), **by_type(ap.ActivityType.CREATE)}, + upsert({MetaKey.BOOSTED: announce.id}), ) @@ -116,9 +123,9 @@ def _like_process_outbox(like: ap.Like, new_meta: _NewMeta) -> None: if obj.has_type(ap.ActivityType.QUESTION): Tasks.fetch_remote_question(obj) - DB.activities.update_one( - {"activity.object.id": obj.id}, - {"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}}, + update_one_activity( + {**by_object_id(obj.id), **by_type(ap.ActivityType.CREATE)}, + {**inc(MetaKey.COUNT_LIKE, 1), **upsert({MetaKey.LIKED: like.id})}, ) @@ -126,20 +133,21 @@ def _like_process_outbox(like: ap.Like, new_meta: _NewMeta) -> None: def _undo_process_outbox(undo: ap.Undo, new_meta: _NewMeta) -> None: _logger.info(f"process_outbox activity={undo!r}") obj = undo.get_object() - DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}}) + update_one_activity({"remote_id": obj.id}, {"$set": {"meta.undo": True}}) # Undo Like if obj.has_type(ap.ActivityType.LIKE): liked = obj.get_object_id() - DB.activities.update_one( - {"activity.object.id": liked}, - {"$inc": {"meta.count_like": -1}, "$set": {"meta.liked": False}}, + update_one_activity( + {**by_object_id(liked), **by_type(ap.ActivityType.CREATE)}, + {**inc(MetaKey.COUNT_LIKE, -1), **upsert({MetaKey.LIKED: False})}, ) elif obj.has_type(ap.ActivityType.ANNOUNCE): announced = obj.get_object_id() - DB.activities.update_one( - {"activity.object.id": announced}, {"$set": {"meta.boosted": False}} + update_one_activity( + {**by_object_id(announced), **by_type(ap.ActivityType.CREATE)}, + upsert({MetaKey.BOOSTED: False}), ) # Undo Follow (undo new following) diff --git a/core/shared.py b/core/shared.py index 98e2629..b649a4e 100644 --- a/core/shared.py +++ b/core/shared.py @@ -1,14 +1,11 @@ -import binascii +import json import os -from datetime import datetime -from datetime import timezone from functools import wraps from typing import Any -from typing import Dict -from urllib.parse import urljoin import flask from bson.objectid import ObjectId +from flask import Response from flask import current_app as app from flask import redirect from flask import request @@ -16,16 +13,12 @@ from flask import session from flask import url_for from flask_wtf.csrf import CSRFProtect from little_boxes import activitypub as ap -from 
little_boxes.activitypub import format_datetime from poussetaches import PousseTaches -from config import BASE_URL +import config from config import DB from config import ME from core import activitypub -from core.activitypub import _answer_key -from core.meta import Box -from core.tasks import Tasks # _Response = Union[flask.Response, werkzeug.wrappers.Response, str, Any] _Response = Any @@ -45,6 +38,29 @@ ap.use_backend(back) MY_PERSON = ap.Person(**ME) +def jsonify(**data): + if "@context" not in data: + data["@context"] = config.DEFAULT_CTX + return Response( + response=json.dumps(data), + headers={ + "Content-Type": "application/json" + if app.debug + else "application/activity+json" + }, + ) + + +def is_api_request(): + h = request.headers.get("Accept") + if h is None: + return False + h = h.split(",")[0] + if h in config.HEADERS or h == "application/json": + return True + return False + + def add_response_headers(headers={}): """This decorator adds the headers passed in to the response""" @@ -94,33 +110,6 @@ def _get_ip(): return ip, geoip -def activity_url(item_id: str) -> str: - return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id)) - - -def post_to_outbox(activity: ap.BaseActivity) -> str: - if activity.has_type(ap.CREATE_TYPES): - activity = activity.build_create() - - # Assign create a random ID - obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8") - uri = activity_url(obj_id) - activity._data["id"] = uri - if activity.has_type(ap.ActivityType.CREATE): - activity._data["object"]["id"] = urljoin( - BASE_URL, url_for("outbox_activity", item_id=obj_id) - ) - activity._data["object"]["url"] = urljoin( - BASE_URL, url_for("note_by_id", note_id=obj_id) - ) - activity.reset_object_cache() - - back.save(Box.OUTBOX, activity) - Tasks.cache_actor(activity.id) - Tasks.finish_post_to_outbox(activity.id) - return activity.id - - def _build_thread(data, include_children=True): # noqa: C901 data["_requested"] = True app.logger.info(f"_build_thread({data!r})") @@ -225,22 +214,3 @@ def paginated_query(db, q, limit=25, sort_key="_id"): older_than = str(outbox_data[-1]["_id"]) return outbox_data, older_than, newer_than - - -def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None: - activity = raw_doc["activity"] - if ( - ap._has_type(activity["type"], ap.ActivityType.CREATE) - and "object" in activity - and ap._has_type(activity["object"]["type"], ap.ActivityType.QUESTION) - ): - for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")): - choice["replies"] = { - "type": ap.ActivityType.COLLECTION.value, - "totalItems": raw_doc["meta"] - .get("question_answers", {}) - .get(_answer_key(choice["name"]), 0), - } - now = datetime.now(timezone.utc) - if format_datetime(now) >= activity["object"]["endTime"]: - activity["object"]["closed"] = activity["object"]["endTime"]
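
What the "Switch to new queries helper" bullet means in practice: the inbox/outbox processors stop hand-writing MongoDB filter and update documents and build them from the small helpers added to core/meta.py (by_object_id, by_type, inc, upsert) together with update_one_activity from core/db.py. Below is a minimal before/after sketch based on the Like outbox handler in this patch; note that by_type and update_one_activity are imported by the new code but not defined in this diff, so treating them as a {"type": <ActivityType>.value} sub-query and a thin wrapper around DB.activities.update_one is an assumption.

    from little_boxes import activitypub as ap

    from config import DB
    from core.db import update_one_activity  # assumed: thin wrapper over DB.activities.update_one
    from core.meta import MetaKey
    from core.meta import by_object_id
    from core.meta import by_type
    from core.meta import inc
    from core.meta import upsert


    def like_outbox_before(like: ap.Like) -> None:
        # Pre-patch style (old MicroblogPubBackend.outbox_like): raw query/update documents.
        obj = like.get_object()
        DB.activities.update_one(
            {"activity.object.id": obj.id},
            {"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}},
        )


    def like_outbox_after(like: ap.Like) -> None:
        # Post-patch style (core/outbox.py): the same update, composed from meta.* sub-queries.
        obj = like.get_object()
        update_one_activity(
            {**by_object_id(obj.id), **by_type(ap.ActivityType.CREATE)},
            {**inc(MetaKey.COUNT_LIKE, 1), **upsert({MetaKey.LIKED: like.id})},
        )

Targeting meta.object_id plus type instead of activity.object.id is also what the new meta.object_id and (meta.object_id, type) indexes in core/indexes.py are there to serve.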
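
The other half of the cleanup is that the write path stops being methods on MicroblogPubBackend: save(), outbox_is_blocked(), post_to_inbox() and post_to_outbox() are now plain functions in core/activitypub.py, and the @ensure_it_is_me guard that wrapped the old backend methods goes away with them. Callers (app.py, blueprints/admin.py, blueprints/api.py, blueprints/tasks.py) import these functions directly instead of reaching through the core.shared.back backend instance, which trims core/shared.py down to the Flask-level helpers (jsonify, is_api_request, pagination, thread building) plus the shared back and MY_PERSON objects. The feed generation (gen_feed, json_feed, build_inbox_json_feed) moves verbatim from core/activitypub.py into the new core/feed.py as part of the same re-organization.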