diff --git a/app.py b/app.py
index 19275a7..507c767 100644
--- a/app.py
+++ b/app.py
@@ -6,6 +6,7 @@ import os
 import traceback
 import urllib
 from datetime import datetime
+from datetime import timedelta
 from datetime import timezone
 from functools import wraps
 from io import BytesIO
@@ -507,6 +508,7 @@ def handle_activitypub_error(error):
 
 class TaskError(Exception):
     """Raised to log the error for poussetaches."""
+
     def __init__(self):
         self.message = traceback.format_exc()
 
@@ -1415,13 +1417,72 @@ def admin():
     )
 
 
+@app.route("/admin/cleanup", methods=["GET"])
+@login_required
+def admin_cleanup():
+    d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
+
+    # Announce and Like cleanup
+    for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
+        # Migrate old (before meta.keep) activities on the fly
+        DB.activities.update_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": {"$exists": False},
+                "activity.object": {"$regex": f"^{BASE_URL}"},
+            },
+            {"$set": {"meta.keep": True}},
+        )
+
+        DB.activities.update_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": {"$exists": False},
+                "activity.object.id": {"$regex": f"^{BASE_URL}"},
+            },
+            {"$set": {"meta.keep": True}},
+        )
+
+        DB.activities.update_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": {"$exists": False},
+            },
+            {"$set": {"meta.keep": False}},
+        )
+        # End of the migration
+
+        # Delete old activities
+        DB.activities.delete_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": False,
+                "activity.published": {"$lt": d},
+            }
+        )
+
+        # And delete the soft-deleted ones
+        DB.activities.delete_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": False,
+                "meta.deleted": True,
+            }
+        )
+
+    return "OK"
+
+
 @app.route("/admin/tasks", methods=["GET"])
 @login_required
 def admin_tasks():
     return render_template(
-        "admin_tasks.html",
-        dead=p.get_dead(),
-        waiting=p.get_waiting(),
+        "admin_tasks.html", dead=p.get_dead(), waiting=p.get_waiting()
     )
 
 
@@ -2239,6 +2300,7 @@ def token_endpoint():
 #################
 # Feeds
 
+
 @app.route("/feed.json")
 def json_feed():
     return Response(
@@ -2266,6 +2328,7 @@ def rss_feed():
 ###########
 # Tasks
 
+
 class Tasks:
     @staticmethod
     def cache_object(iri: str) -> None:
@@ -2373,6 +2436,7 @@ def task_cache_object():
 
     return ""
 
+
 @app.route("/task/finish_post_to_outbox", methods=["POST"])  # noqa:C901
 def task_finish_post_to_outbox():
     task = p.parse(request)
@@ -2642,12 +2706,15 @@ def task_process_new_activity():
         # following = ap.get_backend().following()
         should_forward = False
         should_delete = False
+        should_keep = False
 
         tag_stream = False
         if activity.has_type(ap.ActivityType.ANNOUNCE):
             try:
                 activity.get_object()
                 tag_stream = True
+                if activity.get_object_id().startswith(BASE_URL):
+                    should_keep = True
             except (NotAnActivityError, BadActivityError):
                 app.logger.exception(f"failed to get announce object for {activity!r}")
                 # Most likely on OStatus notice
@@ -2657,12 +2724,21 @@
                 # The announced activity is deleted/gone, drop it
                 should_delete = True
 
+        elif activity.has_type(ap.ActivityType.FOLLOW):
+            # FIXME(tsileo): ensure it's a follow where the server is the object
+            should_keep = True
+
         elif activity.has_type(ap.ActivityType.CREATE):
             note = activity.get_object()
             # Make the note part of the stream if it's not a reply, or if it's a local reply
             if not note.inReplyTo or note.inReplyTo.startswith(ID):
                 tag_stream = True
 
+            if (note.inReplyTo and note.inReplyTo.startswith(ID)) or note.has_mention(
+                ID
+            ):
+                should_keep = True
+
             if note.inReplyTo:
                 try:
                     reply = ap.fetch_remote_activity(note.inReplyTo)
@@ -2672,6 +2748,7 @@
                         # The reply is public "local reply", forward the reply (i.e. the original activity) to the
                         # original recipients
                         should_forward = True
+                        should_keep = True
                     except NotAnActivityError:
                         # Most likely a reply to an OStatus notice
                         should_delete = True
@@ -2699,7 +2776,9 @@
                 should_forward = True
 
         elif activity.has_type(ap.ActivityType.LIKE):
-            if not activity.get_object_id().startswith(BASE_URL):
+            if activity.get_object_id().startswith(BASE_URL):
+                should_keep = True
+            else:
                 # We only want to keep a like if it's a like for a local activity
                 # (Pleroma relays the likes it received, we don't want to store them)
                 should_delete = True
@@ -2716,6 +2795,7 @@
             {"remote_id": activity.id},
             {
                 "$set": {
+                    "meta.keep": should_keep,
                     "meta.stream": tag_stream,
                     "meta.forwarded": should_forward,
                     "meta.deleted": should_delete,
diff --git a/poussetaches.py b/poussetaches.py
index 7909416..28314f3 100644
--- a/poussetaches.py
+++ b/poussetaches.py
@@ -60,7 +60,9 @@ class PousseTaches:
         print(f"envelope={envelope!r}")
         payload = json.loads(base64.b64decode(envelope["payload"]))
 
-        return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload)  # type: ignore
+        return Task(
+            req_id=envelope["req_id"], tries=envelope["tries"], payload=payload
+        )  # type: ignore
 
     @staticmethod
     def _expand_task(t: Dict[str, Any]) -> None:
@@ -86,16 +88,18 @@ class PousseTaches:
         dat = resp.json()
         for t in dat["tasks"]:
             self._expand_task(t)
-            out.append(GetTask(
-                task_id=t["id"],
-                payload=t["payload"],
-                expected=t["expected"],
-                tries=t["tries"],
-                url=t["url"],
-                last_error_status_code=t["last_error_status_code"],
-                last_error_body=t["last_error_body"],
-                next_run=t["next_run"],
-            ))
+            out.append(
+                GetTask(
+                    task_id=t["id"],
+                    payload=t["payload"],
+                    expected=t["expected"],
+                    tries=t["tries"],
+                    url=t["url"],
+                    last_error_status_code=t["last_error_status_code"],
+                    last_error_body=t["last_error_body"],
+                    next_run=t["next_run"],
+                )
+            )
 
         return out
 