microblog.pub/app/utils/url.py

99 lines
2.5 KiB
Python
Raw Normal View History

2022-06-22 13:11:22 -05:00
import functools
import ipaddress
import socket
from urllib.parse import urlparse
from loguru import logger
2022-08-15 03:15:00 -05:00
from app.config import BLOCKED_SERVERS
2022-06-22 13:11:22 -05:00
from app.config import DEBUG
2022-07-10 04:04:28 -05:00
def make_abs(url: str | None, parent: str) -> str | None:
if url is None:
return None
if url.startswith("http"):
return url
return (
urlparse(parent)._replace(path=url, params="", query="", fragment="").geturl()
)
2022-12-31 09:53:05 -06:00
def must_make_abs(url: str | None, parent: str) -> str:
    """Strict variant of ``make_abs`` that never returns an empty result.

    Args:
        url: The URL (or path) to resolve; may be ``None``.
        parent: The URL used as the base for resolution.

    Returns:
        The non-empty string produced by ``make_abs(url, parent)``.

    Raises:
        ValueError: If ``make_abs`` yields ``None`` or an empty string.
    """
    if resolved := make_abs(url, parent):
        return resolved
    raise ValueError("missing URL")
2022-06-22 13:11:22 -05:00
class InvalidURLError(Exception):
    """Signals that a URL did not pass validation."""
2022-07-15 13:50:27 -05:00
@functools.lru_cache(maxsize=256)
def _getaddrinfo(hostname: str, port: int) -> str:
    """Return an IP address (as text) for *hostname*.

    If *hostname* is already an IP literal it is returned in the canonical
    form produced by ``ipaddress``; otherwise the first entry reported by
    ``socket.getaddrinfo`` is used. Successful results are memoized per
    ``(hostname, port)`` pair by the ``lru_cache`` decorator.

    Raises:
        socket.gaierror: If the lookup fails (logged, then re-raised).
    """
    try:
        literal = ipaddress.ip_address(hostname)
    except ValueError:
        # Not an IP literal: fall back to a resolver lookup.
        try:
            candidates = socket.getaddrinfo(hostname, port)
        except socket.gaierror:
            logger.exception(f"failed to lookup addr info for {hostname}")
            raise
        # Each entry is (family, type, proto, canonname, sockaddr); the
        # address is the first item of sockaddr.
        resolved = candidates[0][4][0]
        logger.debug(f"DNS lookup: {hostname} -> {resolved}")
        return resolved
    else:
        return str(literal)
def is_url_valid(url: str) -> bool:
    """Implements basic SSRF protection.

    A URL is accepted only when it parses cleanly, uses ``http``/``https``,
    has a hostname that is not ``localhost``, blocked, or a ``.onion``
    service, and resolves to an address that is not private.

    Args:
        url: The (untrusted) URL to check.

    Returns:
        ``True`` if the URL may be fetched, ``False`` otherwise. When
        ``DEBUG`` is set, every well-formed ``http``/``https`` URL is
        accepted.

    Note:
        Resolution errors raised by ``_getaddrinfo`` are not caught here and
        propagate to the caller.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs outright (e.g. unbalanced
        # IPv6 brackets); a validator should answer "no" rather than crash.
        return False

    if parsed.scheme not in ["http", "https"]:
        return False

    # XXX in debug mode, we want to allow requests to localhost to test the
    # federation with local instances
    if DEBUG:  # pragma: no cover
        return True

    if not parsed.hostname or parsed.hostname.lower() in ["localhost"]:
        return False

    if is_hostname_blocked(parsed.hostname):
        logger.warning(f"{parsed.hostname} is blocked")
        return False

    if parsed.hostname.endswith(".onion"):
        logger.warning(f"{url} is an onion service")
        return False

    try:
        # Accessing ``.port`` raises ValueError for a non-numeric or
        # out-of-range port.
        port = parsed.port
    except ValueError:
        logger.info(f"rejecting URL with invalid port {url}")
        return False

    ip_address = _getaddrinfo(
        parsed.hostname, port or (80 if parsed.scheme == "http" else 443)
    )
    logger.debug(f"{ip_address=}")

    if ipaddress.ip_address(ip_address).is_private:
        logger.info(f"rejecting private URL {url} -> {ip_address}")
        return False

    return True
2022-07-15 13:55:37 -05:00
@functools.lru_cache(maxsize=512)
def check_url(url: str) -> None:
    """Raise unless *url* passes ``is_url_valid``.

    Successful checks are memoized by the ``lru_cache`` decorator; a call
    that raises is not cached, so a rejected URL is re-evaluated each time.

    Raises:
        InvalidURLError: If ``is_url_valid(url)`` returns a falsy value.
    """
    logger.debug(f"check_url {url=}")
    if is_url_valid(url):
        return None
    raise InvalidURLError(f'"{url}" is invalid')
2022-12-04 04:51:52 -06:00
@functools.lru_cache(maxsize=256)
def is_hostname_blocked(hostname: str) -> bool:
    """Tell whether *hostname* is covered by ``BLOCKED_SERVERS``.

    A hostname is blocked when it equals an entry of ``BLOCKED_SERVERS`` or
    is a subdomain of one (i.e. ends with ``"." + entry``). Results are
    memoized per hostname by the ``lru_cache`` decorator.
    """
    return any(
        hostname == entry or hostname.endswith(f".{entry}")
        for entry in BLOCKED_SERVERS
    )