import base64 import mimetypes from enum import Enum from gzip import GzipFile from io import BytesIO from typing import Any from typing import Dict from little_boxes import activitypub as ap import gridfs import piexif import requests from PIL import Image def load(url, user_agent): """Initializes a `PIL.Image` from the URL.""" with requests.get(url, stream=True, headers={"User-Agent": user_agent}) as resp: resp.raise_for_status() resp.raw.decode_content = True return Image.open(BytesIO(resp.raw.read())) def to_data_uri(img): out = BytesIO() img.save(out, format=img.format) out.seek(0) data = base64.b64encode(out.read()).decode("utf-8") return f"data:{img.get_format_mimetype()};base64,{data}" class Kind(Enum): ATTACHMENT = "attachment" ACTOR_ICON = "actor_icon" UPLOAD = "upload" OG_IMAGE = "og" class MediaCache(object): def __init__(self, gridfs_db: str, user_agent: str) -> None: self.fs = gridfs.GridFS(gridfs_db) self.user_agent = user_agent def cache_og_image(self, url: str) -> None: if self.fs.find_one({"url": url, "kind": Kind.OG_IMAGE.value}): return i = load(url, self.user_agent) # Save the original attachment (gzipped) i.thumbnail((100, 100)) with BytesIO() as buf: with GzipFile(mode="wb", fileobj=buf) as f1: i.save(f1, format=i.format) buf.seek(0) self.fs.put( buf, url=url, size=100, content_type=i.get_format_mimetype(), kind=Kind.OG_IMAGE.value, ) def cache_og_image2(self, url: str, remote_id: str) -> None: if self.fs.find_one({"url": url, "kind": Kind.OG_IMAGE.value}): return i = load(url, self.user_agent) # Save the original attachment (gzipped) i.thumbnail((100, 100)) with BytesIO() as buf: with GzipFile(mode="wb", fileobj=buf) as f1: i.save(f1, format=i.format) buf.seek(0) self.fs.put( buf, url=url, size=100, content_type=i.get_format_mimetype(), kind=Kind.OG_IMAGE.value, remote_id=remote_id, ) def cache_attachment(self, url: str) -> None: if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}): return if ( url.endswith(".png") or url.endswith(".jpg") or url.endswith(".jpeg") or url.endswith(".gif") ): i = load(url, self.user_agent) # Save the original attachment (gzipped) with BytesIO() as buf: f1 = GzipFile(mode="wb", fileobj=buf) i.save(f1, format=i.format) f1.close() buf.seek(0) self.fs.put( buf, url=url, size=None, content_type=i.get_format_mimetype(), kind=Kind.ATTACHMENT.value, ) # Save a thumbnail (gzipped) i.thumbnail((720, 720)) with BytesIO() as buf: with GzipFile(mode="wb", fileobj=buf) as f1: i.save(f1, format=i.format) buf.seek(0) self.fs.put( buf, url=url, size=720, content_type=i.get_format_mimetype(), kind=Kind.ATTACHMENT.value, ) return def cache_attachment2(self, attachment: Dict[str, Any], remote_id: str) -> None: url = attachment["url"] # Ensure it's not already there if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}): return # If it's an image, make some thumbnails if ( url.endswith(".png") or url.endswith(".jpg") or url.endswith(".jpeg") or url.endswith(".gif") or attachment.get("mediaType", "").startswith("image/") or ap._has_type(attachment.get("type"), ap.ActivityType.IMAGE) ): try: i = load(url, self.user_agent) # Save the original attachment (gzipped) with BytesIO() as buf: f1 = GzipFile(mode="wb", fileobj=buf) i.save(f1, format=i.format) f1.close() buf.seek(0) self.fs.put( buf, url=url, size=None, content_type=i.get_format_mimetype(), kind=Kind.ATTACHMENT.value, remote_id=remote_id, ) # Save a thumbnail (gzipped) i.thumbnail((720, 720)) with BytesIO() as buf: with GzipFile(mode="wb", fileobj=buf) as f1: i.save(f1, format=i.format) buf.seek(0) self.fs.put( buf, url=url, size=720, content_type=i.get_format_mimetype(), kind=Kind.ATTACHMENT.value, remote_id=remote_id, ) return except Exception: # FIXME(tsileo): logging pass # The attachment is not an image, download and save it anyway with requests.get( url, stream=True, headers={"User-Agent": self.user_agent} ) as resp: resp.raise_for_status() with BytesIO() as buf: with GzipFile(mode="wb", fileobj=buf) as f1: for chunk in resp.iter_content(): if chunk: f1.write(chunk) buf.seek(0) self.fs.put( buf, url=url, size=None, content_type=mimetypes.guess_type(url)[0], kind=Kind.ATTACHMENT.value, remote_id=remote_id, ) def cache_actor_icon(self, url: str) -> None: if self.fs.find_one({"url": url, "kind": Kind.ACTOR_ICON.value}): return i = load(url, self.user_agent) for size in [50, 80]: t1 = i.copy() t1.thumbnail((size, size)) with BytesIO() as buf: with GzipFile(mode="wb", fileobj=buf) as f1: t1.save(f1, format=i.format) buf.seek(0) self.fs.put( buf, url=url, size=size, content_type=i.get_format_mimetype(), kind=Kind.ACTOR_ICON.value, ) def save_upload(self, obuf: BytesIO, filename: str) -> str: # Remove EXIF metadata if filename.lower().endswith(".jpg") or filename.lower().endswith(".jpeg"): obuf.seek(0) with BytesIO() as buf2: piexif.remove(obuf.getvalue(), buf2) obuf.truncate(0) obuf.write(buf2.getvalue()) obuf.seek(0) mtype = mimetypes.guess_type(filename)[0] with BytesIO() as gbuf: with GzipFile(mode="wb", fileobj=gbuf) as gzipfile: gzipfile.write(obuf.getvalue()) gbuf.seek(0) oid = self.fs.put( gbuf, content_type=mtype, upload_filename=filename, kind=Kind.UPLOAD.value, ) return str(oid) def cache(self, url: str, kind: Kind) -> None: if kind == Kind.ACTOR_ICON: self.cache_actor_icon(url) elif kind == Kind.OG_IMAGE: self.cache_og_image(url) else: self.cache_attachment(url) def get_actor_icon(self, url: str, size: int) -> Any: return self.get_file(url, size, Kind.ACTOR_ICON) def get_attachment(self, url: str, size: int) -> Any: return self.get_file(url, size, Kind.ATTACHMENT) def get_file(self, url: str, size: int, kind: Kind) -> Any: return self.fs.find_one({"url": url, "size": size, "kind": kind.value})