MyFSIO v0.2.6 Release #18
@@ -285,6 +285,10 @@ class ObjectStorage:
|
|||||||
return cached_stats
|
return cached_stats
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
existing_serial = 0
|
||||||
|
if cached_stats is not None:
|
||||||
|
existing_serial = cached_stats.get("_cache_serial", 0)
|
||||||
|
|
||||||
stats = {
|
stats = {
|
||||||
"objects": object_count,
|
"objects": object_count,
|
||||||
"bytes": total_bytes,
|
"bytes": total_bytes,
|
||||||
@@ -292,6 +296,7 @@ class ObjectStorage:
|
|||||||
"version_bytes": version_bytes,
|
"version_bytes": version_bytes,
|
||||||
"total_objects": object_count + version_count,
|
"total_objects": object_count + version_count,
|
||||||
"total_bytes": total_bytes + version_bytes,
|
"total_bytes": total_bytes + version_bytes,
|
||||||
|
"_cache_serial": existing_serial,
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -323,7 +328,7 @@ class ObjectStorage:
|
|||||||
|
|
||||||
This avoids expensive full directory scans on every PUT/DELETE by
|
This avoids expensive full directory scans on every PUT/DELETE by
|
||||||
adjusting the cached values directly. Also signals cross-process cache
|
adjusting the cached values directly. Also signals cross-process cache
|
||||||
invalidation by updating the file mtime.
|
invalidation by incrementing _cache_serial.
|
||||||
"""
|
"""
|
||||||
cache_path = self._system_bucket_root(bucket_id) / "stats.json"
|
cache_path = self._system_bucket_root(bucket_id) / "stats.json"
|
||||||
try:
|
try:
|
||||||
@@ -331,13 +336,14 @@ class ObjectStorage:
|
|||||||
if cache_path.exists():
|
if cache_path.exists():
|
||||||
data = json.loads(cache_path.read_text(encoding="utf-8"))
|
data = json.loads(cache_path.read_text(encoding="utf-8"))
|
||||||
else:
|
else:
|
||||||
data = {"objects": 0, "bytes": 0, "version_count": 0, "version_bytes": 0, "total_objects": 0, "total_bytes": 0}
|
data = {"objects": 0, "bytes": 0, "version_count": 0, "version_bytes": 0, "total_objects": 0, "total_bytes": 0, "_cache_serial": 0}
|
||||||
data["objects"] = max(0, data.get("objects", 0) + objects_delta)
|
data["objects"] = max(0, data.get("objects", 0) + objects_delta)
|
||||||
data["bytes"] = max(0, data.get("bytes", 0) + bytes_delta)
|
data["bytes"] = max(0, data.get("bytes", 0) + bytes_delta)
|
||||||
data["version_count"] = max(0, data.get("version_count", 0) + version_count_delta)
|
data["version_count"] = max(0, data.get("version_count", 0) + version_count_delta)
|
||||||
data["version_bytes"] = max(0, data.get("version_bytes", 0) + version_bytes_delta)
|
data["version_bytes"] = max(0, data.get("version_bytes", 0) + version_bytes_delta)
|
||||||
data["total_objects"] = max(0, data.get("total_objects", 0) + objects_delta + version_count_delta)
|
data["total_objects"] = max(0, data.get("total_objects", 0) + objects_delta + version_count_delta)
|
||||||
data["total_bytes"] = max(0, data.get("total_bytes", 0) + bytes_delta + version_bytes_delta)
|
data["total_bytes"] = max(0, data.get("total_bytes", 0) + bytes_delta + version_bytes_delta)
|
||||||
|
data["_cache_serial"] = data.get("_cache_serial", 0) + 1
|
||||||
cache_path.write_text(json.dumps(data), encoding="utf-8")
|
cache_path.write_text(json.dumps(data), encoding="utf-8")
|
||||||
except (OSError, json.JSONDecodeError):
|
except (OSError, json.JSONDecodeError):
|
||||||
pass
|
pass
|
||||||
@@ -1679,15 +1685,16 @@ class ObjectStorage:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def _get_cache_marker_mtime(self, bucket_id: str) -> float:
|
def _get_cache_marker_mtime(self, bucket_id: str) -> float:
|
||||||
"""Get the mtime of stats.json for cross-process cache invalidation.
|
"""Get the cache serial from stats.json for cross-process cache invalidation.
|
||||||
|
|
||||||
Uses stats.json because it's already updated on every object change
|
Uses _cache_serial field instead of file mtime because Windows filesystem
|
||||||
via _update_bucket_stats_cache.
|
caching can delay mtime visibility across processes.
|
||||||
"""
|
"""
|
||||||
stats_path = self._system_bucket_root(bucket_id) / "stats.json"
|
stats_path = self._system_bucket_root(bucket_id) / "stats.json"
|
||||||
try:
|
try:
|
||||||
return stats_path.stat().st_mtime
|
data = json.loads(stats_path.read_text(encoding="utf-8"))
|
||||||
except OSError:
|
return float(data.get("_cache_serial", 0))
|
||||||
|
except (OSError, json.JSONDecodeError):
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def _update_object_cache_entry(self, bucket_id: str, key: str, meta: Optional[ObjectMeta]) -> None:
|
def _update_object_cache_entry(self, bucket_id: str, key: str, meta: Optional[ObjectMeta]) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user