From 3289e91786e8bbef99d90e101090e156f1ab03ae Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 5 Apr 2019 11:35:48 +0200 Subject: [PATCH] Try poussetaches --- .travis.yml | 1 + app.py | 646 ++++++++++++++++++++++++++++++++++++--- config.py | 17 +- docker-compose-tests.yml | 11 +- docker-compose.yml | 6 + poussetaches.py | 48 +++ tasks.py | 1 + 7 files changed, 673 insertions(+), 57 deletions(-) create mode 100644 poussetaches.py diff --git a/.travis.yml b/.travis.yml index 2e8ab86..db184e9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,6 +16,7 @@ install: - sudo chmod +x /usr/local/bin/docker-compose - docker-compose --version - pip install -r dev-requirements.txt + - git clone https://github.com/tsileo/poussetaches.git && cd poussetaches && docker build . -t poussetaches:latest && cd - script: - mypy --ignore-missing-imports . - flake8 activitypub.py diff --git a/app.py b/app.py index 0730813..c9ac063 100644 --- a/app.py +++ b/app.py @@ -16,6 +16,8 @@ from typing import Tuple from urllib.parse import urlencode from urllib.parse import urlparse +from requests.exceptions import HTTPError +import requests import bleach import mf2py import pymongo @@ -41,7 +43,10 @@ from little_boxes.activitypub import _to_list from little_boxes.activitypub import clean_activity from little_boxes.activitypub import get_backend from little_boxes.content_helper import parse_markdown +from little_boxes.linked_data_sig import generate_signature from little_boxes.errors import ActivityGoneError +from little_boxes.errors import NotAnActivityError +from little_boxes.errors import BadActivityError from little_boxes.errors import ActivityNotFoundError from little_boxes.errors import Error from little_boxes.errors import NotFromOutboxError @@ -49,15 +54,18 @@ from little_boxes.httpsig import HTTPSigAuth from little_boxes.httpsig import verify_request from little_boxes.webfinger import get_actor_url from little_boxes.webfinger import get_remote_follow_template +from utils import opengraph from passlib.hash import bcrypt from u2flib_server import u2f from werkzeug.utils import secure_filename import activitypub import config -import tasks + +# import tasks from activitypub import Box from activitypub import embed_collection +from config import USER_AGENT from config import ADMIN_API_KEY from config import BASE_URL from config import DB @@ -78,6 +86,11 @@ from utils.key import get_secret_key from utils.lookup import lookup from utils.media import Kind +from poussetaches import PousseTaches + +p = PousseTaches("http://poussetaches:7991", "http://web:5005") + + back = activitypub.MicroblogPubBackend() ap.use_backend(back) @@ -191,7 +204,7 @@ ALLOWED_TAGS = [ def clean_html(html): try: return bleach.clean(html, tags=ALLOWED_TAGS) - except: + except Exception: return "" @@ -631,7 +644,7 @@ def authorize_follow(): return redirect("/following") follow = ap.Follow(actor=MY_PERSON.id, object=actor) - tasks.post_to_outbox(follow) + post_to_outbox(follow) return redirect("/following") @@ -758,7 +771,7 @@ def tmp_migrate4(): @login_required def tmp_migrate5(): for activity in DB.activities.find(): - tasks.cache_actor.delay(activity["remote_id"], also_cache_attachments=False) + Tasks.cache_actor(activity["remote_id"], also_cache_attachments=False) return "Done" @@ -835,9 +848,10 @@ def _get_cached(type_="html", arg=None): cached = DB.cache2.find_one({"path": request.path, "type": type_, "arg": arg}) if cached: app.logger.info("from cache") - return cached['response_data'] + return cached["response_data"] return None + def _cache(resp, type_="html", arg=None): if not CACHING: return None @@ -855,7 +869,9 @@ def _cache(resp, type_="html", arg=None): def index(): if is_api_request(): return jsonify(**ME) - cache_arg = f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}" + cache_arg = ( + f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}" + ) cached = _get_cached("html", cache_arg) if cached: return cached @@ -1053,22 +1069,22 @@ def nodeinfo(): } response = json.dumps( - { - "version": "2.0", - "software": { - "name": "microblogpub", - "version": f"Microblog.pub {VERSION}", - }, - "protocols": ["activitypub"], - "services": {"inbound": [], "outbound": []}, - "openRegistrations": False, - "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)}, - "metadata": { - "sourceCode": "https://github.com/tsileo/microblog.pub", - "nodeName": f"@{USERNAME}@{DOMAIN}", - }, - } - ) + { + "version": "2.0", + "software": { + "name": "microblogpub", + "version": f"Microblog.pub {VERSION}", + }, + "protocols": ["activitypub"], + "services": {"inbound": [], "outbound": []}, + "openRegistrations": False, + "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)}, + "metadata": { + "sourceCode": "https://github.com/tsileo/microblog.pub", + "nodeName": f"@{USERNAME}@{DOMAIN}", + }, + } + ) if not cached: _cache(response, "api") @@ -1197,7 +1213,7 @@ def outbox(): data = request.get_json(force=True) print(data) activity = ap.parse_activity(data) - activity_id = tasks.post_to_outbox(activity) + activity_id = post_to_outbox(activity) return Response(status=201, headers={"Location": activity_id}) @@ -1536,11 +1552,15 @@ def _user_api_get_note(from_outbox: bool = False): oid = _user_api_arg("id") app.logger.info(f"fetching {oid}") try: - note = ap.parse_activity(get_backend().fetch_iri(oid), expected=ActivityType.NOTE) - except: + note = ap.parse_activity( + get_backend().fetch_iri(oid), expected=ActivityType.NOTE + ) + except Exception: try: - note = ap.parse_activity(get_backend().fetch_iri(oid), expected=ActivityType.VIDEO) - except: + note = ap.parse_activity( + get_backend().fetch_iri(oid), expected=ActivityType.VIDEO + ) + except Exception: raise ActivityNotFoundError( "Expected Note or Video ActivityType, but got something else" ) @@ -1570,7 +1590,7 @@ def api_delete(): delete = ap.Delete(actor=ID, object=ap.Tombstone(id=note.id).to_dict(embed=True)) - delete_id = tasks.post_to_outbox(delete) + delete_id = post_to_outbox(delete) return _user_api_response(activity=delete_id) @@ -1581,7 +1601,7 @@ def api_boost(): note = _user_api_get_note() announce = note.build_announce(MY_PERSON) - announce_id = tasks.post_to_outbox(announce) + announce_id = post_to_outbox(announce) return _user_api_response(activity=announce_id) @@ -1592,7 +1612,7 @@ def api_like(): note = _user_api_get_note() like = note.build_like(MY_PERSON) - like_id = tasks.post_to_outbox(like) + like_id = post_to_outbox(like) return _user_api_response(activity=like_id) @@ -1639,7 +1659,7 @@ def api_undo(): obj = ap.parse_activity(doc.get("activity")) # FIXME(tsileo): detect already undo-ed and make this API call idempotent undo = obj.build_undo() - undo_id = tasks.post_to_outbox(undo) + undo_id = post_to_outbox(undo) return _user_api_response(activity=undo_id) @@ -1664,7 +1684,7 @@ def admin_stream(): ) -@app.route("/inbox", methods=["GET", "POST"]) +@app.route("/inbox", methods=["GET", "POST"]) # noqa: C901 def inbox(): if request.method == "GET": if not is_api_request(): @@ -1733,7 +1753,7 @@ def inbox(): ) activity = ap.parse_activity(data) logger.debug(f"inbox activity={activity}/{data}") - tasks.post_to_inbox(activity) + post_to_inbox(activity) return Response(status=201) @@ -1819,7 +1839,7 @@ def api_new_note(): note = ap.Note(**raw_note) create = note.build_create() - create_id = tasks.post_to_outbox(create) + create_id = post_to_outbox(create) return _user_api_response(activity=create_id) @@ -1852,7 +1872,7 @@ def api_block(): return _user_api_response(activity=existing["activity"]["id"]) block = ap.Block(actor=MY_PERSON.id, object=actor) - block_id = tasks.post_to_outbox(block) + block_id = post_to_outbox(block) return _user_api_response(activity=block_id) @@ -1874,7 +1894,7 @@ def api_follow(): return _user_api_response(activity=existing["activity"]["id"]) follow = ap.Follow(actor=MY_PERSON.id, object=actor) - follow_id = tasks.post_to_outbox(follow) + follow_id = post_to_outbox(follow) return _user_api_response(activity=follow_id) @@ -1895,8 +1915,9 @@ def followers(): ) raw_followers, older_than, newer_than = paginated_query(DB.activities, q) - followers = [doc["meta"]["actor"] - for doc in raw_followers if "actor" in doc.get("meta", {})] + followers = [ + doc["meta"]["actor"] for doc in raw_followers if "actor" in doc.get("meta", {}) + ] return render_template( "followers.html", followers_data=followers, @@ -1924,9 +1945,11 @@ def following(): abort(404) following, older_than, newer_than = paginated_query(DB.activities, q) - following = [(doc["remote_id"], doc["meta"]["object"]) - for doc in following - if "remote_id" in doc and "object" in doc.get("meta", {})] + following = [ + (doc["remote_id"], doc["meta"]["object"]) + for doc in following + if "remote_id" in doc and "object" in doc.get("meta", {}) + ] return render_template( "following.html", following_data=following, @@ -2087,7 +2110,7 @@ def indieauth_flow(): return redirect(red) -@app.route('/indieauth', methods=['GET', 'POST']) +@app.route("/indieauth", methods=["GET", "POST"]) def indieauth_endpoint(): if request.method == "GET": if not session.get("logged_in"): @@ -2189,9 +2212,7 @@ def token_endpoint(): @app.route("/feed.json") def json_feed(): return Response( - response=json.dumps( - activitypub.json_feed("/feed.json") - ), + response=json.dumps(activitypub.json_feed("/feed.json")), headers={"Content-Type": "application/json"}, ) @@ -2210,3 +2231,538 @@ def rss_feed(): response=activitypub.gen_feed().rss_str(), headers={"Content-Type": "application/rss+xml"}, ) + + +@app.route("/task/t1") +def task_t1(): + p.push( + "https://mastodon.cloud/@iulius/101852467780804071/activity", + "/task/cache_object", + ) + return "ok" + + +@app.route("/task/t2", methods=["POST"]) +def task_t2(): + print(request) + print(request.headers) + task = p.parse(request) + print(task) + return "yay" + + +@app.route("/task/fetch_og_meta", methods=["POST"]) +def task_fetch_og_metadata(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + if activity.has_type(ap.ActivityType.CREATE): + note = activity.get_object() + links = opengraph.links_from_note(note.to_dict()) + og_metadata = opengraph.fetch_og_metadata(USER_AGENT, links) + for og in og_metadata: + if not og.get("image"): + continue + MEDIA_CACHE.cache_og_image(og["image"]) + + app.logger.debug(f"OG metadata {og_metadata!r}") + DB.activities.update_one( + {"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}} + ) + + app.logger.info(f"OG metadata fetched for {iri}") + except (ActivityGoneError, ActivityNotFoundError): + app.logger.exception(f"dropping activity {iri}, skip OG metedata") + return "" + except requests.exceptions.HTTPError as http_err: + if 400 <= http_err.response.status_code < 500: + app.logger.exception("bad request, no retry") + return "" + app.logger.exception("failed to fetch OG metadata") + abort(500) + except Exception: + app.logger.exception(f"failed to fetch OG metadata for {iri}") + abort(500) + + +@app.route("/task/cache_object", methods=["POST"]) +def task_cache_object(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + print(activity) + print(activity.__dict__) + app.logger.info(f"activity={activity!r}") + obj = activity + # obj = activity.get_object() + DB.activities.update_one( + {"remote_id": activity.id}, + { + "$set": { + "meta.object": obj.to_dict(embed=True), + "meta.object_actor": activitypub._actor_to_meta(obj.get_actor()), + } + }, + ) + except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): + DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) + app.logger.exception(f"flagging activity {iri} as deleted, no object caching") + return "" + except Exception: + app.logger.exception(f"failed to cache object for {iri}") + abort(500) + return "" + + +class Tasks: + @staticmethod + def cache_object(iri: str) -> None: + p.push(iri, "/task/cache_object") + + @staticmethod + def cache_actor(iri: str, also_cache_attachments: bool = True) -> None: + p.push( + {"iri": iri, "also_cache_attachments": also_cache_attachments}, + "/task/cache_actor", + ) + + @staticmethod + def post_to_remote_inbox(payload: str, recp: str) -> None: + p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox") + + @staticmethod + def forward_activity(iri: str) -> None: + p.push(iri, "/task/forward_activity") + + @staticmethod + def fetch_og_meta(iri: str) -> None: + p.push(iri, "/task/fetch_og_meta") + + @staticmethod + def process_new_activity(iri: str) -> None: + p.push(iri, "/task/process_new_activity") + + @staticmethod + def cache_attachments(iri: str) -> None: + p.push(iri, "/task/cache_attachments") + + @staticmethod + def finish_post_to_inbox(iri: str) -> None: + p.push(iri, "/task/finish_post_to_inbox") + + @staticmethod + def finish_post_to_outbox(iri: str) -> None: + p.push(iri, "/task/finish_post_to_outbox") + + +@app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901 +def task_finish_post_to_outbox(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + recipients = activity.recipients() + + if activity.has_type(ap.ActivityType.DELETE): + back.outbox_delete(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UPDATE): + back.outbox_update(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.CREATE): + back.outbox_create(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.ANNOUNCE): + back.outbox_announce(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.LIKE): + back.outbox_like(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UNDO): + obj = activity.get_object() + if obj.has_type(ap.ActivityType.LIKE): + back.outbox_undo_like(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.ANNOUNCE): + back.outbox_undo_announce(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.FOLLOW): + back.undo_new_following(MY_PERSON, obj) + + app.logger.info(f"recipients={recipients}") + activity = ap.clean_activity(activity.to_dict()) + + DB.cache2.remove() + + payload = json.dumps(activity) + for recp in recipients: + app.logger.debug(f"posting to {recp}") + Tasks.post_to_remote_inbox(payload, recp) + except (ActivityGoneError, ActivityNotFoundError): + app.logger.exception(f"no retry") + except Exception: + app.logger.exception(f"failed to post to remote inbox for {iri}") + abort(500) + + +@app.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901 +def task_finish_post_to_inbox(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + if activity.has_type(ap.ActivityType.DELETE): + back.inbox_delete(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UPDATE): + back.inbox_update(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.CREATE): + back.inbox_create(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.ANNOUNCE): + back.inbox_announce(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.LIKE): + back.inbox_like(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.FOLLOW): + # Reply to a Follow with an Accept + accept = ap.Accept(actor=ID, object=activity.to_dict(embed=True)) + post_to_outbox(accept) + elif activity.has_type(ap.ActivityType.UNDO): + obj = activity.get_object() + if obj.has_type(ap.ActivityType.LIKE): + back.inbox_undo_like(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.ANNOUNCE): + back.inbox_undo_announce(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.FOLLOW): + back.undo_new_follower(MY_PERSON, obj) + try: + invalidate_cache(activity) + except Exception: + app.logger.exception("failed to invalidate cache") + except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): + app.logger.exception(f"no retry") + except Exception: + app.logger.exception(f"failed to cache attachments for {iri}") + abort(500) + + +def post_to_outbox(activity: ap.BaseActivity) -> str: + if activity.has_type(ap.CREATE_TYPES): + activity = activity.build_create() + + # Assign create a random ID + obj_id = back.random_object_id() + activity.set_id(back.activity_url(obj_id), obj_id) + + back.save(Box.OUTBOX, activity) + Tasks.cache_actor(activity.id) + Tasks.finish_post_to_outbox(activity.id) + return activity.id + + +def post_to_inbox(activity: ap.BaseActivity) -> None: + # Check for Block activity + actor = activity.get_actor() + if back.outbox_is_blocked(MY_PERSON, actor.id): + app.logger.info( + f"actor {actor!r} is blocked, dropping the received activity {activity!r}" + ) + return + + if back.inbox_check_duplicate(MY_PERSON, activity.id): + # The activity is already in the inbox + app.logger.info(f"received duplicate activity {activity!r}, dropping it") + + back.save(Box.INBOX, activity) + Tasks.process_new_activity(activity.id) + + app.logger.info(f"spawning task for {activity!r}") + Tasks.finish_post_to_inbox(activity.id) + + +def invalidate_cache(activity): + if activity.has_type(ap.ActivityType.LIKE): + if activity.get_object().id.startswith(BASE_URL): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.ANNOUNCE): + if activity.get_object().id.startswith(BASE_URL): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.UNDO): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.DELETE): + # TODO(tsileo): only invalidate if it's a delete of a reply + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.UPDATE): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.CREATE): + note = activity.get_object() + if not note.inReplyTo or note.inReplyTo.startswith(ID): + DB.cache2.remove() + # FIXME(tsileo): check if it's a reply of a reply + + +@app.route("/task/cache_attachments", methods=["POST"]) +def task_cache_attachments(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + # Generates thumbnails for the actor's icon and the attachments if any + + actor = activity.get_actor() + + # Update the cached actor + DB.actors.update_one( + {"remote_id": iri}, + {"$set": {"remote_id": iri, "data": actor.to_dict(embed=True)}}, + upsert=True, + ) + + if actor.icon: + MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON) + + if activity.has_type(ap.ActivityType.CREATE): + for attachment in activity.get_object()._data.get("attachment", []): + if ( + attachment.get("mediaType", "").startswith("image/") + or attachment.get("type") == ap.ActivityType.IMAGE.value + ): + try: + MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT) + except ValueError: + app.logger.exception(f"failed to cache {attachment}") + + app.logger.info(f"attachments cached for {iri}") + + except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): + app.logger.exception(f"dropping activity {iri}, no attachment caching") + except Exception: + app.logger.exception(f"failed to cache attachments for {iri}") + abort(500) + + +@app.route("/task/cache_actor", methods=["POST"]) +def task_cache_actor(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri, also_cache_attachments = ( + task.payload["iri"], + task.payload.get("also_cache_attachments", True), + ) + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + if activity.has_type(ap.ActivityType.CREATE): + Tasks.fetch_og_metadata(iri) + + if activity.has_type([ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]): + Tasks.cache_object(iri) + + actor = activity.get_actor() + + cache_actor_with_inbox = False + if activity.has_type(ap.ActivityType.FOLLOW): + if actor.id != ID: + # It's a Follow from the Inbox + cache_actor_with_inbox = True + else: + # It's a new following, cache the "object" (which is the actor we follow) + DB.activities.update_one( + {"remote_id": iri}, + { + "$set": { + "meta.object": activitypub._actor_to_meta( + activity.get_object() + ) + } + }, + ) + + # Cache the actor info + DB.activities.update_one( + {"remote_id": iri}, + { + "$set": { + "meta.actor": activitypub._actor_to_meta( + actor, cache_actor_with_inbox + ) + } + }, + ) + + app.logger.info(f"actor cached for {iri}") + if also_cache_attachments and activity.has_type(ap.ActivityType.CREATE): + Tasks.cache_attachments(iri) + + except (ActivityGoneError, ActivityNotFoundError): + DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) + app.logger.exception(f"flagging activity {iri} as deleted, no actor caching") + except Exception: + app.logger.exception(f"failed to cache actor for {iri}") + abort(500) + + +@app.route("/task/process_new_activity", methods=["POST"]) # noqa:c901 +def task_process_new_activity(): + """Process an activity received in the inbox""" + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + # Is the activity expected? + # following = ap.get_backend().following() + should_forward = False + should_delete = False + + tag_stream = False + if activity.has_type(ap.ActivityType.ANNOUNCE): + try: + activity.get_object() + tag_stream = True + except (NotAnActivityError, BadActivityError): + app.logger.exception(f"failed to get announce object for {activity!r}") + # Most likely on OStatus notice + tag_stream = False + should_delete = True + except (ActivityGoneError, ActivityNotFoundError): + # The announced activity is deleted/gone, drop it + should_delete = True + + elif activity.has_type(ap.ActivityType.CREATE): + note = activity.get_object() + # Make the note part of the stream if it's not a reply, or if it's a local reply + if not note.inReplyTo or note.inReplyTo.startswith(ID): + tag_stream = True + + if note.inReplyTo: + try: + reply = ap.fetch_remote_activity(note.inReplyTo) + if ( + reply.id.startswith(ID) or reply.has_mention(ID) + ) and activity.is_public(): + # The reply is public "local reply", forward the reply (i.e. the original activity) to the + # original recipients + should_forward = True + except NotAnActivityError: + # Most likely a reply to an OStatus notce + should_delete = True + + # (partial) Ghost replies handling + # [X] This is the first time the server has seen this Activity. + should_forward = False + local_followers = ID + "/followers" + for field in ["to", "cc"]: + if field in activity._data: + if local_followers in activity._data[field]: + # [X] The values of to, cc, and/or audience contain a Collection owned by the server. + should_forward = True + + # [X] The values of inReplyTo, object, target and/or tag are objects owned by the server + if not (note.inReplyTo and note.inReplyTo.startswith(ID)): + should_forward = False + + elif activity.has_type(ap.ActivityType.DELETE): + note = DB.activities.find_one( + {"activity.object.id": activity.get_object().id} + ) + if note and note["meta"].get("forwarded", False): + # If the activity was originally forwarded, forward the delete too + should_forward = True + + elif activity.has_type(ap.ActivityType.LIKE): + if not activity.get_object_id().startswith(BASE_URL): + # We only want to keep a like if it's a like for a local activity + # (Pleroma relay the likes it received, we don't want to store them) + should_delete = True + + if should_forward: + app.logger.info(f"will forward {activity!r} to followers") + Tasks.forward_activity(activity.id) + + if should_delete: + app.logger.info(f"will soft delete {activity!r}") + + app.logger.info(f"{iri} tag_stream={tag_stream}") + DB.activities.update_one( + {"remote_id": activity.id}, + { + "$set": { + "meta.stream": tag_stream, + "meta.forwarded": should_forward, + "meta.deleted": should_delete, + } + }, + ) + + app.logger.info(f"new activity {iri} processed") + if not should_delete and not activity.has_type(ap.ActivityType.DELETE): + Tasks.cache_actor(iri) + except (ActivityGoneError, ActivityNotFoundError): + app.logger.log.exception(f"dropping activity {iri}, skip processing") + except Exception: + app.logger.exception(f"failed to process new activity {iri}") + abort(500) + + +@app.route("/task/forward_activity") +def task_forward_activity(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + recipients = back.followers_as_recipients() + app.logger.debug(f"Forwarding {activity!r} to {recipients}") + activity = ap.clean_activity(activity.to_dict()) + payload = json.dumps(activity) + for recp in recipients: + app.logger.debug(f"forwarding {activity!r} to {recp}") + Tasks.post_to_remote_inbox(payload, recp) + except Exception: + app.logger.exception("task failed") + abort(500) + + +@app.route("/task/post_to_remote_inbox") +def task_post_to_remote_inbox(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + payload, to = task.payload["payload"], task.payload["to"] + try: + app.logger.info("payload=%s", payload) + app.logger.info("generating sig") + signed_payload = json.loads(payload) + + # Don't overwrite the signature if we're forwarding an activity + if "signature" not in signed_payload: + generate_signature(signed_payload, KEY) + + app.logger.info("to=%s", to) + resp = requests.post( + to, + data=json.dumps(signed_payload), + auth=SIG_AUTH, + headers={ + "Content-Type": HEADERS[1], + "Accept": HEADERS[1], + "User-Agent": USER_AGENT, + }, + ) + app.logger.info("resp=%s", resp) + app.logger.info("resp_body=%s", resp.text) + resp.raise_for_status() + except HTTPError as err: + app.logger.exception("request failed") + if 400 >= err.response.status_code >= 499: + app.logger.info("client error, no retry") + return "" + + abort(500) diff --git a/config.py b/config.py index b66ce71..846ae79 100644 --- a/config.py +++ b/config.py @@ -105,12 +105,17 @@ MEDIA_CACHE = MediaCache(GRIDFS, USER_AGENT) def create_indexes(): DB.activities.create_index([("remote_id", pymongo.ASCENDING)]) DB.activities.create_index([("activity.object.id", pymongo.ASCENDING)]) - DB.activities.create_index([ - ("activity.object.id", pymongo.ASCENDING), - ("meta.deleted", pymongo.ASCENDING), - ]) - DB.cache2.create_index([("path", pymongo.ASCENDING), ("type", pymongo.ASCENDING), ("arg", pymongo.ASCENDING)]) - DB.cache2.create_index("date", expireAfterSeconds=3600*12) + DB.activities.create_index( + [("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING)] + ) + DB.cache2.create_index( + [ + ("path", pymongo.ASCENDING), + ("type", pymongo.ASCENDING), + ("arg", pymongo.ASCENDING), + ] + ) + DB.cache2.create_index("date", expireAfterSeconds=3600 * 12) # Index for the block query DB.activities.create_index( diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml index eae4264..0a9c393 100644 --- a/docker-compose-tests.yml +++ b/docker-compose-tests.yml @@ -4,9 +4,6 @@ services: image: 'microblogpub:latest' ports: - "${WEB_PORT}:5005" - links: - - mongo - - rmq volumes: - "${CONFIG_DIR}:/app/config" - "./static:/app/static" @@ -14,12 +11,10 @@ services: - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq// - MICROBLOGPUB_MONGODB_HOST=mongo:27017 - MICROBLOGPUB_DEBUG=1 + - POUSSETACHES_AUTH_KEY=123 celery: # image: "instance1_web" image: 'microblogpub:latest' - links: - - mongo - - rmq command: 'celery worker -l info -A tasks' volumes: - "${CONFIG_DIR}:/app/config" @@ -35,6 +30,10 @@ services: environment: - RABBITMQ_ERLANG_COOKIE=secretrabbit - RABBITMQ_NODENAME=rabbit@my-rabbit + poussetaches: + image: "poussetaches:latest" + environment: + - POUSSETACHES_AUTH_KEY=123 networks: default: name: microblogpubfede diff --git a/docker-compose.yml b/docker-compose.yml index b7f9521..e77552f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,12 +7,14 @@ services: links: - mongo - rmq + - poussetaches volumes: - "${CONFIG_DIR}:/app/config" - "./static:/app/static" environment: - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq// - MICROBLOGPUB_MONGODB_HOST=mongo:27017 + - POUSSETACHES_AUTH_KEY=123 celery: image: 'microblogpub:latest' links: @@ -36,3 +38,7 @@ services: - RABBITMQ_NODENAME=rabbit@my-rabbit volumes: - "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq" + poussetaches: + image: "poussetaches:latest" + environment: + - POUSSETACHES_AUTH_KEY=123 diff --git a/poussetaches.py b/poussetaches.py new file mode 100644 index 0000000..f2532bc --- /dev/null +++ b/poussetaches.py @@ -0,0 +1,48 @@ +import base64 +import json +import os +from typing import Any +from dataclasses import dataclass +import flask +import requests + +POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY") + + +@dataclass +class Task: + req_id: str + tries: int + + payload: Any + + +class PousseTaches: + def __init__(self, api_url: str, base_url: str) -> None: + self.api_url = api_url + self.base_url = base_url + + def push(self, payload: Any, path: str, expected=200) -> str: + # Encode our payload + p = base64.b64encode(json.dumps(payload).encode()).decode() + + # Queue/push it + resp = requests.post( + self.api_url, + json={"url": self.base_url + path, "payload": p, "expected": expected}, + ) + resp.raise_for_status() + + return resp.headers.get("Poussetaches-Task-ID") + + def parse(self, req: flask.Request) -> Task: + if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY: + raise ValueError("Bad auth key") + + # Parse the "envelope" + envelope = json.loads(req.data) + print(req) + print(f"envelope={envelope!r}") + payload = json.loads(base64.b64decode(envelope["payload"])) + + return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) diff --git a/tasks.py b/tasks.py index 7a2f64d..f34d217 100644 --- a/tasks.py +++ b/tasks.py @@ -339,6 +339,7 @@ def invalidate_cache(activity): DB.cache2.remove() # FIXME(tsileo): check if it's a reply of a reply + @app.task(bind=True, max_retries=MAX_RETRIES) # noqa: C901 def finish_post_to_inbox(self, iri: str) -> None: try: