diff --git a/app.py b/app.py index 0c3ac1d..9afd27a 100644 --- a/app.py +++ b/app.py @@ -91,6 +91,15 @@ from poussetaches import PousseTaches phost = "http://" + os.getenv("COMPOSE_PROJECT_NAME", "") p = PousseTaches(f"{phost}_poussetaches_1:7991", f"{phost}_web_1:5005") +# Setup the cron tasks +p.push({}, "/task/cleanup_part_1", schedule="@every 12h") +p.push({}, "/task/cleanup_part_2", schedule="@every 12h") +p.push({}, "/task/cleanup_part_3", schedule="@every 12h") + +# Also trigger a cleanup now +p.push({}, "/task/cleanup_part_1") +p.push({}, "/task/cleanup_part_2") +p.push({}, "/task/cleanup_part_3") back = activitypub.MicroblogPubBackend() @@ -1296,174 +1305,6 @@ def admin(): ) -@app.route("/admin/cleanup", methods=["GET"]) -@login_required -def admin_cleanup(): - d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d") - - # (We keep Follow and Accept forever) - - # Announce and Like cleanup - for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]: - # Migrate old (before meta.keep activities on the fly) - DB.activities.update_many( - { - "box": Box.INBOX.value, - "type": ap_type.value, - "meta.keep": {"$exists": False}, - "activity.object": {"$regex": f"^{BASE_URL}"}, - }, - {"$set": {"meta.keep": True}}, - ) - - DB.activities.update_many( - { - "box": Box.INBOX.value, - "type": ap_type.value, - "meta.keep": {"$exists": False}, - "activity.object.id": {"$regex": f"^{BASE_URL}"}, - }, - {"$set": {"meta.keep": True}}, - ) - - DB.activities.update_many( - { - "box": Box.INBOX.value, - "type": ap_type.value, - "meta.keep": {"$exists": False}, - }, - {"$set": {"meta.keep": False}}, - ) - # End of the migration - - # Delete old activities - DB.activities.delete_many( - { - "box": Box.INBOX.value, - "type": ap_type.value, - "meta.keep": False, - "activity.published": {"$lt": d}, - } - ) - - # And delete the soft-deleted one - DB.activities.delete_many( - { - "box": Box.INBOX.value, - "type": ap_type.value, - "meta.keep": False, - "meta.deleted": True, - } - ) - - # Create cleanup (more complicated) - # The one that mention our actor - DB.activities.update_many( - { - "box": Box.INBOX.value, - "meta.keep": {"$exists": False}, - "activity.object.tag.href": {"$regex": f"^{BASE_URL}"}, - }, - {"$set": {"meta.keep": True}}, - ) - DB.activities.update_many( - { - "box": Box.REPLIES.value, - "meta.keep": {"$exists": False}, - "activity.tag.href": {"$regex": f"^{BASE_URL}"}, - }, - {"$set": {"meta.keep": True}}, - ) - - # The replies of the outbox - DB.activities.update_many( - {"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}}, - {"$set": {"meta.keep": True}}, - ) - # Track all the threads we participated - keep_threads = [] - for data in DB.activities.find( - { - "box": Box.OUTBOX.value, - "type": ActivityType.CREATE.value, - "meta.thread_root_parent": {"$exists": True}, - } - ): - keep_threads.append(data["meta"]["thread_root_parent"]) - - for root_parent in set(keep_threads): - DB.activities.update_many( - {"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}} - ) - - DB.activities.update_many( - { - "box": {"$in": [Box.REPLIES.value, Box.INBOX.value]}, - "meta.keep": {"$exists": False}, - }, - {"$set": {"meta.keep": False}}, - ) - return "OK" - - -@app.route("/admin/cleanup2", methods=["GET"]) -@login_required -def admin_cleanup2(): - d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d") - - # Go over the old Create activities - for data in DB.activities.find( - { - "box": Box.INBOX.value, - "type": ActivityType.CREATE.value, - "meta.keep": False, - "activity.published": {"$lt": d}, - } - ): - # Delete the cached attachment/ - for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}): - try: - MEDIA_CACHE.fs.delete(grid_item._id) - except Exception: - pass - - # Delete the Create activities that no longer have cached attachments - DB.activities.delete_many( - { - "box": Box.INBOX.value, - "type": ActivityType.CREATE.value, - "meta.keep": False, - "activity.published": {"$lt": d}, - } - ) - - return "OK" - - -@app.route("/admin/cleanup3", methods=["GET"]) -@login_required -def admin_cleanup3(): - d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d") - - # Delete old replies we don't care about - DB.activities.delete_many( - {"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}} - ) - - # Remove all the attachments no tied to a remote_id (post celery migration) - for grid_item in MEDIA_CACHE.fs.find( - {"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}} - ): - try: - MEDIA_CACHE.fs.delete(grid_item._id) - except Exception: - pass - - # TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task - - return "OK" - - @app.route("/admin/tasks", methods=["GET"]) @login_required def admin_tasks(): @@ -1471,7 +1312,7 @@ def admin_tasks(): "admin_tasks.html", dead=p.get_dead(), waiting=p.get_waiting(), - cron=[], # cron=p.get_cron(), + cron=p.get_cron(), ) @@ -2868,3 +2709,160 @@ def task_post_to_remote_inbox(): raise TaskError() from err return "" + + +@app.route("/task/cleanup_part_1", methods=["POST"]) +def task_cleanup_part_1(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d") + + # (We keep Follow and Accept forever) + + # Announce and Like cleanup + for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]: + # Migrate old (before meta.keep activities on the fly) + DB.activities.update_many( + { + "box": Box.INBOX.value, + "type": ap_type.value, + "meta.keep": {"$exists": False}, + "activity.object": {"$regex": f"^{BASE_URL}"}, + }, + {"$set": {"meta.keep": True}}, + ) + + DB.activities.update_many( + { + "box": Box.INBOX.value, + "type": ap_type.value, + "meta.keep": {"$exists": False}, + "activity.object.id": {"$regex": f"^{BASE_URL}"}, + }, + {"$set": {"meta.keep": True}}, + ) + + DB.activities.update_many( + { + "box": Box.INBOX.value, + "type": ap_type.value, + "meta.keep": {"$exists": False}, + }, + {"$set": {"meta.keep": False}}, + ) + # End of the migration + + # Delete old activities + DB.activities.delete_many( + { + "box": Box.INBOX.value, + "type": ap_type.value, + "meta.keep": False, + "activity.published": {"$lt": d}, + } + ) + + # And delete the soft-deleted one + DB.activities.delete_many( + { + "box": Box.INBOX.value, + "type": ap_type.value, + "meta.keep": False, + "meta.deleted": True, + } + ) + + # Create cleanup (more complicated) + # The one that mention our actor + DB.activities.update_many( + { + "box": Box.INBOX.value, + "meta.keep": {"$exists": False}, + "activity.object.tag.href": {"$regex": f"^{BASE_URL}"}, + }, + {"$set": {"meta.keep": True}}, + ) + DB.activities.update_many( + { + "box": Box.REPLIES.value, + "meta.keep": {"$exists": False}, + "activity.tag.href": {"$regex": f"^{BASE_URL}"}, + }, + {"$set": {"meta.keep": True}}, + ) + + # The replies of the outbox + DB.activities.update_many( + {"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}}, + {"$set": {"meta.keep": True}}, + ) + # Track all the threads we participated + keep_threads = [] + for data in DB.activities.find( + { + "box": Box.OUTBOX.value, + "type": ActivityType.CREATE.value, + "meta.thread_root_parent": {"$exists": True}, + } + ): + keep_threads.append(data["meta"]["thread_root_parent"]) + + for root_parent in set(keep_threads): + DB.activities.update_many( + {"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}} + ) + + DB.activities.update_many( + { + "box": {"$in": [Box.REPLIES.value, Box.INBOX.value]}, + "meta.keep": {"$exists": False}, + }, + {"$set": {"meta.keep": False}}, + ) + return "OK" + + +@app.route("/task/cleanup_part_2", methods=["POST"]) +def task_cleanup_part_2(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d") + + # Go over the old Create activities + for data in DB.activities.find( + { + "box": Box.INBOX.value, + "type": ActivityType.CREATE.value, + "meta.keep": False, + "activity.published": {"$lt": d}, + } + ): + # Delete the cached attachment/ + for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}): + MEDIA_CACHE.fs.delete(grid_item._id) + DB.activities.delete_one({"_id": data["_id"]}) + + return "OK" + + +@app.route("/task/cleanup_part_3", methods=["POST"]) +def task_cleanup_part_3(): + task = p.parse(request) + app.logger.info(f"task={task!r}") + + d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d") + + # Delete old replies we don't care about + DB.activities.delete_many( + {"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}} + ) + + # Remove all the attachments no tied to a remote_id (post celery migration) + for grid_item in MEDIA_CACHE.fs.find( + {"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}} + ): + MEDIA_CACHE.fs.delete(grid_item._id) + + # TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task + + return "OK" diff --git a/poussetaches.py b/poussetaches.py index e844dee..6f06e14 100644 --- a/poussetaches.py +++ b/poussetaches.py @@ -24,7 +24,7 @@ class Task: class GetTask: payload: Any expected: int - # schedule: str + schedule: str task_id: str next_run: datetime tries: int @@ -101,7 +101,7 @@ class PousseTaches: task_id=t["id"], payload=t["payload"], expected=t["expected"], - # shedule=t["schedule"], + shedule=t["schedule"], tries=t["tries"], url=t["url"], last_error_status_code=t["last_error_status_code"],