MyFSIO v0.2.6 Release #18

Merged
kqjy merged 14 commits from next into main 2026-02-05 16:18:04 +00:00
Showing only changes of commit e9a035827b - Show all commits

View File

@@ -1617,14 +1617,16 @@ class ObjectStorage:
Uses LRU eviction to prevent unbounded cache growth.
Thread-safe with per-bucket locks to reduce contention.
Also checks file-based marker for cross-process cache invalidation.
"""
now = time.time()
marker_mtime = self._get_cache_marker_mtime(bucket_id)
with self._cache_lock:
cached = self._object_cache.get(bucket_id)
if cached:
objects, timestamp = cached
if now - timestamp < self._cache_ttl:
if now - timestamp < self._cache_ttl and marker_mtime <= timestamp:
self._object_cache.move_to_end(bucket_id)
return objects
cache_version = self._cache_version.get(bucket_id, 0)
@@ -1635,7 +1637,7 @@ class ObjectStorage:
cached = self._object_cache.get(bucket_id)
if cached:
objects, timestamp = cached
if now - timestamp < self._cache_ttl:
if now - timestamp < self._cache_ttl and marker_mtime <= timestamp:
self._object_cache.move_to_end(bucket_id)
return objects
objects = self._build_object_cache(bucket_path)
@@ -1656,21 +1658,42 @@ class ObjectStorage:
"""Invalidate the object cache and etag index for a bucket.
Increments version counter to signal stale reads.
Also touches marker file for cross-process invalidation.
"""
with self._cache_lock:
self._object_cache.pop(bucket_id, None)
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
self._touch_cache_marker(bucket_id)
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
try:
etag_index_path.unlink(missing_ok=True)
except OSError:
pass
def _touch_cache_marker(self, bucket_id: str) -> None:
"""Touch the cache marker file to signal other processes that cache is stale."""
marker_path = self._system_bucket_root(bucket_id) / ".cache_marker"
try:
marker_path.parent.mkdir(parents=True, exist_ok=True)
marker_path.write_text(str(time.time()), encoding="utf-8")
except OSError:
pass
def _get_cache_marker_mtime(self, bucket_id: str) -> float:
"""Get the mtime of the cache marker file, or 0 if it doesn't exist."""
marker_path = self._system_bucket_root(bucket_id) / ".cache_marker"
try:
return marker_path.stat().st_mtime
except OSError:
return 0
def _update_object_cache_entry(self, bucket_id: str, key: str, meta: Optional[ObjectMeta]) -> None:
"""Update a single entry in the object cache instead of invalidating the whole cache.
This is a performance optimization - lazy update instead of full invalidation.
Also touches a marker file to signal cache invalidation to other processes.
"""
with self._cache_lock:
cached = self._object_cache.get(bucket_id)
@@ -1682,6 +1705,7 @@ class ObjectStorage:
objects[key] = meta
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
self._sorted_key_cache.pop(bucket_id, None)
self._touch_cache_marker(bucket_id)
def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None:
"""Pre-warm the object cache for specified buckets or all buckets.