microblog.pub/activitypub.py

1148 lines
40 KiB
Python
Raw Normal View History

2018-05-27 07:21:06 -05:00
import logging
2018-05-18 13:41:41 -05:00
import json
import binascii
import os
from datetime import datetime
from enum import Enum
from bson.objectid import ObjectId
from html2text import html2text
from feedgen.feed import FeedGenerator
from utils.linked_data_sig import generate_signature
from utils.actor_service import NotAnActorError
2018-05-27 15:30:43 -05:00
from utils.errors import BadActivityError, UnexpectedActivityTypeError
from utils import activitypub_utils
2018-05-18 13:41:41 -05:00
from config import USERNAME, BASE_URL, ID
from config import CTX_AS, CTX_SECURITY, AS_PUBLIC
from config import KEY, DB, ME, ACTOR_SERVICE
from config import OBJECT_SERVICE
from config import PUBLIC_INSTANCES
import tasks
from typing import List, Optional, Dict, Any, Union
2018-05-18 13:41:41 -05:00
2018-05-27 07:21:06 -05:00
logger = logging.getLogger(__name__)

# Typing shortcuts: an ActivityPub object is handled as a plain JSON dict, and
# many fields accept either such a dict or the object's string ID.
ObjectType = Dict[str, Any]
ObjectOrIDType = Union[str, Dict[str, Any]]
class ActivityType(Enum):
    """ActivityPub/ActivityStreams `type` values understood by this module.

    Each member's value is the exact string used in a document's `type` field,
    so `ActivityType(doc['type'])` yields the member (or raises `ValueError`
    for a type that is not listed here).
    """

    # NOTE: declaration order is kept stable on purpose (iteration order).
    ANNOUNCE = 'Announce'
    BLOCK = 'Block'
    LIKE = 'Like'
    CREATE = 'Create'
    UPDATE = 'Update'
    PERSON = 'Person'
    ORDERED_COLLECTION = 'OrderedCollection'
    ORDERED_COLLECTION_PAGE = 'OrderedCollectionPage'
    COLLECTION_PAGE = 'CollectionPage'
    COLLECTION = 'Collection'
    NOTE = 'Note'
    ACCEPT = 'Accept'
    REJECT = 'Reject'
    FOLLOW = 'Follow'
    DELETE = 'Delete'
    UNDO = 'Undo'
    IMAGE = 'Image'
    TOMBSTONE = 'Tombstone'
def random_object_id() -> str:
    """Return a fresh random identifier: 8 bytes of `os.urandom`, as 16 lowercase hex chars."""
    random_bytes = os.urandom(8)
    return random_bytes.hex()
def _remove_id(doc: ObjectType) -> ObjectType:
    """Return a shallow copy of `doc` without MongoDB's `_id` key; `doc` itself is not modified."""
    cleaned = doc.copy()
    cleaned.pop('_id', None)
    return cleaned
def _to_list(data: Union[List[Any], Any]) -> List[Any]:
2018-05-27 15:30:43 -05:00
"""Helper to convert fields that can be either an object or a list of objects to a list of object."""
2018-05-18 13:41:41 -05:00
if isinstance(data, list):
return data
return [data]
def clean_activity(activity: Dict[str, Any]) -> Dict[str, Any]:
    """Strip the hidden recipient fields before an activity is rendered or sent.

    Removes `bto` and `bcc` from the activity and, for a `Create`, from its
    embedded object as well. The activity is modified in place and also
    returned for convenience.
    """
    # Only an embedded (dict) object carries fields; an object given as a bare ID
    # string must be left alone (a substring test + `del` on a str would crash).
    obj = activity.get('object') if activity.get('type') == 'Create' else None
    for field in ('bto', 'bcc'):
        activity.pop(field, None)
        if isinstance(obj, dict):
            obj.pop(field, None)
    return activity
def _get_actor_id(actor: ObjectOrIDType) -> str:
    """Return an actor's `id`, given either the actor document or the bare ID."""
    return actor['id'] if isinstance(actor, dict) else actor
