From 066309a0c820dfbbde7a83282bbbb197dd32837d Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 5 Apr 2019 11:35:48 +0200 Subject: [PATCH 01/11] Try poussetaches --- .travis.yml | 1 + app.py | 646 ++++++++++++++++++++++++++++++++++++--- config.py | 17 +- docker-compose-tests.yml | 11 +- docker-compose.yml | 6 + poussetaches.py | 48 +++ tasks.py | 1 + 7 files changed, 673 insertions(+), 57 deletions(-) create mode 100644 poussetaches.py diff --git a/.travis.yml b/.travis.yml index 2e8ab86..db184e9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,6 +16,7 @@ install: - sudo chmod +x /usr/local/bin/docker-compose - docker-compose --version - pip install -r dev-requirements.txt + - git clone https://github.com/tsileo/poussetaches.git && cd poussetaches && docker build . -t poussetaches:latest && cd - script: - mypy --ignore-missing-imports . - flake8 activitypub.py diff --git a/app.py b/app.py index 0730813..c9ac063 100644 --- a/app.py +++ b/app.py @@ -16,6 +16,8 @@ from typing import Tuple from urllib.parse import urlencode from urllib.parse import urlparse +from requests.exceptions import HTTPError +import requests import bleach import mf2py import pymongo @@ -41,7 +43,10 @@ from little_boxes.activitypub import _to_list from little_boxes.activitypub import clean_activity from little_boxes.activitypub import get_backend from little_boxes.content_helper import parse_markdown +from little_boxes.linked_data_sig import generate_signature from little_boxes.errors import ActivityGoneError +from little_boxes.errors import NotAnActivityError +from little_boxes.errors import BadActivityError from little_boxes.errors import ActivityNotFoundError from little_boxes.errors import Error from little_boxes.errors import NotFromOutboxError @@ -49,15 +54,18 @@ from little_boxes.httpsig import HTTPSigAuth from little_boxes.httpsig import verify_request from little_boxes.webfinger import get_actor_url from little_boxes.webfinger import get_remote_follow_template +from utils import opengraph from passlib.hash import bcrypt from u2flib_server import u2f from werkzeug.utils import secure_filename import activitypub import config -import tasks + +# import tasks from activitypub import Box from activitypub import embed_collection +from config import USER_AGENT from config import ADMIN_API_KEY from config import BASE_URL from config import DB @@ -78,6 +86,11 @@ from utils.key import get_secret_key from utils.lookup import lookup from utils.media import Kind +from poussetaches import PousseTaches + +p = PousseTaches("http://poussetaches:7991", "http://web:5005") + + back = activitypub.MicroblogPubBackend() ap.use_backend(back) @@ -191,7 +204,7 @@ ALLOWED_TAGS = [ def clean_html(html): try: return bleach.clean(html, tags=ALLOWED_TAGS) - except: + except Exception: return "" @@ -631,7 +644,7 @@ def authorize_follow(): return redirect("/following") follow = ap.Follow(actor=MY_PERSON.id, object=actor) - tasks.post_to_outbox(follow) + post_to_outbox(follow) return redirect("/following") @@ -758,7 +771,7 @@ def tmp_migrate4(): @login_required def tmp_migrate5(): for activity in DB.activities.find(): - tasks.cache_actor.delay(activity["remote_id"], also_cache_attachments=False) + Tasks.cache_actor(activity["remote_id"], also_cache_attachments=False) return "Done" @@ -835,9 +848,10 @@ def _get_cached(type_="html", arg=None): cached = DB.cache2.find_one({"path": request.path, "type": type_, "arg": arg}) if cached: app.logger.info("from cache") - return cached['response_data'] + return cached["response_data"] return None + 
def _cache(resp, type_="html", arg=None): if not CACHING: return None @@ -855,7 +869,9 @@ def _cache(resp, type_="html", arg=None): def index(): if is_api_request(): return jsonify(**ME) - cache_arg = f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}" + cache_arg = ( + f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}" + ) cached = _get_cached("html", cache_arg) if cached: return cached @@ -1053,22 +1069,22 @@ def nodeinfo(): } response = json.dumps( - { - "version": "2.0", - "software": { - "name": "microblogpub", - "version": f"Microblog.pub {VERSION}", - }, - "protocols": ["activitypub"], - "services": {"inbound": [], "outbound": []}, - "openRegistrations": False, - "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)}, - "metadata": { - "sourceCode": "https://github.com/tsileo/microblog.pub", - "nodeName": f"@{USERNAME}@{DOMAIN}", - }, - } - ) + { + "version": "2.0", + "software": { + "name": "microblogpub", + "version": f"Microblog.pub {VERSION}", + }, + "protocols": ["activitypub"], + "services": {"inbound": [], "outbound": []}, + "openRegistrations": False, + "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)}, + "metadata": { + "sourceCode": "https://github.com/tsileo/microblog.pub", + "nodeName": f"@{USERNAME}@{DOMAIN}", + }, + } + ) if not cached: _cache(response, "api") @@ -1197,7 +1213,7 @@ def outbox(): data = request.get_json(force=True) print(data) activity = ap.parse_activity(data) - activity_id = tasks.post_to_outbox(activity) + activity_id = post_to_outbox(activity) return Response(status=201, headers={"Location": activity_id}) @@ -1536,11 +1552,15 @@ def _user_api_get_note(from_outbox: bool = False): oid = _user_api_arg("id") app.logger.info(f"fetching {oid}") try: - note = ap.parse_activity(get_backend().fetch_iri(oid), expected=ActivityType.NOTE) - except: + note = ap.parse_activity( + get_backend().fetch_iri(oid), expected=ActivityType.NOTE + ) + except Exception: try: - note = ap.parse_activity(get_backend().fetch_iri(oid), expected=ActivityType.VIDEO) - except: + note = ap.parse_activity( + get_backend().fetch_iri(oid), expected=ActivityType.VIDEO + ) + except Exception: raise ActivityNotFoundError( "Expected Note or Video ActivityType, but got something else" ) @@ -1570,7 +1590,7 @@ def api_delete(): delete = ap.Delete(actor=ID, object=ap.Tombstone(id=note.id).to_dict(embed=True)) - delete_id = tasks.post_to_outbox(delete) + delete_id = post_to_outbox(delete) return _user_api_response(activity=delete_id) @@ -1581,7 +1601,7 @@ def api_boost(): note = _user_api_get_note() announce = note.build_announce(MY_PERSON) - announce_id = tasks.post_to_outbox(announce) + announce_id = post_to_outbox(announce) return _user_api_response(activity=announce_id) @@ -1592,7 +1612,7 @@ def api_like(): note = _user_api_get_note() like = note.build_like(MY_PERSON) - like_id = tasks.post_to_outbox(like) + like_id = post_to_outbox(like) return _user_api_response(activity=like_id) @@ -1639,7 +1659,7 @@ def api_undo(): obj = ap.parse_activity(doc.get("activity")) # FIXME(tsileo): detect already undo-ed and make this API call idempotent undo = obj.build_undo() - undo_id = tasks.post_to_outbox(undo) + undo_id = post_to_outbox(undo) return _user_api_response(activity=undo_id) @@ -1664,7 +1684,7 @@ def admin_stream(): ) -@app.route("/inbox", methods=["GET", "POST"]) +@app.route("/inbox", methods=["GET", "POST"]) # noqa: C901 def inbox(): if request.method == "GET": if not is_api_request(): @@ -1733,7 +1753,7 
@@ def inbox(): ) activity = ap.parse_activity(data) logger.debug(f"inbox activity={activity}/{data}") - tasks.post_to_inbox(activity) + post_to_inbox(activity) return Response(status=201) @@ -1819,7 +1839,7 @@ def api_new_note(): note = ap.Note(**raw_note) create = note.build_create() - create_id = tasks.post_to_outbox(create) + create_id = post_to_outbox(create) return _user_api_response(activity=create_id) @@ -1852,7 +1872,7 @@ def api_block(): return _user_api_response(activity=existing["activity"]["id"]) block = ap.Block(actor=MY_PERSON.id, object=actor) - block_id = tasks.post_to_outbox(block) + block_id = post_to_outbox(block) return _user_api_response(activity=block_id) @@ -1874,7 +1894,7 @@ def api_follow(): return _user_api_response(activity=existing["activity"]["id"]) follow = ap.Follow(actor=MY_PERSON.id, object=actor) - follow_id = tasks.post_to_outbox(follow) + follow_id = post_to_outbox(follow) return _user_api_response(activity=follow_id) @@ -1895,8 +1915,9 @@ def followers(): ) raw_followers, older_than, newer_than = paginated_query(DB.activities, q) - followers = [doc["meta"]["actor"] - for doc in raw_followers if "actor" in doc.get("meta", {})] + followers = [ + doc["meta"]["actor"] for doc in raw_followers if "actor" in doc.get("meta", {}) + ] return render_template( "followers.html", followers_data=followers, @@ -1924,9 +1945,11 @@ def following(): abort(404) following, older_than, newer_than = paginated_query(DB.activities, q) - following = [(doc["remote_id"], doc["meta"]["object"]) - for doc in following - if "remote_id" in doc and "object" in doc.get("meta", {})] + following = [ + (doc["remote_id"], doc["meta"]["object"]) + for doc in following + if "remote_id" in doc and "object" in doc.get("meta", {}) + ] return render_template( "following.html", following_data=following, @@ -2087,7 +2110,7 @@ def indieauth_flow(): return redirect(red) -@app.route('/indieauth', methods=['GET', 'POST']) +@app.route("/indieauth", methods=["GET", "POST"]) def indieauth_endpoint(): if request.method == "GET": if not session.get("logged_in"): @@ -2189,9 +2212,7 @@ def token_endpoint(): @app.route("/feed.json") def json_feed(): return Response( - response=json.dumps( - activitypub.json_feed("/feed.json") - ), + response=json.dumps(activitypub.json_feed("/feed.json")), headers={"Content-Type": "application/json"}, ) @@ -2210,3 +2231,538 @@ def rss_feed(): response=activitypub.gen_feed().rss_str(), headers={"Content-Type": "application/rss+xml"}, ) + + +@app.route("/task/t1") +def task_t1(): + p.push( + "https://mastodon.cloud/@iulius/101852467780804071/activity", + "/task/cache_object", + ) + return "ok" + + +@app.route("/task/t2", methods=["POST"]) +def task_t2(): + print(request) + print(request.headers) + task = p.parse(request) + print(task) + return "yay" + + +@app.route("/task/fetch_og_meta", methods=["POST"]) +def task_fetch_og_metadata(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + if activity.has_type(ap.ActivityType.CREATE): + note = activity.get_object() + links = opengraph.links_from_note(note.to_dict()) + og_metadata = opengraph.fetch_og_metadata(USER_AGENT, links) + for og in og_metadata: + if not og.get("image"): + continue + MEDIA_CACHE.cache_og_image(og["image"]) + + app.logger.debug(f"OG metadata {og_metadata!r}") + DB.activities.update_one( + {"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}} + ) + + app.logger.info(f"OG 
metadata fetched for {iri}") + except (ActivityGoneError, ActivityNotFoundError): + app.logger.exception(f"dropping activity {iri}, skip OG metedata") + return "" + except requests.exceptions.HTTPError as http_err: + if 400 <= http_err.response.status_code < 500: + app.logger.exception("bad request, no retry") + return "" + app.logger.exception("failed to fetch OG metadata") + abort(500) + except Exception: + app.logger.exception(f"failed to fetch OG metadata for {iri}") + abort(500) + + +@app.route("/task/cache_object", methods=["POST"]) +def task_cache_object(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + print(activity) + print(activity.__dict__) + app.logger.info(f"activity={activity!r}") + obj = activity + # obj = activity.get_object() + DB.activities.update_one( + {"remote_id": activity.id}, + { + "$set": { + "meta.object": obj.to_dict(embed=True), + "meta.object_actor": activitypub._actor_to_meta(obj.get_actor()), + } + }, + ) + except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): + DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) + app.logger.exception(f"flagging activity {iri} as deleted, no object caching") + return "" + except Exception: + app.logger.exception(f"failed to cache object for {iri}") + abort(500) + return "" + + +class Tasks: + @staticmethod + def cache_object(iri: str) -> None: + p.push(iri, "/task/cache_object") + + @staticmethod + def cache_actor(iri: str, also_cache_attachments: bool = True) -> None: + p.push( + {"iri": iri, "also_cache_attachments": also_cache_attachments}, + "/task/cache_actor", + ) + + @staticmethod + def post_to_remote_inbox(payload: str, recp: str) -> None: + p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox") + + @staticmethod + def forward_activity(iri: str) -> None: + p.push(iri, "/task/forward_activity") + + @staticmethod + def fetch_og_meta(iri: str) -> None: + p.push(iri, "/task/fetch_og_meta") + + @staticmethod + def process_new_activity(iri: str) -> None: + p.push(iri, "/task/process_new_activity") + + @staticmethod + def cache_attachments(iri: str) -> None: + p.push(iri, "/task/cache_attachments") + + @staticmethod + def finish_post_to_inbox(iri: str) -> None: + p.push(iri, "/task/finish_post_to_inbox") + + @staticmethod + def finish_post_to_outbox(iri: str) -> None: + p.push(iri, "/task/finish_post_to_outbox") + + +@app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901 +def task_finish_post_to_outbox(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + recipients = activity.recipients() + + if activity.has_type(ap.ActivityType.DELETE): + back.outbox_delete(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UPDATE): + back.outbox_update(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.CREATE): + back.outbox_create(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.ANNOUNCE): + back.outbox_announce(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.LIKE): + back.outbox_like(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UNDO): + obj = activity.get_object() + if obj.has_type(ap.ActivityType.LIKE): + back.outbox_undo_like(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.ANNOUNCE): + back.outbox_undo_announce(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.FOLLOW): + 
back.undo_new_following(MY_PERSON, obj) + + app.logger.info(f"recipients={recipients}") + activity = ap.clean_activity(activity.to_dict()) + + DB.cache2.remove() + + payload = json.dumps(activity) + for recp in recipients: + app.logger.debug(f"posting to {recp}") + Tasks.post_to_remote_inbox(payload, recp) + except (ActivityGoneError, ActivityNotFoundError): + app.logger.exception(f"no retry") + except Exception: + app.logger.exception(f"failed to post to remote inbox for {iri}") + abort(500) + + +@app.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901 +def task_finish_post_to_inbox(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + if activity.has_type(ap.ActivityType.DELETE): + back.inbox_delete(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UPDATE): + back.inbox_update(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.CREATE): + back.inbox_create(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.ANNOUNCE): + back.inbox_announce(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.LIKE): + back.inbox_like(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.FOLLOW): + # Reply to a Follow with an Accept + accept = ap.Accept(actor=ID, object=activity.to_dict(embed=True)) + post_to_outbox(accept) + elif activity.has_type(ap.ActivityType.UNDO): + obj = activity.get_object() + if obj.has_type(ap.ActivityType.LIKE): + back.inbox_undo_like(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.ANNOUNCE): + back.inbox_undo_announce(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.FOLLOW): + back.undo_new_follower(MY_PERSON, obj) + try: + invalidate_cache(activity) + except Exception: + app.logger.exception("failed to invalidate cache") + except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): + app.logger.exception(f"no retry") + except Exception: + app.logger.exception(f"failed to cache attachments for {iri}") + abort(500) + + +def post_to_outbox(activity: ap.BaseActivity) -> str: + if activity.has_type(ap.CREATE_TYPES): + activity = activity.build_create() + + # Assign create a random ID + obj_id = back.random_object_id() + activity.set_id(back.activity_url(obj_id), obj_id) + + back.save(Box.OUTBOX, activity) + Tasks.cache_actor(activity.id) + Tasks.finish_post_to_outbox(activity.id) + return activity.id + + +def post_to_inbox(activity: ap.BaseActivity) -> None: + # Check for Block activity + actor = activity.get_actor() + if back.outbox_is_blocked(MY_PERSON, actor.id): + app.logger.info( + f"actor {actor!r} is blocked, dropping the received activity {activity!r}" + ) + return + + if back.inbox_check_duplicate(MY_PERSON, activity.id): + # The activity is already in the inbox + app.logger.info(f"received duplicate activity {activity!r}, dropping it") + + back.save(Box.INBOX, activity) + Tasks.process_new_activity(activity.id) + + app.logger.info(f"spawning task for {activity!r}") + Tasks.finish_post_to_inbox(activity.id) + + +def invalidate_cache(activity): + if activity.has_type(ap.ActivityType.LIKE): + if activity.get_object().id.startswith(BASE_URL): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.ANNOUNCE): + if activity.get_object().id.startswith(BASE_URL): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.UNDO): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.DELETE): + # TODO(tsileo): only invalidate if it's a delete of a reply + 
DB.cache2.remove() + elif activity.has_type(ap.ActivityType.UPDATE): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.CREATE): + note = activity.get_object() + if not note.inReplyTo or note.inReplyTo.startswith(ID): + DB.cache2.remove() + # FIXME(tsileo): check if it's a reply of a reply + + +@app.route("/task/cache_attachments", methods=["POST"]) +def task_cache_attachments(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + # Generates thumbnails for the actor's icon and the attachments if any + + actor = activity.get_actor() + + # Update the cached actor + DB.actors.update_one( + {"remote_id": iri}, + {"$set": {"remote_id": iri, "data": actor.to_dict(embed=True)}}, + upsert=True, + ) + + if actor.icon: + MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON) + + if activity.has_type(ap.ActivityType.CREATE): + for attachment in activity.get_object()._data.get("attachment", []): + if ( + attachment.get("mediaType", "").startswith("image/") + or attachment.get("type") == ap.ActivityType.IMAGE.value + ): + try: + MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT) + except ValueError: + app.logger.exception(f"failed to cache {attachment}") + + app.logger.info(f"attachments cached for {iri}") + + except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): + app.logger.exception(f"dropping activity {iri}, no attachment caching") + except Exception: + app.logger.exception(f"failed to cache attachments for {iri}") + abort(500) + + +@app.route("/task/cache_actor", methods=["POST"]) +def task_cache_actor(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri, also_cache_attachments = ( + task.payload["iri"], + task.payload.get("also_cache_attachments", True), + ) + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + if activity.has_type(ap.ActivityType.CREATE): + Tasks.fetch_og_metadata(iri) + + if activity.has_type([ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]): + Tasks.cache_object(iri) + + actor = activity.get_actor() + + cache_actor_with_inbox = False + if activity.has_type(ap.ActivityType.FOLLOW): + if actor.id != ID: + # It's a Follow from the Inbox + cache_actor_with_inbox = True + else: + # It's a new following, cache the "object" (which is the actor we follow) + DB.activities.update_one( + {"remote_id": iri}, + { + "$set": { + "meta.object": activitypub._actor_to_meta( + activity.get_object() + ) + } + }, + ) + + # Cache the actor info + DB.activities.update_one( + {"remote_id": iri}, + { + "$set": { + "meta.actor": activitypub._actor_to_meta( + actor, cache_actor_with_inbox + ) + } + }, + ) + + app.logger.info(f"actor cached for {iri}") + if also_cache_attachments and activity.has_type(ap.ActivityType.CREATE): + Tasks.cache_attachments(iri) + + except (ActivityGoneError, ActivityNotFoundError): + DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) + app.logger.exception(f"flagging activity {iri} as deleted, no actor caching") + except Exception: + app.logger.exception(f"failed to cache actor for {iri}") + abort(500) + + +@app.route("/task/process_new_activity", methods=["POST"]) # noqa:c901 +def task_process_new_activity(): + """Process an activity received in the inbox""" + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") 
+
+        # Is the activity expected?
+        # following = ap.get_backend().following()
+        should_forward = False
+        should_delete = False
+
+        tag_stream = False
+        if activity.has_type(ap.ActivityType.ANNOUNCE):
+            try:
+                activity.get_object()
+                tag_stream = True
+            except (NotAnActivityError, BadActivityError):
+                app.logger.exception(f"failed to get announce object for {activity!r}")
+                # Most likely an OStatus notice
+                tag_stream = False
+                should_delete = True
+            except (ActivityGoneError, ActivityNotFoundError):
+                # The announced activity is deleted/gone, drop it
+                should_delete = True
+
+        elif activity.has_type(ap.ActivityType.CREATE):
+            note = activity.get_object()
+            # Make the note part of the stream if it's not a reply, or if it's a local reply
+            if not note.inReplyTo or note.inReplyTo.startswith(ID):
+                tag_stream = True
+
+            if note.inReplyTo:
+                try:
+                    reply = ap.fetch_remote_activity(note.inReplyTo)
+                    if (
+                        reply.id.startswith(ID) or reply.has_mention(ID)
+                    ) and activity.is_public():
+                        # The reply is a public "local reply", forward the reply (i.e. the original activity) to the
+                        # original recipients
+                        should_forward = True
+                except NotAnActivityError:
+                    # Most likely a reply to an OStatus notice
+                    should_delete = True
+
+            # (partial) Ghost replies handling
+            # [X] This is the first time the server has seen this Activity.
+            should_forward = False
+            local_followers = ID + "/followers"
+            for field in ["to", "cc"]:
+                if field in activity._data:
+                    if local_followers in activity._data[field]:
+                        # [X] The values of to, cc, and/or audience contain a Collection owned by the server.
+                        should_forward = True
+
+            # [X] The values of inReplyTo, object, target and/or tag are objects owned by the server
+            if not (note.inReplyTo and note.inReplyTo.startswith(ID)):
+                should_forward = False
+
+        elif activity.has_type(ap.ActivityType.DELETE):
+            note = DB.activities.find_one(
+                {"activity.object.id": activity.get_object().id}
+            )
+            if note and note["meta"].get("forwarded", False):
+                # If the activity was originally forwarded, forward the delete too
+                should_forward = True
+
+        elif activity.has_type(ap.ActivityType.LIKE):
+            if not activity.get_object_id().startswith(BASE_URL):
+                # We only want to keep a like if it's a like for a local activity
+                # (Pleroma relays the likes it receives, we don't want to store them)
+                should_delete = True
+
+        if should_forward:
+            app.logger.info(f"will forward {activity!r} to followers")
+            Tasks.forward_activity(activity.id)
+
+        if should_delete:
+            app.logger.info(f"will soft delete {activity!r}")
+
+        app.logger.info(f"{iri} tag_stream={tag_stream}")
+        DB.activities.update_one(
+            {"remote_id": activity.id},
+            {
+                "$set": {
+                    "meta.stream": tag_stream,
+                    "meta.forwarded": should_forward,
+                    "meta.deleted": should_delete,
+                }
+            },
+        )
+
+        app.logger.info(f"new activity {iri} processed")
+        if not should_delete and not activity.has_type(ap.ActivityType.DELETE):
+            Tasks.cache_actor(iri)
+    except (ActivityGoneError, ActivityNotFoundError):
+        app.logger.exception(f"dropping activity {iri}, skip processing")
+    except Exception:
+        app.logger.exception(f"failed to process new activity {iri}")
+        abort(500)
+
+
+@app.route("/task/forward_activity")
+def task_forward_activity():
+    task = p.parse(request)
+    app.logger.info(f"task={task!r}")
+    iri = task.payload
+    try:
+        activity = ap.fetch_remote_activity(iri)
+        recipients = back.followers_as_recipients()
+        app.logger.debug(f"Forwarding {activity!r} to {recipients}")
+        activity = ap.clean_activity(activity.to_dict())
+        payload = json.dumps(activity)
+        for recp in recipients:
+            app.logger.debug(f"forwarding {activity!r} to {recp}")
+            Tasks.post_to_remote_inbox(payload, recp)
+    except Exception:
+        app.logger.exception("task failed")
+        abort(500)
+
+
+@app.route("/task/post_to_remote_inbox")
+def task_post_to_remote_inbox():
+    task = p.parse(request)
+    app.logger.info(f"task={task!r}")
+    payload, to = task.payload["payload"], task.payload["to"]
+    try:
+        app.logger.info("payload=%s", payload)
+        app.logger.info("generating sig")
+        signed_payload = json.loads(payload)
+
+        # Don't overwrite the signature if we're forwarding an activity
+        if "signature" not in signed_payload:
+            generate_signature(signed_payload, KEY)
+
+        app.logger.info("to=%s", to)
+        resp = requests.post(
+            to,
+            data=json.dumps(signed_payload),
+            auth=SIG_AUTH,
+            headers={
+                "Content-Type": HEADERS[1],
+                "Accept": HEADERS[1],
+                "User-Agent": USER_AGENT,
+            },
+        )
+        app.logger.info("resp=%s", resp)
+        app.logger.info("resp_body=%s", resp.text)
+        resp.raise_for_status()
+    except HTTPError as err:
+        app.logger.exception("request failed")
+        if 400 <= err.response.status_code < 500:
+            app.logger.info("client error, no retry")
+            return ""
+
+        abort(500)
diff --git a/config.py b/config.py
index b66ce71..846ae79 100644
--- a/config.py
+++ b/config.py
@@ -105,12 +105,17 @@ MEDIA_CACHE = MediaCache(GRIDFS, USER_AGENT)
 def create_indexes():
     DB.activities.create_index([("remote_id", pymongo.ASCENDING)])
     DB.activities.create_index([("activity.object.id", pymongo.ASCENDING)])
-    DB.activities.create_index([
-        ("activity.object.id", pymongo.ASCENDING),
-        ("meta.deleted", pymongo.ASCENDING),
-    ])
-    DB.cache2.create_index([("path", pymongo.ASCENDING), ("type", pymongo.ASCENDING), ("arg", pymongo.ASCENDING)])
-    DB.cache2.create_index("date", expireAfterSeconds=3600*12)
+    DB.activities.create_index(
+        [("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING)]
+    )
+    DB.cache2.create_index(
+        [
+            ("path", pymongo.ASCENDING),
+            ("type", pymongo.ASCENDING),
+            ("arg", pymongo.ASCENDING),
+        ]
+    )
+    DB.cache2.create_index("date", expireAfterSeconds=3600 * 12)
 
     # Index for the block query
     DB.activities.create_index(
diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml
index eae4264..0a9c393 100644
--- a/docker-compose-tests.yml
+++ b/docker-compose-tests.yml
@@ -4,9 +4,6 @@ services:
     image: 'microblogpub:latest'
     ports:
       - "${WEB_PORT}:5005"
-    links:
-      - mongo
-      - rmq
     volumes:
       - "${CONFIG_DIR}:/app/config"
       - "./static:/app/static"
@@ -14,12 +11,10 @@
       - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq//
       - MICROBLOGPUB_MONGODB_HOST=mongo:27017
       - MICROBLOGPUB_DEBUG=1
+      - POUSSETACHES_AUTH_KEY=123
   celery:
     # image: "instance1_web"
     image: 'microblogpub:latest'
-    links:
-      - mongo
-      - rmq
     command: 'celery worker -l info -A tasks'
     volumes:
       - "${CONFIG_DIR}:/app/config"
@@ -35,6 +30,10 @@
     environment:
       - RABBITMQ_ERLANG_COOKIE=secretrabbit
       - RABBITMQ_NODENAME=rabbit@my-rabbit
+  poussetaches:
+    image: "poussetaches:latest"
+    environment:
+      - POUSSETACHES_AUTH_KEY=123
 networks:
   default:
     name: microblogpubfede
diff --git a/docker-compose.yml b/docker-compose.yml
index b7f9521..e77552f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -7,12 +7,14 @@ services:
     links:
       - mongo
       - rmq
+      - poussetaches
     volumes:
       - "${CONFIG_DIR}:/app/config"
       - "./static:/app/static"
     environment:
       - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq//
       - MICROBLOGPUB_MONGODB_HOST=mongo:27017
+      - POUSSETACHES_AUTH_KEY=123
   celery:
     image: 'microblogpub:latest'
     links:
@@ -36,3 +38,7 @@ services:
       - 
RABBITMQ_NODENAME=rabbit@my-rabbit volumes: - "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq" + poussetaches: + image: "poussetaches:latest" + environment: + - POUSSETACHES_AUTH_KEY=123 diff --git a/poussetaches.py b/poussetaches.py new file mode 100644 index 0000000..f2532bc --- /dev/null +++ b/poussetaches.py @@ -0,0 +1,48 @@ +import base64 +import json +import os +from typing import Any +from dataclasses import dataclass +import flask +import requests + +POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY") + + +@dataclass +class Task: + req_id: str + tries: int + + payload: Any + + +class PousseTaches: + def __init__(self, api_url: str, base_url: str) -> None: + self.api_url = api_url + self.base_url = base_url + + def push(self, payload: Any, path: str, expected=200) -> str: + # Encode our payload + p = base64.b64encode(json.dumps(payload).encode()).decode() + + # Queue/push it + resp = requests.post( + self.api_url, + json={"url": self.base_url + path, "payload": p, "expected": expected}, + ) + resp.raise_for_status() + + return resp.headers.get("Poussetaches-Task-ID") + + def parse(self, req: flask.Request) -> Task: + if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY: + raise ValueError("Bad auth key") + + # Parse the "envelope" + envelope = json.loads(req.data) + print(req) + print(f"envelope={envelope!r}") + payload = json.loads(base64.b64decode(envelope["payload"])) + + return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) diff --git a/tasks.py b/tasks.py index 7a2f64d..f34d217 100644 --- a/tasks.py +++ b/tasks.py @@ -339,6 +339,7 @@ def invalidate_cache(activity): DB.cache2.remove() # FIXME(tsileo): check if it's a reply of a reply + @app.task(bind=True, max_retries=MAX_RETRIES) # noqa: C901 def finish_post_to_inbox(self, iri: str) -> None: try: From 2ed79e9e27825efb0f79da2bda13917b6ec7a0e4 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 5 Apr 2019 11:42:14 +0200 Subject: [PATCH 02/11] Fix config --- docker-compose-tests.yml | 2 ++ docker-compose.yml | 2 ++ poussetaches.py | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml index 0a9c393..5d93b0e 100644 --- a/docker-compose-tests.yml +++ b/docker-compose-tests.yml @@ -32,6 +32,8 @@ services: - RABBITMQ_NODENAME=rabbit@my-rabbit poussetaches: image: "poussetaches:latest" + ports: + - '7991' environment: - POUSSETACHES_AUTH_KEY=123 networks: diff --git a/docker-compose.yml b/docker-compose.yml index e77552f..bfb08a6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -40,5 +40,7 @@ services: - "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq" poussetaches: image: "poussetaches:latest" + ports: + - '7991' environment: - POUSSETACHES_AUTH_KEY=123 diff --git a/poussetaches.py b/poussetaches.py index f2532bc..ea989b6 100644 --- a/poussetaches.py +++ b/poussetaches.py @@ -33,7 +33,7 @@ class PousseTaches: ) resp.raise_for_status() - return resp.headers.get("Poussetaches-Task-ID") + return resp.headers["Poussetaches-Task-ID"] def parse(self, req: flask.Request) -> Task: if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY: From b7681b3a02e62e0e5da78c5b3b3473d45b58fcd9 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 5 Apr 2019 15:14:57 +0200 Subject: [PATCH 03/11] Fix CI --- app.py | 26 +++++++++++++++++++++----- run.sh | 2 +- tests/federation_test.py | 3 ++- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/app.py b/app.py index c9ac063..34894a6 100644 --- a/app.py +++ b/app.py @@ 
-2252,7 +2252,7 @@ def task_t2(): @app.route("/task/fetch_og_meta", methods=["POST"]) -def task_fetch_og_metadata(): +def task_fetch_og_meta(): task = p.parse(request) app.logger.info(f"task={task!r}") iri = task.payload @@ -2287,6 +2287,8 @@ def task_fetch_og_metadata(): app.logger.exception(f"failed to fetch OG metadata for {iri}") abort(500) + return "" + @app.route("/task/cache_object", methods=["POST"]) def task_cache_object(): @@ -2405,6 +2407,8 @@ def task_finish_post_to_outbox(): app.logger.exception(f"failed to post to remote inbox for {iri}") abort(500) + return "" + @app.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901 def task_finish_post_to_inbox(): @@ -2447,6 +2451,8 @@ def task_finish_post_to_inbox(): app.logger.exception(f"failed to cache attachments for {iri}") abort(500) + return "" + def post_to_outbox(activity: ap.BaseActivity) -> str: if activity.has_type(ap.CREATE_TYPES): @@ -2544,9 +2550,11 @@ def task_cache_attachments(): app.logger.exception(f"failed to cache attachments for {iri}") abort(500) + return "" + @app.route("/task/cache_actor", methods=["POST"]) -def task_cache_actor(): +def task_cache_actor() -> str: task = p.parse(request) app.logger.info(f"task={task!r}") iri, also_cache_attachments = ( @@ -2558,7 +2566,7 @@ def task_cache_actor(): app.logger.info(f"activity={activity!r}") if activity.has_type(ap.ActivityType.CREATE): - Tasks.fetch_og_metadata(iri) + Tasks.fetch_og_meta(iri) if activity.has_type([ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]): Tasks.cache_object(iri) @@ -2606,6 +2614,8 @@ def task_cache_actor(): app.logger.exception(f"failed to cache actor for {iri}") abort(500) + return "" + @app.route("/task/process_new_activity", methods=["POST"]) # noqa:c901 def task_process_new_activity(): @@ -2711,8 +2721,10 @@ def task_process_new_activity(): app.logger.exception(f"failed to process new activity {iri}") abort(500) + return "" -@app.route("/task/forward_activity") + +@app.route("/task/forward_activity", methods=["POST"]) def task_forward_activity(): task = p.parse(request) app.logger.info(f"task={task!r}") @@ -2730,8 +2742,10 @@ def task_forward_activity(): app.logger.exception("task failed") abort(500) + return "" -@app.route("/task/post_to_remote_inbox") + +@app.route("/task/post_to_remote_inbox", methods=["POST"]) def task_post_to_remote_inbox(): task = p.parse(request) app.logger.info(f"task={task!r}") @@ -2766,3 +2780,5 @@ def task_post_to_remote_inbox(): return "" abort(500) + + return "" diff --git a/run.sh b/run.sh index a29f5fc..a7eef03 100755 --- a/run.sh +++ b/run.sh @@ -1,3 +1,3 @@ #!/bin/bash python -c "import config; config.create_indexes()" -gunicorn -t 300 -w 2 -b 0.0.0.0:5005 --log-level debug app:app +gunicorn -t 300 -w 5 -b 0.0.0.0:5005 --log-level debug app:app diff --git a/tests/federation_test.py b/tests/federation_test.py index 20569bb..bdaf58d 100644 --- a/tests/federation_test.py +++ b/tests/federation_test.py @@ -19,7 +19,7 @@ class Instance(object): def __init__(self, name, host_url, docker_url=None): self.host_url = host_url self.docker_url = docker_url or host_url - self._create_delay = 12 + self._create_delay = 60 with open( os.path.join( os.path.dirname(os.path.abspath(__file__)), @@ -50,6 +50,7 @@ class Instance(object): def debug(self): """Returns the debug infos (number of items in the inbox/outbox.""" + time.sleep(self._create_delay) resp = requests.get( f"{self.host_url}/api/debug", headers={**self._auth_headers, "Accept": "application/json"}, From 61624b6e75c5693ca4a04a1d3b00277c2696ed26 
Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 5 Apr 2019 20:03:49 +0200 Subject: [PATCH 04/11] Fix tests --- app.py | 3 ++- docker-compose-tests.yml | 3 ++- poussetaches.py | 2 +- tests/federation_test.py | 3 +-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/app.py b/app.py index 34894a6..4e3ef0b 100644 --- a/app.py +++ b/app.py @@ -88,7 +88,8 @@ from utils.media import Kind from poussetaches import PousseTaches -p = PousseTaches("http://poussetaches:7991", "http://web:5005") +phost = "http://" + os.getenv("COMPOSE_PROJECT_NAME", "") +p = PousseTaches(f"{phost}_poussetaches_1:7991", f"{phost}_web_1:5005") back = activitypub.MicroblogPubBackend() diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml index 5d93b0e..fc94f79 100644 --- a/docker-compose-tests.yml +++ b/docker-compose-tests.yml @@ -9,8 +9,9 @@ services: - "./static:/app/static" environment: - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq// - - MICROBLOGPUB_MONGODB_HOST=mongo:27017 + - MICROBLOGPUB_MONGODB_HOST=${COMPOSE_PROJECT_NAME}_mongo_1:27017 - MICROBLOGPUB_DEBUG=1 + - COMPOSE_PROJECT_NAME=${COMPOSE_PROJECT_NAME} - POUSSETACHES_AUTH_KEY=123 celery: # image: "instance1_web" diff --git a/poussetaches.py b/poussetaches.py index ea989b6..6bf9180 100644 --- a/poussetaches.py +++ b/poussetaches.py @@ -45,4 +45,4 @@ class PousseTaches: print(f"envelope={envelope!r}") payload = json.loads(base64.b64decode(envelope["payload"])) - return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) + return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) # type: ignore diff --git a/tests/federation_test.py b/tests/federation_test.py index bdaf58d..959280f 100644 --- a/tests/federation_test.py +++ b/tests/federation_test.py @@ -19,7 +19,7 @@ class Instance(object): def __init__(self, name, host_url, docker_url=None): self.host_url = host_url self.docker_url = docker_url or host_url - self._create_delay = 60 + self._create_delay = 15 with open( os.path.join( os.path.dirname(os.path.abspath(__file__)), @@ -50,7 +50,6 @@ class Instance(object): def debug(self): """Returns the debug infos (number of items in the inbox/outbox.""" - time.sleep(self._create_delay) resp = requests.get( f"{self.host_url}/api/debug", headers={**self._auth_headers, "Accept": "application/json"}, From 1147ec80537e23555af8b931680a271d6a807d7d Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 5 Apr 2019 20:06:18 +0200 Subject: [PATCH 05/11] Fix CI --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index db184e9..f9fc688 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,9 +24,9 @@ script: - docker build . 
-t microblogpub:latest - docker-compose up -d - docker-compose ps - - WEB_PORT=5006 CONFIG_DIR=./tests/fixtures/instance1/config docker-compose -p instance1 -f docker-compose-tests.yml up -d + - WEB_PORT=5006 COMPOSE_PROJECT_NAME=instance1 CONFIG_DIR=./tests/fixtures/instance1/config docker-compose -p instance1 -f docker-compose-tests.yml up -d - docker-compose -p instance1 -f docker-compose-tests.yml ps - - WEB_PORT=5007 CONFIG_DIR=./tests/fixtures/instance2/config docker-compose -p instance2 -f docker-compose-tests.yml up -d + - WEB_PORT=5007 COMPOSE_PROJECT_NAME=instance2 CONFIG_DIR=./tests/fixtures/instance2/config docker-compose -p instance2 -f docker-compose-tests.yml up -d - docker-compose -p instance2 -f docker-compose-tests.yml ps - sleep 5 # Integration tests first From 8c3eedac7d10fc028b54bf54895cd75ea0a89fed Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 5 Apr 2019 20:08:12 +0200 Subject: [PATCH 06/11] Re-enable Celery for the migration --- app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app.py b/app.py index 4e3ef0b..46d3753 100644 --- a/app.py +++ b/app.py @@ -62,7 +62,7 @@ from werkzeug.utils import secure_filename import activitypub import config -# import tasks +import tasks from activitypub import Box from activitypub import embed_collection from config import USER_AGENT From 5d8fa38d5e2754d41a720aba79e58bd1ba3a1e92 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 5 Apr 2019 21:36:56 +0200 Subject: [PATCH 07/11] More poussetaches integrations --- app.py | 102 +++++++++++++++++---------------------- docker-compose-tests.yml | 1 - docker-compose.yml | 8 +-- poussetaches.py | 61 +++++++++++++++++++++++ 4 files changed, 110 insertions(+), 62 deletions(-) diff --git a/app.py b/app.py index 46d3753..e320d83 100644 --- a/app.py +++ b/app.py @@ -62,7 +62,7 @@ from werkzeug.utils import secure_filename import activitypub import config -import tasks +import tasks # noqa: here just for the migration # FIXME(tsileo): remove me from activitypub import Box from activitypub import embed_collection from config import USER_AGENT @@ -2210,6 +2210,9 @@ def token_endpoint(): ) +################# +# Feeds + @app.route("/feed.json") def json_feed(): return Response( @@ -2234,22 +2237,48 @@ def rss_feed(): ) -@app.route("/task/t1") -def task_t1(): - p.push( - "https://mastodon.cloud/@iulius/101852467780804071/activity", - "/task/cache_object", - ) - return "ok" +########### +# Tasks +class Tasks: + @staticmethod + def cache_object(iri: str) -> None: + p.push(iri, "/task/cache_object") -@app.route("/task/t2", methods=["POST"]) -def task_t2(): - print(request) - print(request.headers) - task = p.parse(request) - print(task) - return "yay" + @staticmethod + def cache_actor(iri: str, also_cache_attachments: bool = True) -> None: + p.push( + {"iri": iri, "also_cache_attachments": also_cache_attachments}, + "/task/cache_actor", + ) + + @staticmethod + def post_to_remote_inbox(payload: str, recp: str) -> None: + p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox") + + @staticmethod + def forward_activity(iri: str) -> None: + p.push(iri, "/task/forward_activity") + + @staticmethod + def fetch_og_meta(iri: str) -> None: + p.push(iri, "/task/fetch_og_meta") + + @staticmethod + def process_new_activity(iri: str) -> None: + p.push(iri, "/task/process_new_activity") + + @staticmethod + def cache_attachments(iri: str) -> None: + p.push(iri, "/task/cache_attachments") + + @staticmethod + def finish_post_to_inbox(iri: str) -> None: + p.push(iri, 
"/task/finish_post_to_inbox") + + @staticmethod + def finish_post_to_outbox(iri: str) -> None: + p.push(iri, "/task/finish_post_to_outbox") @app.route("/task/fetch_og_meta", methods=["POST"]) @@ -2321,48 +2350,6 @@ def task_cache_object(): abort(500) return "" - -class Tasks: - @staticmethod - def cache_object(iri: str) -> None: - p.push(iri, "/task/cache_object") - - @staticmethod - def cache_actor(iri: str, also_cache_attachments: bool = True) -> None: - p.push( - {"iri": iri, "also_cache_attachments": also_cache_attachments}, - "/task/cache_actor", - ) - - @staticmethod - def post_to_remote_inbox(payload: str, recp: str) -> None: - p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox") - - @staticmethod - def forward_activity(iri: str) -> None: - p.push(iri, "/task/forward_activity") - - @staticmethod - def fetch_og_meta(iri: str) -> None: - p.push(iri, "/task/fetch_og_meta") - - @staticmethod - def process_new_activity(iri: str) -> None: - p.push(iri, "/task/process_new_activity") - - @staticmethod - def cache_attachments(iri: str) -> None: - p.push(iri, "/task/cache_attachments") - - @staticmethod - def finish_post_to_inbox(iri: str) -> None: - p.push(iri, "/task/finish_post_to_inbox") - - @staticmethod - def finish_post_to_outbox(iri: str) -> None: - p.push(iri, "/task/finish_post_to_outbox") - - @app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901 def task_finish_post_to_outbox(): task = p.parse(request) @@ -2748,6 +2735,7 @@ def task_forward_activity(): @app.route("/task/post_to_remote_inbox", methods=["POST"]) def task_post_to_remote_inbox(): + """Post an activity to a remote inbox.""" task = p.parse(request) app.logger.info(f"task={task!r}") payload, to = task.payload["payload"], task.payload["to"] diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml index fc94f79..3360bdc 100644 --- a/docker-compose-tests.yml +++ b/docker-compose-tests.yml @@ -33,7 +33,6 @@ services: - RABBITMQ_NODENAME=rabbit@my-rabbit poussetaches: image: "poussetaches:latest" - ports: - '7991' environment: - POUSSETACHES_AUTH_KEY=123 diff --git a/docker-compose.yml b/docker-compose.yml index bfb08a6..94c49ef 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,7 @@ services: environment: - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq// - MICROBLOGPUB_MONGODB_HOST=mongo:27017 - - POUSSETACHES_AUTH_KEY=123 + - POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY} celery: image: 'microblogpub:latest' links: @@ -40,7 +40,7 @@ services: - "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq" poussetaches: image: "poussetaches:latest" - ports: - - '7991' + volumes: + - "${DATA_DIR}/poussetaches:/app/poussetaches/poussetaches_data" environment: - - POUSSETACHES_AUTH_KEY=123 + - POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY} diff --git a/poussetaches.py b/poussetaches.py index 6bf9180..cabf5ca 100644 --- a/poussetaches.py +++ b/poussetaches.py @@ -1,10 +1,13 @@ import base64 import json import os +from typing import Dict from typing import Any +from typing import List from dataclasses import dataclass import flask import requests +from datetime import datetime POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY") @@ -17,6 +20,18 @@ class Task: payload: Any +@dataclass +class GetTask: + payload: Any + expected: int + task_id: str + next_run: datetime + tries: int + url: str + last_error_status_code: int + last_error_body: str + + class PousseTaches: def __init__(self, api_url: str, base_url: str) -> None: self.api_url = api_url @@ -46,3 +61,49 @@ class PousseTaches: 
payload = json.loads(base64.b64decode(envelope["payload"]))
 
         return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload)  # type: ignore
+
+    @staticmethod
+    def _expand_task(t: Dict[str, Any]) -> None:
+        try:
+            t["payload"] = json.loads(base64.b64decode(t["payload"]))
+        except json.JSONDecodeError:
+            t["payload"] = base64.b64decode(t["payload"]).decode()
+
+        if t["last_error_body"]:
+            t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode()
+
+        t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9))
+        if t["last_run"]:
+            t["last_run"] = datetime.fromtimestamp(float(t["last_run"] / 1e9))
+        else:
+            del t["last_run"]
+
+    def _get(self, where: str) -> List[GetTask]:
+        out = []
+
+        resp = requests.get(self.api_url + f"/{where}")
+        resp.raise_for_status()
+        dat = resp.json()
+        for t in dat["tasks"]:
+            self._expand_task(t)
+            out.append(GetTask(
+                task_id=t["id"],
+                payload=t["payload"],
+                expected=t["expected"],
+                tries=t["tries"],
+                url=t["url"],
+                last_error_status_code=t["last_error_status_code"],
+                last_error_body=t["last_error_body"],
+                next_run=t["next_run"],
+            ))
+
+        return out
+
+    def get_success(self) -> List[GetTask]:
+        return self._get("success")
+
+    def get_waiting(self) -> List[GetTask]:
+        return self._get("waiting")
+
+    def get_dead(self) -> List[GetTask]:
+        return self._get("dead")
From 26125a0816a02c74c01c64745ccdbc85a7add609 Mon Sep 17 00:00:00 2001
From: Thomas Sileo
Date: Fri, 5 Apr 2019 21:42:44 +0200
Subject: [PATCH 08/11] Add data dir for tasks

---
 data/poussetaches/.gitignore | 2 ++
 1 file changed, 2 insertions(+)
 create mode 100644 data/poussetaches/.gitignore

diff --git a/data/poussetaches/.gitignore b/data/poussetaches/.gitignore
new file mode 100644
index 0000000..d6b7ef3
--- /dev/null
+++ b/data/poussetaches/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
From ccfa2f0d890800c542eb07e59d7830f3fdfa7477 Mon Sep 17 00:00:00 2001
From: Thomas Sileo
Date: Fri, 5 Apr 2019 22:11:32 +0200
Subject: [PATCH 09/11] Fix docker compose config

---
 docker-compose.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 94c49ef..23ba974 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -41,6 +41,6 @@ services:
   poussetaches:
     image: "poussetaches:latest"
     volumes:
-      - "${DATA_DIR}/poussetaches:/app/poussetaches/poussetaches_data"
+      - "${DATA_DIR}/poussetaches:/app/poussetaches_data"
     environment:
       - POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY}
From 2ef968f11e45534f81c9ae4fbc96819ec6146dea Mon Sep 17 00:00:00 2001
From: Thomas Sileo
Date: Fri, 5 Apr 2019 22:16:10 +0200
Subject: [PATCH 10/11] Fix docker compose config

---
 docker-compose-tests.yml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml
index 3360bdc..4d9443d 100644
--- a/docker-compose-tests.yml
+++ b/docker-compose-tests.yml
@@ -33,7 +33,6 @@ services:
     environment:
       - RABBITMQ_NODENAME=rabbit@my-rabbit
   poussetaches:
     image: "poussetaches:latest"
-      - '7991'
     environment:
       - POUSSETACHES_AUTH_KEY=123
 networks:
From 2bd06886eca48f587cd37d9499cbd9789010b996 Mon Sep 17 00:00:00 2001
From: Thomas Sileo
Date: Fri, 5 Apr 2019 22:41:25 +0200
Subject: [PATCH 11/11] Fix compose config

---
 docker-compose.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docker-compose.yml b/docker-compose.yml
index 23ba974..634987e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -15,6 +15,7 @@ services:
       - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq//
       - MICROBLOGPUB_MONGODB_HOST=mongo:27017
       - 
POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY} + - COMPOSE_PROJECT_NAME=${COMPOSE_PROJECT_NAME} celery: image: 'microblogpub:latest' links:
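
The series converges on one pattern: every background job becomes a plain HTTP endpoint under /task/, a Tasks.* helper enqueues a base64-encoded payload through the poussetaches daemon, and the daemon POSTs it back, retrying until it gets the expected status. Below is a minimal sketch of that round-trip using the poussetaches.py client added in PATCH 01/11; the localhost URLs, the port, and the /task/echo route are illustrative assumptions, not endpoints defined by the patches.

# Illustrative sketch only: assumes a poussetaches daemon reachable on
# http://localhost:7991, this app served on http://localhost:5005, and
# POUSSETACHES_AUTH_KEY exported for both the daemon and this process.
import flask

from poussetaches import PousseTaches  # the client added in PATCH 01/11

app = flask.Flask(__name__)
p = PousseTaches("http://localhost:7991", "http://localhost:5005")


@app.route("/task/echo", methods=["POST"])  # hypothetical task endpoint
def task_echo():
    # parse() rejects the request unless the Poussetaches-Auth-Key header
    # matches, then base64-decodes the JSON payload out of the envelope.
    task = p.parse(flask.request)
    app.logger.info(f"req_id={task.req_id} tries={task.tries} payload={task.payload}")
    return ""  # HTTP 200 matches the default expected status: task done


if __name__ == "__main__":
    # Queue a payload; the daemon POSTs it back to /task/echo and keeps
    # retrying failures, so pushing before the server is up still works.
    p.push({"hello": "world"}, "/task/echo")
    app.run(port=5005)

This mirrors the convention of the /task/ routes in app.py above: return "" to acknowledge a task, or abort(500) to ask the daemon to retry it later.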