Add Rust index reader, metadata read cache, and 256KB stream chunks
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
@@ -196,6 +197,8 @@ class ObjectStorage:
|
||||
self._object_key_max_length_bytes = object_key_max_length_bytes
|
||||
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
|
||||
self._meta_index_locks: Dict[str, threading.Lock] = {}
|
||||
self._meta_read_cache: OrderedDict[tuple, Optional[Dict[str, Any]]] = OrderedDict()
|
||||
self._meta_read_cache_max = 2048
|
||||
self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup")
|
||||
|
||||
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
|
||||
@@ -1904,16 +1907,38 @@ class ObjectStorage:
|
||||
return self._meta_index_locks[index_path]
|
||||
|
||||
def _read_index_entry(self, bucket_name: str, key: Path) -> Optional[Dict[str, Any]]:
    """Look up the metadata index entry for *key* in *bucket_name*.

    Serves from a bounded LRU read cache when possible; otherwise loads the
    entry via the Rust extension (when available) or by parsing the JSON
    index file, then populates the cache.

    Returns a deep copy of the entry dict, or ``None`` when the entry (or
    the index file itself) is absent or unreadable.  Deep-copying prevents
    callers from mutating cached state in place.
    """
    cache_key = (bucket_name, str(key))

    # Fast path: serve from the read cache.  Values are stored as 1-tuples
    # so a cached "missing" result (None) is distinguishable from a cache
    # miss (hit is None).
    with self._cache_lock:
        hit = self._meta_read_cache.get(cache_key)
        if hit is not None:
            self._meta_read_cache.move_to_end(cache_key)  # LRU touch
            cached = hit[0]
            return copy.deepcopy(cached) if cached is not None else None

    index_path, entry_name = self._index_file_for_key(bucket_name, key)
    if _HAS_RUST:
        # Rust reader parses the index file natively without materializing
        # the whole JSON document in Python.
        result = _rc.read_index_entry(str(index_path), entry_name)
    elif not index_path.exists():
        result = None
    else:
        try:
            index_data = json.loads(index_path.read_text(encoding="utf-8"))
            result = index_data.get(entry_name)
        except (OSError, json.JSONDecodeError):
            # A missing/corrupt index is treated as "entry not found"
            # rather than an error, matching the Rust path's behavior.
            result = None

    # Populate the cache before returning, evicting oldest entries first
    # to honor the size bound.
    with self._cache_lock:
        while len(self._meta_read_cache) >= self._meta_read_cache_max:
            self._meta_read_cache.popitem(last=False)
        self._meta_read_cache[cache_key] = (copy.deepcopy(result) if result is not None else None,)

    return result
|
||||
|
||||
def _invalidate_meta_read_cache(self, bucket_name: str, key: Path) -> None:
    """Evict the cached metadata entry for *key* in *bucket_name*, if any.

    A no-op when the entry is not cached.
    """
    stale = (bucket_name, str(key))
    with self._cache_lock:
        self._meta_read_cache.pop(stale, None)
|
||||
|
||||
def _write_index_entry(self, bucket_name: str, key: Path, entry: Dict[str, Any]) -> None:
|
||||
index_path, entry_name = self._index_file_for_key(bucket_name, key)
|
||||
@@ -1928,16 +1953,19 @@ class ObjectStorage:
|
||||
pass
|
||||
index_data[entry_name] = entry
|
||||
index_path.write_text(json.dumps(index_data), encoding="utf-8")
|
||||
self._invalidate_meta_read_cache(bucket_name, key)
|
||||
|
||||
def _delete_index_entry(self, bucket_name: str, key: Path) -> None:
|
||||
index_path, entry_name = self._index_file_for_key(bucket_name, key)
|
||||
if not index_path.exists():
|
||||
self._invalidate_meta_read_cache(bucket_name, key)
|
||||
return
|
||||
lock = self._get_meta_index_lock(str(index_path))
|
||||
with lock:
|
||||
try:
|
||||
index_data = json.loads(index_path.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError):
|
||||
self._invalidate_meta_read_cache(bucket_name, key)
|
||||
return
|
||||
if entry_name in index_data:
|
||||
del index_data[entry_name]
|
||||
@@ -1948,6 +1976,7 @@ class ObjectStorage:
|
||||
index_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
self._invalidate_meta_read_cache(bucket_name, key)
|
||||
|
||||
def _normalize_metadata(self, metadata: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
|
||||
if not metadata:
|
||||
|
||||
Reference in New Issue
Block a user