class BaseActivity(object):
    """Base class for ActivityPub activities and objects.

    Subclasses declare `ACTIVITY_TYPE` (and `ALLOWED_OBJECT_TYPES` when they may
    wrap an object) and override the underscore-prefixed hooks they need
    (`_init`, `_verify`, `_recipients`, `_process_from_inbox`, `_post_to_outbox`,
    `_undo_inbox`, ...). The default hooks raise `NotImplementedError`, which the
    public entry points treat as "nothing to do".

    The JSON document lives in `self._data`; unknown attribute lookups
    (`activity.to`, `activity.published`, ...) read from it and yield `None`
    when the field is missing or falsy.
    """

    ACTIVITY_TYPE: Optional[ActivityType] = None
    ALLOWED_OBJECT_TYPES: List[ActivityType] = []
    # Set to True by embedded objects (e.g. `Image`): no `@context` is added and
    # no default `actor` is assigned. Declared here so the lookup no longer
    # depends on falling through to `__getattr__`.
    NO_CONTEXT: bool = False

    def __init__(self, **kwargs) -> None:
        # Cache for `get_object()`; set first because `_init()` may call it.
        self.__obj: Optional['BaseActivity'] = None

        # Ensure the class has an activity type defined
        if not self.ACTIVITY_TYPE:
            raise BadActivityError('Missing ACTIVITY_TYPE')

        # Ensure the activity has a type and a valid one
        if kwargs.get('type') is not None and kwargs.pop('type') != self.ACTIVITY_TYPE.value:
            raise UnexpectedActivityTypeError('Expect the type to be {}'.format(self.ACTIVITY_TYPE))

        # Initialize the object
        self._data: Dict[str, Any] = {'type': self.ACTIVITY_TYPE.value}
        logger.debug(f'initializing a {self.ACTIVITY_TYPE.value} activity: {kwargs}')

        if 'id' in kwargs:
            self._data['id'] = kwargs.pop('id')

        if self.ACTIVITY_TYPE != ActivityType.PERSON:
            actor = kwargs.get('actor')
            if actor:
                kwargs.pop('actor')
                self._data['actor'] = self._validate_person(actor)
            elif not self.NO_CONTEXT and self.ACTIVITY_TYPE != ActivityType.TOMBSTONE:
                # Without an explicit actor, default to this instance's actor
                self._data['actor'] = ID

        if 'object' in kwargs:
            obj = kwargs.pop('object')
            if isinstance(obj, str):
                self._data['object'] = obj
            else:
                if not self.ALLOWED_OBJECT_TYPES:
                    raise UnexpectedActivityTypeError('unexpected object')
                # Only a `Create` may embed an object that has no `id` yet
                if 'type' not in obj or (self.ACTIVITY_TYPE != ActivityType.CREATE and 'id' not in obj):
                    raise BadActivityError('invalid object, missing type')
                if ActivityType(obj['type']) not in self.ALLOWED_OBJECT_TYPES:
                    raise UnexpectedActivityTypeError(
                        f'unexpected object type {obj["type"]} (allowed={self.ALLOWED_OBJECT_TYPES})'
                    )
                self._data['object'] = obj

        if '@context' not in kwargs:
            if not self.NO_CONTEXT:
                self._data['@context'] = CTX_AS
        else:
            self._data['@context'] = kwargs.pop('@context')

        # @context check: normalize to a list that includes the security context
        # and the `Hashtag`/`sensitive` term definitions
        if not self.NO_CONTEXT:
            if not isinstance(self._data['@context'], list):
                self._data['@context'] = [self._data['@context']]
            if CTX_SECURITY not in self._data['@context']:
                self._data['@context'].append(CTX_SECURITY)
            if isinstance(self._data['@context'][-1], dict):
                self._data['@context'][-1]['Hashtag'] = 'as:Hashtag'
                self._data['@context'][-1]['sensitive'] = 'as:sensitive'
            else:
                self._data['@context'].append({'Hashtag': 'as:Hashtag', 'sensitive': 'as:sensitive'})

        allowed_keys = None
        try:
            allowed_keys = self._init(**kwargs)
            logger.debug('calling custom init')
        except NotImplementedError:
            pass

        if allowed_keys:
            # Allows an extra to (like for Accept and Follow)
            kwargs.pop('to', None)
            if len(set(kwargs.keys()) - set(allowed_keys)) > 0:
                raise BadActivityError('extra data left: {}'.format(kwargs))
        else:
            # Merge the remaining fields, skipping every key whose value is `None`
            # (skip only that key; later keys must still be merged).
            self._data.update({k: v for k, v in kwargs.items() if v is not None})

    def _init(self, **kwargs) -> Optional[List[str]]:
        """Subclass hook run near the end of `__init__` with the unconsumed kwargs.

        It may return a list of allowed extra keys. When it returns nothing, the
        remaining kwargs are merged into the document.
        """
        raise NotImplementedError

    def _verify(self) -> None:
        raise NotImplementedError

    def verify(self) -> None:
        """Run the subclass `_verify` hook, if it has one."""
        try:
            self._verify()
        except NotImplementedError:
            pass

    def __repr__(self) -> str:
        return '{}({!r})'.format(self.__class__.__qualname__, self._data.get('id'))

    def __str__(self) -> str:
        return str(self._data.get('id', f'[new {self.ACTIVITY_TYPE} activity]'))

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal attribute lookup fails. Guard `_data` itself so
        # that a lookup on an instance whose `__init__` never ran (as `copy`/`pickle`
        # may create) raises AttributeError instead of recursing forever.
        if name == '_data':
            raise AttributeError(name)
        return self._data.get(name) or None

    @property
    def type_enum(self) -> ActivityType:
        return ActivityType(self.type)

    def _set_id(self, uri: str, obj_id: str) -> None:
        raise NotImplementedError

    def set_id(self, uri: str, obj_id: str) -> None:
        """Assign the activity `id` (`uri`) and let the subclass hook react to it."""
        logger.debug(f'setting ID {uri} / {obj_id}')
        self._data['id'] = uri
        try:
            self._set_id(uri, obj_id)
        except NotImplementedError:
            pass

    def _actor_id(self, obj: ObjectOrIDType) -> str:
        """Return the ID of a `Person` document, or `str(obj)` for anything else."""
        if isinstance(obj, dict) and obj['type'] == ActivityType.PERSON.value:
            obj_id = obj.get('id')
            if not obj_id:
                raise ValueError('missing object id')
            return obj_id
        return str(obj)

    def _validate_person(self, obj: ObjectOrIDType) -> str:
        """Resolve `obj` through the actor service and return the actor ID.

        Raises ValueError when the service returns nothing. If the lookup itself
        fails, the unverified ID is returned (best effort).
        """
        obj_id = self._actor_id(obj)
        try:
            actor = ACTOR_SERVICE.get(obj_id)
        except Exception:
            # FIXME(tsileo): handle this
            logger.warning(f'failed to fetch actor {obj_id}, using the ID as-is', exc_info=True)
            return obj_id
        if not actor:
            raise ValueError('Invalid actor')
        return actor['id']

    def get_object(self) -> 'BaseActivity':
        """Return the wrapped `object` as a parsed activity, fetching it if needed (cached)."""
        if self.__obj is not None:
            return self.__obj
        if isinstance(self._data['object'], dict):
            p = parse_activity(self._data['object'])
        elif self.ACTIVITY_TYPE == ActivityType.FOLLOW:
            p = Person(**ACTOR_SERVICE.get(self._data['object']))
        else:
            obj = OBJECT_SERVICE.get(self._data['object'])
            if ActivityType(obj.get('type')) not in self.ALLOWED_OBJECT_TYPES:
                raise UnexpectedActivityTypeError(f'invalid object type {obj.get("type")}')
            p = parse_activity(obj)
        self.__obj = p
        return p

    def to_dict(self, embed: bool = False, embed_object_id_only: bool = False) -> ObjectType:
        """Return a shallow copy of the document.

        `embed=True` drops `@context` and `signature` (for embedding in another
        document). `embed_object_id_only=True` replaces an embedded object by
        its `id` (BadActivityError if it has none).
        """
        data = dict(self._data)
        if embed:
            for k in ['@context', 'signature']:
                if k in data:
                    del data[k]
        if data.get('object') and embed_object_id_only and isinstance(data['object'], dict):
            try:
                data['object'] = data['object']['id']
            except KeyError:
                raise BadActivityError('embedded object does not have an id')
        return data

    def get_actor(self) -> 'BaseActivity':
        """Return the actor as a `Person` (a `Note` falls back to `attributedTo`)."""
        actor = self._data.get('actor')
        if not actor:
            if self.type_enum == ActivityType.NOTE:
                actor = str(self._data.get('attributedTo'))
            else:
                raise ValueError('failed to fetch actor')
        actor_id = self._actor_id(actor)
        return Person(**ACTOR_SERVICE.get(actor_id))

    def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
        raise NotImplementedError

    def _undo_outbox(self) -> None:
        raise NotImplementedError

    def _process_from_inbox(self) -> None:
        raise NotImplementedError

    def _undo_inbox(self) -> None:
        raise NotImplementedError

    def _undo_should_purge_cache(self) -> bool:
        raise NotImplementedError

    def _should_purge_cache(self) -> bool:
        raise NotImplementedError

    def process_from_inbox(self) -> None:
        """Verify, de-duplicate and store a received activity, then run its inbox hook.

        Activities from actors we have an active `Block` on are dropped.
        """
        logger.debug(f'calling main process from inbox hook for {self}')
        self.verify()
        actor = self.get_actor()

        # Check for Block activity
        if DB.outbox.find_one({'type': ActivityType.BLOCK.value,
                               'activity.object': actor.id,
                               'meta.undo': False}):
            logger.info(f'actor {actor} is blocked, dropping the received activity {self}')
            return

        if DB.inbox.find_one({'remote_id': self.id}):
            # The activity is already in the inbox
            logger.info(f'received duplicate activity {self}, dropping it')
            return

        activity = self.to_dict()
        DB.inbox.insert_one({
            'activity': activity,
            'type': self.type,
            'remote_id': self.id,
            'meta': {'undo': False, 'deleted': False},
        })
        logger.info(f'activity {self} saved')

        try:
            self._process_from_inbox()
            logger.debug('called process from inbox hook')
        except NotImplementedError:
            logger.debug('process from inbox hook not implemented')

    def post_to_outbox(self) -> None:
        """Assign an outbox ID, store the activity, run the outbox hook and deliver it.

        The stored copy keeps `bto`/`bcc`; the delivered payload has them removed
        and is signed with the instance key.
        """
        logger.debug(f'calling main post to outbox hook for {self}')
        obj_id = random_object_id()
        self.set_id(f'{ID}/outbox/{obj_id}', obj_id)
        self.verify()
        activity = self.to_dict()
        DB.outbox.insert_one({
            'id': obj_id,
            'activity': activity,
            'type': self.type,
            'remote_id': self.id,
            'meta': {'undo': False, 'deleted': False},
        })

        recipients = self.recipients()
        logger.info(f'recipients={recipients}')
        activity = clean_activity(activity)

        try:
            self._post_to_outbox(obj_id, activity, recipients)
            logger.debug('called post to outbox hook')
        except NotImplementedError:
            logger.debug('post to outbox hook not implemented')

        generate_signature(activity, KEY.privkey)
        payload = json.dumps(activity)
        for recp in recipients:
            logger.debug(f'posting to {recp}')
            self._post_to_inbox(payload, recp)

    def _post_to_inbox(self, payload: str, to: str) -> None:
        tasks.post_to_inbox.delay(payload, to)

    def _recipients(self) -> List[str]:
        return []

    @staticmethod
    def _add_inbox(actor: 'BaseActivity', out: List[str]) -> None:
        """Append `actor`'s delivery inbox to `out`, without duplicates.

        The shared inbox is preferred when advertised. Otherwise the personal
        `inbox` is used, so a missing `sharedInbox` never yields a `None` target.
        """
        endpoints = actor.endpoints
        shared_inbox = endpoints.get('sharedInbox') if endpoints else None
        inbox = shared_inbox or actor.inbox
        if inbox and inbox not in out:
            out.append(inbox)

    def _add_collection_inboxes(self, collection: Optional[ObjectType], out: List[str]) -> None:
        """Expand a `Collection`/`OrderedCollection` recipient into its members' inboxes."""
        if not collection or collection.get('type') not in [ActivityType.COLLECTION.value,
                                                            ActivityType.ORDERED_COLLECTION.value]:
            return
        for item in parse_collection(collection):
            if item in [ME, ID, AS_PUBLIC]:
                continue
            try:
                col_actor = Person(**ACTOR_SERVICE.get(item))
            except NotAnActorError:
                # Skip non-actor items instead of reusing a stale (or undefined) actor
                continue
            self._add_inbox(col_actor, out)

    def recipients(self) -> List[str]:
        """Resolve `_recipients()` into the de-duplicated list of inbox URLs to deliver to.

        "Public instances" are kept as-is. The public address and this instance's
        own actor are skipped, so we never deliver to ourselves. Collections
        (e.g. followers) are expanded into their members.
        """
        out: List[str] = []
        for recipient in self._recipients():
            if recipient in PUBLIC_INSTANCES:
                if recipient not in out:
                    out.append(str(recipient))
                continue
            if recipient in [ME, ID, AS_PUBLIC, None]:
                continue
            if isinstance(recipient, Person):
                if recipient.id == ID:
                    continue
                self._add_inbox(recipient, out)
                continue
            try:
                actor = Person(**ACTOR_SERVICE.get(recipient))
            except NotAnActorError as error:
                # Is the activity a `Collection`/`OrderedCollection`?
                self._add_collection_inboxes(error.activity, out)
                continue
            self._add_inbox(actor, out)
        return out

    def build_undo(self) -> 'BaseActivity':
        raise NotImplementedError

    def build_delete(self) -> 'BaseActivity':
        raise NotImplementedError
