From 704f79dc44c4902f69a8fe23be38175d540ac899 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 25 Jan 2026 20:15:38 +0800 Subject: [PATCH 01/15] Add configurable rate limits for S3 API endpoints --- app/config.py | 74 +++++++++++++++++++++++++++++++++++++++++---- app/s3_api.py | 26 +++++++++++++--- app/version.py | 2 +- docs.md | 10 ++++-- templates/docs.html | 32 ++++++++++++++++---- 5 files changed, 123 insertions(+), 21 deletions(-) diff --git a/app/config.py b/app/config.py index b39000f..e6815bd 100644 --- a/app/config.py +++ b/app/config.py @@ -10,6 +10,23 @@ from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Optional +import psutil + + +def _calculate_auto_threads() -> int: + cpu_count = psutil.cpu_count(logical=True) or 4 + return max(1, min(cpu_count * 2, 64)) + + +def _calculate_auto_connection_limit() -> int: + available_mb = psutil.virtual_memory().available / (1024 * 1024) + calculated = int(available_mb / 5) + return max(20, min(calculated, 1000)) + + +def _calculate_auto_backlog(connection_limit: int) -> int: + return max(64, min(connection_limit * 2, 4096)) + def _validate_rate_limit(value: str) -> str: pattern = r"^\d+\s+per\s+(second|minute|hour|day)$" @@ -63,6 +80,10 @@ class AppConfig: log_backup_count: int ratelimit_default: str ratelimit_storage_uri: str + ratelimit_list_buckets: str + ratelimit_bucket_ops: str + ratelimit_object_ops: str + ratelimit_head_ops: str cors_origins: list[str] cors_methods: list[str] cors_allow_headers: list[str] @@ -94,6 +115,9 @@ class AppConfig: server_connection_limit: int server_backlog: int server_channel_timeout: int + server_threads_auto: bool + server_connection_limit_auto: bool + server_backlog_auto: bool site_sync_enabled: bool site_sync_interval_seconds: int site_sync_batch_size: int @@ -171,6 +195,10 @@ class AppConfig: log_backup_count = int(_get("LOG_BACKUP_COUNT", 3)) ratelimit_default = _validate_rate_limit(str(_get("RATE_LIMIT_DEFAULT", "200 per minute"))) 
ratelimit_storage_uri = str(_get("RATE_LIMIT_STORAGE_URI", "memory://")) + ratelimit_list_buckets = _validate_rate_limit(str(_get("RATE_LIMIT_LIST_BUCKETS", "60 per minute"))) + ratelimit_bucket_ops = _validate_rate_limit(str(_get("RATE_LIMIT_BUCKET_OPS", "120 per minute"))) + ratelimit_object_ops = _validate_rate_limit(str(_get("RATE_LIMIT_OBJECT_OPS", "240 per minute"))) + ratelimit_head_ops = _validate_rate_limit(str(_get("RATE_LIMIT_HEAD_OPS", "100 per minute"))) def _csv(value: str, default: list[str]) -> list[str]: if not value: @@ -200,9 +228,30 @@ class AppConfig: operation_metrics_interval_minutes = int(_get("OPERATION_METRICS_INTERVAL_MINUTES", 5)) operation_metrics_retention_hours = int(_get("OPERATION_METRICS_RETENTION_HOURS", 24)) - server_threads = int(_get("SERVER_THREADS", 4)) - server_connection_limit = int(_get("SERVER_CONNECTION_LIMIT", 100)) - server_backlog = int(_get("SERVER_BACKLOG", 1024)) + _raw_threads = int(_get("SERVER_THREADS", 0)) + if _raw_threads == 0: + server_threads = _calculate_auto_threads() + server_threads_auto = True + else: + server_threads = _raw_threads + server_threads_auto = False + + _raw_conn_limit = int(_get("SERVER_CONNECTION_LIMIT", 0)) + if _raw_conn_limit == 0: + server_connection_limit = _calculate_auto_connection_limit() + server_connection_limit_auto = True + else: + server_connection_limit = _raw_conn_limit + server_connection_limit_auto = False + + _raw_backlog = int(_get("SERVER_BACKLOG", 0)) + if _raw_backlog == 0: + server_backlog = _calculate_auto_backlog(server_connection_limit) + server_backlog_auto = True + else: + server_backlog = _raw_backlog + server_backlog_auto = False + server_channel_timeout = int(_get("SERVER_CHANNEL_TIMEOUT", 120)) site_sync_enabled = str(_get("SITE_SYNC_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} site_sync_interval_seconds = int(_get("SITE_SYNC_INTERVAL_SECONDS", 60)) @@ -225,6 +274,10 @@ class AppConfig: log_backup_count=log_backup_count, 
ratelimit_default=ratelimit_default, ratelimit_storage_uri=ratelimit_storage_uri, + ratelimit_list_buckets=ratelimit_list_buckets, + ratelimit_bucket_ops=ratelimit_bucket_ops, + ratelimit_object_ops=ratelimit_object_ops, + ratelimit_head_ops=ratelimit_head_ops, cors_origins=cors_origins, cors_methods=cors_methods, cors_allow_headers=cors_allow_headers, @@ -256,6 +309,9 @@ class AppConfig: server_connection_limit=server_connection_limit, server_backlog=server_backlog, server_channel_timeout=server_channel_timeout, + server_threads_auto=server_threads_auto, + server_connection_limit_auto=server_connection_limit_auto, + server_backlog_auto=server_backlog_auto, site_sync_enabled=site_sync_enabled, site_sync_interval_seconds=site_sync_interval_seconds, site_sync_batch_size=site_sync_batch_size) @@ -364,9 +420,11 @@ class AppConfig: print(f" ENCRYPTION: Enabled (Master key: {self.encryption_master_key_path})") if self.kms_enabled: print(f" KMS: Enabled (Keys: {self.kms_keys_path})") - print(f" SERVER_THREADS: {self.server_threads}") - print(f" CONNECTION_LIMIT: {self.server_connection_limit}") - print(f" BACKLOG: {self.server_backlog}") + def _auto(flag: bool) -> str: + return " (auto)" if flag else "" + print(f" SERVER_THREADS: {self.server_threads}{_auto(self.server_threads_auto)}") + print(f" CONNECTION_LIMIT: {self.server_connection_limit}{_auto(self.server_connection_limit_auto)}") + print(f" BACKLOG: {self.server_backlog}{_auto(self.server_backlog_auto)}") print(f" CHANNEL_TIMEOUT: {self.server_channel_timeout}s") print("=" * 60) @@ -406,6 +464,10 @@ class AppConfig: "LOG_BACKUP_COUNT": self.log_backup_count, "RATELIMIT_DEFAULT": self.ratelimit_default, "RATELIMIT_STORAGE_URI": self.ratelimit_storage_uri, + "RATELIMIT_LIST_BUCKETS": self.ratelimit_list_buckets, + "RATELIMIT_BUCKET_OPS": self.ratelimit_bucket_ops, + "RATELIMIT_OBJECT_OPS": self.ratelimit_object_ops, + "RATELIMIT_HEAD_OPS": self.ratelimit_head_ops, "CORS_ORIGINS": self.cors_origins, "CORS_METHODS": 
self.cors_methods, "CORS_ALLOW_HEADERS": self.cors_allow_headers, diff --git a/app/s3_api.py b/app/s3_api.py index 1f49e15..e9a11ee 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -82,6 +82,22 @@ def _access_logging() -> AccessLoggingService: return current_app.extensions["access_logging"] +def _get_list_buckets_limit() -> str: + return current_app.config.get("RATELIMIT_LIST_BUCKETS", "60 per minute") + + +def _get_bucket_ops_limit() -> str: + return current_app.config.get("RATELIMIT_BUCKET_OPS", "120 per minute") + + +def _get_object_ops_limit() -> str: + return current_app.config.get("RATELIMIT_OBJECT_OPS", "240 per minute") + + +def _get_head_ops_limit() -> str: + return current_app.config.get("RATELIMIT_HEAD_OPS", "100 per minute") + + def _xml_response(element: Element, status: int = 200) -> Response: xml_bytes = tostring(element, encoding="utf-8") return Response(xml_bytes, status=status, mimetype="application/xml") @@ -2143,7 +2159,7 @@ def _bulk_delete_handler(bucket_name: str) -> Response: @s3_api_bp.get("/") -@limiter.limit("60 per minute") +@limiter.limit(_get_list_buckets_limit) def list_buckets() -> Response: principal, error = _require_principal() if error: @@ -2171,7 +2187,7 @@ def list_buckets() -> Response: @s3_api_bp.route("/", methods=["PUT", "DELETE", "GET", "POST"], strict_slashes=False) -@limiter.limit("120 per minute") +@limiter.limit(_get_bucket_ops_limit) def bucket_handler(bucket_name: str) -> Response: storage = _storage() subresource_response = _maybe_handle_bucket_subresource(bucket_name) @@ -2363,7 +2379,7 @@ def bucket_handler(bucket_name: str) -> Response: @s3_api_bp.route("//", methods=["PUT", "GET", "DELETE", "HEAD", "POST"], strict_slashes=False) -@limiter.limit("240 per minute") +@limiter.limit(_get_object_ops_limit) def object_handler(bucket_name: str, object_key: str): storage = _storage() @@ -2681,7 +2697,7 @@ def _bucket_policy_handler(bucket_name: str) -> Response: @s3_api_bp.route("/", methods=["HEAD"]) 
-@limiter.limit("100 per minute") +@limiter.limit(_get_head_ops_limit) def head_bucket(bucket_name: str) -> Response: principal, error = _require_principal() if error: @@ -2696,7 +2712,7 @@ def head_bucket(bucket_name: str) -> Response: @s3_api_bp.route("//", methods=["HEAD"]) -@limiter.limit("100 per minute") +@limiter.limit(_get_head_ops_limit) def head_object(bucket_name: str, object_key: str) -> Response: principal, error = _require_principal() if error: diff --git a/app/version.py b/app/version.py index 998adc1..54f0689 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.2.3" +APP_VERSION = "0.2.4" def get_version() -> str: diff --git a/docs.md b/docs.md index 68cf69a..e2d89fa 100644 --- a/docs.md +++ b/docs.md @@ -166,15 +166,19 @@ All configuration is done via environment variables. The table below lists every | Variable | Default | Notes | | --- | --- | --- | | `RATE_LIMIT_DEFAULT` | `200 per minute` | Default rate limit for API endpoints. | +| `RATE_LIMIT_LIST_BUCKETS` | `60 per minute` | Rate limit for listing buckets (`GET /`). | +| `RATE_LIMIT_BUCKET_OPS` | `120 per minute` | Rate limit for bucket operations (PUT/DELETE/GET/POST on `/`). | +| `RATE_LIMIT_OBJECT_OPS` | `240 per minute` | Rate limit for object operations (PUT/GET/DELETE/POST on `//`). | +| `RATE_LIMIT_HEAD_OPS` | `100 per minute` | Rate limit for HEAD requests (bucket and object). | | `RATE_LIMIT_STORAGE_URI` | `memory://` | Storage backend for rate limits. Use `redis://host:port` for distributed setups. | ### Server Configuration | Variable | Default | Notes | | --- | --- | --- | -| `SERVER_THREADS` | `4` | Waitress worker threads (1-64). More threads handle more concurrent requests but use more memory. | -| `SERVER_CONNECTION_LIMIT` | `100` | Maximum concurrent connections (10-1000). Ensure OS file descriptor limits support this value. | -| `SERVER_BACKLOG` | `1024` | TCP listen backlog (64-4096). 
Connections queue here when all threads are busy. | +| `SERVER_THREADS` | `0` (auto) | Waitress worker threads (1-64). Set to `0` for auto-calculation based on CPU cores (×2). | +| `SERVER_CONNECTION_LIMIT` | `0` (auto) | Maximum concurrent connections (10-1000). Set to `0` for auto-calculation based on available RAM. | +| `SERVER_BACKLOG` | `0` (auto) | TCP listen backlog (64-4096). Set to `0` for auto-calculation (connection_limit × 2). | | `SERVER_CHANNEL_TIMEOUT` | `120` | Seconds before idle connections are closed (10-300). | ### Logging diff --git a/templates/docs.html b/templates/docs.html index 6287678..ce9c95c 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -157,23 +157,43 @@ python run.py --mode ui 200 per minute Default API rate limit. + + RATE_LIMIT_LIST_BUCKETS + 60 per minute + Rate limit for listing buckets. + + + RATE_LIMIT_BUCKET_OPS + 120 per minute + Rate limit for bucket operations. + + + RATE_LIMIT_OBJECT_OPS + 240 per minute + Rate limit for object operations. + + + RATE_LIMIT_HEAD_OPS + 100 per minute + Rate limit for HEAD requests. + Server Settings SERVER_THREADS - 4 - Waitress worker threads (1-64). + 0 (auto) + Waitress worker threads (1-64). 0 = auto (CPU cores × 2). SERVER_CONNECTION_LIMIT - 100 - Max concurrent connections (10-1000). + 0 (auto) + Max concurrent connections (10-1000). 0 = auto (RAM-based). SERVER_BACKLOG - 1024 - TCP listen backlog (64-4096). + 0 (auto) + TCP listen backlog (64-4096). 0 = auto (conn_limit × 2). 
SERVER_CHANNEL_TIMEOUT From 6e3d280a753b6a9f3ceb6ff5007886f9185c082c Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 25 Jan 2026 21:29:58 +0800 Subject: [PATCH 02/15] Add SlowDown error code tracking for 429 rate limit responses --- app/errors.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/app/errors.py b/app/errors.py index 7e5d711..b2a4079 100644 --- a/app/errors.py +++ b/app/errors.py @@ -6,6 +6,7 @@ from typing import Optional, Dict, Any from xml.etree.ElementTree import Element, SubElement, tostring from flask import Response, jsonify, request, flash, redirect, url_for, g +from flask_limiter import RateLimitExceeded logger = logging.getLogger(__name__) @@ -172,10 +173,22 @@ def handle_app_error(error: AppError) -> Response: return error.to_xml_response() +def handle_rate_limit_exceeded(e: RateLimitExceeded) -> Response: + g.s3_error_code = "SlowDown" + error = Element("Error") + SubElement(error, "Code").text = "SlowDown" + SubElement(error, "Message").text = "Please reduce your request rate." 
+ SubElement(error, "Resource").text = request.path + SubElement(error, "RequestId").text = getattr(g, "request_id", "") + xml_bytes = tostring(error, encoding="utf-8") + return Response(xml_bytes, status=429, mimetype="application/xml") + + def register_error_handlers(app): """Register error handlers with a Flask app.""" app.register_error_handler(AppError, handle_app_error) - + app.register_error_handler(RateLimitExceeded, handle_rate_limit_exceeded) + for error_class in [ BucketNotFoundError, BucketAlreadyExistsError, BucketNotEmptyError, ObjectNotFoundError, InvalidObjectKeyError, From b32f1f94f76463439421fddcb9a9aee4c3ce3a33 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 25 Jan 2026 23:32:36 +0800 Subject: [PATCH 03/15] Add configurable env variables for hardcoded timeouts and limits --- README.md | 5 ++ app/__init__.py | 28 ++++++++++- app/config.py | 83 +++++++++++++++++++++++++++++- app/encryption.py | 3 +- app/kms.py | 16 ++++-- app/lifecycle.py | 17 ++++--- app/replication.py | 64 ++++++++++++++++------- app/s3_api.py | 14 ++++-- app/site_sync.py | 40 +++++++++++---- app/storage.py | 45 ++++++++++------- app/ui.py | 4 +- templates/docs.html | 109 ++++++++++++++++++++++++++++++++++++++++ tests/test_site_sync.py | 1 - 13 files changed, 362 insertions(+), 67 deletions(-) diff --git a/README.md b/README.md index f1e7951..d485351 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,11 @@ python run.py --mode ui # UI only (port 5100) | `ENCRYPTION_ENABLED` | `false` | Enable server-side encryption | | `KMS_ENABLED` | `false` | Enable Key Management Service | | `LOG_LEVEL` | `INFO` | Logging verbosity | +| `SIGV4_TIMESTAMP_TOLERANCE_SECONDS` | `900` | Max time skew for SigV4 requests | +| `PRESIGNED_URL_MAX_EXPIRY_SECONDS` | `604800` | Max presigned URL expiry (7 days) | +| `REPLICATION_CONNECT_TIMEOUT_SECONDS` | `5` | Replication connection timeout | +| `SITE_SYNC_ENABLED` | `false` | Enable bi-directional site sync | +| `OBJECT_TAG_LIMIT` | `50` | Maximum 
tags per object | ## Data Layout diff --git a/app/__init__.py b/app/__init__.py index 2968c03..6cafe1c 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -104,6 +104,9 @@ def create_app( storage = ObjectStorage( Path(app.config["STORAGE_ROOT"]), cache_ttl=app.config.get("OBJECT_CACHE_TTL", 5), + object_cache_max_size=app.config.get("OBJECT_CACHE_MAX_SIZE", 100), + bucket_config_cache_ttl=app.config.get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0), + object_key_max_length_bytes=app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024), ) if app.config.get("WARM_CACHE_ON_STARTUP", True) and not app.config.get("TESTING"): @@ -137,12 +140,23 @@ def create_app( ) connections = ConnectionStore(connections_path) - replication = ReplicationManager(storage, connections, replication_rules_path, storage_root) + replication = ReplicationManager( + storage, + connections, + replication_rules_path, + storage_root, + connect_timeout=app.config.get("REPLICATION_CONNECT_TIMEOUT_SECONDS", 5), + read_timeout=app.config.get("REPLICATION_READ_TIMEOUT_SECONDS", 30), + max_retries=app.config.get("REPLICATION_MAX_RETRIES", 2), + streaming_threshold_bytes=app.config.get("REPLICATION_STREAMING_THRESHOLD_BYTES", 10 * 1024 * 1024), + max_failures_per_bucket=app.config.get("REPLICATION_MAX_FAILURES_PER_BUCKET", 50), + ) encryption_config = { "encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False), "encryption_master_key_path": app.config.get("ENCRYPTION_MASTER_KEY_PATH"), "default_encryption_algorithm": app.config.get("DEFAULT_ENCRYPTION_ALGORITHM", "AES256"), + "encryption_chunk_size_bytes": app.config.get("ENCRYPTION_CHUNK_SIZE_BYTES", 64 * 1024), } encryption_manager = EncryptionManager(encryption_config) @@ -150,7 +164,12 @@ def create_app( if app.config.get("KMS_ENABLED", False): kms_keys_path = Path(app.config.get("KMS_KEYS_PATH", "")) kms_master_key_path = Path(app.config.get("ENCRYPTION_MASTER_KEY_PATH", "")) - kms_manager = KMSManager(kms_keys_path, kms_master_key_path) + 
kms_manager = KMSManager( + kms_keys_path, + kms_master_key_path, + generate_data_key_min_bytes=app.config.get("KMS_GENERATE_DATA_KEY_MIN_BYTES", 1), + generate_data_key_max_bytes=app.config.get("KMS_GENERATE_DATA_KEY_MAX_BYTES", 1024), + ) encryption_manager.set_kms_provider(kms_manager) if app.config.get("ENCRYPTION_ENABLED", False): @@ -170,6 +189,7 @@ def create_app( base_storage, interval_seconds=app.config.get("LIFECYCLE_INTERVAL_SECONDS", 3600), storage_root=storage_root, + max_history_per_bucket=app.config.get("LIFECYCLE_MAX_HISTORY_PER_BUCKET", 50), ) lifecycle_manager.start() @@ -218,6 +238,10 @@ def create_app( storage_root=storage_root, interval_seconds=app.config.get("SITE_SYNC_INTERVAL_SECONDS", 60), batch_size=app.config.get("SITE_SYNC_BATCH_SIZE", 100), + connect_timeout=app.config.get("SITE_SYNC_CONNECT_TIMEOUT_SECONDS", 10), + read_timeout=app.config.get("SITE_SYNC_READ_TIMEOUT_SECONDS", 120), + max_retries=app.config.get("SITE_SYNC_MAX_RETRIES", 2), + clock_skew_tolerance_seconds=app.config.get("SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS", 1.0), ) site_sync_worker.start() app.extensions["site_sync"] = site_sync_worker diff --git a/app/config.py b/app/config.py index e6815bd..7c40e40 100644 --- a/app/config.py +++ b/app/config.py @@ -121,6 +121,26 @@ class AppConfig: site_sync_enabled: bool site_sync_interval_seconds: int site_sync_batch_size: int + sigv4_timestamp_tolerance_seconds: int + presigned_url_min_expiry_seconds: int + presigned_url_max_expiry_seconds: int + replication_connect_timeout_seconds: int + replication_read_timeout_seconds: int + replication_max_retries: int + replication_streaming_threshold_bytes: int + replication_max_failures_per_bucket: int + site_sync_connect_timeout_seconds: int + site_sync_read_timeout_seconds: int + site_sync_max_retries: int + site_sync_clock_skew_tolerance_seconds: float + object_key_max_length_bytes: int + object_cache_max_size: int + bucket_config_cache_ttl_seconds: float + object_tag_limit: int + 
encryption_chunk_size_bytes: int + kms_generate_data_key_min_bytes: int + kms_generate_data_key_max_bytes: int + lifecycle_max_history_per_bucket: int @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -257,6 +277,27 @@ class AppConfig: site_sync_interval_seconds = int(_get("SITE_SYNC_INTERVAL_SECONDS", 60)) site_sync_batch_size = int(_get("SITE_SYNC_BATCH_SIZE", 100)) + sigv4_timestamp_tolerance_seconds = int(_get("SIGV4_TIMESTAMP_TOLERANCE_SECONDS", 900)) + presigned_url_min_expiry_seconds = int(_get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1)) + presigned_url_max_expiry_seconds = int(_get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800)) + replication_connect_timeout_seconds = int(_get("REPLICATION_CONNECT_TIMEOUT_SECONDS", 5)) + replication_read_timeout_seconds = int(_get("REPLICATION_READ_TIMEOUT_SECONDS", 30)) + replication_max_retries = int(_get("REPLICATION_MAX_RETRIES", 2)) + replication_streaming_threshold_bytes = int(_get("REPLICATION_STREAMING_THRESHOLD_BYTES", 10 * 1024 * 1024)) + replication_max_failures_per_bucket = int(_get("REPLICATION_MAX_FAILURES_PER_BUCKET", 50)) + site_sync_connect_timeout_seconds = int(_get("SITE_SYNC_CONNECT_TIMEOUT_SECONDS", 10)) + site_sync_read_timeout_seconds = int(_get("SITE_SYNC_READ_TIMEOUT_SECONDS", 120)) + site_sync_max_retries = int(_get("SITE_SYNC_MAX_RETRIES", 2)) + site_sync_clock_skew_tolerance_seconds = float(_get("SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS", 1.0)) + object_key_max_length_bytes = int(_get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024)) + object_cache_max_size = int(_get("OBJECT_CACHE_MAX_SIZE", 100)) + bucket_config_cache_ttl_seconds = float(_get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0)) + object_tag_limit = int(_get("OBJECT_TAG_LIMIT", 50)) + encryption_chunk_size_bytes = int(_get("ENCRYPTION_CHUNK_SIZE_BYTES", 64 * 1024)) + kms_generate_data_key_min_bytes = int(_get("KMS_GENERATE_DATA_KEY_MIN_BYTES", 1)) + kms_generate_data_key_max_bytes = 
int(_get("KMS_GENERATE_DATA_KEY_MAX_BYTES", 1024)) + lifecycle_max_history_per_bucket = int(_get("LIFECYCLE_MAX_HISTORY_PER_BUCKET", 50)) + return cls(storage_root=storage_root, max_upload_size=max_upload_size, ui_page_size=ui_page_size, @@ -314,7 +355,27 @@ class AppConfig: server_backlog_auto=server_backlog_auto, site_sync_enabled=site_sync_enabled, site_sync_interval_seconds=site_sync_interval_seconds, - site_sync_batch_size=site_sync_batch_size) + site_sync_batch_size=site_sync_batch_size, + sigv4_timestamp_tolerance_seconds=sigv4_timestamp_tolerance_seconds, + presigned_url_min_expiry_seconds=presigned_url_min_expiry_seconds, + presigned_url_max_expiry_seconds=presigned_url_max_expiry_seconds, + replication_connect_timeout_seconds=replication_connect_timeout_seconds, + replication_read_timeout_seconds=replication_read_timeout_seconds, + replication_max_retries=replication_max_retries, + replication_streaming_threshold_bytes=replication_streaming_threshold_bytes, + replication_max_failures_per_bucket=replication_max_failures_per_bucket, + site_sync_connect_timeout_seconds=site_sync_connect_timeout_seconds, + site_sync_read_timeout_seconds=site_sync_read_timeout_seconds, + site_sync_max_retries=site_sync_max_retries, + site_sync_clock_skew_tolerance_seconds=site_sync_clock_skew_tolerance_seconds, + object_key_max_length_bytes=object_key_max_length_bytes, + object_cache_max_size=object_cache_max_size, + bucket_config_cache_ttl_seconds=bucket_config_cache_ttl_seconds, + object_tag_limit=object_tag_limit, + encryption_chunk_size_bytes=encryption_chunk_size_bytes, + kms_generate_data_key_min_bytes=kms_generate_data_key_min_bytes, + kms_generate_data_key_max_bytes=kms_generate_data_key_max_bytes, + lifecycle_max_history_per_bucket=lifecycle_max_history_per_bucket) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. 
@@ -494,4 +555,24 @@ class AppConfig: "SITE_SYNC_ENABLED": self.site_sync_enabled, "SITE_SYNC_INTERVAL_SECONDS": self.site_sync_interval_seconds, "SITE_SYNC_BATCH_SIZE": self.site_sync_batch_size, + "SIGV4_TIMESTAMP_TOLERANCE_SECONDS": self.sigv4_timestamp_tolerance_seconds, + "PRESIGNED_URL_MIN_EXPIRY_SECONDS": self.presigned_url_min_expiry_seconds, + "PRESIGNED_URL_MAX_EXPIRY_SECONDS": self.presigned_url_max_expiry_seconds, + "REPLICATION_CONNECT_TIMEOUT_SECONDS": self.replication_connect_timeout_seconds, + "REPLICATION_READ_TIMEOUT_SECONDS": self.replication_read_timeout_seconds, + "REPLICATION_MAX_RETRIES": self.replication_max_retries, + "REPLICATION_STREAMING_THRESHOLD_BYTES": self.replication_streaming_threshold_bytes, + "REPLICATION_MAX_FAILURES_PER_BUCKET": self.replication_max_failures_per_bucket, + "SITE_SYNC_CONNECT_TIMEOUT_SECONDS": self.site_sync_connect_timeout_seconds, + "SITE_SYNC_READ_TIMEOUT_SECONDS": self.site_sync_read_timeout_seconds, + "SITE_SYNC_MAX_RETRIES": self.site_sync_max_retries, + "SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS": self.site_sync_clock_skew_tolerance_seconds, + "OBJECT_KEY_MAX_LENGTH_BYTES": self.object_key_max_length_bytes, + "OBJECT_CACHE_MAX_SIZE": self.object_cache_max_size, + "BUCKET_CONFIG_CACHE_TTL_SECONDS": self.bucket_config_cache_ttl_seconds, + "OBJECT_TAG_LIMIT": self.object_tag_limit, + "ENCRYPTION_CHUNK_SIZE_BYTES": self.encryption_chunk_size_bytes, + "KMS_GENERATE_DATA_KEY_MIN_BYTES": self.kms_generate_data_key_min_bytes, + "KMS_GENERATE_DATA_KEY_MAX_BYTES": self.kms_generate_data_key_max_bytes, + "LIFECYCLE_MAX_HISTORY_PER_BUCKET": self.lifecycle_max_history_per_bucket, } diff --git a/app/encryption.py b/app/encryption.py index 4b1d817..3cc18cc 100644 --- a/app/encryption.py +++ b/app/encryption.py @@ -310,7 +310,8 @@ class EncryptionManager: def get_streaming_encryptor(self) -> StreamingEncryptor: if self._streaming_encryptor is None: - self._streaming_encryptor = StreamingEncryptor(self.get_local_provider()) + 
chunk_size = self.config.get("encryption_chunk_size_bytes", 64 * 1024) + self._streaming_encryptor = StreamingEncryptor(self.get_local_provider(), chunk_size=chunk_size) return self._streaming_encryptor def encrypt_object(self, data: bytes, algorithm: str = "AES256", diff --git a/app/kms.py b/app/kms.py index 548e7ea..9326e2d 100644 --- a/app/kms.py +++ b/app/kms.py @@ -108,9 +108,17 @@ class KMSManager: Keys are stored encrypted on disk. """ - def __init__(self, keys_path: Path, master_key_path: Path): + def __init__( + self, + keys_path: Path, + master_key_path: Path, + generate_data_key_min_bytes: int = 1, + generate_data_key_max_bytes: int = 1024, + ): self.keys_path = keys_path self.master_key_path = master_key_path + self.generate_data_key_min_bytes = generate_data_key_min_bytes + self.generate_data_key_max_bytes = generate_data_key_max_bytes self._keys: Dict[str, KMSKey] = {} self._master_key: bytes | None = None self._loaded = False @@ -358,6 +366,8 @@ class KMSManager: def generate_random(self, num_bytes: int = 32) -> bytes: """Generate cryptographically secure random bytes.""" - if num_bytes < 1 or num_bytes > 1024: - raise EncryptionError("Number of bytes must be between 1 and 1024") + if num_bytes < self.generate_data_key_min_bytes or num_bytes > self.generate_data_key_max_bytes: + raise EncryptionError( + f"Number of bytes must be between {self.generate_data_key_min_bytes} and {self.generate_data_key_max_bytes}" + ) return secrets.token_bytes(num_bytes) diff --git a/app/lifecycle.py b/app/lifecycle.py index ed9eb2c..ea2c262 100644 --- a/app/lifecycle.py +++ b/app/lifecycle.py @@ -71,10 +71,9 @@ class LifecycleExecutionRecord: class LifecycleHistoryStore: - MAX_HISTORY_PER_BUCKET = 50 - - def __init__(self, storage_root: Path) -> None: + def __init__(self, storage_root: Path, max_history_per_bucket: int = 50) -> None: self.storage_root = storage_root + self.max_history_per_bucket = max_history_per_bucket self._lock = threading.Lock() def 
_get_history_path(self, bucket_name: str) -> Path: @@ -95,7 +94,7 @@ class LifecycleHistoryStore: def save_history(self, bucket_name: str, records: List[LifecycleExecutionRecord]) -> None: path = self._get_history_path(bucket_name) path.parent.mkdir(parents=True, exist_ok=True) - data = {"executions": [r.to_dict() for r in records[:self.MAX_HISTORY_PER_BUCKET]]} + data = {"executions": [r.to_dict() for r in records[:self.max_history_per_bucket]]} try: with open(path, "w") as f: json.dump(data, f, indent=2) @@ -114,14 +113,20 @@ class LifecycleHistoryStore: class LifecycleManager: - def __init__(self, storage: ObjectStorage, interval_seconds: int = 3600, storage_root: Optional[Path] = None): + def __init__( + self, + storage: ObjectStorage, + interval_seconds: int = 3600, + storage_root: Optional[Path] = None, + max_history_per_bucket: int = 50, + ): self.storage = storage self.interval_seconds = interval_seconds self.storage_root = storage_root self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() - self.history_store = LifecycleHistoryStore(storage_root) if storage_root else None + self.history_store = LifecycleHistoryStore(storage_root, max_history_per_bucket) if storage_root else None def start(self) -> None: if self._timer is not None: diff --git a/app/replication.py b/app/replication.py index 9cab869..d8022ba 100644 --- a/app/replication.py +++ b/app/replication.py @@ -21,16 +21,20 @@ from .storage import ObjectStorage, StorageError logger = logging.getLogger(__name__) REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0" -REPLICATION_CONNECT_TIMEOUT = 5 -REPLICATION_READ_TIMEOUT = 30 -STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024 REPLICATION_MODE_NEW_ONLY = "new_only" REPLICATION_MODE_ALL = "all" REPLICATION_MODE_BIDIRECTIONAL = "bidirectional" -def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any: +def _create_s3_client( + connection: RemoteConnection, + *, + health_check: bool = 
False, + connect_timeout: int = 5, + read_timeout: int = 30, + max_retries: int = 2, +) -> Any: """Create a boto3 S3 client for the given connection. Args: connection: Remote S3 connection configuration @@ -38,9 +42,9 @@ def _create_s3_client(connection: RemoteConnection, *, health_check: bool = Fals """ config = Config( user_agent_extra=REPLICATION_USER_AGENT, - connect_timeout=REPLICATION_CONNECT_TIMEOUT, - read_timeout=REPLICATION_READ_TIMEOUT, - retries={'max_attempts': 1 if health_check else 2}, + connect_timeout=connect_timeout, + read_timeout=read_timeout, + retries={'max_attempts': 1 if health_check else max_retries}, signature_version='s3v4', s3={'addressing_style': 'path'}, request_checksum_calculation='when_required', @@ -164,10 +168,9 @@ class ReplicationRule: class ReplicationFailureStore: - MAX_FAILURES_PER_BUCKET = 50 - - def __init__(self, storage_root: Path) -> None: + def __init__(self, storage_root: Path, max_failures_per_bucket: int = 50) -> None: self.storage_root = storage_root + self.max_failures_per_bucket = max_failures_per_bucket self._lock = threading.Lock() def _get_failures_path(self, bucket_name: str) -> Path: @@ -188,7 +191,7 @@ class ReplicationFailureStore: def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None: path = self._get_failures_path(bucket_name) path.parent.mkdir(parents=True, exist_ok=True) - data = {"failures": [f.to_dict() for f in failures[:self.MAX_FAILURES_PER_BUCKET]]} + data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]} try: with open(path, "w") as f: json.dump(data, f, indent=2) @@ -233,18 +236,43 @@ class ReplicationFailureStore: class ReplicationManager: - def __init__(self, storage: ObjectStorage, connections: ConnectionStore, rules_path: Path, storage_root: Path) -> None: + def __init__( + self, + storage: ObjectStorage, + connections: ConnectionStore, + rules_path: Path, + storage_root: Path, + connect_timeout: int = 5, + read_timeout: int = 30, 
+ max_retries: int = 2, + streaming_threshold_bytes: int = 10 * 1024 * 1024, + max_failures_per_bucket: int = 50, + ) -> None: self.storage = storage self.connections = connections self.rules_path = rules_path self.storage_root = storage_root + self.connect_timeout = connect_timeout + self.read_timeout = read_timeout + self.max_retries = max_retries + self.streaming_threshold_bytes = streaming_threshold_bytes self._rules: Dict[str, ReplicationRule] = {} self._stats_lock = threading.Lock() self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker") self._shutdown = False - self.failure_store = ReplicationFailureStore(storage_root) + self.failure_store = ReplicationFailureStore(storage_root, max_failures_per_bucket) self.reload_rules() + def _create_client(self, connection: RemoteConnection, *, health_check: bool = False) -> Any: + """Create an S3 client with the manager's configured timeouts.""" + return _create_s3_client( + connection, + health_check=health_check, + connect_timeout=self.connect_timeout, + read_timeout=self.read_timeout, + max_retries=self.max_retries, + ) + def shutdown(self, wait: bool = True) -> None: """Shutdown the replication executor gracefully. @@ -280,7 +308,7 @@ class ReplicationManager: Uses short timeouts to prevent blocking. 
""" try: - s3 = _create_s3_client(connection, health_check=True) + s3 = self._create_client(connection, health_check=True) s3.list_buckets() return True except Exception as e: @@ -329,7 +357,7 @@ class ReplicationManager: source_objects = self.storage.list_objects_all(bucket_name) source_keys = {obj.key: obj.size for obj in source_objects} - s3 = _create_s3_client(connection) + s3 = self._create_client(connection) dest_keys = set() bytes_synced = 0 @@ -395,7 +423,7 @@ class ReplicationManager: raise ValueError(f"Connection {connection_id} not found") try: - s3 = _create_s3_client(connection) + s3 = self._create_client(connection) s3.create_bucket(Bucket=bucket_name) except ClientError as e: logger.error(f"Failed to create remote bucket {bucket_name}: {e}") @@ -438,7 +466,7 @@ class ReplicationManager: return try: - s3 = _create_s3_client(conn) + s3 = self._create_client(conn) if action == "delete": try: @@ -481,7 +509,7 @@ class ReplicationManager: if content_type: extra_args["ContentType"] = content_type - if file_size >= STREAMING_THRESHOLD_BYTES: + if file_size >= self.streaming_threshold_bytes: s3.upload_file( str(path), rule.target_bucket, diff --git a/app/s3_api.py b/app/s3_api.py index e9a11ee..47251cd 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -239,7 +239,8 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: now = datetime.now(timezone.utc) time_diff = abs((now - request_time).total_seconds()) - if time_diff > 900: + tolerance = current_app.config.get("SIGV4_TIMESTAMP_TOLERANCE_SECONDS", 900) + if time_diff > tolerance: raise IamError("Request timestamp too old or too far in the future") required_headers = {'host', 'x-amz-date'} @@ -501,8 +502,10 @@ def _validate_presigned_request(action: str, bucket_name: str, object_key: str) expiry = int(expires) except ValueError as exc: raise IamError("Invalid expiration") from exc - if expiry < 1 or expiry > 7 * 24 * 3600: - raise IamError("Expiration must be between 1 second and 7 
days") + min_expiry = current_app.config.get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1) + max_expiry = current_app.config.get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800) + if expiry < min_expiry or expiry > max_expiry: + raise IamError(f"Expiration must be between {min_expiry} second(s) and {max_expiry} seconds") try: request_time = datetime.strptime(amz_date, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc) @@ -1055,8 +1058,9 @@ def _bucket_tagging_handler(bucket_name: str) -> Response: tags = _parse_tagging_document(payload) except ValueError as exc: return _error_response("MalformedXML", str(exc), 400) - if len(tags) > 50: - return _error_response("InvalidTag", "A maximum of 50 tags is supported", 400) + tag_limit = current_app.config.get("OBJECT_TAG_LIMIT", 50) + if len(tags) > tag_limit: + return _error_response("InvalidTag", f"A maximum of {tag_limit} tags is supported", 400) try: storage.set_bucket_tags(bucket_name, tags) except StorageError as exc: diff --git a/app/site_sync.py b/app/site_sync.py index 306ac28..57cf185 100644 --- a/app/site_sync.py +++ b/app/site_sync.py @@ -22,9 +22,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) SITE_SYNC_USER_AGENT = "SiteSyncAgent/1.0" -SITE_SYNC_CONNECT_TIMEOUT = 10 -SITE_SYNC_READ_TIMEOUT = 120 -CLOCK_SKEW_TOLERANCE_SECONDS = 1.0 @dataclass @@ -108,12 +105,18 @@ class RemoteObjectMeta: ) -def _create_sync_client(connection: "RemoteConnection") -> Any: +def _create_sync_client( + connection: "RemoteConnection", + *, + connect_timeout: int = 10, + read_timeout: int = 120, + max_retries: int = 2, +) -> Any: config = Config( user_agent_extra=SITE_SYNC_USER_AGENT, - connect_timeout=SITE_SYNC_CONNECT_TIMEOUT, - read_timeout=SITE_SYNC_READ_TIMEOUT, - retries={"max_attempts": 2}, + connect_timeout=connect_timeout, + read_timeout=read_timeout, + retries={"max_attempts": max_retries}, signature_version="s3v4", s3={"addressing_style": "path"}, request_checksum_calculation="when_required", @@ -138,6 +141,10 @@ class 
SiteSyncWorker: storage_root: Path, interval_seconds: int = 60, batch_size: int = 100, + connect_timeout: int = 10, + read_timeout: int = 120, + max_retries: int = 2, + clock_skew_tolerance_seconds: float = 1.0, ): self.storage = storage self.connections = connections @@ -145,11 +152,24 @@ class SiteSyncWorker: self.storage_root = storage_root self.interval_seconds = interval_seconds self.batch_size = batch_size + self.connect_timeout = connect_timeout + self.read_timeout = read_timeout + self.max_retries = max_retries + self.clock_skew_tolerance_seconds = clock_skew_tolerance_seconds self._lock = threading.Lock() self._shutdown = threading.Event() self._sync_thread: Optional[threading.Thread] = None self._bucket_stats: Dict[str, SiteSyncStats] = {} + def _create_client(self, connection: "RemoteConnection") -> Any: + """Create an S3 client with the worker's configured timeouts.""" + return _create_sync_client( + connection, + connect_timeout=self.connect_timeout, + read_timeout=self.read_timeout, + max_retries=self.max_retries, + ) + def start(self) -> None: if self._sync_thread is not None and self._sync_thread.is_alive(): return @@ -294,7 +314,7 @@ class SiteSyncWorker: return {obj.key: obj for obj in objects} def _list_remote_objects(self, rule: "ReplicationRule", connection: "RemoteConnection") -> Dict[str, RemoteObjectMeta]: - s3 = _create_sync_client(connection) + s3 = self._create_client(connection) result: Dict[str, RemoteObjectMeta] = {} paginator = s3.get_paginator("list_objects_v2") try: @@ -312,7 +332,7 @@ class SiteSyncWorker: local_ts = local_meta.last_modified.timestamp() remote_ts = remote_meta.last_modified.timestamp() - if abs(remote_ts - local_ts) < CLOCK_SKEW_TOLERANCE_SECONDS: + if abs(remote_ts - local_ts) < self.clock_skew_tolerance_seconds: local_etag = local_meta.etag or "" if remote_meta.etag == local_etag: return "skip" @@ -327,7 +347,7 @@ class SiteSyncWorker: connection: "RemoteConnection", remote_meta: RemoteObjectMeta, ) -> bool: - s3 
= _create_sync_client(connection) + s3 = self._create_client(connection) tmp_path = None try: tmp_dir = self.storage_root / ".myfsio.sys" / "tmp" diff --git a/app/storage.py b/app/storage.py index 70488d0..ea0ec9b 100644 --- a/app/storage.py +++ b/app/storage.py @@ -137,10 +137,15 @@ class ObjectStorage: BUCKET_VERSIONS_DIR = "versions" MULTIPART_MANIFEST = "manifest.json" BUCKET_CONFIG_FILE = ".bucket.json" - DEFAULT_CACHE_TTL = 5 - OBJECT_CACHE_MAX_SIZE = 100 - def __init__(self, root: Path, cache_ttl: int = DEFAULT_CACHE_TTL) -> None: + def __init__( + self, + root: Path, + cache_ttl: int = 5, + object_cache_max_size: int = 100, + bucket_config_cache_ttl: float = 30.0, + object_key_max_length_bytes: int = 1024, + ) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) self._ensure_system_roots() @@ -149,8 +154,10 @@ class ObjectStorage: self._bucket_locks: Dict[str, threading.Lock] = {} self._cache_version: Dict[str, int] = {} self._bucket_config_cache: Dict[str, tuple[dict[str, Any], float]] = {} - self._bucket_config_cache_ttl = 30.0 + self._bucket_config_cache_ttl = bucket_config_cache_ttl self._cache_ttl = cache_ttl + self._object_cache_max_size = object_cache_max_size + self._object_key_max_length_bytes = object_key_max_length_bytes def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: """Get or create a lock for a specific bucket. 
Reduces global lock contention.""" @@ -364,7 +371,7 @@ class ObjectStorage: raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) destination = bucket_path / safe_key destination.parent.mkdir(parents=True, exist_ok=True) @@ -439,7 +446,7 @@ class ObjectStorage: bucket_path = self._bucket_path(bucket_name) if not bucket_path.exists(): return {} - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) return self._read_metadata(bucket_path.name, safe_key) or {} def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None: @@ -487,7 +494,7 @@ class ObjectStorage: self._safe_unlink(target) self._delete_metadata(bucket_id, rel) else: - rel = self._sanitize_object_key(object_key) + rel = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) self._delete_metadata(bucket_id, rel) version_dir = self._version_dir(bucket_id, rel) if version_dir.exists(): @@ -696,7 +703,7 @@ class ObjectStorage: bucket_path = self._bucket_path(bucket_name) if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) object_path = bucket_path / safe_key if not object_path.exists(): raise ObjectNotFoundError("Object does not exist") @@ -719,7 +726,7 @@ class ObjectStorage: bucket_path = self._bucket_path(bucket_name) if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) object_path = bucket_path / safe_key if not object_path.exists(): raise ObjectNotFoundError("Object does not exist") @@ -758,7 +765,7 @@ class 
ObjectStorage: if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) version_dir = self._version_dir(bucket_id, safe_key) if not version_dir.exists(): version_dir = self._legacy_version_dir(bucket_id, safe_key) @@ -782,7 +789,7 @@ class ObjectStorage: if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) version_dir = self._version_dir(bucket_id, safe_key) data_path = version_dir / f"{version_id}.bin" meta_path = version_dir / f"{version_id}.json" @@ -819,7 +826,7 @@ class ObjectStorage: if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) version_dir = self._version_dir(bucket_id, safe_key) data_path = version_dir / f"{version_id}.bin" meta_path = version_dir / f"{version_id}.json" @@ -910,7 +917,7 @@ class ObjectStorage: if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) upload_id = uuid.uuid4().hex upload_root = self._multipart_dir(bucket_id, upload_id) upload_root.mkdir(parents=True, exist_ok=False) @@ -1034,7 +1041,7 @@ class ObjectStorage: total_size += record.get("size", 0) validated.sort(key=lambda entry: entry[0]) - safe_key = self._sanitize_object_key(manifest["object_key"]) + safe_key = self._sanitize_object_key(manifest["object_key"], self._object_key_max_length_bytes) destination = bucket_path / 
safe_key is_overwrite = destination.exists() @@ -1213,7 +1220,7 @@ class ObjectStorage: def _object_path(self, bucket_name: str, object_key: str) -> Path: bucket_path = self._bucket_path(bucket_name) - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) return bucket_path / safe_key def _system_root_path(self) -> Path: @@ -1429,7 +1436,7 @@ class ObjectStorage: current_version = self._cache_version.get(bucket_id, 0) if current_version != cache_version: objects = self._build_object_cache(bucket_path) - while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE: + while len(self._object_cache) >= self._object_cache_max_size: self._object_cache.popitem(last=False) self._object_cache[bucket_id] = (objects, time.time()) @@ -1764,11 +1771,11 @@ class ObjectStorage: return name @staticmethod - def _sanitize_object_key(object_key: str) -> Path: + def _sanitize_object_key(object_key: str, max_length_bytes: int = 1024) -> Path: if not object_key: raise StorageError("Object key required") - if len(object_key.encode("utf-8")) > 1024: - raise StorageError("Object key exceeds maximum length of 1024 bytes") + if len(object_key.encode("utf-8")) > max_length_bytes: + raise StorageError(f"Object key exceeds maximum length of {max_length_bytes} bytes") if "\x00" in object_key: raise StorageError("Object key contains null bytes") if object_key.startswith(("/", "\\")): diff --git a/app/ui.py b/app/ui.py index 738521c..6634a19 100644 --- a/app/ui.py +++ b/app/ui.py @@ -1091,7 +1091,9 @@ def object_presign(bucket_name: str, object_key: str): expires = int(payload.get("expires_in", 900)) except (TypeError, ValueError): return jsonify({"error": "expires_in must be an integer"}), 400 - expires = max(1, min(expires, 7 * 24 * 3600)) + min_expiry = current_app.config.get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1) + max_expiry = current_app.config.get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800) + expires = 
max(min_expiry, min(expires, max_expiry)) storage = _storage() if not storage.bucket_exists(bucket_name): return jsonify({"error": "Bucket does not exist"}), 404 diff --git a/templates/docs.html b/templates/docs.html index ce9c95c..1de6147 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -262,6 +262,115 @@ python run.py --mode ui 100 Max objects to pull per sync cycle. + + SITE_SYNC_CONNECT_TIMEOUT_SECONDS + 10 + Connection timeout for site sync (seconds). + + + SITE_SYNC_READ_TIMEOUT_SECONDS + 120 + Read timeout for site sync (seconds). + + + SITE_SYNC_MAX_RETRIES + 2 + Max retry attempts for site sync operations. + + + SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS + 1.0 + Clock skew tolerance for conflict resolution. + + + Replication Settings + + + REPLICATION_CONNECT_TIMEOUT_SECONDS + 5 + Connection timeout for replication (seconds). + + + REPLICATION_READ_TIMEOUT_SECONDS + 30 + Read timeout for replication (seconds). + + + REPLICATION_MAX_RETRIES + 2 + Max retry attempts for replication operations. + + + REPLICATION_STREAMING_THRESHOLD_BYTES + 10485760 + Objects larger than this use streaming upload (10 MB). + + + REPLICATION_MAX_FAILURES_PER_BUCKET + 50 + Max failure records to keep per bucket. + + + Security & Auth Settings + + + SIGV4_TIMESTAMP_TOLERANCE_SECONDS + 900 + Max time skew for SigV4 requests (15 minutes). + + + PRESIGNED_URL_MIN_EXPIRY_SECONDS + 1 + Minimum presigned URL expiry time. + + + PRESIGNED_URL_MAX_EXPIRY_SECONDS + 604800 + Maximum presigned URL expiry time (7 days). + + + Storage Limits + + + OBJECT_KEY_MAX_LENGTH_BYTES + 1024 + Maximum object key length in bytes. + + + OBJECT_CACHE_MAX_SIZE + 100 + Maximum number of objects in cache. + + + BUCKET_CONFIG_CACHE_TTL_SECONDS + 30 + Bucket config cache TTL in seconds. + + + OBJECT_TAG_LIMIT + 50 + Maximum number of tags per object. + + + LIFECYCLE_MAX_HISTORY_PER_BUCKET + 50 + Max lifecycle history records per bucket. 
+ + + ENCRYPTION_CHUNK_SIZE_BYTES + 65536 + Chunk size for streaming encryption (64 KB). + + + KMS_GENERATE_DATA_KEY_MIN_BYTES + 1 + Minimum data key size for KMS generation. + + + KMS_GENERATE_DATA_KEY_MAX_BYTES + 1024 + Maximum data key size for KMS generation. + diff --git a/tests/test_site_sync.py b/tests/test_site_sync.py index 4975375..405d6e4 100644 --- a/tests/test_site_sync.py +++ b/tests/test_site_sync.py @@ -20,7 +20,6 @@ from app.site_sync import ( SyncedObjectInfo, SiteSyncStats, RemoteObjectMeta, - CLOCK_SKEW_TOLERANCE_SECONDS, ) from app.storage import ObjectStorage From 62c36f7a6ca5074498a0330bbc18e2afe411fcbf Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 26 Jan 2026 19:49:23 +0800 Subject: [PATCH 04/15] Add site registry UI and update documentation for geo-distribution --- app/__init__.py | 17 +- app/admin_api.py | 320 ++++++++++++++++++++++++++++++++ app/config.py | 25 ++- app/site_registry.py | 177 ++++++++++++++++++ app/ui.py | 228 +++++++++++++++++++++++ templates/base.html | 12 ++ templates/docs.html | 180 +++++++++++++++++- templates/sites.html | 432 +++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 1382 insertions(+), 9 deletions(-) create mode 100644 app/admin_api.py create mode 100644 app/site_registry.py create mode 100644 templates/sites.html diff --git a/app/__init__.py b/app/__init__.py index 6cafe1c..d301c83 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -31,6 +31,7 @@ from .notifications import NotificationService from .object_lock import ObjectLockService from .replication import ReplicationManager from .secret_store import EphemeralSecretStore +from .site_registry import SiteRegistry, SiteInfo from .storage import ObjectStorage from .version import get_version @@ -151,7 +152,17 @@ def create_app( streaming_threshold_bytes=app.config.get("REPLICATION_STREAMING_THRESHOLD_BYTES", 10 * 1024 * 1024), max_failures_per_bucket=app.config.get("REPLICATION_MAX_FAILURES_PER_BUCKET", 50), ) - + + site_registry_path = 
config_dir / "site_registry.json" + site_registry = SiteRegistry(site_registry_path) + if app.config.get("SITE_ID") and not site_registry.get_local_site(): + site_registry.set_local_site(SiteInfo( + site_id=app.config["SITE_ID"], + endpoint=app.config.get("SITE_ENDPOINT") or "", + region=app.config.get("SITE_REGION", "us-east-1"), + priority=app.config.get("SITE_PRIORITY", 100), + )) + encryption_config = { "encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False), "encryption_master_key_path": app.config.get("ENCRYPTION_MASTER_KEY_PATH"), @@ -207,6 +218,7 @@ def create_app( app.extensions["object_lock"] = object_lock_service app.extensions["notifications"] = notification_service app.extensions["access_logging"] = access_logging_service + app.extensions["site_registry"] = site_registry operation_metrics_collector = None if app.config.get("OPERATION_METRICS_ENABLED", False): @@ -313,11 +325,14 @@ def create_app( if include_api: from .s3_api import s3_api_bp from .kms_api import kms_api_bp + from .admin_api import admin_api_bp app.register_blueprint(s3_api_bp) app.register_blueprint(kms_api_bp) + app.register_blueprint(admin_api_bp) csrf.exempt(s3_api_bp) csrf.exempt(kms_api_bp) + csrf.exempt(admin_api_bp) if include_ui: from .ui import ui_bp diff --git a/app/admin_api.py b/app/admin_api.py new file mode 100644 index 0000000..48c18a3 --- /dev/null +++ b/app/admin_api.py @@ -0,0 +1,320 @@ +from __future__ import annotations + +import logging +import time +from typing import Any, Dict, Optional, Tuple + +from flask import Blueprint, Response, current_app, jsonify, request + +from .connections import ConnectionStore +from .extensions import limiter +from .iam import IamError, Principal +from .replication import ReplicationManager +from .site_registry import PeerSite, SiteInfo, SiteRegistry + +logger = logging.getLogger(__name__) + +admin_api_bp = Blueprint("admin_api", __name__, url_prefix="/admin") + + +def _require_principal() -> Tuple[Optional[Principal], 
Optional[Tuple[Dict[str, Any], int]]]: + from .s3_api import _require_principal as s3_require_principal + return s3_require_principal() + + +def _require_admin() -> Tuple[Optional[Principal], Optional[Tuple[Dict[str, Any], int]]]: + principal, error = _require_principal() + if error: + return None, error + + try: + _iam().authorize(principal, None, "iam:*") + return principal, None + except IamError: + return None, _json_error("AccessDenied", "Admin access required", 403) + + +def _site_registry() -> SiteRegistry: + return current_app.extensions["site_registry"] + + +def _connections() -> ConnectionStore: + return current_app.extensions["connections"] + + +def _replication() -> ReplicationManager: + return current_app.extensions["replication"] + + +def _iam(): + return current_app.extensions["iam"] + + +def _json_error(code: str, message: str, status: int) -> Tuple[Dict[str, Any], int]: + return {"error": {"code": code, "message": message}}, status + + +def _get_admin_rate_limit() -> str: + return current_app.config.get("RATE_LIMIT_ADMIN", "60 per minute") + + +@admin_api_bp.route("/site", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def get_local_site(): + principal, error = _require_admin() + if error: + return error + + registry = _site_registry() + local_site = registry.get_local_site() + + if local_site: + return jsonify(local_site.to_dict()) + + config_site_id = current_app.config.get("SITE_ID") + config_endpoint = current_app.config.get("SITE_ENDPOINT") + + if config_site_id: + return jsonify({ + "site_id": config_site_id, + "endpoint": config_endpoint or "", + "region": current_app.config.get("SITE_REGION", "us-east-1"), + "priority": current_app.config.get("SITE_PRIORITY", 100), + "display_name": config_site_id, + "source": "environment", + }) + + return _json_error("NotFound", "Local site not configured", 404) + + +@admin_api_bp.route("/site", methods=["PUT"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def update_local_site(): 
+ principal, error = _require_admin() + if error: + return error + + payload = request.get_json(silent=True) or {} + + site_id = payload.get("site_id") + endpoint = payload.get("endpoint") + + if not site_id: + return _json_error("ValidationError", "site_id is required", 400) + + registry = _site_registry() + existing = registry.get_local_site() + + site = SiteInfo( + site_id=site_id, + endpoint=endpoint or "", + region=payload.get("region", "us-east-1"), + priority=payload.get("priority", 100), + display_name=payload.get("display_name", site_id), + created_at=existing.created_at if existing else None, + ) + + registry.set_local_site(site) + + logger.info("Local site updated", extra={"site_id": site_id, "principal": principal.access_key}) + return jsonify(site.to_dict()) + + +@admin_api_bp.route("/sites", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def list_all_sites(): + principal, error = _require_admin() + if error: + return error + + registry = _site_registry() + local = registry.get_local_site() + peers = registry.list_peers() + + result = { + "local": local.to_dict() if local else None, + "peers": [peer.to_dict() for peer in peers], + "total_peers": len(peers), + } + + return jsonify(result) + + +@admin_api_bp.route("/sites", methods=["POST"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def register_peer_site(): + principal, error = _require_admin() + if error: + return error + + payload = request.get_json(silent=True) or {} + + site_id = payload.get("site_id") + endpoint = payload.get("endpoint") + + if not site_id: + return _json_error("ValidationError", "site_id is required", 400) + if not endpoint: + return _json_error("ValidationError", "endpoint is required", 400) + + registry = _site_registry() + + if registry.get_peer(site_id): + return _json_error("AlreadyExists", f"Peer site '{site_id}' already exists", 409) + + connection_id = payload.get("connection_id") + if connection_id: + if not _connections().get(connection_id): + 
return _json_error("ValidationError", f"Connection '{connection_id}' not found", 400) + + peer = PeerSite( + site_id=site_id, + endpoint=endpoint, + region=payload.get("region", "us-east-1"), + priority=payload.get("priority", 100), + display_name=payload.get("display_name", site_id), + connection_id=connection_id, + ) + + registry.add_peer(peer) + + logger.info("Peer site registered", extra={"site_id": site_id, "principal": principal.access_key}) + return jsonify(peer.to_dict()), 201 + + +@admin_api_bp.route("/sites/", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def get_peer_site(site_id: str): + principal, error = _require_admin() + if error: + return error + + registry = _site_registry() + peer = registry.get_peer(site_id) + + if not peer: + return _json_error("NotFound", f"Peer site '{site_id}' not found", 404) + + return jsonify(peer.to_dict()) + + +@admin_api_bp.route("/sites/", methods=["PUT"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def update_peer_site(site_id: str): + principal, error = _require_admin() + if error: + return error + + registry = _site_registry() + existing = registry.get_peer(site_id) + + if not existing: + return _json_error("NotFound", f"Peer site '{site_id}' not found", 404) + + payload = request.get_json(silent=True) or {} + + peer = PeerSite( + site_id=site_id, + endpoint=payload.get("endpoint", existing.endpoint), + region=payload.get("region", existing.region), + priority=payload.get("priority", existing.priority), + display_name=payload.get("display_name", existing.display_name), + connection_id=payload.get("connection_id", existing.connection_id), + created_at=existing.created_at, + is_healthy=existing.is_healthy, + last_health_check=existing.last_health_check, + ) + + registry.update_peer(peer) + + logger.info("Peer site updated", extra={"site_id": site_id, "principal": principal.access_key}) + return jsonify(peer.to_dict()) + + +@admin_api_bp.route("/sites/", methods=["DELETE"]) 
+@limiter.limit(lambda: _get_admin_rate_limit()) +def delete_peer_site(site_id: str): + principal, error = _require_admin() + if error: + return error + + registry = _site_registry() + + if not registry.delete_peer(site_id): + return _json_error("NotFound", f"Peer site '{site_id}' not found", 404) + + logger.info("Peer site deleted", extra={"site_id": site_id, "principal": principal.access_key}) + return Response(status=204) + + +@admin_api_bp.route("/sites//health", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def check_peer_health(site_id: str): + principal, error = _require_admin() + if error: + return error + + registry = _site_registry() + peer = registry.get_peer(site_id) + + if not peer: + return _json_error("NotFound", f"Peer site '{site_id}' not found", 404) + + is_healthy = False + error_message = None + + if peer.connection_id: + connection = _connections().get(peer.connection_id) + if connection: + is_healthy = _replication().check_endpoint_health(connection) + else: + error_message = f"Connection '{peer.connection_id}' not found" + else: + error_message = "No connection configured for this peer" + + registry.update_health(site_id, is_healthy) + + result = { + "site_id": site_id, + "is_healthy": is_healthy, + "checked_at": time.time(), + } + if error_message: + result["error"] = error_message + + return jsonify(result) + + +@admin_api_bp.route("/topology", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def get_topology(): + principal, error = _require_admin() + if error: + return error + + registry = _site_registry() + local = registry.get_local_site() + peers = registry.list_peers() + + sites = [] + + if local: + sites.append({ + **local.to_dict(), + "is_local": True, + "is_healthy": True, + }) + + for peer in peers: + sites.append({ + **peer.to_dict(), + "is_local": False, + }) + + sites.sort(key=lambda s: s.get("priority", 100)) + + return jsonify({ + "sites": sites, + "total": len(sites), + "healthy_count": 
sum(1 for s in sites if s.get("is_healthy")), + }) diff --git a/app/config.py b/app/config.py index 7c40e40..8b779a6 100644 --- a/app/config.py +++ b/app/config.py @@ -141,6 +141,11 @@ class AppConfig: kms_generate_data_key_min_bytes: int kms_generate_data_key_max_bytes: int lifecycle_max_history_per_bucket: int + site_id: Optional[str] + site_endpoint: Optional[str] + site_region: str + site_priority: int + ratelimit_admin: str @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -298,6 +303,14 @@ class AppConfig: kms_generate_data_key_max_bytes = int(_get("KMS_GENERATE_DATA_KEY_MAX_BYTES", 1024)) lifecycle_max_history_per_bucket = int(_get("LIFECYCLE_MAX_HISTORY_PER_BUCKET", 50)) + site_id_raw = _get("SITE_ID", None) + site_id = str(site_id_raw).strip() if site_id_raw else None + site_endpoint_raw = _get("SITE_ENDPOINT", None) + site_endpoint = str(site_endpoint_raw).strip() if site_endpoint_raw else None + site_region = str(_get("SITE_REGION", "us-east-1")) + site_priority = int(_get("SITE_PRIORITY", 100)) + ratelimit_admin = _validate_rate_limit(str(_get("RATE_LIMIT_ADMIN", "60 per minute"))) + return cls(storage_root=storage_root, max_upload_size=max_upload_size, ui_page_size=ui_page_size, @@ -375,7 +388,12 @@ class AppConfig: encryption_chunk_size_bytes=encryption_chunk_size_bytes, kms_generate_data_key_min_bytes=kms_generate_data_key_min_bytes, kms_generate_data_key_max_bytes=kms_generate_data_key_max_bytes, - lifecycle_max_history_per_bucket=lifecycle_max_history_per_bucket) + lifecycle_max_history_per_bucket=lifecycle_max_history_per_bucket, + site_id=site_id, + site_endpoint=site_endpoint, + site_region=site_region, + site_priority=site_priority, + ratelimit_admin=ratelimit_admin) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. 
@@ -575,4 +593,9 @@ class AppConfig: "KMS_GENERATE_DATA_KEY_MIN_BYTES": self.kms_generate_data_key_min_bytes, "KMS_GENERATE_DATA_KEY_MAX_BYTES": self.kms_generate_data_key_max_bytes, "LIFECYCLE_MAX_HISTORY_PER_BUCKET": self.lifecycle_max_history_per_bucket, + "SITE_ID": self.site_id, + "SITE_ENDPOINT": self.site_endpoint, + "SITE_REGION": self.site_region, + "SITE_PRIORITY": self.site_priority, + "RATE_LIMIT_ADMIN": self.ratelimit_admin, } diff --git a/app/site_registry.py b/app/site_registry.py new file mode 100644 index 0000000..b257326 --- /dev/null +++ b/app/site_registry.py @@ -0,0 +1,177 @@ +from __future__ import annotations + +import json +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + + +@dataclass +class SiteInfo: + site_id: str + endpoint: str + region: str = "us-east-1" + priority: int = 100 + display_name: str = "" + created_at: Optional[float] = None + updated_at: Optional[float] = None + + def __post_init__(self) -> None: + if not self.display_name: + self.display_name = self.site_id + if self.created_at is None: + self.created_at = time.time() + + def to_dict(self) -> Dict[str, Any]: + return { + "site_id": self.site_id, + "endpoint": self.endpoint, + "region": self.region, + "priority": self.priority, + "display_name": self.display_name, + "created_at": self.created_at, + "updated_at": self.updated_at, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> SiteInfo: + return cls( + site_id=data["site_id"], + endpoint=data.get("endpoint", ""), + region=data.get("region", "us-east-1"), + priority=data.get("priority", 100), + display_name=data.get("display_name", ""), + created_at=data.get("created_at"), + updated_at=data.get("updated_at"), + ) + + +@dataclass +class PeerSite: + site_id: str + endpoint: str + region: str = "us-east-1" + priority: int = 100 + display_name: str = "" + created_at: Optional[float] = None + updated_at: Optional[float] = None + 
connection_id: Optional[str] = None + is_healthy: Optional[bool] = None + last_health_check: Optional[float] = None + + def __post_init__(self) -> None: + if not self.display_name: + self.display_name = self.site_id + if self.created_at is None: + self.created_at = time.time() + + def to_dict(self) -> Dict[str, Any]: + return { + "site_id": self.site_id, + "endpoint": self.endpoint, + "region": self.region, + "priority": self.priority, + "display_name": self.display_name, + "created_at": self.created_at, + "updated_at": self.updated_at, + "connection_id": self.connection_id, + "is_healthy": self.is_healthy, + "last_health_check": self.last_health_check, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> PeerSite: + return cls( + site_id=data["site_id"], + endpoint=data.get("endpoint", ""), + region=data.get("region", "us-east-1"), + priority=data.get("priority", 100), + display_name=data.get("display_name", ""), + created_at=data.get("created_at"), + updated_at=data.get("updated_at"), + connection_id=data.get("connection_id"), + is_healthy=data.get("is_healthy"), + last_health_check=data.get("last_health_check"), + ) + + +class SiteRegistry: + def __init__(self, config_path: Path) -> None: + self.config_path = config_path + self._local_site: Optional[SiteInfo] = None + self._peers: Dict[str, PeerSite] = {} + self.reload() + + def reload(self) -> None: + if not self.config_path.exists(): + self._local_site = None + self._peers = {} + return + + try: + with open(self.config_path, "r", encoding="utf-8") as f: + data = json.load(f) + + if data.get("local"): + self._local_site = SiteInfo.from_dict(data["local"]) + else: + self._local_site = None + + self._peers = {} + for peer_data in data.get("peers", []): + peer = PeerSite.from_dict(peer_data) + self._peers[peer.site_id] = peer + + except (OSError, json.JSONDecodeError, KeyError): + self._local_site = None + self._peers = {} + + def save(self) -> None: + self.config_path.parent.mkdir(parents=True, 
exist_ok=True) + data = { + "local": self._local_site.to_dict() if self._local_site else None, + "peers": [peer.to_dict() for peer in self._peers.values()], + } + with open(self.config_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + def get_local_site(self) -> Optional[SiteInfo]: + return self._local_site + + def set_local_site(self, site: SiteInfo) -> None: + site.updated_at = time.time() + self._local_site = site + self.save() + + def list_peers(self) -> List[PeerSite]: + return list(self._peers.values()) + + def get_peer(self, site_id: str) -> Optional[PeerSite]: + return self._peers.get(site_id) + + def add_peer(self, peer: PeerSite) -> None: + peer.created_at = peer.created_at or time.time() + self._peers[peer.site_id] = peer + self.save() + + def update_peer(self, peer: PeerSite) -> None: + if peer.site_id not in self._peers: + raise ValueError(f"Peer {peer.site_id} not found") + peer.updated_at = time.time() + self._peers[peer.site_id] = peer + self.save() + + def delete_peer(self, site_id: str) -> bool: + if site_id in self._peers: + del self._peers[site_id] + self.save() + return True + return False + + def update_health(self, site_id: str, is_healthy: bool) -> None: + peer = self._peers.get(site_id) + if peer: + peer.is_healthy = is_healthy + peer.last_health_check = time.time() + self.save() diff --git a/app/ui.py b/app/ui.py index 6634a19..172295d 100644 --- a/app/ui.py +++ b/app/ui.py @@ -38,6 +38,7 @@ from .kms import KMSManager from .replication import ReplicationManager, ReplicationRule from .s3_api import _generate_presigned_url from .secret_store import EphemeralSecretStore +from .site_registry import SiteRegistry, SiteInfo, PeerSite from .storage import ObjectStorage, StorageError ui_bp = Blueprint("ui", __name__, template_folder="../templates", url_prefix="/ui") @@ -145,6 +146,10 @@ def _operation_metrics(): return current_app.extensions.get("operation_metrics") +def _site_registry() -> SiteRegistry: + return 
current_app.extensions["site_registry"] + + def _format_bytes(num: int) -> str: step = 1024 units = ["B", "KB", "MB", "GB", "TB", "PB"] @@ -2663,6 +2668,229 @@ def list_buckets_for_copy(bucket_name: str): return jsonify({"buckets": allowed}) +@ui_bp.get("/sites") +def sites_dashboard(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied: Site management requires admin permissions", "danger") + return redirect(url_for("ui.buckets_overview")) + + registry = _site_registry() + local_site = registry.get_local_site() + peers = registry.list_peers() + connections = _connections().list() + + return render_template( + "sites.html", + principal=principal, + local_site=local_site, + peers=peers, + connections=connections, + config_site_id=current_app.config.get("SITE_ID"), + config_site_endpoint=current_app.config.get("SITE_ENDPOINT"), + config_site_region=current_app.config.get("SITE_REGION", "us-east-1"), + ) + + +@ui_bp.post("/sites/local") +def update_local_site(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied", "danger") + return redirect(url_for("ui.sites_dashboard")) + + site_id = request.form.get("site_id", "").strip() + endpoint = request.form.get("endpoint", "").strip() + region = request.form.get("region", "us-east-1").strip() + priority = request.form.get("priority", "100") + display_name = request.form.get("display_name", "").strip() + + if not site_id: + flash("Site ID is required", "danger") + return redirect(url_for("ui.sites_dashboard")) + + try: + priority_int = int(priority) + except ValueError: + priority_int = 100 + + registry = _site_registry() + existing = registry.get_local_site() + + site = SiteInfo( + site_id=site_id, + endpoint=endpoint, + region=region, + priority=priority_int, + display_name=display_name or site_id, + created_at=existing.created_at if existing else None, + ) + 
registry.set_local_site(site) + + flash("Local site configuration updated", "success") + return redirect(url_for("ui.sites_dashboard")) + + +@ui_bp.post("/sites/peers") +def add_peer_site(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied", "danger") + return redirect(url_for("ui.sites_dashboard")) + + site_id = request.form.get("site_id", "").strip() + endpoint = request.form.get("endpoint", "").strip() + region = request.form.get("region", "us-east-1").strip() + priority = request.form.get("priority", "100") + display_name = request.form.get("display_name", "").strip() + connection_id = request.form.get("connection_id", "").strip() or None + + if not site_id: + flash("Site ID is required", "danger") + return redirect(url_for("ui.sites_dashboard")) + if not endpoint: + flash("Endpoint is required", "danger") + return redirect(url_for("ui.sites_dashboard")) + + try: + priority_int = int(priority) + except ValueError: + priority_int = 100 + + registry = _site_registry() + + if registry.get_peer(site_id): + flash(f"Peer site '{site_id}' already exists", "danger") + return redirect(url_for("ui.sites_dashboard")) + + if connection_id and not _connections().get(connection_id): + flash(f"Connection '{connection_id}' not found", "danger") + return redirect(url_for("ui.sites_dashboard")) + + peer = PeerSite( + site_id=site_id, + endpoint=endpoint, + region=region, + priority=priority_int, + display_name=display_name or site_id, + connection_id=connection_id, + ) + registry.add_peer(peer) + + flash(f"Peer site '{site_id}' added", "success") + return redirect(url_for("ui.sites_dashboard")) + + +@ui_bp.post("/sites/peers//update") +def update_peer_site(site_id: str): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied", "danger") + return redirect(url_for("ui.sites_dashboard")) + + registry = _site_registry() + existing 
= registry.get_peer(site_id) + + if not existing: + flash(f"Peer site '{site_id}' not found", "danger") + return redirect(url_for("ui.sites_dashboard")) + + endpoint = request.form.get("endpoint", existing.endpoint).strip() + region = request.form.get("region", existing.region).strip() + priority = request.form.get("priority", str(existing.priority)) + display_name = request.form.get("display_name", existing.display_name).strip() + connection_id = request.form.get("connection_id", "").strip() or existing.connection_id + + try: + priority_int = int(priority) + except ValueError: + priority_int = existing.priority + + if connection_id and not _connections().get(connection_id): + flash(f"Connection '{connection_id}' not found", "danger") + return redirect(url_for("ui.sites_dashboard")) + + peer = PeerSite( + site_id=site_id, + endpoint=endpoint, + region=region, + priority=priority_int, + display_name=display_name or site_id, + connection_id=connection_id, + created_at=existing.created_at, + is_healthy=existing.is_healthy, + last_health_check=existing.last_health_check, + ) + registry.update_peer(peer) + + flash(f"Peer site '{site_id}' updated", "success") + return redirect(url_for("ui.sites_dashboard")) + + +@ui_bp.post("/sites/peers//delete") +def delete_peer_site(site_id: str): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied", "danger") + return redirect(url_for("ui.sites_dashboard")) + + registry = _site_registry() + if registry.delete_peer(site_id): + flash(f"Peer site '{site_id}' deleted", "success") + else: + flash(f"Peer site '{site_id}' not found", "danger") + + return redirect(url_for("ui.sites_dashboard")) + + +@ui_bp.get("/sites/peers//health") +def check_peer_site_health(site_id: str): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + registry = _site_registry() + 
peer = registry.get_peer(site_id) + + if not peer: + return jsonify({"error": f"Peer site '{site_id}' not found"}), 404 + + is_healthy = False + error_message = None + + if peer.connection_id: + connection = _connections().get(peer.connection_id) + if connection: + is_healthy = _replication().check_endpoint_health(connection) + else: + error_message = f"Connection '{peer.connection_id}' not found" + else: + error_message = "No connection configured for this peer" + + registry.update_health(site_id, is_healthy) + + result = { + "site_id": site_id, + "is_healthy": is_healthy, + } + if error_message: + result["error"] = error_message + + return jsonify(result) + + @ui_bp.app_errorhandler(404) def ui_not_found(error): # type: ignore[override] prefix = ui_bp.url_prefix or "" diff --git a/templates/base.html b/templates/base.html index 146acc7..7cd24be 100644 --- a/templates/base.html +++ b/templates/base.html @@ -94,6 +94,12 @@ Metrics + + + + + Sites + {% endif %} -
+
09 +

Site Registry

+
+

Track cluster membership and site identity for geo-distributed deployments. The site registry stores local site identity and peer site information.

+ +

Connections vs Sites

+

Understanding the difference between Connections and Sites is key to configuring geo-distribution:

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
AspectConnectionsSites
PurposeStore credentials to authenticate with remote S3 endpointsTrack cluster membership and site identity
ContainsEndpoint URL, access key, secret key, regionSite ID, endpoint, region, priority, display name
Used byReplication rules, site sync workersGeo-distribution awareness, cluster topology
Analogy"How do I log in to that server?""Who are the members of my cluster?"
+
+

Sites can optionally link to a Connection (via connection_id) to perform health checks against peer sites.

+ +

Configuration

+

Set environment variables to bootstrap local site identity on startup:

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
VariableDefaultDescription
SITE_IDNoneUnique identifier for this site (e.g., us-west-1)
SITE_ENDPOINTNonePublic URL for this site (e.g., https://s3.us-west-1.example.com)
SITE_REGIONus-east-1AWS-style region identifier
SITE_PRIORITY100Routing priority (lower = preferred)
+
+
# Example: Configure site identity
+export SITE_ID=us-west-1
+export SITE_ENDPOINT=https://s3.us-west-1.example.com
+export SITE_REGION=us-west-1
+export SITE_PRIORITY=100
+python run.py
+ +

Using the Sites UI

+

Navigate to Sites in the sidebar to manage site configuration:

+
+
+
+
Local Site Identity
+
+
    +
  • Configure this site's ID, endpoint, region, and priority
  • +
  • Display name for easier identification
  • +
  • Changes persist to site_registry.json
  • +
+
+
+
+
+
+
Peer Sites
+
+
    +
  • Register remote sites in your cluster
  • +
  • Link to a Connection for health checks
  • +
  • View health status (green/red/unknown)
  • +
  • Edit or delete peers as needed
  • +
+
+
+
+
+ +

Admin API Endpoints

+

The /admin API provides programmatic access to site registry:

+
# Get local site configuration
+curl {{ api_base }}/admin/site \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# Update local site
+curl -X PUT {{ api_base }}/admin/site \
+  -H "Content-Type: application/json" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+  -d '{"site_id": "us-west-1", "endpoint": "https://s3.example.com", "region": "us-west-1"}'
+
+# List all peer sites
+curl {{ api_base }}/admin/sites \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# Add a peer site
+curl -X POST {{ api_base }}/admin/sites \
+  -H "Content-Type: application/json" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+  -d '{"site_id": "us-east-1", "endpoint": "https://s3.us-east-1.example.com"}'
+
+# Check peer health
+curl {{ api_base }}/admin/sites/us-east-1/health \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# Get cluster topology
+curl {{ api_base }}/admin/topology \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+ +

Storage Location

+

Site registry data is stored at:

+ data/.myfsio.sys/config/site_registry.json + +
+
+ + + + +
+ Planned: The site registry lays the groundwork for features like automatic failover, intelligent routing, and multi-site consistency. Currently it provides cluster awareness and health monitoring. +
+
+
+
+
+
+
+
+ 10

Object Versioning

Keep multiple versions of objects to protect against accidental deletions and overwrites. Restore previous versions at any time.

@@ -1046,7 +1211,7 @@ curl "{{ api_base }}/<bucket>/<key>?versionId=<version-id>" \
- 10 + 11

Bucket Quotas

Limit how much data a bucket can hold using storage quotas. Quotas are enforced on uploads and multipart completions.

@@ -1114,7 +1279,7 @@ curl -X PUT "{{ api_base }}/bucket/<bucket>?quota" \
- 11 + 12

Encryption

Protect data at rest with server-side encryption using AES-256-GCM. Objects are encrypted before being written to disk and decrypted transparently on read.

@@ -1208,7 +1373,7 @@ curl -X DELETE "{{ api_base }}/kms/keys/{key-id}?waiting_period_days=30" \
- 12 + 13

Lifecycle Rules

Automatically delete expired objects, clean up old versions, and abort incomplete multipart uploads using time-based lifecycle rules.

@@ -1290,7 +1455,7 @@ curl "{{ api_base }}/<bucket>?lifecycle" \
- 13 + 14

Metrics History

Track CPU, memory, and disk usage over time with optional metrics history. Disabled by default to minimize overhead.

@@ -1374,7 +1539,7 @@ curl -X PUT "{{ api_base | replace('/api', '/ui') }}/metrics/settings" \
- 14 + 15

Operation Metrics

Track API request statistics including request counts, latency, error rates, and bandwidth usage. Provides real-time visibility into API operations.

@@ -1481,7 +1646,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
- 15 + 16

Troubleshooting & tips

@@ -1543,6 +1708,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
  • REST endpoints
  • API Examples
  • Site Replication & Sync
  • +
  • Site Registry
  • Object Versioning
  • Bucket Quotas
  • Encryption
  • diff --git a/templates/sites.html b/templates/sites.html new file mode 100644 index 0000000..608a777 --- /dev/null +++ b/templates/sites.html @@ -0,0 +1,432 @@ +{% extends "base.html" %} + +{% block title %}Sites - S3 Compatible Storage{% endblock %} + +{% block content %} + + +
    +
    +
    +
    +
    + + + + Local Site Identity +
    +

    This site's configuration

    +
    +
    +
    + +
    + + +
    Unique identifier for this site
    +
    +
    + + +
    Public URL for this site
    +
    +
    + + +
    +
    +
    + + +
    Lower = preferred
    +
    +
    + + +
    +
    +
    + +
    +
    +
    +
    + +
    +
    +
    + + + + Add Peer Site +
    +

    Register a remote site

    +
    +
    +
    + +
    + + +
    +
    + + +
    +
    + + +
    +
    +
    + + +
    +
    + + +
    +
    +
    + + +
    Link to a remote connection for health checks
    +
    +
    + +
    +
    +
    +
    +
    + +
    +
    +
    +
    +
    + + + + Peer Sites +
    +

    Known remote sites in the cluster

    +
    +
    +
    + {% if peers %} +
    + + + + + + + + + + + + + {% for peer in peers %} + + + + + + + + + {% endfor %} + +
    HealthSite IDEndpointRegionPriorityActions
    + + {% if peer.is_healthy == true %} + + + + {% elif peer.is_healthy == false %} + + + + {% else %} + + + + + {% endif %} + + +
    +
    + + + +
    +
    + {{ peer.display_name or peer.site_id }} + {% if peer.display_name and peer.display_name != peer.site_id %} +
    {{ peer.site_id }} + {% endif %} +
    +
    +
    + {{ peer.endpoint }} + {{ peer.region }}{{ peer.priority }} +
    + + + +
    +
    +
    + {% else %} +
    +
    + + + +
    +
    No peer sites yet
    +

    Add peer sites to enable geo-distribution and site-to-site replication.

    +
    + {% endif %} +
    +
    +
    +
    + + + + + + +{% endblock %} From 6b715851b94d3edf431f84c06ba88e143b93e897 Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 26 Jan 2026 21:39:47 +0800 Subject: [PATCH 05/15] Add replication setup wizard and site-level sync dashboard for site registry --- app/replication.py | 3 + app/ui.py | 195 ++++++++++++++++++++++++++ templates/replication_wizard.html | 226 ++++++++++++++++++++++++++++++ templates/sites.html | 69 ++++++++- 4 files changed, 492 insertions(+), 1 deletion(-) create mode 100644 templates/replication_wizard.html diff --git a/app/replication.py b/app/replication.py index d8022ba..c0023dd 100644 --- a/app/replication.py +++ b/app/replication.py @@ -318,6 +318,9 @@ class ReplicationManager: def get_rule(self, bucket_name: str) -> Optional[ReplicationRule]: return self._rules.get(bucket_name) + def list_rules(self) -> List[ReplicationRule]: + return list(self._rules.values()) + def set_rule(self, rule: ReplicationRule) -> None: old_rule = self._rules.get(rule.bucket_name) was_all_mode = old_rule and old_rule.mode == REPLICATION_MODE_ALL if old_rule else False diff --git a/app/ui.py b/app/ui.py index 172295d..4f0bb58 100644 --- a/app/ui.py +++ b/app/ui.py @@ -2682,11 +2682,28 @@ def sites_dashboard(): peers = registry.list_peers() connections = _connections().list() + replication = _replication() + all_rules = replication.list_rules() + + peers_with_stats = [] + for peer in peers: + buckets_syncing = 0 + if peer.connection_id: + for rule in all_rules: + if rule.target_connection_id == peer.connection_id: + buckets_syncing += 1 + peers_with_stats.append({ + "peer": peer, + "buckets_syncing": buckets_syncing, + "has_connection": bool(peer.connection_id), + }) + return render_template( "sites.html", principal=principal, local_site=local_site, peers=peers, + peers_with_stats=peers_with_stats, connections=connections, config_site_id=current_app.config.get("SITE_ID"), config_site_endpoint=current_app.config.get("SITE_ENDPOINT"), @@ -2784,6 +2801,9 @@ def 
add_peer_site(): registry.add_peer(peer) flash(f"Peer site '{site_id}' added", "success") + + if connection_id: + return redirect(url_for("ui.replication_wizard", site_id=site_id)) return redirect(url_for("ui.sites_dashboard")) @@ -2891,6 +2911,181 @@ def check_peer_site_health(site_id: str): return jsonify(result) +@ui_bp.get("/sites/peers//replication-wizard") +def replication_wizard(site_id: str): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied", "danger") + return redirect(url_for("ui.sites_dashboard")) + + registry = _site_registry() + peer = registry.get_peer(site_id) + if not peer: + flash(f"Peer site '{site_id}' not found", "danger") + return redirect(url_for("ui.sites_dashboard")) + + if not peer.connection_id: + flash("This peer has no connection configured. Add a connection first to set up replication.", "warning") + return redirect(url_for("ui.sites_dashboard")) + + connection = _connections().get(peer.connection_id) + if not connection: + flash(f"Connection '{peer.connection_id}' not found", "danger") + return redirect(url_for("ui.sites_dashboard")) + + buckets = _storage().list_buckets() + replication = _replication() + + bucket_info = [] + for bucket in buckets: + existing_rule = replication.get_rule(bucket.name) + has_rule_for_peer = ( + existing_rule and + existing_rule.target_connection_id == peer.connection_id + ) + bucket_info.append({ + "name": bucket.name, + "has_rule": has_rule_for_peer, + "existing_mode": existing_rule.mode if has_rule_for_peer else None, + "existing_target": existing_rule.target_bucket if has_rule_for_peer else None, + }) + + return render_template( + "replication_wizard.html", + principal=principal, + peer=peer, + connection=connection, + buckets=bucket_info, + csrf_token=generate_csrf, + ) + + +@ui_bp.post("/sites/peers//replication-rules") +def create_peer_replication_rules(site_id: str): + principal = _current_principal() + try: + 
_iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied", "danger") + return redirect(url_for("ui.sites_dashboard")) + + registry = _site_registry() + peer = registry.get_peer(site_id) + if not peer or not peer.connection_id: + flash("Invalid peer site or no connection configured", "danger") + return redirect(url_for("ui.sites_dashboard")) + + from .replication import REPLICATION_MODE_NEW_ONLY, REPLICATION_MODE_ALL + import time as time_module + + selected_buckets = request.form.getlist("buckets") + mode = request.form.get("mode", REPLICATION_MODE_NEW_ONLY) + + if not selected_buckets: + flash("No buckets selected", "warning") + return redirect(url_for("ui.sites_dashboard")) + + created = 0 + failed = 0 + replication = _replication() + + for bucket_name in selected_buckets: + target_bucket = request.form.get(f"target_{bucket_name}", bucket_name).strip() + if not target_bucket: + target_bucket = bucket_name + + try: + rule = ReplicationRule( + bucket_name=bucket_name, + target_connection_id=peer.connection_id, + target_bucket=target_bucket, + enabled=True, + mode=mode, + created_at=time_module.time(), + ) + replication.set_rule(rule) + + if mode == REPLICATION_MODE_ALL: + replication.replicate_existing_objects(bucket_name) + + created += 1 + except Exception: + failed += 1 + + if created > 0: + flash(f"Created {created} replication rule(s) for {peer.display_name or peer.site_id}", "success") + if failed > 0: + flash(f"Failed to create {failed} rule(s)", "danger") + + return redirect(url_for("ui.sites_dashboard")) + + +@ui_bp.get("/sites/peers//sync-stats") +def get_peer_sync_stats(site_id: str): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + registry = _site_registry() + peer = registry.get_peer(site_id) + if not peer: + return jsonify({"error": "Peer not found"}), 404 + + if not peer.connection_id: + return jsonify({"error": 
"No connection configured"}), 400 + + replication = _replication() + all_rules = replication.list_rules() + + stats = { + "buckets_syncing": 0, + "objects_synced": 0, + "objects_pending": 0, + "objects_failed": 0, + "bytes_synced": 0, + "last_sync_at": None, + "buckets": [], + } + + for rule in all_rules: + if rule.target_connection_id != peer.connection_id: + continue + + stats["buckets_syncing"] += 1 + + bucket_stats = { + "bucket_name": rule.bucket_name, + "target_bucket": rule.target_bucket, + "mode": rule.mode, + "enabled": rule.enabled, + } + + if rule.stats: + stats["objects_synced"] += rule.stats.objects_synced + stats["objects_pending"] += rule.stats.objects_pending + stats["bytes_synced"] += rule.stats.bytes_synced + + if rule.stats.last_sync_at: + if not stats["last_sync_at"] or rule.stats.last_sync_at > stats["last_sync_at"]: + stats["last_sync_at"] = rule.stats.last_sync_at + + bucket_stats["last_sync_at"] = rule.stats.last_sync_at + bucket_stats["objects_synced"] = rule.stats.objects_synced + bucket_stats["objects_pending"] = rule.stats.objects_pending + + failure_count = replication.get_failure_count(rule.bucket_name) + stats["objects_failed"] += failure_count + bucket_stats["failures"] = failure_count + + stats["buckets"].append(bucket_stats) + + return jsonify(stats) + + @ui_bp.app_errorhandler(404) def ui_not_found(error): # type: ignore[override] prefix = ui_bp.url_prefix or "" diff --git a/templates/replication_wizard.html b/templates/replication_wizard.html new file mode 100644 index 0000000..24489e6 --- /dev/null +++ b/templates/replication_wizard.html @@ -0,0 +1,226 @@ +{% extends "base.html" %} + +{% block title %}Set Up Replication - S3 Compatible Storage{% endblock %} + +{% block content %} + + +
    +
    +
    +
    +
    + + + + Peer Site +
    +
    +
    +
    +
    Site ID
    +
    {{ peer.site_id }}
    +
    Endpoint
    +
    {{ peer.endpoint }}
    +
    Region
    +
    {{ peer.region }}
    +
    Connection
    +
    {{ connection.name }}
    +
    +
    +
    + +
    +
    +
    + + + + Replication Modes +
    +
    +
    +

    New Only: Only replicate new objects uploaded after the rule is created.

    +

    All Objects: Replicate all existing objects plus new uploads.

    +

    Bidirectional: Two-way sync between sites. Changes on either side are synchronized.

    +
    +
    +
    + +
    +
    +
    +
    + + + + Select Buckets to Replicate +
    +

    Choose which buckets should be replicated to this peer site.

    +
    +
    + {% if buckets %} +
    + + +
    + + +
    + +
    + + + + + + + + + + + {% for bucket in buckets %} + + + + + + + {% endfor %} + +
    + + Local BucketTarget Bucket NameStatus
    + + +
    + + + + {{ bucket.name }} +
    +
    + + + {% if bucket.has_rule %} + + Already configured ({{ bucket.existing_mode }}) + + {% else %} + + Not configured + + {% endif %} +
    +
    + +
    + + + Skip for Now + +
    +
    + {% else %} +
    +
    + + + +
    +
    No buckets yet
    +

    Create some buckets first, then come back to set up replication.

    + + Go to Buckets + +
    + {% endif %} +
    +
    +
    +
    + + +{% endblock %} diff --git a/templates/sites.html b/templates/sites.html index 608a777..ef7d14c 100644 --- a/templates/sites.html +++ b/templates/sites.html @@ -163,11 +163,13 @@ Endpoint Region Priority + Sync Status Actions - {% for peer in peers %} + {% for item in peers_with_stats %} + {% set peer = item.peer %} @@ -207,8 +209,37 @@ {{ peer.region }} {{ peer.priority }} + + {% if item.has_connection %} +
    + {{ item.buckets_syncing }} bucket{{ 's' if item.buckets_syncing != 1 else '' }} + {% if item.buckets_syncing > 0 %} + + {% endif %} +
    +
    + +
    + {% else %} + No connection + {% endif %} +
    + + + + + +
    + +
    -
    +
    +
    +
    +
    +
    + 19 +

    Access Logging

    +
    +

    Enable S3-style access logging to track all requests to your buckets for audit and analysis.

    + +
    # Enable access logging
    +curl -X PUT "{{ api_base }}/<bucket>?logging" \
    +  -H "Content-Type: application/json" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -d '{
    +    "LoggingEnabled": {
    +      "TargetBucket": "log-bucket",
    +      "TargetPrefix": "logs/my-bucket/"
    +    }
    +  }'
    +
    +# Get logging configuration
    +curl "{{ api_base }}/<bucket>?logging" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
    + +

    Log Contents

    +

    Logs include: timestamp, bucket, key, operation type, request ID, requester, source IP, HTTP status, error codes, bytes transferred, timing, referrer, and User-Agent.

    +
    +
    +
    +
    +
    + 20 +

    Notifications & Webhooks

    +
    +

    Configure event notifications to trigger webhooks when objects are created or deleted.

    + +

    Supported Events

    +
    + + + + + + + + + + + + + + + + + +
    Event TypeDescription
    s3:ObjectCreated:*Any object creation (PUT, POST, COPY, multipart)
    s3:ObjectRemoved:*Any object deletion
    +
    + +
    # Set notification configuration
    +curl -X PUT "{{ api_base }}/<bucket>?notification" \
    +  -H "Content-Type: application/json" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -d '{
    +    "TopicConfigurations": [{
    +      "Id": "upload-notify",
    +      "TopicArn": "https://webhook.example.com/s3-events",
    +      "Events": ["s3:ObjectCreated:*"],
    +      "Filter": {
    +        "Key": {
    +          "FilterRules": [
    +            {"Name": "prefix", "Value": "uploads/"},
    +            {"Name": "suffix", "Value": ".jpg"}
    +          ]
    +        }
    +      }
    +    }]
    +  }'
    + +
    +
    + + + + +
    + Security: Webhook URLs are validated to prevent SSRF attacks. Internal/private IP ranges are blocked. +
    +
    +
    +
    +
    +
    +
    +
    + 21 +

    SelectObjectContent (SQL)

    +
    +

    Query CSV, JSON, or Parquet files directly using SQL without downloading the entire object.

    + +
    + Prerequisite: Requires DuckDB to be installed (pip install duckdb) +
    + +
    # Query a CSV file
    +curl -X POST "{{ api_base }}/<bucket>/data.csv?select" \
    +  -H "Content-Type: application/json" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -d '{
    +    "Expression": "SELECT name, age FROM s3object WHERE age > 25",
    +    "ExpressionType": "SQL",
    +    "InputSerialization": {
    +      "CSV": {"FileHeaderInfo": "USE", "FieldDelimiter": ","}
    +    },
    +    "OutputSerialization": {"JSON": {}}
    +  }'
    + +

    Supported Formats

    +
    +
    +
    + CSV
    Headers, delimiters +
    +
    +
    +
    + JSON
    Document or lines +
    +
    +
    +
    + Parquet
    Auto schema +
    +
    +
    +
    +
    +
    +
    +
    + 22 +

    Advanced S3 Operations

    +
    +

    Copy objects, upload part copies, and use range requests for partial downloads.

    + +

    CopyObject

    +
    # Copy within same bucket
    +curl -X PUT "{{ api_base }}/<bucket>/copy-of-file.txt" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "x-amz-copy-source: /<bucket>/original-file.txt"
    +
    +# Copy with metadata replacement
    +curl -X PUT "{{ api_base }}/<bucket>/file.txt" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "x-amz-copy-source: /<bucket>/file.txt" \
    +  -H "x-amz-metadata-directive: REPLACE" \
    +  -H "x-amz-meta-newkey: newvalue"
    + +

    UploadPartCopy

    +

    Copy data from an existing object into a multipart upload part:

    +
    # Copy bytes 0-10485759 from source as part 1
    +curl -X PUT "{{ api_base }}/<bucket>/<key>?uploadId=X&partNumber=1" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "x-amz-copy-source: /source-bucket/source-file.bin" \
    +  -H "x-amz-copy-source-range: bytes=0-10485759"
    + +

    Range Requests

    +
    # Get first 1000 bytes
    +curl "{{ api_base }}/<bucket>/<key>" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "Range: bytes=0-999"
    +
    +# Get last 500 bytes
    +curl "{{ api_base }}/<bucket>/<key>" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "Range: bytes=-500"
    + +

    Conditional Requests

    +
    + + + + + + + + + + + + + + + + + + + + + +
    HeaderBehavior
    If-Modified-SinceOnly download if changed after date
    If-None-MatchOnly download if ETag differs
    If-MatchOnly download if ETag matches
    +
    +
    +
    +
    +
    +
    + 23 +

    Access Control Lists (ACLs)

    +
    +

    ACLs provide legacy-style permission management for buckets and objects.

    + +

    Canned ACLs

    +
    + + + + + + + + + + + + + + + + + + + + + + + + + +
    ACLDescription
    privateOwner gets FULL_CONTROL (default)
    public-readOwner FULL_CONTROL, public READ
    public-read-writeOwner FULL_CONTROL, public READ and WRITE
    authenticated-readOwner FULL_CONTROL, authenticated users READ
    +
    + +
    # Set bucket ACL
    +curl -X PUT "{{ api_base }}/<bucket>?acl" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "x-amz-acl: public-read"
    +
    +# Set object ACL during upload
    +curl -X PUT "{{ api_base }}/<bucket>/<key>" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "x-amz-acl: private" \
    +  --data-binary @file.txt
    + +
    + Recommendation: For most use cases, prefer bucket policies over ACLs for more flexible access control. +
    +
    +
    +
    +
    +
    + 24 +

    Object & Bucket Tagging

    +
    +

    Add metadata tags to buckets and objects for organization, cost allocation, or lifecycle rule filtering.

    + +

    Object Tagging

    +
    # Set object tags
    +curl -X PUT "{{ api_base }}/<bucket>/<key>?tagging" \
    +  -H "Content-Type: application/json" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -d '{
    +    "TagSet": [
    +      {"Key": "Classification", "Value": "Confidential"},
    +      {"Key": "Owner", "Value": "john@example.com"}
    +    ]
    +  }'
    +
    +# Get object tags
    +curl "{{ api_base }}/<bucket>/<key>?tagging" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
    +
    +# Set tags during upload
    +curl -X PUT "{{ api_base }}/<bucket>/<key>" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "x-amz-tagging: Environment=Staging&Team=QA" \
    +  --data-binary @file.txt
    + +

    Bucket Tagging

    +
    # Set bucket tags
    +curl -X PUT "{{ api_base }}/<bucket>?tagging" \
    +  -H "Content-Type: application/json" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -d '{
    +    "TagSet": [
    +      {"Key": "Environment", "Value": "Production"},
    +      {"Key": "Team", "Value": "Engineering"}
    +    ]
    +  }'
    + +

    Use Cases

    +
    +
    +
      +
    • Filter objects for lifecycle expiration by tag
    • +
    • Use tag conditions in bucket policies
    • +
    +
    +
    +
      +
    • Group objects by project or department
    • +
    • Trigger automation based on object tags
    • +
    +
    +
    +
    +