MyFSIO v0.3.5 Release #28
@@ -215,6 +215,8 @@ class ObjectStorage:
|
||||
self._stats_lock = threading.Lock()
|
||||
self._stats_dirty: set[str] = set()
|
||||
self._stats_flush_timer: Optional[threading.Timer] = None
|
||||
self._etag_index_dirty: set[str] = set()
|
||||
self._etag_index_flush_timer: Optional[threading.Timer] = None
|
||||
|
||||
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
|
||||
with self._registry_lock:
|
||||
@@ -422,6 +424,9 @@ class ObjectStorage:
|
||||
if self._stats_flush_timer is not None:
|
||||
self._stats_flush_timer.cancel()
|
||||
self._flush_stats()
|
||||
if self._etag_index_flush_timer is not None:
|
||||
self._etag_index_flush_timer.cancel()
|
||||
self._flush_etag_indexes()
|
||||
|
||||
def delete_bucket(self, bucket_name: str) -> None:
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
@@ -451,6 +456,7 @@ class ObjectStorage:
|
||||
self._stats_mem.pop(bucket_id, None)
|
||||
self._stats_serial.pop(bucket_id, None)
|
||||
self._stats_dirty.discard(bucket_id)
|
||||
self._etag_index_dirty.discard(bucket_id)
|
||||
|
||||
def list_objects(
|
||||
self,
|
||||
@@ -2169,10 +2175,10 @@ class ObjectStorage:
|
||||
if now - timestamp < self._cache_ttl and current_stats_mtime == cached_stats_mtime:
|
||||
self._object_cache.move_to_end(bucket_id)
|
||||
return objects
|
||||
cache_version = self._cache_version.get(bucket_id, 0)
|
||||
|
||||
bucket_lock = self._get_bucket_lock(bucket_id)
|
||||
with bucket_lock:
|
||||
now = time.time()
|
||||
current_stats_mtime = self._get_cache_marker_mtime(bucket_id)
|
||||
with self._obj_cache_lock:
|
||||
cached = self._object_cache.get(bucket_id)
|
||||
@@ -2186,16 +2192,12 @@ class ObjectStorage:
|
||||
new_stats_mtime = self._get_cache_marker_mtime(bucket_id)
|
||||
|
||||
with self._obj_cache_lock:
|
||||
current_version = self._cache_version.get(bucket_id, 0)
|
||||
if current_version != cache_version:
|
||||
objects = self._build_object_cache(bucket_path)
|
||||
new_stats_mtime = self._get_cache_marker_mtime(bucket_id)
|
||||
while len(self._object_cache) >= self._object_cache_max_size:
|
||||
self._object_cache.popitem(last=False)
|
||||
|
||||
self._object_cache[bucket_id] = (objects, time.time(), new_stats_mtime)
|
||||
self._object_cache.move_to_end(bucket_id)
|
||||
self._cache_version[bucket_id] = current_version + 1
|
||||
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
|
||||
self._sorted_key_cache.pop(bucket_id, None)
|
||||
|
||||
return objects
|
||||
@@ -2205,6 +2207,7 @@ class ObjectStorage:
|
||||
self._object_cache.pop(bucket_id, None)
|
||||
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
|
||||
|
||||
self._etag_index_dirty.discard(bucket_id)
|
||||
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
|
||||
try:
|
||||
etag_index_path.unlink(missing_ok=True)
|
||||
@@ -2226,23 +2229,32 @@ class ObjectStorage:
|
||||
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
|
||||
self._sorted_key_cache.pop(bucket_id, None)
|
||||
|
||||
self._update_etag_index(bucket_id, key, meta.etag if meta else None)
|
||||
self._etag_index_dirty.add(bucket_id)
|
||||
self._schedule_etag_index_flush()
|
||||
|
||||
def _update_etag_index(self, bucket_id: str, key: str, etag: Optional[str]) -> None:
|
||||
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
|
||||
if not etag_index_path.exists():
|
||||
return
|
||||
try:
|
||||
with open(etag_index_path, 'r', encoding='utf-8') as f:
|
||||
index = json.load(f)
|
||||
if etag is None:
|
||||
index.pop(key, None)
|
||||
else:
|
||||
index[key] = etag
|
||||
with open(etag_index_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(index, f)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
def _schedule_etag_index_flush(self) -> None:
|
||||
if self._etag_index_flush_timer is None or not self._etag_index_flush_timer.is_alive():
|
||||
self._etag_index_flush_timer = threading.Timer(5.0, self._flush_etag_indexes)
|
||||
self._etag_index_flush_timer.daemon = True
|
||||
self._etag_index_flush_timer.start()
|
||||
|
||||
def _flush_etag_indexes(self) -> None:
|
||||
dirty = set(self._etag_index_dirty)
|
||||
self._etag_index_dirty.clear()
|
||||
for bucket_id in dirty:
|
||||
with self._obj_cache_lock:
|
||||
cached = self._object_cache.get(bucket_id)
|
||||
if not cached:
|
||||
continue
|
||||
objects = cached[0]
|
||||
index = {k: v.etag for k, v in objects.items() if v.etag}
|
||||
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
|
||||
try:
|
||||
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(etag_index_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(index, f)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None:
|
||||
"""Pre-warm the object cache for specified buckets or all buckets.
|
||||
@@ -2299,12 +2311,7 @@ class ObjectStorage:
|
||||
if cached:
|
||||
config, cached_time, cached_mtime = cached
|
||||
if now - cached_time < self._bucket_config_cache_ttl:
|
||||
try:
|
||||
current_mtime = config_path.stat().st_mtime if config_path.exists() else 0.0
|
||||
except OSError:
|
||||
current_mtime = 0.0
|
||||
if current_mtime == cached_mtime:
|
||||
return config.copy()
|
||||
return config.copy()
|
||||
|
||||
if not config_path.exists():
|
||||
self._bucket_config_cache[bucket_name] = ({}, now, 0.0)
|
||||
|
||||
Reference in New Issue
Block a user