class Person(BaseActivity):
    """An ActivityPub actor document (`Person`)."""

    ACTIVITY_TYPE = ActivityType.PERSON

    def _init(self, **kwargs):
        # No custom handling: returning None lets the base class merge every
        # remaining field into the document unchanged.
        return None

    def _verify(self) -> None:
        # Verification is a lookup through the actor service; any exception it
        # raises propagates to the caller.
        ACTOR_SERVICE.get(self._data['id'])
class Block(BaseActivity):
    """A `Block` activity (outbox `Block`s are checked when processing the inbox)."""

    ACTIVITY_TYPE = ActivityType.BLOCK
class Collection(BaseActivity):
    """A generic ActivityStreams `Collection` object."""

    ACTIVITY_TYPE = ActivityType.COLLECTION
class Image(BaseActivity):
    """An `Image` object; with `NO_CONTEXT` set, it gets no `@context` and no default actor."""

    ACTIVITY_TYPE = ActivityType.IMAGE
    NO_CONTEXT = True

    def _init(self, **kwargs):
        # `url` is mandatory: a missing key raises KeyError
        self._data['url'] = kwargs.pop('url')

    def __repr__(self):
        image_url = self._data.get('url')
        return f'Image({image_url!r})'
class Follow(BaseActivity):
    """A `Follow` of a `Person`."""

    ACTIVITY_TYPE = ActivityType.FOLLOW
    ALLOWED_OBJECT_TYPES = [ActivityType.PERSON]

    def _build_reply(self, reply_type: ActivityType) -> BaseActivity:
        """Build the reply activity to this follow (only `Accept` is supported)."""
        if reply_type == ActivityType.ACCEPT:
            return Accept(
                object=self.to_dict(embed=True),
            )
        raise ValueError(f'type {reply_type} is invalid for building a reply')

    def _recipients(self) -> List[str]:
        # The followed actor
        return [self.get_object().id]

    def _process_from_inbox(self) -> None:
        # Auto-accept, then record the follower once.
        accept = self.build_accept()
        accept.post_to_outbox()

        remote_actor = self.get_actor().id
        # `Cursor.count()` is deprecated (and removed in PyMongo 4); `find_one` is enough
        if DB.followers.find_one({'remote_actor': remote_actor}) is None:
            DB.followers.insert_one({'remote_actor': remote_actor})

    def _undo_inbox(self) -> None:
        DB.followers.delete_one({'remote_actor': self.get_actor().id})

    def _undo_outbox(self) -> None:
        DB.following.delete_one({'remote_actor': self.get_object().id})

    def build_accept(self) -> BaseActivity:
        return self._build_reply(ActivityType.ACCEPT)

    def build_undo(self) -> BaseActivity:
        return Undo(object=self.to_dict(embed=True))

    def _should_purge_cache(self) -> bool:
        # Receiving a follow activity in the inbox should reset the application cache
        return True
