Move more DB stuff to celery

This commit is contained in:
Thomas Sileo 2018-07-29 16:07:27 +02:00
parent e3b8c4f63c
commit 55ff15ff86
3 changed files with 145 additions and 49 deletions

View file

@ -1,5 +1,6 @@
import logging
import os
import json
from datetime import datetime
from enum import Enum
from typing import Any
@ -120,10 +121,6 @@ class MicroblogPubBackend(Backend):
def set_save_cb(self, cb):
self.save_cb = cb
@ensure_it_is_me
def outbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None:
self.save(Box.OUTBOX, activity)
def followers(self) -> List[str]:
q = {
"box": Box.INBOX.value,
@ -241,21 +238,9 @@ class MicroblogPubBackend(Backend):
def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool:
return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri}))
@ensure_it_is_me
def inbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None:
self.save(Box.INBOX, activity)
def set_post_to_remote_inbox(self, cb):
self.post_to_remote_inbox_cb = cb
@ensure_it_is_me
def post_to_remote_inbox(self, as_actor: ap.Person, payload: str, to: str) -> None:
self.post_to_remote_inbox_cb(payload, to)
@ensure_it_is_me
def new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None:
pass
@ensure_it_is_me
def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None:
DB.activities.update_one(
@ -268,10 +253,6 @@ class MicroblogPubBackend(Backend):
{"remote_id": follow.id}, {"$set": {"meta.undo": True}}
)
@ensure_it_is_me
def new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None:
pass
@ensure_it_is_me
def inbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
obj = like.get_object()
@ -527,6 +508,25 @@ class MicroblogPubBackend(Backend):
{"$set": {"meta.thread_root_parent": root_reply}},
)
def post_to_outbox(self, activity: ap.BaseActivity) -> None:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
self.save(Box.OUTBOX, activity)
# Assign create a random ID
obj_id = self.random_object_id()
activity.set_id(self.activity_url(obj_id), obj_id)
recipients = activity.recipients()
logger.info(f"recipients={recipients}")
activity = ap.clean_activity(activity.to_dict())
payload = json.dumps(activity)
for recp in recipients:
logger.debug(f"posting to {recp}")
self.post_to_remote_inbox(self.get_actor(), payload, recp)
def gen_feed():
fg = FeedGenerator()

46
app.py
View file

@ -74,7 +74,6 @@ from config import PASS
from config import USERNAME
from config import VERSION
from config import _drop_db
from config import custom_cache_purge_hook
from utils.key import get_secret_key
from utils.lookup import lookup
from utils.media import Kind
@ -91,9 +90,6 @@ def save_cb(box: Box, iri: str) -> None:
back.set_save_cb(save_cb)
back.set_post_to_remote_inbox(tasks.post_to_inbox.delay)
ap.use_backend(back)
MY_PERSON = ap.Person(**ME)
@ -117,9 +113,6 @@ else:
SIG_AUTH = HTTPSigAuth(KEY)
OUTBOX = ap.Outbox(MY_PERSON)
INBOX = ap.Inbox(MY_PERSON)
def verify_pass(pwd):
return bcrypt.verify(pwd, PASS)
@ -615,7 +608,7 @@ def authorize_follow():
return redirect("/following")
follow = ap.Follow(actor=MY_PERSON.id, object=actor)
OUTBOX.post(follow)
tasks.post_to_outbox(follow)
return redirect("/following")
@ -1121,12 +1114,9 @@ def outbox():
data = request.get_json(force=True)
print(data)
activity = ap.parse_activity(data)
OUTBOX.post(activity)
activity_id = tasks.post_to_outbox(activity)
# Purge the cache if a custom hook is set, as new content was published
custom_cache_purge_hook()
return Response(status=201, headers={"Location": activity.id})
return Response(status=201, headers={"Location": activity_id})
@app.route("/outbox/<item_id>")
@ -1465,9 +1455,9 @@ def api_delete():
note = _user_api_get_note(from_outbox=True)
delete = note.build_delete()
OUTBOX.post(delete)
delete_id = tasks.post_to_outbox(delete)
return _user_api_response(activity=delete.id)
return _user_api_response(activity=delete_id)
@app.route("/api/boost", methods=["POST"])
@ -1476,9 +1466,9 @@ def api_boost():
note = _user_api_get_note()
announce = note.build_announce(MY_PERSON)
OUTBOX.post(announce)
announce_id = tasks.post_to_outbox(announce)
return _user_api_response(activity=announce.id)
return _user_api_response(activity=announce_id)
@app.route("/api/like", methods=["POST"])
@ -1487,9 +1477,9 @@ def api_like():
note = _user_api_get_note()
like = note.build_like(MY_PERSON)
OUTBOX.post(like)
like_id = tasks.post_to_outbox(like)
return _user_api_response(activity=like.id)
return _user_api_response(activity=like_id)
@app.route("/api/note/pin", methods=["POST"])
@ -1534,9 +1524,9 @@ def api_undo():
obj = ap.parse_activity(doc.get("activity"))
# FIXME(tsileo): detect already undo-ed and make this API call idempotent
undo = obj.build_undo()
OUTBOX.post(undo)
undo_id = tasks.post_to_outbox(undo)
return _user_api_response(activity=undo.id)
return _user_api_response(activity=undo_id)
@app.route("/admin/stream")
@ -1605,7 +1595,7 @@ def inbox():
)
activity = ap.parse_activity(data)
logger.debug(f"inbox activity={activity}/{data}")
INBOX.post(activity)
tasks.post_to_inbox(activity)
return Response(status=201)
@ -1691,9 +1681,9 @@ def api_new_note():
note = ap.Note(**raw_note)
create = note.build_create()
OUTBOX.post(create)
create_id = tasks.post_to_outbox(create)
return _user_api_response(activity=create.id)
return _user_api_response(activity=create_id)
@app.route("/api/stream")
@ -1724,9 +1714,9 @@ def api_block():
return _user_api_response(activity=existing["activity"]["id"])
block = ap.Block(actor=MY_PERSON.id, object=actor)
OUTBOX.post(block)
block_id = tasks.post_to_outbox(block)
return _user_api_response(activity=block.id)
return _user_api_response(activity=block_id)
@app.route("/api/follow", methods=["POST"])
@ -1746,9 +1736,9 @@ def api_follow():
return _user_api_response(activity=existing["activity"]["id"])
follow = ap.Follow(actor=MY_PERSON.id, object=actor)
OUTBOX.post(follow)
follow_id = tasks.post_to_outbox(follow)
return _user_api_response(activity=follow.id)
return _user_api_response(activity=follow_id)
@app.route("/followers")

