Add ALLOW_INTERNAL_ENDPOINTS config for self-hosted internal network deployments

This commit is contained in:
2026-02-01 18:26:14 +08:00
parent 9629507acd
commit 45d21cce21
5 changed files with 76 additions and 33 deletions

View File

@@ -17,21 +17,33 @@ from urllib.parse import urlparse
import requests
def _is_safe_url(url: str) -> bool:
"""Check if a URL is safe to make requests to (not internal/private)."""
def _is_safe_url(url: str, allow_internal: bool = False) -> bool:
"""Check if a URL is safe to make requests to (not internal/private).
Args:
url: The URL to check.
allow_internal: If True, allows internal/private IP addresses.
Use for self-hosted deployments on internal networks.
"""
try:
parsed = urlparse(url)
hostname = parsed.hostname
if not hostname:
return False
cloud_metadata_hosts = {
"metadata.google.internal",
"169.254.169.254",
}
if hostname.lower() in cloud_metadata_hosts:
return False
if allow_internal:
return True
blocked_hosts = {
"localhost",
"127.0.0.1",
"0.0.0.0",
"::1",
"[::1]",
"metadata.google.internal",
"169.254.169.254",
}
if hostname.lower() in blocked_hosts:
return False
@@ -197,8 +209,9 @@ class NotificationConfiguration:
class NotificationService:
def __init__(self, storage_root: Path, worker_count: int = 2):
def __init__(self, storage_root: Path, worker_count: int = 2, allow_internal_endpoints: bool = False):
self.storage_root = storage_root
self._allow_internal_endpoints = allow_internal_endpoints
self._configs: Dict[str, List[NotificationConfiguration]] = {}
self._queue: queue.Queue[tuple[NotificationEvent, WebhookDestination]] = queue.Queue()
self._workers: List[threading.Thread] = []
@@ -331,8 +344,8 @@ class NotificationService:
self._queue.task_done()
def _send_notification(self, event: NotificationEvent, destination: WebhookDestination) -> None:
if not _is_safe_url(destination.url):
raise RuntimeError(f"Blocked request to internal/private URL: {destination.url}")
if not _is_safe_url(destination.url, allow_internal=self._allow_internal_endpoints):
raise RuntimeError(f"Blocked request to cloud metadata service (SSRF protection): {destination.url}")
payload = event.to_s3_event()
headers = {"Content-Type": "application/json", **destination.headers}