import logging
from datetime import datetime

from bson.objectid import ObjectId
from html2text import html2text
from feedgen.feed import FeedGenerator

from little_boxes import activitypub as ap
from little_boxes.backend import Backend
from little_boxes.collection import parse_collection as ap_parse_collection

from config import USERNAME, BASE_URL, ID
from config import DB, ME
import tasks

from typing import List, Optional, Dict, Any, Union

logger = logging.getLogger(__name__)


def _remove_id(doc: ap.ObjectType) -> ap.ObjectType:
    """Return a shallow copy of `doc` without MongoDB's `_id` field.

    The input document is left untouched.
    """
    doc = doc.copy()
    doc.pop("_id", None)
    return doc


def _to_list(data: Union[List[Any], Any]) -> List[Any]:
    """Normalize a field that is either a single object or a list of objects.

    A list is returned as-is (not copied); anything else is wrapped in a
    one-element list.
    """
    if isinstance(data, list):
        return data
    return [data]


class MicroblogPubBackend(Backend):
    """`little_boxes` backend persisting activities in the MongoDB `DB` handle.

    Activities are stored in `DB.inbox` / `DB.outbox` as documents of the form
    `{"activity": <dict>, "type": ..., "remote_id": ..., "meta": {...}}`;
    follow relations live in `DB.followers` / `DB.following` keyed by
    `remote_actor`.
    """

    def base_url(self) -> str:
        """Return the configured `BASE_URL`."""
        return BASE_URL

    def activity_url(self, obj_id) -> str:
        """Return the outbox URL for the activity identified by `obj_id`."""
        return f"{BASE_URL}/outbox/{obj_id}"

    def outbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None:
        """Store a new activity in the outbox, flagged as neither undone nor deleted."""
        DB.outbox.insert_one(
            {
                "activity": activity.to_dict(),
                "type": activity.type,
                "remote_id": activity.id,
                "meta": {"undo": False, "deleted": False},
            }
        )

    def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool:
        """Return True if the outbox holds a non-undone Block targeting `actor_id`.

        The lookup compares the stored `activity.object` with `actor_id`
        directly, i.e. it assumes Block activities are stored with the blocked
        actor's IRI as a plain string (TODO confirm for embedded objects).
        """
        # The blocked party is `actor_id`; `as_actor` is the one doing the
        # blocking, so it must not be used as the Block's object here.
        return bool(
            DB.outbox.find_one(
                {
                    "type": ap.ActivityType.BLOCK.value,
                    "activity.object": actor_id,
                    "meta.undo": False,
                }
            )
        )

    def fetch_iri(self, iri: str) -> ap.ObjectType:
        """Unimplemented stub: currently does nothing and returns None."""
        pass

    def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool:
        """Return True if an activity with remote id `iri` is already in the inbox."""
        return bool(DB.inbox.find_one({"remote_id": iri}))

    def inbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None:
        """Store a received activity in the inbox, flagged as neither undone nor deleted."""
        DB.inbox.insert_one(
            {
                "activity": activity.to_dict(),
                "type": activity.type,
                "remote_id": activity.id,
                "meta": {"undo": False, "deleted": False},
            }
        )

    def post_to_remote_inbox(self, as_actor: ap.Person, payload: str, to: str) -> None:
        """Queue delivery of `payload` to the remote inbox `to` via `tasks.post_to_inbox`."""
        tasks.post_to_inbox.delay(payload, to)

    def new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None:
        """Record the actor of `follow` as a follower, unless already recorded."""
        remote_actor = follow.get_actor().id
        if DB.followers.find_one({"remote_actor": remote_actor}) is None:
            DB.followers.insert_one({"remote_actor": remote_actor})

    def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None:
        """Remove the actor of `follow` from the followers."""
        # TODO(tsileo): update the follow to set undo
        DB.followers.delete_one({"remote_actor": follow.get_actor().id})

    def undo_new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None:
        """Remove the object (followed actor) of `follow` from the following list."""
        # TODO(tsileo): update the follow to set undo
        DB.following.delete_one({"remote_actor": follow.get_object().id})

    def new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None:
        """Record the object (followed actor) of `follow`, unless already recorded."""
        # For an outgoing Follow the remote party is the object, not the actor;
        # this also matches the key `undo_new_following` deletes by.
        remote_actor = follow.get_object().id
        if DB.following.find_one({"remote_actor": remote_actor}) is None:
            DB.following.insert_one({"remote_actor": remote_actor})

    def inbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
        """Increment the like counter of the liked object in the outbox."""
        obj = like.get_object()
        # Update the meta counter if the object is published by the server
        DB.outbox.update_one(
            {"activity.object.id": obj.id}, {"$inc": {"meta.count_like": 1}}
        )

    def inbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None:
        """Decrement the like counter of the previously liked object in the outbox."""
        obj = like.get_object()
        # Update the meta counter if the object is published by the server
        DB.outbox.update_one(
            {"activity.object.id": obj.id}, {"$inc": {"meta.count_like": -1}}
        )

    def outbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
        """Record a like we performed: bump the counter and remember the Like id."""
        obj = like.get_object()
        # Unlikely, but an actor can like its own post
        DB.outbox.update_one(
            {"activity.object.id": obj.id}, {"$inc": {"meta.count_like": 1}}
        )
        # Keep track of the like we just performed
        DB.inbox.update_one(
            {"activity.object.id": obj.id}, {"$set": {"meta.liked": like.id}}
        )

    # Backward-compatible alias for the original, misspelled method name.
    outobx_like = outbox_like

    def outbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None:
        """Revert a like we performed: lower the counter and clear `meta.liked`."""
        obj = like.get_object()
        # Unlikely, but an actor can like its own post
        DB.outbox.update_one(
            {"activity.object.id": obj.id}, {"$inc": {"meta.count_like": -1}}
        )
        DB.inbox.update_one(
            {"activity.object.id": obj.id}, {"$set": {"meta.liked": False}}
        )

    def inbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
        """Increment the boost counter of the announced object in the outbox.

        Announces whose `object` is a string not starting with "http" are
        logged and ignored.
        """
        target = announce._data["object"]
        if isinstance(target, str):
            if not target.startswith("http"):
                # TODO(tsileo): actually drop it without storing it and better
                # logging, also move the check somewhere else
                logger.warning(
                    "received an Annouce referencing an OStatus notice (%s), "
                    "dropping the message",
                    target,
                )
                return
            obj_iri = target
        else:
            # Embedded object: resolve it through the activity itself (the
            # backend has no `get_object`).
            obj_iri = announce.get_object().id

        # FIXME(tsileo): Save/cache the object, and make it part of the stream
        # so we can fetch it
        DB.outbox.update_one(
            {"activity.object.id": obj_iri}, {"$inc": {"meta.count_boost": 1}}
        )

    def inbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
        """Decrement the boost counter of the previously announced object."""
        obj = announce.get_object()
        # Update the meta counter if the object is published by the server
        DB.outbox.update_one(
            {"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": -1}}
        )

    def outbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
        """Remember on the inbox item that we boosted it (stores the Announce id)."""
        obj = announce.get_object()
        DB.inbox.update_one(
            {"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}}
        )

    def outbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
        """Clear the `meta.boosted` marker on the inbox item."""
        obj = announce.get_object()
        DB.inbox.update_one(
            {"activity.object.id": obj.id}, {"$set": {"meta.boosted": False}}
        )

    def inbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
        """Flag the inbox item wrapping the deleted object as deleted."""
        DB.inbox.update_one(
            {"activity.object.id": delete.get_object().id},
            {"$set": {"meta.deleted": True}},
        )
        # FIXME(tsileo): handle threads
        # obj = delete._get_actual_object()
        # if obj.type_enum == ActivityType.NOTE:
        #     obj._delete_from_threads()

        # TODO(tsileo): also purge the cache if it's a reply of a published activity

    def outbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
        """Flag the outbox item wrapping the deleted object as deleted."""
        DB.outbox.update_one(
            {"activity.object.id": delete.get_object().id},
            {"$set": {"meta.deleted": True}},
        )

    def inbox_update(self, as_actor: ap.Person, update: ap.Update) -> None:
        """Replace the stored object of an inbox item when a Note is updated.

        Updates for any other object type are currently ignored.
        """
        obj = update.get_object()
        if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE:
            DB.inbox.update_one(
                {"activity.object.id": obj.id},
                {"$set": {"activity.object": obj.to_dict()}},
            )
            return

        # FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor

    def outbox_update(self, as_actor: ap.Person, update: ap.Update) -> None:
        """Apply a partial update to the object of a stored outbox item.

        Every key of the Update's raw `object` except `id` and `type` is
        written under `activity.object.`; keys whose value is None are unset
        instead. An `updated` timestamp (UTC, second precision) is set first,
        so an explicit `updated` key in the payload takes precedence.
        """
        obj = update._data["object"]

        update_prefix = "activity.object."
        to_set: Dict[str, Any] = {
            f"{update_prefix}updated": (
                datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
            )
        }
        to_unset: Dict[str, Any] = {}

        for k, v in obj.items():
            if k in ("id", "type"):
                continue
            if v is None:
                to_unset[f"{update_prefix}{k}"] = ""
            else:
                to_set[f"{update_prefix}{k}"] = v

        # Kept under a distinct name so the `update` activity is not shadowed.
        changes: Dict[str, Any] = {"$set": to_set}
        if to_unset:
            # Only include `$unset` when it has at least one field.
            changes["$unset"] = to_unset

        logger.info("updating note from outbox %r %s", obj, changes)
        DB.outbox.update_one({"activity.object.id": obj["id"]}, changes)
        # FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
        # (create a new Update with the result of the update, and send it without saving it?)
def gen_feed():
    """Build a `FeedGenerator` from up to 50 `Create` activities of the outbox.

    Returns the populated generator; rendering is left to the caller.
    """
    fg = FeedGenerator()
    fg.id(f"{ID}")
    fg.title(f"{USERNAME} notes")
    fg.author({"name": USERNAME, "email": "t@a4.io"})
    fg.link(href=ID, rel="alternate")
    fg.description(f"{USERNAME} notes")
    fg.logo(ME.get("icon", {}).get("url"))
    fg.language("en")
    for item in DB.outbox.find({"type": "Create"}, limit=50):
        note = item["activity"]["object"]
        url = note.get("url")
        content = note["content"]
        fe = fg.add_entry()
        fe.id(url)
        fe.link(href=url)
        fe.title(content)
        fe.description(content)
    return fg


def json_feed(path: str) -> Dict[str, Any]:
    """JSON Feed (https://jsonfeed.org/) document.

    `path` is appended to `ID` to form the feed URL. Items are built from up
    to 50 `Create` activities of the outbox.
    """
    data = []
    for item in DB.outbox.find({"type": "Create"}, limit=50):
        note = item["activity"]["object"]
        data.append(
            {
                # Stored documents have no top-level "id"; use the activity id.
                "id": item["activity"]["id"],
                "url": note.get("url"),
                "content_html": note["content"],
                "content_text": html2text(note["content"]),
                "date_published": note.get("published"),
            }
        )
    return {
        "version": "https://jsonfeed.org/version/1",
        "user_comment": (
            "This is a microblog feed. You can add this to your feed reader using the following URL: "
            + ID
            + path
        ),
        "title": USERNAME,
        "home_page_url": ID,
        "feed_url": ID + path,
        "author": {
            "name": USERNAME,
            "url": ID,
            "avatar": ME.get("icon", {}).get("url"),
        },
        "items": data,
    }


def build_inbox_json_feed(
    path: str, request_cursor: Optional[str] = None
) -> Dict[str, Any]:
    """Build a JSON Feed document of non-deleted `Create` activities in the inbox.

    Items are returned newest first, 50 per page. `request_cursor` is the
    string form of the `_id` after which the page starts; when a full page is
    returned, `next_url` carries the cursor for the following page.

    Raises whatever `ObjectId` raises when `request_cursor` is not a valid id.
    """
    page_size = 50
    data = []
    cursor = None

    q: Dict[str, Any] = {"type": "Create", "meta.deleted": False}
    if request_cursor:
        # `_id` values are ObjectIds; comparing against the raw string would
        # never match.
        q["_id"] = {"$lt": ObjectId(request_cursor)}

    for item in DB.inbox.find(q, limit=page_size).sort("_id", -1):
        activity = item["activity"]
        note = activity["object"]
        actor = ap.get_backend().fetch_iri(activity["actor"])
        data.append(
            {
                "id": activity["id"],
                "url": note.get("url"),
                "content_html": note["content"],
                "content_text": html2text(note["content"]),
                "date_published": note.get("published"),
                "author": {
                    "name": actor.get("name", actor.get("preferredUsername")),
                    "url": actor.get("url"),
                    "avatar": actor.get("icon", {}).get("url"),
                },
            }
        )
        # After the loop this holds the `_id` of the oldest item on the page.
        cursor = str(item["_id"])

    resp = {
        "version": "https://jsonfeed.org/version/1",
        "title": f"{USERNAME}'s stream",
        "home_page_url": ID,
        "feed_url": ID + path,
        "items": data,
    }
    if cursor and len(data) == page_size:
        resp["next_url"] = ID + path + "?cursor=" + cursor

    return resp


def parse_collection(
    payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None
) -> List[str]:
    """Resolve/fetch a `Collection`/`OrderedCollection`.

    The local followers/following collections are answered from MongoDB;
    everything else is delegated to `little_boxes`' `parse_collection`.
    """
    # Resolve internal collections via MongoDB directly
    if url == ID + "/followers":
        return [doc["remote_actor"] for doc in DB.followers.find()]
    if url == ID + "/following":
        return [doc["remote_actor"] for doc in DB.following.find()]

    # Go through all the pages
    return ap_parse_collection(payload, url)


def embed_collection(total_items, first_page_id):
    """Return a minimal `OrderedCollection` stub pointing at its first page."""
    return {
        "type": ap.ActivityType.ORDERED_COLLECTION.value,
        "totalItems": total_items,
        "first": f"{first_page_id}?page=first",
        "id": first_page_id,
    }


def build_ordered_collection(
    col, q=None, cursor=None, map_func=None, limit=50, col_name=None, first_page=False
):
    """Build an `OrderedCollection` or `OrderedCollectionPage` from a MongoDB collection.

    Args:
        col: collection to read from; `col.name` is used when `col_name` is not given.
        q: optional filter; it is copied, never modified.
        cursor: string form of the `_id` after which the page starts. Without a
            cursor an `OrderedCollection` embedding its first page is built;
            with one, an `OrderedCollectionPage`.
        map_func: optional callable applied to each document (after `_id` removal).
        limit: page size.
        col_name: path segment used to build the collection URLs.
        first_page: return the page itself instead of the wrapping collection.

    Returns:
        A dict ready for serialization. When no document matches, an empty
        `OrderedCollection` is returned regardless of the other flags.

    Raises whatever `ObjectId` raises when `cursor` is not a valid id.
    """
    col_name = col_name or col.name
    collection_id = f"{BASE_URL}/{col_name}"

    # Work on copies so the caller's filter is never mutated.
    base_q = dict(q) if q else {}
    page_q = dict(base_q)
    if cursor:
        page_q["_id"] = {"$lt": ObjectId(cursor)}

    data = list(col.find(page_q, limit=limit).sort("_id", -1))

    if not data:
        return {
            "id": collection_id,
            "totalItems": 0,
            "type": ap.ActivityType.ORDERED_COLLECTION.value,
            "orderedItems": [],
        }

    start_cursor = str(data[0]["_id"])
    next_page_cursor = str(data[-1]["_id"])
    # Count against the un-paginated filter so every page reports the size of
    # the whole collection rather than of the remainder after the cursor.
    total_items = col.find(base_q).count()

    data = [_remove_id(doc) for doc in data]
    if map_func:
        data = [map_func(doc) for doc in data]

    page_id = f"{collection_id}?cursor={start_cursor}"
    # A full page means there may be more items to fetch.
    next_url = (
        f"{collection_id}?cursor={next_page_cursor}" if len(data) == limit else None
    )

    # If there's a cursor, then we return an OrderedCollectionPage
    if cursor:
        page = {
            "@context": ap.COLLECTION_CTX,
            "type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
            "id": page_id,
            "totalItems": total_items,
            "partOf": collection_id,
            "orderedItems": data,
        }
        if next_url is not None:
            page["next"] = next_url
        # `first_page` needs no special handling: this is already a page.
        # XXX(tsileo): implements prev with prev=?
        return page

    # No cursor, this is the first page and we return an OrderedCollection
    first = {
        "id": page_id,
        "orderedItems": data,
        "partOf": collection_id,
        "totalItems": total_items,
        "type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
    }
    if next_url is not None:
        first["next"] = next_url

    if first_page:
        return first

    return {
        "@context": ap.COLLECTION_CTX,
        "id": collection_id,
        "totalItems": total_items,
        "type": ap.ActivityType.ORDERED_COLLECTION.value,
        "first": first,
    }