import json import traceback from datetime import datetime from datetime import timezone from typing import Any from typing import Dict import flask import requests from bs4 import BeautifulSoup from flask import current_app as app from little_boxes import activitypub as ap from little_boxes.activitypub import _to_list from little_boxes.errors import ActivityGoneError from little_boxes.errors import ActivityNotFoundError from little_boxes.errors import NotAnActivityError from requests.exceptions import HTTPError import config from config import DB from config import MEDIA_CACHE from core import gc from core.activitypub import SIG_AUTH from core.activitypub import Box from core.activitypub import _actor_hash from core.activitypub import _add_answers_to_question from core.activitypub import _cache_actor_icon from core.activitypub import is_from_outbox from core.activitypub import new_context from core.activitypub import post_to_outbox from core.activitypub import save_reply from core.activitypub import update_cached_actor from core.db import find_one_activity from core.db import update_one_activity from core.inbox import process_inbox from core.meta import MetaKey from core.meta import by_object_id from core.meta import by_remote_id from core.meta import by_type from core.meta import inc from core.meta import upsert from core.notifications import _NewMeta from core.notifications import set_inbox_flags from core.outbox import process_outbox from core.remote import track_failed_send from core.remote import track_successful_send from core.shared import MY_PERSON from core.shared import _Response from core.shared import back from core.shared import p from core.tasks import Tasks from utils import now from utils import opengraph from utils.media import is_video from utils.webmentions import discover_webmention_endpoint blueprint = flask.Blueprint("tasks", __name__) class TaskError(Exception): """Raised to log the error for poussetaches.""" def __init__(self): self.message = traceback.format_exc() @blueprint.route("/task/update_question", methods=["POST"]) def task_update_question() -> _Response: """Sends an Update.""" task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: app.logger.info(f"Updating question {iri}") cc = [config.ID + "/followers"] doc = DB.activities.find_one({"box": Box.OUTBOX.value, "remote_id": iri}) _add_answers_to_question(doc) question = ap.Question(**doc["activity"]["object"]) raw_update = dict( actor=question.id, object=question.to_dict(embed=True), attributedTo=MY_PERSON.id, cc=list(set(cc)), to=[ap.AS_PUBLIC], ) raw_update["@context"] = config.DEFAULT_CTX update = ap.Update(**raw_update) print(update) print(update.to_dict()) post_to_outbox(update) except HTTPError as err: app.logger.exception("request failed") if 400 <= err.response.status_code <= 499: app.logger.info("client error, no retry") return "" raise TaskError() from err except Exception as err: app.logger.exception("task failed") raise TaskError() from err return "" @blueprint.route("/task/send_actor_update", methods=["POST"]) def task_send_actor_update() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") try: update = ap.Update( actor=MY_PERSON.id, object=MY_PERSON.to_dict(), to=[MY_PERSON.followers], cc=[ap.AS_PUBLIC], published=now(), context=new_context(), ) post_to_outbox(update) except Exception as err: app.logger.exception(f"failed to send actor update") raise TaskError() from err return "" @blueprint.route("/task/fetch_og_meta", methods=["POST"]) def task_fetch_og_meta() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: activity = ap.fetch_remote_activity(iri) app.logger.info(f"activity={activity!r}") if activity.has_type(ap.ActivityType.CREATE): note = activity.get_object() links = opengraph.links_from_note(note.to_dict()) og_metadata = opengraph.fetch_og_metadata(config.USER_AGENT, links) for og in og_metadata: if not og.get("image"): continue config.MEDIA_CACHE.cache_og_image(og["image"], iri) app.logger.debug(f"OG metadata {og_metadata!r}") DB.activities.update_one( {"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}} ) app.logger.info(f"OG metadata fetched for {iri}: {og_metadata}") except (ActivityGoneError, ActivityNotFoundError): app.logger.exception(f"dropping activity {iri}, skip OG metedata") return "" except requests.exceptions.HTTPError as http_err: if 400 <= http_err.response.status_code < 500: app.logger.exception("bad request, no retry") return "" app.logger.exception("failed to fetch OG metadata") raise TaskError() from http_err except Exception as err: app.logger.exception(f"failed to fetch OG metadata for {iri}") raise TaskError() from err return "" @blueprint.route("/task/cache_object", methods=["POST"]) def task_cache_object() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: activity = ap.fetch_remote_activity(iri) app.logger.info(f"activity={activity!r}") obj = activity.get_object() Tasks.cache_emojis(obj) # Refetch the object actor (without cache) obj_actor = ap.fetch_remote_activity(obj.get_actor().id, no_cache=True) cache = {MetaKey.OBJECT: obj.to_dict(embed=True)} if activity.get_actor().id != obj_actor.id: # Cache the object actor obj_actor_hash = _actor_hash(obj_actor) cache[MetaKey.OBJECT_ACTOR] = obj_actor.to_dict(embed=True) cache[MetaKey.OBJECT_ACTOR_ID] = obj_actor.id cache[MetaKey.OBJECT_ACTOR_HASH] = obj_actor_hash # Update the actor cache for the other activities update_cached_actor(obj_actor) update_one_activity(by_remote_id(activity.id), upsert(cache)) except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) app.logger.exception(f"flagging activity {iri} as deleted, no object caching") except Exception as err: app.logger.exception(f"failed to cache object for {iri}") raise TaskError() from err return "" @blueprint.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901 def task_finish_post_to_outbox() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: activity = ap.fetch_remote_activity(iri) app.logger.info(f"activity={activity!r}") recipients = activity.recipients() process_outbox(activity, {}) app.logger.info(f"recipients={recipients}") activity = ap.clean_activity(activity.to_dict()) payload = json.dumps(activity) for recp in recipients: app.logger.debug(f"posting to {recp}") Tasks.post_to_remote_inbox(payload, recp) except (ActivityGoneError, ActivityNotFoundError): app.logger.exception(f"no retry") except Exception as err: app.logger.exception(f"failed to post to remote inbox for {iri}") raise TaskError() from err return "" @blueprint.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901 def task_finish_post_to_inbox() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: activity = ap.fetch_remote_activity(iri) app.logger.info(f"activity={activity!r}") process_inbox(activity, {}) except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): app.logger.exception(f"no retry") except Exception as err: app.logger.exception(f"failed to cfinish post to inbox for {iri}") raise TaskError() from err return "" def select_video_to_cache(links): """Try to find the 360p version from a video urls, or return the smallest one.""" videos = [] for link in links: if link.get("mimeType", "").startswith("video/") or is_video(link["href"]): videos.append({"href": link["href"], "height": link["height"]}) if not videos: app.logger.warning(f"failed to select a video from {links!r}") return None videos = sorted(videos, key=lambda l: l["height"]) for video in videos: if video["height"] == 360: return video return videos[0] @blueprint.route( "/task/cache_attachments", methods=["POST"] ) # noqa: C910 # too complex def task_cache_attachments() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: activity = ap.fetch_remote_activity(iri) app.logger.info(f"caching attachment for activity={activity!r}") # Generates thumbnails for the actor's icon and the attachments if any if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]): obj = activity.get_object() else: obj = activity if obj.content: content_html = BeautifulSoup(obj.content, "html5lib") for img in content_html.find_all("img"): src = img.attrs.get("src") if src: Tasks.cache_attachment({"url": src}, iri) if obj.has_type(ap.ActivityType.VIDEO): if isinstance(obj.url, list): # TODO: filter only videogt link = select_video_to_cache(obj.url) if link: Tasks.cache_attachment({"url": link["href"]}, iri) elif isinstance(obj.url, str): Tasks.cache_attachment({"url": obj.url}, iri) else: app.logger.warning(f"failed to parse video link {obj!r} for {iri}") # Iter the attachments for attachment in obj._data.get("attachment", []): Tasks.cache_attachment(attachment, iri) app.logger.info(f"attachments cached for {iri}") except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): app.logger.exception(f"dropping activity {iri}, no attachment caching") except Exception as err: app.logger.exception(f"failed to cache attachments for {iri}") raise TaskError() from err return "" @blueprint.route("/task/cache_attachment", methods=["POST"]) def task_cache_attachment() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload["iri"] attachment = task.payload["attachment"] try: app.logger.info(f"caching attachment {attachment!r} for {iri}") config.MEDIA_CACHE.cache_attachment(attachment, iri) app.logger.info(f"attachment {attachment!r} cached for {iri}") except Exception as err: app.logger.exception(f"failed to cache attachment {attachment!r} for {iri}") raise TaskError() from err return "" @blueprint.route("/task/send_webmention", methods=["POST"]) def task_send_webmention() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") note_url = task.payload["note_url"] link = task.payload["link"] remote_id = task.payload["remote_id"] try: app.logger.info(f"trying to send webmention source={note_url} target={link}") webmention_endpoint = discover_webmention_endpoint(link) if not webmention_endpoint: app.logger.info("no webmention endpoint") return "" resp = requests.post( webmention_endpoint, data={"source": note_url, "target": link}, headers={"User-Agent": config.USER_AGENT}, ) app.logger.info(f"webmention endpoint resp={resp}/{resp.text}") resp.raise_for_status() except HTTPError as err: app.logger.exception("request failed") if 400 <= err.response.status_code <= 499: app.logger.info("client error, no retry") return "" raise TaskError() from err except Exception as err: app.logger.exception(f"failed to cache actor for {link}/{remote_id}/{note_url}") raise TaskError() from err return "" @blueprint.route("/task/cache_actor", methods=["POST"]) # noqa: C910 # too complex def task_cache_actor() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload["iri"] try: activity = ap.fetch_remote_activity(iri) app.logger.info(f"activity={activity!r}") # Reload the actor without caching (in case it got upated) actor = ap.fetch_remote_activity(activity.get_actor().id, no_cache=True) # Fetch the Open Grah metadata if it's a `Create` if activity.has_type(ap.ActivityType.CREATE): obj = activity.get_object() try: links = opengraph.links_from_note(obj.to_dict()) if links: Tasks.fetch_og_meta(iri) # Send Webmentions only if it's from the outbox, and public if ( is_from_outbox(obj) and ap.get_visibility(obj) == ap.Visibility.PUBLIC ): Tasks.send_webmentions(activity, links) except Exception: app.logger.exception("failed to cache links") if activity.has_type(ap.ActivityType.FOLLOW): if actor.id == config.ID: # It's a new following, cache the "object" (which is the actor we follow) DB.activities.update_one( by_remote_id(iri), upsert({MetaKey.OBJECT: activity.get_object().to_dict(embed=True)}), ) # Cache the actor info update_cached_actor(actor) app.logger.info(f"actor cached for {iri}") if not activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]): return "" if activity.get_object()._data.get( "attachment", [] ) or activity.get_object().has_type(ap.ActivityType.VIDEO): Tasks.cache_attachments(iri) except (ActivityGoneError, ActivityNotFoundError): DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) app.logger.exception(f"flagging activity {iri} as deleted, no actor caching") except Exception as err: app.logger.exception(f"failed to cache actor for {iri}") raise TaskError() from err return "" @blueprint.route("/task/cache_actor_icon", methods=["POST"]) def task_cache_actor_icon() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") actor_iri = task.payload["actor_iri"] icon_url = task.payload["icon_url"] try: MEDIA_CACHE.cache_actor_icon(icon_url) except Exception as exc: err = f"failed to cache actor icon {icon_url} for {actor_iri}" app.logger.exception(err) raise TaskError() from exc return "" @blueprint.route("/task/cache_emoji", methods=["POST"]) def task_cache_emoji() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload["iri"] url = task.payload["url"] try: MEDIA_CACHE.cache_emoji(url, iri) except Exception as exc: err = f"failed to cache emoji {url} at {iri}" app.logger.exception(err) raise TaskError() from exc return "" @blueprint.route("/task/forward_activity", methods=["POST"]) def task_forward_activity() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: activity = ap.fetch_remote_activity(iri) recipients = back.followers_as_recipients() app.logger.debug(f"Forwarding {activity!r} to {recipients}") activity = ap.clean_activity(activity.to_dict()) payload = json.dumps(activity) for recp in recipients: app.logger.debug(f"forwarding {activity!r} to {recp}") Tasks.post_to_remote_inbox(payload, recp) except Exception as err: app.logger.exception("task failed") raise TaskError() from err return "" @blueprint.route("/task/post_to_remote_inbox", methods=["POST"]) def task_post_to_remote_inbox() -> _Response: """Post an activity to a remote inbox.""" task = p.parse(flask.request) app.logger.info(f"task={task!r}") payload, to = task.payload["payload"], task.payload["to"] try: app.logger.info("payload=%s", payload) app.logger.info("generating sig") signed_payload = json.loads(payload) app.logger.info("to=%s", to) resp = requests.post( to, data=json.dumps(signed_payload), auth=SIG_AUTH, headers={ "Content-Type": config.HEADERS[1], "Accept": config.HEADERS[1], "User-Agent": config.USER_AGENT, }, ) app.logger.info("resp=%s", resp) app.logger.info("resp_body=%s", resp.text) resp.raise_for_status() except HTTPError as err: track_failed_send(to) app.logger.exception("request failed") if 400 <= err.response.status_code <= 499: app.logger.info("client error, no retry") return "" raise TaskError() from err except requests.RequestException: track_failed_send(to) app.logger.exception("request failed") except Exception as err: app.logger.exception("task failed") raise TaskError() from err track_successful_send(to) return "" @blueprint.route("/task/fetch_remote_question", methods=["POST"]) def task_fetch_remote_question() -> _Response: """Fetch a remote question for implementation that does not send Update.""" task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: app.logger.info(f"Fetching remote question {iri}") local_question = DB.activities.find_one( { "box": Box.INBOX.value, "type": ap.ActivityType.CREATE.value, "activity.object.id": iri, } ) try: remote_question = ap.get_backend().fetch_iri(iri, no_cache=True) except (ActivityGoneError, ActivityNotFoundError): app.logger.info("f{iri} not found, no retry") return "" # FIXME(tsileo): compute and set `meta.object_visiblity` (also update utils.py to do it) if ( local_question and ( local_question["meta"].get("voted_for") or local_question["meta"].get("subscribed") ) and not DB.notifications.find_one({"activity.id": remote_question["id"]}) ): DB.notifications.insert_one( { "type": "question_ended", "datetime": datetime.now(timezone.utc).isoformat(), "activity": remote_question, } ) # Update the Create if we received it in the inbox if local_question: DB.activities.update_one( {"remote_id": local_question["remote_id"], "box": Box.INBOX.value}, {"$set": {"activity.object": remote_question}}, ) # Also update all the cached copies (Like, Announce...) DB.activities.update_many( {"meta.object.id": remote_question["id"]}, {"$set": {"meta.object": remote_question}}, ) except HTTPError as err: app.logger.exception("request failed") if 400 <= err.response.status_code <= 499: app.logger.info("client error, no retry") return "" raise TaskError() from err except Exception as err: app.logger.exception("task failed") raise TaskError() from err return "" @blueprint.route("/task/cleanup", methods=["POST"]) def task_cleanup() -> _Response: task = p.parse(flask.request) app.logger.info(f"task={task!r}") gc.perform() return "" def _is_local_reply(activity: ap.BaseActivity) -> bool: for dest in _to_list(activity.to or []): if dest.startswith(config.BASE_URL): return True for dest in _to_list(activity.cc or []): if dest.startswith(config.BASE_URL): return True return False @blueprint.route("/task/process_reply", methods=["POST"]) def task_process_reply() -> _Response: """Process `Announce`d posts from Pleroma relays in order to process replies of activities that are in the inbox.""" task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: activity = ap.fetch_remote_activity(iri) app.logger.info(f"checking for reply activity={activity!r}") # Some AP server always return Create when requesting an object if activity.has_type(ap.ActivityType.CREATE): activity = activity.get_object() in_reply_to = activity.get_in_reply_to() if not in_reply_to: # If it's not reply, we can drop it app.logger.info(f"activity={activity!r} is not a reply, dropping it") return "" root_reply = in_reply_to # Fetch the activity reply reply = ap.fetch_remote_activity(in_reply_to) if reply.has_type(ap.ActivityType.CREATE): reply = reply.get_object() new_replies = [activity, reply] while 1: in_reply_to = reply.get_in_reply_to() if not in_reply_to: break root_reply = in_reply_to reply = ap.fetch_remote_activity(root_reply) if reply.has_type(ap.ActivityType.CREATE): reply = reply.get_object() new_replies.append(reply) app.logger.info(f"root_reply={reply!r} for activity={activity!r}") # In case the activity was from the inbox update_one_activity( {**by_object_id(activity.id), **by_type(ap.ActivityType.CREATE)}, upsert({MetaKey.THREAD_ROOT_PARENT: root_reply}), ) for (new_reply_idx, new_reply) in enumerate(new_replies): if find_one_activity( {**by_object_id(new_reply.id), **by_type(ap.ActivityType.CREATE)} ) or DB.replies.find_one(by_remote_id(new_reply.id)): continue actor = new_reply.get_actor() is_root_reply = new_reply_idx == len(new_replies) - 1 if is_root_reply: reply_flags: Dict[str, Any] = {} else: reply_actor = new_replies[new_reply_idx + 1].get_actor() is_in_reply_to_self = actor.id == reply_actor.id reply_flags = { MetaKey.IN_REPLY_TO_SELF.value: is_in_reply_to_self, MetaKey.IN_REPLY_TO.value: new_reply.get_in_reply_to(), } if not is_in_reply_to_self: reply_flags[MetaKey.IN_REPLY_TO_ACTOR.value] = reply_actor.to_dict( embed=True ) # Save the reply with the cached actor and the thread flag/ID save_reply( new_reply, { **reply_flags, MetaKey.THREAD_ROOT_PARENT.value: root_reply, MetaKey.ACTOR.value: actor.to_dict(embed=True), MetaKey.ACTOR_HASH.value: _actor_hash(actor), }, ) # Update the reply counters if new_reply.get_in_reply_to(): update_one_activity( { **by_object_id(new_reply.get_in_reply_to()), **by_type(ap.ActivityType.CREATE), }, inc(MetaKey.COUNT_REPLY, 1), ) DB.replies.update_one( by_remote_id(new_reply.get_in_reply_to()), inc(MetaKey.COUNT_REPLY, 1), ) # Cache the actor icon _cache_actor_icon(actor) # And cache the attachments Tasks.cache_attachments(new_reply.id) except (ActivityGoneError, ActivityNotFoundError): app.logger.exception(f"dropping activity {iri}, skip processing") return "" except Exception as err: app.logger.exception(f"failed to process new activity {iri}") raise TaskError() from err return "" @blueprint.route("/task/process_new_activity", methods=["POST"]) # noqa:c901 def task_process_new_activity() -> _Response: """Process an activity received in the inbox""" task = p.parse(flask.request) app.logger.info(f"task={task!r}") iri = task.payload try: activity = ap.fetch_remote_activity(iri) app.logger.info(f"activity={activity!r}") flags: _NewMeta = {} set_inbox_flags(activity, flags) app.logger.info(f"a={activity}, flags={flags!r}") if flags: DB.activities.update_one({"remote_id": activity.id}, {"$set": flags}) app.logger.info(f"new activity {iri} processed") except (ActivityGoneError, ActivityNotFoundError): app.logger.exception(f"dropping activity {iri}, skip processing") return "" except Exception as err: app.logger.exception(f"failed to process new activity {iri}") raise TaskError() from err return ""