From 27622813ecc2968f4e84a8a7545fd666a9f6a508 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Mon, 8 Apr 2019 16:41:09 +0200 Subject: [PATCH] More work for cleaning up old activities --- app.py | 262 +++++++++++++++++-------------------- poussetaches.py | 16 ++- templates/admin_tasks.html | 29 +++- templates/login.html | 2 +- utils/media.py | 60 +++++++++ 5 files changed, 222 insertions(+), 147 deletions(-) diff --git a/app.py b/app.py index 507c767..7d00994 100644 --- a/app.py +++ b/app.py @@ -599,23 +599,28 @@ def admin_login(): if request.method == "POST": csrf.protect() pwd = request.form.get("pass") - if pwd and verify_pass(pwd): - if devices: - resp = json.loads(request.form.get("resp")) - print(resp) - try: - u2f.complete_authentication(session["challenge"], resp) - except ValueError as exc: - print("failed", exc) - abort(401) - return - finally: - session["challenge"] = None + if devices: + resp = json.loads(request.form.get("resp")) + try: + u2f.complete_authentication(session["challenge"], resp) + except ValueError as exc: + print("failed", exc) + abort(403) + return + finally: + session["challenge"] = None session["logged_in"] = True return redirect( request.args.get("redirect") or url_for("admin_notifications") ) + elif pwd and verify_pass(pwd): + session["logged_in"] = True + return redirect( + request.args.get("redirect") or url_for("admin_notifications") + ) + elif pwd: + abort(403) else: abort(401) @@ -681,7 +686,8 @@ def u2f_register(): device, device_cert = u2f.complete_registration(session["challenge"], resp) session["challenge"] = None DB.u2f.insert_one({"device": device, "cert": device_cert}) - return "" + session["logged_in"] = False + return redirect("/login") ####### @@ -693,133 +699,6 @@ def drop_cache(): return "Done" -@app.route("/migration1_step1") -@login_required -def tmp_migrate(): - for activity in DB.outbox.find(): - activity["box"] = Box.OUTBOX.value - DB.activities.insert_one(activity) - for activity in DB.inbox.find(): - activity["box"] = Box.INBOX.value - DB.activities.insert_one(activity) - for activity in DB.replies.find(): - activity["box"] = Box.REPLIES.value - DB.activities.insert_one(activity) - return "Done" - - -@app.route("/migration1_step2") -@login_required -def tmp_migrate2(): - # Remove buggy OStatus announce - DB.activities.remove( - {"activity.object": {"$regex": f"^tag:"}, "type": ActivityType.ANNOUNCE.value} - ) - # Cache the object - for activity in DB.activities.find(): - if ( - activity["box"] == Box.OUTBOX.value - and activity["type"] == ActivityType.LIKE.value - ): - like = ap.parse_activity(activity["activity"]) - obj = like.get_object() - DB.activities.update_one( - {"remote_id": like.id}, - {"$set": {"meta.object": obj.to_dict(embed=True)}}, - ) - elif activity["type"] == ActivityType.ANNOUNCE.value: - announce = ap.parse_activity(activity["activity"]) - obj = announce.get_object() - DB.activities.update_one( - {"remote_id": announce.id}, - {"$set": {"meta.object": obj.to_dict(embed=True)}}, - ) - return "Done" - - -@app.route("/migration2") -@login_required -def tmp_migrate3(): - for activity in DB.activities.find(): - try: - activity = ap.parse_activity(activity["activity"]) - actor = activity.get_actor() - if actor.icon: - MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON) - if activity.type == ActivityType.CREATE.value: - for attachment in activity.get_object()._data.get("attachment", []): - MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT) - except Exception: - app.logger.exception("failed") - return "Done" - - -@app.route("/migration3") -@login_required -def tmp_migrate4(): - for activity in DB.activities.find( - {"box": Box.OUTBOX.value, "type": ActivityType.UNDO.value} - ): - try: - activity = ap.parse_activity(activity["activity"]) - if activity.get_object().type == ActivityType.FOLLOW.value: - DB.activities.update_one( - {"remote_id": activity.get_object().id}, - {"$set": {"meta.undo": True}}, - ) - print(activity.get_object().to_dict()) - except Exception: - app.logger.exception("failed") - for activity in DB.activities.find( - {"box": Box.INBOX.value, "type": ActivityType.UNDO.value} - ): - try: - activity = ap.parse_activity(activity["activity"]) - if activity.get_object().type == ActivityType.FOLLOW.value: - DB.activities.update_one( - {"remote_id": activity.get_object().id}, - {"$set": {"meta.undo": True}}, - ) - print(activity.get_object().to_dict()) - except Exception: - app.logger.exception("failed") - return "Done" - - -@app.route("/migration4") -@login_required -def tmp_migrate5(): - for activity in DB.activities.find(): - Tasks.cache_actor(activity["remote_id"], also_cache_attachments=False) - - return "Done" - - -@app.route("/migration5") -@login_required -def tmp_migrate6(): - for activity in DB.activities.find(): - # tasks.cache_actor.delay(activity["remote_id"], also_cache_attachments=False) - - try: - a = ap.parse_activity(activity["activity"]) - if a.has_type([ActivityType.LIKE, ActivityType.FOLLOW]): - DB.activities.update_one( - {"remote_id": a.id}, - { - "$set": { - "meta.object_actor": activitypub._actor_to_meta( - a.get_object().get_actor() - ) - } - }, - ) - except Exception: - app.logger.exception(f"processing {activity} failed") - - return "Done" - - def paginated_query(db, q, limit=25, sort_key="_id"): older_than = newer_than = None query_sort = -1 @@ -1422,6 +1301,8 @@ def admin(): def admin_cleanup(): d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d") + # (We keep Follow and Accept forever) + # Announce and Like cleanup for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]: # Migrate old (before meta.keep activities on the fly) @@ -1475,6 +1356,97 @@ def admin_cleanup(): } ) + # Create cleanup (more complicated) + # The one that mention our actor + DB.activities.update_many( + { + "box": Box.INBOX.value, + "meta.keep": {"$exists": False}, + "activity.object.tag.href": {"$regex": f"^{BASE_URL}"}, + }, + {"$set": {"meta.keep": True}}, + ) + DB.activities.update_many( + { + "box": Box.REPLIES.value, + "meta.keep": {"$exists": False}, + "activity.tag.href": {"$regex": f"^{BASE_URL}"}, + }, + {"$set": {"meta.keep": True}}, + ) + + # The replies of the outbox + DB.activities.update_many( + {"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}}, + {"$set": {"meta.keep": True}}, + ) + # Track all the threads we participated + keep_threads = [] + for data in DB.activities.find( + { + "box": Box.OUTBOX.value, + "type": ActivityType.CREATE.value, + "meta.thread_root_parent": {"$exists": True}, + } + ): + keep_threads.append(data["meta"]["thread_root_parent"]) + + for root_parent in set(keep_threads): + DB.activities.update_many( + {"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}} + ) + + DB.activities.update_many( + { + "box": {"$in": [Box.REPLIES.value, Box.INBOX.value]}, + "meta.keep": {"$exists": False}, + }, + {"$set": {"meta.keep": False}}, + ) + return "OK" + + +@app.route("/admin/cleanup2", methods=["GET"]) +@login_required +def admin_cleanup2(): + d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d") + + # Go over the old Create activities + for data in DB.activities.find( + { + "box": Box.INBOX.value, + "type": ActivityType.CREATE.value, + "meta.keep": False, + "activity.published": {"$lt": d}, + } + ): + # Delete the cached attachment/ + for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}): + MEDIA_CACHE.fs.delete(grid_item._id) + + # Delete the Create activities that no longer have cached attachments + DB.activities.delete_many( + { + "box": Box.INBOX.value, + "type": ActivityType.CREATE.value, + "meta.keep": False, + "activity.published": {"$lt": d}, + } + ) + + # Delete old replies we don't care about + DB.activities.delete_many( + {"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}} + ) + + # Remove all the attachments no tied to a remote_id (post celery migration) + for grid_item in MEDIA_CACHE.fs.find( + {"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}} + ): + MEDIA_CACHE.fs.delete(grid_item._id) + + # TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task + return "OK" @@ -1482,7 +1454,10 @@ def admin_cleanup(): @login_required def admin_tasks(): return render_template( - "admin_tasks.html", dead=p.get_dead(), waiting=p.get_waiting() + "admin_tasks.html", + dead=p.get_dead(), + waiting=p.get_waiting(), + cron=[], # cron=p.get_cron(), ) @@ -2385,7 +2360,7 @@ def task_fetch_og_meta(): for og in og_metadata: if not og.get("image"): continue - MEDIA_CACHE.cache_og_image(og["image"]) + MEDIA_CACHE.cache_og_image2(og["image"], iri) app.logger.debug(f"OG metadata {og_metadata!r}") DB.activities.update_one( @@ -2613,7 +2588,7 @@ def task_cache_attachments(): or attachment.get("type") == ap.ActivityType.IMAGE.value ): try: - MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT) + MEDIA_CACHE.cache_attachment2(attachment["url"], iri) except ValueError: app.logger.exception(f"failed to cache {attachment}") @@ -2710,6 +2685,7 @@ def task_process_new_activity(): tag_stream = False if activity.has_type(ap.ActivityType.ANNOUNCE): + # FIXME(tsileo): Ensure it's follower and store into a "dead activities" DB try: activity.get_object() tag_stream = True diff --git a/poussetaches.py b/poussetaches.py index 28314f3..e844dee 100644 --- a/poussetaches.py +++ b/poussetaches.py @@ -24,6 +24,7 @@ class Task: class GetTask: payload: Any expected: int + # schedule: str task_id: str next_run: datetime tries: int @@ -37,14 +38,21 @@ class PousseTaches: self.api_url = api_url self.base_url = base_url - def push(self, payload: Any, path: str, expected=200) -> str: + def push( + self, payload: Any, path: str, expected: int = 200, schedule: str = "" + ) -> str: # Encode our payload p = base64.b64encode(json.dumps(payload).encode()).decode() # Queue/push it resp = requests.post( self.api_url, - json={"url": self.base_url + path, "payload": p, "expected": expected}, + json={ + "url": self.base_url + path, + "payload": p, + "expected": expected, + "schedule": schedule, + }, ) resp.raise_for_status() @@ -93,6 +101,7 @@ class PousseTaches: task_id=t["id"], payload=t["payload"], expected=t["expected"], + # shedule=t["schedule"], tries=t["tries"], url=t["url"], last_error_status_code=t["last_error_status_code"], @@ -103,6 +112,9 @@ class PousseTaches: return out + def get_cron(self) -> List[GetTask]: + return self._get("cron") + def get_success(self) -> List[GetTask]: return self._get("success") diff --git a/templates/admin_tasks.html b/templates/admin_tasks.html index 38e5cbf..14f6f1c 100644 --- a/templates/admin_tasks.html +++ b/templates/admin_tasks.html @@ -6,6 +6,33 @@ {% include "header.html" %}
+

