parent
8a57d0dfda
commit
5811583163
4 changed files with 96 additions and 76 deletions
3
Makefile
3
Makefile
|
@ -18,9 +18,6 @@ reload-dev:
|
||||||
# docker build . -t microblogpub:latest
|
# docker build . -t microblogpub:latest
|
||||||
docker-compose -f docker-compose-dev.yml up -d --force-recreate
|
docker-compose -f docker-compose-dev.yml up -d --force-recreate
|
||||||
|
|
||||||
update-poussetaches:
|
|
||||||
git clone https://github.com/tsileo/poussetaches.git tmp_poussetaches && cd tmp_poussetaches && docker build . -t poussetaches:latest && cd - && rm -rf tmp_poussetaches
|
|
||||||
|
|
||||||
update:
|
update:
|
||||||
git pull
|
git pull
|
||||||
docker build . -t microblogpub:latest
|
docker build . -t microblogpub:latest
|
||||||
|
|
136
activitypub.py
136
activitypub.py
|
@ -1,5 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
@ -78,52 +79,6 @@ class Box(Enum):
|
||||||
REPLIES = "replies"
|
REPLIES = "replies"
|
||||||
|
|
||||||
|
|
||||||
def save(box: Box, activity: ap.BaseActivity) -> None:
|
|
||||||
"""Custom helper for saving an activity to the DB."""
|
|
||||||
DB.activities.insert_one(
|
|
||||||
{
|
|
||||||
"box": box.value,
|
|
||||||
"activity": activity.to_dict(),
|
|
||||||
"type": _to_list(activity.type),
|
|
||||||
"remote_id": activity.id,
|
|
||||||
"meta": {"undo": False, "deleted": False},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def followers() -> List[str]:
|
|
||||||
q = {
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"type": ap.ActivityType.FOLLOW.value,
|
|
||||||
"meta.undo": False,
|
|
||||||
}
|
|
||||||
return [doc["activity"]["actor"] for doc in DB.activities.find(q)]
|
|
||||||
|
|
||||||
|
|
||||||
def following() -> List[str]:
|
|
||||||
q = {
|
|
||||||
"box": Box.OUTBOX.value,
|
|
||||||
"type": ap.ActivityType.FOLLOW.value,
|
|
||||||
"meta.undo": False,
|
|
||||||
}
|
|
||||||
return [doc["activity"]["object"] for doc in DB.activities.find(q)]
|
|
||||||
|
|
||||||
|
|
||||||
def followers_as_recipients() -> List[str]:
|
|
||||||
q = {
|
|
||||||
"box": Box.INBOX.value,
|
|
||||||
"type": ap.ActivityType.FOLLOW.value,
|
|
||||||
"meta.undo": False,
|
|
||||||
}
|
|
||||||
recipients = []
|
|
||||||
for doc in DB.activities.find(q):
|
|
||||||
recipients.append(
|
|
||||||
doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"]
|
|
||||||
)
|
|
||||||
|
|
||||||
return list(set(recipients))
|
|
||||||
|
|
||||||
|
|
||||||
class MicroblogPubBackend(Backend):
|
class MicroblogPubBackend(Backend):
|
||||||
"""Implements a Little Boxes backend, backed by MongoDB."""
|
"""Implements a Little Boxes backend, backed by MongoDB."""
|
||||||
|
|
||||||
|
@ -149,18 +104,73 @@ class MicroblogPubBackend(Backend):
|
||||||
"""URL for activity link."""
|
"""URL for activity link."""
|
||||||
return f"{BASE_URL}/note/{obj_id}"
|
return f"{BASE_URL}/note/{obj_id}"
|
||||||
|
|
||||||
|
def save(self, box: Box, activity: ap.BaseActivity) -> None:
|
||||||
|
"""Custom helper for saving an activity to the DB."""
|
||||||
|
DB.activities.insert_one(
|
||||||
|
{
|
||||||
|
"box": box.value,
|
||||||
|
"activity": activity.to_dict(),
|
||||||
|
"type": _to_list(activity.type),
|
||||||
|
"remote_id": activity.id,
|
||||||
|
"meta": {"undo": False, "deleted": False},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
def followers(self) -> List[str]:
|
||||||
|
q = {
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"type": ap.ActivityType.FOLLOW.value,
|
||||||
|
"meta.undo": False,
|
||||||
|
}
|
||||||
|
return [doc["activity"]["actor"] for doc in DB.activities.find(q)]
|
||||||
|
|
||||||
|
def followers_as_recipients(self) -> List[str]:
|
||||||
|
q = {
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"type": ap.ActivityType.FOLLOW.value,
|
||||||
|
"meta.undo": False,
|
||||||
|
}
|
||||||
|
recipients = []
|
||||||
|
for doc in DB.activities.find(q):
|
||||||
|
recipients.append(
|
||||||
|
doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"]
|
||||||
|
)
|
||||||
|
|
||||||
|
return list(set(recipients))
|
||||||
|
|
||||||
|
def following(self) -> List[str]:
|
||||||
|
q = {
|
||||||
|
"box": Box.OUTBOX.value,
|
||||||
|
"type": ap.ActivityType.FOLLOW.value,
|
||||||
|
"meta.undo": False,
|
||||||
|
}
|
||||||
|
return [doc["activity"]["object"] for doc in DB.activities.find(q)]
|
||||||
|
|
||||||
def parse_collection(
|
def parse_collection(
|
||||||
self, payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None
|
self, payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None
|
||||||
) -> List[str]:
|
) -> List[str]:
|
||||||
"""Resolve/fetch a `Collection`/`OrderedCollection`."""
|
"""Resolve/fetch a `Collection`/`OrderedCollection`."""
|
||||||
# Resolve internal collections via MongoDB directly
|
# Resolve internal collections via MongoDB directly
|
||||||
if url == ID + "/followers":
|
if url == ID + "/followers":
|
||||||
return followers()
|
return self.followers()
|
||||||
elif url == ID + "/following":
|
elif url == ID + "/following":
|
||||||
return following()
|
return self.following()
|
||||||
|
|
||||||
return super().parse_collection(payload, url)
|
return super().parse_collection(payload, url)
|
||||||
|
|
||||||
|
@ensure_it_is_me
|
||||||
|
def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool:
|
||||||
|
return bool(
|
||||||
|
DB.activities.find_one(
|
||||||
|
{
|
||||||
|
"box": Box.OUTBOX.value,
|
||||||
|
"type": ap.ActivityType.BLOCK.value,
|
||||||
|
"activity.object": actor_id,
|
||||||
|
"meta.undo": False,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
def _fetch_iri(self, iri: str) -> ap.ObjectType:
|
def _fetch_iri(self, iri: str) -> ap.ObjectType:
|
||||||
if iri == ME["id"]:
|
if iri == ME["id"]:
|
||||||
return ME
|
return ME
|
||||||
|
@ -219,6 +229,13 @@ class MicroblogPubBackend(Backend):
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
@ensure_it_is_me
|
||||||
|
def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool:
|
||||||
|
return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri}))
|
||||||
|
|
||||||
|
def set_post_to_remote_inbox(self, cb):
|
||||||
|
self.post_to_remote_inbox_cb = cb
|
||||||
|
|
||||||
@ensure_it_is_me
|
@ensure_it_is_me
|
||||||
def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None:
|
def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None:
|
||||||
DB.activities.update_one(
|
DB.activities.update_one(
|
||||||
|
@ -454,7 +471,7 @@ class MicroblogPubBackend(Backend):
|
||||||
)
|
)
|
||||||
if not creply:
|
if not creply:
|
||||||
# It means the activity is not in the inbox, and not in the outbox, we want to save it
|
# It means the activity is not in the inbox, and not in the outbox, we want to save it
|
||||||
save(Box.REPLIES, reply)
|
self.save(Box.REPLIES, reply)
|
||||||
new_threads.append(reply.id)
|
new_threads.append(reply.id)
|
||||||
|
|
||||||
while reply is not None:
|
while reply is not None:
|
||||||
|
@ -465,7 +482,7 @@ class MicroblogPubBackend(Backend):
|
||||||
reply = ap.fetch_remote_activity(root_reply)
|
reply = ap.fetch_remote_activity(root_reply)
|
||||||
q = {"activity.object.id": root_reply}
|
q = {"activity.object.id": root_reply}
|
||||||
if not DB.activities.count(q):
|
if not DB.activities.count(q):
|
||||||
save(Box.REPLIES, reply)
|
self.save(Box.REPLIES, reply)
|
||||||
new_threads.append(reply.id)
|
new_threads.append(reply.id)
|
||||||
|
|
||||||
DB.activities.update_one(
|
DB.activities.update_one(
|
||||||
|
@ -476,6 +493,25 @@ class MicroblogPubBackend(Backend):
|
||||||
{"$set": {"meta.thread_root_parent": root_reply}},
|
{"$set": {"meta.thread_root_parent": root_reply}},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def post_to_outbox(self, activity: ap.BaseActivity) -> None:
|
||||||
|
if activity.has_type(ap.CREATE_TYPES):
|
||||||
|
activity = activity.build_create()
|
||||||
|
|
||||||
|
self.save(Box.OUTBOX, activity)
|
||||||
|
|
||||||
|
# Assign create a random ID
|
||||||
|
obj_id = self.random_object_id()
|
||||||
|
activity.set_id(self.activity_url(obj_id), obj_id)
|
||||||
|
|
||||||
|
recipients = activity.recipients()
|
||||||
|
logger.info(f"recipients={recipients}")
|
||||||
|
activity = ap.clean_activity(activity.to_dict())
|
||||||
|
|
||||||
|
payload = json.dumps(activity)
|
||||||
|
for recp in recipients:
|
||||||
|
logger.debug(f"posting to {recp}")
|
||||||
|
self.post_to_remote_inbox(self.get_actor(), payload, recp)
|
||||||
|
|
||||||
|
|
||||||
def gen_feed():
|
def gen_feed():
|
||||||
fg = FeedGenerator()
|
fg = FeedGenerator()
|
||||||
|
|
27
app.py
27
app.py
|
@ -66,8 +66,6 @@ import config
|
||||||
import tasks # noqa: here just for the migration # FIXME(tsileo): remove me
|
import tasks # noqa: here just for the migration # FIXME(tsileo): remove me
|
||||||
from activitypub import Box
|
from activitypub import Box
|
||||||
from activitypub import embed_collection
|
from activitypub import embed_collection
|
||||||
from activitypub import save
|
|
||||||
from activitypub import followers_as_recipients
|
|
||||||
from config import USER_AGENT
|
from config import USER_AGENT
|
||||||
from config import ADMIN_API_KEY
|
from config import ADMIN_API_KEY
|
||||||
from config import BASE_URL
|
from config import BASE_URL
|
||||||
|
@ -1645,7 +1643,7 @@ def inbox():
|
||||||
data["object"]
|
data["object"]
|
||||||
):
|
):
|
||||||
logger.info(f"received a Delete for an actor {data!r}")
|
logger.info(f"received a Delete for an actor {data!r}")
|
||||||
if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": data["id"]}):
|
if get_backend().inbox_check_duplicate(MY_PERSON, data["id"]):
|
||||||
# The activity is already in the inbox
|
# The activity is already in the inbox
|
||||||
logger.info(f"received duplicate activity {data!r}, dropping it")
|
logger.info(f"received duplicate activity {data!r}, dropping it")
|
||||||
|
|
||||||
|
@ -2297,9 +2295,7 @@ def task_finish_post_to_outbox():
|
||||||
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||||
back.outbox_undo_announce(MY_PERSON, obj)
|
back.outbox_undo_announce(MY_PERSON, obj)
|
||||||
elif obj.has_type(ap.ActivityType.FOLLOW):
|
elif obj.has_type(ap.ActivityType.FOLLOW):
|
||||||
DB.activities.update_one(
|
back.undo_new_following(MY_PERSON, obj)
|
||||||
{"remote_id": obj.id}, {"$set": {"meta.undo": True}}
|
|
||||||
)
|
|
||||||
|
|
||||||
app.logger.info(f"recipients={recipients}")
|
app.logger.info(f"recipients={recipients}")
|
||||||
activity = ap.clean_activity(activity.to_dict())
|
activity = ap.clean_activity(activity.to_dict())
|
||||||
|
@ -2349,9 +2345,7 @@ def task_finish_post_to_inbox():
|
||||||
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||||
back.inbox_undo_announce(MY_PERSON, obj)
|
back.inbox_undo_announce(MY_PERSON, obj)
|
||||||
elif obj.has_type(ap.ActivityType.FOLLOW):
|
elif obj.has_type(ap.ActivityType.FOLLOW):
|
||||||
DB.activities.update_one(
|
back.undo_new_follower(MY_PERSON, obj)
|
||||||
{"remote_id": obj.id}, {"$set": {"meta.undo": True}}
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
invalidate_cache(activity)
|
invalidate_cache(activity)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -2373,7 +2367,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str:
|
||||||
obj_id = back.random_object_id()
|
obj_id = back.random_object_id()
|
||||||
activity.set_id(back.activity_url(obj_id), obj_id)
|
activity.set_id(back.activity_url(obj_id), obj_id)
|
||||||
|
|
||||||
save(Box.OUTBOX, activity)
|
back.save(Box.OUTBOX, activity)
|
||||||
Tasks.cache_actor(activity.id)
|
Tasks.cache_actor(activity.id)
|
||||||
Tasks.finish_post_to_outbox(activity.id)
|
Tasks.finish_post_to_outbox(activity.id)
|
||||||
return activity.id
|
return activity.id
|
||||||
|
@ -2382,14 +2376,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str:
|
||||||
def post_to_inbox(activity: ap.BaseActivity) -> None:
|
def post_to_inbox(activity: ap.BaseActivity) -> None:
|
||||||
# Check for Block activity
|
# Check for Block activity
|
||||||
actor = activity.get_actor()
|
actor = activity.get_actor()
|
||||||
if DB.activities.find_one(
|
if back.outbox_is_blocked(MY_PERSON, actor.id):
|
||||||
{
|
|
||||||
"box": Box.OUTBOX.value,
|
|
||||||
"type": ap.ActivityType.BLOCK.value,
|
|
||||||
"activity.object": actor.id,
|
|
||||||
"meta.undo": False,
|
|
||||||
}
|
|
||||||
):
|
|
||||||
app.logger.info(
|
app.logger.info(
|
||||||
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
|
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
|
||||||
)
|
)
|
||||||
|
@ -2399,7 +2386,7 @@ def post_to_inbox(activity: ap.BaseActivity) -> None:
|
||||||
# The activity is already in the inbox
|
# The activity is already in the inbox
|
||||||
app.logger.info(f"received duplicate activity {activity!r}, dropping it")
|
app.logger.info(f"received duplicate activity {activity!r}, dropping it")
|
||||||
|
|
||||||
save(Box.INBOX, activity)
|
back.save(Box.INBOX, activity)
|
||||||
Tasks.process_new_activity(activity.id)
|
Tasks.process_new_activity(activity.id)
|
||||||
|
|
||||||
app.logger.info(f"spawning task for {activity!r}")
|
app.logger.info(f"spawning task for {activity!r}")
|
||||||
|
@ -2667,7 +2654,7 @@ def task_forward_activity():
|
||||||
iri = task.payload
|
iri = task.payload
|
||||||
try:
|
try:
|
||||||
activity = ap.fetch_remote_activity(iri)
|
activity = ap.fetch_remote_activity(iri)
|
||||||
recipients = followers_as_recipients()
|
recipients = back.followers_as_recipients()
|
||||||
app.logger.debug(f"Forwarding {activity!r} to {recipients}")
|
app.logger.debug(f"Forwarding {activity!r} to {recipients}")
|
||||||
activity = ap.clean_activity(activity.to_dict())
|
activity = ap.clean_activity(activity.to_dict())
|
||||||
payload = json.dumps(activity)
|
payload = json.dumps(activity)
|
||||||
|
|
6
tasks.py
6
tasks.py
|
@ -312,7 +312,7 @@ def post_to_inbox(activity: ap.BaseActivity) -> None:
|
||||||
# The activity is already in the inbox
|
# The activity is already in the inbox
|
||||||
log.info(f"received duplicate activity {activity!r}, dropping it")
|
log.info(f"received duplicate activity {activity!r}, dropping it")
|
||||||
|
|
||||||
activitypub.save(Box.INBOX, activity)
|
back.save(Box.INBOX, activity)
|
||||||
process_new_activity.delay(activity.id)
|
process_new_activity.delay(activity.id)
|
||||||
|
|
||||||
log.info(f"spawning task for {activity!r}")
|
log.info(f"spawning task for {activity!r}")
|
||||||
|
@ -387,7 +387,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str:
|
||||||
obj_id = back.random_object_id()
|
obj_id = back.random_object_id()
|
||||||
activity.set_id(back.activity_url(obj_id), obj_id)
|
activity.set_id(back.activity_url(obj_id), obj_id)
|
||||||
|
|
||||||
activitypub.save(Box.OUTBOX, activity)
|
back.save(Box.OUTBOX, activity)
|
||||||
cache_actor.delay(activity.id)
|
cache_actor.delay(activity.id)
|
||||||
finish_post_to_outbox.delay(activity.id)
|
finish_post_to_outbox.delay(activity.id)
|
||||||
return activity.id
|
return activity.id
|
||||||
|
@ -440,7 +440,7 @@ def finish_post_to_outbox(self, iri: str) -> None:
|
||||||
def forward_activity(self, iri: str) -> None:
|
def forward_activity(self, iri: str) -> None:
|
||||||
try:
|
try:
|
||||||
activity = ap.fetch_remote_activity(iri)
|
activity = ap.fetch_remote_activity(iri)
|
||||||
recipients = activitypub.followers_as_recipients()
|
recipients = back.followers_as_recipients()
|
||||||
log.debug(f"Forwarding {activity!r} to {recipients}")
|
log.debug(f"Forwarding {activity!r} to {recipients}")
|
||||||
activity = ap.clean_activity(activity.to_dict())
|
activity = ap.clean_activity(activity.to_dict())
|
||||||
payload = json.dumps(activity)
|
payload = json.dumps(activity)
|
||||||
|
|
Loading…
Reference in a new issue