Improve caching
commit 7237fbcc68 (parent 2ee3fc0c67)
5 changed files with 106 additions and 37 deletions

app.py (97 changed lines)
@@ -816,16 +816,41 @@ def paginated_query(db, q, limit=25, sort_key="_id"):
     return outbox_data, older_than, newer_than
 
 
+CACHING = True
+
+
+def _get_cached(type_="html", arg=None):
+    if not CACHING:
+        return None
+    logged_in = session.get("logged_in")
+    if not logged_in:
+        cached = DB.cache2.find_one({"path": request.path, "type": type_, "arg": arg})
+        if cached:
+            app.logger.info("from cache")
+            return cached['response_data']
+    return None
+
+
+def _cache(resp, type_="html", arg=None):
+    if not CACHING:
+        return None
+    logged_in = session.get("logged_in")
+    if not logged_in:
+        DB.cache2.update_one(
+            {"path": request.path, "type": type_, "arg": arg},
+            {"$set": {"response_data": resp, "date": datetime.now(timezone.utc)}},
+            upsert=True,
+        )
+    return None
+
+
 @app.route("/")
 def index():
     if is_api_request():
         return jsonify(**ME)
     logged_in = session.get("logged_in", False)
-    if not logged_in:
-        cached = DB.cache.find_one({"path": request.path, "type": "html"})
-        if cached:
-            app.logger.info("from cache")
-            return cached['response_data']
+    cache_arg = f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}"
+    cached = _get_cached("html", cache_arg)
+    if cached:
+        return cached
 
     q = {
         "box": Box.OUTBOX.value,
@@ -859,12 +884,7 @@ def index():
         newer_than=newer_than,
         pinned=pinned,
     )
-    if not logged_in:
-        DB.cache.update_one(
-            {"path": request.path, "type": "html"},
-            {"$set": {"response_data": resp, "date": datetime.now(timezone.utc)}},
-            upsert=True,
-        )
+    _cache(resp, "html", cache_arg)
     return resp
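Note: the new _get_cached/_cache pair replaces the inline DB.cache lookup and upsert; both helpers no-op when CACHING is off or the session is logged in, so only anonymous responses are cached, and the arg parameter lets a single path keep several entries (here the index page, keyed by its older_than/newer_than pagination cursors). A minimal sketch of the intended call pattern in some other view (the route name and template are hypothetical):

    @app.route("/example")
    def example_view():
        cached = _get_cached("html")          # None for logged-in users or on a miss
        if cached:
            return cached
        resp = render_template("example.html")  # expensive work runs only on a miss
        _cache(resp, "html")                  # upserts keyed on (path, type, arg)
        return resp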
@@ -1011,32 +1031,41 @@ def note_by_id(note_id):
 
 @app.route("/nodeinfo")
 def nodeinfo():
-    q = {
-        "box": Box.OUTBOX.value,
-        "meta.deleted": False,  # TODO(tsileo): retrieve deleted and expose tombstone
-        "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
-    }
+    response = _get_cached("api")
+    cached = True
+    if not response:
+        cached = False
+        q = {
+            "box": Box.OUTBOX.value,
+            "meta.deleted": False,  # TODO(tsileo): retrieve deleted and expose tombstone
+            "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]},
+        }
+
+        response = json.dumps(
+            {
+                "version": "2.0",
+                "software": {
+                    "name": "microblogpub",
+                    "version": f"Microblog.pub {VERSION}",
+                },
+                "protocols": ["activitypub"],
+                "services": {"inbound": [], "outbound": []},
+                "openRegistrations": False,
+                "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)},
+                "metadata": {
+                    "sourceCode": "https://github.com/tsileo/microblog.pub",
+                    "nodeName": f"@{USERNAME}@{DOMAIN}",
+                },
+            }
+        )
+
+    if not cached:
+        _cache(response, "api")
     return Response(
         headers={
             "Content-Type": "application/json; profile=http://nodeinfo.diaspora.software/ns/schema/2.0#"
         },
-        response=json.dumps(
-            {
-                "version": "2.0",
-                "software": {
-                    "name": "microblogpub",
-                    "version": f"Microblog.pub {VERSION}",
-                },
-                "protocols": ["activitypub"],
-                "services": {"inbound": [], "outbound": []},
-                "openRegistrations": False,
-                "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)},
-                "metadata": {
-                    "sourceCode": "https://github.com/tsileo/microblog.pub",
-                    "nodeName": f"@{USERNAME}@{DOMAIN}",
-                },
-            }
-        ),
+        response=response,
     )
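Note: the value cached under type "api" is the already-serialized JSON string, so it is handed to Response unchanged on both the hit and the miss path, and the DB.activities.count(q) query runs only on a miss. The cached flag exists solely to avoid re-upserting an entry that was just read back; an equivalent, slightly flatter shape would be (a sketch, not what the commit does — nodeinfo_doc stands in for the dict built above):

    response = _get_cached("api")
    if response is None:
        response = json.dumps(nodeinfo_doc)
        _cache(response, "api")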
@@ -109,8 +109,8 @@ def create_indexes():
         ("activity.object.id", pymongo.ASCENDING),
         ("meta.deleted", pymongo.ASCENDING),
     ])
-    DB.cache.create_index([("path", pymongo.ASCENDING), ("type", pymongo.ASCENDING)])
-    DB.cache.create_index("date", expireAfterSeconds=60)
+    DB.cache2.create_index([("path", pymongo.ASCENDING), ("type", pymongo.ASCENDING), ("arg", pymongo.ASCENDING)])
+    DB.cache2.create_index("date", expireAfterSeconds=3600 * 12)
 
     # Index for the block query
     DB.activities.create_index(
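Note: expireAfterSeconds turns "date" into a MongoDB TTL index: the server's background TTL monitor (which wakes roughly once a minute) deletes any document whose "date" field is older than the threshold. The old cache expired after 60 seconds; cache2 entries can live 12 hours because invalidation is now done explicitly in tasks.py (below). TTL only applies when the field holds a BSON datetime, which _cache satisfies with datetime.now(timezone.utc). A standalone sketch of the mechanism (assumes a local mongod):

    import datetime
    import pymongo

    client = pymongo.MongoClient()
    col = client.test.ttl_demo
    col.create_index("date", expireAfterSeconds=10)
    col.insert_one({"date": datetime.datetime.now(datetime.timezone.utc)})
    # the document is reaped by the TTL monitor within ~10s plus one sweep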
@@ -1,5 +1,16 @@
 version: '3'
 services:
+  flower:
+    image: microblogpub:latest
+    links:
+      - mongo
+      - rabbitmq
+    command: 'celery flower -l info -A tasks --broker amqp://guest@rabbitmq// --address=0.0.0.0 --port=5556'
+    environment:
+      - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rabbitmq//
+      - MICROBLOGPUB_MONGODB_HOST=mongo:27017
+    ports:
+      - "5556:5556"
   celery:
     image: microblogpub:latest
     links:
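Note: Flower is Celery's web monitoring dashboard; with this service definition the queue driving tasks.py can be watched on port 5556 of the Docker host. Flower also exposes a small REST API, so a quick smoke test from Python might look like this (assumes the compose stack is up locally):

    import requests
    print(requests.get("http://localhost:5556/api/workers").json())  # worker stats as JSON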
@@ -1,5 +1,6 @@
 python-dateutil
 libsass
+flower
 gunicorn
 piexif
 requests
tasks.py (30 changed lines)

@@ -6,6 +6,7 @@ import random
 import requests
 from celery import Celery
 from little_boxes import activitypub as ap
+from little_boxes.errors import BadActivityError
 from little_boxes.errors import ActivityGoneError
 from little_boxes.errors import ActivityNotFoundError
 from little_boxes.errors import NotAnActivityError
@@ -59,7 +60,8 @@ def process_new_activity(self, iri: str) -> None:
         try:
             activity.get_object()
             tag_stream = True
-        except NotAnActivityError:
+        except (NotAnActivityError, BadActivityError):
+            log.exception(f"failed to get announce object for {activity!r}")
             # Most likely on OStatus notice
             tag_stream = False
             should_delete = True
@@ -317,6 +319,26 @@ def post_to_inbox(activity: ap.BaseActivity) -> None:
     finish_post_to_inbox.delay(activity.id)
 
 
+def invalidate_cache(activity):
+    if activity.has_type(ap.ActivityType.LIKE):
+        if activity.get_object().id.startswith(BASE_URL):
+            DB.cache2.remove()
+    elif activity.has_type(ap.ActivityType.ANNOUNCE):
+        if activity.get_object().id.startswith(BASE_URL):
+            DB.cache2.remove()
+    elif activity.has_type(ap.ActivityType.UNDO):
+        DB.cache2.remove()
+    elif activity.has_type(ap.ActivityType.DELETE):
+        # TODO(tsileo): only invalidate if it's a delete of a reply
+        DB.cache2.remove()
+    elif activity.has_type(ap.ActivityType.UPDATE):
+        DB.cache2.remove()
+    elif activity.has_type(ap.ActivityType.CREATE):
+        note = activity.get_object()
+        if not note.inReplyTo or note.inReplyTo.startswith(ID):
+            DB.cache2.remove()
+        # FIXME(tsileo): check if it's a reply of a reply
+
+
 @app.task(bind=True, max_retries=MAX_RETRIES)  # noqa: C901
 def finish_post_to_inbox(self, iri: str) -> None:
     try:
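Note: invalidation is deliberately coarse — any Like, Announce, Undo, Update, Delete, or Create in a local thread wipes the whole cache2 collection rather than hunting down the affected entries; the next anonymous request simply repopulates it. DB.cache2.remove() with no filter deletes every document, but Collection.remove() has been deprecated since PyMongo 3.0, so the modern equivalent would be:

    DB.cache2.delete_many({})  # same effect: drop every cached response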
@@ -345,6 +367,10 @@ def finish_post_to_inbox(self, iri: str) -> None:
                 back.inbox_undo_announce(MY_PERSON, obj)
             elif obj.has_type(ap.ActivityType.FOLLOW):
                 back.undo_new_follower(MY_PERSON, obj)
+        try:
+            invalidate_cache(activity)
+        except Exception:
+            log.exception("failed to invalidate cache")
     except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
         log.exception(f"no retry")
     except Exception as err:
@@ -396,6 +422,8 @@ def finish_post_to_outbox(self, iri: str) -> None:
     log.info(f"recipients={recipients}")
     activity = ap.clean_activity(activity.to_dict())
 
+    DB.cache2.remove()
+
     payload = json.dumps(activity)
     for recp in recipients:
         log.debug(f"posting to {recp}")
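Note: on the outbox side there is no type check at all — any locally created activity can change the index or a note page, and clearing the cache before the delivery loop starts means anonymous visitors see the new post as soon as it is saved, without waiting for the 12-hour TTL.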