class Accept(BaseActivity):
    """An `Accept` of a `Follow`."""

    ACTIVITY_TYPE = ActivityType.ACCEPT
    ALLOWED_OBJECT_TYPES = [ActivityType.FOLLOW]

    def _recipients(self) -> List[str]:
        # The actor who sent the accepted Follow
        return [self.get_object().get_actor().id]

    def _process_from_inbox(self) -> None:
        remote_actor = self.get_actor().id
        # `Cursor.count()` is deprecated (and removed in PyMongo 4); `find_one` is enough
        if DB.following.find_one({'remote_actor': remote_actor}) is None:
            DB.following.insert_one({'remote_actor': remote_actor})

    def _should_purge_cache(self) -> bool:
        # Receiving an accept activity in the inbox should reset the application cache
        # (a follow request has been accepted)
        return True
class Undo(BaseActivity):
    """An `Undo` of a previous `Follow`, `Like` or `Announce`."""

    ACTIVITY_TYPE = ActivityType.UNDO
    ALLOWED_OBJECT_TYPES = [ActivityType.FOLLOW, ActivityType.LIKE, ActivityType.ANNOUNCE]

    def _recipients(self) -> List[str]:
        obj = self.get_object()
        if obj.type_enum == ActivityType.FOLLOW:
            # Undoing a follow: notify the followed actor
            return [obj.get_object().id]
        # Undoing a like/announce: notify the author of the liked/announced object.
        # TODO(tsileo): an undone Announce may also need to reach the Announce's own audience
        return [obj.get_object().get_actor().id]

    def _process_from_inbox(self) -> None:
        obj = self.get_object()
        DB.inbox.update_one({'remote_id': obj.id}, {'$set': {'meta.undo': True}})
        try:
            obj._undo_inbox()
        except NotImplementedError:
            pass

    def _should_purge_cache(self) -> bool:
        obj = self.get_object()
        try:
            # Receiving an undo activity regarding an activity that was mentioning a published activity
            # should purge the cache
            return obj._undo_should_purge_cache()
        except NotImplementedError:
            pass
        return False

    def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
        logger.debug('processing undo to outbox')
        logger.debug('self={}'.format(self))
        obj = self.get_object()
        logger.debug('obj={}'.format(obj))
        DB.outbox.update_one({'remote_id': obj.id}, {'$set': {'meta.undo': True}})
        try:
            obj._undo_outbox()
            logger.debug(f'_undo_outbox called for {obj}')
        except NotImplementedError:
            logger.debug(f'_undo_outbox not implemented for {obj}')
class Like(BaseActivity):
    """A `Like` of a `Note`."""

    ACTIVITY_TYPE = ActivityType.LIKE
    ALLOWED_OBJECT_TYPES = [ActivityType.NOTE]

    def _track_like(self, liked_id: str, added: bool) -> None:
        """Adjust the like counter/collection of `liked_id`; only matches our outbox objects."""
        if added:
            update = {
                '$inc': {'meta.count_like': 1},
                '$addToSet': {'meta.col_likes': self.to_dict(embed=True, embed_object_id_only=True)},
            }
        else:
            update = {
                '$inc': {'meta.count_like': -1},
                '$pull': {'meta.col_likes': {'id': self.id}},
            }
        DB.outbox.update_one({'activity.object.id': liked_id}, update)

    def _recipients(self) -> List[str]:
        # The author of the liked note
        return [self.get_object().get_actor().id]

    def _process_from_inbox(self):
        self._track_like(self.get_object().id, added=True)
        # XXX(tsileo): notification??

    def _undo_inbox(self) -> None:
        self._track_like(self.get_object().id, added=False)

    def _undo_should_purge_cache(self) -> bool:
        # A like count was decremented on one of our own objects: purge the application cache
        return self.get_object().id.startswith(BASE_URL)

    def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]):
        liked_id = self.get_object().id
        # Unlikely, but an actor can like its own post
        self._track_like(liked_id, added=True)
        # Keep track of the like we just performed
        DB.inbox.update_one({'activity.object.id': liked_id}, {'$set': {'meta.liked': obj_id}})

    def _undo_outbox(self) -> None:
        liked_id = self.get_object().id
        # Unlikely, but an actor can like its own post
        self._track_like(liked_id, added=False)
        DB.inbox.update_one({'activity.object.id': liked_id}, {'$set': {'meta.liked': False}})

    def build_undo(self) -> BaseActivity:
        return Undo(object=self.to_dict(embed=True, embed_object_id_only=True))
