diff --git a/app/storage.py b/app/storage.py
index f50961e..c3e1afa 100644
--- a/app/storage.py
+++ b/app/storage.py
@@ -215,6 +215,10 @@ class ObjectStorage:
         self._stats_lock = threading.Lock()
         self._stats_dirty: set[str] = set()
         self._stats_flush_timer: Optional[threading.Timer] = None
+        self._etag_index_dirty: set[str] = set()
+        # Guards _etag_index_dirty and _etag_index_flush_timer.
+        self._etag_index_lock = threading.Lock()
+        self._etag_index_flush_timer: Optional[threading.Timer] = None
 
     def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
         with self._registry_lock:
@@ -422,6 +426,9 @@ class ObjectStorage:
         if self._stats_flush_timer is not None:
             self._stats_flush_timer.cancel()
         self._flush_stats()
+        if self._etag_index_flush_timer is not None:
+            self._etag_index_flush_timer.cancel()
+        self._flush_etag_indexes()
 
     def delete_bucket(self, bucket_name: str) -> None:
         bucket_path = self._bucket_path(bucket_name)
@@ -451,6 +458,8 @@ class ObjectStorage:
         self._stats_mem.pop(bucket_id, None)
         self._stats_serial.pop(bucket_id, None)
         self._stats_dirty.discard(bucket_id)
+        with self._etag_index_lock:
+            self._etag_index_dirty.discard(bucket_id)
 
     def list_objects(
         self,
@@ -2169,10 +2178,10 @@ class ObjectStorage:
             if now - timestamp < self._cache_ttl and current_stats_mtime == cached_stats_mtime:
                 self._object_cache.move_to_end(bucket_id)
                 return objects
-        cache_version = self._cache_version.get(bucket_id, 0)
         bucket_lock = self._get_bucket_lock(bucket_id)
         with bucket_lock:
+            now = time.time()
             current_stats_mtime = self._get_cache_marker_mtime(bucket_id)
             with self._obj_cache_lock:
                 cached = self._object_cache.get(bucket_id)
@@ -2186,16 +2195,12 @@ class ObjectStorage:
             new_stats_mtime = self._get_cache_marker_mtime(bucket_id)
 
             with self._obj_cache_lock:
-                current_version = self._cache_version.get(bucket_id, 0)
-                if current_version != cache_version:
-                    objects = self._build_object_cache(bucket_path)
-                    new_stats_mtime = self._get_cache_marker_mtime(bucket_id)
                 while len(self._object_cache) >= self._object_cache_max_size:
                     self._object_cache.popitem(last=False)
                 self._object_cache[bucket_id] = (objects, time.time(), new_stats_mtime)
                 self._object_cache.move_to_end(bucket_id)
-                self._cache_version[bucket_id] = current_version + 1
+                self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
                 self._sorted_key_cache.pop(bucket_id, None)
             return objects
 
@@ -2205,6 +2210,8 @@ class ObjectStorage:
             self._object_cache.pop(bucket_id, None)
             self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
+            with self._etag_index_lock:
+                self._etag_index_dirty.discard(bucket_id)
             etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
             try:
                 etag_index_path.unlink(missing_ok=True)
@@ -2226,23 +2233,52 @@ class ObjectStorage:
             self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
             self._sorted_key_cache.pop(bucket_id, None)
-            self._update_etag_index(bucket_id, key, meta.etag if meta else None)
+            with self._etag_index_lock:
+                self._etag_index_dirty.add(bucket_id)
+                self._schedule_etag_index_flush()
 
-    def _update_etag_index(self, bucket_id: str, key: str, etag: Optional[str]) -> None:
-        etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
-        if not etag_index_path.exists():
-            return
-        try:
-            with open(etag_index_path, 'r', encoding='utf-8') as f:
-                index = json.load(f)
-            if etag is None:
-                index.pop(key, None)
-            else:
-                index[key] = etag
-            with open(etag_index_path, 'w', encoding='utf-8') as f:
-                json.dump(index, f)
-        except (OSError, json.JSONDecodeError):
-            pass
+    def _schedule_etag_index_flush(self) -> None:
+        """Start the debounced etag-index flush timer if none is running.
+
+        Caller must hold ``self._etag_index_lock``.
+        """
+        if self._etag_index_flush_timer is None or not self._etag_index_flush_timer.is_alive():
+            self._etag_index_flush_timer = threading.Timer(5.0, self._flush_etag_indexes)
+            self._etag_index_flush_timer.daemon = True
+            self._etag_index_flush_timer.start()
+
+    def _flush_etag_indexes(self) -> None:
+        """Rewrite etag_index.json for every bucket marked dirty.
+
+        Runs on the debounce timer and from close(). The dirty set is
+        swapped out atomically under the lock so concurrent marks are
+        never lost between snapshot and clear.
+        """
+        with self._etag_index_lock:
+            dirty = self._etag_index_dirty
+            self._etag_index_dirty = set()
+        for bucket_id in dirty:
+            etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
+            with self._obj_cache_lock:
+                cached = self._object_cache.get(bucket_id)
+                # Copy under the lock: the cached dict may be mutated by
+                # concurrent writers while we iterate it below.
+                objects = dict(cached[0]) if cached else None
+            if objects is None:
+                # Cache entry was evicted before the flush ran; we cannot
+                # rebuild the index cheaply, so drop the (now stale) file
+                # rather than leave wrong etags on disk.
+                try:
+                    etag_index_path.unlink(missing_ok=True)
+                except OSError:
+                    pass
+                continue
+            index = {k: v.etag for k, v in objects.items() if v.etag}
+            try:
+                etag_index_path.parent.mkdir(parents=True, exist_ok=True)
+                with open(etag_index_path, 'w', encoding='utf-8') as f:
+                    json.dump(index, f)
+            except OSError:
+                pass
 
     def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None:
         """Pre-warm the object cache for specified buckets or all buckets.
@@ -2299,12 +2335,9 @@ class ObjectStorage:
         if cached:
             config, cached_time, cached_mtime = cached
             if now - cached_time < self._bucket_config_cache_ttl:
-                try:
-                    current_mtime = config_path.stat().st_mtime if config_path.exists() else 0.0
-                except OSError:
-                    current_mtime = 0.0
-                if current_mtime == cached_mtime:
-                    return config.copy()
+                # Within the TTL, trust the cached copy without a stat();
+                # external edits are picked up once the TTL expires.
+                return config.copy()
 
         if not config_path.exists():
             self._bucket_config_cache[bucket_name] = ({}, now, 0.0)