108
tasks.py
View file

@ -14,8 +14,10 @@ from little_boxes.linked_data_sig import generate_signature
from requests.exceptions import HTTPError
import activitypub
from activitypub import Box
from config import DB
from config import HEADERS
from config import ME
from config import ID
from config import KEY
from config import MEDIA_CACHE
@ -33,6 +35,8 @@ SigAuth = HTTPSigAuth(KEY)
back = activitypub.MicroblogPubBackend()
ap.use_backend(back)
MY_PERSON = ap.Person(**ME)
@app.task(bind=True, max_retries=12) # noqa: C901
def process_new_activity(self, iri: str) -> None:
@ -253,8 +257,110 @@ def cache_attachments(self, iri: str) -> None:
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
def post_to_inbox(activity: ap.BaseActivity) -> None:
# Check for Block activity
actor = activity.get_actor()
if back.outbox_is_blocked(MY_PERSON, actor.id):
log.info(
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
)
return
if back.inbox_check_duplicate(MY_PERSON, activity.id):
# The activity is already in the inbox
log.info(f"received duplicate activity {activity!r}, dropping it")
back.save(Box.INBOX, activity)
finish_post_to_inbox.delay(activity.id)
@app.task(bind=True, max_retries=12) # noqa: C901
def finish_post_to_inbox(self, iri: str) -> None:
try:
activity = ap.fetch_remote_activity(iri)
log.info(f"activity={activity!r}")
if activity.has_type(ap.ActivityType.DELETE):
back.inbox_delete(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.UPDATE):
back.inbox_update(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.CREATE):
back.inbox_create(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.ANNOUNCE):
back.inbox_announce(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.LIKE):
back.inbox_like(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.FOLLOW):
# Reply to a Follow with an Accept
accept = ap.Accept(actor=ID, object=activity.to_dict(embed=True))
post_to_outbox(accept)
elif activity.has_type(ap.ActivityType.UNDO):
obj = activity.get_object()
if obj.has_type(ap.ActivityType.LIKE):
back.inbox_undo_like(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.ANNOUNCE):
back.inbox_undo_announce(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.FOLLOW):
back.undo_new_follower(MY_PERSON, obj)
except Exception as err:
log.exception(f"failed to cache attachments for {iri}")
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
def post_to_outbox(activity: ap.BaseActivity) -> str:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
# Assign create a random ID
obj_id = back.random_object_id()
activity.set_id(back.activity_url(obj_id), obj_id)
back.save(Box.OUTBOX, activity)
finish_post_to_outbox.delay(activity.id)
return activity.id
@app.task(bind=True, max_retries=12) # noqa:C901
def finish_post_to_outbox(self, iri: str) -> None:
try:
activity = ap.fetch_remote_activity(iri)
log.info(f"activity={activity!r}")
if activity.has_type(ap.activitytype.delete):
back.outbox_delete(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.UPDATE):
back.outbox_update(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.CREATE):
back.outbox_create(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.ANNOUNCE):
back.outbox_announce(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.LIKE):
back.outbox_like(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.UNDO):
obj = activity.get_object()
if obj.has_type(ap.ActivityType.LIKE):
back.outbox_undo_like(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.ANNOUNCE):
back.outbox_undo_announce(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.FOLLOW):
back.undo_new_following(MY_PERSON, obj)
recipients = activity.recipients()
log.info(f"recipients={recipients}")
activity = ap.clean_activity(activity.to_dict())
payload = json.dumps(activity)
for recp in recipients:
log.debug(f"posting to {recp}")
post_to_remote_inbox.delay(payload, recp)
except Exception as err:
log.exception(f"failed to cache attachments for {iri}")
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
@app.task(bind=True, max_retries=12)
def post_to_inbox(self, payload: str, to: str) -> None:
def post_to_remote_inbox(self, payload: str, to: str) -> None:
try:
log.info("payload=%s", payload)
log.info("generating sig")