class Announce(BaseActivity):
    """An `Announce` (boost) of a `Note`."""

    ACTIVITY_TYPE = ActivityType.ANNOUNCE
    ALLOWED_OBJECT_TYPES = [ActivityType.NOTE]

    def _recipients(self) -> List[str]:
        recipients = []
        for field in ['to', 'cc']:
            if field in self._data:
                recipients.extend(_to_list(self._data[field]))
        return recipients

    def _process_from_inbox(self) -> None:
        if isinstance(self._data['object'], str) and not self._data['object'].startswith('http'):
            # TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else
            # (`Logger.warn` is a deprecated alias that was removed in Python 3.13)
            logger.warning(
                f'received an Annouce referencing an OStatus notice ({self._data["object"]}), dropping the message'
            )
            return
        # Save/cache the object, and make it part of the stream so we can fetch it
        if isinstance(self._data['object'], str):
            raw_obj = OBJECT_SERVICE.get(
                self._data['object'],
                reload_cache=True,
                part_of_stream=True,
                announce_published=self._data['published'],
            )
            obj = parse_activity(raw_obj)
        else:
            obj = self.get_object()

        # Update the boost counter if the object is published by the server
        DB.outbox.update_one({'activity.object.id': obj.id}, {
            '$inc': {'meta.count_boost': 1},
            '$addToSet': {'meta.col_shares': self.to_dict(embed=True, embed_object_id_only=True)},
        })

    def _undo_inbox(self) -> None:
        obj = self.get_object()
        # Update the meta counter if the object is published by the server
        DB.outbox.update_one({'activity.object.id': obj.id}, {
            '$inc': {'meta.count_boost': -1},
            '$pull': {'meta.col_shares': {'id': self.id}},
        })

    def _undo_should_purge_cache(self) -> bool:
        # A boost count was decremented on one of our own objects: purge the application cache
        return self.get_object().id.startswith(BASE_URL)

    def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
        if isinstance(self._data['object'], str):
            # Put the object in the cache
            OBJECT_SERVICE.get(
                self._data['object'],
                reload_cache=True,
                part_of_stream=True,
                announce_published=self._data['published'],
            )

        obj = self.get_object()
        DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.boosted': obj_id}})

    def _undo_outbox(self) -> None:
        obj = self.get_object()
        DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.boosted': False}})

    def build_undo(self) -> BaseActivity:
        return Undo(object=self.to_dict(embed=True))
class Delete(BaseActivity):
    """A `Delete` of a `Note` (the object may be given as a `Tombstone`)."""

    ACTIVITY_TYPE = ActivityType.DELETE
    ALLOWED_OBJECT_TYPES = [ActivityType.NOTE, ActivityType.TOMBSTONE]

    def _get_actual_object(self) -> BaseActivity:
        """Return the deleted object, resolving a `Tombstone` through the object service."""
        deleted = self.get_object()
        if deleted.type_enum != ActivityType.TOMBSTONE:
            return deleted
        return parse_activity(OBJECT_SERVICE.get(deleted.id))

    def _recipients(self) -> List[str]:
        # Reach the same audience as the deleted object
        return self._get_actual_object()._recipients()

    def _process_from_inbox(self) -> None:
        deleted_id = self.get_object().id
        DB.inbox.update_one({'activity.object.id': deleted_id}, {'$set': {'meta.deleted': True}})
        actual = self._get_actual_object()
        if actual.type_enum == ActivityType.NOTE:
            actual._delete_from_threads()
        # TODO(tsileo): also purge the cache if it's a reply of a published activity

    def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
        deleted_id = self.get_object().id
        DB.outbox.update_one({'activity.object.id': deleted_id}, {'$set': {'meta.deleted': True}})
class Update(BaseActivity):
    """An `Update` of a `Note` or of a `Person` (profile)."""

    ACTIVITY_TYPE = ActivityType.UPDATE
    ALLOWED_OBJECT_TYPES = [ActivityType.NOTE, ActivityType.PERSON]

    # TODO(tsileo): ensure the actor updating is the same as the original activity
    # (ensuring that the Update and its object are of same origin)

    def _process_from_inbox(self):
        obj = self.get_object()
        if obj.type_enum == ActivityType.NOTE:
            DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'activity.object': obj.to_dict()}})
            return

        # If the object is a Person, it means the profile was updated, we just refresh our local cache
        ACTOR_SERVICE.get(obj.id, reload_cache=True)

        # TODO(tsileo): implements _should_purge_cache if it's a reply of a published activity (i.e. in the outbox)

    def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
        """Apply the update to the stored outbox activity as a partial `$set`/`$unset`.

        Fields set to `None` are removed from the stored object, and `updated`
        is stamped with the current UTC time.
        """
        obj = self.get_object()

        update_prefix = 'activity.object.'
        update: Dict[str, Any] = {'$set': dict(), '$unset': dict()}
        update['$set'][f'{update_prefix}updated'] = datetime.utcnow().replace(microsecond=0).isoformat() + 'Z'
        for k, v in obj._data.items():
            if k in ['id', 'type']:
                continue
            if v is None:
                update['$unset'][f'{update_prefix}{k}'] = ''
            else:
                update['$set'][f'{update_prefix}{k}'] = v

        if len(update['$unset']) == 0:
            del update['$unset']

        # The note ID is the Create activity ID plus a trailing `/activity`.
        # Strip only that suffix; `str.replace` would also mangle any earlier
        # occurrence of `/activity` in the URL.
        suffix = '/activity'
        create_id = obj.id[:-len(suffix)] if obj.id.endswith(suffix) else obj.id
        DB.outbox.update_one({'remote_id': create_id}, update)
        # FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
        # (create a new Update with the result of the update, and send it without saving it?)
class Create(BaseActivity):
    """A `Create` wrapping a `Note`."""

    ACTIVITY_TYPE = ActivityType.CREATE
    ALLOWED_OBJECT_TYPES = [ActivityType.NOTE]

    def _set_id(self, uri: str, obj_id: str) -> None:
        # The note is addressed as `<activity uri>/activity`; its human-facing URL is `<ID>/<type>/<obj_id>`
        self._data['object']['id'] = uri + '/activity'
        self._data['object']['url'] = ID + '/' + self.get_object().type.lower() + '/' + obj_id

    def _init(self, **kwargs):
        obj = self.get_object()
        if not obj.attributedTo:
            self._data['object']['attributedTo'] = self.get_actor().id
        if not obj.published:
            # Read `published` from kwargs. The base `__init__` merges the
            # remaining kwargs into `_data` only after `_init` returns, so
            # `self.published` is always None here.
            published = kwargs.get('published')
            if published:
                self._data['object']['published'] = published
            else:
                now = datetime.utcnow().replace(microsecond=0).isoformat() + 'Z'
                self._data['published'] = now
                self._data['object']['published'] = now

    def _recipients(self) -> List[str]:
        # TODO(tsileo): audience support?
        recipients = []
        for field in ['to', 'cc', 'bto', 'bcc']:
            if field in self._data:
                recipients.extend(_to_list(self._data[field]))

        recipients.extend(self.get_object()._recipients())

        return recipients

    def _update_threads(self) -> None:
        """Walk up the locally stored reply chain, bumping reply counters and recording thread parents."""
        logger.debug('_update_threads hook')
        obj = self.get_object()

        # TODO(tsileo): re-enable me
        # tasks.fetch_og.delay('INBOX', self.id)

        threads = []
        reply = obj.get_local_reply()
        logger.debug(f'initial_reply={reply}')
        reply_id = None
        direct_reply = 1
        while reply is not None:
            if not DB.inbox.find_one_and_update({'activity.object.id': reply.id}, {
                '$inc': {
                    'meta.count_reply': 1,
                    'meta.count_direct_reply': direct_reply,
                },
            }):
                DB.outbox.update_one({'activity.object.id': reply.id}, {
                    '$inc': {
                        'meta.count_reply': 1,
                        'meta.count_direct_reply': direct_reply,
                    },
                })

            # Only the immediate parent counts this as a direct reply
            direct_reply = 0
            reply_id = reply.id
            reply = reply.get_local_reply()
            logger.debug(f'next_reply={reply}')
            if reply:
                # Only append to threads if it's not the root
                threads.append(reply_id)

        if reply_id:
            if not DB.inbox.find_one_and_update({'activity.object.id': obj.id}, {
                '$set': {
                    'meta.thread_parents': threads,
                    'meta.thread_root_parent': reply_id,
                },
            }):
                DB.outbox.update_one({'activity.object.id': obj.id}, {
                    '$set': {
                        'meta.thread_parents': threads,
                        'meta.thread_root_parent': reply_id,
                    },
                })
        logger.debug('_update_threads done')

    def _process_from_inbox(self) -> None:
        self._update_threads()

    def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
        self._update_threads()

    def _should_purge_cache(self) -> bool:
        # TODO(tsileo): handle reply of a reply...
        obj = self.get_object()
        in_reply_to = obj.inReplyTo
        if in_reply_to:
            local_activity = DB.outbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to})
            if local_activity:
                return True

        return False
class Tombstone(BaseActivity):
    """A `Tombstone` placeholder for a deleted object; no default actor is assigned to it."""

    ACTIVITY_TYPE = ActivityType.TOMBSTONE
class Note(BaseActivity):
2018-05-27 13:40:42 -05:00
ACTIVITY_TYPE = ActivityType.NOTE
2018-05-18 13:41:41 -05:00
def _init(self, **kwargs):
print(self._data)
# Remove the `actor` field as `attributedTo` is used for `Note` instead
if 'actor' in self._data:
del(self._data['actor'])
if 'sensitive' not in kwargs:
self._data['sensitive'] = False
def _recipients(self) -> List[str]:
# TODO(tsileo): audience support?
recipients: List[str] = []
2018-05-18 13:41:41 -05:00
# If the note is public, we publish it to the defined "public instances"
if AS_PUBLIC in self._data.get('to', []):
recipients.extend(PUBLIC_INSTANCES)
print('publishing to public instances')
print(recipients)
for field in ['to', 'cc', 'bto', 'bcc']:
if field in self._data:
recipients.extend(_to_list(self._data[field]))
return recipients
2018-05-29 11:59:37 -05:00
def _delete_from_threads(self) -> None:
logger.debug('_delete_from_threads hook')
reply = self.get_local_reply()
logger.debug(f'initial_reply={reply}')
direct_reply = -1
while reply is not None:
if not DB.inbox.find_one_and_update({'activity.object.id': reply.id}, {
'$inc': {
'meta.count_reply': -1,
'meta.count_direct_reply': direct_reply,
},
}):
DB.outbox.update_one({'activity.object.id': reply.id}, {
'$inc': {
'meta.count_reply': 1,
'meta.count_direct_reply': direct_reply,
},
})
direct_reply = 0
reply = reply.get_local_reply()
logger.debug(f'next_reply={reply}')
logger.debug('_delete_from_threads done')
return None
def get_local_reply(self) -> Optional[BaseActivity]:
"Find the note reply if any."""
in_reply_to = self.inReplyTo
if not in_reply_to:
# This is the root comment
return None
inbox_parent = DB.inbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to})
if inbox_parent:
return parse_activity(inbox_parent['activity']['object'])
outbox_parent = DB.outbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to})
if outbox_parent:
return parse_activity(outbox_parent['activity']['object'])
# The parent is no stored on this instance
return None
2018-05-18 13:41:41 -05:00
def build_create(self) -> BaseActivity:
"""Wraps an activity in a Create activity."""
create_payload = {
'object': self.to_dict(embed=True),
'actor': self.attributedTo or ME,
}
for field in ['published', 'to', 'bto', 'cc', 'bcc', 'audience']:
if field in self._data:
create_payload[field] = self._data[field]
return Create(**create_payload)
def build_like(self) -> BaseActivity:
return Like(object=self.id)
def build_announce(self) -> BaseActivity:
return Announce(
2018-05-29 11:59:37 -05:00
object=self.id,
to=[AS_PUBLIC],
cc=[ID+'/followers', self.attributedTo],
published=datetime.utcnow().replace(microsecond=0).isoformat() + 'Z',
2018-05-18 13:41:41 -05:00
)
def build_delete(self) -> BaseActivity:
    """Return a ``Delete`` activity whose object is an embedded ``Tombstone`` carrying this object's id."""
    tombstone = Tombstone(id=self.id)
    return Delete(object=tombstone.to_dict(embed=True))
