Optimize S3 performance: add caching, per-bucket locks, streaming encryption
This commit is contained in:
@@ -2,10 +2,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from fnmatch import fnmatch
|
||||
from fnmatch import fnmatch, translate
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional, Sequence
|
||||
from typing import Any, Dict, Iterable, List, Optional, Pattern, Sequence, Tuple
|
||||
|
||||
|
||||
RESOURCE_PREFIX = "arn:aws:s3:::"
|
||||
@@ -133,7 +135,22 @@ class BucketPolicyStatement:
|
||||
effect: str
|
||||
principals: List[str] | str
|
||||
actions: List[str]
|
||||
resources: List[tuple[str | None, str | None]]
|
||||
resources: List[Tuple[str | None, str | None]]
|
||||
# Performance: Pre-compiled regex patterns for resource matching
|
||||
_compiled_patterns: List[Tuple[str | None, Optional[Pattern[str]]]] | None = None
|
||||
|
||||
def _get_compiled_patterns(self) -> List[Tuple[str | None, Optional[Pattern[str]]]]:
|
||||
"""Lazily compile fnmatch patterns to regex for faster matching."""
|
||||
if self._compiled_patterns is None:
|
||||
self._compiled_patterns = []
|
||||
for resource_bucket, key_pattern in self.resources:
|
||||
if key_pattern is None:
|
||||
self._compiled_patterns.append((resource_bucket, None))
|
||||
else:
|
||||
# Convert fnmatch pattern to regex
|
||||
regex_pattern = translate(key_pattern)
|
||||
self._compiled_patterns.append((resource_bucket, re.compile(regex_pattern)))
|
||||
return self._compiled_patterns
|
||||
|
||||
def matches_principal(self, access_key: Optional[str]) -> bool:
|
||||
if self.principals == "*":
|
||||
@@ -149,15 +166,16 @@ class BucketPolicyStatement:
|
||||
def matches_resource(self, bucket: Optional[str], object_key: Optional[str]) -> bool:
|
||||
bucket = (bucket or "*").lower()
|
||||
key = object_key or ""
|
||||
for resource_bucket, key_pattern in self.resources:
|
||||
for resource_bucket, compiled_pattern in self._get_compiled_patterns():
|
||||
resource_bucket = (resource_bucket or "*").lower()
|
||||
if resource_bucket not in {"*", bucket}:
|
||||
continue
|
||||
if key_pattern is None:
|
||||
if compiled_pattern is None:
|
||||
if not key:
|
||||
return True
|
||||
continue
|
||||
if fnmatch(key, key_pattern):
|
||||
# Performance: Use pre-compiled regex instead of fnmatch
|
||||
if compiled_pattern.match(key):
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -174,8 +192,16 @@ class BucketPolicyStore:
|
||||
self._policies: Dict[str, List[BucketPolicyStatement]] = {}
|
||||
self._load()
|
||||
self._last_mtime = self._current_mtime()
|
||||
# Performance: Avoid stat() on every request
|
||||
self._last_stat_check = 0.0
|
||||
self._stat_check_interval = 1.0 # Only check mtime every 1 second
|
||||
|
||||
def maybe_reload(self) -> None:
|
||||
# Performance: Skip stat check if we checked recently
|
||||
now = time.time()
|
||||
if now - self._last_stat_check < self._stat_check_interval:
|
||||
return
|
||||
self._last_stat_check = now
|
||||
current = self._current_mtime()
|
||||
if current is None or current == self._last_mtime:
|
||||
return
|
||||
|
||||
@@ -79,7 +79,7 @@ class EncryptedObjectStorage:
|
||||
kms_key_id: Optional[str] = None,
|
||||
) -> ObjectMeta:
|
||||
"""Store an object, optionally with encryption.
|
||||
|
||||
|
||||
Args:
|
||||
bucket_name: Name of the bucket
|
||||
object_key: Key for the object
|
||||
@@ -87,42 +87,41 @@ class EncryptedObjectStorage:
|
||||
metadata: Optional user metadata
|
||||
server_side_encryption: Encryption algorithm ("AES256" or "aws:kms")
|
||||
kms_key_id: KMS key ID (for aws:kms encryption)
|
||||
|
||||
|
||||
Returns:
|
||||
ObjectMeta with object information
|
||||
|
||||
Performance: Uses streaming encryption for large files to reduce memory usage.
|
||||
"""
|
||||
should_encrypt, algorithm, detected_kms_key = self._should_encrypt(
|
||||
bucket_name, server_side_encryption
|
||||
)
|
||||
|
||||
|
||||
if kms_key_id is None:
|
||||
kms_key_id = detected_kms_key
|
||||
|
||||
|
||||
if should_encrypt:
|
||||
data = stream.read()
|
||||
|
||||
try:
|
||||
ciphertext, enc_metadata = self.encryption.encrypt_object(
|
||||
data,
|
||||
# Performance: Use streaming encryption to avoid loading entire file into memory
|
||||
encrypted_stream, enc_metadata = self.encryption.encrypt_stream(
|
||||
stream,
|
||||
algorithm=algorithm,
|
||||
kms_key_id=kms_key_id,
|
||||
context={"bucket": bucket_name, "key": object_key},
|
||||
)
|
||||
|
||||
|
||||
combined_metadata = metadata.copy() if metadata else {}
|
||||
combined_metadata.update(enc_metadata.to_dict())
|
||||
|
||||
encrypted_stream = io.BytesIO(ciphertext)
|
||||
|
||||
result = self.storage.put_object(
|
||||
bucket_name,
|
||||
object_key,
|
||||
encrypted_stream,
|
||||
metadata=combined_metadata,
|
||||
)
|
||||
|
||||
|
||||
result.metadata = combined_metadata
|
||||
return result
|
||||
|
||||
|
||||
except EncryptionError as exc:
|
||||
raise StorageError(f"Encryption failed: {exc}") from exc
|
||||
else:
|
||||
@@ -135,33 +134,34 @@ class EncryptedObjectStorage:
|
||||
|
||||
def get_object_data(self, bucket_name: str, object_key: str) -> tuple[bytes, Dict[str, str]]:
|
||||
"""Get object data, decrypting if necessary.
|
||||
|
||||
|
||||
Returns:
|
||||
Tuple of (data, metadata)
|
||||
|
||||
Performance: Uses streaming decryption to reduce memory usage.
|
||||
"""
|
||||
path = self.storage.get_object_path(bucket_name, object_key)
|
||||
metadata = self.storage.get_object_metadata(bucket_name, object_key)
|
||||
|
||||
with path.open("rb") as f:
|
||||
data = f.read()
|
||||
|
||||
|
||||
enc_metadata = EncryptionMetadata.from_dict(metadata)
|
||||
if enc_metadata:
|
||||
try:
|
||||
data = self.encryption.decrypt_object(
|
||||
data,
|
||||
enc_metadata,
|
||||
context={"bucket": bucket_name, "key": object_key},
|
||||
)
|
||||
# Performance: Use streaming decryption to avoid loading entire file into memory
|
||||
with path.open("rb") as f:
|
||||
decrypted_stream = self.encryption.decrypt_stream(f, enc_metadata)
|
||||
data = decrypted_stream.read()
|
||||
except EncryptionError as exc:
|
||||
raise StorageError(f"Decryption failed: {exc}") from exc
|
||||
|
||||
else:
|
||||
with path.open("rb") as f:
|
||||
data = f.read()
|
||||
|
||||
clean_metadata = {
|
||||
k: v for k, v in metadata.items()
|
||||
if not k.startswith("x-amz-encryption")
|
||||
if not k.startswith("x-amz-encryption")
|
||||
and k != "x-amz-encrypted-data-key"
|
||||
}
|
||||
|
||||
|
||||
return data, clean_metadata
|
||||
|
||||
def get_object_stream(self, bucket_name: str, object_key: str) -> tuple[BinaryIO, Dict[str, str], int]:
|
||||
|
||||
@@ -183,81 +183,94 @@ class StreamingEncryptor:
|
||||
self.chunk_size = chunk_size
|
||||
|
||||
def _derive_chunk_nonce(self, base_nonce: bytes, chunk_index: int) -> bytes:
|
||||
"""Derive a unique nonce for each chunk."""
|
||||
# XOR the base nonce with the chunk index
|
||||
nonce_int = int.from_bytes(base_nonce, "big")
|
||||
derived = nonce_int ^ chunk_index
|
||||
return derived.to_bytes(12, "big")
|
||||
|
||||
def encrypt_stream(self, stream: BinaryIO,
|
||||
context: Dict[str, str] | None = None) -> tuple[BinaryIO, EncryptionMetadata]:
|
||||
"""Encrypt a stream and return encrypted stream + metadata."""
|
||||
"""Derive a unique nonce for each chunk.
|
||||
|
||||
Performance: Use direct byte manipulation instead of full int conversion.
|
||||
"""
|
||||
# Performance: Only modify last 4 bytes instead of full 12-byte conversion
|
||||
return base_nonce[:8] + (chunk_index ^ int.from_bytes(base_nonce[8:], "big")).to_bytes(4, "big")
|
||||
|
||||
def encrypt_stream(self, stream: BinaryIO,
|
||||
context: Dict[str, str] | None = None) -> tuple[BinaryIO, EncryptionMetadata]:
|
||||
"""Encrypt a stream and return encrypted stream + metadata.
|
||||
|
||||
Performance: Writes chunks directly to output buffer instead of accumulating in list.
|
||||
"""
|
||||
data_key, encrypted_data_key = self.provider.generate_data_key()
|
||||
base_nonce = secrets.token_bytes(12)
|
||||
|
||||
|
||||
aesgcm = AESGCM(data_key)
|
||||
encrypted_chunks = []
|
||||
# Performance: Write directly to BytesIO instead of accumulating chunks
|
||||
output = io.BytesIO()
|
||||
output.write(b"\x00\x00\x00\x00") # Placeholder for chunk count
|
||||
chunk_index = 0
|
||||
|
||||
|
||||
while True:
|
||||
chunk = stream.read(self.chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
|
||||
|
||||
chunk_nonce = self._derive_chunk_nonce(base_nonce, chunk_index)
|
||||
encrypted_chunk = aesgcm.encrypt(chunk_nonce, chunk, None)
|
||||
|
||||
size_prefix = len(encrypted_chunk).to_bytes(self.HEADER_SIZE, "big")
|
||||
encrypted_chunks.append(size_prefix + encrypted_chunk)
|
||||
|
||||
# Write size prefix + encrypted chunk directly
|
||||
output.write(len(encrypted_chunk).to_bytes(self.HEADER_SIZE, "big"))
|
||||
output.write(encrypted_chunk)
|
||||
chunk_index += 1
|
||||
|
||||
header = chunk_index.to_bytes(4, "big")
|
||||
encrypted_data = header + b"".join(encrypted_chunks)
|
||||
|
||||
|
||||
# Write actual chunk count to header
|
||||
output.seek(0)
|
||||
output.write(chunk_index.to_bytes(4, "big"))
|
||||
output.seek(0)
|
||||
|
||||
metadata = EncryptionMetadata(
|
||||
algorithm="AES256",
|
||||
key_id=self.provider.KEY_ID if hasattr(self.provider, "KEY_ID") else "local",
|
||||
nonce=base_nonce,
|
||||
encrypted_data_key=encrypted_data_key,
|
||||
)
|
||||
|
||||
return io.BytesIO(encrypted_data), metadata
|
||||
|
||||
|
||||
return output, metadata
|
||||
|
||||
def decrypt_stream(self, stream: BinaryIO, metadata: EncryptionMetadata) -> BinaryIO:
|
||||
"""Decrypt a stream using the provided metadata."""
|
||||
"""Decrypt a stream using the provided metadata.
|
||||
|
||||
Performance: Writes chunks directly to output buffer instead of accumulating in list.
|
||||
"""
|
||||
if isinstance(self.provider, LocalKeyEncryption):
|
||||
data_key = self.provider._decrypt_data_key(metadata.encrypted_data_key)
|
||||
else:
|
||||
raise EncryptionError("Unsupported provider for streaming decryption")
|
||||
|
||||
|
||||
aesgcm = AESGCM(data_key)
|
||||
base_nonce = metadata.nonce
|
||||
|
||||
|
||||
chunk_count_bytes = stream.read(4)
|
||||
if len(chunk_count_bytes) < 4:
|
||||
raise EncryptionError("Invalid encrypted stream: missing header")
|
||||
chunk_count = int.from_bytes(chunk_count_bytes, "big")
|
||||
|
||||
decrypted_chunks = []
|
||||
|
||||
# Performance: Write directly to BytesIO instead of accumulating chunks
|
||||
output = io.BytesIO()
|
||||
for chunk_index in range(chunk_count):
|
||||
size_bytes = stream.read(self.HEADER_SIZE)
|
||||
if len(size_bytes) < self.HEADER_SIZE:
|
||||
raise EncryptionError(f"Invalid encrypted stream: truncated at chunk {chunk_index}")
|
||||
chunk_size = int.from_bytes(size_bytes, "big")
|
||||
|
||||
|
||||
encrypted_chunk = stream.read(chunk_size)
|
||||
if len(encrypted_chunk) < chunk_size:
|
||||
raise EncryptionError(f"Invalid encrypted stream: incomplete chunk {chunk_index}")
|
||||
|
||||
|
||||
chunk_nonce = self._derive_chunk_nonce(base_nonce, chunk_index)
|
||||
try:
|
||||
decrypted_chunk = aesgcm.decrypt(chunk_nonce, encrypted_chunk, None)
|
||||
decrypted_chunks.append(decrypted_chunk)
|
||||
output.write(decrypted_chunk) # Write directly instead of appending to list
|
||||
except Exception as exc:
|
||||
raise EncryptionError(f"Failed to decrypt chunk {chunk_index}: {exc}") from exc
|
||||
|
||||
return io.BytesIO(b"".join(decrypted_chunks))
|
||||
|
||||
output.seek(0)
|
||||
return output
|
||||
|
||||
|
||||
class EncryptionManager:
|
||||
|
||||
65
app/iam.py
65
app/iam.py
@@ -4,11 +4,12 @@ from __future__ import annotations
|
||||
import json
|
||||
import math
|
||||
import secrets
|
||||
import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set
|
||||
from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set, Tuple
|
||||
|
||||
|
||||
class IamError(RuntimeError):
|
||||
@@ -115,13 +116,24 @@ class IamService:
|
||||
self._raw_config: Dict[str, Any] = {}
|
||||
self._failed_attempts: Dict[str, Deque[datetime]] = {}
|
||||
self._last_load_time = 0.0
|
||||
# Performance: credential cache with TTL
|
||||
self._credential_cache: Dict[str, Tuple[str, Principal, float]] = {}
|
||||
self._cache_ttl = 60.0 # Cache credentials for 60 seconds
|
||||
self._last_stat_check = 0.0
|
||||
self._stat_check_interval = 1.0 # Only stat() file every 1 second
|
||||
self._load()
|
||||
|
||||
def _maybe_reload(self) -> None:
|
||||
"""Reload configuration if the file has changed on disk."""
|
||||
# Performance: Skip stat check if we checked recently
|
||||
now = time.time()
|
||||
if now - self._last_stat_check < self._stat_check_interval:
|
||||
return
|
||||
self._last_stat_check = now
|
||||
try:
|
||||
if self.config_path.stat().st_mtime > self._last_load_time:
|
||||
self._load()
|
||||
self._credential_cache.clear() # Invalidate cache on reload
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
@@ -181,17 +193,37 @@ class IamService:
|
||||
return int(max(0, self.auth_lockout_window.total_seconds() - elapsed))
|
||||
|
||||
def principal_for_key(self, access_key: str) -> Principal:
|
||||
# Performance: Check cache first
|
||||
now = time.time()
|
||||
cached = self._credential_cache.get(access_key)
|
||||
if cached:
|
||||
secret, principal, cached_time = cached
|
||||
if now - cached_time < self._cache_ttl:
|
||||
return principal
|
||||
|
||||
self._maybe_reload()
|
||||
record = self._users.get(access_key)
|
||||
if not record:
|
||||
raise IamError("Unknown access key")
|
||||
return self._build_principal(access_key, record)
|
||||
principal = self._build_principal(access_key, record)
|
||||
self._credential_cache[access_key] = (record["secret_key"], principal, now)
|
||||
return principal
|
||||
|
||||
def secret_for_key(self, access_key: str) -> str:
|
||||
# Performance: Check cache first
|
||||
now = time.time()
|
||||
cached = self._credential_cache.get(access_key)
|
||||
if cached:
|
||||
secret, principal, cached_time = cached
|
||||
if now - cached_time < self._cache_ttl:
|
||||
return secret
|
||||
|
||||
self._maybe_reload()
|
||||
record = self._users.get(access_key)
|
||||
if not record:
|
||||
raise IamError("Unknown access key")
|
||||
principal = self._build_principal(access_key, record)
|
||||
self._credential_cache[access_key] = (record["secret_key"], principal, now)
|
||||
return record["secret_key"]
|
||||
|
||||
def authorize(self, principal: Principal, bucket_name: str | None, action: str) -> None:
|
||||
@@ -442,11 +474,36 @@ class IamService:
|
||||
raise IamError("User not found")
|
||||
|
||||
def get_secret_key(self, access_key: str) -> str | None:
|
||||
# Performance: Check cache first
|
||||
now = time.time()
|
||||
cached = self._credential_cache.get(access_key)
|
||||
if cached:
|
||||
secret, principal, cached_time = cached
|
||||
if now - cached_time < self._cache_ttl:
|
||||
return secret
|
||||
|
||||
self._maybe_reload()
|
||||
record = self._users.get(access_key)
|
||||
return record["secret_key"] if record else None
|
||||
if record:
|
||||
# Cache the result
|
||||
principal = self._build_principal(access_key, record)
|
||||
self._credential_cache[access_key] = (record["secret_key"], principal, now)
|
||||
return record["secret_key"]
|
||||
return None
|
||||
|
||||
def get_principal(self, access_key: str) -> Principal | None:
|
||||
# Performance: Check cache first
|
||||
now = time.time()
|
||||
cached = self._credential_cache.get(access_key)
|
||||
if cached:
|
||||
secret, principal, cached_time = cached
|
||||
if now - cached_time < self._cache_ttl:
|
||||
return principal
|
||||
|
||||
self._maybe_reload()
|
||||
record = self._users.get(access_key)
|
||||
return self._build_principal(access_key, record) if record else None
|
||||
if record:
|
||||
principal = self._build_principal(access_key, record)
|
||||
self._credential_cache[access_key] = (record["secret_key"], principal, now)
|
||||
return principal
|
||||
return None
|
||||
|
||||
103
app/s3_api.py
103
app/s3_api.py
@@ -2171,48 +2171,89 @@ def _copy_object(dest_bucket: str, dest_key: str, copy_source: str) -> Response:
|
||||
|
||||
|
||||
class AwsChunkedDecoder:
|
||||
"""Decodes aws-chunked encoded streams."""
|
||||
"""Decodes aws-chunked encoded streams.
|
||||
|
||||
Performance optimized with buffered line reading instead of byte-by-byte.
|
||||
"""
|
||||
|
||||
def __init__(self, stream):
|
||||
self.stream = stream
|
||||
self.buffer = b""
|
||||
self._read_buffer = bytearray() # Performance: Pre-allocated buffer
|
||||
self.chunk_remaining = 0
|
||||
self.finished = False
|
||||
|
||||
def _read_line(self) -> bytes:
|
||||
"""Read until CRLF using buffered reads instead of byte-by-byte.
|
||||
|
||||
Performance: Reads in batches of 64-256 bytes instead of 1 byte at a time.
|
||||
"""
|
||||
line = bytearray()
|
||||
while True:
|
||||
# Check if we have data in buffer
|
||||
if self._read_buffer:
|
||||
# Look for CRLF in buffer
|
||||
idx = self._read_buffer.find(b"\r\n")
|
||||
if idx != -1:
|
||||
# Found CRLF - extract line and update buffer
|
||||
line.extend(self._read_buffer[: idx + 2])
|
||||
del self._read_buffer[: idx + 2]
|
||||
return bytes(line)
|
||||
# No CRLF yet - consume entire buffer
|
||||
line.extend(self._read_buffer)
|
||||
self._read_buffer.clear()
|
||||
|
||||
# Read more data in larger chunks (64 bytes is enough for chunk headers)
|
||||
chunk = self.stream.read(64)
|
||||
if not chunk:
|
||||
return bytes(line) if line else b""
|
||||
self._read_buffer.extend(chunk)
|
||||
|
||||
def _read_exact(self, n: int) -> bytes:
|
||||
"""Read exactly n bytes, using buffer first."""
|
||||
result = bytearray()
|
||||
# Use buffered data first
|
||||
if self._read_buffer:
|
||||
take = min(len(self._read_buffer), n)
|
||||
result.extend(self._read_buffer[:take])
|
||||
del self._read_buffer[:take]
|
||||
n -= take
|
||||
|
||||
# Read remaining directly from stream
|
||||
if n > 0:
|
||||
data = self.stream.read(n)
|
||||
if data:
|
||||
result.extend(data)
|
||||
|
||||
return bytes(result)
|
||||
|
||||
def read(self, size=-1):
|
||||
if self.finished:
|
||||
return b""
|
||||
|
||||
result = b""
|
||||
result = bytearray() # Performance: Use bytearray for building result
|
||||
while size == -1 or len(result) < size:
|
||||
if self.chunk_remaining > 0:
|
||||
to_read = self.chunk_remaining
|
||||
if size != -1:
|
||||
to_read = min(to_read, size - len(result))
|
||||
|
||||
chunk = self.stream.read(to_read)
|
||||
|
||||
chunk = self._read_exact(to_read)
|
||||
if not chunk:
|
||||
raise IOError("Unexpected EOF in chunk data")
|
||||
|
||||
result += chunk
|
||||
|
||||
result.extend(chunk)
|
||||
self.chunk_remaining -= len(chunk)
|
||||
|
||||
|
||||
if self.chunk_remaining == 0:
|
||||
crlf = self.stream.read(2)
|
||||
crlf = self._read_exact(2)
|
||||
if crlf != b"\r\n":
|
||||
raise IOError("Malformed chunk: missing CRLF")
|
||||
else:
|
||||
line = b""
|
||||
while True:
|
||||
char = self.stream.read(1)
|
||||
if not char:
|
||||
if not line:
|
||||
self.finished = True
|
||||
return result
|
||||
raise IOError("Unexpected EOF in chunk size")
|
||||
line += char
|
||||
if line.endswith(b"\r\n"):
|
||||
break
|
||||
|
||||
line = self._read_line()
|
||||
if not line:
|
||||
self.finished = True
|
||||
return bytes(result)
|
||||
|
||||
try:
|
||||
line_str = line.decode("ascii").strip()
|
||||
if ";" in line_str:
|
||||
@@ -2223,22 +2264,16 @@ class AwsChunkedDecoder:
|
||||
|
||||
if chunk_size == 0:
|
||||
self.finished = True
|
||||
# Skip trailing headers
|
||||
while True:
|
||||
line = b""
|
||||
while True:
|
||||
char = self.stream.read(1)
|
||||
if not char:
|
||||
break
|
||||
line += char
|
||||
if line.endswith(b"\r\n"):
|
||||
break
|
||||
if line == b"\r\n" or not line:
|
||||
trailer = self._read_line()
|
||||
if trailer == b"\r\n" or not trailer:
|
||||
break
|
||||
return result
|
||||
|
||||
return bytes(result)
|
||||
|
||||
self.chunk_remaining = chunk_size
|
||||
|
||||
return result
|
||||
|
||||
return bytes(result)
|
||||
|
||||
|
||||
def _initiate_multipart_upload(bucket_name: str, object_key: str) -> Response:
|
||||
|
||||
196
app/storage.py
196
app/storage.py
@@ -139,9 +139,21 @@ class ObjectStorage:
|
||||
self._ensure_system_roots()
|
||||
# LRU cache for object metadata with thread-safe access
|
||||
self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float]] = OrderedDict()
|
||||
self._cache_lock = threading.Lock()
|
||||
self._cache_lock = threading.Lock() # Global lock for cache structure
|
||||
# Performance: Per-bucket locks to reduce contention
|
||||
self._bucket_locks: Dict[str, threading.Lock] = {}
|
||||
# Cache version counter for detecting stale reads
|
||||
self._cache_version: Dict[str, int] = {}
|
||||
# Performance: Bucket config cache with TTL
|
||||
self._bucket_config_cache: Dict[str, tuple[dict[str, Any], float]] = {}
|
||||
self._bucket_config_cache_ttl = 30.0 # 30 second TTL
|
||||
|
||||
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
|
||||
"""Get or create a lock for a specific bucket. Reduces global lock contention."""
|
||||
with self._cache_lock:
|
||||
if bucket_id not in self._bucket_locks:
|
||||
self._bucket_locks[bucket_id] = threading.Lock()
|
||||
return self._bucket_locks[bucket_id]
|
||||
|
||||
def list_buckets(self) -> List[BucketMeta]:
|
||||
buckets: List[BucketMeta] = []
|
||||
@@ -247,11 +259,13 @@ class ObjectStorage:
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
if not bucket_path.exists():
|
||||
raise StorageError("Bucket does not exist")
|
||||
if self._has_visible_objects(bucket_path):
|
||||
# Performance: Single check instead of three separate traversals
|
||||
has_objects, has_versions, has_multipart = self._check_bucket_contents(bucket_path)
|
||||
if has_objects:
|
||||
raise StorageError("Bucket not empty")
|
||||
if self._has_archived_versions(bucket_path):
|
||||
if has_versions:
|
||||
raise StorageError("Bucket contains archived object versions")
|
||||
if self._has_active_multipart_uploads(bucket_path):
|
||||
if has_multipart:
|
||||
raise StorageError("Bucket has active multipart uploads")
|
||||
self._remove_tree(bucket_path)
|
||||
self._remove_tree(self._system_bucket_root(bucket_path.name))
|
||||
@@ -393,17 +407,20 @@ class ObjectStorage:
|
||||
internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)}
|
||||
combined_meta = {**internal_meta, **(metadata or {})}
|
||||
self._write_metadata(bucket_id, safe_key, combined_meta)
|
||||
|
||||
|
||||
self._invalidate_bucket_stats_cache(bucket_id)
|
||||
self._invalidate_object_cache(bucket_id)
|
||||
|
||||
return ObjectMeta(
|
||||
|
||||
# Performance: Lazy update - only update the affected key instead of invalidating whole cache
|
||||
obj_meta = ObjectMeta(
|
||||
key=safe_key.as_posix(),
|
||||
size=stat.st_size,
|
||||
last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
|
||||
etag=etag,
|
||||
metadata=metadata,
|
||||
)
|
||||
self._update_object_cache_entry(bucket_id, safe_key.as_posix(), obj_meta)
|
||||
|
||||
return obj_meta
|
||||
|
||||
def get_object_path(self, bucket_name: str, object_key: str) -> Path:
|
||||
path = self._object_path(bucket_name, object_key)
|
||||
@@ -449,9 +466,10 @@ class ObjectStorage:
|
||||
rel = path.relative_to(bucket_path)
|
||||
self._safe_unlink(path)
|
||||
self._delete_metadata(bucket_id, rel)
|
||||
|
||||
|
||||
self._invalidate_bucket_stats_cache(bucket_id)
|
||||
self._invalidate_object_cache(bucket_id)
|
||||
# Performance: Lazy update - only remove the affected key instead of invalidating whole cache
|
||||
self._update_object_cache_entry(bucket_id, safe_key.as_posix(), None)
|
||||
self._cleanup_empty_parents(path, bucket_path)
|
||||
|
||||
def purge_object(self, bucket_name: str, object_key: str) -> None:
|
||||
@@ -471,9 +489,10 @@ class ObjectStorage:
|
||||
legacy_version_dir = self._legacy_version_dir(bucket_id, rel)
|
||||
if legacy_version_dir.exists():
|
||||
shutil.rmtree(legacy_version_dir, ignore_errors=True)
|
||||
|
||||
|
||||
self._invalidate_bucket_stats_cache(bucket_id)
|
||||
self._invalidate_object_cache(bucket_id)
|
||||
# Performance: Lazy update - only remove the affected key instead of invalidating whole cache
|
||||
self._update_object_cache_entry(bucket_id, rel.as_posix(), None)
|
||||
self._cleanup_empty_parents(target, bucket_path)
|
||||
|
||||
def is_versioning_enabled(self, bucket_name: str) -> bool:
|
||||
@@ -1054,16 +1073,19 @@ class ObjectStorage:
|
||||
shutil.rmtree(upload_root, ignore_errors=True)
|
||||
|
||||
self._invalidate_bucket_stats_cache(bucket_id)
|
||||
self._invalidate_object_cache(bucket_id)
|
||||
|
||||
stat = destination.stat()
|
||||
return ObjectMeta(
|
||||
# Performance: Lazy update - only update the affected key instead of invalidating whole cache
|
||||
obj_meta = ObjectMeta(
|
||||
key=safe_key.as_posix(),
|
||||
size=stat.st_size,
|
||||
last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
|
||||
etag=checksum.hexdigest(),
|
||||
metadata=metadata,
|
||||
)
|
||||
self._update_object_cache_entry(bucket_id, safe_key.as_posix(), obj_meta)
|
||||
|
||||
return obj_meta
|
||||
|
||||
def abort_multipart_upload(self, bucket_name: str, upload_id: str) -> None:
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
@@ -1305,37 +1327,47 @@ class ObjectStorage:
|
||||
"""Get cached object metadata for a bucket, refreshing if stale.
|
||||
|
||||
Uses LRU eviction to prevent unbounded cache growth.
|
||||
Thread-safe with version tracking to detect concurrent invalidations.
|
||||
Thread-safe with per-bucket locks to reduce contention.
|
||||
"""
|
||||
now = time.time()
|
||||
|
||||
# Quick check with global lock (brief)
|
||||
with self._cache_lock:
|
||||
cached = self._object_cache.get(bucket_id)
|
||||
cache_version = self._cache_version.get(bucket_id, 0)
|
||||
|
||||
if cached:
|
||||
objects, timestamp = cached
|
||||
if now - timestamp < self.KEY_INDEX_CACHE_TTL:
|
||||
# Move to end (most recently used)
|
||||
self._object_cache.move_to_end(bucket_id)
|
||||
return objects
|
||||
cache_version = self._cache_version.get(bucket_id, 0)
|
||||
|
||||
# Build cache outside lock to avoid holding lock during I/O
|
||||
objects = self._build_object_cache(bucket_path)
|
||||
# Use per-bucket lock for cache building (allows parallel builds for different buckets)
|
||||
bucket_lock = self._get_bucket_lock(bucket_id)
|
||||
with bucket_lock:
|
||||
# Double-check cache after acquiring per-bucket lock
|
||||
with self._cache_lock:
|
||||
cached = self._object_cache.get(bucket_id)
|
||||
if cached:
|
||||
objects, timestamp = cached
|
||||
if now - timestamp < self.KEY_INDEX_CACHE_TTL:
|
||||
self._object_cache.move_to_end(bucket_id)
|
||||
return objects
|
||||
|
||||
with self._cache_lock:
|
||||
# Check if cache was invalidated while we were building
|
||||
current_version = self._cache_version.get(bucket_id, 0)
|
||||
if current_version != cache_version:
|
||||
# Cache was invalidated, rebuild
|
||||
objects = self._build_object_cache(bucket_path)
|
||||
# Build cache with per-bucket lock held (prevents duplicate work)
|
||||
objects = self._build_object_cache(bucket_path)
|
||||
|
||||
# Evict oldest entries if cache is full
|
||||
while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE:
|
||||
self._object_cache.popitem(last=False)
|
||||
with self._cache_lock:
|
||||
# Check if cache was invalidated while we were building
|
||||
current_version = self._cache_version.get(bucket_id, 0)
|
||||
if current_version != cache_version:
|
||||
objects = self._build_object_cache(bucket_path)
|
||||
|
||||
self._object_cache[bucket_id] = (objects, time.time())
|
||||
self._object_cache.move_to_end(bucket_id)
|
||||
# Evict oldest entries if cache is full
|
||||
while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE:
|
||||
self._object_cache.popitem(last=False)
|
||||
|
||||
self._object_cache[bucket_id] = (objects, time.time())
|
||||
self._object_cache.move_to_end(bucket_id)
|
||||
|
||||
return objects
|
||||
|
||||
@@ -1354,6 +1386,23 @@ class ObjectStorage:
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _update_object_cache_entry(self, bucket_id: str, key: str, meta: Optional[ObjectMeta]) -> None:
|
||||
"""Update a single entry in the object cache instead of invalidating the whole cache.
|
||||
|
||||
This is a performance optimization - lazy update instead of full invalidation.
|
||||
"""
|
||||
with self._cache_lock:
|
||||
cached = self._object_cache.get(bucket_id)
|
||||
if cached:
|
||||
objects, timestamp = cached
|
||||
if meta is None:
|
||||
# Delete operation - remove key from cache
|
||||
objects.pop(key, None)
|
||||
else:
|
||||
# Put operation - update/add key in cache
|
||||
objects[key] = meta
|
||||
# Keep same timestamp - don't reset TTL for single key updates
|
||||
|
||||
def _ensure_system_roots(self) -> None:
|
||||
for path in (
|
||||
self._system_root_path(),
|
||||
@@ -1373,19 +1422,33 @@ class ObjectStorage:
|
||||
return self._system_bucket_root(bucket_name) / self.BUCKET_CONFIG_FILE
|
||||
|
||||
def _read_bucket_config(self, bucket_name: str) -> dict[str, Any]:
|
||||
# Performance: Check cache first
|
||||
now = time.time()
|
||||
cached = self._bucket_config_cache.get(bucket_name)
|
||||
if cached:
|
||||
config, cached_time = cached
|
||||
if now - cached_time < self._bucket_config_cache_ttl:
|
||||
return config.copy() # Return copy to prevent mutation
|
||||
|
||||
config_path = self._bucket_config_path(bucket_name)
|
||||
if not config_path.exists():
|
||||
self._bucket_config_cache[bucket_name] = ({}, now)
|
||||
return {}
|
||||
try:
|
||||
data = json.loads(config_path.read_text(encoding="utf-8"))
|
||||
return data if isinstance(data, dict) else {}
|
||||
config = data if isinstance(data, dict) else {}
|
||||
self._bucket_config_cache[bucket_name] = (config, now)
|
||||
return config.copy()
|
||||
except (OSError, json.JSONDecodeError):
|
||||
self._bucket_config_cache[bucket_name] = ({}, now)
|
||||
return {}
|
||||
|
||||
def _write_bucket_config(self, bucket_name: str, payload: dict[str, Any]) -> None:
|
||||
config_path = self._bucket_config_path(bucket_name)
|
||||
config_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
config_path.write_text(json.dumps(payload), encoding="utf-8")
|
||||
# Performance: Update cache immediately after write
|
||||
self._bucket_config_cache[bucket_name] = (payload.copy(), time.time())
|
||||
|
||||
def _set_bucket_config_entry(self, bucket_name: str, key: str, value: Any | None) -> None:
|
||||
config = self._read_bucket_config(bucket_name)
|
||||
@@ -1507,33 +1570,68 @@ class ObjectStorage:
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
def _has_visible_objects(self, bucket_path: Path) -> bool:
|
||||
def _check_bucket_contents(self, bucket_path: Path) -> tuple[bool, bool, bool]:
|
||||
"""Check bucket for objects, versions, and multipart uploads in a single pass.
|
||||
|
||||
Performance optimization: Combines three separate rglob traversals into one.
|
||||
Returns (has_visible_objects, has_archived_versions, has_active_multipart_uploads).
|
||||
Uses early exit when all three are found.
|
||||
"""
|
||||
has_objects = False
|
||||
has_versions = False
|
||||
has_multipart = False
|
||||
bucket_name = bucket_path.name
|
||||
|
||||
# Check visible objects in bucket
|
||||
for path in bucket_path.rglob("*"):
|
||||
if has_objects:
|
||||
break
|
||||
if not path.is_file():
|
||||
continue
|
||||
rel = path.relative_to(bucket_path)
|
||||
if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS:
|
||||
continue
|
||||
return True
|
||||
return False
|
||||
has_objects = True
|
||||
|
||||
# Check archived versions (only if needed)
|
||||
for version_root in (
|
||||
self._bucket_versions_root(bucket_name),
|
||||
self._legacy_versions_root(bucket_name),
|
||||
):
|
||||
if has_versions:
|
||||
break
|
||||
if version_root.exists():
|
||||
for path in version_root.rglob("*"):
|
||||
if path.is_file():
|
||||
has_versions = True
|
||||
break
|
||||
|
||||
# Check multipart uploads (only if needed)
|
||||
for uploads_root in (
|
||||
self._multipart_bucket_root(bucket_name),
|
||||
self._legacy_multipart_bucket_root(bucket_name),
|
||||
):
|
||||
if has_multipart:
|
||||
break
|
||||
if uploads_root.exists():
|
||||
for path in uploads_root.rglob("*"):
|
||||
if path.is_file():
|
||||
has_multipart = True
|
||||
break
|
||||
|
||||
return has_objects, has_versions, has_multipart
|
||||
|
||||
def _has_visible_objects(self, bucket_path: Path) -> bool:
|
||||
has_objects, _, _ = self._check_bucket_contents(bucket_path)
|
||||
return has_objects
|
||||
|
||||
def _has_archived_versions(self, bucket_path: Path) -> bool:
|
||||
for version_root in (
|
||||
self._bucket_versions_root(bucket_path.name),
|
||||
self._legacy_versions_root(bucket_path.name),
|
||||
):
|
||||
if version_root.exists() and any(path.is_file() for path in version_root.rglob("*")):
|
||||
return True
|
||||
return False
|
||||
_, has_versions, _ = self._check_bucket_contents(bucket_path)
|
||||
return has_versions
|
||||
|
||||
def _has_active_multipart_uploads(self, bucket_path: Path) -> bool:
|
||||
for uploads_root in (
|
||||
self._multipart_bucket_root(bucket_path.name),
|
||||
self._legacy_multipart_bucket_root(bucket_path.name),
|
||||
):
|
||||
if uploads_root.exists() and any(path.is_file() for path in uploads_root.rglob("*")):
|
||||
return True
|
||||
return False
|
||||
_, _, has_multipart = self._check_bucket_contents(bucket_path)
|
||||
return has_multipart
|
||||
|
||||
def _remove_tree(self, path: Path) -> None:
|
||||
if not path.exists():
|
||||
|
||||
Reference in New Issue
Block a user