From 776967e80d857eb524b7fba0b85b0ae2f3b8fbe8 Mon Sep 17 00:00:00 2001
From: kqjy
Date: Thu, 19 Feb 2026 23:01:40 +0800
Subject: [PATCH] Add Rust index reader, metadata read cache, and 256KB stream chunks

---
 app/s3_api.py  |  4 ++--
 app/storage.py | 45 +++++++++++++++++++++++++++++++++++++--------
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/app/s3_api.py b/app/s3_api.py
index 92161b6..18777f4 100644
--- a/app/s3_api.py
+++ b/app/s3_api.py
@@ -986,7 +986,7 @@ def _render_encryption_document(config: dict[str, Any]) -> Element:
     return root
 
 
-def _stream_file(path, chunk_size: int = 64 * 1024):
+def _stream_file(path, chunk_size: int = 256 * 1024):
     with path.open("rb") as handle:
         while True:
             chunk = handle.read(chunk_size)
@@ -2923,7 +2923,7 @@ def object_handler(bucket_name: str, object_key: str):
             f.seek(start_pos)
             remaining = length_to_read
             while remaining > 0:
-                chunk_size = min(65536, remaining)
+                chunk_size = min(262144, remaining)
                 chunk = f.read(chunk_size)
                 if not chunk:
                     break
diff --git a/app/storage.py b/app/storage.py
index 6bf3a96..683fd4b 100644
--- a/app/storage.py
+++ b/app/storage.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import copy
 import hashlib
 import json
 import os
@@ -196,6 +197,8 @@ class ObjectStorage:
         self._object_key_max_length_bytes = object_key_max_length_bytes
         self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
         self._meta_index_locks: Dict[str, threading.Lock] = {}
+        self._meta_read_cache: OrderedDict[tuple[str, str], tuple[Optional[Dict[str, Any]]]] = OrderedDict()
+        self._meta_read_cache_max = 2048
         self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup")
 
     def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
@@ -1904,16 +1907,38 @@ class ObjectStorage:
         return self._meta_index_locks[index_path]
 
     def _read_index_entry(self, bucket_name: str, key: Path) -> Optional[Dict[str, Any]]:
+        cache_key = (bucket_name, str(key))
+        with self._cache_lock:
+            hit = self._meta_read_cache.get(cache_key)
+            if hit is not None:
+                self._meta_read_cache.move_to_end(cache_key)
+                cached = hit[0]
+                return copy.deepcopy(cached) if cached is not None else None
+
         index_path, entry_name = self._index_file_for_key(bucket_name, key)
         if _HAS_RUST:
-            return _rc.read_index_entry(str(index_path), entry_name)
-        if not index_path.exists():
-            return None
-        try:
-            index_data = json.loads(index_path.read_text(encoding="utf-8"))
-            return index_data.get(entry_name)
-        except (OSError, json.JSONDecodeError):
-            return None
+            result = _rc.read_index_entry(str(index_path), entry_name)
+        else:
+            if not index_path.exists():
+                result = None
+            else:
+                try:
+                    index_data = json.loads(index_path.read_text(encoding="utf-8"))
+                    result = index_data.get(entry_name)
+                except (OSError, json.JSONDecodeError):
+                    result = None
+
+        with self._cache_lock:
+            while len(self._meta_read_cache) >= self._meta_read_cache_max:
+                self._meta_read_cache.popitem(last=False)
+            self._meta_read_cache[cache_key] = (copy.deepcopy(result) if result is not None else None,)
+
+        return result
+
+    def _invalidate_meta_read_cache(self, bucket_name: str, key: Path) -> None:
+        cache_key = (bucket_name, str(key))
+        with self._cache_lock:
+            self._meta_read_cache.pop(cache_key, None)
 
     def _write_index_entry(self, bucket_name: str, key: Path, entry: Dict[str, Any]) -> None:
         index_path, entry_name = self._index_file_for_key(bucket_name, key)
@@ -1928,16 +1953,19 @@ class ObjectStorage:
                     pass
             index_data[entry_name] = entry
             index_path.write_text(json.dumps(index_data), encoding="utf-8")
+            self._invalidate_meta_read_cache(bucket_name, key)
 
     def _delete_index_entry(self, bucket_name: str, key: Path) -> None:
         index_path, entry_name = self._index_file_for_key(bucket_name, key)
         if not index_path.exists():
+            self._invalidate_meta_read_cache(bucket_name, key)
             return
         lock = self._get_meta_index_lock(str(index_path))
         with lock:
             try:
                 index_data = json.loads(index_path.read_text(encoding="utf-8"))
             except (OSError, json.JSONDecodeError):
+                self._invalidate_meta_read_cache(bucket_name, key)
                 return
             if entry_name in index_data:
                 del index_data[entry_name]
@@ -1948,6 +1976,7 @@ class ObjectStorage:
                 index_path.unlink()
             except OSError:
                 pass
+        self._invalidate_meta_read_cache(bucket_name, key)
 
     def _normalize_metadata(self, metadata: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
         if not metadata:
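
Reviewer note (not part of the patch): _read_index_entry stores each cached value wrapped in a one-element tuple so that a cached negative result (the entry genuinely resolves to None) can be told apart from a plain cache miss, where OrderedDict.get() also returns None. Below is a minimal standalone sketch of that sentinel-plus-LRU pattern; load_entry and CACHE_MAX are hypothetical stand-ins for the patch's slow path and _meta_read_cache_max, and the module-level dict stands in for the instance attribute guarded by self._cache_lock (which the patch assumes already exists on ObjectStorage for its other caches).

    from collections import OrderedDict
    from typing import Any, Optional

    CACHE_MAX = 2048  # stand-in for _meta_read_cache_max

    _cache: OrderedDict[str, tuple[Optional[dict[str, Any]]]] = OrderedDict()

    def load_entry(key: str) -> Optional[dict[str, Any]]:
        # Hypothetical slow path: the disk read (or Rust reader) in the patch.
        return None

    def cached_get(key: str) -> Optional[dict[str, Any]]:
        hit = _cache.get(key)
        if hit is not None:              # (None,) is truthy, so a cached None still hits
            _cache.move_to_end(key)      # mark as most recently used
            return hit[0]
        result = load_entry(key)         # miss: fall through to the slow path
        while len(_cache) >= CACHE_MAX:
            _cache.popitem(last=False)   # evict the least recently used entry
        _cache[key] = (result,)          # wrap, so a stored None stays distinguishable
        return result

The patch additionally deep-copies dict entries on both store and hit, so a caller that mutates a returned entry cannot corrupt the cached copy.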
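
A second note, on the mutation side: every path that changes or removes an index entry invalidates the cache, including both early returns in _delete_index_entry, so a reader can never see a cached value outlive a completed (or even failed) mutation. Continuing the sketch above, with persist and remove as hypothetical stand-ins for the patch's JSON rewrite and unlink; the try/finally here is a compact way to get the same every-exit-path guarantee that the patch spells out with explicit calls.

    def persist(key: str, entry: dict[str, Any]) -> None: ...  # stand-in durable write
    def remove(key: str) -> None: ...                           # stand-in delete

    def write_entry(key: str, entry: dict[str, Any]) -> None:
        persist(key, entry)
        _cache.pop(key, None)      # invalidate after the write lands; the next
                                   # read re-populates from fresh on-disk state

    def delete_entry(key: str) -> None:
        try:
            remove(key)
        finally:
            _cache.pop(key, None)  # invalidate on every exit path, even on failure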