Reduce per-request CPU overhead: eliminate double stat(), cache content type and policy context, gate INFO logging, and make stat-check intervals configurable
@@ -486,10 +486,6 @@ def _configure_logging(app: Flask) -> None:
         g.request_id = f"{os.getpid():x}{next(_request_counter):012x}"
         g.request_started_at = time.perf_counter()
         g.request_bytes_in = request.content_length or 0
-        app.logger.info(
-            "Request started",
-            extra={"path": request.path, "method": request.method, "remote_addr": request.remote_addr},
-        )

     @app.before_request
     def _maybe_serve_website():
@@ -620,14 +616,15 @@ def _configure_logging(app: Flask) -> None:
         duration_ms = (time.perf_counter() - g.request_started_at) * 1000
         request_id = getattr(g, "request_id", f"{os.getpid():x}{next(_request_counter):012x}")
         response.headers.setdefault("X-Request-ID", request_id)
-        app.logger.info(
-            "Request completed",
-            extra={
-                "path": request.path,
-                "method": request.method,
-                "remote_addr": request.remote_addr,
-            },
-        )
+        if app.logger.isEnabledFor(logging.INFO):
+            app.logger.info(
+                "Request completed",
+                extra={
+                    "path": request.path,
+                    "method": request.method,
+                    "remote_addr": request.remote_addr,
+                },
+            )
         response.headers["X-Request-Duration-ms"] = f"{duration_ms:.2f}"

         operation_metrics = app.extensions.get("operation_metrics")
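Note: the isEnabledFor() guard above is what makes this cheap when INFO is disabled, because the extra dict is never built. A minimal standalone sketch of the pattern, using a hypothetical logger name rather than this app's logger:

    import logging

    logger = logging.getLogger("example")   # hypothetical logger, for illustration only
    logger.setLevel(logging.WARNING)        # INFO records would be discarded anyway

    def log_request_completed(path: str, method: str) -> None:
        # Without the guard, the extra dict is allocated on every request even
        # though the record is then dropped; the guard skips that work entirely.
        if logger.isEnabledFor(logging.INFO):
            logger.info("Request completed", extra={"path": path, "method": method})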
@@ -2,6 +2,7 @@ from __future__ import annotations

 import ipaddress
 import json
+import os
 import re
 import time
 from dataclasses import dataclass, field
@@ -268,7 +269,7 @@ class BucketPolicyStore:
         self._last_mtime = self._current_mtime()
         # Performance: Avoid stat() on every request
         self._last_stat_check = 0.0
-        self._stat_check_interval = 1.0  # Only check mtime every 1 second
+        self._stat_check_interval = float(os.environ.get("BUCKET_POLICY_STAT_CHECK_INTERVAL_SECONDS", "2.0"))

     def maybe_reload(self) -> None:
         # Performance: Skip stat check if we checked recently
@@ -125,7 +125,7 @@ class IamService:
         self._secret_key_cache: Dict[str, Tuple[str, float]] = {}
         self._cache_ttl = float(os.environ.get("IAM_CACHE_TTL_SECONDS", "5.0"))
         self._last_stat_check = 0.0
-        self._stat_check_interval = 1.0
+        self._stat_check_interval = float(os.environ.get("IAM_STAT_CHECK_INTERVAL_SECONDS", "2.0"))
         self._sessions: Dict[str, Dict[str, Any]] = {}
         self._session_lock = threading.Lock()
         self._load()
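Both stores now read their check interval from the environment (BUCKET_POLICY_STAT_CHECK_INTERVAL_SECONDS and IAM_STAT_CHECK_INTERVAL_SECONDS, defaulting to 2.0 seconds). A rough, self-contained sketch of the throttled reload these fields gate; the class and reload body are illustrative, not the project's actual maybe_reload():

    import os
    import time

    class ThrottledReloader:
        def __init__(self, path: str) -> None:
            self._path = path
            self._last_mtime = 0.0
            self._last_stat_check = 0.0
            self._stat_check_interval = float(os.environ.get("BUCKET_POLICY_STAT_CHECK_INTERVAL_SECONDS", "2.0"))

        def maybe_reload(self) -> None:
            now = time.monotonic()
            # Skip the stat() entirely if we checked within the configured interval.
            if now - self._last_stat_check < self._stat_check_interval:
                return
            self._last_stat_check = now
            mtime = os.path.getmtime(self._path)
            if mtime != self._last_mtime:
                self._last_mtime = mtime
                # ...re-read and re-parse the file here...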
@@ -85,6 +85,9 @@ def _bucket_policies() -> BucketPolicyStore:


 def _build_policy_context() -> Dict[str, Any]:
+    cached = getattr(g, "_policy_context", None)
+    if cached is not None:
+        return cached
     ctx: Dict[str, Any] = {}
     if request.headers.get("Referer"):
         ctx["aws:Referer"] = request.headers.get("Referer")
@@ -98,6 +101,7 @@ def _build_policy_context() -> Dict[str, Any]:
     ctx["aws:SecureTransport"] = str(request.is_secure).lower()
     if request.headers.get("User-Agent"):
         ctx["aws:UserAgent"] = request.headers.get("User-Agent")
+    g._policy_context = ctx
     return ctx


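Caching the context on flask.g keeps the memoization strictly per request: g is tied to the application context and is discarded when the request ends, so stale Referer or source-IP values cannot leak into the next request. The same pattern in isolation, with a hypothetical helper name:

    from flask import g

    def _expensive_per_request_value() -> dict:
        # First call in a request computes and stores; later calls reuse the value.
        cached = getattr(g, "_my_cache", None)
        if cached is not None:
            return cached
        value = {"computed": True}   # stand-in for real per-request work
        g._my_cache = value
        return value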
@@ -1021,11 +1025,15 @@ def _apply_object_headers(
     file_stat,
     metadata: Dict[str, str] | None,
     etag: str,
+    size_override: int | None = None,
+    mtime_override: float | None = None,
 ) -> None:
-    if file_stat is not None:
-        if response.status_code != 206:
-            response.headers["Content-Length"] = str(file_stat.st_size)
-        response.headers["Last-Modified"] = http_date(file_stat.st_mtime)
+    effective_size = size_override if size_override is not None else (file_stat.st_size if file_stat is not None else None)
+    effective_mtime = mtime_override if mtime_override is not None else (file_stat.st_mtime if file_stat is not None else None)
+    if effective_size is not None and response.status_code != 206:
+        response.headers["Content-Length"] = str(effective_size)
+    if effective_mtime is not None:
+        response.headers["Last-Modified"] = http_date(effective_mtime)
     response.headers["ETag"] = f'"{etag}"'
     response.headers["Accept-Ranges"] = "bytes"
     for key, value in (metadata or {}).items():
@@ -2820,6 +2828,8 @@ def object_handler(bucket_name: str, object_key: str):
         if validation_error:
             return _error_response("InvalidArgument", validation_error, 400)

+        metadata["__content_type__"] = content_type or mimetypes.guess_type(object_key)[0] or "application/octet-stream"
+
         try:
             meta = storage.put_object(
                 bucket_name,
@@ -2834,10 +2844,11 @@ def object_handler(bucket_name: str, object_key: str):
             if "Bucket" in message:
                 return _error_response("NoSuchBucket", message, 404)
             return _error_response("InvalidArgument", message, 400)
-        current_app.logger.info(
-            "Object uploaded",
-            extra={"bucket": bucket_name, "key": object_key, "size": meta.size},
-        )
+        if current_app.logger.isEnabledFor(logging.INFO):
+            current_app.logger.info(
+                "Object uploaded",
+                extra={"bucket": bucket_name, "key": object_key, "size": meta.size},
+            )
         response = Response(status=200)
         if meta.etag:
             response.headers["ETag"] = f'"{meta.etag}"'
@@ -2871,7 +2882,7 @@ def object_handler(bucket_name: str, object_key: str):
         except StorageError as exc:
             return _error_response("NoSuchKey", str(exc), 404)
         metadata = storage.get_object_metadata(bucket_name, object_key)
-        mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream"
+        mimetype = metadata.get("__content_type__") or mimetypes.guess_type(object_key)[0] or "application/octet-stream"

         is_encrypted = "x-amz-server-side-encryption" in metadata

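GET and HEAD now prefer the content type recorded at upload time and fall back to extension guessing only for objects written before this change. The fallback chain in isolation (the helper name is hypothetical; the metadata key matches the one used above):

    import mimetypes

    def resolve_content_type(metadata: dict, object_key: str) -> str:
        # Stored value wins; guessing from the key's extension is only a fallback.
        return (
            metadata.get("__content_type__")
            or mimetypes.guess_type(object_key)[0]
            or "application/octet-stream"
        )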
@@ -2963,10 +2974,7 @@ def object_handler(bucket_name: str, object_key: str):
         response.headers["Content-Type"] = mimetype
         logged_bytes = 0

-        try:
-            file_stat = path.stat() if not is_encrypted else None
-        except (PermissionError, OSError):
-            file_stat = None
+        file_stat = stat if not is_encrypted else None
         _apply_object_headers(response, file_stat=file_stat, metadata=metadata, etag=etag)

         if request.method == "GET":
@@ -2983,8 +2991,9 @@ def object_handler(bucket_name: str, object_key: str):
                 if value:
                     response.headers[header] = _sanitize_header_value(value)

-        action = "Object read" if request.method == "GET" else "Object head"
-        current_app.logger.info(action, extra={"bucket": bucket_name, "key": object_key, "bytes": logged_bytes})
+        if current_app.logger.isEnabledFor(logging.INFO):
+            action = "Object read" if request.method == "GET" else "Object head"
+            current_app.logger.info(action, extra={"bucket": bucket_name, "key": object_key, "bytes": logged_bytes})
         return response

     if "uploadId" in request.args:
@@ -3002,7 +3011,8 @@ def object_handler(bucket_name: str, object_key: str):

         storage.delete_object(bucket_name, object_key)
         lock_service.delete_object_lock_metadata(bucket_name, object_key)
-        current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key})
+        if current_app.logger.isEnabledFor(logging.INFO):
+            current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key})

         principal, _ = _require_principal()
         _notifications().emit_object_removed(
@@ -3343,12 +3353,20 @@ def head_object(bucket_name: str, object_key: str) -> Response:
         _authorize_action(principal, bucket_name, "read", object_key=object_key)
         path = _storage().get_object_path(bucket_name, object_key)
         metadata = _storage().get_object_metadata(bucket_name, object_key)
-        stat = path.stat()
         etag = metadata.get("__etag__") or _storage()._compute_etag(path)

-        response = Response(status=200)
-        _apply_object_headers(response, file_stat=stat, metadata=metadata, etag=etag)
-        response.headers["Content-Type"] = mimetypes.guess_type(object_key)[0] or "application/octet-stream"
+        cached_size = metadata.get("__size__")
+        cached_mtime = metadata.get("__last_modified__")
+        if cached_size is not None and cached_mtime is not None:
+            size_val = int(cached_size)
+            mtime_val = float(cached_mtime)
+            response = Response(status=200)
+            _apply_object_headers(response, file_stat=None, metadata=metadata, etag=etag, size_override=size_val, mtime_override=mtime_val)
+        else:
+            stat = path.stat()
+            response = Response(status=200)
+            _apply_object_headers(response, file_stat=stat, metadata=metadata, etag=etag)
+        response.headers["Content-Type"] = metadata.get("__content_type__") or mimetypes.guess_type(object_key)[0] or "application/octet-stream"
         return response
     except (StorageError, FileNotFoundError):
         return _error_response("NoSuchKey", "Object not found", 404)
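With __size__ and __last_modified__ persisted by the storage layer (see the ObjectStorage hunks below), a HEAD can usually be answered without touching the filesystem; path.stat() is now only the fallback for objects whose metadata predates this change. A small sketch of that decision, with a hypothetical helper name:

    def size_and_mtime_from_metadata(metadata: dict):
        # Returns (size, mtime) when both cached values are present, else None
        # so the caller can fall back to path.stat() as in the else branch above.
        cached_size = metadata.get("__size__")
        cached_mtime = metadata.get("__last_modified__")
        if cached_size is None or cached_mtime is None:
            return None
        return int(cached_size), float(cached_mtime)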
@@ -3578,10 +3596,12 @@ def _initiate_multipart_upload(bucket_name: str, object_key: str) -> Response:
         return error

     metadata = _extract_request_metadata()
+    content_type = request.headers.get("Content-Type")
+    metadata["__content_type__"] = content_type or mimetypes.guess_type(object_key)[0] or "application/octet-stream"
     try:
         upload_id = _storage().initiate_multipart_upload(
             bucket_name,
             object_key,
             metadata=metadata or None
         )
     except StorageError as exc:
@@ -959,7 +959,7 @@ class ObjectStorage:

         stat = destination.stat()

-        internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)}
+        internal_meta = {"__etag__": etag, "__size__": str(stat.st_size), "__last_modified__": str(stat.st_mtime)}
         combined_meta = {**internal_meta, **(metadata or {})}
         self._write_metadata(bucket_id, safe_key, combined_meta)

@@ -1815,7 +1815,7 @@ class ObjectStorage:
         etag = checksum_hex
         metadata = manifest.get("metadata")

-        internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)}
+        internal_meta = {"__etag__": etag, "__size__": str(stat.st_size), "__last_modified__": str(stat.st_mtime)}
         combined_meta = {**internal_meta, **(metadata or {})}
         self._write_metadata(bucket_id, safe_key, combined_meta)

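Both write paths store the stat values as strings; head_object above parses them back with int() and float(). A quick illustration of that roundtrip, with placeholder values:

    st_size, st_mtime = 2048, 1700000000.25
    meta = {"__etag__": "abc", "__size__": str(st_size), "__last_modified__": str(st_mtime)}
    assert int(meta["__size__"]) == st_size
    assert float(meta["__last_modified__"]) == st_mtime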
@@ -1,6 +1,6 @@
 from __future__ import annotations

-APP_VERSION = "0.3.5"
+APP_VERSION = "0.3.6"


 def get_version() -> str: