Merge pull request #57 from tsileo/big-cleanup

Big cleanup
This commit is contained in:
Thomas Sileo 2019-08-01 22:49:05 +02:00 committed by GitHub
commit d38c43ebe8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 2650 additions and 2704 deletions

View file

@ -21,9 +21,9 @@ steps:
DOCKER_HOST: tcp://docker:2375
commands:
- apk update && apk upgrade && apk add --no-cache bash git openssh curl
- git clone https://github.com/tsileo/poussetaches.git pt && cd pt && docker build . -t poussetaches:latest && cd - && rm -rf pt
- docker network create fede
- docker pull mongo
- docker pull poussetaches/poussetaches
- docker build . -t microblogpub:latest
# Run poussetaches (will be shared by the two microblog.pub instances) "in the background"
@ -34,7 +34,7 @@ steps:
DOCKER_HOST: tcp://docker:2375
POUSSETACHES_AUTH_KEY: lol
commands:
- docker run -p 7991:7991 --net fede -e POUSSETACHES_AUTH_KEY --name poussetaches poussetaches
- docker run -p 7991:7991 --net fede -e POUSSETACHES_AUTH_KEY --name poussetaches poussetaches/poussetaches
# Run MongoDB (will be shared by the two microblog.pub instances) "in the background"
- name: run_mongodb
@ -90,6 +90,6 @@ services:
privileged: true
---
kind: signature
hmac: 75997f4d5da8105b6a0bfa227aadab874c19d5dcf00c94139059899e1243397e
hmac: ae911176117298c18ecfcd95fbdbd62304c5f32462b42f2aefdd5a5b834fed60
...

View file

@ -27,11 +27,6 @@ reload-dev:
docker build . -t microblogpub:latest
docker-compose -f docker-compose-dev.yml up -d --force-recreate
# Build the poussetaches Docker image
.PHONY: poussetaches
poussetaches:
git clone https://github.com/tsileo/poussetaches.git pt && cd pt && docker build . -t poussetaches:latest && cd - && rm -rf pt
# Build the microblogpub Docker image
.PHONY: microblogpub
microblogpub:
@ -42,10 +37,11 @@ microblogpub:
# Run the docker-compose project locally (will perform a update if the project is already running)
.PHONY: run
run: poussetaches microblogpub
run: microblogpub
# (poussetaches and microblogpub Docker image will updated)
# Update MongoDB
docker pull mongo
docker pull poussetaches/poussetaches
# Restart the project
docker-compose stop
docker-compose up -d --force-recreate --build

View file

@ -71,7 +71,7 @@ Once the initial configuration is done, you can still tweak the config by editin
### Deployment
To spawn the docker-compose project (running this command will also update _microblog.pub_ to latest and restart the project it it's already running):
To spawn the docker-compose project (running this command will also update _microblog.pub_ to latest and restart everything if it's already running):
```shell
$ make run

2484
app.py

File diff suppressed because it is too large Load diff

0
blueprints/__init__.py Normal file
View file

414
blueprints/admin.py Normal file
View file

@ -0,0 +1,414 @@
import json
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any
from typing import List
from urllib.parse import urlparse
import flask
from flask import abort
from flask import current_app as app
from flask import redirect
from flask import render_template
from flask import request
from flask import session
from flask import url_for
from little_boxes import activitypub as ap
from passlib.hash import bcrypt
from u2flib_server import u2f
import config
from config import DB
from config import ID
from config import PASS
from core.activitypub import Box
from core.shared import MY_PERSON
from core.shared import _build_thread
from core.shared import _Response
from core.shared import csrf
from core.shared import login_required
from core.shared import noindex
from core.shared import p
from core.shared import paginated_query
from core.shared import post_to_outbox
from utils import now
from utils.lookup import lookup
blueprint = flask.Blueprint("admin", __name__)
def verify_pass(pwd):
return bcrypt.verify(pwd, PASS)
@blueprint.route("/admin/update_actor")
@login_required
def admin_update_actor() -> _Response:
update = ap.Update(
actor=MY_PERSON.id,
object=MY_PERSON.to_dict(),
to=[MY_PERSON.followers],
cc=[ap.AS_PUBLIC],
published=now(),
)
post_to_outbox(update)
return "OK"
@blueprint.route("/admin/logout")
@login_required
def admin_logout() -> _Response:
session["logged_in"] = False
return redirect("/")
@blueprint.route("/login", methods=["POST", "GET"])
@noindex
def admin_login() -> _Response:
if session.get("logged_in") is True:
return redirect(url_for("admin_notifications"))
devices = [doc["device"] for doc in DB.u2f.find()]
u2f_enabled = True if devices else False
if request.method == "POST":
csrf.protect()
# 1. Check regular password login flow
pwd = request.form.get("pass")
if pwd:
if verify_pass(pwd):
session["logged_in"] = True
return redirect(
request.args.get("redirect") or url_for("admin_notifications")
)
else:
abort(403)
# 2. Check for U2F payload, if any
elif devices:
resp = json.loads(request.form.get("resp")) # type: ignore
try:
u2f.complete_authentication(session["challenge"], resp)
except ValueError as exc:
print("failed", exc)
abort(403)
return
finally:
session["challenge"] = None
session["logged_in"] = True
return redirect(
request.args.get("redirect") or url_for("admin_notifications")
)
else:
abort(401)
payload = None
if devices:
payload = u2f.begin_authentication(ID, devices)
session["challenge"] = payload
return render_template("login.html", u2f_enabled=u2f_enabled, payload=payload)
@blueprint.route("/admin", methods=["GET"])
@login_required
def admin_index() -> _Response:
q = {
"meta.deleted": False,
"meta.undo": False,
"type": ap.ActivityType.LIKE.value,
"box": Box.OUTBOX.value,
}
col_liked = DB.activities.count(q)
return render_template(
"admin.html",
instances=list(DB.instances.find()),
inbox_size=DB.activities.count({"box": Box.INBOX.value}),
outbox_size=DB.activities.count({"box": Box.OUTBOX.value}),
col_liked=col_liked,
col_followers=DB.activities.count(
{
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
),
col_following=DB.activities.count(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
),
)
@blueprint.route("/admin/indieauth", methods=["GET"])
@login_required
def admin_indieauth() -> _Response:
return render_template(
"admin_indieauth.html",
indieauth_actions=DB.indieauth.find().sort("ts", -1).limit(100),
)
@blueprint.route("/admin/tasks", methods=["GET"])
@login_required
def admin_tasks() -> _Response:
return render_template(
"admin_tasks.html",
success=p.get_success(),
dead=p.get_dead(),
waiting=p.get_waiting(),
cron=p.get_cron(),
)
@blueprint.route("/admin/lookup", methods=["GET", "POST"])
@login_required
def admin_lookup() -> _Response:
data = None
meta = None
if request.method == "POST":
if request.form.get("url"):
data = lookup(request.form.get("url")) # type: ignore
if data:
if data.has_type(ap.ActivityType.ANNOUNCE):
meta = dict(
object=data.get_object().to_dict(),
object_actor=data.get_object().get_actor().to_dict(),
actor=data.get_actor().to_dict(),
)
elif data.has_type(ap.ActivityType.QUESTION):
p.push(data.id, "/task/fetch_remote_question")
print(data)
app.logger.debug(data.to_dict())
return render_template(
"lookup.html", data=data, meta=meta, url=request.form.get("url")
)
@blueprint.route("/admin/thread")
@login_required
def admin_thread() -> _Response:
data = DB.activities.find_one(
{
"type": ap.ActivityType.CREATE.value,
"activity.object.id": request.args.get("oid"),
}
)
if not data:
abort(404)
if data["meta"].get("deleted", False):
abort(410)
thread = _build_thread(data)
tpl = "note.html"
if request.args.get("debug"):
tpl = "note_debug.html"
return render_template(tpl, thread=thread, note=data)
@blueprint.route("/admin/new", methods=["GET"])
@login_required
def admin_new() -> _Response:
reply_id = None
content = ""
thread: List[Any] = []
print(request.args)
if request.args.get("reply"):
data = DB.activities.find_one({"activity.object.id": request.args.get("reply")})
if data:
reply = ap.parse_activity(data["activity"])
else:
data = dict(
meta={},
activity=dict(
object=ap.get_backend().fetch_iri(request.args.get("reply"))
),
)
reply = ap.parse_activity(data["activity"]["object"])
reply_id = reply.id
if reply.ACTIVITY_TYPE == ap.ActivityType.CREATE:
reply_id = reply.get_object().id
actor = reply.get_actor()
domain = urlparse(actor.id).netloc
# FIXME(tsileo): if reply of reply, fetch all participants
content = f"@{actor.preferredUsername}@{domain} "
thread = _build_thread(data)
return render_template(
"new.html",
reply=reply_id,
content=content,
thread=thread,
visibility=ap.Visibility,
emojis=config.EMOJIS.split(" "),
)
@blueprint.route("/admin/lists", methods=["GET"])
@login_required
def admin_lists() -> _Response:
lists = list(DB.lists.find())
return render_template("lists.html", lists=lists)
@blueprint.route("/admin/notifications")
@login_required
def admin_notifications() -> _Response:
# Setup the cron for deleting old activities
# FIXME(tsileo): put back to 12h
p.push({}, "/task/cleanup", schedule="@every 1h")
# Trigger a cleanup if asked
if request.args.get("cleanup"):
p.push({}, "/task/cleanup")
# FIXME(tsileo): show unfollow (performed by the current actor) and liked???
mentions_query = {
"type": ap.ActivityType.CREATE.value,
"activity.object.tag.type": "Mention",
"activity.object.tag.name": f"@{config.USERNAME}@{config.DOMAIN}",
"meta.deleted": False,
}
replies_query = {
"type": ap.ActivityType.CREATE.value,
"activity.object.inReplyTo": {"$regex": f"^{config.BASE_URL}"},
"meta.poll_answer": False,
}
announced_query = {
"type": ap.ActivityType.ANNOUNCE.value,
"activity.object": {"$regex": f"^{config.BASE_URL}"},
}
new_followers_query = {"type": ap.ActivityType.FOLLOW.value}
unfollow_query = {
"type": ap.ActivityType.UNDO.value,
"activity.object.type": ap.ActivityType.FOLLOW.value,
}
likes_query = {
"type": ap.ActivityType.LIKE.value,
"activity.object": {"$regex": f"^{config.BASE_URL}"},
}
followed_query = {"type": ap.ActivityType.ACCEPT.value}
q = {
"box": Box.INBOX.value,
"$or": [
mentions_query,
announced_query,
replies_query,
new_followers_query,
followed_query,
unfollow_query,
likes_query,
],
}
inbox_data, older_than, newer_than = paginated_query(DB.activities, q)
if not newer_than:
nstart = datetime.now(timezone.utc).isoformat()
else:
nstart = inbox_data[0]["_id"].generation_time.isoformat()
if not older_than:
nend = (datetime.now(timezone.utc) - timedelta(days=15)).isoformat()
else:
nend = inbox_data[-1]["_id"].generation_time.isoformat()
print(nstart, nend)
notifs = list(
DB.notifications.find({"datetime": {"$lte": nstart, "$gt": nend}})
.sort("_id", -1)
.limit(50)
)
print(inbox_data)
nid = None
if inbox_data:
nid = inbox_data[0]["_id"]
inbox_data.extend(notifs)
inbox_data = sorted(
inbox_data, reverse=True, key=lambda doc: doc["_id"].generation_time
)
return render_template(
"stream.html",
inbox_data=inbox_data,
older_than=older_than,
newer_than=newer_than,
nid=nid,
)
@blueprint.route("/admin/stream")
@login_required
def admin_stream() -> _Response:
q = {"meta.stream": True, "meta.deleted": False}
tpl = "stream.html"
if request.args.get("debug"):
tpl = "stream_debug.html"
if request.args.get("debug_inbox"):
q = {}
inbox_data, older_than, newer_than = paginated_query(
DB.activities, q, limit=int(request.args.get("limit", 25))
)
return render_template(
tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than
)
@blueprint.route("/admin/list/<name>")
@login_required
def admin_list(name: str) -> _Response:
list_ = DB.lists.find_one({"name": name})
if not list_:
abort(404)
q = {
"meta.stream": True,
"meta.deleted": False,
"meta.actor_id": {"$in": list_["members"]},
}
tpl = "stream.html"
if request.args.get("debug"):
tpl = "stream_debug.html"
if request.args.get("debug_inbox"):
q = {}
inbox_data, older_than, newer_than = paginated_query(
DB.activities, q, limit=int(request.args.get("limit", 25))
)
return render_template(
tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than
)
@blueprint.route("/admin/bookmarks")
@login_required
def admin_bookmarks() -> _Response:
q = {"meta.bookmarked": True}
tpl = "stream.html"
if request.args.get("debug"):
tpl = "stream_debug.html"
if request.args.get("debug_inbox"):
q = {}
inbox_data, older_than, newer_than = paginated_query(
DB.activities, q, limit=int(request.args.get("limit", 25))
)
return render_template(
tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than
)

593
blueprints/api.py Normal file
View file

@ -0,0 +1,593 @@
import json
import mimetypes
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from functools import wraps
from io import BytesIO
from typing import Any
from typing import List
import flask
from bson.objectid import ObjectId
from flask import Response
from flask import abort
from flask import current_app as app
from flask import redirect
from flask import request
from flask import session
from itsdangerous import BadSignature
from little_boxes import activitypub as ap
from little_boxes.content_helper import parse_markdown
from little_boxes.errors import ActivityNotFoundError
from little_boxes.errors import NotFromOutboxError
from werkzeug.utils import secure_filename
import config
from config import ADMIN_API_KEY
from config import BASE_URL
from config import DB
from config import DEBUG_MODE
from config import ID
from config import JWT
from config import MEDIA_CACHE
from config import _drop_db
from core import activitypub
from core.meta import Box
from core.meta import MetaKey
from core.meta import _meta
from core.shared import MY_PERSON
from core.shared import _Response
from core.shared import back
from core.shared import csrf
from core.shared import login_required
from core.shared import post_to_outbox
from core.tasks import Tasks
from utils import now
blueprint = flask.Blueprint("api", __name__)
def without_id(l):
out = []
for d in l:
if "_id" in d:
del d["_id"]
out.append(d)
return out
def _api_required() -> None:
if session.get("logged_in"):
if request.method not in ["GET", "HEAD"]:
# If a standard API request is made with a "login session", it must havw a CSRF token
csrf.protect()
return
# Token verification
token = request.headers.get("Authorization", "").replace("Bearer ", "")
if not token:
# IndieAuth token
token = request.form.get("access_token", "")
# Will raise a BadSignature on bad auth
payload = JWT.loads(token)
app.logger.info(f"api call by {payload}")
def api_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
try:
_api_required()
except BadSignature:
abort(401)
return f(*args, **kwargs)
return decorated_function
def _user_api_arg(key: str, **kwargs) -> Any:
"""Try to get the given key from the requests, try JSON body, form data and query arg."""
if request.is_json:
oid = request.json.get(key)
else:
oid = request.args.get(key) or request.form.get(key)
if not oid:
if "default" in kwargs:
app.logger.info(f'{key}={kwargs.get("default")}')
return kwargs.get("default")
raise ValueError(f"missing {key}")
app.logger.info(f"{key}={oid}")
return oid
def _user_api_get_note(from_outbox: bool = False) -> ap.BaseActivity:
oid = _user_api_arg("id")
app.logger.info(f"fetching {oid}")
note = ap.parse_activity(ap.get_backend().fetch_iri(oid))
if from_outbox and not note.id.startswith(ID):
raise NotFromOutboxError(
f"cannot load {note.id}, id must be owned by the server"
)
return note
def _user_api_response(**kwargs) -> _Response:
_redirect = _user_api_arg("redirect", default=None)
if _redirect:
return redirect(_redirect)
resp = flask.jsonify(**kwargs)
resp.status_code = 201
return resp
@blueprint.route("/api/key")
@login_required
def api_user_key() -> _Response:
return flask.jsonify(api_key=ADMIN_API_KEY)
@blueprint.route("/note/delete", methods=["POST"])
@api_required
def api_delete() -> _Response:
"""API endpoint to delete a Note activity."""
note = _user_api_get_note(from_outbox=True)
# Create the delete, same audience as the Create object
delete = ap.Delete(
actor=ID,
object=ap.Tombstone(id=note.id).to_dict(embed=True),
to=note.to,
cc=note.cc,
published=now(),
)
delete_id = post_to_outbox(delete)
return _user_api_response(activity=delete_id)
@blueprint.route("/boost", methods=["POST"])
@api_required
def api_boost() -> _Response:
note = _user_api_get_note()
# Ensures the note visibility allow us to build an Announce (in respect to the post visibility)
if ap.get_visibility(note) not in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]:
abort(400)
announce = ap.Announce(
actor=MY_PERSON.id,
object=note.id,
to=[MY_PERSON.followers, note.attributedTo],
cc=[ap.AS_PUBLIC],
published=now(),
)
announce_id = post_to_outbox(announce)
return _user_api_response(activity=announce_id)
@blueprint.route("/mark_notifications_as_read", methods=["POST"])
@api_required
def api_mark_notification_as_read() -> _Response:
nid = ObjectId(_user_api_arg("nid"))
DB.activities.update_many(
{_meta(MetaKey.NOTIFICATION_UNREAD): True, "_id": {"$lte": nid}},
{"$set": {_meta(MetaKey.NOTIFICATION_UNREAD): False}},
)
return _user_api_response()
@blueprint.route("/vote", methods=["POST"])
@api_required
def api_vote() -> _Response:
oid = _user_api_arg("id")
app.logger.info(f"fetching {oid}")
note = ap.parse_activity(ap.get_backend().fetch_iri(oid))
choice = _user_api_arg("choice")
raw_note = dict(
attributedTo=MY_PERSON.id,
cc=[],
to=note.get_actor().id,
name=choice,
tag=[],
inReplyTo=note.id,
)
raw_note["@context"] = config.DEFAULT_CTX
note = ap.Note(**raw_note)
create = note.build_create()
create_id = post_to_outbox(create)
return _user_api_response(activity=create_id)
@blueprint.route("/like", methods=["POST"])
@api_required
def api_like() -> _Response:
note = _user_api_get_note()
to: List[str] = []
cc: List[str] = []
note_visibility = ap.get_visibility(note)
if note_visibility == ap.Visibility.PUBLIC:
to = [ap.AS_PUBLIC]
cc = [ID + "/followers", note.get_actor().id]
elif note_visibility == ap.Visibility.UNLISTED:
to = [ID + "/followers", note.get_actor().id]
cc = [ap.AS_PUBLIC]
else:
to = [note.get_actor().id]
like = ap.Like(object=note.id, actor=MY_PERSON.id, to=to, cc=cc, published=now())
like_id = post_to_outbox(like)
return _user_api_response(activity=like_id)
@blueprint.route("/bookmark", methods=["POST"])
@api_required
def api_bookmark() -> _Response:
note = _user_api_get_note()
undo = _user_api_arg("undo", default=None) == "yes"
# Try to bookmark the `Create` first
if not DB.activities.update_one(
{"activity.object.id": note.id}, {"$set": {"meta.bookmarked": not undo}}
).modified_count:
# Then look for the `Announce`
DB.activities.update_one(
{"meta.object.id": note.id}, {"$set": {"meta.bookmarked": not undo}}
)
return _user_api_response()
@blueprint.route("/note/pin", methods=["POST"])
@api_required
def api_pin() -> _Response:
note = _user_api_get_note(from_outbox=True)
DB.activities.update_one(
{"activity.object.id": note.id, "box": Box.OUTBOX.value},
{"$set": {"meta.pinned": True}},
)
return _user_api_response(pinned=True)
@blueprint.route("/note/unpin", methods=["POST"])
@api_required
def api_unpin() -> _Response:
note = _user_api_get_note(from_outbox=True)
DB.activities.update_one(
{"activity.object.id": note.id, "box": Box.OUTBOX.value},
{"$set": {"meta.pinned": False}},
)
return _user_api_response(pinned=False)
@blueprint.route("/undo", methods=["POST"])
@api_required
def api_undo() -> _Response:
oid = _user_api_arg("id")
doc = DB.activities.find_one(
{
"box": Box.OUTBOX.value,
"$or": [{"remote_id": back.activity_url(oid)}, {"remote_id": oid}],
}
)
if not doc:
raise ActivityNotFoundError(f"cannot found {oid}")
obj = ap.parse_activity(doc.get("activity"))
undo = ap.Undo(
actor=MY_PERSON.id,
object=obj.to_dict(embed=True, embed_object_id_only=True),
published=now(),
to=obj.to,
cc=obj.cc,
)
# FIXME(tsileo): detect already undo-ed and make this API call idempotent
undo_id = post_to_outbox(undo)
return _user_api_response(activity=undo_id)
@blueprint.route("/new_list", methods=["POST"])
@api_required
def api_new_list() -> _Response:
name = _user_api_arg("name")
if not name:
raise ValueError("missing name")
if not DB.lists.find_one({"name": name}):
DB.lists.insert_one({"name": name, "members": []})
return _user_api_response(name=name)
@blueprint.route("/delete_list", methods=["POST"])
@api_required
def api_delete_list() -> _Response:
name = _user_api_arg("name")
if not name:
raise ValueError("missing name")
if not DB.lists.find_one({"name": name}):
abort(404)
DB.lists.delete_one({"name": name})
return _user_api_response()
@blueprint.route("/add_to_list", methods=["POST"])
@api_required
def api_add_to_list() -> _Response:
list_name = _user_api_arg("list_name")
if not list_name:
raise ValueError("missing list_name")
if not DB.lists.find_one({"name": list_name}):
raise ValueError(f"list {list_name} does not exist")
actor_id = _user_api_arg("actor_id")
if not actor_id:
raise ValueError("missing actor_id")
DB.lists.update_one({"name": list_name}, {"$addToSet": {"members": actor_id}})
return _user_api_response()
@blueprint.route("/remove_from_list", methods=["POST"])
@api_required
def api_remove_from_list() -> _Response:
list_name = _user_api_arg("list_name")
if not list_name:
raise ValueError("missing list_name")
if not DB.lists.find_one({"name": list_name}):
raise ValueError(f"list {list_name} does not exist")
actor_id = _user_api_arg("actor_id")
if not actor_id:
raise ValueError("missing actor_id")
DB.lists.update_one({"name": list_name}, {"$pull": {"members": actor_id}})
return _user_api_response()
@blueprint.route("/new_note", methods=["POST"])
@api_required
def api_new_note() -> _Response:
source = _user_api_arg("content")
if not source:
raise ValueError("missing content")
_reply, reply = None, None
try:
_reply = _user_api_arg("reply")
except ValueError:
pass
visibility = ap.Visibility[
_user_api_arg("visibility", default=ap.Visibility.PUBLIC.name)
]
content, tags = parse_markdown(source)
to: List[str] = []
cc: List[str] = []
if visibility == ap.Visibility.PUBLIC:
to = [ap.AS_PUBLIC]
cc = [ID + "/followers"]
elif visibility == ap.Visibility.UNLISTED:
to = [ID + "/followers"]
cc = [ap.AS_PUBLIC]
elif visibility == ap.Visibility.FOLLOWERS_ONLY:
to = [ID + "/followers"]
cc = []
if _reply:
reply = ap.fetch_remote_activity(_reply)
if visibility == ap.Visibility.DIRECT:
to.append(reply.attributedTo)
else:
cc.append(reply.attributedTo)
for tag in tags:
if tag["type"] == "Mention":
if visibility == ap.Visibility.DIRECT:
to.append(tag["href"])
else:
cc.append(tag["href"])
raw_note = dict(
attributedTo=MY_PERSON.id,
cc=list(set(cc)),
to=list(set(to)),
content=content,
tag=tags,
source={"mediaType": "text/markdown", "content": source},
inReplyTo=reply.id if reply else None,
)
if "file" in request.files and request.files["file"].filename:
file = request.files["file"]
rfilename = secure_filename(file.filename)
with BytesIO() as buf:
file.save(buf)
oid = MEDIA_CACHE.save_upload(buf, rfilename)
mtype = mimetypes.guess_type(rfilename)[0]
raw_note["attachment"] = [
{
"mediaType": mtype,
"name": rfilename,
"type": "Document",
"url": f"{BASE_URL}/uploads/{oid}/{rfilename}",
}
]
note = ap.Note(**raw_note)
create = note.build_create()
create_id = post_to_outbox(create)
return _user_api_response(activity=create_id)
@blueprint.route("/new_question", methods=["POST"])
@api_required
def api_new_question() -> _Response:
source = _user_api_arg("content")
if not source:
raise ValueError("missing content")
content, tags = parse_markdown(source)
cc = [ID + "/followers"]
for tag in tags:
if tag["type"] == "Mention":
cc.append(tag["href"])
answers = []
for i in range(4):
a = _user_api_arg(f"answer{i}", default=None)
if not a:
break
answers.append(
{
"type": ap.ActivityType.NOTE.value,
"name": a,
"replies": {"type": ap.ActivityType.COLLECTION.value, "totalItems": 0},
}
)
open_for = int(_user_api_arg("open_for"))
choices = {
"endTime": ap.format_datetime(
datetime.now(timezone.utc) + timedelta(minutes=open_for)
)
}
of = _user_api_arg("of")
if of == "anyOf":
choices["anyOf"] = answers
else:
choices["oneOf"] = answers
raw_question = dict(
attributedTo=MY_PERSON.id,
cc=list(set(cc)),
to=[ap.AS_PUBLIC],
content=content,
tag=tags,
source={"mediaType": "text/markdown", "content": source},
inReplyTo=None,
**choices,
)
question = ap.Question(**raw_question)
create = question.build_create()
create_id = post_to_outbox(create)
Tasks.update_question_outbox(create_id, open_for)
return _user_api_response(activity=create_id)
@blueprint.route("/block", methods=["POST"])
@api_required
def api_block() -> _Response:
actor = _user_api_arg("actor")
existing = DB.activities.find_one(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.BLOCK.value,
"activity.object": actor,
"meta.undo": False,
}
)
if existing:
return _user_api_response(activity=existing["activity"]["id"])
block = ap.Block(actor=MY_PERSON.id, object=actor)
block_id = post_to_outbox(block)
return _user_api_response(activity=block_id)
@blueprint.route("/follow", methods=["POST"])
@api_required
def api_follow() -> _Response:
actor = _user_api_arg("actor")
q = {
"box": Box.OUTBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
"activity.object": actor,
}
existing = DB.activities.find_one(q)
if existing:
return _user_api_response(activity=existing["activity"]["id"])
follow = ap.Follow(
actor=MY_PERSON.id, object=actor, to=[actor], cc=[ap.AS_PUBLIC], published=now()
)
follow_id = post_to_outbox(follow)
return _user_api_response(activity=follow_id)
@blueprint.route("/debug", methods=["GET", "DELETE"])
@api_required
def api_debug() -> _Response:
"""Endpoint used/needed for testing, only works in DEBUG_MODE."""
if not DEBUG_MODE:
return flask.jsonify(message="DEBUG_MODE is off")
if request.method == "DELETE":
_drop_db()
return flask.jsonify(message="DB dropped")
return flask.jsonify(
inbox=DB.activities.count({"box": Box.INBOX.value}),
outbox=DB.activities.count({"box": Box.OUTBOX.value}),
outbox_data=without_id(DB.activities.find({"box": Box.OUTBOX.value})),
)
@blueprint.route("/stream")
@api_required
def api_stream() -> _Response:
return Response(
response=json.dumps(
activitypub.build_inbox_json_feed("/api/stream", request.args.get("cursor"))
),
headers={"Content-Type": "application/json"},
)

242
blueprints/indieauth.py Normal file
View file

@ -0,0 +1,242 @@
import binascii
import json
import os
from datetime import datetime
from datetime import timedelta
from urllib.parse import urlencode
import flask
import mf2py
from flask import Response
from flask import abort
from flask import redirect
from flask import render_template
from flask import request
from flask import session
from flask import url_for
from itsdangerous import BadSignature
from config import DB
from config import JWT
from core.shared import _get_ip
from core.shared import login_required
blueprint = flask.Blueprint("indieauth", __name__)
def build_auth_resp(payload):
if request.headers.get("Accept") == "application/json":
return Response(
status=200,
headers={"Content-Type": "application/json"},
response=json.dumps(payload),
)
return Response(
status=200,
headers={"Content-Type": "application/x-www-form-urlencoded"},
response=urlencode(payload),
)
def _get_prop(props, name, default=None):
if name in props:
items = props.get(name)
if isinstance(items, list):
return items[0]
return items
return default
def get_client_id_data(url):
# FIXME(tsileo): ensure not localhost via `little_boxes.urlutils.is_url_valid`
data = mf2py.parse(url=url)
for item in data["items"]:
if "h-x-app" in item["type"] or "h-app" in item["type"]:
props = item.get("properties", {})
print(props)
return dict(
logo=_get_prop(props, "logo"),
name=_get_prop(props, "name"),
url=_get_prop(props, "url"),
)
return dict(logo=None, name=url, url=url)
@blueprint.route("/indieauth/flow", methods=["POST"])
@login_required
def indieauth_flow():
auth = dict(
scope=" ".join(request.form.getlist("scopes")),
me=request.form.get("me"),
client_id=request.form.get("client_id"),
state=request.form.get("state"),
redirect_uri=request.form.get("redirect_uri"),
response_type=request.form.get("response_type"),
ts=datetime.now().timestamp(),
code=binascii.hexlify(os.urandom(8)).decode("utf-8"),
verified=False,
)
# XXX(tsileo): a whitelist for me values?
# TODO(tsileo): redirect_uri checks
if not auth["redirect_uri"]:
abort(400)
DB.indieauth.insert_one(auth)
# FIXME(tsileo): fetch client ID and validate redirect_uri
red = f'{auth["redirect_uri"]}?code={auth["code"]}&state={auth["state"]}&me={auth["me"]}'
return redirect(red)
@blueprint.route("/indieauth", methods=["GET", "POST"])
def indieauth_endpoint():
if request.method == "GET":
if not session.get("logged_in"):
return redirect(url_for("admin_login", next=request.url))
me = request.args.get("me")
# FIXME(tsileo): ensure me == ID
client_id = request.args.get("client_id")
redirect_uri = request.args.get("redirect_uri")
state = request.args.get("state", "")
response_type = request.args.get("response_type", "id")
scope = request.args.get("scope", "").split()
print("STATE", state)
return render_template(
"indieauth_flow.html",
client=get_client_id_data(client_id),
scopes=scope,
redirect_uri=redirect_uri,
state=state,
response_type=response_type,
client_id=client_id,
me=me,
)
# Auth verification via POST
code = request.form.get("code")
redirect_uri = request.form.get("redirect_uri")
client_id = request.form.get("client_id")
ip, geoip = _get_ip()
auth = DB.indieauth.find_one_and_update(
{
"code": code,
"redirect_uri": redirect_uri,
"client_id": client_id,
"verified": False,
},
{
"$set": {
"verified": True,
"verified_by": "id",
"verified_at": datetime.now().timestamp(),
"ip_address": ip,
"geoip": geoip,
}
},
)
print(auth)
print(code, redirect_uri, client_id)
# Ensure the code is recent
if (datetime.now() - datetime.fromtimestamp(auth["ts"])) > timedelta(minutes=5):
abort(400)
if not auth:
abort(403)
return
session["logged_in"] = True
me = auth["me"]
state = auth["state"]
scope = auth["scope"]
print("STATE", state)
return build_auth_resp({"me": me, "state": state, "scope": scope})
@blueprint.route("/token", methods=["GET", "POST"])
def token_endpoint():
# Generate a new token with the returned access code
if request.method == "POST":
code = request.form.get("code")
me = request.form.get("me")
redirect_uri = request.form.get("redirect_uri")
client_id = request.form.get("client_id")
now = datetime.now()
ip, geoip = _get_ip()
# This query ensure code, client_id, redirect_uri and me are matching with the code request
auth = DB.indieauth.find_one_and_update(
{
"code": code,
"me": me,
"redirect_uri": redirect_uri,
"client_id": client_id,
"verified": False,
},
{
"$set": {
"verified": True,
"verified_by": "code",
"verified_at": now.timestamp(),
"ip_address": ip,
"geoip": geoip,
}
},
)
if not auth:
abort(403)
scope = auth["scope"].split()
# Ensure there's at least one scope
if not len(scope):
abort(400)
# Ensure the code is recent
if (now - datetime.fromtimestamp(auth["ts"])) > timedelta(minutes=5):
abort(400)
payload = dict(me=me, client_id=client_id, scope=scope, ts=now.timestamp())
token = JWT.dumps(payload).decode("utf-8")
DB.indieauth.update_one(
{"_id": auth["_id"]},
{
"$set": {
"token": token,
"token_expires": (now + timedelta(minutes=30)).timestamp(),
}
},
)
return build_auth_resp(
{"me": me, "scope": auth["scope"], "access_token": token}
)
# Token verification
token = request.headers.get("Authorization").replace("Bearer ", "")
try:
payload = JWT.loads(token)
except BadSignature:
abort(403)
# Check the token expritation (valid for 3 hours)
if (datetime.now() - datetime.fromtimestamp(payload["ts"])) > timedelta(
minutes=180
):
abort(401)
return build_auth_resp(
{
"me": payload["me"],
"scope": " ".join(payload["scope"]),
"client_id": payload["client_id"],
}
)

496
blueprints/tasks.py Normal file
View file

@ -0,0 +1,496 @@
import json
import traceback
from datetime import datetime
from datetime import timezone
import flask
import requests
from flask import current_app as app
from little_boxes import activitypub as ap
from little_boxes.errors import ActivityGoneError
from little_boxes.errors import ActivityNotFoundError
from little_boxes.errors import NotAnActivityError
from little_boxes.httpsig import HTTPSigAuth
from requests.exceptions import HTTPError
import config
from config import DB
from core import activitypub
from core import gc
from core.activitypub import Box
from core.meta import MetaKey
from core.meta import _meta
from core.notifications import set_inbox_flags
from core.shared import MY_PERSON
from core.shared import _add_answers_to_question
from core.shared import back
from core.shared import p
from core.shared import post_to_outbox
from core.tasks import Tasks
from utils import now
from utils import opengraph
SIG_AUTH = HTTPSigAuth(config.KEY)
blueprint = flask.Blueprint("tasks", __name__)
class TaskError(Exception):
"""Raised to log the error for poussetaches."""
def __init__(self):
self.message = traceback.format_exc()
@blueprint.route("/task/update_question", methods=["POST"])
def task_update_question():
"""Sends an Update."""
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
app.logger.info(f"Updating question {iri}")
cc = [config.ID + "/followers"]
doc = DB.activities.find_one({"box": Box.OUTBOX.value, "remote_id": iri})
_add_answers_to_question(doc)
question = ap.Question(**doc["activity"]["object"])
raw_update = dict(
actor=question.id,
object=question.to_dict(embed=True),
attributedTo=MY_PERSON.id,
cc=list(set(cc)),
to=[ap.AS_PUBLIC],
)
raw_update["@context"] = config.DEFAULT_CTX
update = ap.Update(**raw_update)
print(update)
print(update.to_dict())
post_to_outbox(update)
except HTTPError as err:
app.logger.exception("request failed")
if 400 >= err.response.status_code >= 499:
app.logger.info("client error, no retry")
return ""
raise TaskError() from err
except Exception as err:
app.logger.exception("task failed")
raise TaskError() from err
return ""
@blueprint.route("/task/fetch_og_meta", methods=["POST"])
def task_fetch_og_meta():
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
if activity.has_type(ap.ActivityType.CREATE):
note = activity.get_object()
links = opengraph.links_from_note(note.to_dict())
og_metadata = opengraph.fetch_og_metadata(config.USER_AGENT, links)
for og in og_metadata:
if not og.get("image"):
continue
config.MEDIA_CACHE.cache_og_image(og["image"], iri)
app.logger.debug(f"OG metadata {og_metadata!r}")
DB.activities.update_one(
{"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}}
)
app.logger.info(f"OG metadata fetched for {iri}: {og_metadata}")
except (ActivityGoneError, ActivityNotFoundError):
app.logger.exception(f"dropping activity {iri}, skip OG metedata")
return ""
except requests.exceptions.HTTPError as http_err:
if 400 <= http_err.response.status_code < 500:
app.logger.exception("bad request, no retry")
return ""
app.logger.exception("failed to fetch OG metadata")
raise TaskError() from http_err
except Exception as err:
app.logger.exception(f"failed to fetch OG metadata for {iri}")
raise TaskError() from err
return ""
@blueprint.route("/task/cache_object", methods=["POST"])
def task_cache_object():
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
obj = activity.get_object()
DB.activities.update_one(
{"remote_id": activity.id},
{
"$set": {
"meta.object": obj.to_dict(embed=True),
"meta.object_actor": activitypub._actor_to_meta(obj.get_actor()),
}
},
)
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
app.logger.exception(f"flagging activity {iri} as deleted, no object caching")
except Exception as err:
app.logger.exception(f"failed to cache object for {iri}")
raise TaskError() from err
return ""
@blueprint.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901
def task_finish_post_to_outbox():
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
recipients = activity.recipients()
if activity.has_type(ap.ActivityType.DELETE):
back.outbox_delete(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.UPDATE):
back.outbox_update(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.CREATE):
back.outbox_create(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.ANNOUNCE):
back.outbox_announce(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.LIKE):
back.outbox_like(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.UNDO):
obj = activity.get_object()
if obj.has_type(ap.ActivityType.LIKE):
back.outbox_undo_like(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.ANNOUNCE):
back.outbox_undo_announce(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.FOLLOW):
back.undo_new_following(MY_PERSON, obj)
app.logger.info(f"recipients={recipients}")
activity = ap.clean_activity(activity.to_dict())
payload = json.dumps(activity)
for recp in recipients:
app.logger.debug(f"posting to {recp}")
Tasks.post_to_remote_inbox(payload, recp)
except (ActivityGoneError, ActivityNotFoundError):
app.logger.exception(f"no retry")
except Exception as err:
app.logger.exception(f"failed to post to remote inbox for {iri}")
raise TaskError() from err
return ""
@blueprint.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901
def task_finish_post_to_inbox():
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
if activity.has_type(ap.ActivityType.DELETE):
back.inbox_delete(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.UPDATE):
back.inbox_update(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.CREATE):
back.inbox_create(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.ANNOUNCE):
back.inbox_announce(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.LIKE):
back.inbox_like(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.FOLLOW):
# Reply to a Follow with an Accept
actor_id = activity.get_actor().id
accept = ap.Accept(
actor=config.ID,
object={
"type": "Follow",
"id": activity.id,
"object": activity.get_object_id(),
"actor": actor_id,
},
to=[actor_id],
published=now(),
)
post_to_outbox(accept)
elif activity.has_type(ap.ActivityType.UNDO):
obj = activity.get_object()
if obj.has_type(ap.ActivityType.LIKE):
back.inbox_undo_like(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.ANNOUNCE):
back.inbox_undo_announce(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.FOLLOW):
back.undo_new_follower(MY_PERSON, obj)
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
app.logger.exception(f"no retry")
except Exception as err:
app.logger.exception(f"failed to cache attachments for {iri}")
raise TaskError() from err
return ""
@blueprint.route("/task/cache_attachments", methods=["POST"])
def task_cache_attachments():
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
# Generates thumbnails for the actor's icon and the attachments if any
obj = activity.get_object()
# Iter the attachments
for attachment in obj._data.get("attachment", []):
try:
config.MEDIA_CACHE.cache_attachment(attachment, iri)
except ValueError:
app.logger.exception(f"failed to cache {attachment}")
app.logger.info(f"attachments cached for {iri}")
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
app.logger.exception(f"dropping activity {iri}, no attachment caching")
except Exception as err:
app.logger.exception(f"failed to cache attachments for {iri}")
raise TaskError() from err
return ""
@blueprint.route("/task/cache_actor", methods=["POST"])
def task_cache_actor() -> str:
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload["iri"]
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
# Fetch the Open Grah metadata if it's a `Create`
if activity.has_type(ap.ActivityType.CREATE):
Tasks.fetch_og_meta(iri)
actor = activity.get_actor()
if actor.icon:
if isinstance(actor.icon, dict) and "url" in actor.icon:
config.MEDIA_CACHE.cache_actor_icon(actor.icon["url"])
else:
app.logger.warning(f"failed to parse icon {actor.icon} for {iri}")
if activity.has_type(ap.ActivityType.FOLLOW):
if actor.id == config.ID:
# It's a new following, cache the "object" (which is the actor we follow)
DB.activities.update_one(
{"remote_id": iri},
{
"$set": {
"meta.object": activity.get_object().to_dict(embed=True)
}
},
)
# Cache the actor info
DB.activities.update_one(
{"remote_id": iri}, {"$set": {"meta.actor": actor.to_dict(embed=True)}}
)
app.logger.info(f"actor cached for {iri}")
if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]):
Tasks.cache_attachments(iri)
except (ActivityGoneError, ActivityNotFoundError):
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
app.logger.exception(f"flagging activity {iri} as deleted, no actor caching")
except Exception as err:
app.logger.exception(f"failed to cache actor for {iri}")
raise TaskError() from err
return ""
@blueprint.route("/task/forward_activity", methods=["POST"])
def task_forward_activity():
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
recipients = back.followers_as_recipients()
app.logger.debug(f"Forwarding {activity!r} to {recipients}")
activity = ap.clean_activity(activity.to_dict())
payload = json.dumps(activity)
for recp in recipients:
app.logger.debug(f"forwarding {activity!r} to {recp}")
Tasks.post_to_remote_inbox(payload, recp)
except Exception as err:
app.logger.exception("task failed")
raise TaskError() from err
return ""
@blueprint.route("/task/post_to_remote_inbox", methods=["POST"])
def task_post_to_remote_inbox():
"""Post an activity to a remote inbox."""
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
payload, to = task.payload["payload"], task.payload["to"]
try:
app.logger.info("payload=%s", payload)
app.logger.info("generating sig")
signed_payload = json.loads(payload)
# XXX Disable JSON-LD signature crap for now (as HTTP signatures are enough for most implementations)
# Don't overwrite the signature if we're forwarding an activity
# if "signature" not in signed_payload:
# generate_signature(signed_payload, KEY)
app.logger.info("to=%s", to)
resp = requests.post(
to,
data=json.dumps(signed_payload),
auth=SIG_AUTH,
headers={
"Content-Type": config.HEADERS[1],
"Accept": config.HEADERS[1],
"User-Agent": config.USER_AGENT,
},
)
app.logger.info("resp=%s", resp)
app.logger.info("resp_body=%s", resp.text)
resp.raise_for_status()
except HTTPError as err:
app.logger.exception("request failed")
if 400 >= err.response.status_code >= 499:
app.logger.info("client error, no retry")
return ""
raise TaskError() from err
except Exception as err:
app.logger.exception("task failed")
raise TaskError() from err
return ""
@blueprint.route("/task/fetch_remote_question", methods=["POST"])
def task_fetch_remote_question():
"""Fetch a remote question for implementation that does not send Update."""
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
app.logger.info(f"Fetching remote question {iri}")
local_question = DB.activities.find_one(
{
"box": Box.INBOX.value,
"type": ap.ActivityType.CREATE.value,
"activity.object.id": iri,
}
)
remote_question = ap.get_backend().fetch_iri(iri, no_cache=True)
# FIXME(tsileo): compute and set `meta.object_visiblity` (also update utils.py to do it)
if (
local_question
and (
local_question["meta"].get("voted_for")
or local_question["meta"].get("subscribed")
)
and not DB.notifications.find_one({"activity.id": remote_question["id"]})
):
DB.notifications.insert_one(
{
"type": "question_ended",
"datetime": datetime.now(timezone.utc).isoformat(),
"activity": remote_question,
}
)
# Update the Create if we received it in the inbox
if local_question:
DB.activities.update_one(
{"remote_id": local_question["remote_id"], "box": Box.INBOX.value},
{"$set": {"activity.object": remote_question}},
)
# Also update all the cached copies (Like, Announce...)
DB.activities.update_many(
{"meta.object.id": remote_question["id"]},
{"$set": {"meta.object": remote_question}},
)
except HTTPError as err:
app.logger.exception("request failed")
if 400 >= err.response.status_code >= 499:
app.logger.info("client error, no retry")
return ""
raise TaskError() from err
except Exception as err:
app.logger.exception("task failed")
raise TaskError() from err
return ""
@blueprint.route("/task/cleanup", methods=["POST"])
def task_cleanup():
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
gc.perform()
return ""
@blueprint.route("/task/process_new_activity", methods=["POST"]) # noqa:c901
def task_process_new_activity():
"""Process an activity received in the inbox"""
task = p.parse(flask.request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
flags = {}
if not activity.published:
flags[_meta(MetaKey.PUBLISHED)] = now()
else:
flags[_meta(MetaKey.PUBLISHED)] = activity.published
set_inbox_flags(activity, flags)
app.logger.info(f"a={activity}, flags={flags!r}")
if flags:
DB.activities.update_one({"remote_id": activity.id}, {"$set": flags})
app.logger.info(f"new activity {iri} processed")
if not activity.has_type(ap.ActivityType.DELETE):
Tasks.cache_actor(iri)
except (ActivityGoneError, ActivityNotFoundError):
app.logger.exception(f"dropping activity {iri}, skip processing")
return ""
except Exception as err:
app.logger.exception(f"failed to process new activity {iri}")
raise TaskError() from err
return ""

101
blueprints/well_known.py Normal file
View file

@ -0,0 +1,101 @@
import json
import mimetypes
from typing import Any
import flask
from flask import Response
from flask import abort
from flask import request
from little_boxes import activitypub as ap
import config
from config import DB
from core.meta import Box
blueprint = flask.Blueprint("well_known", __name__)
@blueprint.route("/.well-known/webfinger")
def wellknown_webfinger() -> Any:
"""Exposes/servers WebFinger data."""
resource = request.args.get("resource")
if resource not in [f"acct:{config.USERNAME}@{config.DOMAIN}", config.ID]:
abort(404)
out = {
"subject": f"acct:{config.USERNAME}@{config.DOMAIN}",
"aliases": [config.ID],
"links": [
{
"rel": "http://webfinger.net/rel/profile-page",
"type": "text/html",
"href": config.ID,
},
{"rel": "self", "type": "application/activity+json", "href": config.ID},
{
"rel": "http://ostatus.org/schema/1.0/subscribe",
"template": config.BASE_URL + "/authorize_follow?profile={uri}",
},
{"rel": "magic-public-key", "href": config.KEY.to_magic_key()},
{
"href": config.ICON_URL,
"rel": "http://webfinger.net/rel/avatar",
"type": mimetypes.guess_type(config.ICON_URL)[0],
},
],
}
return Response(
response=json.dumps(out),
headers={"Content-Type": "application/jrd+json; charset=utf-8"},
)
@blueprint.route("/.well-known/nodeinfo")
def wellknown_nodeinfo() -> Any:
"""Exposes the NodeInfo endpoint (http://nodeinfo.diaspora.software/)."""
return flask.jsonify(
links=[
{
"rel": "http://nodeinfo.diaspora.software/ns/schema/2.1",
"href": f"{config.ID}/nodeinfo",
}
]
)
@blueprint.route("/nodeinfo")
def nodeinfo() -> Any:
"""NodeInfo endpoint."""
q = {
"box": Box.OUTBOX.value,
"meta.deleted": False,
"type": {"$in": [ap.ActivityType.CREATE.value, ap.ActivityType.ANNOUNCE.value]},
}
response = json.dumps(
{
"version": "2.1",
"software": {
"name": "microblogpub",
"version": config.VERSION,
"repository": "https://github.com/tsileo/microblog.pub",
},
"protocols": ["activitypub"],
"services": {"inbound": [], "outbound": []},
"openRegistrations": False,
"usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)},
"metadata": {
"nodeName": f"@{config.USERNAME}@{config.DOMAIN}",
"version": config.VERSION,
"versionDate": config.VERSION_DATE,
},
}
)
return Response(
headers={
"Content-Type": "application/json; profile=http://nodeinfo.diaspora.software/ns/schema/2.1#"
},
response=response,
)

View file

@ -4,7 +4,6 @@ import subprocess
from datetime import datetime
from enum import Enum
import pymongo
import yaml
from itsdangerous import JSONWebSignatureSerializer
from little_boxes import strtobool
@ -16,8 +15,6 @@ from utils.key import KEY_DIR
from utils.key import get_key
from utils.key import get_secret_key
from utils.media import MediaCache
from utils.meta import MetaKey
from utils.meta import _meta
class ThemeStyle(Enum):
@ -33,16 +30,6 @@ DEFAULT_THEME_PRIMARY_COLOR = {
}
def noop():
pass
CUSTOM_CACHE_HOOKS = False
try:
from cache_hooks import purge as custom_cache_purge_hook
except ModuleNotFoundError:
custom_cache_purge_hook = noop
VERSION = (
subprocess.check_output(["git", "describe", "--always"]).split()[0].decode("utf-8")
)
@ -108,78 +95,6 @@ GRIDFS = mongo_client[f"{DB_NAME}_gridfs"]
MEDIA_CACHE = MediaCache(GRIDFS, USER_AGENT)
def create_indexes():
if "trash" not in DB.collection_names():
DB.create_collection("trash", capped=True, size=50 << 20) # 50 MB
DB.command("compact", "activities")
DB.activities.create_index([(_meta(MetaKey.NOTIFICATION), pymongo.ASCENDING)])
DB.activities.create_index(
[(_meta(MetaKey.NOTIFICATION_UNREAD), pymongo.ASCENDING)]
)
DB.activities.create_index([("remote_id", pymongo.ASCENDING)])
DB.activities.create_index([("activity.object.id", pymongo.ASCENDING)])
DB.activities.create_index([("meta.thread_root_parent", pymongo.ASCENDING)])
DB.activities.create_index(
[
("meta.thread_root_parent", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
]
)
DB.activities.create_index(
[("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING)]
)
DB.cache2.create_index(
[
("path", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("arg", pymongo.ASCENDING),
]
)
DB.cache2.create_index("date", expireAfterSeconds=3600 * 12)
# Index for the block query
DB.activities.create_index(
[
("box", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("meta.undo", pymongo.ASCENDING),
]
)
# Index for count queries
DB.activities.create_index(
[
("box", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("meta.undo", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
]
)
DB.activities.create_index([("box", pymongo.ASCENDING)])
# Outbox query
DB.activities.create_index(
[
("box", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("meta.undo", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
("meta.public", pymongo.ASCENDING),
]
)
DB.activities.create_index(
[
("type", pymongo.ASCENDING),
("activity.object.type", pymongo.ASCENDING),
("activity.object.inReplyTo", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
]
)
def _drop_db():
if not DEBUG_MODE:
return

0
core/__init__.py Normal file
View file

View file

@ -28,8 +28,8 @@ from config import ID
from config import ME
from config import USER_AGENT
from config import USERNAME
from tasks import Tasks
from utils.meta import Box
from core.meta import Box
from core.tasks import Tasks
logger = logging.getLogger(__name__)

19
core/db.py Normal file
View file

@ -0,0 +1,19 @@
from enum import Enum
from enum import unique
from typing import Any
from typing import Dict
from typing import Optional
from config import DB
_Q = Dict[str, Any]
_Doc = Optional[Dict[str, Any]]
@unique
class CollectionName(Enum):
ACTIVITIES = "activities"
def find_one_activity(q: _Q) -> _Doc:
return DB[CollectionName.ACTIVITIES.value].find_one(q)

View file

@ -8,12 +8,12 @@ from typing import List
from little_boxes import activitypub as ap
import activitypub
from activitypub import Box
from config import DAYS_TO_KEEP
from config import ID
from config import ME
from config import MEDIA_CACHE
from core import activitypub
from core.meta import Box
from utils.migrations import DB
back = activitypub.MicroblogPubBackend()
@ -48,7 +48,7 @@ def threads_of_interest() -> List[str]:
return list(out)
def _keep(data: Dict[str, Any]):
def _keep(data: Dict[str, Any]) -> None:
DB.activities.update_one({"_id": data["_id"]}, {"$set": {"meta.gc_keep": True}})

77
core/indexes.py Normal file
View file

@ -0,0 +1,77 @@
import pymongo
from config import DB
from core.meta import MetaKey
from core.meta import _meta
def create_indexes():
if "trash" not in DB.collection_names():
DB.create_collection("trash", capped=True, size=50 << 20) # 50 MB
DB.command("compact", "activities")
DB.activities.create_index([(_meta(MetaKey.NOTIFICATION), pymongo.ASCENDING)])
DB.activities.create_index(
[(_meta(MetaKey.NOTIFICATION_UNREAD), pymongo.ASCENDING)]
)
DB.activities.create_index([("remote_id", pymongo.ASCENDING)])
DB.activities.create_index([("activity.object.id", pymongo.ASCENDING)])
DB.activities.create_index([("meta.thread_root_parent", pymongo.ASCENDING)])
DB.activities.create_index(
[
("meta.thread_root_parent", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
]
)
DB.activities.create_index(
[("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING)]
)
DB.cache2.create_index(
[
("path", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("arg", pymongo.ASCENDING),
]
)
DB.cache2.create_index("date", expireAfterSeconds=3600 * 12)
# Index for the block query
DB.activities.create_index(
[
("box", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("meta.undo", pymongo.ASCENDING),
]
)
# Index for count queries
DB.activities.create_index(
[
("box", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("meta.undo", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
]
)
DB.activities.create_index([("box", pymongo.ASCENDING)])
# Outbox query
DB.activities.create_index(
[
("box", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("meta.undo", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
("meta.public", pymongo.ASCENDING),
]
)
DB.activities.create_index(
[
("type", pymongo.ASCENDING),
("activity.object.type", pymongo.ASCENDING),
("activity.object.inReplyTo", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
]
)

View file

@ -20,12 +20,15 @@ class MetaKey(Enum):
NOTIFICATION = "notification"
NOTIFICATION_UNREAD = "notification_unread"
NOTIFICATION_FOLLOWS_BACK = "notification_follows_back"
POLL_ANSWER = "poll_answer"
STREAM = "stream"
ACTOR_ID = "actor_id"
UNDO = "undo"
PUBLISHED = "published"
GC_KEEP = "gc_keep"
OBJECT = "object"
OBJECT_ACTOR = "object_actor"
PUBLIC = "public"
def _meta(mk: MetaKey) -> str:
@ -54,3 +57,7 @@ def not_undo() -> _SubQuery:
def by_actor(actor: ap.BaseActivity) -> _SubQuery:
return {_meta(MetaKey.ACTOR_ID): actor.id}
def is_public() -> _SubQuery:
return {_meta(MetaKey.PUBLIC): True}

View file

@ -5,8 +5,8 @@ from urllib.parse import urlparse
from little_boxes import activitypub as ap
import activitypub
from config import ID
from core import activitypub
from utils.migrations import DB
from utils.migrations import Migration
from utils.migrations import logger

View file

@ -2,28 +2,35 @@ import logging
from functools import singledispatch
from typing import Any
from typing import Dict
from urllib.parse import urlparse
from little_boxes import activitypub as ap
from config import BASE_URL
from config import DB
from config import MetaKey
from config import _meta
from tasks import Tasks
from utils.meta import by_actor
from utils.meta import by_type
from utils.meta import in_inbox
from utils.meta import not_undo
from core.meta import MetaKey
from core.meta import _meta
from core.meta import by_actor
from core.meta import by_type
from core.meta import in_inbox
from core.meta import not_undo
from core.tasks import Tasks
_logger = logging.getLogger(__name__)
_NewMeta = Dict[str, Any]
_LOCAL_NETLOC = urlparse(BASE_URL).netloc
def _is_from_outbox(activity: ap.BaseActivity) -> bool:
return activity.id.startswith(BASE_URL)
def _is_local(url: str) -> bool:
return urlparse(url).netloc == _LOCAL_NETLOC
def _flag_as_notification(activity: ap.BaseActivity, new_meta: _NewMeta) -> None:
new_meta.update(
{_meta(MetaKey.NOTIFICATION): True, _meta(MetaKey.NOTIFICATION_UNREAD): True}
@ -31,8 +38,14 @@ def _flag_as_notification(activity: ap.BaseActivity, new_meta: _NewMeta) -> None
return None
def _set_flag(meta: _NewMeta, meta_key: MetaKey, value: Any = True) -> None:
meta.update({_meta(meta_key): value})
return None
@singledispatch
def set_inbox_flags(activity: ap.BaseActivity, new_meta: _NewMeta) -> None:
_logger.warning(f"skipping {activity!r}")
return None
@ -58,13 +71,15 @@ def _accept_set_inbox_flags(activity: ap.Accept, new_meta: _NewMeta) -> None:
# This Accept will be a "You started following $actor" notification
_flag_as_notification(activity, new_meta)
new_meta.update({_meta(MetaKey.NOTIFICATION_FOLLOWS_BACK): follows_back})
_set_flag(new_meta, MetaKey.GC_KEEP)
_set_flag(new_meta, MetaKey.NOTIFICATION_FOLLOWS_BACK, follows_back)
return None
@set_inbox_flags.register
def _follow_set_inbox_flags(activity: ap.Follow, new_meta: _NewMeta) -> None:
"""Handle notification for new followers."""
_logger.info(f"set_inbox_flags activity={activity!r}")
# Check if we're already following this actor
follows_back = False
accept_query = {
@ -83,12 +98,14 @@ def _follow_set_inbox_flags(activity: ap.Follow, new_meta: _NewMeta) -> None:
# This Follow will be a "$actor started following you" notification
_flag_as_notification(activity, new_meta)
new_meta.update({_meta(MetaKey.NOTIFICATION_FOLLOWS_BACK): follows_back})
_set_flag(new_meta, MetaKey.GC_KEEP)
_set_flag(new_meta, MetaKey.NOTIFICATION_FOLLOWS_BACK, follows_back)
return None
@set_inbox_flags.register
def _like_set_inbox_flags(activity: ap.Like, new_meta: _NewMeta) -> None:
_logger.info(f"set_inbox_flags activity={activity!r}")
# Is it a Like of local acitivty/from the outbox
if _is_from_outbox(activity.get_object()):
# Flag it as a notification
@ -98,29 +115,33 @@ def _like_set_inbox_flags(activity: ap.Like, new_meta: _NewMeta) -> None:
Tasks.cache_object(activity.id)
# Also set the "keep mark" for the GC (as we want to keep it forever)
new_meta.update({_meta(MetaKey.GC_KEEP): True})
_set_flag(new_meta, MetaKey.GC_KEEP)
return None
@set_inbox_flags.register
def _announce_set_inbox_flags(activity: ap.Announce, new_meta: _NewMeta) -> None:
_logger.info(f"set_inbox_flags activity={activity!r}")
# Is it a Like of local acitivty/from the outbox
if _is_from_outbox(activity.get_object()):
# Flag it as a notification
_flag_as_notification(activity, new_meta)
# Also set the "keep mark" for the GC (as we want to keep it forever)
new_meta.update({_meta(MetaKey.GC_KEEP): True})
_set_flag(new_meta, MetaKey.GC_KEEP)
# Cache the object in all case (for display on the notifcation page **and** the stream page)
Tasks.cache_object(activity.id)
# Display it in the stream
_set_flag(new_meta, MetaKey.STREAM)
return None
@set_inbox_flags.register
def _undo_set_inbox_flags(activity: ap.Undo, new_meta: _NewMeta) -> None:
_logger.info(f"set_inbox_flags activity={activity!r}")
obj = activity.get_object()
if obj.has_type(ap.ActivityType.FOLLOW):
@ -128,6 +149,49 @@ def _undo_set_inbox_flags(activity: ap.Undo, new_meta: _NewMeta) -> None:
_flag_as_notification(activity, new_meta)
# Also set the "keep mark" for the GC (as we want to keep it forever)
new_meta.update({_meta(MetaKey.GC_KEEP): True})
_set_flag(new_meta, MetaKey.GC_KEEP)
return None
@set_inbox_flags.register
def _create_set_inbox_flags(activity: ap.Create, new_meta: _NewMeta) -> None:
_logger.info(f"set_inbox_flags activity={activity!r}")
obj = activity.get_object()
_set_flag(new_meta, MetaKey.POLL_ANSWER, False)
in_reply_to = obj.get_in_reply_to()
# Check if it's a local reply
if in_reply_to and _is_local(in_reply_to):
# TODO(tsileo): fetch the reply to check for poll answers more precisely
# reply_of = ap.fetch_remote_activity(in_reply_to)
# Ensure it's not a poll answer
if obj.name and not obj.content:
_set_flag(new_meta, MetaKey.POLL_ANSWER)
return None
# Flag it as a notification
_flag_as_notification(activity, new_meta)
# Also set the "keep mark" for the GC (as we want to keep it forever)
_set_flag(new_meta, MetaKey.GC_KEEP)
return None
# Check for mention
for mention in obj.get_mentions():
if mention.href and _is_local(mention.href):
# Flag it as a notification
_flag_as_notification(activity, new_meta)
# Also set the "keep mark" for the GC (as we want to keep it forever)
_set_flag(new_meta, MetaKey.GC_KEEP)
if not in_reply_to:
# A good candidate for displaying in the stream
_set_flag(new_meta, MetaKey.STREAM)
return None

232
core/shared.py Normal file
View file

@ -0,0 +1,232 @@
import os
from datetime import datetime
from datetime import timezone
from functools import wraps
from typing import Any
from typing import Dict
from typing import Union
import flask
import werkzeug
from bson.objectid import ObjectId
from flask import current_app as app
from flask import redirect
from flask import request
from flask import session
from flask import url_for
from flask_wtf.csrf import CSRFProtect
from little_boxes import activitypub as ap
from little_boxes.activitypub import format_datetime
from poussetaches import PousseTaches
from config import DB
from config import ME
from core import activitypub
from core.activitypub import _answer_key
from core.meta import Box
from core.tasks import Tasks
_Response = Union[flask.Response, werkzeug.wrappers.Response, str]
p = PousseTaches(
os.getenv("MICROBLOGPUB_POUSSETACHES_HOST", "http://localhost:7991"),
os.getenv("MICROBLOGPUB_INTERNAL_HOST", "http://localhost:5000"),
)
csrf = CSRFProtect()
back = activitypub.MicroblogPubBackend()
ap.use_backend(back)
MY_PERSON = ap.Person(**ME)
def add_response_headers(headers={}):
"""This decorator adds the headers passed in to the response"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
resp = flask.make_response(f(*args, **kwargs))
h = resp.headers
for header, value in headers.items():
h[header] = value
return resp
return decorated_function
return decorator
def noindex(f):
"""This decorator passes X-Robots-Tag: noindex, nofollow"""
return add_response_headers({"X-Robots-Tag": "noindex, nofollow"})(f)
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get("logged_in"):
return redirect(url_for("admin_login", next=request.url))
return f(*args, **kwargs)
return decorated_function
def _get_ip():
"""Guess the IP address from the request. Only used for security purpose (failed logins or bad payload).
Geoip will be returned if the "broxy" headers are set (it does Geoip
using an offline database and append these special headers).
"""
ip = request.headers.get("X-Forwarded-For", request.remote_addr)
geoip = None
if request.headers.get("Broxy-Geoip-Country"):
geoip = (
request.headers.get("Broxy-Geoip-Country")
+ "/"
+ request.headers.get("Broxy-Geoip-Region")
)
return ip, geoip
def post_to_outbox(activity: ap.BaseActivity) -> str:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
# Assign create a random ID
obj_id = back.random_object_id()
activity.set_id(back.activity_url(obj_id), obj_id)
back.save(Box.OUTBOX, activity)
Tasks.cache_actor(activity.id)
Tasks.finish_post_to_outbox(activity.id)
return activity.id
def _build_thread(data, include_children=True): # noqa: C901
data["_requested"] = True
app.logger.info(f"_build_thread({data!r})")
root_id = data["meta"].get("thread_root_parent", data["activity"]["object"]["id"])
query = {
"$or": [{"meta.thread_root_parent": root_id}, {"activity.object.id": root_id}],
"meta.deleted": False,
}
replies = [data]
for dat in DB.activities.find(query):
print(dat["type"])
if dat["type"][0] == ap.ActivityType.CREATE.value:
replies.append(dat)
if dat["type"][0] == ap.ActivityType.UPDATE.value:
continue
else:
# Make a Note/Question/... looks like a Create
dat = {
"activity": {"object": dat["activity"]},
"meta": dat["meta"],
"_id": dat["_id"],
}
replies.append(dat)
replies = sorted(replies, key=lambda d: d["activity"]["object"]["published"])
# Index all the IDs in order to build a tree
idx = {}
replies2 = []
for rep in replies:
rep_id = rep["activity"]["object"]["id"]
if rep_id in idx:
continue
idx[rep_id] = rep.copy()
idx[rep_id]["_nodes"] = []
replies2.append(rep)
# Build the tree
for rep in replies2:
rep_id = rep["activity"]["object"]["id"]
if rep_id == root_id:
continue
reply_of = ap._get_id(rep["activity"]["object"].get("inReplyTo"))
try:
idx[reply_of]["_nodes"].append(rep)
except KeyError:
app.logger.info(f"{reply_of} is not there! skipping {rep}")
# Flatten the tree
thread = []
def _flatten(node, level=0):
node["_level"] = level
thread.append(node)
for snode in sorted(
idx[node["activity"]["object"]["id"]]["_nodes"],
key=lambda d: d["activity"]["object"]["published"],
):
_flatten(snode, level=level + 1)
try:
_flatten(idx[root_id])
except KeyError:
app.logger.info(f"{root_id} is not there! skipping")
return thread
def paginated_query(db, q, limit=25, sort_key="_id"):
older_than = newer_than = None
query_sort = -1
first_page = not request.args.get("older_than") and not request.args.get(
"newer_than"
)
query_older_than = request.args.get("older_than")
query_newer_than = request.args.get("newer_than")
if query_older_than:
q["_id"] = {"$lt": ObjectId(query_older_than)}
elif query_newer_than:
q["_id"] = {"$gt": ObjectId(query_newer_than)}
query_sort = 1
outbox_data = list(db.find(q, limit=limit + 1).sort(sort_key, query_sort))
outbox_len = len(outbox_data)
outbox_data = sorted(
outbox_data[:limit], key=lambda x: str(x[sort_key]), reverse=True
)
if query_older_than:
newer_than = str(outbox_data[0]["_id"])
if outbox_len == limit + 1:
older_than = str(outbox_data[-1]["_id"])
elif query_newer_than:
older_than = str(outbox_data[-1]["_id"])
if outbox_len == limit + 1:
newer_than = str(outbox_data[0]["_id"])
elif first_page and outbox_len == limit + 1:
older_than = str(outbox_data[-1]["_id"])
return outbox_data, older_than, newer_than
def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None:
activity = raw_doc["activity"]
if (
ap._has_type(activity["type"], ap.ActivityType.CREATE)
and "object" in activity
and ap._has_type(activity["object"]["type"], ap.ActivityType.QUESTION)
):
for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")):
choice["replies"] = {
"type": ap.ActivityType.COLLECTION.value,
"totalItems": raw_doc["meta"]
.get("question_answers", {})
.get(_answer_key(choice["name"]), 0),
}
now = datetime.now(timezone.utc)
if format_datetime(now) >= activity["object"]["endTime"]:
activity["object"]["closed"] = activity["object"]["endTime"]

View file

@ -3,6 +3,7 @@ from datetime import datetime
from datetime import timezone
from poussetaches import PousseTaches
from utils import parse_datetime
p = PousseTaches(

View file

@ -7,7 +7,7 @@ services:
ports:
- "27017:27017"
poussetaches:
image: "poussetaches:latest"
image: "poussetaches/poussetaches:latest"
volumes:
- "${DATA_DIR}/poussetaches:/app/poussetaches_data"
environment:

View file

@ -18,7 +18,7 @@ services:
volumes:
- "${DATA_DIR}/mongodb:/data/db"
poussetaches:
image: "poussetaches:latest"
image: "poussetaches/poussetaches:latest"
volumes:
- "${DATA_DIR}/poussetaches:/app/poussetaches_data"
environment:

View file

@ -1,132 +0,0 @@
import base64
import json
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from typing import Dict
from typing import List
import flask
import requests
POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY")
@dataclass
class Task:
req_id: str
tries: int
payload: Any
@dataclass
class GetTask:
payload: Any
expected: int
schedule: str
task_id: str
next_run: datetime
tries: int
url: str
last_error_status_code: int
last_error_body: str
class PousseTaches:
def __init__(self, api_url: str, base_url: str) -> None:
self.api_url = api_url
self.base_url = base_url
def push(
self,
payload: Any,
path: str,
expected: int = 200,
schedule: str = "",
delay: int = 0,
) -> str:
# Encode our payload
p = base64.b64encode(json.dumps(payload).encode()).decode()
# Queue/push it
resp = requests.post(
self.api_url,
json={
"url": self.base_url + path,
"payload": p,
"expected": expected,
"schedule": schedule,
"delay": delay,
},
)
resp.raise_for_status()
return resp.headers["Poussetaches-Task-ID"]
def parse(self, req: flask.Request) -> Task:
if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY:
raise ValueError("Bad auth key")
# Parse the "envelope"
envelope = json.loads(req.data)
print(req)
print(f"envelope={envelope!r}")
payload = json.loads(base64.b64decode(envelope["payload"]))
return Task(
req_id=envelope["req_id"], tries=envelope["tries"], payload=payload
) # type: ignore
@staticmethod
def _expand_task(t: Dict[str, Any]) -> None:
try:
t["payload"] = json.loads(base64.b64decode(t["payload"]))
except json.JSONDecodeError:
t["payload"] = base64.b64decode(t["payload"]).decode()
if t["last_error_body"]:
t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode()
t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9))
if t["last_run"]:
t["last_run"] = datetime.fromtimestamp(float(t["last_run"] / 1e9))
else:
del t["last_run"]
def _get(self, where: str) -> List[GetTask]:
out = []
resp = requests.get(self.api_url + f"/{where}")
resp.raise_for_status()
dat = resp.json()
for t in dat["tasks"]:
self._expand_task(t)
out.append(
GetTask(
task_id=t["id"],
payload=t["payload"],
expected=t["expected"],
schedule=t["schedule"],
tries=t["tries"],
url=t["url"],
last_error_status_code=t["last_error_status_code"],
last_error_body=t["last_error_body"],
next_run=t["next_run"],
)
)
return out
def get_cron(self) -> List[GetTask]:
return self._get("cron")
def get_success(self) -> List[GetTask]:
return self._get("success")
def get_waiting(self) -> List[GetTask]:
return self._get("waiting")
def get_dead(self) -> List[GetTask]:
return self._get("dead")

View file

@ -1,3 +1,4 @@
poussetaches
python-dateutil
libsass
tornado<6.0.0

4
run.sh
View file

@ -1,4 +1,4 @@
#!/bin/bash
python -c "import logging; logging.basicConfig(level=logging.DEBUG); import migrations; migrations.perform()"
python -c "import config; config.create_indexes()"
python -c "import logging; logging.basicConfig(level=logging.DEBUG); from core import migrations; migrations.perform()"
python -c "from core import indexes; indexes.create_indexes()"
gunicorn -t 600 -w 5 -b 0.0.0.0:5005 --log-level debug app:app

View file

@ -20,7 +20,7 @@
</div>
</div>
<form method="POST" action="{{ url_for('indieauth_flow') }}">
<form method="POST" action="{{ url_for('indieauth.indieauth_flow') }}">
{% if scopes %}
<h3>Scopes</h3>
<ul>

View file

@ -31,7 +31,7 @@
{% if unread_notifications_count %}
({{unread_notifications_count}})
{% endif %}</a></li>
<li class="left"><a href="/admin/lists"{% if request.path == url_for('admin_lists') %} class="selected" {% endif %}>Lists</a></li>
<li class="left"><a href="/admin/lists"{% if request.path == url_for('admin.admin_lists') %} class="selected" {% endif %}>Lists</a></li>
<li class="left"><a href="/admin/bookmarks"{% if request.path == "/admin/bookmarks" %} class="selected" {% endif %}>Bookmarks</a></li>
<li class="left"><a href="/admin/lookup"{% if request.path == "/admin/lookup" %} class="selected" {% endif %}>Lookup</a></li>
<li class="left"><a href="/admin/logout">Logout</a></li>

View file

@ -12,7 +12,7 @@
<p>Lists and its members are private.</p>
<h2>New List</h2>
<form action="/api/new_list" method="POST">
<input type="hidden" name="redirect" value="{{ url_for('admin_lists') }}">
<input type="hidden" name="redirect" value="{{ url_for('admin.admin_lists') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="text" name="name" placeholder="My list">
<input type="submit" value="Create">
@ -23,13 +23,13 @@
<ul>
{% for l in lists %}
<li><a href="{{url_for('admin_list', name=l.name)}}">{{ l.name }}</a></li>
<li><a href="{{url_for('admin.admin_list', name=l.name)}}">{{ l.name }}</a></li>
{% endfor %}
</ul>
<h2>Manage lists</h2>
{% for l in lists %}
<h3><a href="{{url_for("admin_list", name=l.name)}}">{{ l.name }}</a> <small style="font-weight:normal">{{ l.members | length }} members</small></h3>
<h3><a href="{{url_for('admin.admin_list', name=l.name)}}">{{ l.name }}</a> <small style="font-weight:normal">{{ l.members | length }} members</small></h3>
<form action="/api/delete_list" method="post">
<input type="hidden" name="redirect" value="{{ request.path }}"/>
<input type="hidden" name="name" value="{{ l.name }}"/>

View file

@ -1,12 +1,12 @@
{% extends "layout.html" %}
{% import 'utils.html' as utils %}
{% block title %}{% if request.path == url_for('admin_stream') %}Stream{% else %}Notifications{% endif %} - {{ config.NAME }}{% endblock %}
{% block title %}{% if request.path == url_for('admin.admin_stream') %}Stream{% else %}Notifications{% endif %} - {{ config.NAME }}{% endblock %}
{% block content %}
<div class="h-feed" id="container">
{% include "header.html" %}
<div id="admin">
{% if request.path == url_for('admin_notifications') and unread_notifications_count %}
{% if request.path == url_for('admin.admin_notifications') and unread_notifications_count %}
<div style="clear:both;padding-bottom:60px;">
<form action="/api/mark_notifications_as_read" method="POST">
<input type="hidden" name="redirect" value="{{ request.path }}"/>
@ -28,7 +28,7 @@
{% if boost_actor %}
<div style="margin-left:70px;padding-bottom:5px;margin-bottom:15px;display:inline-block;">
<span class="bar-item-no-hover"><a style="color:#808080;" href="{{ boost_actor.url | get_url }}">{{ boost_actor.name or boost_actor.preferredUsername }}</a> boosted</span>
{% if request.path == url_for('admin_notifications') %}
{% if request.path == url_for('admin.admin_notifications') %}
{% if item.meta.notification_unread %}<span class="bar-item-no-bg"><span class="pcolor">new</span></span>{% endif %}
<span class="bar-item-no-bg">{{ (item.activity.published or item.meta.published) | format_timeago }}</span>
{% endif %}

View file

@ -213,7 +213,7 @@
</form>
{% endif %}
{% if meta.bookmarked or request.path == url_for("admin_bookmarks") %}
{% if meta.bookmarked or request.path == url_for("admin.admin_bookmarks") %}
<form action="/api/bookmark" class="action-form" method="POST">
<input type="hidden" name="redirect" value="{{ redir }}">
<input type="hidden" name="id" value="{{ obj.id }}">

328
utils/template_filters.py Normal file
View file

@ -0,0 +1,328 @@
import logging
import urllib
from datetime import datetime
from datetime import timezone
from typing import Dict
from typing import Optional
from typing import Tuple
from urllib.parse import urlparse
import bleach
import emoji_unicode
import flask
import html2text
import timeago
from little_boxes import activitypub as ap
from little_boxes.activitypub import _to_list
from little_boxes.errors import ActivityGoneError
from little_boxes.errors import ActivityNotFoundError
from core.activitypub import _answer_key
from config import EMOJI_TPL
from config import ID
from config import MEDIA_CACHE
from utils import parse_datetime
from utils.media import Kind
_logger = logging.getLogger(__name__)
H2T = html2text.HTML2Text()
H2T.ignore_links = True
H2T.ignore_images = True
filters = flask.Blueprint("filters", __name__)
@filters.app_template_filter()
def visibility(v: str) -> str:
try:
return ap.Visibility[v].value.lower()
except Exception:
return v
@filters.app_template_filter()
def visibility_is_public(v: str) -> bool:
return v in [ap.Visibility.PUBLIC.name, ap.Visibility.UNLISTED.name]
@filters.app_template_filter()
def emojify(text):
return emoji_unicode.replace(
text, lambda e: EMOJI_TPL.format(filename=e.code_points, raw=e.unicode)
)
# HTML/templates helper
ALLOWED_TAGS = [
"a",
"abbr",
"acronym",
"b",
"br",
"blockquote",
"code",
"pre",
"em",
"i",
"li",
"ol",
"strong",
"ul",
"span",
"div",
"p",
"h1",
"h2",
"h3",
"h4",
"h5",
"h6",
]
def clean_html(html):
try:
return bleach.clean(html, tags=ALLOWED_TAGS)
except Exception:
return ""
@filters.app_template_filter()
def gtone(n):
return n > 1
@filters.app_template_filter()
def gtnow(dtstr):
return ap.format_datetime(datetime.now(timezone.utc)) > dtstr
@filters.app_template_filter()
def clean(html):
out = clean_html(html)
return emoji_unicode.replace(
out, lambda e: EMOJI_TPL.format(filename=e.code_points, raw=e.unicode)
)
@filters.app_template_filter()
def permalink_id(val):
return str(hash(val))
@filters.app_template_filter()
def quote_plus(t):
return urllib.parse.quote_plus(t)
@filters.app_template_filter()
def is_from_outbox(t):
return t.startswith(ID)
@filters.app_template_filter()
def html2plaintext(body):
return H2T.handle(body)
@filters.app_template_filter()
def domain(url):
return urlparse(url).netloc
@filters.app_template_filter()
def format_time(val):
if val:
dt = parse_datetime(val)
return datetime.strftime(dt, "%B %d, %Y, %H:%M %p")
return val
@filters.app_template_filter()
def format_ts(val):
return datetime.fromtimestamp(val).strftime("%B %d, %Y, %H:%M %p")
@filters.app_template_filter()
def gt_ts(val):
return datetime.now() > datetime.fromtimestamp(val)
@filters.app_template_filter()
def format_timeago(val):
if val:
dt = parse_datetime(val)
return timeago.format(dt.astimezone(timezone.utc), datetime.now(timezone.utc))
return val
@filters.app_template_filter()
def url_or_id(d):
if isinstance(d, dict):
if "url" in d:
return d["url"]
else:
return d["id"]
return ""
@filters.app_template_filter()
def get_url(u):
print(f"GET_URL({u!r})")
if isinstance(u, list):
for l in u:
if l.get("mimeType") == "text/html":
u = l
if isinstance(u, dict):
return u["href"]
elif isinstance(u, str):
return u
else:
return u
@filters.app_template_filter()
def get_actor(url):
if not url:
return None
if isinstance(url, list):
url = url[0]
if isinstance(url, dict):
url = url.get("id")
print(f"GET_ACTOR {url}")
try:
return ap.get_backend().fetch_iri(url)
except (ActivityNotFoundError, ActivityGoneError):
return f"Deleted<{url}>"
except Exception as exc:
return f"Error<{url}/{exc!r}>"
@filters.app_template_filter()
def get_answer_count(choice, obj, meta):
count_from_meta = meta.get("question_answers", {}).get(_answer_key(choice), 0)
print(count_from_meta)
print(choice, obj, meta)
if count_from_meta:
return count_from_meta
for option in obj.get("oneOf", obj.get("anyOf", [])):
if option.get("name") == choice:
return option.get("replies", {}).get("totalItems", 0)
@filters.app_template_filter()
def get_total_answers_count(obj, meta):
cached = meta.get("question_replies", 0)
if cached:
return cached
cnt = 0
for choice in obj.get("anyOf", obj.get("oneOf", [])):
print(choice)
cnt += choice.get("replies", {}).get("totalItems", 0)
return cnt
_GRIDFS_CACHE: Dict[Tuple[Kind, str, Optional[int]], str] = {}
def _get_file_url(url, size, kind):
k = (kind, url, size)
cached = _GRIDFS_CACHE.get(k)
if cached:
return cached
doc = MEDIA_CACHE.get_file(url, size, kind)
if doc:
u = f"/media/{str(doc._id)}"
_GRIDFS_CACHE[k] = u
return u
# MEDIA_CACHE.cache(url, kind)
_logger.error(f"cache not available for {url}/{size}/{kind}")
return url
@filters.app_template_filter()
def get_actor_icon_url(url, size):
return _get_file_url(url, size, Kind.ACTOR_ICON)
@filters.app_template_filter()
def get_attachment_url(url, size):
return _get_file_url(url, size, Kind.ATTACHMENT)
@filters.app_template_filter()
def get_og_image_url(url, size=100):
try:
return _get_file_url(url, size, Kind.OG_IMAGE)
except Exception:
return ""
@filters.app_template_filter()
def remove_mongo_id(dat):
if isinstance(dat, list):
return [remove_mongo_id(item) for item in dat]
if "_id" in dat:
dat["_id"] = str(dat["_id"])
for k, v in dat.items():
if isinstance(v, dict):
dat[k] = remove_mongo_id(dat[k])
return dat
@filters.app_template_filter()
def get_video_link(data):
for link in data:
if link.get("mimeType", "").startswith("video/"):
return link.get("href")
return None
@filters.app_template_filter()
def has_type(doc, _types):
for _type in _to_list(_types):
if _type in _to_list(doc["type"]):
return True
return False
@filters.app_template_filter()
def has_actor_type(doc):
# FIXME(tsileo): skipping the last one "Question", cause Mastodon sends question restuls as an update coming from
# the question... Does Pleroma do that too?
for t in ap.ACTOR_TYPES[:-1]:
if has_type(doc, t.value):
return True
return False
def _is_img(filename):
filename = filename.lower()
if (
filename.endswith(".png")
or filename.endswith(".jpg")
or filename.endswith(".jpeg")
or filename.endswith(".gif")
or filename.endswith(".svg")
):
return True
return False
@filters.app_template_filter()
def not_only_imgs(attachment):
for a in attachment:
if isinstance(a, dict) and not _is_img(a["url"]):
return True
if isinstance(a, str) and not _is_img(a):
return True
return False
@filters.app_template_filter()
def is_img(filename):
return _is_img(filename)