diff --git a/app/storage.py b/app/storage.py index f102949..992a25c 100644 --- a/app/storage.py +++ b/app/storage.py @@ -1617,14 +1617,16 @@ class ObjectStorage: Uses LRU eviction to prevent unbounded cache growth. Thread-safe with per-bucket locks to reduce contention. + Also checks file-based marker for cross-process cache invalidation. """ now = time.time() + marker_mtime = self._get_cache_marker_mtime(bucket_id) with self._cache_lock: cached = self._object_cache.get(bucket_id) if cached: objects, timestamp = cached - if now - timestamp < self._cache_ttl: + if now - timestamp < self._cache_ttl and marker_mtime <= timestamp: self._object_cache.move_to_end(bucket_id) return objects cache_version = self._cache_version.get(bucket_id, 0) @@ -1635,7 +1637,7 @@ class ObjectStorage: cached = self._object_cache.get(bucket_id) if cached: objects, timestamp = cached - if now - timestamp < self._cache_ttl: + if now - timestamp < self._cache_ttl and marker_mtime <= timestamp: self._object_cache.move_to_end(bucket_id) return objects objects = self._build_object_cache(bucket_path) @@ -1656,21 +1658,42 @@ class ObjectStorage: """Invalidate the object cache and etag index for a bucket. Increments version counter to signal stale reads. + Also touches marker file for cross-process invalidation. """ with self._cache_lock: self._object_cache.pop(bucket_id, None) self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1 + self._touch_cache_marker(bucket_id) + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" try: etag_index_path.unlink(missing_ok=True) except OSError: pass + def _touch_cache_marker(self, bucket_id: str) -> None: + """Touch the cache marker file to signal other processes that cache is stale.""" + marker_path = self._system_bucket_root(bucket_id) / ".cache_marker" + try: + marker_path.parent.mkdir(parents=True, exist_ok=True) + marker_path.write_text(str(time.time()), encoding="utf-8") + except OSError: + pass + + def _get_cache_marker_mtime(self, bucket_id: str) -> float: + """Get the mtime of the cache marker file, or 0 if it doesn't exist.""" + marker_path = self._system_bucket_root(bucket_id) / ".cache_marker" + try: + return marker_path.stat().st_mtime + except OSError: + return 0 + def _update_object_cache_entry(self, bucket_id: str, key: str, meta: Optional[ObjectMeta]) -> None: """Update a single entry in the object cache instead of invalidating the whole cache. This is a performance optimization - lazy update instead of full invalidation. + Also touches a marker file to signal cache invalidation to other processes. """ with self._cache_lock: cached = self._object_cache.get(bucket_id) @@ -1682,6 +1705,7 @@ class ObjectStorage: objects[key] = meta self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1 self._sorted_key_cache.pop(bucket_id, None) + self._touch_cache_marker(bucket_id) def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None: """Pre-warm the object cache for specified buckets or all buckets.