MyFSIO v0.2.6 Release #18
@@ -322,12 +322,16 @@ class ObjectStorage:
|
||||
"""Incrementally update cached bucket statistics instead of invalidating.
|
||||
|
||||
This avoids expensive full directory scans on every PUT/DELETE by
|
||||
adjusting the cached values directly.
|
||||
adjusting the cached values directly. Also signals cross-process cache
|
||||
invalidation by updating the file mtime.
|
||||
"""
|
||||
cache_path = self._system_bucket_root(bucket_id) / "stats.json"
|
||||
try:
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
if cache_path.exists():
|
||||
data = json.loads(cache_path.read_text(encoding="utf-8"))
|
||||
else:
|
||||
data = {"objects": 0, "bytes": 0, "version_count": 0, "version_bytes": 0, "total_objects": 0, "total_bytes": 0}
|
||||
data["objects"] = max(0, data.get("objects", 0) + objects_delta)
|
||||
data["bytes"] = max(0, data.get("bytes", 0) + bytes_delta)
|
||||
data["version_count"] = max(0, data.get("version_count", 0) + version_count_delta)
|
||||
@@ -1658,34 +1662,27 @@ class ObjectStorage:
|
||||
"""Invalidate the object cache and etag index for a bucket.
|
||||
|
||||
Increments version counter to signal stale reads.
|
||||
Also touches marker file for cross-process invalidation.
|
||||
Cross-process invalidation is handled by checking stats.json mtime.
|
||||
"""
|
||||
with self._cache_lock:
|
||||
self._object_cache.pop(bucket_id, None)
|
||||
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
|
||||
|
||||
self._touch_cache_marker(bucket_id)
|
||||
|
||||
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
|
||||
try:
|
||||
etag_index_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _touch_cache_marker(self, bucket_id: str) -> None:
|
||||
"""Touch the cache marker file to signal other processes that cache is stale."""
|
||||
marker_path = self._system_bucket_root(bucket_id) / ".cache_marker"
|
||||
try:
|
||||
marker_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
marker_path.write_text(str(time.time()), encoding="utf-8")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _get_cache_marker_mtime(self, bucket_id: str) -> float:
|
||||
"""Get the mtime of the cache marker file, or 0 if it doesn't exist."""
|
||||
marker_path = self._system_bucket_root(bucket_id) / ".cache_marker"
|
||||
"""Get the mtime of stats.json for cross-process cache invalidation.
|
||||
|
||||
Uses stats.json because it's already updated on every object change
|
||||
via _update_bucket_stats_cache.
|
||||
"""
|
||||
stats_path = self._system_bucket_root(bucket_id) / "stats.json"
|
||||
try:
|
||||
return marker_path.stat().st_mtime
|
||||
return stats_path.stat().st_mtime
|
||||
except OSError:
|
||||
return 0
|
||||
|
||||
@@ -1693,7 +1690,7 @@ class ObjectStorage:
|
||||
"""Update a single entry in the object cache instead of invalidating the whole cache.
|
||||
|
||||
This is a performance optimization - lazy update instead of full invalidation.
|
||||
Also touches a marker file to signal cache invalidation to other processes.
|
||||
Cross-process invalidation is handled by checking stats.json mtime.
|
||||
"""
|
||||
with self._cache_lock:
|
||||
cached = self._object_cache.get(bucket_id)
|
||||
@@ -1705,7 +1702,6 @@ class ObjectStorage:
|
||||
objects[key] = meta
|
||||
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
|
||||
self._sorted_key_cache.pop(bucket_id, None)
|
||||
self._touch_cache_marker(bucket_id)
|
||||
|
||||
def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None:
|
||||
"""Pre-warm the object cache for specified buckets or all buckets.
|
||||
|
||||
Reference in New Issue
Block a user