From f05b2668c0adaea2720fa439556a5d2cc5fb190d Mon Sep 17 00:00:00 2001 From: kqjy Date: Wed, 25 Mar 2026 13:44:34 +0800 Subject: [PATCH] Reduce per-request overhead: pre-compile SigV4 regex, in-memory etag index cache, 1MB GET chunks, configurable meta cache, skip fsync for rebuildable caches --- app/__init__.py | 1 + app/config.py | 4 ++++ app/s3_api.py | 21 +++++++++------------ app/storage.py | 44 +++++++++++++++++++++++++++++++------------- 4 files changed, 45 insertions(+), 25 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 9961fff..25f0028 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -184,6 +184,7 @@ def create_app( object_cache_max_size=app.config.get("OBJECT_CACHE_MAX_SIZE", 100), bucket_config_cache_ttl=app.config.get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0), object_key_max_length_bytes=app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024), + meta_read_cache_max=app.config.get("META_READ_CACHE_MAX", 2048), ) if app.config.get("WARM_CACHE_ON_STARTUP", True) and not app.config.get("TESTING"): diff --git a/app/config.py b/app/config.py index 04cae7a..38906bc 100644 --- a/app/config.py +++ b/app/config.py @@ -136,6 +136,7 @@ class AppConfig: site_sync_clock_skew_tolerance_seconds: float object_key_max_length_bytes: int object_cache_max_size: int + meta_read_cache_max: int bucket_config_cache_ttl_seconds: float object_tag_limit: int encryption_chunk_size_bytes: int @@ -315,6 +316,7 @@ class AppConfig: site_sync_clock_skew_tolerance_seconds = float(_get("SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS", 1.0)) object_key_max_length_bytes = int(_get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024)) object_cache_max_size = int(_get("OBJECT_CACHE_MAX_SIZE", 100)) + meta_read_cache_max = int(_get("META_READ_CACHE_MAX", 2048)) bucket_config_cache_ttl_seconds = float(_get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0)) object_tag_limit = int(_get("OBJECT_TAG_LIMIT", 50)) encryption_chunk_size_bytes = int(_get("ENCRYPTION_CHUNK_SIZE_BYTES", 64 * 1024)) @@ -421,6 +423,7 @@ class AppConfig: site_sync_clock_skew_tolerance_seconds=site_sync_clock_skew_tolerance_seconds, object_key_max_length_bytes=object_key_max_length_bytes, object_cache_max_size=object_cache_max_size, + meta_read_cache_max=meta_read_cache_max, bucket_config_cache_ttl_seconds=bucket_config_cache_ttl_seconds, object_tag_limit=object_tag_limit, encryption_chunk_size_bytes=encryption_chunk_size_bytes, @@ -648,6 +651,7 @@ class AppConfig: "SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS": self.site_sync_clock_skew_tolerance_seconds, "OBJECT_KEY_MAX_LENGTH_BYTES": self.object_key_max_length_bytes, "OBJECT_CACHE_MAX_SIZE": self.object_cache_max_size, + "META_READ_CACHE_MAX": self.meta_read_cache_max, "BUCKET_CONFIG_CACHE_TTL_SECONDS": self.bucket_config_cache_ttl_seconds, "OBJECT_TAG_LIMIT": self.object_tag_limit, "ENCRYPTION_CHUNK_SIZE_BYTES": self.encryption_chunk_size_bytes, diff --git a/app/s3_api.py b/app/s3_api.py index 2c151ee..c1a6961 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -201,6 +201,11 @@ _SIGNING_KEY_CACHE_LOCK = threading.Lock() _SIGNING_KEY_CACHE_TTL = 60.0 _SIGNING_KEY_CACHE_MAX_SIZE = 256 +_SIGV4_HEADER_RE = re.compile( + r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)" +) +_SIGV4_REQUIRED_HEADERS = frozenset({'host', 'x-amz-date'}) + def clear_signing_key_cache() -> None: if _HAS_RUST: @@ -259,10 +264,7 @@ def _get_canonical_uri(req: Any) -> str: def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: - match = re.match( - r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)", - auth_header, - ) + match = _SIGV4_HEADER_RE.match(auth_header) if not match: return None @@ -286,14 +288,9 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: if time_diff > tolerance: raise IamError("Request timestamp too old or too far in the future") - required_headers = {'host', 'x-amz-date'} signed_headers_set = set(signed_headers_str.split(';')) - if not required_headers.issubset(signed_headers_set): - if 'date' in signed_headers_set: - required_headers.remove('x-amz-date') - required_headers.add('date') - - if not required_headers.issubset(signed_headers_set): + if not _SIGV4_REQUIRED_HEADERS.issubset(signed_headers_set): + if not ({'host', 'date'}.issubset(signed_headers_set)): raise IamError("Required headers not signed") canonical_uri = _get_canonical_uri(req) @@ -1010,7 +1007,7 @@ def _render_encryption_document(config: dict[str, Any]) -> Element: return root -def _stream_file(path, chunk_size: int = 256 * 1024): +def _stream_file(path, chunk_size: int = 1024 * 1024): with path.open("rb") as handle: while True: chunk = handle.read(chunk_size) diff --git a/app/storage.py b/app/storage.py index fa66332..e148045 100644 --- a/app/storage.py +++ b/app/storage.py @@ -190,6 +190,7 @@ class ObjectStorage: object_cache_max_size: int = 100, bucket_config_cache_ttl: float = 30.0, object_key_max_length_bytes: int = 1024, + meta_read_cache_max: int = 2048, ) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) @@ -208,7 +209,7 @@ class ObjectStorage: self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {} self._meta_index_locks: Dict[str, threading.Lock] = {} self._meta_read_cache: OrderedDict[tuple, Optional[Dict[str, Any]]] = OrderedDict() - self._meta_read_cache_max = 2048 + self._meta_read_cache_max = meta_read_cache_max self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup") self._stats_mem: Dict[str, Dict[str, int]] = {} self._stats_serial: Dict[str, int] = {} @@ -218,6 +219,7 @@ class ObjectStorage: self._stats_flush_timer: Optional[threading.Timer] = None self._etag_index_dirty: set[str] = set() self._etag_index_flush_timer: Optional[threading.Timer] = None + self._etag_index_mem: Dict[str, tuple[Dict[str, str], float]] = {} def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: with self._registry_lock: @@ -427,7 +429,7 @@ class ObjectStorage: cache_path = self._system_bucket_root(bucket_id) / "stats.json" try: cache_path.parent.mkdir(parents=True, exist_ok=True) - self._atomic_write_json(cache_path, data) + self._atomic_write_json(cache_path, data, sync=False) except OSError: pass @@ -602,14 +604,7 @@ class ObjectStorage: is_truncated=False, next_continuation_token=None, ) - etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" - meta_cache: Dict[str, str] = {} - if etag_index_path.exists(): - try: - with open(etag_index_path, 'r', encoding='utf-8') as f: - meta_cache = json.load(f) - except (OSError, json.JSONDecodeError): - pass + meta_cache: Dict[str, str] = self._get_etag_index(bucket_id) entries_files: list[tuple[str, int, float, Optional[str]]] = [] entries_dirs: list[str] = [] @@ -2088,6 +2083,7 @@ class ObjectStorage: etag_index_path.parent.mkdir(parents=True, exist_ok=True) with open(etag_index_path, 'w', encoding='utf-8') as f: json.dump(raw["etag_cache"], f) + self._etag_index_mem[bucket_id] = (dict(raw["etag_cache"]), etag_index_path.stat().st_mtime) except OSError: pass for key, size, mtime, etag in raw["objects"]: @@ -2211,6 +2207,7 @@ class ObjectStorage: etag_index_path.parent.mkdir(parents=True, exist_ok=True) with open(etag_index_path, 'w', encoding='utf-8') as f: json.dump(meta_cache, f) + self._etag_index_mem[bucket_id] = (dict(meta_cache), etag_index_path.stat().st_mtime) except OSError: pass @@ -2324,6 +2321,25 @@ class ObjectStorage: self._etag_index_dirty.add(bucket_id) self._schedule_etag_index_flush() + def _get_etag_index(self, bucket_id: str) -> Dict[str, str]: + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" + try: + current_mtime = etag_index_path.stat().st_mtime + except OSError: + return {} + cached = self._etag_index_mem.get(bucket_id) + if cached: + cache_dict, cached_mtime = cached + if current_mtime == cached_mtime: + return cache_dict + try: + with open(etag_index_path, 'r', encoding='utf-8') as f: + data = json.load(f) + self._etag_index_mem[bucket_id] = (data, current_mtime) + return data + except (OSError, json.JSONDecodeError): + return {} + def _schedule_etag_index_flush(self) -> None: if self._etag_index_flush_timer is None or not self._etag_index_flush_timer.is_alive(): self._etag_index_flush_timer = threading.Timer(5.0, self._flush_etag_indexes) @@ -2345,6 +2361,7 @@ class ObjectStorage: etag_index_path.parent.mkdir(parents=True, exist_ok=True) with open(etag_index_path, 'w', encoding='utf-8') as f: json.dump(index, f) + self._etag_index_mem[bucket_id] = (index, etag_index_path.stat().st_mtime) except OSError: pass @@ -2388,14 +2405,15 @@ class ObjectStorage: path.mkdir(parents=True, exist_ok=True) @staticmethod - def _atomic_write_json(path: Path, data: Any) -> None: + def _atomic_write_json(path: Path, data: Any, *, sync: bool = True) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp_path = path.with_suffix(".tmp") try: with tmp_path.open("w", encoding="utf-8") as f: json.dump(data, f) - f.flush() - os.fsync(f.fileno()) + if sync: + f.flush() + os.fsync(f.fileno()) tmp_path.replace(path) except BaseException: try: