Setup cron for the cleanup
This commit is contained in:
parent
849a6c4ea0
commit
55a1c2dd88
2 changed files with 169 additions and 171 deletions
336
app.py
336
app.py
|
@ -91,6 +91,15 @@ from poussetaches import PousseTaches
|
||||||
|
|
||||||
phost = "http://" + os.getenv("COMPOSE_PROJECT_NAME", "")
|
phost = "http://" + os.getenv("COMPOSE_PROJECT_NAME", "")
|
||||||
p = PousseTaches(f"{phost}_poussetaches_1:7991", f"{phost}_web_1:5005")
|
p = PousseTaches(f"{phost}_poussetaches_1:7991", f"{phost}_web_1:5005")
|
||||||
|
# Setup the cron tasks
|
||||||
|
p.push({}, "/task/cleanup_part_1", schedule="@every 12h")
|
||||||
|
p.push({}, "/task/cleanup_part_2", schedule="@every 12h")
|
||||||
|
p.push({}, "/task/cleanup_part_3", schedule="@every 12h")
|
||||||
|
|
||||||
|
# Also trigger a cleanup now
|
||||||
|
p.push({}, "/task/cleanup_part_1")
|
||||||
|
p.push({}, "/task/cleanup_part_2")
|
||||||
|
p.push({}, "/task/cleanup_part_3")
|
||||||
|
|
||||||
|
|
||||||
back = activitypub.MicroblogPubBackend()
|
back = activitypub.MicroblogPubBackend()
|
||||||
|
@ -1296,174 +1305,6 @@ def admin():
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.route("/admin/cleanup", methods=["GET"])
|
|
||||||
@login_required
|
|
||||||
def admin_cleanup():
|
|
||||||
d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
|
|
||||||
|
|
||||||
# (We keep Follow and Accept forever)
|
|
||||||
|
|
||||||
# Announce and Like cleanup
|
|
||||||
for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
|
|
||||||
# Migrate old (before meta.keep activities on the fly)
|
|
||||||
DB.activities.update_many(
|
|
||||||
{
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"type": ap_type.value,
|
|
||||||
"meta.keep": {"$exists": False},
|
|
||||||
"activity.object": {"$regex": f"^{BASE_URL}"},
|
|
||||||
},
|
|
||||||
{"$set": {"meta.keep": True}},
|
|
||||||
)
|
|
||||||
|
|
||||||
DB.activities.update_many(
|
|
||||||
{
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"type": ap_type.value,
|
|
||||||
"meta.keep": {"$exists": False},
|
|
||||||
"activity.object.id": {"$regex": f"^{BASE_URL}"},
|
|
||||||
},
|
|
||||||
{"$set": {"meta.keep": True}},
|
|
||||||
)
|
|
||||||
|
|
||||||
DB.activities.update_many(
|
|
||||||
{
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"type": ap_type.value,
|
|
||||||
"meta.keep": {"$exists": False},
|
|
||||||
},
|
|
||||||
{"$set": {"meta.keep": False}},
|
|
||||||
)
|
|
||||||
# End of the migration
|
|
||||||
|
|
||||||
# Delete old activities
|
|
||||||
DB.activities.delete_many(
|
|
||||||
{
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"type": ap_type.value,
|
|
||||||
"meta.keep": False,
|
|
||||||
"activity.published": {"$lt": d},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
# And delete the soft-deleted one
|
|
||||||
DB.activities.delete_many(
|
|
||||||
{
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"type": ap_type.value,
|
|
||||||
"meta.keep": False,
|
|
||||||
"meta.deleted": True,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create cleanup (more complicated)
|
|
||||||
# The one that mention our actor
|
|
||||||
DB.activities.update_many(
|
|
||||||
{
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"meta.keep": {"$exists": False},
|
|
||||||
"activity.object.tag.href": {"$regex": f"^{BASE_URL}"},
|
|
||||||
},
|
|
||||||
{"$set": {"meta.keep": True}},
|
|
||||||
)
|
|
||||||
DB.activities.update_many(
|
|
||||||
{
|
|
||||||
"box": Box.REPLIES.value,
|
|
||||||
"meta.keep": {"$exists": False},
|
|
||||||
"activity.tag.href": {"$regex": f"^{BASE_URL}"},
|
|
||||||
},
|
|
||||||
{"$set": {"meta.keep": True}},
|
|
||||||
)
|
|
||||||
|
|
||||||
# The replies of the outbox
|
|
||||||
DB.activities.update_many(
|
|
||||||
{"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}},
|
|
||||||
{"$set": {"meta.keep": True}},
|
|
||||||
)
|
|
||||||
# Track all the threads we participated
|
|
||||||
keep_threads = []
|
|
||||||
for data in DB.activities.find(
|
|
||||||
{
|
|
||||||
"box": Box.OUTBOX.value,
|
|
||||||
"type": ActivityType.CREATE.value,
|
|
||||||
"meta.thread_root_parent": {"$exists": True},
|
|
||||||
}
|
|
||||||
):
|
|
||||||
keep_threads.append(data["meta"]["thread_root_parent"])
|
|
||||||
|
|
||||||
for root_parent in set(keep_threads):
|
|
||||||
DB.activities.update_many(
|
|
||||||
{"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}}
|
|
||||||
)
|
|
||||||
|
|
||||||
DB.activities.update_many(
|
|
||||||
{
|
|
||||||
"box": {"$in": [Box.REPLIES.value, Box.INBOX.value]},
|
|
||||||
"meta.keep": {"$exists": False},
|
|
||||||
},
|
|
||||||
{"$set": {"meta.keep": False}},
|
|
||||||
)
|
|
||||||
return "OK"
|
|
||||||
|
|
||||||
|
|
||||||
@app.route("/admin/cleanup2", methods=["GET"])
|
|
||||||
@login_required
|
|
||||||
def admin_cleanup2():
|
|
||||||
d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
|
|
||||||
|
|
||||||
# Go over the old Create activities
|
|
||||||
for data in DB.activities.find(
|
|
||||||
{
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"type": ActivityType.CREATE.value,
|
|
||||||
"meta.keep": False,
|
|
||||||
"activity.published": {"$lt": d},
|
|
||||||
}
|
|
||||||
):
|
|
||||||
# Delete the cached attachment/
|
|
||||||
for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}):
|
|
||||||
try:
|
|
||||||
MEDIA_CACHE.fs.delete(grid_item._id)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Delete the Create activities that no longer have cached attachments
|
|
||||||
DB.activities.delete_many(
|
|
||||||
{
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"type": ActivityType.CREATE.value,
|
|
||||||
"meta.keep": False,
|
|
||||||
"activity.published": {"$lt": d},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
return "OK"
|
|
||||||
|
|
||||||
|
|
||||||
@app.route("/admin/cleanup3", methods=["GET"])
|
|
||||||
@login_required
|
|
||||||
def admin_cleanup3():
|
|
||||||
d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
|
|
||||||
|
|
||||||
# Delete old replies we don't care about
|
|
||||||
DB.activities.delete_many(
|
|
||||||
{"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Remove all the attachments no tied to a remote_id (post celery migration)
|
|
||||||
for grid_item in MEDIA_CACHE.fs.find(
|
|
||||||
{"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}}
|
|
||||||
):
|
|
||||||
try:
|
|
||||||
MEDIA_CACHE.fs.delete(grid_item._id)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task
|
|
||||||
|
|
||||||
return "OK"
|
|
||||||
|
|
||||||
|
|
||||||
@app.route("/admin/tasks", methods=["GET"])
|
@app.route("/admin/tasks", methods=["GET"])
|
||||||
@login_required
|
@login_required
|
||||||
def admin_tasks():
|
def admin_tasks():
|
||||||
|
@ -1471,7 +1312,7 @@ def admin_tasks():
|
||||||
"admin_tasks.html",
|
"admin_tasks.html",
|
||||||
dead=p.get_dead(),
|
dead=p.get_dead(),
|
||||||
waiting=p.get_waiting(),
|
waiting=p.get_waiting(),
|
||||||
cron=[], # cron=p.get_cron(),
|
cron=p.get_cron(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -2868,3 +2709,160 @@ def task_post_to_remote_inbox():
|
||||||
raise TaskError() from err
|
raise TaskError() from err
|
||||||
|
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/task/cleanup_part_1", methods=["POST"])
|
||||||
|
def task_cleanup_part_1():
|
||||||
|
task = p.parse(request)
|
||||||
|
app.logger.info(f"task={task!r}")
|
||||||
|
d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
|
||||||
|
|
||||||
|
# (We keep Follow and Accept forever)
|
||||||
|
|
||||||
|
# Announce and Like cleanup
|
||||||
|
for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
|
||||||
|
# Migrate old (before meta.keep activities on the fly)
|
||||||
|
DB.activities.update_many(
|
||||||
|
{
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"type": ap_type.value,
|
||||||
|
"meta.keep": {"$exists": False},
|
||||||
|
"activity.object": {"$regex": f"^{BASE_URL}"},
|
||||||
|
},
|
||||||
|
{"$set": {"meta.keep": True}},
|
||||||
|
)
|
||||||
|
|
||||||
|
DB.activities.update_many(
|
||||||
|
{
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"type": ap_type.value,
|
||||||
|
"meta.keep": {"$exists": False},
|
||||||
|
"activity.object.id": {"$regex": f"^{BASE_URL}"},
|
||||||
|
},
|
||||||
|
{"$set": {"meta.keep": True}},
|
||||||
|
)
|
||||||
|
|
||||||
|
DB.activities.update_many(
|
||||||
|
{
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"type": ap_type.value,
|
||||||
|
"meta.keep": {"$exists": False},
|
||||||
|
},
|
||||||
|
{"$set": {"meta.keep": False}},
|
||||||
|
)
|
||||||
|
# End of the migration
|
||||||
|
|
||||||
|
# Delete old activities
|
||||||
|
DB.activities.delete_many(
|
||||||
|
{
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"type": ap_type.value,
|
||||||
|
"meta.keep": False,
|
||||||
|
"activity.published": {"$lt": d},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# And delete the soft-deleted one
|
||||||
|
DB.activities.delete_many(
|
||||||
|
{
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"type": ap_type.value,
|
||||||
|
"meta.keep": False,
|
||||||
|
"meta.deleted": True,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create cleanup (more complicated)
|
||||||
|
# The one that mention our actor
|
||||||
|
DB.activities.update_many(
|
||||||
|
{
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"meta.keep": {"$exists": False},
|
||||||
|
"activity.object.tag.href": {"$regex": f"^{BASE_URL}"},
|
||||||
|
},
|
||||||
|
{"$set": {"meta.keep": True}},
|
||||||
|
)
|
||||||
|
DB.activities.update_many(
|
||||||
|
{
|
||||||
|
"box": Box.REPLIES.value,
|
||||||
|
"meta.keep": {"$exists": False},
|
||||||
|
"activity.tag.href": {"$regex": f"^{BASE_URL}"},
|
||||||
|
},
|
||||||
|
{"$set": {"meta.keep": True}},
|
||||||
|
)
|
||||||
|
|
||||||
|
# The replies of the outbox
|
||||||
|
DB.activities.update_many(
|
||||||
|
{"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}},
|
||||||
|
{"$set": {"meta.keep": True}},
|
||||||
|
)
|
||||||
|
# Track all the threads we participated
|
||||||
|
keep_threads = []
|
||||||
|
for data in DB.activities.find(
|
||||||
|
{
|
||||||
|
"box": Box.OUTBOX.value,
|
||||||
|
"type": ActivityType.CREATE.value,
|
||||||
|
"meta.thread_root_parent": {"$exists": True},
|
||||||
|
}
|
||||||
|
):
|
||||||
|
keep_threads.append(data["meta"]["thread_root_parent"])
|
||||||
|
|
||||||
|
for root_parent in set(keep_threads):
|
||||||
|
DB.activities.update_many(
|
||||||
|
{"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}}
|
||||||
|
)
|
||||||
|
|
||||||
|
DB.activities.update_many(
|
||||||
|
{
|
||||||
|
"box": {"$in": [Box.REPLIES.value, Box.INBOX.value]},
|
||||||
|
"meta.keep": {"$exists": False},
|
||||||
|
},
|
||||||
|
{"$set": {"meta.keep": False}},
|
||||||
|
)
|
||||||
|
return "OK"
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/task/cleanup_part_2", methods=["POST"])
|
||||||
|
def task_cleanup_part_2():
|
||||||
|
task = p.parse(request)
|
||||||
|
app.logger.info(f"task={task!r}")
|
||||||
|
d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
|
||||||
|
|
||||||
|
# Go over the old Create activities
|
||||||
|
for data in DB.activities.find(
|
||||||
|
{
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"type": ActivityType.CREATE.value,
|
||||||
|
"meta.keep": False,
|
||||||
|
"activity.published": {"$lt": d},
|
||||||
|
}
|
||||||
|
):
|
||||||
|
# Delete the cached attachment/
|
||||||
|
for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}):
|
||||||
|
MEDIA_CACHE.fs.delete(grid_item._id)
|
||||||
|
DB.activities.delete_one({"_id": data["_id"]})
|
||||||
|
|
||||||
|
return "OK"
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/task/cleanup_part_3", methods=["POST"])
|
||||||
|
def task_cleanup_part_3():
|
||||||
|
task = p.parse(request)
|
||||||
|
app.logger.info(f"task={task!r}")
|
||||||
|
|
||||||
|
d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
|
||||||
|
|
||||||
|
# Delete old replies we don't care about
|
||||||
|
DB.activities.delete_many(
|
||||||
|
{"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Remove all the attachments no tied to a remote_id (post celery migration)
|
||||||
|
for grid_item in MEDIA_CACHE.fs.find(
|
||||||
|
{"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}}
|
||||||
|
):
|
||||||
|
MEDIA_CACHE.fs.delete(grid_item._id)
|
||||||
|
|
||||||
|
# TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task
|
||||||
|
|
||||||
|
return "OK"
|
||||||
|
|
|
@ -24,7 +24,7 @@ class Task:
|
||||||
class GetTask:
|
class GetTask:
|
||||||
payload: Any
|
payload: Any
|
||||||
expected: int
|
expected: int
|
||||||
# schedule: str
|
schedule: str
|
||||||
task_id: str
|
task_id: str
|
||||||
next_run: datetime
|
next_run: datetime
|
||||||
tries: int
|
tries: int
|
||||||
|
@ -101,7 +101,7 @@ class PousseTaches:
|
||||||
task_id=t["id"],
|
task_id=t["id"],
|
||||||
payload=t["payload"],
|
payload=t["payload"],
|
||||||
expected=t["expected"],
|
expected=t["expected"],
|
||||||
# shedule=t["schedule"],
|
shedule=t["schedule"],
|
||||||
tries=t["tries"],
|
tries=t["tries"],
|
||||||
url=t["url"],
|
url=t["url"],
|
||||||
last_error_status_code=t["last_error_status_code"],
|
last_error_status_code=t["last_error_status_code"],
|
||||||
|
|
Loading…
Reference in a new issue