From 2180a79cf29018d483c91294cb057af9de0e5fef Mon Sep 17 00:00:00 2001
From: Thomas Sileo
Date: Thu, 18 Jul 2019 20:48:49 +0200
Subject: [PATCH] Disable old cleanup and start working on GC, clean media cache

---
 app.py              | 208 ++++----------------------------------------
 gc.py               | 100 +++++++++++++++++++++
 templates/note.html |   2 +-
 utils/media.py      |  70 +--------------
 4 files changed, 123 insertions(+), 257 deletions(-)
 create mode 100644 gc.py

diff --git a/app.py b/app.py
index 354fa6e..7a0ef1e 100644
--- a/app.py
+++ b/app.py
@@ -19,6 +19,7 @@ from urllib.parse import urlparse
 
 import bleach
 import emoji_unicode
+import html2text
 import mf2py
 import requests
 import timeago
@@ -34,7 +35,6 @@ from flask import request
 from flask import session
 from flask import url_for
 from flask_wtf.csrf import CSRFProtect
-import html2text
 from itsdangerous import BadSignature
 from little_boxes import activitypub as ap
 from little_boxes.activitypub import ActivityType
@@ -2675,14 +2675,14 @@ def task_fetch_og_meta():
         for og in og_metadata:
             if not og.get("image"):
                 continue
-            MEDIA_CACHE.cache_og_image2(og["image"], iri)
+            MEDIA_CACHE.cache_og_image(og["image"], iri)
 
         app.logger.debug(f"OG metadata {og_metadata!r}")
         DB.activities.update_one(
             {"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}}
         )
-        app.logger.info(f"OG metadata fetched for {iri}")
+        app.logger.info(f"OG metadata fetched for {iri}: {og_metadata}")
     except (ActivityGoneError, ActivityNotFoundError):
         app.logger.exception(f"dropping activity {iri}, skip OG metedata")
         return ""
@@ -2893,33 +2893,12 @@ def task_cache_attachments():
         app.logger.info(f"activity={activity!r}")
 
         # Generates thumbnails for the actor's icon and the attachments if any
-        actor = activity.get_actor()
-
-        # Update the cached actor
-        DB.actors.update_one(
-            {"remote_id": iri},
-            {"$set": {"remote_id": iri, "data": actor.to_dict(embed=True)}},
-            upsert=True,
-        )
-
-        if actor.icon:
-            MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
-
-        obj = None
-        if activity.has_type(ap.ActivityType.CREATE):
-            # This means a `Create` triggered the task
-            obj = activity.get_object()
-        elif activity.has_type(ap.CREATE_TYPES):
-            # This means a `Announce` triggered the task
-            obj = activity
-        else:
-            app.logger.warning(f"Don't know what to do with {activity!r}")
-            return
+        obj = activity.get_object()
 
         # Iter the attachments
         for attachment in obj._data.get("attachment", []):
             try:
-                MEDIA_CACHE.cache_attachment2(attachment, iri)
+                MEDIA_CACHE.cache_attachment(attachment, iri)
             except ValueError:
                 app.logger.exception(f"failed to cache {attachment}")
 
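Both cache entry points now take the activity IRI, so every stored file can be traced back to the activity that referenced it. A rough usage sketch with hypothetical values (assuming the `MEDIA_CACHE` instance exposed by config, as used elsewhere in app.py):

from config import MEDIA_CACHE

# Hypothetical values, for illustration only.
iri = "https://remote.example/activities/123"
attachment = {"url": "https://remote.example/media/photo.jpg", "mediaType": "image/jpeg"}

# The renamed helpers are keyed on the activity IRI ("remote_id"), which is
# what later lets the GC drop every cached file for a given activity at once.
MEDIA_CACHE.cache_og_image("https://remote.example/og.png", iri)
MEDIA_CACHE.cache_attachment(attachment, iri)
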
@@ -2938,22 +2917,28 @@ def task_cache_attachments():
 def task_cache_actor() -> str:
     task = p.parse(request)
     app.logger.info(f"task={task!r}")
-    iri, also_cache_attachments = (
-        task.payload["iri"],
-        task.payload.get("also_cache_attachments", True),
-    )
+    iri = task.payload["iri"]
     try:
         activity = ap.fetch_remote_activity(iri)
         app.logger.info(f"activity={activity!r}")
-        # FIXME(tsileo): OG meta for Announce?
+        # Fetch the Open Graph metadata if it's a `Create`
         if activity.has_type(ap.ActivityType.CREATE):
             Tasks.fetch_og_meta(iri)
 
-        if activity.has_type([ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]):
+        # Cache the object if it's a `Like` or an `Announce` unrelated to the server outbox (because it will never get
+        # displayed)
+        if activity.has_type(
+            [ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]
+        ) and not activity.get_object_id().startswith(BASE_URL):
             Tasks.cache_object(iri)
 
         actor = activity.get_actor()
+        if actor.icon:
+            if isinstance(actor.icon, dict) and "url" in actor.icon:
+                MEDIA_CACHE.cache_actor_icon(actor.icon["url"])
+            else:
+                app.logger.warning(f"failed to parse icon {actor.icon} for {iri}")
 
         if activity.has_type(ap.ActivityType.FOLLOW):
             if actor.id == ID:
@@ -2973,12 +2958,8 @@ def task_cache_actor() -> str:
         )
         app.logger.info(f"actor cached for {iri}")
 
-        if also_cache_attachments and activity.has_type(ap.ActivityType.CREATE):
+        if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]):
             Tasks.cache_attachments(iri)
-        elif also_cache_attachments and activity.has_type(ap.ActivityType.ANNOUNCE):
-            obj = activity.get_object()
-            Tasks.cache_attachments(obj.id)
-            Tasks.cache_actor(obj.id)
 
     except (ActivityGoneError, ActivityNotFoundError):
         DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
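The new `actor.icon` guard only caches the icon when it is a single Image-style object carrying a plain `url`; in ActivityStreams the property can also arrive as an array of images or a bare URL string, which the `else` branch now just logs. A few illustrative shapes (hypothetical values):

# A shape the new guard accepts: one Image object with a `url` string.
icon_object = {
    "type": "Image",
    "mediaType": "image/png",
    "url": "https://remote.example/avatar.png",
}

# Shapes it skips (and warns about): a list of images, or a bare URL string.
icon_list = [{"type": "Image", "url": "https://remote.example/avatar.png"}]
icon_url = "https://remote.example/avatar.png"

for icon in (icon_object, icon_list, icon_url):
    if isinstance(icon, dict) and "url" in icon:
        print("would cache", icon["url"])
    else:
        print("would skip", icon)
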
@@ -3291,129 +3272,13 @@ def task_update_question():
 def task_cleanup():
     task = p.parse(request)
     app.logger.info(f"task={task!r}")
-    p.push({}, "/task/cleanup_part_1")
+    # p.push({}, "/task/cleanup_part_1")
     return ""
 
 
-@app.route("/task/cleanup_part_1", methods=["POST"])
 def task_cleanup_part_1():
     task = p.parse(request)
     app.logger.info(f"task={task!r}")
-    d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
-
-    # (We keep Follow and Accept forever)
-
-    # Announce and Like cleanup
-    for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
-        # Migrate old (before meta.keep activities on the fly)
-        DB.activities.update_many(
-            {
-                "box": Box.INBOX.value,
-                "type": ap_type.value,
-                "meta.keep": {"$exists": False},
-                "activity.object": {"$regex": f"^{BASE_URL}"},
-            },
-            {"$set": {"meta.keep": True}},
-        )
-
-        DB.activities.update_many(
-            {
-                "box": Box.INBOX.value,
-                "type": ap_type.value,
-                "meta.keep": {"$exists": False},
-                "activity.object.id": {"$regex": f"^{BASE_URL}"},
-            },
-            {"$set": {"meta.keep": True}},
-        )
-
-        DB.activities.update_many(
-            {
-                "box": Box.INBOX.value,
-                "type": ap_type.value,
-                "meta.keep": {"$exists": False},
-            },
-            {"$set": {"meta.keep": False}},
-        )
-        # End of the migration
-
-        # Delete old activities
-        DB.activities.delete_many(
-            {
-                "box": Box.INBOX.value,
-                "type": ap_type.value,
-                "meta.keep": False,
-                "activity.published": {"$lt": d},
-            }
-        )
-
-        # And delete the soft-deleted one
-        DB.activities.delete_many(
-            {
-                "box": Box.INBOX.value,
-                "type": ap_type.value,
-                "meta.keep": False,
-                "meta.deleted": True,
-            }
-        )
-
-    # Create cleanup (more complicated)
-    # The one that mention our actor
-    DB.activities.update_many(
-        {
-            "box": Box.INBOX.value,
-            "meta.keep": {"$exists": False},
-            "activity.object.tag.href": {"$regex": f"^{BASE_URL}"},
-        },
-        {"$set": {"meta.keep": True}},
-    )
-    DB.activities.update_many(
-        {
-            "box": Box.REPLIES.value,
-            "meta.keep": {"$exists": False},
-            "activity.tag.href": {"$regex": f"^{BASE_URL}"},
-        },
-        {"$set": {"meta.keep": True}},
-    )
-
-    # The replies of the outbox
-    DB.activities.update_many(
-        {"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}},
-        {"$set": {"meta.keep": True}},
-    )
-    # Track all the threads we participated
-    keep_threads = []
-    for data in DB.activities.find(
-        {
-            "box": Box.OUTBOX.value,
-            "type": ActivityType.CREATE.value,
-            "meta.thread_root_parent": {"$exists": True},
-        }
-    ):
-        keep_threads.append(data["meta"]["thread_root_parent"])
-
-    for root_parent in set(keep_threads):
-        DB.activities.update_many(
-            {"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}}
-        )
-
-    DB.activities.update_many(
-        {
-            "box": {"$in": [Box.REPLIES.value, Box.INBOX.value]},
-            "meta.keep": {"$exists": False},
-        },
-        {"$set": {"meta.keep": False}},
-    )
-
-    DB.activities.update_many(
-        {
-            "box": Box.OUTBOX.value,
-            "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
-            "meta.public": {"$exists": False},
-        },
-        {"$set": {"meta.public": True}},
-    )
-
-    p.push({}, "/task/cleanup_part_2")
     return "OK"
 
 
@@ -3421,25 +3286,6 @@ def task_cleanup_part_1():
 def task_cleanup_part_2():
     task = p.parse(request)
     app.logger.info(f"task={task!r}")
-    d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
-
-    # Go over the old Create activities
-    for data in DB.activities.find(
-        {
-            "box": Box.INBOX.value,
-            "type": ActivityType.CREATE.value,
-            "meta.keep": False,
-            "activity.published": {"$lt": d},
-        }
-    ).limit(5000):
-        # Delete the cached attachment/
-        for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}):
-            MEDIA_CACHE.fs.delete(grid_item._id)
-        DB.activities.delete_one({"_id": data["_id"]})
-
-    # FIXME(tsileo): cleanup cache from announces object
-
-    p.push({}, "/task/cleanup_part_3")
     return "OK"
 
 
@@ -3447,20 +3293,4 @@ def task_cleanup_part_2():
 def task_cleanup_part_3():
     task = p.parse(request)
     app.logger.info(f"task={task!r}")
-
-    d = (datetime.utcnow() - timedelta(days=15)).strftime("%Y-%m-%d")
-
-    # Delete old replies we don't care about
-    DB.activities.delete_many(
-        {"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}}
-    )
-
-    # Remove all the attachments no tied to a remote_id (post celery migration)
-    for grid_item in MEDIA_CACHE.fs.find(
-        {"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}}
-    ):
-        MEDIA_CACHE.fs.delete(grid_item._id)
-
-    # TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task
-
     return "OK"
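With the cleanup tasks reduced to no-ops, the actual collection logic moves to the new gc module below. One possible way to run it by hand (hypothetical snippet, not part of the patch; a plain `import gc` would resolve to the interpreter's built-in garbage-collector module, so the file is loaded by path here):

# run_gc.py -- hypothetical helper script for triggering the new GC manually.
import importlib.util

# Load the project-level gc.py by path: `import gc` would pick up the
# built-in garbage-collector module instead of this file.
spec = importlib.util.spec_from_file_location("microblogpub_gc", "gc.py")
microblogpub_gc = importlib.util.module_from_spec(spec)
spec.loader.exec_module(microblogpub_gc)

# Delete old cached media and stale inbox activities.
microblogpub_gc.perform()
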
any + if "thread_root_parent" in data["meta"]: + out.add(data["meta"]["thread_root_parent"]) + + return list(out) + + +def perform() -> None: + d = (datetime.utcnow() - timedelta(days=2)).strftime("%Y-%m-%d") + toi = threads_of_interest() + + # Go over the old Create activities + for data in DB.activities.find( + { + "box": Box.INBOX.value, + "type": ap.ActivityType.CREATE.value, + "activity.published": {"$lt": d}, + } + ).limit(1000): + remote_id = data["remote_id"] + meta = data["meta"] + activity = ap.parse_activity(data["activity"]) + logger.info(f"{activity}") + + # This activity has been bookmarked, keep it + if meta.get("bookmarked"): + continue + + # Inspect the object + obj = activity.get_object() + + # This activity mentions the server actor, keep it + if obj.has_mention(ID): + continue + + # This activity is a direct reply of one the server actor activity, keep it + in_reply_to = obj.get_in_reply_to() + if in_reply_to and in_reply_to.startswith(ID): + continue + + # This activity is part of a thread we want to keep, keep it + if in_reply_to and meta.get("thread_root_parent"): + thread_root_parent = meta["thread_root_parent"] + if thread_root_parent.startswith(ID) or thread_root_parent in toi: + continue + + # This activity was boosted or liked, keep it + if meta.get("boosted") or meta.get("liked"): + continue + + # Delete the cached attachment + for grid_item in MEDIA_CACHE.fs.find({"remote_id": remote_id}): + MEDIA_CACHE.fs.delete(grid_item._id) + + # Delete the activity + DB.activities.delete_one({"_id": data["_id"]}) diff --git a/templates/note.html b/templates/note.html index 46603b4..053b6c7 100644 --- a/templates/note.html +++ b/templates/note.html @@ -1,6 +1,6 @@ {% extends "layout.html" %} {% import 'utils.html' as utils %} -{% block title %}{{ config.NAME }}: {{ note.activity.object.content | html2plaintext | truncate(50) }}{% endblock %} +{% block title %}{{ config.NAME }}: "{{ note.activity.object.content | html2plaintext | truncate(50) }}"{% endblock %} {% block header %} diff --git a/utils/media.py b/utils/media.py index d2e3d42..9e2467f 100644 --- a/utils/media.py +++ b/utils/media.py @@ -42,25 +42,7 @@ class MediaCache(object): self.fs = gridfs.GridFS(gridfs_db) self.user_agent = user_agent - def cache_og_image(self, url: str) -> None: - if self.fs.find_one({"url": url, "kind": Kind.OG_IMAGE.value}): - return - i = load(url, self.user_agent) - # Save the original attachment (gzipped) - i.thumbnail((100, 100)) - with BytesIO() as buf: - with GzipFile(mode="wb", fileobj=buf) as f1: - i.save(f1, format=i.format) - buf.seek(0) - self.fs.put( - buf, - url=url, - size=100, - content_type=i.get_format_mimetype(), - kind=Kind.OG_IMAGE.value, - ) - - def cache_og_image2(self, url: str, remote_id: str) -> None: + def cache_og_image(self, url: str, remote_id: str) -> None: if self.fs.find_one({"url": url, "kind": Kind.OG_IMAGE.value}): return i = load(url, self.user_agent) @@ -79,49 +61,11 @@ class MediaCache(object): remote_id=remote_id, ) - def cache_attachment(self, url: str) -> None: - if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}): - return - if ( - url.endswith(".png") - or url.endswith(".jpg") - or url.endswith(".jpeg") - or url.endswith(".gif") - ): - i = load(url, self.user_agent) - # Save the original attachment (gzipped) - with BytesIO() as buf: - f1 = GzipFile(mode="wb", fileobj=buf) - i.save(f1, format=i.format) - f1.close() - buf.seek(0) - self.fs.put( - buf, - url=url, - size=None, - content_type=i.get_format_mimetype(), - 
diff --git a/utils/media.py b/utils/media.py
index d2e3d42..9e2467f 100644
--- a/utils/media.py
+++ b/utils/media.py
@@ -42,25 +42,7 @@ class MediaCache(object):
         self.fs = gridfs.GridFS(gridfs_db)
         self.user_agent = user_agent
 
-    def cache_og_image(self, url: str) -> None:
-        if self.fs.find_one({"url": url, "kind": Kind.OG_IMAGE.value}):
-            return
-        i = load(url, self.user_agent)
-        # Save the original attachment (gzipped)
-        i.thumbnail((100, 100))
-        with BytesIO() as buf:
-            with GzipFile(mode="wb", fileobj=buf) as f1:
-                i.save(f1, format=i.format)
-            buf.seek(0)
-            self.fs.put(
-                buf,
-                url=url,
-                size=100,
-                content_type=i.get_format_mimetype(),
-                kind=Kind.OG_IMAGE.value,
-            )
-
-    def cache_og_image2(self, url: str, remote_id: str) -> None:
+    def cache_og_image(self, url: str, remote_id: str) -> None:
         if self.fs.find_one({"url": url, "kind": Kind.OG_IMAGE.value}):
             return
         i = load(url, self.user_agent)
@@ -79,49 +61,11 @@ class MediaCache(object):
             remote_id=remote_id,
         )
 
-    def cache_attachment(self, url: str) -> None:
-        if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}):
-            return
-        if (
-            url.endswith(".png")
-            or url.endswith(".jpg")
-            or url.endswith(".jpeg")
-            or url.endswith(".gif")
-        ):
-            i = load(url, self.user_agent)
-            # Save the original attachment (gzipped)
-            with BytesIO() as buf:
-                f1 = GzipFile(mode="wb", fileobj=buf)
-                i.save(f1, format=i.format)
-                f1.close()
-                buf.seek(0)
-                self.fs.put(
-                    buf,
-                    url=url,
-                    size=None,
-                    content_type=i.get_format_mimetype(),
-                    kind=Kind.ATTACHMENT.value,
-                )
-            # Save a thumbnail (gzipped)
-            i.thumbnail((720, 720))
-            with BytesIO() as buf:
-                with GzipFile(mode="wb", fileobj=buf) as f1:
-                    i.save(f1, format=i.format)
-                buf.seek(0)
-                self.fs.put(
-                    buf,
-                    url=url,
-                    size=720,
-                    content_type=i.get_format_mimetype(),
-                    kind=Kind.ATTACHMENT.value,
-                )
-            return
-
-    def cache_attachment2(self, attachment: Dict[str, Any], remote_id: str) -> None:
+    def cache_attachment(self, attachment: Dict[str, Any], remote_id: str) -> None:
         url = attachment["url"]
         # Ensure it's not already there
-        if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}):
+        if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value, "remote_id": remote_id}):
             return
 
         # If it's an image, make some thumbnails
@@ -231,14 +175,6 @@ class MediaCache(object):
         )
         return str(oid)
 
-    def cache(self, url: str, kind: Kind) -> None:
-        if kind == Kind.ACTOR_ICON:
-            self.cache_actor_icon(url)
-        elif kind == Kind.OG_IMAGE:
-            self.cache_og_image(url)
-        else:
-            self.cache_attachment(url)
-
     def get_actor_icon(self, url: str, size: int) -> Any:
         return self.get_file(url, size, Kind.ACTOR_ICON)
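Since the duplicate check in `cache_attachment` is now also keyed on `remote_id`, each activity keeps its own cached copy of an attachment URL, which is what allows the GC to delete by `remote_id` without touching copies referenced by other activities. A rough sketch of the two lookup patterns against GridFS (hypothetical values, assuming `MEDIA_CACHE` from config and that `Kind` is importable from utils.media):

from config import MEDIA_CACHE
from utils.media import Kind

url = "https://remote.example/media/photo.jpg"        # hypothetical attachment URL
remote_id = "https://remote.example/activities/123"   # hypothetical activity IRI

# Serving path: look up a stored thumbnail by URL, size and kind.
thumbnail = MEDIA_CACHE.fs.find_one(
    {"url": url, "size": 720, "kind": Kind.ATTACHMENT.value}
)

# GC path: drop every cached file tied to a single activity.
for grid_item in MEDIA_CACHE.fs.find({"remote_id": remote_id}):
    MEDIA_CACHE.fs.delete(grid_item._id)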