Fix issues

Bug fixes:
- Fix duplicate _legacy_version_dir check in storage.py
- Fix max_size_bytes -> max_bytes param in quota handler
- Move base64 import to module level in s3_api.py
- Add retry logic and atomic file ops to multipart upload
- Add shutdown() method to ReplicationManager

Performance:
- Add LRU eviction with OrderedDict to object cache
- Add cache version tracking for stale read detection
- Add streaming uploads for large files (>10 MiB) in replication
- Create _find_element() XML parsing helpers

Security:
- Gate SigV4 debug logging behind DEBUG_SIGV4 config
2025-12-29 12:46:23 +08:00
parent 9165e365e6
commit c8eb3de629
4 changed files with 234 additions and 153 deletions

File: replication.py

@@ -9,7 +9,7 @@ import time
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import Dict, Optional
+from typing import Any, Dict, Optional
 
 import boto3
 from botocore.config import Config
@@ -24,11 +24,42 @@ logger = logging.getLogger(__name__)
 REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"
 REPLICATION_CONNECT_TIMEOUT = 5
 REPLICATION_READ_TIMEOUT = 30
+STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024  # 10 MiB - use streaming for larger files
 
 REPLICATION_MODE_NEW_ONLY = "new_only"
 REPLICATION_MODE_ALL = "all"
 
+def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any:
+    """Create a boto3 S3 client for the given connection.
+
+    Args:
+        connection: Remote S3 connection configuration
+        health_check: If True, use minimal retries for quick health checks
+
+    Returns:
+        Configured boto3 S3 client
+    """
+    config = Config(
+        user_agent_extra=REPLICATION_USER_AGENT,
+        connect_timeout=REPLICATION_CONNECT_TIMEOUT,
+        read_timeout=REPLICATION_READ_TIMEOUT,
+        retries={'max_attempts': 1 if health_check else 2},
+        signature_version='s3v4',
+        s3={'addressing_style': 'path'},
+        request_checksum_calculation='when_required',
+        response_checksum_validation='when_required',
+    )
+    return boto3.client(
+        "s3",
+        endpoint_url=connection.endpoint_url,
+        aws_access_key_id=connection.access_key,
+        aws_secret_access_key=connection.secret_key,
+        region_name=connection.region or 'us-east-1',
+        config=config,
+    )
+
+
 @dataclass
 class ReplicationStats:
     """Statistics for replication operations - computed dynamically."""
@@ -102,8 +133,19 @@ class ReplicationManager:
         self._rules: Dict[str, ReplicationRule] = {}
         self._stats_lock = threading.Lock()
         self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker")
+        self._shutdown = False
         self.reload_rules()
 
+    def shutdown(self, wait: bool = True) -> None:
+        """Shutdown the replication executor gracefully.
+
+        Args:
+            wait: If True, wait for pending tasks to complete
+        """
+        self._shutdown = True
+        self._executor.shutdown(wait=wait)
+        logger.info("Replication manager shut down")
+
     def reload_rules(self) -> None:
         if not self.rules_path.exists():
             self._rules = {}
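Note: shutdown() gives callers a clean way to stop the worker pool before process exit. One plausible wiring, assuming the application keeps a single manager instance (the atexit hook is an illustration, not part of the commit):

    import atexit

    # 'manager' is whatever ReplicationManager instance the app builds at startup.
    atexit.register(manager.shutdown, wait=True)  # drain queued replication tasks on exit

With wait=True, tasks already running finish; tasks still queued short-circuit via the new _shutdown flag checked at the top of _replicate_task.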
@@ -129,20 +171,7 @@
         Uses short timeouts to prevent blocking.
         """
         try:
-            config = Config(
-                user_agent_extra=REPLICATION_USER_AGENT,
-                connect_timeout=REPLICATION_CONNECT_TIMEOUT,
-                read_timeout=REPLICATION_READ_TIMEOUT,
-                retries={'max_attempts': 1}
-            )
-            s3 = boto3.client(
-                "s3",
-                endpoint_url=connection.endpoint_url,
-                aws_access_key_id=connection.access_key,
-                aws_secret_access_key=connection.secret_key,
-                region_name=connection.region,
-                config=config,
-            )
+            s3 = _create_s3_client(connection, health_check=True)
             s3.list_buckets()
             return True
         except Exception as e:
@@ -185,13 +214,7 @@
         source_objects = self.storage.list_objects_all(bucket_name)
         source_keys = {obj.key: obj.size for obj in source_objects}
 
-        s3 = boto3.client(
-            "s3",
-            endpoint_url=connection.endpoint_url,
-            aws_access_key_id=connection.access_key,
-            aws_secret_access_key=connection.secret_key,
-            region_name=connection.region,
-        )
+        s3 = _create_s3_client(connection)
 
         dest_keys = set()
         bytes_synced = 0
@@ -257,13 +280,7 @@
             raise ValueError(f"Connection {connection_id} not found")
 
         try:
-            s3 = boto3.client(
-                "s3",
-                endpoint_url=connection.endpoint_url,
-                aws_access_key_id=connection.access_key,
-                aws_secret_access_key=connection.secret_key,
-                region_name=connection.region,
-            )
+            s3 = _create_s3_client(connection)
             s3.create_bucket(Bucket=bucket_name)
         except ClientError as e:
             logger.error(f"Failed to create remote bucket {bucket_name}: {e}")
@@ -286,6 +303,9 @@
         self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)
 
     def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
+        if self._shutdown:
+            return
+
         if ".." in object_key or object_key.startswith("/") or object_key.startswith("\\"):
             logger.error(f"Invalid object key in replication (path traversal attempt): {object_key}")
             return
@@ -297,30 +317,8 @@
             logger.error(f"Object key validation failed in replication: {e}")
             return
 
+        file_size = 0
         try:
-            config = Config(
-                user_agent_extra=REPLICATION_USER_AGENT,
-                connect_timeout=REPLICATION_CONNECT_TIMEOUT,
-                read_timeout=REPLICATION_READ_TIMEOUT,
-                retries={'max_attempts': 2},
-                signature_version='s3v4',
-                s3={
-                    'addressing_style': 'path',
-                },
-                # Disable SDK automatic checksums - they cause SignatureDoesNotMatch errors
-                # with S3-compatible servers that don't support CRC32 checksum headers
-                request_checksum_calculation='when_required',
-                response_checksum_validation='when_required',
-            )
-            s3 = boto3.client(
-                "s3",
-                endpoint_url=conn.endpoint_url,
-                aws_access_key_id=conn.access_key,
-                aws_secret_access_key=conn.secret_key,
-                region_name=conn.region or 'us-east-1',
-                config=config,
-            )
+            s3 = _create_s3_client(conn)
 
             if action == "delete":
                 try:
@@ -337,34 +335,42 @@
                     logger.error(f"Source object not found: {bucket_name}/{object_key}")
                     return
 
-            # Don't replicate metadata - destination server will generate its own
-            # __etag__ and __size__. Replicating them causes signature mismatches when they have None/empty values.
             content_type, _ = mimetypes.guess_type(path)
             file_size = path.stat().st_size
             logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
 
-            def do_put_object() -> None:
-                """Helper to upload object.
-
-                Reads the file content into memory first to avoid signature calculation
-                issues with certain binary file types (like GIFs) when streaming.
-                Do NOT set ContentLength explicitly - boto3 calculates it from the bytes
-                and setting it manually can cause SignatureDoesNotMatch errors.
-                """
-                file_content = path.read_bytes()
-                put_kwargs = {
-                    "Bucket": rule.target_bucket,
-                    "Key": object_key,
-                    "Body": file_content,
-                }
-                if content_type:
-                    put_kwargs["ContentType"] = content_type
-                s3.put_object(**put_kwargs)
+            def do_upload() -> None:
+                """Upload object using appropriate method based on file size.
+
+                For small files (< 10 MiB): Read into memory for simpler handling
+                For large files: Use streaming upload to avoid memory issues
+                """
+                extra_args = {}
+                if content_type:
+                    extra_args["ContentType"] = content_type
+
+                if file_size >= STREAMING_THRESHOLD_BYTES:
+                    # Use multipart upload for large files
+                    s3.upload_file(
+                        str(path),
+                        rule.target_bucket,
+                        object_key,
+                        ExtraArgs=extra_args if extra_args else None,
+                    )
+                else:
+                    # Read small files into memory
+                    file_content = path.read_bytes()
+                    put_kwargs = {
+                        "Bucket": rule.target_bucket,
+                        "Key": object_key,
+                        "Body": file_content,
+                        **extra_args,
+                    }
+                    s3.put_object(**put_kwargs)
 
             try:
-                do_put_object()
+                do_upload()
             except (ClientError, S3UploadFailedError) as e:
                 error_code = None
                 if isinstance(e, ClientError):
@@ -389,7 +395,7 @@
                             raise e
 
                 if bucket_ready:
-                    do_put_object()
+                    do_upload()
                 else:
                     raise e
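Note: the size-based branch in do_upload mirrors what boto3's managed transfers do natively: upload_file streams from disk and switches to multipart automatically above a configurable threshold (8 MiB by default), while put_object sends a single in-memory body. A standalone sketch using boto3's public TransferConfig, with illustrative values:

    from boto3.s3.transfer import TransferConfig

    # upload_file streams from disk in chunks, so memory stays bounded
    # regardless of object size; put_object would buffer the whole body.
    transfer_config = TransferConfig(
        multipart_threshold=10 * 1024 * 1024,  # align with STREAMING_THRESHOLD_BYTES
        multipart_chunksize=8 * 1024 * 1024,
    )
    # s3.upload_file("big.bin", "target-bucket", "key", Config=transfer_config)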

File: s3_api.py

@@ -1,13 +1,15 @@
 """Flask blueprint exposing a subset of the S3 REST API."""
 
 from __future__ import annotations
 
+import base64
 import hashlib
 import hmac
+import logging
 import mimetypes
 import re
 import uuid
 from datetime import datetime, timedelta, timezone
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 from urllib.parse import quote, urlencode, urlparse, unquote
 from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError
@@ -20,6 +22,8 @@ from .iam import IamError, Principal
 from .replication import ReplicationManager
 from .storage import ObjectStorage, StorageError, QuotaExceededError
 
+logger = logging.getLogger(__name__)
+
 s3_api_bp = Blueprint("s3_api", __name__)
 
 def _storage() -> ObjectStorage:
@@ -118,6 +122,9 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
         if header_val is None:
             header_val = ""
+        if header.lower() == 'expect' and header_val == "":
+            header_val = "100-continue"
         header_val = " ".join(header_val.split())
         canonical_headers_parts.append(f"{header.lower()}:{header_val}\n")
     canonical_headers = "".join(canonical_headers_parts)
@@ -128,15 +135,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
     canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
 
-    # Debug logging for signature issues
-    import logging
-    logger = logging.getLogger(__name__)
-    logger.debug(f"SigV4 Debug - Method: {method}, URI: {canonical_uri}")
-    logger.debug(f"SigV4 Debug - Payload hash from header: {req.headers.get('X-Amz-Content-Sha256')}")
-    logger.debug(f"SigV4 Debug - Signed headers: {signed_headers_str}")
-    logger.debug(f"SigV4 Debug - Content-Type: {req.headers.get('Content-Type')}")
-    logger.debug(f"SigV4 Debug - Content-Length: {req.headers.get('Content-Length')}")
-
     amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date")
     if not amz_date:
         raise IamError("Missing Date header")
@@ -167,24 +165,18 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
     calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
 
     if not hmac.compare_digest(calculated_signature, signature):
-        # Debug logging for signature mismatch
-        import logging
-        logger = logging.getLogger(__name__)
-        logger.error(f"Signature mismatch for {req.path}")
-        logger.error(f"  Content-Type: {req.headers.get('Content-Type')}")
-        logger.error(f"  Content-Length: {req.headers.get('Content-Length')}")
-        logger.error(f"  X-Amz-Content-Sha256: {req.headers.get('X-Amz-Content-Sha256')}")
-        logger.error(f"  Canonical URI: {canonical_uri}")
-        logger.error(f"  Signed headers: {signed_headers_str}")
-        # Log each signed header's value
-        for h in signed_headers_list:
-            logger.error(f"  Header '{h}': {repr(req.headers.get(h))}")
-        logger.error(f"  Expected sig: {signature[:16]}...")
-        logger.error(f"  Calculated sig: {calculated_signature[:16]}...")
-        # Log first part of canonical request to compare
-        logger.error(f"  Canonical request hash: {hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()[:16]}...")
-        # Log the full canonical request for debugging
-        logger.error(f"  Canonical request:\n{canonical_request[:500]}...")
+        # Only log detailed signature debug info if DEBUG_SIGV4 is enabled
+        if current_app.config.get("DEBUG_SIGV4"):
+            logger.warning(
+                "SigV4 signature mismatch",
+                extra={
+                    "path": req.path,
+                    "method": method,
+                    "signed_headers": signed_headers_str,
+                    "content_type": req.headers.get("Content-Type"),
+                    "content_length": req.headers.get("Content-Length"),
+                },
+            )
         raise IamError("SignatureDoesNotMatch")
 
     return _iam().get_principal(access_key)
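Note: with the per-header dump gone, mismatch details are opt-in. DEBUG_SIGV4 is read through current_app.config, so enabling it is an ordinary Flask config change:

    from flask import Flask

    app = Flask(__name__)
    app.config["DEBUG_SIGV4"] = True  # opt in to detailed SigV4 mismatch logging

The new message is emitted once at WARNING level with structured extra fields, so it shows up under default logging configs without leaking canonical requests or partial signatures the way the old error-level dump did.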
@@ -236,6 +228,8 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
     canonical_headers_parts = []
     for header in signed_headers_list:
         val = req.headers.get(header, "").strip()
+        if header.lower() == 'expect' and val == "":
+            val = "100-continue"
         val = " ".join(val.split())
         canonical_headers_parts.append(f"{header}:{val}\n")
     canonical_headers = "".join(canonical_headers_parts)
@@ -569,6 +563,28 @@ def _strip_ns(tag: str | None) -> str:
     return tag.split("}")[-1]
 
+
+def _find_element(parent: Element, name: str) -> Optional[Element]:
+    """Find a child element by name, trying both namespaced and non-namespaced variants.
+
+    This handles XML documents that may or may not include namespace prefixes.
+    """
+    el = parent.find(f"{{*}}{name}")
+    if el is None:
+        el = parent.find(name)
+    return el
+
+
+def _find_element_text(parent: Element, name: str, default: str = "") -> str:
+    """Find a child element and return its text content.
+
+    Returns the default value if element not found or has no text.
+    """
+    el = _find_element(parent, name)
+    if el is None or el.text is None:
+        return default
+    return el.text.strip()
+
+
 def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]:
     try:
         root = fromstring(payload)
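Note: on Python 3.8+ the {*} wildcard in ElementPath matches a tag in any namespace or none, so the bare-name fallback mainly guards older lookup behavior. A self-contained check of the lookup logic (mirroring _find_element above; the documents are made up):

    from typing import Optional
    from xml.etree.ElementTree import Element, fromstring

    def find_element(parent: Element, name: str) -> Optional[Element]:
        # Same approach as _find_element: wildcard first, bare name second.
        el = parent.find(f"{{*}}{name}")
        if el is None:
            el = parent.find(name)
        return el

    namespaced = fromstring(
        '<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
        "<TagSet><Tag><Key>env</Key><Value>prod</Value></Tag></TagSet></Tagging>"
    )
    plain = fromstring("<Tagging><TagSet><Tag><Key>env</Key></Tag></TagSet></Tagging>")

    for doc in (namespaced, plain):
        assert find_element(doc, "TagSet") is not None  # found either way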
@@ -585,17 +601,11 @@ def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]:
         for tag_el in list(tagset):
             if _strip_ns(tag_el.tag) != "Tag":
                 continue
-            key_el = tag_el.find("{*}Key")
-            if key_el is None:
-                key_el = tag_el.find("Key")
-            value_el = tag_el.find("{*}Value")
-            if value_el is None:
-                value_el = tag_el.find("Value")
-            key = (key_el.text or "").strip() if key_el is not None else ""
+            key = _find_element_text(tag_el, "Key")
             if not key:
                 continue
-            value = value_el.text if value_el is not None else ""
-            tags.append({"Key": key, "Value": value or ""})
+            value = _find_element_text(tag_el, "Value")
+            tags.append({"Key": key, "Value": value})
     return tags
@@ -1439,7 +1449,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
     if request.method == "DELETE":
         try:
-            storage.set_bucket_quota(bucket_name, max_size_bytes=None, max_objects=None)
+            storage.set_bucket_quota(bucket_name, max_bytes=None, max_objects=None)
         except StorageError as exc:
             return _error_response("NoSuchBucket", str(exc), 404)
         current_app.logger.info("Bucket quota deleted", extra={"bucket": bucket_name})
@@ -1473,7 +1483,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
         return _error_response("InvalidArgument", f"max_objects {exc}", 400)
 
     try:
-        storage.set_bucket_quota(bucket_name, max_size_bytes=max_size_bytes, max_objects=max_objects)
+        storage.set_bucket_quota(bucket_name, max_bytes=max_size_bytes, max_objects=max_objects)
     except StorageError as exc:
         return _error_response("NoSuchBucket", str(exc), 404)
@@ -1665,7 +1675,6 @@ def bucket_handler(bucket_name: str) -> Response:
     effective_start = ""
     if list_type == "2":
         if continuation_token:
-            import base64
             try:
                 effective_start = base64.urlsafe_b64decode(continuation_token.encode()).decode("utf-8")
             except Exception:
@@ -1722,7 +1731,6 @@ def bucket_handler(bucket_name: str) -> Response:
         next_marker = common_prefixes[-1].rstrip(delimiter) if delimiter else common_prefixes[-1]
         if list_type == "2" and next_marker:
-            import base64
             next_continuation_token = base64.urlsafe_b64encode(next_marker.encode()).decode("utf-8")
 
     if list_type == "2":

File: storage.py

@@ -7,9 +7,11 @@ import os
 import re
 import shutil
 import stat
+import threading
 import time
 import unicodedata
 import uuid
+from collections import OrderedDict
 from contextlib import contextmanager
 from dataclasses import dataclass
 from datetime import datetime, timezone
@@ -129,12 +131,17 @@ class ObjectStorage:
     MULTIPART_MANIFEST = "manifest.json"
     BUCKET_CONFIG_FILE = ".bucket.json"
     KEY_INDEX_CACHE_TTL = 30
+    OBJECT_CACHE_MAX_SIZE = 100  # Maximum number of buckets to cache
 
     def __init__(self, root: Path) -> None:
         self.root = Path(root)
         self.root.mkdir(parents=True, exist_ok=True)
         self._ensure_system_roots()
-        self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {}
+        # LRU cache for object metadata with thread-safe access
+        self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float]] = OrderedDict()
+        self._cache_lock = threading.Lock()
+        # Cache version counter for detecting stale reads
+        self._cache_version: Dict[str, int] = {}
 
     def list_buckets(self) -> List[BucketMeta]:
         buckets: List[BucketMeta] = []
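Note: OrderedDict provides exactly the two primitives an LRU policy needs: move_to_end() on a cache hit and popitem(last=False) to evict the least recently used entry. A minimal standalone illustration of the policy _get_object_cache applies further down:

    from collections import OrderedDict

    cache: OrderedDict[str, str] = OrderedDict()
    MAX_ENTRIES = 2

    for bucket in ("a", "b", "a", "c"):  # "a" is touched again before "c" arrives
        if bucket in cache:
            cache.move_to_end(bucket)            # mark as most recently used
        else:
            while len(cache) >= MAX_ENTRIES:
                cache.popitem(last=False)        # evict least recently used
            cache[bucket] = f"meta-{bucket}"

    assert list(cache) == ["a", "c"]  # "b" was evicted; the re-used "a" survived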
@@ -729,8 +736,6 @@ class ObjectStorage:
         bucket_id = bucket_path.name
         safe_key = self._sanitize_object_key(object_key)
         version_dir = self._version_dir(bucket_id, safe_key)
-        if not version_dir.exists():
-            version_dir = self._legacy_version_dir(bucket_id, safe_key)
         if not version_dir.exists():
             version_dir = self._legacy_version_dir(bucket_id, safe_key)
             if not version_dir.exists():
@@ -879,6 +884,10 @@ class ObjectStorage:
part_number: int, part_number: int,
stream: BinaryIO, stream: BinaryIO,
) -> str: ) -> str:
"""Upload a part for a multipart upload.
Uses file locking to safely update the manifest and handle concurrent uploads.
"""
if part_number < 1: if part_number < 1:
raise StorageError("part_number must be >= 1") raise StorageError("part_number must be >= 1")
bucket_path = self._bucket_path(bucket_name) bucket_path = self._bucket_path(bucket_name)
@@ -889,11 +898,26 @@
         if not upload_root.exists():
             raise StorageError("Multipart upload not found")
 
+        # Write part to temporary file first, then rename atomically
         checksum = hashlib.md5()
         part_filename = f"part-{part_number:05d}.part"
         part_path = upload_root / part_filename
-        with part_path.open("wb") as target:
-            shutil.copyfileobj(_HashingReader(stream, checksum), target)
+        temp_path = upload_root / f".{part_filename}.tmp"
+        try:
+            with temp_path.open("wb") as target:
+                shutil.copyfileobj(_HashingReader(stream, checksum), target)
+            # Atomic rename (or replace on Windows)
+            temp_path.replace(part_path)
+        except OSError:
+            # Clean up temp file on failure
+            try:
+                temp_path.unlink(missing_ok=True)
+            except OSError:
+                pass
+            raise
 
         record = {
             "etag": checksum.hexdigest(),
             "size": part_path.stat().st_size,
@@ -903,16 +927,29 @@
         manifest_path = upload_root / self.MULTIPART_MANIFEST
         lock_path = upload_root / ".manifest.lock"
 
+        # Retry loop for handling transient lock/read failures
+        max_retries = 3
+        for attempt in range(max_retries):
+            try:
                 with lock_path.open("w") as lock_file:
                     with _file_lock(lock_file):
                         try:
                             manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
                         except (OSError, json.JSONDecodeError) as exc:
+                            if attempt < max_retries - 1:
+                                time.sleep(0.1 * (attempt + 1))
+                                continue
                             raise StorageError("Multipart manifest unreadable") from exc
                         parts = manifest.setdefault("parts", {})
                         parts[str(part_number)] = record
                         manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
+                break
+            except OSError as exc:
+                if attempt < max_retries - 1:
+                    time.sleep(0.1 * (attempt + 1))
+                    continue
+                raise StorageError(f"Failed to update multipart manifest: {exc}") from exc
 
         return record["etag"]
@@ -1264,22 +1301,52 @@
         return objects
 
     def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]:
-        """Get cached object metadata for a bucket, refreshing if stale."""
+        """Get cached object metadata for a bucket, refreshing if stale.
+
+        Uses LRU eviction to prevent unbounded cache growth.
+        Thread-safe with version tracking to detect concurrent invalidations.
+        """
         now = time.time()
+        with self._cache_lock:
             cached = self._object_cache.get(bucket_id)
+            cache_version = self._cache_version.get(bucket_id, 0)
             if cached:
                 objects, timestamp = cached
                 if now - timestamp < self.KEY_INDEX_CACHE_TTL:
+                    # Move to end (most recently used)
+                    self._object_cache.move_to_end(bucket_id)
                     return objects
+
+        # Build cache outside lock to avoid holding lock during I/O
         objects = self._build_object_cache(bucket_path)
-        self._object_cache[bucket_id] = (objects, now)
+
+        with self._cache_lock:
+            # Check if cache was invalidated while we were building
+            current_version = self._cache_version.get(bucket_id, 0)
+            if current_version != cache_version:
+                # Cache was invalidated, rebuild
+                objects = self._build_object_cache(bucket_path)
+            # Evict oldest entries if cache is full
+            while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE:
+                self._object_cache.popitem(last=False)
+            self._object_cache[bucket_id] = (objects, time.time())
+            self._object_cache.move_to_end(bucket_id)
+
         return objects
 
     def _invalidate_object_cache(self, bucket_id: str) -> None:
-        """Invalidate the object cache and etag index for a bucket."""
+        """Invalidate the object cache and etag index for a bucket.
+
+        Increments version counter to signal stale reads.
+        """
+        with self._cache_lock:
             self._object_cache.pop(bucket_id, None)
+            self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
+
         etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
         try:
             etag_index_path.unlink(missing_ok=True)
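Note: the version counter closes a time-of-check race: the cache is built outside the lock (to avoid holding it during directory I/O), so a writer can invalidate mid-build and the finished build would be stale. A condensed single-threaded replay of the protocol, using the same fields with hypothetical values:

    import threading
    from collections import OrderedDict

    cache: OrderedDict[str, tuple[str, float]] = OrderedDict()
    version: dict[str, int] = {}
    lock = threading.Lock()

    def build() -> str:
        return "bucket-listing"  # stands in for _build_object_cache()

    with lock:                       # reader snapshots the version...
        seen = version.get("bucket", 0)
    objects = build()                # ...then builds outside the lock

    with lock:                       # a writer invalidates concurrently
        cache.pop("bucket", None)
        version["bucket"] = version.get("bucket", 0) + 1

    with lock:                       # reader re-checks before publishing
        if version.get("bucket", 0) != seen:
            objects = build()        # stale build detected; rebuild under the lock
        cache["bucket"] = (objects, 0.0)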

File: version.py

@@ -1,7 +1,7 @@
 """Central location for the application version string."""
 
 from __future__ import annotations
 
-APP_VERSION = "0.1.8"
+APP_VERSION = "0.1.9"
 
 def get_version() -> str: