diff --git a/activitypub.py b/activitypub.py index 353faca..95ce35a 100644 --- a/activitypub.py +++ b/activitypub.py @@ -11,6 +11,7 @@ from feedgen.feed import FeedGenerator from utils.linked_data_sig import generate_signature from utils.actor_service import NotAnActorError +from utils.errors import BadActivityError, UnexpectedActivityTypeError from utils import activitypub_utils from config import USERNAME, BASE_URL, ID from config import CTX_AS, CTX_SECURITY, AS_PUBLIC @@ -24,11 +25,13 @@ from typing import TypeVar logger = logging.getLogger(__name__) +# Helper/shortcut for typing ObjectType = Dict[str, Any] ObjectOrIDType = Union[str, ObjectType] class ActivityType(Enum): + """Supported activity `type`.""" ANNOUNCE = 'Announce' BLOCK = 'Block' LIKE = 'Like' @@ -50,10 +53,12 @@ class ActivityType(Enum): def random_object_id() -> str: + """Generates a random object ID.""" return binascii.hexlify(os.urandom(8)).decode('utf-8') def _remove_id(doc: ObjectType) -> ObjectType: + """Helper for removing MongoDB's `_id` field.""" doc = doc.copy() if '_id' in doc: del(doc['_id']) @@ -61,13 +66,16 @@ def _remove_id(doc: ObjectType) -> ObjectType: def _to_list(data: Union[List[Any], Any]) -> List[Any]: + """Helper to convert fields that can be either an object or a list of objects to a list of object.""" if isinstance(data, list): return data return [data] def clean_activity(activity: ObjectType) -> Dict[str, Any]: - # Remove the hidden bco and bcc field + """Clean the activity before rendering it. + - Remove the hidden bco and bcc field + """ for field in ['bto', 'bcc']: if field in activity: del(activity[field]) @@ -77,23 +85,30 @@ def clean_activity(activity: ObjectType) -> Dict[str, Any]: def _get_actor_id(actor: ObjectOrIDType) -> str: + """Helper for retrieving an actor `id`.""" if isinstance(actor, dict): return actor['id'] return actor class BaseActivity(object): + """Base class for ActivityPub activities.""" + ACTIVITY_TYPE: Optional[ActivityType] = None ALLOWED_OBJECT_TYPES: List[ActivityType] = [] def __init__(self, **kwargs) -> None: + # Ensure the class has an activity type defined if not self.ACTIVITY_TYPE: - raise ValueError('Missing ACTIVITY_TYPE') + raise BadActivityError('Missing ACTIVITY_TYPE') + # Ensure the activity has a type and a valid one if kwargs.get('type') is not None and kwargs.pop('type') != self.ACTIVITY_TYPE.value: - raise ValueError('Expect the type to be {}'.format(self.ACTIVITY_TYPE)) + raise UnexpectedActivityTypeError('Expect the type to be {}'.format(self.ACTIVITY_TYPE)) + # Initialize the object self._data: Dict[str, Any] = {'type': self.ACTIVITY_TYPE.value} + logger.debug(f'initializing a {self.ACTIVITY_TYPE.value} activity') if 'id' in kwargs: self._data['id'] = kwargs.pop('id') @@ -115,12 +130,13 @@ class BaseActivity(object): self._data['object'] = obj else: if not self.ALLOWED_OBJECT_TYPES: - raise ValueError('unexpected object') + raise UnexpectedActivityTypeError('unexpected object') if 'type' not in obj or (self.ACTIVITY_TYPE != ActivityType.CREATE and 'id' not in obj): - raise ValueError('invalid object') + raise BadActivityError('invalid object, missing type') if ActivityType(obj['type']) not in self.ALLOWED_OBJECT_TYPES: - print(self, kwargs) - raise ValueError(f'unexpected object type {obj["type"]} (allowed={self.ALLOWED_OBJECT_TYPES})') + raise UnexpectedActivityTypeError( + f'unexpected object type {obj["type"]} (allowed={self.ALLOWED_OBJECT_TYPES})' + ) self._data['object'] = obj if '@context' not in kwargs: @@ -144,6 +160,7 @@ class BaseActivity(object): allowed_keys = None try: allowed_keys = self._init(**kwargs) + logger.debug('calling custom init') except NotImplementedError: pass @@ -151,7 +168,7 @@ class BaseActivity(object): # Allows an extra to (like for Accept and Follow) kwargs.pop('to', None) if len(set(kwargs.keys()) - set(allowed_keys)) > 0: - raise ValueError('extra data left: {}'.format(kwargs)) + raise BadActivityError('extra data left: {}'.format(kwargs)) else: # Remove keys with `None` value valid_kwargs = {} @@ -177,7 +194,7 @@ class BaseActivity(object): return '{}({!r})'.format(self.__class__.__qualname__, self._data.get('id')) def __str__(self) -> str: - return str(self._data['id']) + return str(self._data.get('id', f'[new {self.ACTIVITY_TYPE} activity]')) def __getattr__(self, name: str) -> Any: if self._data.get(name): @@ -191,6 +208,7 @@ class BaseActivity(object): raise NotImplementedError def set_id(self, uri: str, obj_id: str) -> None: + logger.debug(f'setting ID {uri} / {obj_id}') self._data['id'] = uri try: self._set_id(uri, obj_id) @@ -227,7 +245,7 @@ class BaseActivity(object): else: obj = OBJECT_SERVICE.get(self._data['object']) if ActivityType(obj.get('type')) not in self.ALLOWED_OBJECT_TYPES: - raise ValueError('invalid object type') + raise UnexpectedActivityTypeError(f'invalid object type {obj.get("type")}') p = parse_activity(obj) @@ -275,18 +293,19 @@ class BaseActivity(object): raise NotImplementedError def process_from_inbox(self) -> None: + logger.debug(f'calling main process from inbox hook for {self}') self.verify() actor = self.get_actor() if DB.outbox.find_one({'type': ActivityType.BLOCK.value, 'activity.object': actor.id, 'meta.undo': False}): - print('actor is blocked, drop activity') + logger.info(f'actor {actor} is blocked, dropping the received activity {self}') return if DB.inbox.find_one({'remote_id': self.id}): # The activity is already in the inbox - print('received duplicate activity') + logger.info(f'received duplicate activity {self}, dropping it') return activity = self.to_dict() @@ -296,13 +315,16 @@ class BaseActivity(object): 'remote_id': self.id, 'meta': {'undo': False, 'deleted': False}, }) + logger.info('activity {self} saved') try: self._process_from_inbox() + logger.debug('called process from inbox hook') except NotImplementedError: - pass + logger.debug('process from inbox hook not implemented') def post_to_outbox(self) -> None: + logger.debug(f'calling main post to outbox hook for {self}') obj_id = random_object_id() self.set_id(f'{ID}/outbox/{obj_id}', obj_id) self.verify() @@ -316,19 +338,20 @@ class BaseActivity(object): }) recipients = self.recipients() + logger.info(f'recipients={recipients}') activity = clean_activity(activity) try: self._post_to_outbox(obj_id, activity, recipients) + logger.debug(f'called post to outbox hook') except NotImplementedError: - pass + logger.debug('post to outbox hook not implemented') generate_signature(activity, KEY.privkey) payload = json.dumps(activity) - print('will post') for recp in recipients: + logger.debug(f'posting to {recp}') self._post_to_inbox(payload, recp) - print('done') def _post_to_inbox(self, payload: str, to: str): tasks.post_to_inbox.delay(payload, to) @@ -371,8 +394,8 @@ class BaseActivity(object): if shared_inbox not in out: out.append(shared_inbox) continue - if col_actor.inbox and col_actor.inbox not in out: - out.append(col_actor.inbox) + if col_actor.inbox and col_actor.inbox not in out: + out.append(col_actor.inbox) continue diff --git a/app.py b/app.py index 21a61a2..6b11aa0 100644 --- a/app.py +++ b/app.py @@ -810,10 +810,11 @@ def api_new_note(): return Response( status=201, response='OK', - headers={'Microblogpub-Created-Activity': created.id}, + headers={'Microblogpub-Created-Activity': create.id}, ) @app.route('/api/stream') +@api_required def api_stream(): return Response( response=json.dumps(activitypub.build_inbox_json_feed('/api/stream', request.args.get('cursor'))), diff --git a/tests/federation_test.py b/tests/federation_test.py index ff8db17..3bd1021 100644 --- a/tests/federation_test.py +++ b/tests/federation_test.py @@ -18,6 +18,7 @@ class Instance(object): self.host_url = host_url self.docker_url = docker_url or host_url self.session = requests.Session() + self._create_delay = 8 def _do_req(self, url, headers): url = url.replace(self.docker_url, self.host_url) @@ -57,7 +58,14 @@ class Instance(object): assert resp.status_code == 201 # We need to wait for the Follow/Accept dance - time.sleep(10) + time.sleep(self._create_delay) + return resp.headers.get('microblogpub-created-activity') + + def new_note(self, content): + resp = self.session.get(f'{self.host_url}/api/new_note', params={'content': content}) + assert resp.status_code == 201 + + time.sleep(self._create_delay) return resp.headers.get('microblogpub-created-activity') def undo(self, oid: str) -> None: @@ -65,7 +73,7 @@ class Instance(object): assert resp.status_code == 201 # We need to wait for the Follow/Accept dance - time.sleep(10) + time.sleep(self._create_delay) return resp.headers.get('microblogpub-created-activity') def followers(self): @@ -89,6 +97,11 @@ class Instance(object): resp.raise_for_status() return resp.json() + def stream_jsonfeed(self): + resp = self.session.get(f'{self.host_url}/api/stream', headers={'Accept': 'application/json'}) + resp.raise_for_status() + return resp.json() + def _instances(): instance1 = Instance('http://localhost:5006', 'http://instance1_web_1:5005') @@ -111,12 +124,10 @@ def test_follow(): # Instance1 follows instance2 instance1.follow(instance2) instance1_debug = instance1.debug() - print(f'instance1_debug={instance1_debug}') assert instance1_debug['inbox'] == 1 # An Accept activity should be there assert instance1_debug['outbox'] == 1 # We've sent a Follow activity instance2_debug = instance2.debug() - print(f'instance2_debug={instance2_debug}') assert instance2_debug['inbox'] == 1 # An Follow activity should be there assert instance2_debug['outbox'] == 1 # We've sent a Accept activity @@ -152,4 +163,21 @@ def test_follow_unfollow(): assert instance2_debug['inbox'] == 2 # An Follow and Undo activity should be there assert instance2_debug['outbox'] == 1 # We've sent a Accept activity +def test_post_content(): + instance1, instance2 = _instances() + # Instance1 follows instance2 + instance1.follow(instance2) + instance2.follow(instance1) + inbox_stream = instance2.stream_jsonfeed() + assert len(inbox_stream['items']) == 0 + + create_id = instance1.new_note('hello') + instance2_debug = instance2.debug() + assert instance2_debug['inbox'] == 3 # An Follow, Accept and Create activity should be there + instance2_debug['outbox'] == 2 # We've sent a Accept and a Follow activity + + # Ensure the post is visible in instance2's stream + inbox_stream = instance2.stream_jsonfeed() + assert len(inbox_stream['items']) == 1 + assert inbox_stream['items'][0]['id'] == create_id