2018-05-18 13:41:41 -05:00
# ActivityType -> class instantiated by parse_activity() for payloads of that type.
# Types not listed here are rejected by parse_activity().
_ACTIVITY_TYPE_TO_CLS: Dict[ActivityType, Any] = dict((
    (ActivityType.IMAGE, Image),
    (ActivityType.PERSON, Person),
    (ActivityType.FOLLOW, Follow),
    (ActivityType.ACCEPT, Accept),
    (ActivityType.UNDO, Undo),
    (ActivityType.LIKE, Like),
    (ActivityType.ANNOUNCE, Announce),
    (ActivityType.UPDATE, Update),
    (ActivityType.DELETE, Delete),
    (ActivityType.CREATE, Create),
    (ActivityType.NOTE, Note),
    (ActivityType.BLOCK, Block),
    (ActivityType.COLLECTION, Collection),
    (ActivityType.TOMBSTONE, Tombstone),
))
2018-05-29 14:36:05 -05:00
class _UnsupportedActivityTypeError(BadActivityError, ValueError):
    """Raised by parse_activity() when ``payload['type']`` is not an ActivityType value.

    It also subclasses ValueError because that is what ``ActivityType(...)``
    raises for unknown values, so callers catching ValueError keep working.
    """


def parse_activity(payload: ObjectType, expected: Optional[ActivityType] = None) -> BaseActivity:
    """Instantiate the activity class matching ``payload['type']``.

    Args:
        payload: the activity/object as a dict; must contain a ``type`` key.
        expected: when given, the type the payload is required to have.

    Returns:
        An instance of the class mapped to the type in ``_ACTIVITY_TYPE_TO_CLS``,
        built from ``payload``'s keys.

    Raises:
        BadActivityError: the type is unknown, or known but has no class mapped.
        UnexpectedActivityTypeError: the type differs from ``expected``.
    """
    try:
        t = ActivityType(payload['type'])
    except ValueError as err:
        raise _UnsupportedActivityTypeError(f'unsupported activity type {payload["type"]}') from err

    if expected and t != expected:
        raise UnexpectedActivityTypeError(f'expected a {expected.name} activity, got a {payload["type"]}')

    if t not in _ACTIVITY_TYPE_TO_CLS:
        raise BadActivityError(f'unsupported activity type {payload["type"]}')

    return _ACTIVITY_TYPE_TO_CLS[t](**payload)
2018-05-18 13:41:41 -05:00
2018-05-29 14:47:28 -05:00
2018-05-18 13:41:41 -05:00
def gen_feed():
    """Build a ``FeedGenerator`` (RSS/Atom) for the 50 most recent local notes.

    Only non-deleted ``Create`` activities from the outbox are included. Each
    entry uses the note's ``url`` as id/link and its HTML ``content`` as
    title/description.
    """
    fg = FeedGenerator()
    fg.id(f'{ID}')
    fg.title(f'{USERNAME} notes')
    # NOTE(review): hard-coded contact email; probably belongs in config.
    fg.author({'name': USERNAME, 'email': 't@a4.io'})
    fg.link(href=ID, rel='alternate')
    fg.description(f'{USERNAME} notes')
    fg.logo(ME.get('icon', {}).get('url'))
    fg.language('en')
    # Sort on `_id` descending so `limit` keeps the newest notes (an unsorted
    # find returns natural order, i.e. the oldest ones). `$ne: True` also
    # matches documents that have no `meta.deleted` field at all.
    q = {'type': 'Create', 'meta.deleted': {'$ne': True}}
    newest_first = list(DB.outbox.find(q, limit=50).sort('_id', -1))
    # feedgen's add_entry() prepends by default, so add oldest -> newest to
    # emit newest first. NOTE(review): confirm against the pinned feedgen version.
    for item in reversed(newest_first):
        obj = item['activity']['object']
        fe = fg.add_entry()
        fe.id(obj.get('url'))
        fe.link(href=obj.get('url'))
        fe.title(obj['content'])
        fe.description(obj['content'])
    return fg
def json_feed(path: str) -> Dict[str, Any]:
    """JSON Feed (https://jsonfeed.org/) document of the 50 most recent local notes.

    Only non-deleted ``Create`` activities from the outbox are included, newest first.

    Args:
        path: URL path the feed is served at; appended to ``ID`` to build
            ``feed_url`` and the subscription hint in ``user_comment``.
    """
    # Sort on `_id` descending so `limit` keeps the newest notes; `$ne: True`
    # also matches documents without a `meta.deleted` field.
    q = {'type': 'Create', 'meta.deleted': {'$ne': True}}
    data = []
    for item in DB.outbox.find(q, limit=50).sort('_id', -1):
        obj = item['activity']['object']
        data.append({
            "id": item["id"],
            "url": obj.get('url'),
            "content_html": obj['content'],
            "content_text": html2text(obj['content']),
            "date_published": obj.get('published'),
        })
    return {
        "version": "https://jsonfeed.org/version/1",
        "user_comment": ("This is a microblog feed. You can add this to your feed reader using the following URL: "
                         + ID + path),
        "title": USERNAME,
        "home_page_url": ID,
        "feed_url": ID + path,
        "author": {
            "name": USERNAME,
            "url": ID,
            "avatar": ME.get('icon', {}).get('url'),
        },
        "items": data,
    }
