Reduce per-request overhead: pre-compile SigV4 regex, in-memory etag index cache, 1MB GET chunks, configurable meta cache, skip fsync for rebuildable caches
This commit is contained in:
@@ -184,6 +184,7 @@ def create_app(
|
||||
object_cache_max_size=app.config.get("OBJECT_CACHE_MAX_SIZE", 100),
|
||||
bucket_config_cache_ttl=app.config.get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0),
|
||||
object_key_max_length_bytes=app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024),
|
||||
meta_read_cache_max=app.config.get("META_READ_CACHE_MAX", 2048),
|
||||
)
|
||||
|
||||
if app.config.get("WARM_CACHE_ON_STARTUP", True) and not app.config.get("TESTING"):
|
||||
|
||||
@@ -136,6 +136,7 @@ class AppConfig:
|
||||
site_sync_clock_skew_tolerance_seconds: float
|
||||
object_key_max_length_bytes: int
|
||||
object_cache_max_size: int
|
||||
meta_read_cache_max: int
|
||||
bucket_config_cache_ttl_seconds: float
|
||||
object_tag_limit: int
|
||||
encryption_chunk_size_bytes: int
|
||||
@@ -315,6 +316,7 @@ class AppConfig:
|
||||
site_sync_clock_skew_tolerance_seconds = float(_get("SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS", 1.0))
|
||||
object_key_max_length_bytes = int(_get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024))
|
||||
object_cache_max_size = int(_get("OBJECT_CACHE_MAX_SIZE", 100))
|
||||
meta_read_cache_max = int(_get("META_READ_CACHE_MAX", 2048))
|
||||
bucket_config_cache_ttl_seconds = float(_get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0))
|
||||
object_tag_limit = int(_get("OBJECT_TAG_LIMIT", 50))
|
||||
encryption_chunk_size_bytes = int(_get("ENCRYPTION_CHUNK_SIZE_BYTES", 64 * 1024))
|
||||
@@ -421,6 +423,7 @@ class AppConfig:
|
||||
site_sync_clock_skew_tolerance_seconds=site_sync_clock_skew_tolerance_seconds,
|
||||
object_key_max_length_bytes=object_key_max_length_bytes,
|
||||
object_cache_max_size=object_cache_max_size,
|
||||
meta_read_cache_max=meta_read_cache_max,
|
||||
bucket_config_cache_ttl_seconds=bucket_config_cache_ttl_seconds,
|
||||
object_tag_limit=object_tag_limit,
|
||||
encryption_chunk_size_bytes=encryption_chunk_size_bytes,
|
||||
@@ -648,6 +651,7 @@ class AppConfig:
|
||||
"SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS": self.site_sync_clock_skew_tolerance_seconds,
|
||||
"OBJECT_KEY_MAX_LENGTH_BYTES": self.object_key_max_length_bytes,
|
||||
"OBJECT_CACHE_MAX_SIZE": self.object_cache_max_size,
|
||||
"META_READ_CACHE_MAX": self.meta_read_cache_max,
|
||||
"BUCKET_CONFIG_CACHE_TTL_SECONDS": self.bucket_config_cache_ttl_seconds,
|
||||
"OBJECT_TAG_LIMIT": self.object_tag_limit,
|
||||
"ENCRYPTION_CHUNK_SIZE_BYTES": self.encryption_chunk_size_bytes,
|
||||
|
||||
@@ -201,6 +201,11 @@ _SIGNING_KEY_CACHE_LOCK = threading.Lock()
|
||||
_SIGNING_KEY_CACHE_TTL = 60.0
|
||||
_SIGNING_KEY_CACHE_MAX_SIZE = 256
|
||||
|
||||
_SIGV4_HEADER_RE = re.compile(
|
||||
r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)"
|
||||
)
|
||||
_SIGV4_REQUIRED_HEADERS = frozenset({'host', 'x-amz-date'})
|
||||
|
||||
|
||||
def clear_signing_key_cache() -> None:
|
||||
if _HAS_RUST:
|
||||
@@ -259,10 +264,7 @@ def _get_canonical_uri(req: Any) -> str:
|
||||
|
||||
|
||||
def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
|
||||
match = re.match(
|
||||
r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)",
|
||||
auth_header,
|
||||
)
|
||||
match = _SIGV4_HEADER_RE.match(auth_header)
|
||||
if not match:
|
||||
return None
|
||||
|
||||
@@ -286,14 +288,9 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
|
||||
if time_diff > tolerance:
|
||||
raise IamError("Request timestamp too old or too far in the future")
|
||||
|
||||
required_headers = {'host', 'x-amz-date'}
|
||||
signed_headers_set = set(signed_headers_str.split(';'))
|
||||
if not required_headers.issubset(signed_headers_set):
|
||||
if 'date' in signed_headers_set:
|
||||
required_headers.remove('x-amz-date')
|
||||
required_headers.add('date')
|
||||
|
||||
if not required_headers.issubset(signed_headers_set):
|
||||
if not _SIGV4_REQUIRED_HEADERS.issubset(signed_headers_set):
|
||||
if not ({'host', 'date'}.issubset(signed_headers_set)):
|
||||
raise IamError("Required headers not signed")
|
||||
|
||||
canonical_uri = _get_canonical_uri(req)
|
||||
@@ -1010,7 +1007,7 @@ def _render_encryption_document(config: dict[str, Any]) -> Element:
|
||||
return root
|
||||
|
||||
|
||||
def _stream_file(path, chunk_size: int = 256 * 1024):
|
||||
def _stream_file(path, chunk_size: int = 1024 * 1024):
|
||||
with path.open("rb") as handle:
|
||||
while True:
|
||||
chunk = handle.read(chunk_size)
|
||||
|
||||
@@ -190,6 +190,7 @@ class ObjectStorage:
|
||||
object_cache_max_size: int = 100,
|
||||
bucket_config_cache_ttl: float = 30.0,
|
||||
object_key_max_length_bytes: int = 1024,
|
||||
meta_read_cache_max: int = 2048,
|
||||
) -> None:
|
||||
self.root = Path(root)
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
@@ -208,7 +209,7 @@ class ObjectStorage:
|
||||
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
|
||||
self._meta_index_locks: Dict[str, threading.Lock] = {}
|
||||
self._meta_read_cache: OrderedDict[tuple, Optional[Dict[str, Any]]] = OrderedDict()
|
||||
self._meta_read_cache_max = 2048
|
||||
self._meta_read_cache_max = meta_read_cache_max
|
||||
self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup")
|
||||
self._stats_mem: Dict[str, Dict[str, int]] = {}
|
||||
self._stats_serial: Dict[str, int] = {}
|
||||
@@ -218,6 +219,7 @@ class ObjectStorage:
|
||||
self._stats_flush_timer: Optional[threading.Timer] = None
|
||||
self._etag_index_dirty: set[str] = set()
|
||||
self._etag_index_flush_timer: Optional[threading.Timer] = None
|
||||
self._etag_index_mem: Dict[str, tuple[Dict[str, str], float]] = {}
|
||||
|
||||
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
|
||||
with self._registry_lock:
|
||||
@@ -427,7 +429,7 @@ class ObjectStorage:
|
||||
cache_path = self._system_bucket_root(bucket_id) / "stats.json"
|
||||
try:
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self._atomic_write_json(cache_path, data)
|
||||
self._atomic_write_json(cache_path, data, sync=False)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
@@ -602,14 +604,7 @@ class ObjectStorage:
|
||||
is_truncated=False, next_continuation_token=None,
|
||||
)
|
||||
|
||||
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
|
||||
meta_cache: Dict[str, str] = {}
|
||||
if etag_index_path.exists():
|
||||
try:
|
||||
with open(etag_index_path, 'r', encoding='utf-8') as f:
|
||||
meta_cache = json.load(f)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
meta_cache: Dict[str, str] = self._get_etag_index(bucket_id)
|
||||
|
||||
entries_files: list[tuple[str, int, float, Optional[str]]] = []
|
||||
entries_dirs: list[str] = []
|
||||
@@ -2088,6 +2083,7 @@ class ObjectStorage:
|
||||
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(etag_index_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(raw["etag_cache"], f)
|
||||
self._etag_index_mem[bucket_id] = (dict(raw["etag_cache"]), etag_index_path.stat().st_mtime)
|
||||
except OSError:
|
||||
pass
|
||||
for key, size, mtime, etag in raw["objects"]:
|
||||
@@ -2211,6 +2207,7 @@ class ObjectStorage:
|
||||
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(etag_index_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(meta_cache, f)
|
||||
self._etag_index_mem[bucket_id] = (dict(meta_cache), etag_index_path.stat().st_mtime)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
@@ -2324,6 +2321,25 @@ class ObjectStorage:
|
||||
self._etag_index_dirty.add(bucket_id)
|
||||
self._schedule_etag_index_flush()
|
||||
|
||||
def _get_etag_index(self, bucket_id: str) -> Dict[str, str]:
|
||||
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
|
||||
try:
|
||||
current_mtime = etag_index_path.stat().st_mtime
|
||||
except OSError:
|
||||
return {}
|
||||
cached = self._etag_index_mem.get(bucket_id)
|
||||
if cached:
|
||||
cache_dict, cached_mtime = cached
|
||||
if current_mtime == cached_mtime:
|
||||
return cache_dict
|
||||
try:
|
||||
with open(etag_index_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
self._etag_index_mem[bucket_id] = (data, current_mtime)
|
||||
return data
|
||||
except (OSError, json.JSONDecodeError):
|
||||
return {}
|
||||
|
||||
def _schedule_etag_index_flush(self) -> None:
|
||||
if self._etag_index_flush_timer is None or not self._etag_index_flush_timer.is_alive():
|
||||
self._etag_index_flush_timer = threading.Timer(5.0, self._flush_etag_indexes)
|
||||
@@ -2345,6 +2361,7 @@ class ObjectStorage:
|
||||
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(etag_index_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(index, f)
|
||||
self._etag_index_mem[bucket_id] = (index, etag_index_path.stat().st_mtime)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
@@ -2388,14 +2405,15 @@ class ObjectStorage:
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@staticmethod
|
||||
def _atomic_write_json(path: Path, data: Any) -> None:
|
||||
def _atomic_write_json(path: Path, data: Any, *, sync: bool = True) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp_path = path.with_suffix(".tmp")
|
||||
try:
|
||||
with tmp_path.open("w", encoding="utf-8") as f:
|
||||
json.dump(data, f)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
if sync:
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
tmp_path.replace(path)
|
||||
except BaseException:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user