From be63e27c15d6311bdf1cfb00b55108c210e717de Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 7 Mar 2026 14:08:23 +0800 Subject: [PATCH 1/2] Reduce per-request CPU overhead: eliminate double stat(), cache content type and policy context, gate logging, configurable stat intervals --- app/__init__.py | 21 ++++++-------- app/bucket_policies.py | 3 +- app/iam.py | 2 +- app/s3_api.py | 66 +++++++++++++++++++++++++++--------------- app/storage.py | 4 +-- app/version.py | 2 +- 6 files changed, 58 insertions(+), 40 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index aea5d78..1da677e 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -486,10 +486,6 @@ def _configure_logging(app: Flask) -> None: g.request_id = f"{os.getpid():x}{next(_request_counter):012x}" g.request_started_at = time.perf_counter() g.request_bytes_in = request.content_length or 0 - app.logger.info( - "Request started", - extra={"path": request.path, "method": request.method, "remote_addr": request.remote_addr}, - ) @app.before_request def _maybe_serve_website(): @@ -620,14 +616,15 @@ def _configure_logging(app: Flask) -> None: duration_ms = (time.perf_counter() - g.request_started_at) * 1000 request_id = getattr(g, "request_id", f"{os.getpid():x}{next(_request_counter):012x}") response.headers.setdefault("X-Request-ID", request_id) - app.logger.info( - "Request completed", - extra={ - "path": request.path, - "method": request.method, - "remote_addr": request.remote_addr, - }, - ) + if app.logger.isEnabledFor(logging.INFO): + app.logger.info( + "Request completed", + extra={ + "path": request.path, + "method": request.method, + "remote_addr": request.remote_addr, + }, + ) response.headers["X-Request-Duration-ms"] = f"{duration_ms:.2f}" operation_metrics = app.extensions.get("operation_metrics") diff --git a/app/bucket_policies.py b/app/bucket_policies.py index 1ff9eb6..61a9337 100644 --- a/app/bucket_policies.py +++ b/app/bucket_policies.py @@ -2,6 +2,7 @@ from __future__ import annotations import ipaddress import json +import os import re import time from dataclasses import dataclass, field @@ -268,7 +269,7 @@ class BucketPolicyStore: self._last_mtime = self._current_mtime() # Performance: Avoid stat() on every request self._last_stat_check = 0.0 - self._stat_check_interval = 1.0 # Only check mtime every 1 second + self._stat_check_interval = float(os.environ.get("BUCKET_POLICY_STAT_CHECK_INTERVAL_SECONDS", "2.0")) def maybe_reload(self) -> None: # Performance: Skip stat check if we checked recently diff --git a/app/iam.py b/app/iam.py index 65b705a..074f6c1 100644 --- a/app/iam.py +++ b/app/iam.py @@ -125,7 +125,7 @@ class IamService: self._secret_key_cache: Dict[str, Tuple[str, float]] = {} self._cache_ttl = float(os.environ.get("IAM_CACHE_TTL_SECONDS", "5.0")) self._last_stat_check = 0.0 - self._stat_check_interval = 1.0 + self._stat_check_interval = float(os.environ.get("IAM_STAT_CHECK_INTERVAL_SECONDS", "2.0")) self._sessions: Dict[str, Dict[str, Any]] = {} self._session_lock = threading.Lock() self._load() diff --git a/app/s3_api.py b/app/s3_api.py index 91a8254..d5b9dcd 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -85,6 +85,9 @@ def _bucket_policies() -> BucketPolicyStore: def _build_policy_context() -> Dict[str, Any]: + cached = getattr(g, "_policy_context", None) + if cached is not None: + return cached ctx: Dict[str, Any] = {} if request.headers.get("Referer"): ctx["aws:Referer"] = request.headers.get("Referer") @@ -98,6 +101,7 @@ def _build_policy_context() -> Dict[str, Any]: ctx["aws:SecureTransport"] = str(request.is_secure).lower() if request.headers.get("User-Agent"): ctx["aws:UserAgent"] = request.headers.get("User-Agent") + g._policy_context = ctx return ctx @@ -1021,11 +1025,15 @@ def _apply_object_headers( file_stat, metadata: Dict[str, str] | None, etag: str, + size_override: int | None = None, + mtime_override: float | None = None, ) -> None: - if file_stat is not None: - if response.status_code != 206: - response.headers["Content-Length"] = str(file_stat.st_size) - response.headers["Last-Modified"] = http_date(file_stat.st_mtime) + effective_size = size_override if size_override is not None else (file_stat.st_size if file_stat is not None else None) + effective_mtime = mtime_override if mtime_override is not None else (file_stat.st_mtime if file_stat is not None else None) + if effective_size is not None and response.status_code != 206: + response.headers["Content-Length"] = str(effective_size) + if effective_mtime is not None: + response.headers["Last-Modified"] = http_date(effective_mtime) response.headers["ETag"] = f'"{etag}"' response.headers["Accept-Ranges"] = "bytes" for key, value in (metadata or {}).items(): @@ -2820,6 +2828,8 @@ def object_handler(bucket_name: str, object_key: str): if validation_error: return _error_response("InvalidArgument", validation_error, 400) + metadata["__content_type__"] = content_type or mimetypes.guess_type(object_key)[0] or "application/octet-stream" + try: meta = storage.put_object( bucket_name, @@ -2834,10 +2844,11 @@ def object_handler(bucket_name: str, object_key: str): if "Bucket" in message: return _error_response("NoSuchBucket", message, 404) return _error_response("InvalidArgument", message, 400) - current_app.logger.info( - "Object uploaded", - extra={"bucket": bucket_name, "key": object_key, "size": meta.size}, - ) + if current_app.logger.isEnabledFor(logging.INFO): + current_app.logger.info( + "Object uploaded", + extra={"bucket": bucket_name, "key": object_key, "size": meta.size}, + ) response = Response(status=200) if meta.etag: response.headers["ETag"] = f'"{meta.etag}"' @@ -2871,7 +2882,7 @@ def object_handler(bucket_name: str, object_key: str): except StorageError as exc: return _error_response("NoSuchKey", str(exc), 404) metadata = storage.get_object_metadata(bucket_name, object_key) - mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream" + mimetype = metadata.get("__content_type__") or mimetypes.guess_type(object_key)[0] or "application/octet-stream" is_encrypted = "x-amz-server-side-encryption" in metadata @@ -2963,10 +2974,7 @@ def object_handler(bucket_name: str, object_key: str): response.headers["Content-Type"] = mimetype logged_bytes = 0 - try: - file_stat = path.stat() if not is_encrypted else None - except (PermissionError, OSError): - file_stat = None + file_stat = stat if not is_encrypted else None _apply_object_headers(response, file_stat=file_stat, metadata=metadata, etag=etag) if request.method == "GET": @@ -2983,8 +2991,9 @@ def object_handler(bucket_name: str, object_key: str): if value: response.headers[header] = _sanitize_header_value(value) - action = "Object read" if request.method == "GET" else "Object head" - current_app.logger.info(action, extra={"bucket": bucket_name, "key": object_key, "bytes": logged_bytes}) + if current_app.logger.isEnabledFor(logging.INFO): + action = "Object read" if request.method == "GET" else "Object head" + current_app.logger.info(action, extra={"bucket": bucket_name, "key": object_key, "bytes": logged_bytes}) return response if "uploadId" in request.args: @@ -3002,7 +3011,8 @@ def object_handler(bucket_name: str, object_key: str): storage.delete_object(bucket_name, object_key) lock_service.delete_object_lock_metadata(bucket_name, object_key) - current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key}) + if current_app.logger.isEnabledFor(logging.INFO): + current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key}) principal, _ = _require_principal() _notifications().emit_object_removed( @@ -3343,12 +3353,20 @@ def head_object(bucket_name: str, object_key: str) -> Response: _authorize_action(principal, bucket_name, "read", object_key=object_key) path = _storage().get_object_path(bucket_name, object_key) metadata = _storage().get_object_metadata(bucket_name, object_key) - stat = path.stat() etag = metadata.get("__etag__") or _storage()._compute_etag(path) - - response = Response(status=200) - _apply_object_headers(response, file_stat=stat, metadata=metadata, etag=etag) - response.headers["Content-Type"] = mimetypes.guess_type(object_key)[0] or "application/octet-stream" + + cached_size = metadata.get("__size__") + cached_mtime = metadata.get("__last_modified__") + if cached_size is not None and cached_mtime is not None: + size_val = int(cached_size) + mtime_val = float(cached_mtime) + response = Response(status=200) + _apply_object_headers(response, file_stat=None, metadata=metadata, etag=etag, size_override=size_val, mtime_override=mtime_val) + else: + stat = path.stat() + response = Response(status=200) + _apply_object_headers(response, file_stat=stat, metadata=metadata, etag=etag) + response.headers["Content-Type"] = metadata.get("__content_type__") or mimetypes.guess_type(object_key)[0] or "application/octet-stream" return response except (StorageError, FileNotFoundError): return _error_response("NoSuchKey", "Object not found", 404) @@ -3578,10 +3596,12 @@ def _initiate_multipart_upload(bucket_name: str, object_key: str) -> Response: return error metadata = _extract_request_metadata() + content_type = request.headers.get("Content-Type") + metadata["__content_type__"] = content_type or mimetypes.guess_type(object_key)[0] or "application/octet-stream" try: upload_id = _storage().initiate_multipart_upload( - bucket_name, - object_key, + bucket_name, + object_key, metadata=metadata or None ) except StorageError as exc: diff --git a/app/storage.py b/app/storage.py index 3e613f9..b7d96b9 100644 --- a/app/storage.py +++ b/app/storage.py @@ -959,7 +959,7 @@ class ObjectStorage: stat = destination.stat() - internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)} + internal_meta = {"__etag__": etag, "__size__": str(stat.st_size), "__last_modified__": str(stat.st_mtime)} combined_meta = {**internal_meta, **(metadata or {})} self._write_metadata(bucket_id, safe_key, combined_meta) @@ -1815,7 +1815,7 @@ class ObjectStorage: etag = checksum_hex metadata = manifest.get("metadata") - internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)} + internal_meta = {"__etag__": etag, "__size__": str(stat.st_size), "__last_modified__": str(stat.st_mtime)} combined_meta = {**internal_meta, **(metadata or {})} self._write_metadata(bucket_id, safe_key, combined_meta) diff --git a/app/version.py b/app/version.py index edc8de6..d900925 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.3.5" +APP_VERSION = "0.3.6" def get_version() -> str: From 72f5d9d70ce2587f326e23db4816a94c17a8fec5 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 7 Mar 2026 17:54:00 +0800 Subject: [PATCH 2/2] Restore data integrity guarantees: Content-MD5 validation, fsync durability, atomic metadata writes, concurrent write protection --- app/s3_api.py | 21 +++++++ app/storage.py | 113 ++++++++++++++++++++++------------- myfsio_core/src/streaming.rs | 5 ++ 3 files changed, 97 insertions(+), 42 deletions(-) diff --git a/app/s3_api.py b/app/s3_api.py index d5b9dcd..749e1e1 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -2844,6 +2844,18 @@ def object_handler(bucket_name: str, object_key: str): if "Bucket" in message: return _error_response("NoSuchBucket", message, 404) return _error_response("InvalidArgument", message, 400) + + content_md5 = request.headers.get("Content-MD5") + if content_md5 and meta.etag: + try: + expected_md5 = base64.b64decode(content_md5).hex() + except Exception: + storage.delete_object(bucket_name, object_key) + return _error_response("InvalidDigest", "Content-MD5 header is not valid base64", 400) + if expected_md5 != meta.etag: + storage.delete_object(bucket_name, object_key) + return _error_response("BadDigest", "The Content-MD5 you specified did not match what we received", 400) + if current_app.logger.isEnabledFor(logging.INFO): current_app.logger.info( "Object uploaded", @@ -3650,6 +3662,15 @@ def _upload_part(bucket_name: str, object_key: str) -> Response: return _error_response("NoSuchUpload", str(exc), 404) return _error_response("InvalidArgument", str(exc), 400) + content_md5 = request.headers.get("Content-MD5") + if content_md5 and etag: + try: + expected_md5 = base64.b64decode(content_md5).hex() + except Exception: + return _error_response("InvalidDigest", "Content-MD5 header is not valid base64", 400) + if expected_md5 != etag: + return _error_response("BadDigest", "The Content-MD5 you specified did not match what we received", 400) + response = Response(status=200) response.headers["ETag"] = f'"{etag}"' return response diff --git a/app/storage.py b/app/storage.py index b7d96b9..88fb303 100644 --- a/app/storage.py +++ b/app/storage.py @@ -361,7 +361,7 @@ class ObjectStorage: try: cache_path.parent.mkdir(parents=True, exist_ok=True) - cache_path.write_text(json.dumps(stats), encoding="utf-8") + self._atomic_write_json(cache_path, stats) except OSError: pass @@ -423,7 +423,7 @@ class ObjectStorage: cache_path = self._system_bucket_root(bucket_id) / "stats.json" try: cache_path.parent.mkdir(parents=True, exist_ok=True) - cache_path.write_text(json.dumps(data), encoding="utf-8") + self._atomic_write_json(cache_path, data) except OSError: pass @@ -879,11 +879,6 @@ class ObjectStorage: is_overwrite = destination.exists() existing_size = destination.stat().st_size if is_overwrite else 0 - archived_version_size = 0 - if self._is_versioning_enabled(bucket_path) and is_overwrite: - archived_version_size = existing_size - self._archive_current_version(bucket_id, safe_key, reason="overwrite") - tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR tmp_dir.mkdir(parents=True, exist_ok=True) @@ -910,19 +905,21 @@ class ObjectStorage: quota_check["quota"], quota_check["usage"], ) - - shutil.move(str(tmp_path), str(destination)) - finally: + except BaseException: if tmp_path: try: tmp_path.unlink(missing_ok=True) except OSError: pass + raise else: tmp_path = tmp_dir / f"{uuid.uuid4().hex}.tmp" try: + checksum = hashlib.md5() with tmp_path.open("wb") as target: - shutil.copyfileobj(stream, target) + shutil.copyfileobj(_HashingReader(stream, checksum), target) + target.flush() + os.fsync(target.fileno()) new_size = tmp_path.stat().st_size size_delta = new_size - existing_size @@ -941,27 +938,43 @@ class ObjectStorage: quota_check["usage"], ) - checksum = hashlib.md5() - with tmp_path.open("rb") as f: - while True: - chunk = f.read(1048576) - if not chunk: - break - checksum.update(chunk) etag = checksum.hexdigest() - - shutil.move(str(tmp_path), str(destination)) - finally: + except BaseException: try: tmp_path.unlink(missing_ok=True) except OSError: pass + raise - stat = destination.stat() - - internal_meta = {"__etag__": etag, "__size__": str(stat.st_size), "__last_modified__": str(stat.st_mtime)} - combined_meta = {**internal_meta, **(metadata or {})} - self._write_metadata(bucket_id, safe_key, combined_meta) + lock_file_path = self._system_bucket_root(bucket_id) / "locks" / f"{safe_key.as_posix().replace('/', '_')}.lock" + try: + with _atomic_lock_file(lock_file_path): + archived_version_size = 0 + if self._is_versioning_enabled(bucket_path) and is_overwrite: + archived_version_size = existing_size + self._archive_current_version(bucket_id, safe_key, reason="overwrite") + + shutil.move(str(tmp_path), str(destination)) + tmp_path = None + + stat = destination.stat() + + internal_meta = {"__etag__": etag, "__size__": str(stat.st_size), "__last_modified__": str(stat.st_mtime)} + combined_meta = {**internal_meta, **(metadata or {})} + self._write_metadata(bucket_id, safe_key, combined_meta) + except BlockingIOError: + try: + if tmp_path: + tmp_path.unlink(missing_ok=True) + except OSError: + pass + raise StorageError("Another upload to this key is in progress") + finally: + if tmp_path: + try: + tmp_path.unlink(missing_ok=True) + except OSError: + pass self._update_bucket_stats_cache( bucket_id, @@ -1553,18 +1566,16 @@ class ObjectStorage: temp_path = upload_root / f".{part_filename}.tmp" try: - with temp_path.open("wb") as target: - shutil.copyfileobj(stream, target) if _HAS_RUST: + with temp_path.open("wb") as target: + shutil.copyfileobj(stream, target) part_etag = _rc.md5_file(str(temp_path)) else: checksum = hashlib.md5() - with temp_path.open("rb") as f: - while True: - chunk = f.read(1048576) - if not chunk: - break - checksum.update(chunk) + with temp_path.open("wb") as target: + shutil.copyfileobj(_HashingReader(stream, checksum), target) + target.flush() + os.fsync(target.fileno()) part_etag = checksum.hexdigest() temp_path.replace(part_path) except OSError: @@ -1598,7 +1609,7 @@ class ObjectStorage: parts = manifest.setdefault("parts", {}) parts[str(part_number)] = record - manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + self._atomic_write_json(manifest_path, manifest) break except OSError as exc: if attempt < max_retries - 1: @@ -1691,7 +1702,7 @@ class ObjectStorage: parts = manifest.setdefault("parts", {}) parts[str(part_number)] = record - manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + self._atomic_write_json(manifest_path, manifest) break except OSError as exc: if attempt < max_retries - 1: @@ -1797,6 +1808,8 @@ class ObjectStorage: break checksum.update(data) target.write(data) + target.flush() + os.fsync(target.fileno()) checksum_hex = checksum.hexdigest() except BlockingIOError: raise StorageError("Another upload to this key is in progress") @@ -2303,6 +2316,23 @@ class ObjectStorage: ): path.mkdir(parents=True, exist_ok=True) + @staticmethod + def _atomic_write_json(path: Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = path.with_suffix(".tmp") + try: + with tmp_path.open("w", encoding="utf-8") as f: + json.dump(data, f) + f.flush() + os.fsync(f.fileno()) + tmp_path.replace(path) + except BaseException: + try: + tmp_path.unlink(missing_ok=True) + except OSError: + pass + raise + def _multipart_dir(self, bucket_name: str, upload_id: str) -> Path: return self._multipart_bucket_root(bucket_name) / upload_id @@ -2337,7 +2367,7 @@ class ObjectStorage: def _write_bucket_config(self, bucket_name: str, payload: dict[str, Any]) -> None: config_path = self._bucket_config_path(bucket_name) config_path.parent.mkdir(parents=True, exist_ok=True) - config_path.write_text(json.dumps(payload), encoding="utf-8") + self._atomic_write_json(config_path, payload) try: mtime = config_path.stat().st_mtime except OSError: @@ -2371,8 +2401,7 @@ class ObjectStorage: def _write_multipart_manifest(self, upload_root: Path, manifest: dict[str, Any]) -> None: manifest_path = upload_root / self.MULTIPART_MANIFEST - manifest_path.parent.mkdir(parents=True, exist_ok=True) - manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + self._atomic_write_json(manifest_path, manifest) def _metadata_file(self, bucket_name: str, key: Path) -> Path: meta_root = self._bucket_meta_root(bucket_name) @@ -2442,7 +2471,7 @@ class ObjectStorage: except (OSError, json.JSONDecodeError): pass index_data[entry_name] = entry - index_path.write_text(json.dumps(index_data), encoding="utf-8") + self._atomic_write_json(index_path, index_data) self._invalidate_meta_read_cache(bucket_name, key) def _delete_index_entry(self, bucket_name: str, key: Path) -> None: @@ -2463,7 +2492,7 @@ class ObjectStorage: if entry_name in index_data: del index_data[entry_name] if index_data: - index_path.write_text(json.dumps(index_data), encoding="utf-8") + self._atomic_write_json(index_path, index_data) else: try: index_path.unlink() @@ -2512,7 +2541,7 @@ class ObjectStorage: "reason": reason, } manifest_path = version_dir / f"{version_id}.json" - manifest_path.write_text(json.dumps(record), encoding="utf-8") + self._atomic_write_json(manifest_path, record) def _read_metadata(self, bucket_name: str, key: Path) -> Dict[str, str]: entry = self._read_index_entry(bucket_name, key) diff --git a/myfsio_core/src/streaming.rs b/myfsio_core/src/streaming.rs index 1ff13f6..0ecca8c 100644 --- a/myfsio_core/src/streaming.rs +++ b/myfsio_core/src/streaming.rs @@ -46,6 +46,8 @@ pub fn stream_to_file_with_md5( py.check_signals()?; } + file.sync_all() + .map_err(|e| PyIOError::new_err(format!("Failed to fsync: {}", e)))?; Ok(()) })(); @@ -102,6 +104,9 @@ pub fn assemble_parts_with_md5( } } + target.sync_all() + .map_err(|e| PyIOError::new_err(format!("Failed to fsync: {}", e)))?; + Ok(format!("{:x}", hasher.finalize())) }) }