Optimize CPU usage via caching and reducing ThreadPoolExecutor workers to prevent CPU saturation

This commit is contained in:
2026-02-02 13:30:06 +08:00
parent 45d21cce21
commit a779b002d7
5 changed files with 66 additions and 7 deletions

View File

@@ -6,6 +6,7 @@ import re
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from fnmatch import fnmatch, translate from fnmatch import fnmatch, translate
from functools import lru_cache
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Pattern, Sequence, Tuple from typing import Any, Dict, Iterable, List, Optional, Pattern, Sequence, Tuple
@@ -13,9 +14,14 @@ from typing import Any, Dict, Iterable, List, Optional, Pattern, Sequence, Tuple
RESOURCE_PREFIX = "arn:aws:s3:::" RESOURCE_PREFIX = "arn:aws:s3:::"
@lru_cache(maxsize=256)
def _compile_pattern(pattern: str) -> Pattern[str]:
    """Compile a shell-style wildcard pattern into a case-insensitive regex.

    The wildcard syntax is the one understood by ``fnmatch.translate``.
    Results are memoized (up to 256 distinct patterns, least recently used
    evicted first) so repeated matches against the same pattern skip the
    translate-and-compile step.
    """
    regex_source = translate(pattern)
    return re.compile(regex_source, flags=re.IGNORECASE)
def _match_string_like(value: str, pattern: str) -> bool:
    """Return True when *value* matches the wildcard *pattern*, ignoring case.

    The pattern is compiled through ``_compile_pattern`` so the compiled
    regex is reused across calls.
    """
    return _compile_pattern(pattern).match(value) is not None
def _ip_in_cidr(ip_str: str, cidr: str) -> bool: def _ip_in_cidr(ip_str: str, cidr: str) -> bool:

View File

@@ -4,6 +4,7 @@ import hashlib
import hmac import hmac
import json import json
import math import math
import os
import secrets import secrets
import threading import threading
import time import time
@@ -121,7 +122,8 @@ class IamService:
self._failed_attempts: Dict[str, Deque[datetime]] = {} self._failed_attempts: Dict[str, Deque[datetime]] = {}
self._last_load_time = 0.0 self._last_load_time = 0.0
self._principal_cache: Dict[str, Tuple[Principal, float]] = {} self._principal_cache: Dict[str, Tuple[Principal, float]] = {}
self._cache_ttl = 10.0 self._secret_key_cache: Dict[str, Tuple[str, float]] = {}
self._cache_ttl = float(os.environ.get("IAM_CACHE_TTL_SECONDS", "5.0"))
self._last_stat_check = 0.0 self._last_stat_check = 0.0
self._stat_check_interval = 1.0 self._stat_check_interval = 1.0
self._sessions: Dict[str, Dict[str, Any]] = {} self._sessions: Dict[str, Dict[str, Any]] = {}
@@ -139,6 +141,7 @@ class IamService:
if self.config_path.stat().st_mtime > self._last_load_time: if self.config_path.stat().st_mtime > self._last_load_time:
self._load() self._load()
self._principal_cache.clear() self._principal_cache.clear()
self._secret_key_cache.clear()
except OSError: except OSError:
pass pass
@@ -367,6 +370,9 @@ class IamService:
user["secret_key"] = new_secret user["secret_key"] = new_secret
self._save() self._save()
self._principal_cache.pop(access_key, None) self._principal_cache.pop(access_key, None)
self._secret_key_cache.pop(access_key, None)
from .s3_api import clear_signing_key_cache
clear_signing_key_cache()
self._load() self._load()
return new_secret return new_secret
@@ -385,6 +391,10 @@ class IamService:
raise IamError("User not found") raise IamError("User not found")
self._raw_config["users"] = remaining self._raw_config["users"] = remaining
self._save() self._save()
self._principal_cache.pop(access_key, None)
self._secret_key_cache.pop(access_key, None)
from .s3_api import clear_signing_key_cache
clear_signing_key_cache()
self._load() self._load()
def update_user_policies(self, access_key: str, policies: Sequence[Dict[str, Any]]) -> None: def update_user_policies(self, access_key: str, policies: Sequence[Dict[str, Any]]) -> None:
@@ -546,10 +556,19 @@ class IamService:
raise IamError("User not found") raise IamError("User not found")
def get_secret_key(self, access_key: str) -> str | None: def get_secret_key(self, access_key: str) -> str | None:
now = time.time()
cached = self._secret_key_cache.get(access_key)
if cached:
secret_key, cached_time = cached
if now - cached_time < self._cache_ttl:
return secret_key
self._maybe_reload() self._maybe_reload()
record = self._users.get(access_key) record = self._users.get(access_key)
if record: if record:
return record["secret_key"] secret_key = record["secret_key"]
self._secret_key_cache[access_key] = (secret_key, now)
return secret_key
return None return None
def get_principal(self, access_key: str) -> Principal | None: def get_principal(self, access_key: str) -> Principal | None:

