import binascii import json import os from datetime import datetime from datetime import timedelta from urllib.parse import urlencode import flask import mf2py from bson.objectid import ObjectId from flask import Response from flask import abort from flask import current_app as app from flask import redirect from flask import render_template from flask import request from flask import session from flask import url_for from itsdangerous import BadSignature from app_utils import _get_ip from app_utils import login_required from config import DB from config import JWT blueprint = flask.Blueprint("indieauth", __name__) def build_auth_resp(payload): if request.headers.get("Accept") == "application/json": return Response( status=200, headers={"Content-Type": "application/json"}, response=json.dumps(payload), ) return Response( status=200, headers={"Content-Type": "application/x-www-form-urlencoded"}, response=urlencode(payload), ) def _get_prop(props, name, default=None): if name in props: items = props.get(name) if isinstance(items, list): return items[0] return items return default def get_client_id_data(url): # FIXME(tsileo): ensure not localhost via `little_boxes.urlutils.is_url_valid` data = mf2py.parse(url=url) for item in data["items"]: if "h-x-app" in item["type"] or "h-app" in item["type"]: props = item.get("properties", {}) print(props) return dict( logo=_get_prop(props, "logo"), name=_get_prop(props, "name"), url=_get_prop(props, "url"), ) return dict(logo=None, name=url, url=url) @blueprint.route("/indieauth/flow", methods=["POST"]) @login_required def indieauth_flow(): auth = dict( scope=" ".join(request.form.getlist("scopes")), me=request.form.get("me"), client_id=request.form.get("client_id"), state=request.form.get("state"), redirect_uri=request.form.get("redirect_uri"), response_type=request.form.get("response_type"), ts=datetime.now().timestamp(), code=binascii.hexlify(os.urandom(8)).decode("utf-8"), verified=False, ) # XXX(tsileo): a whitelist for me values? # TODO(tsileo): redirect_uri checks if not auth["redirect_uri"]: abort(400) DB.indieauth.insert_one(auth) # FIXME(tsileo): fetch client ID and validate redirect_uri red = f'{auth["redirect_uri"]}?code={auth["code"]}&state={auth["state"]}&me={auth["me"]}' return redirect(red) @blueprint.route("/indieauth", methods=["GET", "POST"]) def indieauth_endpoint(): if request.method == "GET": if not session.get("logged_in"): return redirect(url_for("admin_login", next=request.url)) me = request.args.get("me") # FIXME(tsileo): ensure me == ID client_id = request.args.get("client_id") redirect_uri = request.args.get("redirect_uri") state = request.args.get("state", "") response_type = request.args.get("response_type", "id") scope = request.args.get("scope", "").split() print("STATE", state) return render_template( "indieauth_flow.html", client=get_client_id_data(client_id), scopes=scope, redirect_uri=redirect_uri, state=state, response_type=response_type, client_id=client_id, me=me, ) # Auth verification via POST code = request.form.get("code") redirect_uri = request.form.get("redirect_uri") client_id = request.form.get("client_id") ip, geoip = _get_ip() auth = DB.indieauth.find_one_and_update( { "code": code, "redirect_uri": redirect_uri, "client_id": client_id, "verified": False, }, { "$set": { "verified": True, "verified_by": "id", "verified_at": datetime.now().timestamp(), "ip_address": ip, "geoip": geoip, } }, ) print(auth) print(code, redirect_uri, client_id) # Ensure the code is recent if (datetime.now() - datetime.fromtimestamp(auth["ts"])) > timedelta(minutes=5): abort(400) if not auth: abort(403) return session["logged_in"] = True me = auth["me"] state = auth["state"] scope = auth["scope"] print("STATE", state) return build_auth_resp({"me": me, "state": state, "scope": scope}) @blueprint.route("/token", methods=["GET", "POST"]) def token_endpoint(): # Generate a new token with the returned access code if request.method == "POST": code = request.form.get("code") me = request.form.get("me") redirect_uri = request.form.get("redirect_uri") client_id = request.form.get("client_id") now = datetime.now() ip, geoip = _get_ip() # This query ensure code, client_id, redirect_uri and me are matching with the code request auth = DB.indieauth.find_one_and_update( { "code": code, "me": me, "redirect_uri": redirect_uri, "client_id": client_id, "verified": False, }, { "$set": { "verified": True, "verified_by": "code", "verified_at": now.timestamp(), "ip_address": ip, "geoip": geoip, } }, ) if not auth: abort(403) scope = auth["scope"].split() # Ensure there's at least one scope if not len(scope): abort(400) # Ensure the code is recent if (now - datetime.fromtimestamp(auth["ts"])) > timedelta(minutes=5): abort(400) payload = dict(me=me, client_id=client_id, scope=scope, ts=now.timestamp()) token = JWT.dumps(payload).decode("utf-8") DB.indieauth.update_one( {"_id": auth["_id"]}, { "$set": { "token": token, "token_expires": (now + timedelta(minutes=30)).timestamp(), } }, ) return build_auth_resp( {"me": me, "scope": auth["scope"], "access_token": token} ) # Token verification token = request.headers.get("Authorization").replace("Bearer ", "") try: payload = JWT.loads(token) except BadSignature: abort(403) # Check the token expritation (valid for 3 hours) if (datetime.now() - datetime.fromtimestamp(payload["ts"])) > timedelta( minutes=180 ): abort(401) return build_auth_resp( { "me": payload["me"], "scope": " ".join(payload["scope"]), "client_id": payload["client_id"], } )