2018-05-18 13:41:41 -05:00
|
|
|
import os
|
2018-06-01 13:29:44 -05:00
|
|
|
import json
|
2018-05-18 13:41:41 -05:00
|
|
|
import logging
|
|
|
|
import random
|
|
|
|
|
|
|
|
import requests
|
|
|
|
from celery import Celery
|
|
|
|
from requests.exceptions import HTTPError
|
|
|
|
|
|
|
|
from config import HEADERS
|
|
|
|
from config import ID
|
|
|
|
from config import DB
|
|
|
|
from config import KEY
|
|
|
|
from config import USER_AGENT
|
|
|
|
from utils.httpsig import HTTPSigAuth
|
|
|
|
from utils.opengraph import fetch_og_metadata
|
2018-06-01 13:29:44 -05:00
|
|
|
from utils.linked_data_sig import generate_signature
|
2018-05-18 13:41:41 -05:00
|
|
|
|
|
|
|
|
2018-05-28 12:46:23 -05:00
|
|
|
# Module-level logger, named after this module; used by the tasks below.
log = logging.getLogger(__name__)
|
2018-05-18 13:41:41 -05:00
|
|
|
# Celery application the tasks below are registered on. The broker URL is read
# from the MICROBLOGPUB_AMQP_BROKER environment variable and falls back to
# 'pyamqp://guest@localhost//' when that variable is unset.
app = Celery('tasks', broker=os.getenv('MICROBLOGPUB_AMQP_BROKER', 'pyamqp://guest@localhost//'))
|
|
|
|
# Shared auth object, built from the key id (ID + '#main-key') and
# KEY.privkey; post_to_inbox passes it as `auth=` to requests.post.
SigAuth = HTTPSigAuth(ID+'#main-key', KEY.privkey)
|
|
|
|
|
|
|
|
|
|
|
|
@app.task(bind=True, max_retries=12)
def post_to_inbox(self, payload: str, to: str) -> None:
    """Sign a JSON payload and POST it to a remote URL.

    ``payload`` is decoded from JSON, handed to ``generate_signature`` together
    with ``KEY.privkey``, re-encoded and POSTed to ``to`` with the module-level
    ``SigAuth`` as request auth.

    Args:
        payload: JSON document, as a string, to deliver.
        to: URL the request is POSTed to.

    Error handling:
        * A 4xx response is logged and dropped without a retry.
        * Any other ``HTTPError`` schedules a retry through ``self.retry``
          (the task is declared with ``max_retries=12``). The delay is
          ``uniform(2, 4) ** retries`` seconds, truncated to an int.
        * Exceptions other than ``HTTPError`` are not caught here.
    """
    try:
        log.info('payload=%s', payload)
        log.info('generating sig')
        signed_payload = json.loads(payload)
        # The return value is not used; presumably the signature is added to
        # the dict in place -- confirm against utils.linked_data_sig.
        generate_signature(signed_payload, KEY.privkey)
        log.info('to=%s', to)
        resp = requests.post(to, data=json.dumps(signed_payload), auth=SigAuth, headers={
            'Content-Type': HEADERS[1],
            'Accept': HEADERS[1],
            'User-Agent': USER_AGENT,
        })
        log.info('resp=%s', resp)
        log.info('resp_body=%s', resp.text)
        resp.raise_for_status()
    except HTTPError as err:
        log.exception('request failed')
        # The previous check (`400 >= status >= 499`) could never be true, so
        # client errors were retried like any other failure.
        if 400 <= err.response.status_code <= 499:
            log.info('client error, no retry')
            return
        # Randomised exponential backoff: the base is drawn from [2, 4] and
        # raised to the number of retries already attempted.
        self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
|
|
|
|
|
|
|
|
|
|
|
@app.task(bind=True, max_retries=12)
def fetch_og(self, col, remote_id):
    """Run ``fetch_og_metadata`` for one item of the inbox or outbox.

    Args:
        col: Collection selector. ``'INBOX'`` uses ``DB.inbox`` and
            ``'OUTBOX'`` uses ``DB.outbox``; any other value is only logged
            and nothing is fetched.
        remote_id: Identifier forwarded unchanged to ``fetch_og_metadata``.

    Any exception is logged and the task is rescheduled through
    ``self.retry`` (the task is declared with ``max_retries=12``). The delay
    is ``uniform(2, 4) ** retries`` seconds, truncated to an int.
    """
    try:
        log.info('fetch_og_meta remote_id=%s col=%s', remote_id, col)
        if col == 'INBOX':
            log.info('%d links saved', fetch_og_metadata(USER_AGENT, DB.inbox, remote_id))
        elif col == 'OUTBOX':
            log.info('%d links saved', fetch_og_metadata(USER_AGENT, DB.outbox, remote_id))
    except Exception as err:
        # Log through the module-level logger. The previous `self.log` is not
        # an attribute of a bound Celery task, so the handler itself failed
        # with AttributeError before the retry below could be scheduled.
        log.exception('failed')
        # Randomised exponential backoff: the base is drawn from [2, 4] and
        # raised to the number of retries already attempted.
        self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|