View File

@@ -6,9 +6,12 @@ import hmac
import logging import logging
import mimetypes import mimetypes
import re import re
import threading
import time
import uuid import uuid
from collections import OrderedDict
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional from typing import Any, Dict, Optional, Tuple
from urllib.parse import quote, urlencode, urlparse, unquote from urllib.parse import quote, urlencode, urlparse, unquote
from xml.etree.ElementTree import Element, SubElement, tostring, ParseError from xml.etree.ElementTree import Element, SubElement, tostring, ParseError
from defusedxml.ElementTree import fromstring from defusedxml.ElementTree import fromstring
@@ -181,11 +184,41 @@ def _sign(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
# Tuning knobs for the signing-key cache: entry lifetime in seconds and the
# maximum number of entries kept at once.
_SIGNING_KEY_CACHE_TTL = 60.0
_SIGNING_KEY_CACHE_MAX_SIZE = 256

# Guards every read and write of _SIGNING_KEY_CACHE.
_SIGNING_KEY_CACHE_LOCK = threading.Lock()

# Maps a 4-string tuple to (signing key bytes, timestamp); insertion order is
# kept so the oldest entry can be dropped first.
_SIGNING_KEY_CACHE: OrderedDict[Tuple[str, str, str, str], Tuple[bytes, float]] = OrderedDict()


def clear_signing_key_cache() -> None:
    """Remove every entry from the signing-key cache while holding its lock."""
    with _SIGNING_KEY_CACHE_LOCK:
        _SIGNING_KEY_CACHE.clear()
def _get_signature_key(key: str, date_stamp: str, region_name: str, service_name: str) -> bytes:
    """Derive the request signing key, reusing a cached result when fresh.

    The key is derived by chaining ``_sign`` over the date stamp, region,
    service and the literal ``"aws4_request"``, seeded with ``"AWS4" + key``.
    Results are cached per ``(key, date_stamp, region_name, service_name)``
    for ``_SIGNING_KEY_CACHE_TTL`` seconds, with at most
    ``_SIGNING_KEY_CACHE_MAX_SIZE`` entries.

    Args:
        key: Secret key used to seed the derivation.
        date_stamp: Date component of the credential scope.
        region_name: Region component of the credential scope.
        service_name: Service component of the credential scope.

    Returns:
        The derived signing key bytes.
    """
    cache_key = (key, date_stamp, region_name, service_name)
    # Monotonic clock: entry age must not be affected by wall-clock jumps.
    now = time.monotonic()

    with _SIGNING_KEY_CACHE_LOCK:
        cached = _SIGNING_KEY_CACHE.get(cache_key)
        if cached is not None:
            signing_key, cached_at = cached
            if now - cached_at < _SIGNING_KEY_CACHE_TTL:
                # Mark as most recently used so it is evicted last.
                _SIGNING_KEY_CACHE.move_to_end(cache_key)
                return signing_key
            del _SIGNING_KEY_CACHE[cache_key]

    # Derive outside the lock so concurrent callers are not serialized on HMAC work.
    k_date = _sign(("AWS4" + key).encode("utf-8"), date_stamp)
    k_region = _sign(k_date, region_name)
    k_service = _sign(k_region, service_name)
    k_signing = _sign(k_service, "aws4_request")

    with _SIGNING_KEY_CACHE_LOCK:
        if cache_key in _SIGNING_KEY_CACHE:
            # Another thread stored this key while we were deriving it:
            # refresh its position instead of evicting an unrelated entry.
            _SIGNING_KEY_CACHE.move_to_end(cache_key)
        elif len(_SIGNING_KEY_CACHE) >= _SIGNING_KEY_CACHE_MAX_SIZE:
            _SIGNING_KEY_CACHE.popitem(last=False)
        _SIGNING_KEY_CACHE[cache_key] = (k_signing, now)
    return k_signing

View File

@@ -1469,7 +1469,8 @@ class ObjectStorage:
if meta_files: if meta_files:
meta_cache = {} meta_cache = {}
with ThreadPoolExecutor(max_workers=min(64, len(meta_files))) as executor: max_workers = min((os.cpu_count() or 4) * 2, len(meta_files), 16)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for key, etag in executor.map(read_meta_file, meta_files): for key, etag in executor.map(read_meta_file, meta_files):
if etag: if etag:
meta_cache[key] = etag meta_cache[key] = etag

View File

@@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
APP_VERSION = "0.2.4" APP_VERSION = "0.2.5"
def get_version() -> str: def get_version() -> str: