Start working on a clenaup task for old activities

This commit is contained in:
Thomas Sileo 2019-04-07 21:24:52 +02:00
parent 523b8686c7
commit 363dbf4b6a
2 changed files with 99 additions and 15 deletions

88
app.py
View file

@ -6,6 +6,7 @@ import os
import traceback import traceback
import urllib import urllib
from datetime import datetime from datetime import datetime
from datetime import timedelta
from datetime import timezone from datetime import timezone
from functools import wraps from functools import wraps
from io import BytesIO from io import BytesIO
@ -507,6 +508,7 @@ def handle_activitypub_error(error):
class TaskError(Exception): class TaskError(Exception):
"""Raised to log the error for poussetaches.""" """Raised to log the error for poussetaches."""
def __init__(self): def __init__(self):
self.message = traceback.format_exc() self.message = traceback.format_exc()
@ -1415,13 +1417,72 @@ def admin():
) )
@app.route("/admin/cleanup", methods=["GET"])
@login_required
def admin_cleanup():
d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
# Announce and Like cleanup
for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
# Migrate old (before meta.keep activities on the fly)
DB.activities.update_many(
{
"box": Box.INBOX.value,
"type": ap_type.value,
"meta.keep": {"$exists": False},
"activity.object": {"$regex": f"^{BASE_URL}"},
},
{"$set": {"meta.keep": True}},
)
DB.activities.update_many(
{
"box": Box.INBOX.value,
"type": ap_type.value,
"meta.keep": {"$exists": False},
"activity.object.id": {"$regex": f"^{BASE_URL}"},
},
{"$set": {"meta.keep": True}},
)
DB.activities.update_many(
{
"box": Box.INBOX.value,
"type": ap_type.value,
"meta.keep": {"$exists": False},
},
{"$set": {"meta.keep": False}},
)
# End of the migration
# Delete old activities
DB.activities.delete_many(
{
"box": Box.INBOX.value,
"type": ap_type.value,
"meta.keep": False,
"activity.published": {"$lt": d},
}
)
# And delete the soft-deleted one
DB.activities.delete_many(
{
"box": Box.INBOX.value,
"type": ap_type.value,
"meta.keep": False,
"meta.deleted": True,
}
)
return "OK"
@app.route("/admin/tasks", methods=["GET"]) @app.route("/admin/tasks", methods=["GET"])
@login_required @login_required
def admin_tasks(): def admin_tasks():
return render_template( return render_template(
"admin_tasks.html", "admin_tasks.html", dead=p.get_dead(), waiting=p.get_waiting()
dead=p.get_dead(),
waiting=p.get_waiting(),
) )
@ -2239,6 +2300,7 @@ def token_endpoint():
################# #################
# Feeds # Feeds
@app.route("/feed.json") @app.route("/feed.json")
def json_feed(): def json_feed():
return Response( return Response(
@ -2266,6 +2328,7 @@ def rss_feed():
########### ###########
# Tasks # Tasks
class Tasks: class Tasks:
@staticmethod @staticmethod
def cache_object(iri: str) -> None: def cache_object(iri: str) -> None:
@ -2373,6 +2436,7 @@ def task_cache_object():
return "" return ""
@app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901 @app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901
def task_finish_post_to_outbox(): def task_finish_post_to_outbox():
task = p.parse(request) task = p.parse(request)
@ -2642,12 +2706,15 @@ def task_process_new_activity():
# following = ap.get_backend().following() # following = ap.get_backend().following()
should_forward = False should_forward = False
should_delete = False should_delete = False
should_keep = False
tag_stream = False tag_stream = False
if activity.has_type(ap.ActivityType.ANNOUNCE): if activity.has_type(ap.ActivityType.ANNOUNCE):
try: try:
activity.get_object() activity.get_object()
tag_stream = True tag_stream = True
if activity.get_object_id().startswith(BASE_URL):
should_keep = True
except (NotAnActivityError, BadActivityError): except (NotAnActivityError, BadActivityError):
app.logger.exception(f"failed to get announce object for {activity!r}") app.logger.exception(f"failed to get announce object for {activity!r}")
# Most likely on OStatus notice # Most likely on OStatus notice
@ -2657,12 +2724,21 @@ def task_process_new_activity():
# The announced activity is deleted/gone, drop it # The announced activity is deleted/gone, drop it
should_delete = True should_delete = True
elif activity.has_type(ap.ActivityType.FOLLOW):
# FIXME(tsileo): ensure it's a follow where the server is the object
should_keep = True
elif activity.has_type(ap.ActivityType.CREATE): elif activity.has_type(ap.ActivityType.CREATE):
note = activity.get_object() note = activity.get_object()
# Make the note part of the stream if it's not a reply, or if it's a local reply # Make the note part of the stream if it's not a reply, or if it's a local reply
if not note.inReplyTo or note.inReplyTo.startswith(ID): if not note.inReplyTo or note.inReplyTo.startswith(ID):
tag_stream = True tag_stream = True
if (note.inReplyTo and note.inReplyTo.startswith(ID)) or note.has_mention(
ID
):
should_keep = True
if note.inReplyTo: if note.inReplyTo:
try: try:
reply = ap.fetch_remote_activity(note.inReplyTo) reply = ap.fetch_remote_activity(note.inReplyTo)
@ -2672,6 +2748,7 @@ def task_process_new_activity():
# The reply is public "local reply", forward the reply (i.e. the original activity) to the # The reply is public "local reply", forward the reply (i.e. the original activity) to the
# original recipients # original recipients
should_forward = True should_forward = True
should_keep = True
except NotAnActivityError: except NotAnActivityError:
# Most likely a reply to an OStatus notce # Most likely a reply to an OStatus notce
should_delete = True should_delete = True
@ -2699,7 +2776,9 @@ def task_process_new_activity():
should_forward = True should_forward = True
elif activity.has_type(ap.ActivityType.LIKE): elif activity.has_type(ap.ActivityType.LIKE):
if not activity.get_object_id().startswith(BASE_URL): if activity.get_object_id().startswith(BASE_URL):
should_keep = True
else:
# We only want to keep a like if it's a like for a local activity # We only want to keep a like if it's a like for a local activity
# (Pleroma relay the likes it received, we don't want to store them) # (Pleroma relay the likes it received, we don't want to store them)
should_delete = True should_delete = True
@ -2716,6 +2795,7 @@ def task_process_new_activity():
{"remote_id": activity.id}, {"remote_id": activity.id},
{ {
"$set": { "$set": {
"meta.keep": should_keep,
"meta.stream": tag_stream, "meta.stream": tag_stream,
"meta.forwarded": should_forward, "meta.forwarded": should_forward,
"meta.deleted": should_delete, "meta.deleted": should_delete,

View file

@ -60,7 +60,9 @@ class PousseTaches:
print(f"envelope={envelope!r}") print(f"envelope={envelope!r}")
payload = json.loads(base64.b64decode(envelope["payload"])) payload = json.loads(base64.b64decode(envelope["payload"]))
return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) # type: ignore return Task(
req_id=envelope["req_id"], tries=envelope["tries"], payload=payload
) # type: ignore
@staticmethod @staticmethod
def _expand_task(t: Dict[str, Any]) -> None: def _expand_task(t: Dict[str, Any]) -> None:
@ -86,7 +88,8 @@ class PousseTaches:
dat = resp.json() dat = resp.json()
for t in dat["tasks"]: for t in dat["tasks"]:
self._expand_task(t) self._expand_task(t)
out.append(GetTask( out.append(
GetTask(
task_id=t["id"], task_id=t["id"],
payload=t["payload"], payload=t["payload"],
expected=t["expected"], expected=t["expected"],
@ -95,7 +98,8 @@ class PousseTaches:
last_error_status_code=t["last_error_status_code"], last_error_status_code=t["last_error_status_code"],
last_error_body=t["last_error_body"], last_error_body=t["last_error_body"],
next_run=t["next_run"], next_run=t["next_run"],
)) )
)
return out return out