import json from datetime import datetime from datetime import timedelta from datetime import timezone from typing import Any from typing import List from urllib.parse import urlparse import flask from flask import abort from flask import current_app as app from flask import redirect from flask import render_template from flask import request from flask import session from flask import url_for from little_boxes import activitypub as ap from little_boxes.webfinger import get_actor_url from passlib.hash import bcrypt from u2flib_server import u2f import config from config import DB from config import ID from config import PASS from core.activitypub import Box from core.activitypub import post_to_outbox from core.db import find_one_activity from core.meta import by_object_id from core.meta import by_type from core.shared import MY_PERSON from core.shared import _build_thread from core.shared import _Response from core.shared import csrf from core.shared import htmlify from core.shared import login_required from core.shared import noindex from core.shared import p from core.shared import paginated_query from utils import now from utils.emojis import EMOJIS_BY_NAME from utils.lookup import lookup blueprint = flask.Blueprint("admin", __name__) def verify_pass(pwd): return bcrypt.verify(pwd, PASS) @blueprint.route("/admin/update_actor") @login_required def admin_update_actor() -> _Response: # FIXME(tsileo): make this a task, and keep track of our own actor_hash at startup update = ap.Update( actor=MY_PERSON.id, object=MY_PERSON.to_dict(), to=[MY_PERSON.followers], cc=[ap.AS_PUBLIC], published=now(), ) post_to_outbox(update) return "OK" @blueprint.route("/admin/logout") @login_required def admin_logout() -> _Response: session["logged_in"] = False return redirect("/") @blueprint.route("/login", methods=["POST", "GET"]) @noindex def admin_login() -> _Response: if session.get("logged_in") is True: return redirect(url_for("admin.admin_notifications")) devices = [doc["device"] for doc in DB.u2f.find()] u2f_enabled = True if devices else False if request.method == "POST": csrf.protect() # 1. Check regular password login flow pwd = request.form.get("pass") if pwd: if verify_pass(pwd): session["logged_in"] = True return redirect( request.args.get("redirect") or url_for("admin.admin_notifications") ) else: abort(403) # 2. Check for U2F payload, if any elif devices: resp = json.loads(request.form.get("resp")) # type: ignore try: u2f.complete_authentication(session["challenge"], resp) except ValueError as exc: print("failed", exc) abort(403) return finally: session["challenge"] = None session["logged_in"] = True return redirect( request.args.get("redirect") or url_for("admin.admin_notifications") ) else: abort(401) payload = None if devices: payload = u2f.begin_authentication(ID, devices) session["challenge"] = payload return htmlify( render_template("login.html", u2f_enabled=u2f_enabled, payload=payload) ) @blueprint.route("/admin", methods=["GET"]) @login_required def admin_index() -> _Response: q = { "meta.deleted": False, "meta.undo": False, "type": ap.ActivityType.LIKE.value, "box": Box.OUTBOX.value, } col_liked = DB.activities.count(q) return htmlify( render_template( "admin.html", instances=list(DB.instances.find()), inbox_size=DB.activities.count({"box": Box.INBOX.value}), outbox_size=DB.activities.count({"box": Box.OUTBOX.value}), col_liked=col_liked, col_followers=DB.activities.count( { "box": Box.INBOX.value, "type": ap.ActivityType.FOLLOW.value, "meta.undo": False, } ), col_following=DB.activities.count( { "box": Box.OUTBOX.value, "type": ap.ActivityType.FOLLOW.value, "meta.undo": False, } ), ) ) @blueprint.route("/admin/indieauth", methods=["GET"]) @login_required def admin_indieauth() -> _Response: return htmlify( render_template( "admin_indieauth.html", indieauth_actions=DB.indieauth.find().sort("ts", -1).limit(100), ) ) @blueprint.route("/admin/tasks", methods=["GET"]) @login_required def admin_tasks() -> _Response: return htmlify( render_template( "admin_tasks.html", success=p.get_success(), dead=p.get_dead(), waiting=p.get_waiting(), cron=p.get_cron(), ) ) @blueprint.route("/admin/lookup", methods=["GET"]) @login_required def admin_lookup() -> _Response: data = None meta = None if request.args.get("url"): data = lookup(request.args.get("url")) # type: ignore if data: if data.has_type(ap.ActivityType.ANNOUNCE): meta = dict( object=data.get_object().to_dict(), object_actor=data.get_object().get_actor().to_dict(), actor=data.get_actor().to_dict(), ) elif data.has_type(ap.ActivityType.QUESTION): p.push(data.id, "/task/fetch_remote_question") print(data) app.logger.debug(data.to_dict()) return htmlify( render_template( "lookup.html", data=data, meta=meta, url=request.args.get("url") ) ) @blueprint.route("/admin/profile", methods=["GET"]) @login_required def admin_profile() -> _Response: if not request.args.get("actor_id"): abort(404) actor_id = request.args.get("actor_id") actor = ap.fetch_remote_activity(actor_id) q = { "meta.actor_id": actor_id, "box": "inbox", "type": {"$in": [ap.ActivityType.CREATE.value, ap.ActivityType.ANNOUNCE.value]}, } inbox_data, older_than, newer_than = paginated_query( DB.activities, q, limit=int(request.args.get("limit", 25)) ) follower = find_one_activity( { "box": "inbox", "type": ap.ActivityType.FOLLOW.value, "meta.actor_id": actor.id, "meta.undo": False, } ) following = find_one_activity( {"type": ap.ActivityType.ACCEPT.value, "meta.actor_id": actor.id} ) return htmlify( render_template( "stream.html", actor_id=actor_id, actor=actor.to_dict(), inbox_data=inbox_data, older_than=older_than, newer_than=newer_than, follower=follower, following=following, lists=list(DB.lists.find()), ) ) @blueprint.route("/admin/thread") @login_required def admin_thread() -> _Response: oid = request.args.get("oid") if not oid: abort(404) data = find_one_activity({**by_type(ap.ActivityType.CREATE), **by_object_id(oid)}) if not data: abort(404) if data["meta"].get("deleted", False): abort(410) thread = _build_thread(data) tpl = "note.html" if request.args.get("debug"): tpl = "note_debug.html" return htmlify(render_template(tpl, thread=thread, note=data)) @blueprint.route("/admin/new", methods=["GET"]) @login_required def admin_new() -> _Response: reply_id = None content = "" thread: List[Any] = [] print(request.args) if request.args.get("reply"): data = DB.activities.find_one({"activity.object.id": request.args.get("reply")}) if data: reply = ap.parse_activity(data["activity"]) else: data = dict( meta={}, activity=dict( object=ap.get_backend().fetch_iri(request.args.get("reply")) ), ) reply = ap.parse_activity(data["activity"]["object"]) reply_id = reply.id if reply.ACTIVITY_TYPE == ap.ActivityType.CREATE: reply_id = reply.get_object().id actor = reply.get_actor() domain = urlparse(actor.id).netloc # FIXME(tsileo): if reply of reply, fetch all participants content = f"@{actor.preferredUsername}@{domain} " thread = _build_thread(data) return htmlify( render_template( "new.html", reply=reply_id, content=content, thread=thread, visibility=ap.Visibility, emojis=config.EMOJIS.split(" "), custom_emojis={ name: ap.Emoji(**dat) for name, dat in EMOJIS_BY_NAME.items() }, ) ) @blueprint.route("/admin/lists", methods=["GET"]) @login_required def admin_lists() -> _Response: lists = list(DB.lists.find()) return htmlify(render_template("lists.html", lists=lists)) @blueprint.route("/admin/notifications") @login_required def admin_notifications() -> _Response: # Setup the cron for deleting old activities # FIXME(tsileo): put back to 12h p.push({}, "/task/cleanup", schedule="@every 1h") # Trigger a cleanup if asked if request.args.get("cleanup"): p.push({}, "/task/cleanup") # FIXME(tsileo): show unfollow (performed by the current actor) and liked??? mentions_query = { "type": ap.ActivityType.CREATE.value, "activity.object.tag.type": "Mention", "activity.object.tag.name": f"@{config.USERNAME}@{config.DOMAIN}", "meta.deleted": False, } replies_query = { "type": ap.ActivityType.CREATE.value, "activity.object.inReplyTo": {"$regex": f"^{config.BASE_URL}"}, "meta.poll_answer": False, } announced_query = { "type": ap.ActivityType.ANNOUNCE.value, "activity.object": {"$regex": f"^{config.BASE_URL}"}, } new_followers_query = {"type": ap.ActivityType.FOLLOW.value} unfollow_query = { "type": ap.ActivityType.UNDO.value, "activity.object.type": ap.ActivityType.FOLLOW.value, } likes_query = { "type": ap.ActivityType.LIKE.value, "activity.object": {"$regex": f"^{config.BASE_URL}"}, } followed_query = {"type": ap.ActivityType.ACCEPT.value} q = { "box": Box.INBOX.value, "$or": [ mentions_query, announced_query, replies_query, new_followers_query, followed_query, unfollow_query, likes_query, ], } inbox_data, older_than, newer_than = paginated_query(DB.activities, q) if not newer_than: nstart = datetime.now(timezone.utc).isoformat() else: nstart = inbox_data[0]["_id"].generation_time.isoformat() if not older_than: nend = (datetime.now(timezone.utc) - timedelta(days=15)).isoformat() else: nend = inbox_data[-1]["_id"].generation_time.isoformat() print(nstart, nend) notifs = list( DB.notifications.find({"datetime": {"$lte": nstart, "$gt": nend}}) .sort("_id", -1) .limit(50) ) print(inbox_data) nid = None if inbox_data: nid = inbox_data[0]["_id"] inbox_data.extend(notifs) inbox_data = sorted( inbox_data, reverse=True, key=lambda doc: doc["_id"].generation_time ) return htmlify( render_template( "stream.html", inbox_data=inbox_data, older_than=older_than, newer_than=newer_than, nid=nid, ) ) @blueprint.route("/admin/stream") @login_required def admin_stream() -> _Response: q = {"meta.stream": True, "meta.deleted": False} tpl = "stream.html" if request.args.get("debug"): tpl = "stream_debug.html" if request.args.get("debug_inbox"): q = {} inbox_data, older_than, newer_than = paginated_query( DB.activities, q, limit=int(request.args.get("limit", 25)) ) return htmlify( render_template( tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than ) ) @blueprint.route("/admin/list/") @login_required def admin_list(name: str) -> _Response: list_ = DB.lists.find_one({"name": name}) if not list_: abort(404) q = { "meta.stream": True, "meta.deleted": False, "meta.actor_id": {"$in": list_["members"]}, } tpl = "stream.html" if request.args.get("debug"): tpl = "stream_debug.html" if request.args.get("debug_inbox"): q = {} inbox_data, older_than, newer_than = paginated_query( DB.activities, q, limit=int(request.args.get("limit", 25)) ) return htmlify( render_template( tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than ) ) @blueprint.route("/admin/bookmarks") @login_required def admin_bookmarks() -> _Response: q = {"meta.bookmarked": True} tpl = "stream.html" if request.args.get("debug"): tpl = "stream_debug.html" if request.args.get("debug_inbox"): q = {} inbox_data, older_than, newer_than = paginated_query( DB.activities, q, limit=int(request.args.get("limit", 25)) ) return htmlify( render_template( tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than ) ) @blueprint.route("/u2f/register", methods=["GET", "POST"]) @login_required def u2f_register(): # TODO(tsileo): ensure no duplicates if request.method == "GET": payload = u2f.begin_registration(ID) session["challenge"] = payload return htmlify(render_template("u2f.html", payload=payload)) else: resp = json.loads(request.form.get("resp")) device, device_cert = u2f.complete_registration(session["challenge"], resp) session["challenge"] = None DB.u2f.insert_one({"device": device, "cert": device_cert}) session["logged_in"] = False return redirect("/login") @blueprint.route("/authorize_follow", methods=["GET", "POST"]) @login_required def authorize_follow(): if request.method == "GET": return htmlify( render_template( "authorize_remote_follow.html", profile=request.args.get("profile") ) ) actor = get_actor_url(request.form.get("profile")) if not actor: abort(500) q = { "box": Box.OUTBOX.value, "type": ap.ActivityType.FOLLOW.value, "meta.undo": False, "activity.object": actor, } if DB.activities.count(q) > 0: return redirect("/following") follow = ap.Follow( actor=MY_PERSON.id, object=actor, to=[actor], cc=[ap.AS_PUBLIC], published=now() ) post_to_outbox(follow) return redirect("/following")