def build_inbox_json_feed(path: str, request_cursor: Optional[str] = None) -> Dict[str, Any]:
    """JSON Feed (https://jsonfeed.org/) of the non-deleted ``Create`` activities in the inbox.

    Items are returned newest first (descending ``_id``), 50 per page.

    Args:
        path: URL path the feed is served at; appended to ``ID`` for
            ``feed_url`` and ``next_url``.
        request_cursor: hex ``_id`` of the last item of the previous page, as
            emitted in a previous response's ``next_url``.

    Returns:
        The feed document. ``next_url`` is set only when the page is full.
    """
    page_size = 50
    resp: Dict[str, Any] = {
        "version": "https://jsonfeed.org/version/1",
        "title": f'{USERNAME}\'s stream',
        "home_page_url": ID,
        "feed_url": ID + path,
        "items": [],
    }
    q: Dict[str, Any] = {'type': 'Create', 'meta.deleted': False}
    if request_cursor:
        if not ObjectId.is_valid(request_cursor):
            # A malformed cursor cannot point at any stored item: return an
            # empty page rather than letting ObjectId() raise.
            return resp
        # `_id` holds ObjectIds; MongoDB's `$lt` only compares values of the
        # same BSON type, so a plain-string bound would never match anything.
        q['_id'] = {'$lt': ObjectId(request_cursor)}

    data = []
    cursor = None
    for item in DB.inbox.find(q, limit=page_size).sort('_id', -1):
        actor = ACTOR_SERVICE.get(item['activity']['actor'])
        obj = item['activity']['object']
        data.append({
            "id": item["activity"]["id"],
            "url": obj.get('url'),
            "content_html": obj['content'],
            "content_text": html2text(obj['content']),
            "date_published": obj.get('published'),
            "author": {
                "name": actor.get('name', actor.get('preferredUsername')),
                "url": actor.get('url'),
                'avatar': actor.get('icon', {}).get('url'),
            },
        })
        cursor = str(item['_id'])

    resp['items'] = data
    # A full page means there may be more items older than the last one.
    if cursor and len(data) == page_size:
        resp['next_url'] = ID + path + '?cursor=' + cursor
    return resp
def parse_collection(payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None) -> List[str]:
    """Return the items of a ``Collection``/``OrderedCollection``.

    Our own followers/following collections are answered straight from
    MongoDB as lists of ``remote_actor`` values. Anything else is delegated to
    ``activitypub_utils.parse_collection``, which walks the collection's pages.
    """
    if url == ID + '/followers':
        local_col = DB.followers
    elif url == ID + '/following':
        local_col = DB.following
    else:
        # Not one of ours: walk the (remote) collection page by page
        return activitypub_utils.parse_collection(payload, url)
    return [doc['remote_actor'] for doc in local_col.find()]
2018-05-18 13:41:41 -05:00
def embed_collection(total_items, first_page_id):
    """Return a minimal embeddable ``OrderedCollection`` stub.

    The stub holds only the item count and a reference to the first page.
    """
    stub = {'type': ActivityType.ORDERED_COLLECTION.value}
    stub['totalItems'] = total_items
    stub['first'] = first_page_id
    return stub
2018-05-18 13:41:41 -05:00
def build_ordered_collection(col, q=None, cursor=None, map_func=None, limit=50, col_name=None):
    """Render a MongoDB collection as an ActivityPub ``OrderedCollection``.

    Documents are listed newest first (descending ``_id``), ``limit`` at a time.

    Args:
        col: the MongoDB collection to read (``col.name`` is the default path segment).
        q: optional base query; it is copied, never mutated.
        cursor: hex ``_id`` of the last item of the previous page. When set, an
            ``OrderedCollectionPage`` of the items older than it is returned.
        map_func: optional transform applied to each document after ``_id`` removal.
        limit: page size.
        col_name: path segment used to build ids; defaults to ``col.name``.

    Returns:
        Without ``cursor``, an ``OrderedCollection`` embedding its first page.
        With ``cursor``, an ``OrderedCollectionPage``. A ``next`` link is only
        added to full pages. An empty result yields an empty ``OrderedCollection``.
    """
    col_name = col_name or col.name
    collection_id = BASE_URL + '/' + col_name
    # Work on copies so the caller's query dict is never mutated.
    base_q = dict(q) if q else {}
    page_q = dict(base_q)
    if cursor:
        page_q['_id'] = {'$lt': ObjectId(cursor)}

    data = list(col.find(page_q, limit=limit).sort('_id', -1))
    if not data:
        return {
            '@context': CTX_AS,
            'id': collection_id,
            'totalItems': 0,
            'type': ActivityType.ORDERED_COLLECTION.value,
            'orderedItems': [],
        }

    start_cursor = str(data[0]['_id'])
    next_page_cursor = str(data[-1]['_id'])
    # Count the whole (base) query, not only the items older than `cursor`,
    # so every page reports the same collection total.
    total_items = col.find(base_q).count()
    data = [_remove_id(doc) for doc in data]
    if map_func:
        data = [map_func(doc) for doc in data]

    # No cursor, this is the first page and we return an OrderedCollection
    if not cursor:
        resp = {
            '@context': CTX_AS,
            'id': collection_id,
            'totalItems': total_items,
            'type': ActivityType.ORDERED_COLLECTION.value,
            'first': {
                # NOTE(review): dereferencing this URL yields the items *after*
                # start_cursor, not this page; no cursor value addresses the
                # first page itself.
                'id': f'{collection_id}?cursor={start_cursor}',
                'orderedItems': data,
                'partOf': collection_id,
                'totalItems': total_items,
                'type': ActivityType.ORDERED_COLLECTION_PAGE.value,
            },
        }
        if len(data) == limit:
            resp['first']['next'] = collection_id + '?cursor=' + next_page_cursor
        return resp

    # If there's a cursor, then we return an OrderedCollectionPage whose id is
    # the URL that produced it.
    resp = {
        '@context': CTX_AS,
        'type': ActivityType.ORDERED_COLLECTION_PAGE.value,
        'id': f'{collection_id}?cursor={cursor}',
        'totalItems': total_items,
        'partOf': collection_id,
        'orderedItems': data,
    }
    if len(data) == limit:
        resp['next'] = collection_id + '?cursor=' + next_page_cursor
    # TODO(tsileo): implements prev with prev=<first item cursor>
    return resp