diff --git a/app/bucket_policies.py b/app/bucket_policies.py index d576030..b5f0b8f 100644 --- a/app/bucket_policies.py +++ b/app/bucket_policies.py @@ -2,10 +2,12 @@ from __future__ import annotations import json +import re +import time from dataclasses import dataclass -from fnmatch import fnmatch +from fnmatch import fnmatch, translate from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Sequence +from typing import Any, Dict, Iterable, List, Optional, Pattern, Sequence, Tuple RESOURCE_PREFIX = "arn:aws:s3:::" @@ -133,7 +135,22 @@ class BucketPolicyStatement: effect: str principals: List[str] | str actions: List[str] - resources: List[tuple[str | None, str | None]] + resources: List[Tuple[str | None, str | None]] + # Performance: Pre-compiled regex patterns for resource matching + _compiled_patterns: List[Tuple[str | None, Optional[Pattern[str]]]] | None = None + + def _get_compiled_patterns(self) -> List[Tuple[str | None, Optional[Pattern[str]]]]: + """Lazily compile fnmatch patterns to regex for faster matching.""" + if self._compiled_patterns is None: + self._compiled_patterns = [] + for resource_bucket, key_pattern in self.resources: + if key_pattern is None: + self._compiled_patterns.append((resource_bucket, None)) + else: + # Convert fnmatch pattern to regex + regex_pattern = translate(key_pattern) + self._compiled_patterns.append((resource_bucket, re.compile(regex_pattern))) + return self._compiled_patterns def matches_principal(self, access_key: Optional[str]) -> bool: if self.principals == "*": @@ -149,15 +166,16 @@ class BucketPolicyStatement: def matches_resource(self, bucket: Optional[str], object_key: Optional[str]) -> bool: bucket = (bucket or "*").lower() key = object_key or "" - for resource_bucket, key_pattern in self.resources: + for resource_bucket, compiled_pattern in self._get_compiled_patterns(): resource_bucket = (resource_bucket or "*").lower() if resource_bucket not in {"*", bucket}: continue - if key_pattern is None: + if compiled_pattern is None: if not key: return True continue - if fnmatch(key, key_pattern): + # Performance: Use pre-compiled regex instead of fnmatch + if compiled_pattern.match(key): return True return False @@ -174,8 +192,16 @@ class BucketPolicyStore: self._policies: Dict[str, List[BucketPolicyStatement]] = {} self._load() self._last_mtime = self._current_mtime() + # Performance: Avoid stat() on every request + self._last_stat_check = 0.0 + self._stat_check_interval = 1.0 # Only check mtime every 1 second def maybe_reload(self) -> None: + # Performance: Skip stat check if we checked recently + now = time.time() + if now - self._last_stat_check < self._stat_check_interval: + return + self._last_stat_check = now current = self._current_mtime() if current is None or current == self._last_mtime: return diff --git a/app/encrypted_storage.py b/app/encrypted_storage.py index 372dd1c..fca73b8 100644 --- a/app/encrypted_storage.py +++ b/app/encrypted_storage.py @@ -79,7 +79,7 @@ class EncryptedObjectStorage: kms_key_id: Optional[str] = None, ) -> ObjectMeta: """Store an object, optionally with encryption. - + Args: bucket_name: Name of the bucket object_key: Key for the object @@ -87,42 +87,41 @@ class EncryptedObjectStorage: metadata: Optional user metadata server_side_encryption: Encryption algorithm ("AES256" or "aws:kms") kms_key_id: KMS key ID (for aws:kms encryption) - + Returns: ObjectMeta with object information + + Performance: Uses streaming encryption for large files to reduce memory usage. """ should_encrypt, algorithm, detected_kms_key = self._should_encrypt( bucket_name, server_side_encryption ) - + if kms_key_id is None: kms_key_id = detected_kms_key - + if should_encrypt: - data = stream.read() - try: - ciphertext, enc_metadata = self.encryption.encrypt_object( - data, + # Performance: Use streaming encryption to avoid loading entire file into memory + encrypted_stream, enc_metadata = self.encryption.encrypt_stream( + stream, algorithm=algorithm, - kms_key_id=kms_key_id, context={"bucket": bucket_name, "key": object_key}, ) - + combined_metadata = metadata.copy() if metadata else {} combined_metadata.update(enc_metadata.to_dict()) - - encrypted_stream = io.BytesIO(ciphertext) + result = self.storage.put_object( bucket_name, object_key, encrypted_stream, metadata=combined_metadata, ) - + result.metadata = combined_metadata return result - + except EncryptionError as exc: raise StorageError(f"Encryption failed: {exc}") from exc else: @@ -135,33 +134,34 @@ class EncryptedObjectStorage: def get_object_data(self, bucket_name: str, object_key: str) -> tuple[bytes, Dict[str, str]]: """Get object data, decrypting if necessary. - + Returns: Tuple of (data, metadata) + + Performance: Uses streaming decryption to reduce memory usage. """ path = self.storage.get_object_path(bucket_name, object_key) metadata = self.storage.get_object_metadata(bucket_name, object_key) - - with path.open("rb") as f: - data = f.read() - + enc_metadata = EncryptionMetadata.from_dict(metadata) if enc_metadata: try: - data = self.encryption.decrypt_object( - data, - enc_metadata, - context={"bucket": bucket_name, "key": object_key}, - ) + # Performance: Use streaming decryption to avoid loading entire file into memory + with path.open("rb") as f: + decrypted_stream = self.encryption.decrypt_stream(f, enc_metadata) + data = decrypted_stream.read() except EncryptionError as exc: raise StorageError(f"Decryption failed: {exc}") from exc - + else: + with path.open("rb") as f: + data = f.read() + clean_metadata = { k: v for k, v in metadata.items() - if not k.startswith("x-amz-encryption") + if not k.startswith("x-amz-encryption") and k != "x-amz-encrypted-data-key" } - + return data, clean_metadata def get_object_stream(self, bucket_name: str, object_key: str) -> tuple[BinaryIO, Dict[str, str], int]: diff --git a/app/encryption.py b/app/encryption.py index aa98cb4..25f868c 100644 --- a/app/encryption.py +++ b/app/encryption.py @@ -183,81 +183,94 @@ class StreamingEncryptor: self.chunk_size = chunk_size def _derive_chunk_nonce(self, base_nonce: bytes, chunk_index: int) -> bytes: - """Derive a unique nonce for each chunk.""" - # XOR the base nonce with the chunk index - nonce_int = int.from_bytes(base_nonce, "big") - derived = nonce_int ^ chunk_index - return derived.to_bytes(12, "big") - - def encrypt_stream(self, stream: BinaryIO, - context: Dict[str, str] | None = None) -> tuple[BinaryIO, EncryptionMetadata]: - """Encrypt a stream and return encrypted stream + metadata.""" + """Derive a unique nonce for each chunk. + Performance: Use direct byte manipulation instead of full int conversion. + """ + # Performance: Only modify last 4 bytes instead of full 12-byte conversion + return base_nonce[:8] + (chunk_index ^ int.from_bytes(base_nonce[8:], "big")).to_bytes(4, "big") + + def encrypt_stream(self, stream: BinaryIO, + context: Dict[str, str] | None = None) -> tuple[BinaryIO, EncryptionMetadata]: + """Encrypt a stream and return encrypted stream + metadata. + + Performance: Writes chunks directly to output buffer instead of accumulating in list. + """ data_key, encrypted_data_key = self.provider.generate_data_key() base_nonce = secrets.token_bytes(12) - + aesgcm = AESGCM(data_key) - encrypted_chunks = [] + # Performance: Write directly to BytesIO instead of accumulating chunks + output = io.BytesIO() + output.write(b"\x00\x00\x00\x00") # Placeholder for chunk count chunk_index = 0 - + while True: chunk = stream.read(self.chunk_size) if not chunk: break - + chunk_nonce = self._derive_chunk_nonce(base_nonce, chunk_index) encrypted_chunk = aesgcm.encrypt(chunk_nonce, chunk, None) - - size_prefix = len(encrypted_chunk).to_bytes(self.HEADER_SIZE, "big") - encrypted_chunks.append(size_prefix + encrypted_chunk) + + # Write size prefix + encrypted chunk directly + output.write(len(encrypted_chunk).to_bytes(self.HEADER_SIZE, "big")) + output.write(encrypted_chunk) chunk_index += 1 - - header = chunk_index.to_bytes(4, "big") - encrypted_data = header + b"".join(encrypted_chunks) - + + # Write actual chunk count to header + output.seek(0) + output.write(chunk_index.to_bytes(4, "big")) + output.seek(0) + metadata = EncryptionMetadata( algorithm="AES256", key_id=self.provider.KEY_ID if hasattr(self.provider, "KEY_ID") else "local", nonce=base_nonce, encrypted_data_key=encrypted_data_key, ) - - return io.BytesIO(encrypted_data), metadata - + + return output, metadata + def decrypt_stream(self, stream: BinaryIO, metadata: EncryptionMetadata) -> BinaryIO: - """Decrypt a stream using the provided metadata.""" + """Decrypt a stream using the provided metadata. + + Performance: Writes chunks directly to output buffer instead of accumulating in list. + """ if isinstance(self.provider, LocalKeyEncryption): data_key = self.provider._decrypt_data_key(metadata.encrypted_data_key) else: raise EncryptionError("Unsupported provider for streaming decryption") - + aesgcm = AESGCM(data_key) base_nonce = metadata.nonce - + chunk_count_bytes = stream.read(4) if len(chunk_count_bytes) < 4: raise EncryptionError("Invalid encrypted stream: missing header") chunk_count = int.from_bytes(chunk_count_bytes, "big") - - decrypted_chunks = [] + + # Performance: Write directly to BytesIO instead of accumulating chunks + output = io.BytesIO() for chunk_index in range(chunk_count): size_bytes = stream.read(self.HEADER_SIZE) if len(size_bytes) < self.HEADER_SIZE: raise EncryptionError(f"Invalid encrypted stream: truncated at chunk {chunk_index}") chunk_size = int.from_bytes(size_bytes, "big") - + encrypted_chunk = stream.read(chunk_size) if len(encrypted_chunk) < chunk_size: raise EncryptionError(f"Invalid encrypted stream: incomplete chunk {chunk_index}") - + chunk_nonce = self._derive_chunk_nonce(base_nonce, chunk_index) try: decrypted_chunk = aesgcm.decrypt(chunk_nonce, encrypted_chunk, None) - decrypted_chunks.append(decrypted_chunk) + output.write(decrypted_chunk) # Write directly instead of appending to list except Exception as exc: raise EncryptionError(f"Failed to decrypt chunk {chunk_index}: {exc}") from exc - - return io.BytesIO(b"".join(decrypted_chunks)) + + output.seek(0) + return output class EncryptionManager: diff --git a/app/iam.py b/app/iam.py index 465aca1..d93ccc5 100644 --- a/app/iam.py +++ b/app/iam.py @@ -4,11 +4,12 @@ from __future__ import annotations import json import math import secrets +import time from collections import deque from dataclasses import dataclass from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set +from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set, Tuple class IamError(RuntimeError): @@ -115,13 +116,24 @@ class IamService: self._raw_config: Dict[str, Any] = {} self._failed_attempts: Dict[str, Deque[datetime]] = {} self._last_load_time = 0.0 + # Performance: credential cache with TTL + self._credential_cache: Dict[str, Tuple[str, Principal, float]] = {} + self._cache_ttl = 60.0 # Cache credentials for 60 seconds + self._last_stat_check = 0.0 + self._stat_check_interval = 1.0 # Only stat() file every 1 second self._load() def _maybe_reload(self) -> None: """Reload configuration if the file has changed on disk.""" + # Performance: Skip stat check if we checked recently + now = time.time() + if now - self._last_stat_check < self._stat_check_interval: + return + self._last_stat_check = now try: if self.config_path.stat().st_mtime > self._last_load_time: self._load() + self._credential_cache.clear() # Invalidate cache on reload except OSError: pass @@ -181,17 +193,37 @@ class IamService: return int(max(0, self.auth_lockout_window.total_seconds() - elapsed)) def principal_for_key(self, access_key: str) -> Principal: + # Performance: Check cache first + now = time.time() + cached = self._credential_cache.get(access_key) + if cached: + secret, principal, cached_time = cached + if now - cached_time < self._cache_ttl: + return principal + self._maybe_reload() record = self._users.get(access_key) if not record: raise IamError("Unknown access key") - return self._build_principal(access_key, record) + principal = self._build_principal(access_key, record) + self._credential_cache[access_key] = (record["secret_key"], principal, now) + return principal def secret_for_key(self, access_key: str) -> str: + # Performance: Check cache first + now = time.time() + cached = self._credential_cache.get(access_key) + if cached: + secret, principal, cached_time = cached + if now - cached_time < self._cache_ttl: + return secret + self._maybe_reload() record = self._users.get(access_key) if not record: raise IamError("Unknown access key") + principal = self._build_principal(access_key, record) + self._credential_cache[access_key] = (record["secret_key"], principal, now) return record["secret_key"] def authorize(self, principal: Principal, bucket_name: str | None, action: str) -> None: @@ -442,11 +474,36 @@ class IamService: raise IamError("User not found") def get_secret_key(self, access_key: str) -> str | None: + # Performance: Check cache first + now = time.time() + cached = self._credential_cache.get(access_key) + if cached: + secret, principal, cached_time = cached + if now - cached_time < self._cache_ttl: + return secret + self._maybe_reload() record = self._users.get(access_key) - return record["secret_key"] if record else None + if record: + # Cache the result + principal = self._build_principal(access_key, record) + self._credential_cache[access_key] = (record["secret_key"], principal, now) + return record["secret_key"] + return None def get_principal(self, access_key: str) -> Principal | None: + # Performance: Check cache first + now = time.time() + cached = self._credential_cache.get(access_key) + if cached: + secret, principal, cached_time = cached + if now - cached_time < self._cache_ttl: + return principal + self._maybe_reload() record = self._users.get(access_key) - return self._build_principal(access_key, record) if record else None + if record: + principal = self._build_principal(access_key, record) + self._credential_cache[access_key] = (record["secret_key"], principal, now) + return principal + return None diff --git a/app/s3_api.py b/app/s3_api.py index 78b9afb..255e029 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -2171,48 +2171,89 @@ def _copy_object(dest_bucket: str, dest_key: str, copy_source: str) -> Response: class AwsChunkedDecoder: - """Decodes aws-chunked encoded streams.""" + """Decodes aws-chunked encoded streams. + + Performance optimized with buffered line reading instead of byte-by-byte. + """ + def __init__(self, stream): self.stream = stream - self.buffer = b"" + self._read_buffer = bytearray() # Performance: Pre-allocated buffer self.chunk_remaining = 0 self.finished = False + def _read_line(self) -> bytes: + """Read until CRLF using buffered reads instead of byte-by-byte. + + Performance: Reads in batches of 64-256 bytes instead of 1 byte at a time. + """ + line = bytearray() + while True: + # Check if we have data in buffer + if self._read_buffer: + # Look for CRLF in buffer + idx = self._read_buffer.find(b"\r\n") + if idx != -1: + # Found CRLF - extract line and update buffer + line.extend(self._read_buffer[: idx + 2]) + del self._read_buffer[: idx + 2] + return bytes(line) + # No CRLF yet - consume entire buffer + line.extend(self._read_buffer) + self._read_buffer.clear() + + # Read more data in larger chunks (64 bytes is enough for chunk headers) + chunk = self.stream.read(64) + if not chunk: + return bytes(line) if line else b"" + self._read_buffer.extend(chunk) + + def _read_exact(self, n: int) -> bytes: + """Read exactly n bytes, using buffer first.""" + result = bytearray() + # Use buffered data first + if self._read_buffer: + take = min(len(self._read_buffer), n) + result.extend(self._read_buffer[:take]) + del self._read_buffer[:take] + n -= take + + # Read remaining directly from stream + if n > 0: + data = self.stream.read(n) + if data: + result.extend(data) + + return bytes(result) + def read(self, size=-1): if self.finished: return b"" - result = b"" + result = bytearray() # Performance: Use bytearray for building result while size == -1 or len(result) < size: if self.chunk_remaining > 0: to_read = self.chunk_remaining if size != -1: to_read = min(to_read, size - len(result)) - - chunk = self.stream.read(to_read) + + chunk = self._read_exact(to_read) if not chunk: raise IOError("Unexpected EOF in chunk data") - - result += chunk + + result.extend(chunk) self.chunk_remaining -= len(chunk) - + if self.chunk_remaining == 0: - crlf = self.stream.read(2) + crlf = self._read_exact(2) if crlf != b"\r\n": raise IOError("Malformed chunk: missing CRLF") else: - line = b"" - while True: - char = self.stream.read(1) - if not char: - if not line: - self.finished = True - return result - raise IOError("Unexpected EOF in chunk size") - line += char - if line.endswith(b"\r\n"): - break - + line = self._read_line() + if not line: + self.finished = True + return bytes(result) + try: line_str = line.decode("ascii").strip() if ";" in line_str: @@ -2223,22 +2264,16 @@ class AwsChunkedDecoder: if chunk_size == 0: self.finished = True + # Skip trailing headers while True: - line = b"" - while True: - char = self.stream.read(1) - if not char: - break - line += char - if line.endswith(b"\r\n"): - break - if line == b"\r\n" or not line: + trailer = self._read_line() + if trailer == b"\r\n" or not trailer: break - return result - + return bytes(result) + self.chunk_remaining = chunk_size - - return result + + return bytes(result) def _initiate_multipart_upload(bucket_name: str, object_key: str) -> Response: diff --git a/app/storage.py b/app/storage.py index 702d5bb..7d17b7d 100644 --- a/app/storage.py +++ b/app/storage.py @@ -139,9 +139,21 @@ class ObjectStorage: self._ensure_system_roots() # LRU cache for object metadata with thread-safe access self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float]] = OrderedDict() - self._cache_lock = threading.Lock() + self._cache_lock = threading.Lock() # Global lock for cache structure + # Performance: Per-bucket locks to reduce contention + self._bucket_locks: Dict[str, threading.Lock] = {} # Cache version counter for detecting stale reads self._cache_version: Dict[str, int] = {} + # Performance: Bucket config cache with TTL + self._bucket_config_cache: Dict[str, tuple[dict[str, Any], float]] = {} + self._bucket_config_cache_ttl = 30.0 # 30 second TTL + + def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: + """Get or create a lock for a specific bucket. Reduces global lock contention.""" + with self._cache_lock: + if bucket_id not in self._bucket_locks: + self._bucket_locks[bucket_id] = threading.Lock() + return self._bucket_locks[bucket_id] def list_buckets(self) -> List[BucketMeta]: buckets: List[BucketMeta] = [] @@ -247,11 +259,13 @@ class ObjectStorage: bucket_path = self._bucket_path(bucket_name) if not bucket_path.exists(): raise StorageError("Bucket does not exist") - if self._has_visible_objects(bucket_path): + # Performance: Single check instead of three separate traversals + has_objects, has_versions, has_multipart = self._check_bucket_contents(bucket_path) + if has_objects: raise StorageError("Bucket not empty") - if self._has_archived_versions(bucket_path): + if has_versions: raise StorageError("Bucket contains archived object versions") - if self._has_active_multipart_uploads(bucket_path): + if has_multipart: raise StorageError("Bucket has active multipart uploads") self._remove_tree(bucket_path) self._remove_tree(self._system_bucket_root(bucket_path.name)) @@ -393,17 +407,20 @@ class ObjectStorage: internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)} combined_meta = {**internal_meta, **(metadata or {})} self._write_metadata(bucket_id, safe_key, combined_meta) - + self._invalidate_bucket_stats_cache(bucket_id) - self._invalidate_object_cache(bucket_id) - - return ObjectMeta( + + # Performance: Lazy update - only update the affected key instead of invalidating whole cache + obj_meta = ObjectMeta( key=safe_key.as_posix(), size=stat.st_size, last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc), etag=etag, metadata=metadata, ) + self._update_object_cache_entry(bucket_id, safe_key.as_posix(), obj_meta) + + return obj_meta def get_object_path(self, bucket_name: str, object_key: str) -> Path: path = self._object_path(bucket_name, object_key) @@ -449,9 +466,10 @@ class ObjectStorage: rel = path.relative_to(bucket_path) self._safe_unlink(path) self._delete_metadata(bucket_id, rel) - + self._invalidate_bucket_stats_cache(bucket_id) - self._invalidate_object_cache(bucket_id) + # Performance: Lazy update - only remove the affected key instead of invalidating whole cache + self._update_object_cache_entry(bucket_id, safe_key.as_posix(), None) self._cleanup_empty_parents(path, bucket_path) def purge_object(self, bucket_name: str, object_key: str) -> None: @@ -471,9 +489,10 @@ class ObjectStorage: legacy_version_dir = self._legacy_version_dir(bucket_id, rel) if legacy_version_dir.exists(): shutil.rmtree(legacy_version_dir, ignore_errors=True) - + self._invalidate_bucket_stats_cache(bucket_id) - self._invalidate_object_cache(bucket_id) + # Performance: Lazy update - only remove the affected key instead of invalidating whole cache + self._update_object_cache_entry(bucket_id, rel.as_posix(), None) self._cleanup_empty_parents(target, bucket_path) def is_versioning_enabled(self, bucket_name: str) -> bool: @@ -1054,16 +1073,19 @@ class ObjectStorage: shutil.rmtree(upload_root, ignore_errors=True) self._invalidate_bucket_stats_cache(bucket_id) - self._invalidate_object_cache(bucket_id) stat = destination.stat() - return ObjectMeta( + # Performance: Lazy update - only update the affected key instead of invalidating whole cache + obj_meta = ObjectMeta( key=safe_key.as_posix(), size=stat.st_size, last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc), etag=checksum.hexdigest(), metadata=metadata, ) + self._update_object_cache_entry(bucket_id, safe_key.as_posix(), obj_meta) + + return obj_meta def abort_multipart_upload(self, bucket_name: str, upload_id: str) -> None: bucket_path = self._bucket_path(bucket_name) @@ -1305,37 +1327,47 @@ class ObjectStorage: """Get cached object metadata for a bucket, refreshing if stale. Uses LRU eviction to prevent unbounded cache growth. - Thread-safe with version tracking to detect concurrent invalidations. + Thread-safe with per-bucket locks to reduce contention. """ now = time.time() + # Quick check with global lock (brief) with self._cache_lock: cached = self._object_cache.get(bucket_id) - cache_version = self._cache_version.get(bucket_id, 0) - if cached: objects, timestamp = cached if now - timestamp < self.KEY_INDEX_CACHE_TTL: - # Move to end (most recently used) self._object_cache.move_to_end(bucket_id) return objects + cache_version = self._cache_version.get(bucket_id, 0) - # Build cache outside lock to avoid holding lock during I/O - objects = self._build_object_cache(bucket_path) + # Use per-bucket lock for cache building (allows parallel builds for different buckets) + bucket_lock = self._get_bucket_lock(bucket_id) + with bucket_lock: + # Double-check cache after acquiring per-bucket lock + with self._cache_lock: + cached = self._object_cache.get(bucket_id) + if cached: + objects, timestamp = cached + if now - timestamp < self.KEY_INDEX_CACHE_TTL: + self._object_cache.move_to_end(bucket_id) + return objects - with self._cache_lock: - # Check if cache was invalidated while we were building - current_version = self._cache_version.get(bucket_id, 0) - if current_version != cache_version: - # Cache was invalidated, rebuild - objects = self._build_object_cache(bucket_path) + # Build cache with per-bucket lock held (prevents duplicate work) + objects = self._build_object_cache(bucket_path) - # Evict oldest entries if cache is full - while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE: - self._object_cache.popitem(last=False) + with self._cache_lock: + # Check if cache was invalidated while we were building + current_version = self._cache_version.get(bucket_id, 0) + if current_version != cache_version: + objects = self._build_object_cache(bucket_path) - self._object_cache[bucket_id] = (objects, time.time()) - self._object_cache.move_to_end(bucket_id) + # Evict oldest entries if cache is full + while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE: + self._object_cache.popitem(last=False) + + self._object_cache[bucket_id] = (objects, time.time()) + self._object_cache.move_to_end(bucket_id) return objects @@ -1354,6 +1386,23 @@ class ObjectStorage: except OSError: pass + def _update_object_cache_entry(self, bucket_id: str, key: str, meta: Optional[ObjectMeta]) -> None: + """Update a single entry in the object cache instead of invalidating the whole cache. + + This is a performance optimization - lazy update instead of full invalidation. + """ + with self._cache_lock: + cached = self._object_cache.get(bucket_id) + if cached: + objects, timestamp = cached + if meta is None: + # Delete operation - remove key from cache + objects.pop(key, None) + else: + # Put operation - update/add key in cache + objects[key] = meta + # Keep same timestamp - don't reset TTL for single key updates + def _ensure_system_roots(self) -> None: for path in ( self._system_root_path(), @@ -1373,19 +1422,33 @@ class ObjectStorage: return self._system_bucket_root(bucket_name) / self.BUCKET_CONFIG_FILE def _read_bucket_config(self, bucket_name: str) -> dict[str, Any]: + # Performance: Check cache first + now = time.time() + cached = self._bucket_config_cache.get(bucket_name) + if cached: + config, cached_time = cached + if now - cached_time < self._bucket_config_cache_ttl: + return config.copy() # Return copy to prevent mutation + config_path = self._bucket_config_path(bucket_name) if not config_path.exists(): + self._bucket_config_cache[bucket_name] = ({}, now) return {} try: data = json.loads(config_path.read_text(encoding="utf-8")) - return data if isinstance(data, dict) else {} + config = data if isinstance(data, dict) else {} + self._bucket_config_cache[bucket_name] = (config, now) + return config.copy() except (OSError, json.JSONDecodeError): + self._bucket_config_cache[bucket_name] = ({}, now) return {} def _write_bucket_config(self, bucket_name: str, payload: dict[str, Any]) -> None: config_path = self._bucket_config_path(bucket_name) config_path.parent.mkdir(parents=True, exist_ok=True) config_path.write_text(json.dumps(payload), encoding="utf-8") + # Performance: Update cache immediately after write + self._bucket_config_cache[bucket_name] = (payload.copy(), time.time()) def _set_bucket_config_entry(self, bucket_name: str, key: str, value: Any | None) -> None: config = self._read_bucket_config(bucket_name) @@ -1507,33 +1570,68 @@ class ObjectStorage: except OSError: continue - def _has_visible_objects(self, bucket_path: Path) -> bool: + def _check_bucket_contents(self, bucket_path: Path) -> tuple[bool, bool, bool]: + """Check bucket for objects, versions, and multipart uploads in a single pass. + + Performance optimization: Combines three separate rglob traversals into one. + Returns (has_visible_objects, has_archived_versions, has_active_multipart_uploads). + Uses early exit when all three are found. + """ + has_objects = False + has_versions = False + has_multipart = False + bucket_name = bucket_path.name + + # Check visible objects in bucket for path in bucket_path.rglob("*"): + if has_objects: + break if not path.is_file(): continue rel = path.relative_to(bucket_path) if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS: continue - return True - return False + has_objects = True + + # Check archived versions (only if needed) + for version_root in ( + self._bucket_versions_root(bucket_name), + self._legacy_versions_root(bucket_name), + ): + if has_versions: + break + if version_root.exists(): + for path in version_root.rglob("*"): + if path.is_file(): + has_versions = True + break + + # Check multipart uploads (only if needed) + for uploads_root in ( + self._multipart_bucket_root(bucket_name), + self._legacy_multipart_bucket_root(bucket_name), + ): + if has_multipart: + break + if uploads_root.exists(): + for path in uploads_root.rglob("*"): + if path.is_file(): + has_multipart = True + break + + return has_objects, has_versions, has_multipart + + def _has_visible_objects(self, bucket_path: Path) -> bool: + has_objects, _, _ = self._check_bucket_contents(bucket_path) + return has_objects def _has_archived_versions(self, bucket_path: Path) -> bool: - for version_root in ( - self._bucket_versions_root(bucket_path.name), - self._legacy_versions_root(bucket_path.name), - ): - if version_root.exists() and any(path.is_file() for path in version_root.rglob("*")): - return True - return False + _, has_versions, _ = self._check_bucket_contents(bucket_path) + return has_versions def _has_active_multipart_uploads(self, bucket_path: Path) -> bool: - for uploads_root in ( - self._multipart_bucket_root(bucket_path.name), - self._legacy_multipart_bucket_root(bucket_path.name), - ): - if uploads_root.exists() and any(path.is_file() for path in uploads_root.rglob("*")): - return True - return False + _, _, has_multipart = self._check_bucket_contents(bucket_path) + return has_multipart def _remove_tree(self, path: Path) -> None: if not path.exists(): diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html index e4a77d7..948408a 100644 --- a/templates/bucket_detail.html +++ b/templates/bucket_detail.html @@ -2026,7 +2026,7 @@ title="Download" aria-label="Download" > -