Try poussetaches

This commit is contained in:
Thomas Sileo 2019-04-05 11:35:48 +02:00
parent 01b849be70
commit 3289e91786
7 changed files with 673 additions and 57 deletions

View file

@ -16,6 +16,7 @@ install:
- sudo chmod +x /usr/local/bin/docker-compose
- docker-compose --version
- pip install -r dev-requirements.txt
- git clone https://github.com/tsileo/poussetaches.git && cd poussetaches && docker build . -t poussetaches:latest && cd -
script:
- mypy --ignore-missing-imports .
- flake8 activitypub.py

646
app.py
View file

@ -16,6 +16,8 @@ from typing import Tuple
from urllib.parse import urlencode
from urllib.parse import urlparse
from requests.exceptions import HTTPError
import requests
import bleach
import mf2py
import pymongo
@ -41,7 +43,10 @@ from little_boxes.activitypub import _to_list
from little_boxes.activitypub import clean_activity
from little_boxes.activitypub import get_backend
from little_boxes.content_helper import parse_markdown
from little_boxes.linked_data_sig import generate_signature
from little_boxes.errors import ActivityGoneError
from little_boxes.errors import NotAnActivityError
from little_boxes.errors import BadActivityError
from little_boxes.errors import ActivityNotFoundError
from little_boxes.errors import Error
from little_boxes.errors import NotFromOutboxError
@ -49,15 +54,18 @@ from little_boxes.httpsig import HTTPSigAuth
from little_boxes.httpsig import verify_request
from little_boxes.webfinger import get_actor_url
from little_boxes.webfinger import get_remote_follow_template
from utils import opengraph
from passlib.hash import bcrypt
from u2flib_server import u2f
from werkzeug.utils import secure_filename
import activitypub
import config
import tasks
# import tasks
from activitypub import Box
from activitypub import embed_collection
from config import USER_AGENT
from config import ADMIN_API_KEY
from config import BASE_URL
from config import DB
@ -78,6 +86,11 @@ from utils.key import get_secret_key
from utils.lookup import lookup
from utils.media import Kind
from poussetaches import PousseTaches
p = PousseTaches("http://poussetaches:7991", "http://web:5005")
back = activitypub.MicroblogPubBackend()
ap.use_backend(back)
@ -191,7 +204,7 @@ ALLOWED_TAGS = [
def clean_html(html):
try:
return bleach.clean(html, tags=ALLOWED_TAGS)
except:
except Exception:
return ""
@ -631,7 +644,7 @@ def authorize_follow():
return redirect("/following")
follow = ap.Follow(actor=MY_PERSON.id, object=actor)
tasks.post_to_outbox(follow)
post_to_outbox(follow)
return redirect("/following")
@ -758,7 +771,7 @@ def tmp_migrate4():
@login_required
def tmp_migrate5():
for activity in DB.activities.find():
tasks.cache_actor.delay(activity["remote_id"], also_cache_attachments=False)
Tasks.cache_actor(activity["remote_id"], also_cache_attachments=False)
return "Done"
@ -835,9 +848,10 @@ def _get_cached(type_="html", arg=None):
cached = DB.cache2.find_one({"path": request.path, "type": type_, "arg": arg})
if cached:
app.logger.info("from cache")
return cached['response_data']
return cached["response_data"]
return None
def _cache(resp, type_="html", arg=None):
if not CACHING:
return None
@ -855,7 +869,9 @@ def _cache(resp, type_="html", arg=None):
def index():
if is_api_request():
return jsonify(**ME)
cache_arg = f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}"
cache_arg = (
f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}"
)
cached = _get_cached("html", cache_arg)
if cached:
return cached
@ -1053,22 +1069,22 @@ def nodeinfo():
}
response = json.dumps(
{
"version": "2.0",
"software": {
"name": "microblogpub",
"version": f"Microblog.pub {VERSION}",
},
"protocols": ["activitypub"],
"services": {"inbound": [], "outbound": []},
"openRegistrations": False,
"usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)},
"metadata": {
"sourceCode": "https://github.com/tsileo/microblog.pub",
"nodeName": f"@{USERNAME}@{DOMAIN}",
},
}
)
{
"version": "2.0",
"software": {
"name": "microblogpub",
"version": f"Microblog.pub {VERSION}",
},
"protocols": ["activitypub"],
"services": {"inbound": [], "outbound": []},
"openRegistrations": False,
"usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)},
"metadata": {
"sourceCode": "https://github.com/tsileo/microblog.pub",
"nodeName": f"@{USERNAME}@{DOMAIN}",
},
}
)
if not cached:
_cache(response, "api")
@ -1197,7 +1213,7 @@ def outbox():
data = request.get_json(force=True)
print(data)
activity = ap.parse_activity(data)
activity_id = tasks.post_to_outbox(activity)
activity_id = post_to_outbox(activity)
return Response(status=201, headers={"Location": activity_id})
@ -1536,11 +1552,15 @@ def _user_api_get_note(from_outbox: bool = False):
oid = _user_api_arg("id")
app.logger.info(f"fetching {oid}")
try:
note = ap.parse_activity(get_backend().fetch_iri(oid), expected=ActivityType.NOTE)
except:
note = ap.parse_activity(
get_backend().fetch_iri(oid), expected=ActivityType.NOTE
)
except Exception:
try:
note = ap.parse_activity(get_backend().fetch_iri(oid), expected=ActivityType.VIDEO)
except:
note = ap.parse_activity(
get_backend().fetch_iri(oid), expected=ActivityType.VIDEO
)
except Exception:
raise ActivityNotFoundError(
"Expected Note or Video ActivityType, but got something else"
)
@ -1570,7 +1590,7 @@ def api_delete():
delete = ap.Delete(actor=ID, object=ap.Tombstone(id=note.id).to_dict(embed=True))
delete_id = tasks.post_to_outbox(delete)
delete_id = post_to_outbox(delete)
return _user_api_response(activity=delete_id)
@ -1581,7 +1601,7 @@ def api_boost():
note = _user_api_get_note()
announce = note.build_announce(MY_PERSON)
announce_id = tasks.post_to_outbox(announce)
announce_id = post_to_outbox(announce)
return _user_api_response(activity=announce_id)
@ -1592,7 +1612,7 @@ def api_like():
note = _user_api_get_note()
like = note.build_like(MY_PERSON)
like_id = tasks.post_to_outbox(like)
like_id = post_to_outbox(like)
return _user_api_response(activity=like_id)
@ -1639,7 +1659,7 @@ def api_undo():
obj = ap.parse_activity(doc.get("activity"))
# FIXME(tsileo): detect already undo-ed and make this API call idempotent
undo = obj.build_undo()
undo_id = tasks.post_to_outbox(undo)
undo_id = post_to_outbox(undo)
return _user_api_response(activity=undo_id)
@ -1664,7 +1684,7 @@ def admin_stream():
)
@app.route("/inbox", methods=["GET", "POST"])
@app.route("/inbox", methods=["GET", "POST"]) # noqa: C901
def inbox():
if request.method == "GET":
if not is_api_request():
@ -1733,7 +1753,7 @@ def inbox():
)
activity = ap.parse_activity(data)
logger.debug(f"inbox activity={activity}/{data}")
tasks.post_to_inbox(activity)
post_to_inbox(activity)
return Response(status=201)
@ -1819,7 +1839,7 @@ def api_new_note():
note = ap.Note(**raw_note)
create = note.build_create()
create_id = tasks.post_to_outbox(create)
create_id = post_to_outbox(create)
return _user_api_response(activity=create_id)
@ -1852,7 +1872,7 @@ def api_block():
return _user_api_response(activity=existing["activity"]["id"])
block = ap.Block(actor=MY_PERSON.id, object=actor)
block_id = tasks.post_to_outbox(block)
block_id = post_to_outbox(block)
return _user_api_response(activity=block_id)
@ -1874,7 +1894,7 @@ def api_follow():
return _user_api_response(activity=existing["activity"]["id"])
follow = ap.Follow(actor=MY_PERSON.id, object=actor)
follow_id = tasks.post_to_outbox(follow)
follow_id = post_to_outbox(follow)
return _user_api_response(activity=follow_id)
@ -1895,8 +1915,9 @@ def followers():
)
raw_followers, older_than, newer_than = paginated_query(DB.activities, q)
followers = [doc["meta"]["actor"]
for doc in raw_followers if "actor" in doc.get("meta", {})]
followers = [
doc["meta"]["actor"] for doc in raw_followers if "actor" in doc.get("meta", {})
]
return render_template(
"followers.html",
followers_data=followers,
@ -1924,9 +1945,11 @@ def following():
abort(404)
following, older_than, newer_than = paginated_query(DB.activities, q)
following = [(doc["remote_id"], doc["meta"]["object"])
for doc in following
if "remote_id" in doc and "object" in doc.get("meta", {})]
following = [
(doc["remote_id"], doc["meta"]["object"])
for doc in following
if "remote_id" in doc and "object" in doc.get("meta", {})
]
return render_template(
"following.html",
following_data=following,
@ -2087,7 +2110,7 @@ def indieauth_flow():
return redirect(red)
@app.route('/indieauth', methods=['GET', 'POST'])
@app.route("/indieauth", methods=["GET", "POST"])
def indieauth_endpoint():
if request.method == "GET":
if not session.get("logged_in"):
@ -2189,9 +2212,7 @@ def token_endpoint():
@app.route("/feed.json")
def json_feed():
return Response(
response=json.dumps(
activitypub.json_feed("/feed.json")
),
response=json.dumps(activitypub.json_feed("/feed.json")),
headers={"Content-Type": "application/json"},
)
@ -2210,3 +2231,538 @@ def rss_feed():
response=activitypub.gen_feed().rss_str(),
headers={"Content-Type": "application/rss+xml"},
)
@app.route("/task/t1")
def task_t1():
p.push(
"https://mastodon.cloud/@iulius/101852467780804071/activity",
"/task/cache_object",
)
return "ok"
@app.route("/task/t2", methods=["POST"])
def task_t2():
print(request)
print(request.headers)
task = p.parse(request)
print(task)
return "yay"
@app.route("/task/fetch_og_meta", methods=["POST"])
def task_fetch_og_metadata():
task = p.parse(request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
if activity.has_type(ap.ActivityType.CREATE):
note = activity.get_object()
links = opengraph.links_from_note(note.to_dict())
og_metadata = opengraph.fetch_og_metadata(USER_AGENT, links)
for og in og_metadata:
if not og.get("image"):
continue
MEDIA_CACHE.cache_og_image(og["image"])
app.logger.debug(f"OG metadata {og_metadata!r}")
DB.activities.update_one(
{"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}}
)
app.logger.info(f"OG metadata fetched for {iri}")
except (ActivityGoneError, ActivityNotFoundError):
app.logger.exception(f"dropping activity {iri}, skip OG metedata")
return ""
except requests.exceptions.HTTPError as http_err:
if 400 <= http_err.response.status_code < 500:
app.logger.exception("bad request, no retry")
return ""
app.logger.exception("failed to fetch OG metadata")
abort(500)
except Exception:
app.logger.exception(f"failed to fetch OG metadata for {iri}")
abort(500)
@app.route("/task/cache_object", methods=["POST"])
def task_cache_object():
task = p.parse(request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
print(activity)
print(activity.__dict__)
app.logger.info(f"activity={activity!r}")
obj = activity
# obj = activity.get_object()
DB.activities.update_one(
{"remote_id": activity.id},
{
"$set": {
"meta.object": obj.to_dict(embed=True),
"meta.object_actor": activitypub._actor_to_meta(obj.get_actor()),
}
},
)
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
app.logger.exception(f"flagging activity {iri} as deleted, no object caching")
return ""
except Exception:
app.logger.exception(f"failed to cache object for {iri}")
abort(500)
return ""
class Tasks:
@staticmethod
def cache_object(iri: str) -> None:
p.push(iri, "/task/cache_object")
@staticmethod
def cache_actor(iri: str, also_cache_attachments: bool = True) -> None:
p.push(
{"iri": iri, "also_cache_attachments": also_cache_attachments},
"/task/cache_actor",
)
@staticmethod
def post_to_remote_inbox(payload: str, recp: str) -> None:
p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox")
@staticmethod
def forward_activity(iri: str) -> None:
p.push(iri, "/task/forward_activity")
@staticmethod
def fetch_og_meta(iri: str) -> None:
p.push(iri, "/task/fetch_og_meta")
@staticmethod
def process_new_activity(iri: str) -> None:
p.push(iri, "/task/process_new_activity")
@staticmethod
def cache_attachments(iri: str) -> None:
p.push(iri, "/task/cache_attachments")
@staticmethod
def finish_post_to_inbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_inbox")
@staticmethod
def finish_post_to_outbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_outbox")
@app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901
def task_finish_post_to_outbox():
task = p.parse(request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
recipients = activity.recipients()
if activity.has_type(ap.ActivityType.DELETE):
back.outbox_delete(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.UPDATE):
back.outbox_update(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.CREATE):
back.outbox_create(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.ANNOUNCE):
back.outbox_announce(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.LIKE):
back.outbox_like(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.UNDO):
obj = activity.get_object()
if obj.has_type(ap.ActivityType.LIKE):
back.outbox_undo_like(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.ANNOUNCE):
back.outbox_undo_announce(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.FOLLOW):
back.undo_new_following(MY_PERSON, obj)
app.logger.info(f"recipients={recipients}")
activity = ap.clean_activity(activity.to_dict())
DB.cache2.remove()
payload = json.dumps(activity)
for recp in recipients:
app.logger.debug(f"posting to {recp}")
Tasks.post_to_remote_inbox(payload, recp)
except (ActivityGoneError, ActivityNotFoundError):
app.logger.exception(f"no retry")
except Exception:
app.logger.exception(f"failed to post to remote inbox for {iri}")
abort(500)
@app.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901
def task_finish_post_to_inbox():
task = p.parse(request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
if activity.has_type(ap.ActivityType.DELETE):
back.inbox_delete(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.UPDATE):
back.inbox_update(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.CREATE):
back.inbox_create(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.ANNOUNCE):
back.inbox_announce(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.LIKE):
back.inbox_like(MY_PERSON, activity)
elif activity.has_type(ap.ActivityType.FOLLOW):
# Reply to a Follow with an Accept
accept = ap.Accept(actor=ID, object=activity.to_dict(embed=True))
post_to_outbox(accept)
elif activity.has_type(ap.ActivityType.UNDO):
obj = activity.get_object()
if obj.has_type(ap.ActivityType.LIKE):
back.inbox_undo_like(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.ANNOUNCE):
back.inbox_undo_announce(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.FOLLOW):
back.undo_new_follower(MY_PERSON, obj)
try:
invalidate_cache(activity)
except Exception:
app.logger.exception("failed to invalidate cache")
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
app.logger.exception(f"no retry")
except Exception:
app.logger.exception(f"failed to cache attachments for {iri}")
abort(500)
def post_to_outbox(activity: ap.BaseActivity) -> str:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
# Assign create a random ID
obj_id = back.random_object_id()
activity.set_id(back.activity_url(obj_id), obj_id)
back.save(Box.OUTBOX, activity)
Tasks.cache_actor(activity.id)
Tasks.finish_post_to_outbox(activity.id)
return activity.id
def post_to_inbox(activity: ap.BaseActivity) -> None:
# Check for Block activity
actor = activity.get_actor()
if back.outbox_is_blocked(MY_PERSON, actor.id):
app.logger.info(
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
)
return
if back.inbox_check_duplicate(MY_PERSON, activity.id):
# The activity is already in the inbox
app.logger.info(f"received duplicate activity {activity!r}, dropping it")
back.save(Box.INBOX, activity)
Tasks.process_new_activity(activity.id)
app.logger.info(f"spawning task for {activity!r}")
Tasks.finish_post_to_inbox(activity.id)
def invalidate_cache(activity):
if activity.has_type(ap.ActivityType.LIKE):
if activity.get_object().id.startswith(BASE_URL):
DB.cache2.remove()
elif activity.has_type(ap.ActivityType.ANNOUNCE):
if activity.get_object().id.startswith(BASE_URL):
DB.cache2.remove()
elif activity.has_type(ap.ActivityType.UNDO):
DB.cache2.remove()
elif activity.has_type(ap.ActivityType.DELETE):
# TODO(tsileo): only invalidate if it's a delete of a reply
DB.cache2.remove()
elif activity.has_type(ap.ActivityType.UPDATE):
DB.cache2.remove()
elif activity.has_type(ap.ActivityType.CREATE):
note = activity.get_object()
if not note.inReplyTo or note.inReplyTo.startswith(ID):
DB.cache2.remove()
# FIXME(tsileo): check if it's a reply of a reply
@app.route("/task/cache_attachments", methods=["POST"])
def task_cache_attachments():
task = p.parse(request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
# Generates thumbnails for the actor's icon and the attachments if any
actor = activity.get_actor()
# Update the cached actor
DB.actors.update_one(
{"remote_id": iri},
{"$set": {"remote_id": iri, "data": actor.to_dict(embed=True)}},
upsert=True,
)
if actor.icon:
MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
if activity.has_type(ap.ActivityType.CREATE):
for attachment in activity.get_object()._data.get("attachment", []):
if (
attachment.get("mediaType", "").startswith("image/")
or attachment.get("type") == ap.ActivityType.IMAGE.value
):
try:
MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT)
except ValueError:
app.logger.exception(f"failed to cache {attachment}")
app.logger.info(f"attachments cached for {iri}")
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
app.logger.exception(f"dropping activity {iri}, no attachment caching")
except Exception:
app.logger.exception(f"failed to cache attachments for {iri}")
abort(500)
@app.route("/task/cache_actor", methods=["POST"])
def task_cache_actor():
task = p.parse(request)
app.logger.info(f"task={task!r}")
iri, also_cache_attachments = (
task.payload["iri"],
task.payload.get("also_cache_attachments", True),
)
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
if activity.has_type(ap.ActivityType.CREATE):
Tasks.fetch_og_metadata(iri)
if activity.has_type([ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]):
Tasks.cache_object(iri)
actor = activity.get_actor()
cache_actor_with_inbox = False
if activity.has_type(ap.ActivityType.FOLLOW):
if actor.id != ID:
# It's a Follow from the Inbox
cache_actor_with_inbox = True
else:
# It's a new following, cache the "object" (which is the actor we follow)
DB.activities.update_one(
{"remote_id": iri},
{
"$set": {
"meta.object": activitypub._actor_to_meta(
activity.get_object()
)
}
},
)
# Cache the actor info
DB.activities.update_one(
{"remote_id": iri},
{
"$set": {
"meta.actor": activitypub._actor_to_meta(
actor, cache_actor_with_inbox
)
}
},
)
app.logger.info(f"actor cached for {iri}")
if also_cache_attachments and activity.has_type(ap.ActivityType.CREATE):
Tasks.cache_attachments(iri)
except (ActivityGoneError, ActivityNotFoundError):
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
app.logger.exception(f"flagging activity {iri} as deleted, no actor caching")
except Exception:
app.logger.exception(f"failed to cache actor for {iri}")
abort(500)
@app.route("/task/process_new_activity", methods=["POST"]) # noqa:c901
def task_process_new_activity():
"""Process an activity received in the inbox"""
task = p.parse(request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
app.logger.info(f"activity={activity!r}")
# Is the activity expected?
# following = ap.get_backend().following()
should_forward = False
should_delete = False
tag_stream = False
if activity.has_type(ap.ActivityType.ANNOUNCE):
try:
activity.get_object()
tag_stream = True
except (NotAnActivityError, BadActivityError):
app.logger.exception(f"failed to get announce object for {activity!r}")
# Most likely on OStatus notice
tag_stream = False
should_delete = True
except (ActivityGoneError, ActivityNotFoundError):
# The announced activity is deleted/gone, drop it
should_delete = True
elif activity.has_type(ap.ActivityType.CREATE):
note = activity.get_object()
# Make the note part of the stream if it's not a reply, or if it's a local reply
if not note.inReplyTo or note.inReplyTo.startswith(ID):
tag_stream = True
if note.inReplyTo:
try:
reply = ap.fetch_remote_activity(note.inReplyTo)
if (
reply.id.startswith(ID) or reply.has_mention(ID)
) and activity.is_public():
# The reply is public "local reply", forward the reply (i.e. the original activity) to the
# original recipients
should_forward = True
except NotAnActivityError:
# Most likely a reply to an OStatus notce
should_delete = True
# (partial) Ghost replies handling
# [X] This is the first time the server has seen this Activity.
should_forward = False
local_followers = ID + "/followers"
for field in ["to", "cc"]:
if field in activity._data:
if local_followers in activity._data[field]:
# [X] The values of to, cc, and/or audience contain a Collection owned by the server.
should_forward = True
# [X] The values of inReplyTo, object, target and/or tag are objects owned by the server
if not (note.inReplyTo and note.inReplyTo.startswith(ID)):
should_forward = False
elif activity.has_type(ap.ActivityType.DELETE):
note = DB.activities.find_one(
{"activity.object.id": activity.get_object().id}
)
if note and note["meta"].get("forwarded", False):
# If the activity was originally forwarded, forward the delete too
should_forward = True
elif activity.has_type(ap.ActivityType.LIKE):
if not activity.get_object_id().startswith(BASE_URL):
# We only want to keep a like if it's a like for a local activity
# (Pleroma relay the likes it received, we don't want to store them)
should_delete = True
if should_forward:
app.logger.info(f"will forward {activity!r} to followers")
Tasks.forward_activity(activity.id)
if should_delete:
app.logger.info(f"will soft delete {activity!r}")
app.logger.info(f"{iri} tag_stream={tag_stream}")
DB.activities.update_one(
{"remote_id": activity.id},
{
"$set": {
"meta.stream": tag_stream,
"meta.forwarded": should_forward,
"meta.deleted": should_delete,
}
},
)
app.logger.info(f"new activity {iri} processed")
if not should_delete and not activity.has_type(ap.ActivityType.DELETE):
Tasks.cache_actor(iri)
except (ActivityGoneError, ActivityNotFoundError):
app.logger.log.exception(f"dropping activity {iri}, skip processing")
except Exception:
app.logger.exception(f"failed to process new activity {iri}")
abort(500)
@app.route("/task/forward_activity")
def task_forward_activity():
task = p.parse(request)
app.logger.info(f"task={task!r}")
iri = task.payload
try:
activity = ap.fetch_remote_activity(iri)
recipients = back.followers_as_recipients()
app.logger.debug(f"Forwarding {activity!r} to {recipients}")
activity = ap.clean_activity(activity.to_dict())
payload = json.dumps(activity)
for recp in recipients:
app.logger.debug(f"forwarding {activity!r} to {recp}")
Tasks.post_to_remote_inbox(payload, recp)
except Exception:
app.logger.exception("task failed")
abort(500)
@app.route("/task/post_to_remote_inbox")
def task_post_to_remote_inbox():
task = p.parse(request)
app.logger.info(f"task={task!r}")
payload, to = task.payload["payload"], task.payload["to"]
try:
app.logger.info("payload=%s", payload)
app.logger.info("generating sig")
signed_payload = json.loads(payload)
# Don't overwrite the signature if we're forwarding an activity
if "signature" not in signed_payload:
generate_signature(signed_payload, KEY)
app.logger.info("to=%s", to)
resp = requests.post(
to,
data=json.dumps(signed_payload),
auth=SIG_AUTH,
headers={
"Content-Type": HEADERS[1],
"Accept": HEADERS[1],
"User-Agent": USER_AGENT,
},
)
app.logger.info("resp=%s", resp)
app.logger.info("resp_body=%s", resp.text)
resp.raise_for_status()
except HTTPError as err:
app.logger.exception("request failed")
if 400 >= err.response.status_code >= 499:
app.logger.info("client error, no retry")
return ""
abort(500)

View file

@ -105,12 +105,17 @@ MEDIA_CACHE = MediaCache(GRIDFS, USER_AGENT)
def create_indexes():
DB.activities.create_index([("remote_id", pymongo.ASCENDING)])
DB.activities.create_index([("activity.object.id", pymongo.ASCENDING)])
DB.activities.create_index([
("activity.object.id", pymongo.ASCENDING),
("meta.deleted", pymongo.ASCENDING),
])
DB.cache2.create_index([("path", pymongo.ASCENDING), ("type", pymongo.ASCENDING), ("arg", pymongo.ASCENDING)])
DB.cache2.create_index("date", expireAfterSeconds=3600*12)
DB.activities.create_index(
[("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING)]
)
DB.cache2.create_index(
[
("path", pymongo.ASCENDING),
("type", pymongo.ASCENDING),
("arg", pymongo.ASCENDING),
]
)
DB.cache2.create_index("date", expireAfterSeconds=3600 * 12)
# Index for the block query
DB.activities.create_index(

View file

@ -4,9 +4,6 @@ services:
image: 'microblogpub:latest'
ports:
- "${WEB_PORT}:5005"
links:
- mongo
- rmq
volumes:
- "${CONFIG_DIR}:/app/config"
- "./static:/app/static"
@ -14,12 +11,10 @@ services:
- MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq//
- MICROBLOGPUB_MONGODB_HOST=mongo:27017
- MICROBLOGPUB_DEBUG=1
- POUSSETACHES_AUTH_KEY=123
celery:
# image: "instance1_web"
image: 'microblogpub:latest'
links:
- mongo
- rmq
command: 'celery worker -l info -A tasks'
volumes:
- "${CONFIG_DIR}:/app/config"
@ -35,6 +30,10 @@ services:
environment:
- RABBITMQ_ERLANG_COOKIE=secretrabbit
- RABBITMQ_NODENAME=rabbit@my-rabbit
poussetaches:
image: "poussetaches:latest"
environment:
- POUSSETACHES_AUTH_KEY=123
networks:
default:
name: microblogpubfede

View file

@ -7,12 +7,14 @@ services:
links:
- mongo
- rmq
- poussetaches
volumes:
- "${CONFIG_DIR}:/app/config"
- "./static:/app/static"
environment:
- MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq//
- MICROBLOGPUB_MONGODB_HOST=mongo:27017
- POUSSETACHES_AUTH_KEY=123
celery:
image: 'microblogpub:latest'
links:
@ -36,3 +38,7 @@ services:
- RABBITMQ_NODENAME=rabbit@my-rabbit
volumes:
- "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq"
poussetaches:
image: "poussetaches:latest"
environment:
- POUSSETACHES_AUTH_KEY=123

48
poussetaches.py Normal file
View file

@ -0,0 +1,48 @@
import base64
import json
import os
from typing import Any
from dataclasses import dataclass
import flask
import requests
POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY")
@dataclass
class Task:
req_id: str
tries: int
payload: Any
class PousseTaches:
def __init__(self, api_url: str, base_url: str) -> None:
self.api_url = api_url
self.base_url = base_url
def push(self, payload: Any, path: str, expected=200) -> str:
# Encode our payload
p = base64.b64encode(json.dumps(payload).encode()).decode()
# Queue/push it
resp = requests.post(
self.api_url,
json={"url": self.base_url + path, "payload": p, "expected": expected},
)
resp.raise_for_status()
return resp.headers.get("Poussetaches-Task-ID")
def parse(self, req: flask.Request) -> Task:
if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY:
raise ValueError("Bad auth key")
# Parse the "envelope"
envelope = json.loads(req.data)
print(req)
print(f"envelope={envelope!r}")
payload = json.loads(base64.b64decode(envelope["payload"]))
return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload)

View file

@ -339,6 +339,7 @@ def invalidate_cache(activity):
DB.cache2.remove()
# FIXME(tsileo): check if it's a reply of a reply
@app.task(bind=True, max_retries=MAX_RETRIES) # noqa: C901
def finish_post_to_inbox(self, iri: str) -> None:
try: