diff --git a/README.md b/README.md index f1e7951..d485351 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,11 @@ python run.py --mode ui # UI only (port 5100) | `ENCRYPTION_ENABLED` | `false` | Enable server-side encryption | | `KMS_ENABLED` | `false` | Enable Key Management Service | | `LOG_LEVEL` | `INFO` | Logging verbosity | +| `SIGV4_TIMESTAMP_TOLERANCE_SECONDS` | `900` | Max time skew for SigV4 requests | +| `PRESIGNED_URL_MAX_EXPIRY_SECONDS` | `604800` | Max presigned URL expiry (7 days) | +| `REPLICATION_CONNECT_TIMEOUT_SECONDS` | `5` | Replication connection timeout | +| `SITE_SYNC_ENABLED` | `false` | Enable bi-directional site sync | +| `OBJECT_TAG_LIMIT` | `50` | Maximum tags per object | ## Data Layout diff --git a/app/__init__.py b/app/__init__.py index 2968c03..6cafe1c 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -104,6 +104,9 @@ def create_app( storage = ObjectStorage( Path(app.config["STORAGE_ROOT"]), cache_ttl=app.config.get("OBJECT_CACHE_TTL", 5), + object_cache_max_size=app.config.get("OBJECT_CACHE_MAX_SIZE", 100), + bucket_config_cache_ttl=app.config.get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0), + object_key_max_length_bytes=app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024), ) if app.config.get("WARM_CACHE_ON_STARTUP", True) and not app.config.get("TESTING"): @@ -137,12 +140,23 @@ def create_app( ) connections = ConnectionStore(connections_path) - replication = ReplicationManager(storage, connections, replication_rules_path, storage_root) + replication = ReplicationManager( + storage, + connections, + replication_rules_path, + storage_root, + connect_timeout=app.config.get("REPLICATION_CONNECT_TIMEOUT_SECONDS", 5), + read_timeout=app.config.get("REPLICATION_READ_TIMEOUT_SECONDS", 30), + max_retries=app.config.get("REPLICATION_MAX_RETRIES", 2), + streaming_threshold_bytes=app.config.get("REPLICATION_STREAMING_THRESHOLD_BYTES", 10 * 1024 * 1024), + max_failures_per_bucket=app.config.get("REPLICATION_MAX_FAILURES_PER_BUCKET", 50), 
+ ) encryption_config = { "encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False), "encryption_master_key_path": app.config.get("ENCRYPTION_MASTER_KEY_PATH"), "default_encryption_algorithm": app.config.get("DEFAULT_ENCRYPTION_ALGORITHM", "AES256"), + "encryption_chunk_size_bytes": app.config.get("ENCRYPTION_CHUNK_SIZE_BYTES", 64 * 1024), } encryption_manager = EncryptionManager(encryption_config) @@ -150,7 +164,12 @@ def create_app( if app.config.get("KMS_ENABLED", False): kms_keys_path = Path(app.config.get("KMS_KEYS_PATH", "")) kms_master_key_path = Path(app.config.get("ENCRYPTION_MASTER_KEY_PATH", "")) - kms_manager = KMSManager(kms_keys_path, kms_master_key_path) + kms_manager = KMSManager( + kms_keys_path, + kms_master_key_path, + generate_data_key_min_bytes=app.config.get("KMS_GENERATE_DATA_KEY_MIN_BYTES", 1), + generate_data_key_max_bytes=app.config.get("KMS_GENERATE_DATA_KEY_MAX_BYTES", 1024), + ) encryption_manager.set_kms_provider(kms_manager) if app.config.get("ENCRYPTION_ENABLED", False): @@ -170,6 +189,7 @@ def create_app( base_storage, interval_seconds=app.config.get("LIFECYCLE_INTERVAL_SECONDS", 3600), storage_root=storage_root, + max_history_per_bucket=app.config.get("LIFECYCLE_MAX_HISTORY_PER_BUCKET", 50), ) lifecycle_manager.start() @@ -218,6 +238,10 @@ def create_app( storage_root=storage_root, interval_seconds=app.config.get("SITE_SYNC_INTERVAL_SECONDS", 60), batch_size=app.config.get("SITE_SYNC_BATCH_SIZE", 100), + connect_timeout=app.config.get("SITE_SYNC_CONNECT_TIMEOUT_SECONDS", 10), + read_timeout=app.config.get("SITE_SYNC_READ_TIMEOUT_SECONDS", 120), + max_retries=app.config.get("SITE_SYNC_MAX_RETRIES", 2), + clock_skew_tolerance_seconds=app.config.get("SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS", 1.0), ) site_sync_worker.start() app.extensions["site_sync"] = site_sync_worker diff --git a/app/config.py b/app/config.py index e6815bd..7c40e40 100644 --- a/app/config.py +++ b/app/config.py @@ -121,6 +121,26 @@ class AppConfig: 
site_sync_enabled: bool site_sync_interval_seconds: int site_sync_batch_size: int + sigv4_timestamp_tolerance_seconds: int + presigned_url_min_expiry_seconds: int + presigned_url_max_expiry_seconds: int + replication_connect_timeout_seconds: int + replication_read_timeout_seconds: int + replication_max_retries: int + replication_streaming_threshold_bytes: int + replication_max_failures_per_bucket: int + site_sync_connect_timeout_seconds: int + site_sync_read_timeout_seconds: int + site_sync_max_retries: int + site_sync_clock_skew_tolerance_seconds: float + object_key_max_length_bytes: int + object_cache_max_size: int + bucket_config_cache_ttl_seconds: float + object_tag_limit: int + encryption_chunk_size_bytes: int + kms_generate_data_key_min_bytes: int + kms_generate_data_key_max_bytes: int + lifecycle_max_history_per_bucket: int @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -257,6 +277,27 @@ class AppConfig: site_sync_interval_seconds = int(_get("SITE_SYNC_INTERVAL_SECONDS", 60)) site_sync_batch_size = int(_get("SITE_SYNC_BATCH_SIZE", 100)) + sigv4_timestamp_tolerance_seconds = int(_get("SIGV4_TIMESTAMP_TOLERANCE_SECONDS", 900)) + presigned_url_min_expiry_seconds = int(_get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1)) + presigned_url_max_expiry_seconds = int(_get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800)) + replication_connect_timeout_seconds = int(_get("REPLICATION_CONNECT_TIMEOUT_SECONDS", 5)) + replication_read_timeout_seconds = int(_get("REPLICATION_READ_TIMEOUT_SECONDS", 30)) + replication_max_retries = int(_get("REPLICATION_MAX_RETRIES", 2)) + replication_streaming_threshold_bytes = int(_get("REPLICATION_STREAMING_THRESHOLD_BYTES", 10 * 1024 * 1024)) + replication_max_failures_per_bucket = int(_get("REPLICATION_MAX_FAILURES_PER_BUCKET", 50)) + site_sync_connect_timeout_seconds = int(_get("SITE_SYNC_CONNECT_TIMEOUT_SECONDS", 10)) + site_sync_read_timeout_seconds = int(_get("SITE_SYNC_READ_TIMEOUT_SECONDS", 120)) + 
site_sync_max_retries = int(_get("SITE_SYNC_MAX_RETRIES", 2)) + site_sync_clock_skew_tolerance_seconds = float(_get("SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS", 1.0)) + object_key_max_length_bytes = int(_get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024)) + object_cache_max_size = int(_get("OBJECT_CACHE_MAX_SIZE", 100)) + bucket_config_cache_ttl_seconds = float(_get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0)) + object_tag_limit = int(_get("OBJECT_TAG_LIMIT", 50)) + encryption_chunk_size_bytes = int(_get("ENCRYPTION_CHUNK_SIZE_BYTES", 64 * 1024)) + kms_generate_data_key_min_bytes = int(_get("KMS_GENERATE_DATA_KEY_MIN_BYTES", 1)) + kms_generate_data_key_max_bytes = int(_get("KMS_GENERATE_DATA_KEY_MAX_BYTES", 1024)) + lifecycle_max_history_per_bucket = int(_get("LIFECYCLE_MAX_HISTORY_PER_BUCKET", 50)) + return cls(storage_root=storage_root, max_upload_size=max_upload_size, ui_page_size=ui_page_size, @@ -314,7 +355,27 @@ class AppConfig: server_backlog_auto=server_backlog_auto, site_sync_enabled=site_sync_enabled, site_sync_interval_seconds=site_sync_interval_seconds, - site_sync_batch_size=site_sync_batch_size) + site_sync_batch_size=site_sync_batch_size, + sigv4_timestamp_tolerance_seconds=sigv4_timestamp_tolerance_seconds, + presigned_url_min_expiry_seconds=presigned_url_min_expiry_seconds, + presigned_url_max_expiry_seconds=presigned_url_max_expiry_seconds, + replication_connect_timeout_seconds=replication_connect_timeout_seconds, + replication_read_timeout_seconds=replication_read_timeout_seconds, + replication_max_retries=replication_max_retries, + replication_streaming_threshold_bytes=replication_streaming_threshold_bytes, + replication_max_failures_per_bucket=replication_max_failures_per_bucket, + site_sync_connect_timeout_seconds=site_sync_connect_timeout_seconds, + site_sync_read_timeout_seconds=site_sync_read_timeout_seconds, + site_sync_max_retries=site_sync_max_retries, + site_sync_clock_skew_tolerance_seconds=site_sync_clock_skew_tolerance_seconds, + 
object_key_max_length_bytes=object_key_max_length_bytes, + object_cache_max_size=object_cache_max_size, + bucket_config_cache_ttl_seconds=bucket_config_cache_ttl_seconds, + object_tag_limit=object_tag_limit, + encryption_chunk_size_bytes=encryption_chunk_size_bytes, + kms_generate_data_key_min_bytes=kms_generate_data_key_min_bytes, + kms_generate_data_key_max_bytes=kms_generate_data_key_max_bytes, + lifecycle_max_history_per_bucket=lifecycle_max_history_per_bucket) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. @@ -494,4 +555,24 @@ class AppConfig: "SITE_SYNC_ENABLED": self.site_sync_enabled, "SITE_SYNC_INTERVAL_SECONDS": self.site_sync_interval_seconds, "SITE_SYNC_BATCH_SIZE": self.site_sync_batch_size, + "SIGV4_TIMESTAMP_TOLERANCE_SECONDS": self.sigv4_timestamp_tolerance_seconds, + "PRESIGNED_URL_MIN_EXPIRY_SECONDS": self.presigned_url_min_expiry_seconds, + "PRESIGNED_URL_MAX_EXPIRY_SECONDS": self.presigned_url_max_expiry_seconds, + "REPLICATION_CONNECT_TIMEOUT_SECONDS": self.replication_connect_timeout_seconds, + "REPLICATION_READ_TIMEOUT_SECONDS": self.replication_read_timeout_seconds, + "REPLICATION_MAX_RETRIES": self.replication_max_retries, + "REPLICATION_STREAMING_THRESHOLD_BYTES": self.replication_streaming_threshold_bytes, + "REPLICATION_MAX_FAILURES_PER_BUCKET": self.replication_max_failures_per_bucket, + "SITE_SYNC_CONNECT_TIMEOUT_SECONDS": self.site_sync_connect_timeout_seconds, + "SITE_SYNC_READ_TIMEOUT_SECONDS": self.site_sync_read_timeout_seconds, + "SITE_SYNC_MAX_RETRIES": self.site_sync_max_retries, + "SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS": self.site_sync_clock_skew_tolerance_seconds, + "OBJECT_KEY_MAX_LENGTH_BYTES": self.object_key_max_length_bytes, + "OBJECT_CACHE_MAX_SIZE": self.object_cache_max_size, + "BUCKET_CONFIG_CACHE_TTL_SECONDS": self.bucket_config_cache_ttl_seconds, + "OBJECT_TAG_LIMIT": self.object_tag_limit, + "ENCRYPTION_CHUNK_SIZE_BYTES": 
self.encryption_chunk_size_bytes, + "KMS_GENERATE_DATA_KEY_MIN_BYTES": self.kms_generate_data_key_min_bytes, + "KMS_GENERATE_DATA_KEY_MAX_BYTES": self.kms_generate_data_key_max_bytes, + "LIFECYCLE_MAX_HISTORY_PER_BUCKET": self.lifecycle_max_history_per_bucket, } diff --git a/app/encryption.py b/app/encryption.py index 4b1d817..3cc18cc 100644 --- a/app/encryption.py +++ b/app/encryption.py @@ -310,7 +310,8 @@ class EncryptionManager: def get_streaming_encryptor(self) -> StreamingEncryptor: if self._streaming_encryptor is None: - self._streaming_encryptor = StreamingEncryptor(self.get_local_provider()) + chunk_size = self.config.get("encryption_chunk_size_bytes", 64 * 1024) + self._streaming_encryptor = StreamingEncryptor(self.get_local_provider(), chunk_size=chunk_size) return self._streaming_encryptor def encrypt_object(self, data: bytes, algorithm: str = "AES256", diff --git a/app/kms.py b/app/kms.py index 548e7ea..9326e2d 100644 --- a/app/kms.py +++ b/app/kms.py @@ -108,9 +108,17 @@ class KMSManager: Keys are stored encrypted on disk. 
""" - def __init__(self, keys_path: Path, master_key_path: Path): + def __init__( + self, + keys_path: Path, + master_key_path: Path, + generate_data_key_min_bytes: int = 1, + generate_data_key_max_bytes: int = 1024, + ): self.keys_path = keys_path self.master_key_path = master_key_path + self.generate_data_key_min_bytes = generate_data_key_min_bytes + self.generate_data_key_max_bytes = generate_data_key_max_bytes self._keys: Dict[str, KMSKey] = {} self._master_key: bytes | None = None self._loaded = False @@ -358,6 +366,8 @@ class KMSManager: def generate_random(self, num_bytes: int = 32) -> bytes: """Generate cryptographically secure random bytes.""" - if num_bytes < 1 or num_bytes > 1024: - raise EncryptionError("Number of bytes must be between 1 and 1024") + if num_bytes < self.generate_data_key_min_bytes or num_bytes > self.generate_data_key_max_bytes: + raise EncryptionError( + f"Number of bytes must be between {self.generate_data_key_min_bytes} and {self.generate_data_key_max_bytes}" + ) return secrets.token_bytes(num_bytes) diff --git a/app/lifecycle.py b/app/lifecycle.py index ed9eb2c..ea2c262 100644 --- a/app/lifecycle.py +++ b/app/lifecycle.py @@ -71,10 +71,9 @@ class LifecycleExecutionRecord: class LifecycleHistoryStore: - MAX_HISTORY_PER_BUCKET = 50 - - def __init__(self, storage_root: Path) -> None: + def __init__(self, storage_root: Path, max_history_per_bucket: int = 50) -> None: self.storage_root = storage_root + self.max_history_per_bucket = max_history_per_bucket self._lock = threading.Lock() def _get_history_path(self, bucket_name: str) -> Path: @@ -95,7 +94,7 @@ class LifecycleHistoryStore: def save_history(self, bucket_name: str, records: List[LifecycleExecutionRecord]) -> None: path = self._get_history_path(bucket_name) path.parent.mkdir(parents=True, exist_ok=True) - data = {"executions": [r.to_dict() for r in records[:self.MAX_HISTORY_PER_BUCKET]]} + data = {"executions": [r.to_dict() for r in records[:self.max_history_per_bucket]]} try: 
with open(path, "w") as f: json.dump(data, f, indent=2) @@ -114,14 +113,20 @@ class LifecycleHistoryStore: class LifecycleManager: - def __init__(self, storage: ObjectStorage, interval_seconds: int = 3600, storage_root: Optional[Path] = None): + def __init__( + self, + storage: ObjectStorage, + interval_seconds: int = 3600, + storage_root: Optional[Path] = None, + max_history_per_bucket: int = 50, + ): self.storage = storage self.interval_seconds = interval_seconds self.storage_root = storage_root self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() - self.history_store = LifecycleHistoryStore(storage_root) if storage_root else None + self.history_store = LifecycleHistoryStore(storage_root, max_history_per_bucket) if storage_root else None def start(self) -> None: if self._timer is not None: diff --git a/app/replication.py b/app/replication.py index 9cab869..d8022ba 100644 --- a/app/replication.py +++ b/app/replication.py @@ -21,16 +21,20 @@ from .storage import ObjectStorage, StorageError logger = logging.getLogger(__name__) REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0" -REPLICATION_CONNECT_TIMEOUT = 5 -REPLICATION_READ_TIMEOUT = 30 -STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024 REPLICATION_MODE_NEW_ONLY = "new_only" REPLICATION_MODE_ALL = "all" REPLICATION_MODE_BIDIRECTIONAL = "bidirectional" -def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any: +def _create_s3_client( + connection: RemoteConnection, + *, + health_check: bool = False, + connect_timeout: int = 5, + read_timeout: int = 30, + max_retries: int = 2, +) -> Any: """Create a boto3 S3 client for the given connection. 
Args: connection: Remote S3 connection configuration @@ -38,9 +42,9 @@ def _create_s3_client(connection: RemoteConnection, *, health_check: bool = Fals """ config = Config( user_agent_extra=REPLICATION_USER_AGENT, - connect_timeout=REPLICATION_CONNECT_TIMEOUT, - read_timeout=REPLICATION_READ_TIMEOUT, - retries={'max_attempts': 1 if health_check else 2}, + connect_timeout=connect_timeout, + read_timeout=read_timeout, + retries={'max_attempts': 1 if health_check else max_retries}, signature_version='s3v4', s3={'addressing_style': 'path'}, request_checksum_calculation='when_required', @@ -164,10 +168,9 @@ class ReplicationRule: class ReplicationFailureStore: - MAX_FAILURES_PER_BUCKET = 50 - - def __init__(self, storage_root: Path) -> None: + def __init__(self, storage_root: Path, max_failures_per_bucket: int = 50) -> None: self.storage_root = storage_root + self.max_failures_per_bucket = max_failures_per_bucket self._lock = threading.Lock() def _get_failures_path(self, bucket_name: str) -> Path: @@ -188,7 +191,7 @@ class ReplicationFailureStore: def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None: path = self._get_failures_path(bucket_name) path.parent.mkdir(parents=True, exist_ok=True) - data = {"failures": [f.to_dict() for f in failures[:self.MAX_FAILURES_PER_BUCKET]]} + data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]} try: with open(path, "w") as f: json.dump(data, f, indent=2) @@ -233,18 +236,43 @@ class ReplicationFailureStore: class ReplicationManager: - def __init__(self, storage: ObjectStorage, connections: ConnectionStore, rules_path: Path, storage_root: Path) -> None: + def __init__( + self, + storage: ObjectStorage, + connections: ConnectionStore, + rules_path: Path, + storage_root: Path, + connect_timeout: int = 5, + read_timeout: int = 30, + max_retries: int = 2, + streaming_threshold_bytes: int = 10 * 1024 * 1024, + max_failures_per_bucket: int = 50, + ) -> None: self.storage = storage 
self.connections = connections self.rules_path = rules_path self.storage_root = storage_root + self.connect_timeout = connect_timeout + self.read_timeout = read_timeout + self.max_retries = max_retries + self.streaming_threshold_bytes = streaming_threshold_bytes self._rules: Dict[str, ReplicationRule] = {} self._stats_lock = threading.Lock() self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker") self._shutdown = False - self.failure_store = ReplicationFailureStore(storage_root) + self.failure_store = ReplicationFailureStore(storage_root, max_failures_per_bucket) self.reload_rules() + def _create_client(self, connection: RemoteConnection, *, health_check: bool = False) -> Any: + """Create an S3 client with the manager's configured timeouts.""" + return _create_s3_client( + connection, + health_check=health_check, + connect_timeout=self.connect_timeout, + read_timeout=self.read_timeout, + max_retries=self.max_retries, + ) + def shutdown(self, wait: bool = True) -> None: """Shutdown the replication executor gracefully. @@ -280,7 +308,7 @@ class ReplicationManager: Uses short timeouts to prevent blocking. 
""" try: - s3 = _create_s3_client(connection, health_check=True) + s3 = self._create_client(connection, health_check=True) s3.list_buckets() return True except Exception as e: @@ -329,7 +357,7 @@ class ReplicationManager: source_objects = self.storage.list_objects_all(bucket_name) source_keys = {obj.key: obj.size for obj in source_objects} - s3 = _create_s3_client(connection) + s3 = self._create_client(connection) dest_keys = set() bytes_synced = 0 @@ -395,7 +423,7 @@ class ReplicationManager: raise ValueError(f"Connection {connection_id} not found") try: - s3 = _create_s3_client(connection) + s3 = self._create_client(connection) s3.create_bucket(Bucket=bucket_name) except ClientError as e: logger.error(f"Failed to create remote bucket {bucket_name}: {e}") @@ -438,7 +466,7 @@ class ReplicationManager: return try: - s3 = _create_s3_client(conn) + s3 = self._create_client(conn) if action == "delete": try: @@ -481,7 +509,7 @@ class ReplicationManager: if content_type: extra_args["ContentType"] = content_type - if file_size >= STREAMING_THRESHOLD_BYTES: + if file_size >= self.streaming_threshold_bytes: s3.upload_file( str(path), rule.target_bucket, diff --git a/app/s3_api.py b/app/s3_api.py index e9a11ee..47251cd 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -239,7 +239,8 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: now = datetime.now(timezone.utc) time_diff = abs((now - request_time).total_seconds()) - if time_diff > 900: + tolerance = current_app.config.get("SIGV4_TIMESTAMP_TOLERANCE_SECONDS", 900) + if time_diff > tolerance: raise IamError("Request timestamp too old or too far in the future") required_headers = {'host', 'x-amz-date'} @@ -501,8 +502,10 @@ def _validate_presigned_request(action: str, bucket_name: str, object_key: str) expiry = int(expires) except ValueError as exc: raise IamError("Invalid expiration") from exc - if expiry < 1 or expiry > 7 * 24 * 3600: - raise IamError("Expiration must be between 1 second and 7 
days") + min_expiry = current_app.config.get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1) + max_expiry = current_app.config.get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800) + if expiry < min_expiry or expiry > max_expiry: + raise IamError(f"Expiration must be between {min_expiry} second(s) and {max_expiry} seconds") try: request_time = datetime.strptime(amz_date, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc) @@ -1055,8 +1058,9 @@ def _bucket_tagging_handler(bucket_name: str) -> Response: tags = _parse_tagging_document(payload) except ValueError as exc: return _error_response("MalformedXML", str(exc), 400) - if len(tags) > 50: - return _error_response("InvalidTag", "A maximum of 50 tags is supported", 400) + tag_limit = current_app.config.get("OBJECT_TAG_LIMIT", 50) + if len(tags) > tag_limit: + return _error_response("InvalidTag", f"A maximum of {tag_limit} tags is supported", 400) try: storage.set_bucket_tags(bucket_name, tags) except StorageError as exc: diff --git a/app/site_sync.py b/app/site_sync.py index 306ac28..57cf185 100644 --- a/app/site_sync.py +++ b/app/site_sync.py @@ -22,9 +22,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) SITE_SYNC_USER_AGENT = "SiteSyncAgent/1.0" -SITE_SYNC_CONNECT_TIMEOUT = 10 -SITE_SYNC_READ_TIMEOUT = 120 -CLOCK_SKEW_TOLERANCE_SECONDS = 1.0 @dataclass @@ -108,12 +105,18 @@ class RemoteObjectMeta: ) -def _create_sync_client(connection: "RemoteConnection") -> Any: +def _create_sync_client( + connection: "RemoteConnection", + *, + connect_timeout: int = 10, + read_timeout: int = 120, + max_retries: int = 2, +) -> Any: config = Config( user_agent_extra=SITE_SYNC_USER_AGENT, - connect_timeout=SITE_SYNC_CONNECT_TIMEOUT, - read_timeout=SITE_SYNC_READ_TIMEOUT, - retries={"max_attempts": 2}, + connect_timeout=connect_timeout, + read_timeout=read_timeout, + retries={"max_attempts": max_retries}, signature_version="s3v4", s3={"addressing_style": "path"}, request_checksum_calculation="when_required", @@ -138,6 +141,10 @@ class 
SiteSyncWorker: storage_root: Path, interval_seconds: int = 60, batch_size: int = 100, + connect_timeout: int = 10, + read_timeout: int = 120, + max_retries: int = 2, + clock_skew_tolerance_seconds: float = 1.0, ): self.storage = storage self.connections = connections @@ -145,11 +152,24 @@ class SiteSyncWorker: self.storage_root = storage_root self.interval_seconds = interval_seconds self.batch_size = batch_size + self.connect_timeout = connect_timeout + self.read_timeout = read_timeout + self.max_retries = max_retries + self.clock_skew_tolerance_seconds = clock_skew_tolerance_seconds self._lock = threading.Lock() self._shutdown = threading.Event() self._sync_thread: Optional[threading.Thread] = None self._bucket_stats: Dict[str, SiteSyncStats] = {} + def _create_client(self, connection: "RemoteConnection") -> Any: + """Create an S3 client with the worker's configured timeouts.""" + return _create_sync_client( + connection, + connect_timeout=self.connect_timeout, + read_timeout=self.read_timeout, + max_retries=self.max_retries, + ) + def start(self) -> None: if self._sync_thread is not None and self._sync_thread.is_alive(): return @@ -294,7 +314,7 @@ class SiteSyncWorker: return {obj.key: obj for obj in objects} def _list_remote_objects(self, rule: "ReplicationRule", connection: "RemoteConnection") -> Dict[str, RemoteObjectMeta]: - s3 = _create_sync_client(connection) + s3 = self._create_client(connection) result: Dict[str, RemoteObjectMeta] = {} paginator = s3.get_paginator("list_objects_v2") try: @@ -312,7 +332,7 @@ class SiteSyncWorker: local_ts = local_meta.last_modified.timestamp() remote_ts = remote_meta.last_modified.timestamp() - if abs(remote_ts - local_ts) < CLOCK_SKEW_TOLERANCE_SECONDS: + if abs(remote_ts - local_ts) < self.clock_skew_tolerance_seconds: local_etag = local_meta.etag or "" if remote_meta.etag == local_etag: return "skip" @@ -327,7 +347,7 @@ class SiteSyncWorker: connection: "RemoteConnection", remote_meta: RemoteObjectMeta, ) -> bool: - s3 
= _create_sync_client(connection) + s3 = self._create_client(connection) tmp_path = None try: tmp_dir = self.storage_root / ".myfsio.sys" / "tmp" diff --git a/app/storage.py b/app/storage.py index 70488d0..ea0ec9b 100644 --- a/app/storage.py +++ b/app/storage.py @@ -137,10 +137,15 @@ class ObjectStorage: BUCKET_VERSIONS_DIR = "versions" MULTIPART_MANIFEST = "manifest.json" BUCKET_CONFIG_FILE = ".bucket.json" - DEFAULT_CACHE_TTL = 5 - OBJECT_CACHE_MAX_SIZE = 100 - def __init__(self, root: Path, cache_ttl: int = DEFAULT_CACHE_TTL) -> None: + def __init__( + self, + root: Path, + cache_ttl: int = 5, + object_cache_max_size: int = 100, + bucket_config_cache_ttl: float = 30.0, + object_key_max_length_bytes: int = 1024, + ) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) self._ensure_system_roots() @@ -149,8 +154,10 @@ class ObjectStorage: self._bucket_locks: Dict[str, threading.Lock] = {} self._cache_version: Dict[str, int] = {} self._bucket_config_cache: Dict[str, tuple[dict[str, Any], float]] = {} - self._bucket_config_cache_ttl = 30.0 + self._bucket_config_cache_ttl = bucket_config_cache_ttl self._cache_ttl = cache_ttl + self._object_cache_max_size = object_cache_max_size + self._object_key_max_length_bytes = object_key_max_length_bytes def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: """Get or create a lock for a specific bucket. 
Reduces global lock contention.""" @@ -364,7 +371,7 @@ class ObjectStorage: raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) destination = bucket_path / safe_key destination.parent.mkdir(parents=True, exist_ok=True) @@ -439,7 +446,7 @@ class ObjectStorage: bucket_path = self._bucket_path(bucket_name) if not bucket_path.exists(): return {} - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) return self._read_metadata(bucket_path.name, safe_key) or {} def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None: @@ -487,7 +494,7 @@ class ObjectStorage: self._safe_unlink(target) self._delete_metadata(bucket_id, rel) else: - rel = self._sanitize_object_key(object_key) + rel = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) self._delete_metadata(bucket_id, rel) version_dir = self._version_dir(bucket_id, rel) if version_dir.exists(): @@ -696,7 +703,7 @@ class ObjectStorage: bucket_path = self._bucket_path(bucket_name) if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) object_path = bucket_path / safe_key if not object_path.exists(): raise ObjectNotFoundError("Object does not exist") @@ -719,7 +726,7 @@ class ObjectStorage: bucket_path = self._bucket_path(bucket_name) if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) object_path = bucket_path / safe_key if not object_path.exists(): raise ObjectNotFoundError("Object does not exist") @@ -758,7 +765,7 @@ class 
ObjectStorage: if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) version_dir = self._version_dir(bucket_id, safe_key) if not version_dir.exists(): version_dir = self._legacy_version_dir(bucket_id, safe_key) @@ -782,7 +789,7 @@ class ObjectStorage: if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) version_dir = self._version_dir(bucket_id, safe_key) data_path = version_dir / f"{version_id}.bin" meta_path = version_dir / f"{version_id}.json" @@ -819,7 +826,7 @@ class ObjectStorage: if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) version_dir = self._version_dir(bucket_id, safe_key) data_path = version_dir / f"{version_id}.bin" meta_path = version_dir / f"{version_id}.json" @@ -910,7 +917,7 @@ class ObjectStorage: if not bucket_path.exists(): raise BucketNotFoundError("Bucket does not exist") bucket_id = bucket_path.name - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) upload_id = uuid.uuid4().hex upload_root = self._multipart_dir(bucket_id, upload_id) upload_root.mkdir(parents=True, exist_ok=False) @@ -1034,7 +1041,7 @@ class ObjectStorage: total_size += record.get("size", 0) validated.sort(key=lambda entry: entry[0]) - safe_key = self._sanitize_object_key(manifest["object_key"]) + safe_key = self._sanitize_object_key(manifest["object_key"], self._object_key_max_length_bytes) destination = bucket_path / 
safe_key is_overwrite = destination.exists() @@ -1213,7 +1220,7 @@ class ObjectStorage: def _object_path(self, bucket_name: str, object_key: str) -> Path: bucket_path = self._bucket_path(bucket_name) - safe_key = self._sanitize_object_key(object_key) + safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes) return bucket_path / safe_key def _system_root_path(self) -> Path: @@ -1429,7 +1436,7 @@ class ObjectStorage: current_version = self._cache_version.get(bucket_id, 0) if current_version != cache_version: objects = self._build_object_cache(bucket_path) - while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE: + while len(self._object_cache) >= self._object_cache_max_size: self._object_cache.popitem(last=False) self._object_cache[bucket_id] = (objects, time.time()) @@ -1764,11 +1771,11 @@ class ObjectStorage: return name @staticmethod - def _sanitize_object_key(object_key: str) -> Path: + def _sanitize_object_key(object_key: str, max_length_bytes: int = 1024) -> Path: if not object_key: raise StorageError("Object key required") - if len(object_key.encode("utf-8")) > 1024: - raise StorageError("Object key exceeds maximum length of 1024 bytes") + if len(object_key.encode("utf-8")) > max_length_bytes: + raise StorageError(f"Object key exceeds maximum length of {max_length_bytes} bytes") if "\x00" in object_key: raise StorageError("Object key contains null bytes") if object_key.startswith(("/", "\\")): diff --git a/app/ui.py b/app/ui.py index 738521c..6634a19 100644 --- a/app/ui.py +++ b/app/ui.py @@ -1091,7 +1091,9 @@ def object_presign(bucket_name: str, object_key: str): expires = int(payload.get("expires_in", 900)) except (TypeError, ValueError): return jsonify({"error": "expires_in must be an integer"}), 400 - expires = max(1, min(expires, 7 * 24 * 3600)) + min_expiry = current_app.config.get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1) + max_expiry = current_app.config.get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800) + expires = 
max(min_expiry, min(expires, max_expiry)) storage = _storage() if not storage.bucket_exists(bucket_name): return jsonify({"error": "Bucket does not exist"}), 404 diff --git a/templates/docs.html b/templates/docs.html index ce9c95c..1de6147 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -262,6 +262,115 @@ python run.py --mode ui
100
SITE_SYNC_CONNECT_TIMEOUT_SECONDS | 10
SITE_SYNC_READ_TIMEOUT_SECONDS | 120
SITE_SYNC_MAX_RETRIES | 2
SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS | 1.0
REPLICATION_CONNECT_TIMEOUT_SECONDS | 5
REPLICATION_READ_TIMEOUT_SECONDS | 30
REPLICATION_MAX_RETRIES | 2
REPLICATION_STREAMING_THRESHOLD_BYTES | 10485760
REPLICATION_MAX_FAILURES_PER_BUCKET | 50
SIGV4_TIMESTAMP_TOLERANCE_SECONDS | 900
PRESIGNED_URL_MIN_EXPIRY_SECONDS | 1
PRESIGNED_URL_MAX_EXPIRY_SECONDS | 604800
OBJECT_KEY_MAX_LENGTH_BYTES | 1024
OBJECT_CACHE_MAX_SIZE | 100
BUCKET_CONFIG_CACHE_TTL_SECONDS | 30
OBJECT_TAG_LIMIT | 50
LIFECYCLE_MAX_HISTORY_PER_BUCKET | 50
ENCRYPTION_CHUNK_SIZE_BYTES | 65536
KMS_GENERATE_DATA_KEY_MIN_BYTES | 1
KMS_GENERATE_DATA_KEY_MAX_BYTES | 1024