Cron

+ + + + + + + + + + + + + + {% for task in dead %} + + + + + + + + + {% endfor %} + +
#URLPayloadScheduleNext runResponse
{{ task.task_id }}{{ task.url }} ({{ task.expected }}){{ task.payload }}{{ task.schedule }}{{ task.next_run }}Tries #{{ task.tries }}: {{ task.last_error_body }} ({{ task.last_error_status_code }})
+

Dead

@@ -22,7 +49,7 @@ {% for task in dead %} - + diff --git a/templates/login.html b/templates/login.html index fde73d2..0d5f368 100644 --- a/templates/login.html +++ b/templates/login.html @@ -21,7 +21,7 @@ display:inline; {% if u2f_enabled %} - + {% else %} {% endif %} diff --git a/utils/media.py b/utils/media.py index 66ad02a..06559f7 100644 --- a/utils/media.py +++ b/utils/media.py @@ -60,6 +60,25 @@ class MediaCache(object): kind=Kind.OG_IMAGE.value, ) + def cache_og_image2(self, url: str, remote_id: str) -> None: + if self.fs.find_one({"url": url, "kind": Kind.OG_IMAGE.value}): + return + i = load(url, self.user_agent) + # Save the original attachment (gzipped) + i.thumbnail((100, 100)) + with BytesIO() as buf: + with GzipFile(mode="wb", fileobj=buf) as f1: + i.save(f1, format=i.format) + buf.seek(0) + self.fs.put( + buf, + url=url, + size=100, + content_type=i.get_format_mimetype(), + kind=Kind.OG_IMAGE.value, + remote_id=remote_id, + ) + def cache_attachment(self, url: str) -> None: if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}): return @@ -98,6 +117,46 @@ class MediaCache(object): ) return + def cache_attachment2(self, url: str, remote_id: str) -> None: + if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}): + return + if ( + url.endswith(".png") + or url.endswith(".jpg") + or url.endswith(".jpeg") + or url.endswith(".gif") + ): + i = load(url, self.user_agent) + # Save the original attachment (gzipped) + with BytesIO() as buf: + f1 = GzipFile(mode="wb", fileobj=buf) + i.save(f1, format=i.format) + f1.close() + buf.seek(0) + self.fs.put( + buf, + url=url, + size=None, + content_type=i.get_format_mimetype(), + kind=Kind.ATTACHMENT.value, + remote_id=remote_id, + ) + # Save a thumbnail (gzipped) + i.thumbnail((720, 720)) + with BytesIO() as buf: + with GzipFile(mode="wb", fileobj=buf) as f1: + i.save(f1, format=i.format) + buf.seek(0) + self.fs.put( + buf, + url=url, + size=720, + content_type=i.get_format_mimetype(), + kind=Kind.ATTACHMENT.value, + remote_id=remote_id, + ) + return + # The attachment is not an image, download and save it anyway with requests.get( url, stream=True, headers={"User-Agent": self.user_agent} @@ -115,6 +174,7 @@ class MediaCache(object): size=None, content_type=mimetypes.guess_type(url)[0], kind=Kind.ATTACHMENT.value, + remote_id=remote_id, ) def cache_actor_icon(self, url: str) -> None:
{{ task.task_id }}{{ task.url }} ({{ task.expected }}{{ task.url }} ({{ task.expected }}) {{ task.payload }} {{ task.next_run }} Tries #{{ task.tries }}: {{ task.last_error_body }} ({{ task.last_error_status_code }})