From c498fe7aee833a6eea8ef276c6c331ef83ec3f82 Mon Sep 17 00:00:00 2001
From: kqjy
Date: Tue, 31 Mar 2026 21:10:47 +0800
Subject: [PATCH] Add self-heal missing ETags and harden ETag index persistence

---
 app/s3_api.py  | 17 ++++++++++++-----
 app/storage.py | 34 +++++++++++++++++++++++++++++-----
 2 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/app/s3_api.py b/app/s3_api.py
index 5cde319..97cfd0a 100644
--- a/app/s3_api.py
+++ b/app/s3_api.py
@@ -2833,9 +2833,12 @@ def object_handler(bucket_name: str, object_key: str):
         is_encrypted = "x-amz-server-side-encryption" in metadata
 
         cond_etag = metadata.get("__etag__")
+        _etag_was_healed = False
         if not cond_etag and not is_encrypted:
             try:
                 cond_etag = storage._compute_etag(path)
+                _etag_was_healed = True
+                storage.heal_missing_etag(bucket_name, object_key, cond_etag)
             except OSError:
                 cond_etag = None
         if cond_etag:
@@ -2881,7 +2884,7 @@
         try:
             stat = path.stat()
             file_size = stat.st_size
-            etag = metadata.get("__etag__") or storage._compute_etag(path)
+            etag = cond_etag or storage._compute_etag(path)
         except PermissionError:
             return _error_response("AccessDenied", "Permission denied accessing object", 403)
         except OSError as exc:
@@ -2929,7 +2932,7 @@
         try:
             stat = path.stat()
             response = Response(status=200)
-            etag = metadata.get("__etag__") or storage._compute_etag(path)
+            etag = cond_etag or storage._compute_etag(path)
         except PermissionError:
             return _error_response("AccessDenied", "Permission denied accessing object", 403)
         except OSError as exc:
@@ -3314,9 +3317,13 @@ def head_object(bucket_name: str, object_key: str) -> Response:
         return error
     try:
         _authorize_action(principal, bucket_name, "read", object_key=object_key)
-        path = _storage().get_object_path(bucket_name, object_key)
-        metadata = _storage().get_object_metadata(bucket_name, object_key)
-        etag = metadata.get("__etag__") or _storage()._compute_etag(path)
+        storage = _storage()
+        path = storage.get_object_path(bucket_name, object_key)
+        metadata = storage.get_object_metadata(bucket_name, object_key)
+        etag = metadata.get("__etag__")
+        if not etag:
+            etag = storage._compute_etag(path)
+            storage.heal_missing_etag(bucket_name, object_key, etag)
 
         head_mtime = float(metadata["__last_modified__"]) if "__last_modified__" in metadata else None
         if head_mtime is None:
diff --git a/app/storage.py b/app/storage.py
index 3ccf645..d2469e9 100644
--- a/app/storage.py
+++ b/app/storage.py
@@ -2,6 +2,7 @@ from __future__ import annotations
 
 import hashlib
 import json
+import logging
 import os
 import re
 import shutil
@@ -33,7 +34,8 @@
 except ImportError:
     _rc = None
     _HAS_RUST = False
 
-# Platform-specific file locking
+logger = logging.getLogger(__name__)
+
 if os.name == "nt":
     import msvcrt
@@ -1082,6 +1084,30 @@ class ObjectStorage:
         safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes)
         return self._read_metadata(bucket_path.name, safe_key) or {}
 
+    def heal_missing_etag(self, bucket_name: str, object_key: str, etag: str) -> None:
+        """Persist a computed ETag back to metadata (self-heal on read)."""
+        try:
+            bucket_path = self._bucket_path(bucket_name)
+            if not bucket_path.exists():
+                return
+            bucket_id = bucket_path.name
+            safe_key = self._sanitize_object_key(object_key, self._object_key_max_length_bytes)
+            existing = self._read_metadata(bucket_id, safe_key) or {}
+            if existing.get("__etag__"):
+                return
+            existing["__etag__"] = etag
+            self._write_metadata(bucket_id, safe_key, existing)
+            with self._obj_cache_lock:
+                cached = self._object_cache.get(bucket_id)
+                if cached:
+                    obj = cached[0].get(safe_key.as_posix())
+                    if obj and not obj.etag:
+                        obj.etag = etag
+            self._etag_index_dirty.add(bucket_id)
+            self._schedule_etag_index_flush()
+        except Exception:
+            logger.warning("Failed to heal missing ETag for %s/%s", bucket_name, object_key)
+
     def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
         """Remove empty parent directories in a background thread.
 
@@ -2366,12 +2392,10 @@ class ObjectStorage:
         index = {k: v.etag for k, v in objects.items() if v.etag}
         etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
         try:
-            etag_index_path.parent.mkdir(parents=True, exist_ok=True)
-            with open(etag_index_path, 'w', encoding='utf-8') as f:
-                json.dump(index, f)
+            self._atomic_write_json(etag_index_path, index, sync=False)
             self._etag_index_mem[bucket_id] = (index, etag_index_path.stat().st_mtime)
         except OSError:
-            pass
+            logger.warning("Failed to flush etag index for bucket %s", bucket_id)
 
     def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None:
         """Pre-warm the object cache for specified buckets or all buckets.