Add configurable rate limits for S3 API endpoints
This commit is contained in:
@@ -10,6 +10,23 @@ from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import psutil
|
||||
|
||||
|
||||
def _calculate_auto_threads() -> int:
|
||||
cpu_count = psutil.cpu_count(logical=True) or 4
|
||||
return max(1, min(cpu_count * 2, 64))
|
||||
|
||||
|
||||
def _calculate_auto_connection_limit() -> int:
|
||||
available_mb = psutil.virtual_memory().available / (1024 * 1024)
|
||||
calculated = int(available_mb / 5)
|
||||
return max(20, min(calculated, 1000))
|
||||
|
||||
|
||||
def _calculate_auto_backlog(connection_limit: int) -> int:
|
||||
return max(64, min(connection_limit * 2, 4096))
|
||||
|
||||
|
||||
def _validate_rate_limit(value: str) -> str:
|
||||
pattern = r"^\d+\s+per\s+(second|minute|hour|day)$"
|
||||
@@ -63,6 +80,10 @@ class AppConfig:
|
||||
log_backup_count: int
|
||||
ratelimit_default: str
|
||||
ratelimit_storage_uri: str
|
||||
ratelimit_list_buckets: str
|
||||
ratelimit_bucket_ops: str
|
||||
ratelimit_object_ops: str
|
||||
ratelimit_head_ops: str
|
||||
cors_origins: list[str]
|
||||
cors_methods: list[str]
|
||||
cors_allow_headers: list[str]
|
||||
@@ -94,6 +115,9 @@ class AppConfig:
|
||||
server_connection_limit: int
|
||||
server_backlog: int
|
||||
server_channel_timeout: int
|
||||
server_threads_auto: bool
|
||||
server_connection_limit_auto: bool
|
||||
server_backlog_auto: bool
|
||||
site_sync_enabled: bool
|
||||
site_sync_interval_seconds: int
|
||||
site_sync_batch_size: int
|
||||
@@ -171,6 +195,10 @@ class AppConfig:
|
||||
log_backup_count = int(_get("LOG_BACKUP_COUNT", 3))
|
||||
ratelimit_default = _validate_rate_limit(str(_get("RATE_LIMIT_DEFAULT", "200 per minute")))
|
||||
ratelimit_storage_uri = str(_get("RATE_LIMIT_STORAGE_URI", "memory://"))
|
||||
ratelimit_list_buckets = _validate_rate_limit(str(_get("RATE_LIMIT_LIST_BUCKETS", "60 per minute")))
|
||||
ratelimit_bucket_ops = _validate_rate_limit(str(_get("RATE_LIMIT_BUCKET_OPS", "120 per minute")))
|
||||
ratelimit_object_ops = _validate_rate_limit(str(_get("RATE_LIMIT_OBJECT_OPS", "240 per minute")))
|
||||
ratelimit_head_ops = _validate_rate_limit(str(_get("RATE_LIMIT_HEAD_OPS", "100 per minute")))
|
||||
|
||||
def _csv(value: str, default: list[str]) -> list[str]:
|
||||
if not value:
|
||||
@@ -200,9 +228,30 @@ class AppConfig:
|
||||
operation_metrics_interval_minutes = int(_get("OPERATION_METRICS_INTERVAL_MINUTES", 5))
|
||||
operation_metrics_retention_hours = int(_get("OPERATION_METRICS_RETENTION_HOURS", 24))
|
||||
|
||||
server_threads = int(_get("SERVER_THREADS", 4))
|
||||
server_connection_limit = int(_get("SERVER_CONNECTION_LIMIT", 100))
|
||||
server_backlog = int(_get("SERVER_BACKLOG", 1024))
|
||||
_raw_threads = int(_get("SERVER_THREADS", 0))
|
||||
if _raw_threads == 0:
|
||||
server_threads = _calculate_auto_threads()
|
||||
server_threads_auto = True
|
||||
else:
|
||||
server_threads = _raw_threads
|
||||
server_threads_auto = False
|
||||
|
||||
_raw_conn_limit = int(_get("SERVER_CONNECTION_LIMIT", 0))
|
||||
if _raw_conn_limit == 0:
|
||||
server_connection_limit = _calculate_auto_connection_limit()
|
||||
server_connection_limit_auto = True
|
||||
else:
|
||||
server_connection_limit = _raw_conn_limit
|
||||
server_connection_limit_auto = False
|
||||
|
||||
_raw_backlog = int(_get("SERVER_BACKLOG", 0))
|
||||
if _raw_backlog == 0:
|
||||
server_backlog = _calculate_auto_backlog(server_connection_limit)
|
||||
server_backlog_auto = True
|
||||
else:
|
||||
server_backlog = _raw_backlog
|
||||
server_backlog_auto = False
|
||||
|
||||
server_channel_timeout = int(_get("SERVER_CHANNEL_TIMEOUT", 120))
|
||||
site_sync_enabled = str(_get("SITE_SYNC_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
|
||||
site_sync_interval_seconds = int(_get("SITE_SYNC_INTERVAL_SECONDS", 60))
|
||||
@@ -225,6 +274,10 @@ class AppConfig:
|
||||
log_backup_count=log_backup_count,
|
||||
ratelimit_default=ratelimit_default,
|
||||
ratelimit_storage_uri=ratelimit_storage_uri,
|
||||
ratelimit_list_buckets=ratelimit_list_buckets,
|
||||
ratelimit_bucket_ops=ratelimit_bucket_ops,
|
||||
ratelimit_object_ops=ratelimit_object_ops,
|
||||
ratelimit_head_ops=ratelimit_head_ops,
|
||||
cors_origins=cors_origins,
|
||||
cors_methods=cors_methods,
|
||||
cors_allow_headers=cors_allow_headers,
|
||||
@@ -256,6 +309,9 @@ class AppConfig:
|
||||
server_connection_limit=server_connection_limit,
|
||||
server_backlog=server_backlog,
|
||||
server_channel_timeout=server_channel_timeout,
|
||||
server_threads_auto=server_threads_auto,
|
||||
server_connection_limit_auto=server_connection_limit_auto,
|
||||
server_backlog_auto=server_backlog_auto,
|
||||
site_sync_enabled=site_sync_enabled,
|
||||
site_sync_interval_seconds=site_sync_interval_seconds,
|
||||
site_sync_batch_size=site_sync_batch_size)
|
||||
@@ -364,9 +420,11 @@ class AppConfig:
|
||||
print(f" ENCRYPTION: Enabled (Master key: {self.encryption_master_key_path})")
|
||||
if self.kms_enabled:
|
||||
print(f" KMS: Enabled (Keys: {self.kms_keys_path})")
|
||||
print(f" SERVER_THREADS: {self.server_threads}")
|
||||
print(f" CONNECTION_LIMIT: {self.server_connection_limit}")
|
||||
print(f" BACKLOG: {self.server_backlog}")
|
||||
def _auto(flag: bool) -> str:
|
||||
return " (auto)" if flag else ""
|
||||
print(f" SERVER_THREADS: {self.server_threads}{_auto(self.server_threads_auto)}")
|
||||
print(f" CONNECTION_LIMIT: {self.server_connection_limit}{_auto(self.server_connection_limit_auto)}")
|
||||
print(f" BACKLOG: {self.server_backlog}{_auto(self.server_backlog_auto)}")
|
||||
print(f" CHANNEL_TIMEOUT: {self.server_channel_timeout}s")
|
||||
print("=" * 60)
|
||||
|
||||
@@ -406,6 +464,10 @@ class AppConfig:
|
||||
"LOG_BACKUP_COUNT": self.log_backup_count,
|
||||
"RATELIMIT_DEFAULT": self.ratelimit_default,
|
||||
"RATELIMIT_STORAGE_URI": self.ratelimit_storage_uri,
|
||||
"RATELIMIT_LIST_BUCKETS": self.ratelimit_list_buckets,
|
||||
"RATELIMIT_BUCKET_OPS": self.ratelimit_bucket_ops,
|
||||
"RATELIMIT_OBJECT_OPS": self.ratelimit_object_ops,
|
||||
"RATELIMIT_HEAD_OPS": self.ratelimit_head_ops,
|
||||
"CORS_ORIGINS": self.cors_origins,
|
||||
"CORS_METHODS": self.cors_methods,
|
||||
"CORS_ALLOW_HEADERS": self.cors_allow_headers,
|
||||
|
||||
@@ -82,6 +82,22 @@ def _access_logging() -> AccessLoggingService:
|
||||
return current_app.extensions["access_logging"]
|
||||
|
||||
|
||||
def _get_list_buckets_limit() -> str:
|
||||
return current_app.config.get("RATELIMIT_LIST_BUCKETS", "60 per minute")
|
||||
|
||||
|
||||
def _get_bucket_ops_limit() -> str:
|
||||
return current_app.config.get("RATELIMIT_BUCKET_OPS", "120 per minute")
|
||||
|
||||
|
||||
def _get_object_ops_limit() -> str:
|
||||
return current_app.config.get("RATELIMIT_OBJECT_OPS", "240 per minute")
|
||||
|
||||
|
||||
def _get_head_ops_limit() -> str:
|
||||
return current_app.config.get("RATELIMIT_HEAD_OPS", "100 per minute")
|
||||
|
||||
|
||||
def _xml_response(element: Element, status: int = 200) -> Response:
|
||||
xml_bytes = tostring(element, encoding="utf-8")
|
||||
return Response(xml_bytes, status=status, mimetype="application/xml")
|
||||
@@ -2143,7 +2159,7 @@ def _bulk_delete_handler(bucket_name: str) -> Response:
|
||||
|
||||
|
||||
@s3_api_bp.get("/")
|
||||
@limiter.limit("60 per minute")
|
||||
@limiter.limit(_get_list_buckets_limit)
|
||||
def list_buckets() -> Response:
|
||||
principal, error = _require_principal()
|
||||
if error:
|
||||
@@ -2171,7 +2187,7 @@ def list_buckets() -> Response:
|
||||
|
||||
|
||||
@s3_api_bp.route("/<bucket_name>", methods=["PUT", "DELETE", "GET", "POST"], strict_slashes=False)
|
||||
@limiter.limit("120 per minute")
|
||||
@limiter.limit(_get_bucket_ops_limit)
|
||||
def bucket_handler(bucket_name: str) -> Response:
|
||||
storage = _storage()
|
||||
subresource_response = _maybe_handle_bucket_subresource(bucket_name)
|
||||
@@ -2363,7 +2379,7 @@ def bucket_handler(bucket_name: str) -> Response:
|
||||
|
||||
|
||||
@s3_api_bp.route("/<bucket_name>/<path:object_key>", methods=["PUT", "GET", "DELETE", "HEAD", "POST"], strict_slashes=False)
|
||||
@limiter.limit("240 per minute")
|
||||
@limiter.limit(_get_object_ops_limit)
|
||||
def object_handler(bucket_name: str, object_key: str):
|
||||
storage = _storage()
|
||||
|
||||
@@ -2681,7 +2697,7 @@ def _bucket_policy_handler(bucket_name: str) -> Response:
|
||||
|
||||
|
||||
@s3_api_bp.route("/<bucket_name>", methods=["HEAD"])
|
||||
@limiter.limit("100 per minute")
|
||||
@limiter.limit(_get_head_ops_limit)
|
||||
def head_bucket(bucket_name: str) -> Response:
|
||||
principal, error = _require_principal()
|
||||
if error:
|
||||
@@ -2696,7 +2712,7 @@ def head_bucket(bucket_name: str) -> Response:
|
||||
|
||||
|
||||
@s3_api_bp.route("/<bucket_name>/<path:object_key>", methods=["HEAD"])
|
||||
@limiter.limit("100 per minute")
|
||||
@limiter.limit(_get_head_ops_limit)
|
||||
def head_object(bucket_name: str, object_key: str) -> Response:
|
||||
principal, error = _require_principal()
|
||||
if error:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
APP_VERSION = "0.2.3"
|
||||
APP_VERSION = "0.2.4"
|
||||
|
||||
|
||||
def get_version() -> str:
|
||||
|
||||
Reference in New Issue
Block a user