Fix presigned URL security vulnerabilities: enforce key/user status in SigV4 paths, remove duplicate verification, remove X-Forwarded-Host trust

This commit is contained in:
2026-03-31 20:27:18 +08:00
parent 6a193dbb1c
commit 3838aed954
3 changed files with 111 additions and 131 deletions

View File

@@ -534,21 +534,6 @@ def _authorize_action(principal: Principal | None, bucket_name: str | None, acti
raise iam_error or IamError("Access denied")
def _enforce_bucket_policy(principal: Principal | None, bucket_name: str | None, object_key: str | None, action: str) -> None:
if not bucket_name:
return
policy_context = _build_policy_context()
decision = _bucket_policies().evaluate(
principal.access_key if principal else None,
bucket_name,
object_key,
action,
policy_context,
)
if decision == "deny":
raise IamError("Access denied by bucket policy")
def _object_principal(action: str, bucket_name: str, object_key: str):
principal, error = _require_principal()
try:
@@ -557,121 +542,7 @@ def _object_principal(action: str, bucket_name: str, object_key: str):
except IamError as exc:
if not error:
return None, _error_response("AccessDenied", str(exc), 403)
if not _has_presign_params():
return None, error
try:
principal = _validate_presigned_request(action, bucket_name, object_key)
_enforce_bucket_policy(principal, bucket_name, object_key, action)
return principal, None
except IamError as exc:
return None, _error_response("AccessDenied", str(exc), 403)
def _has_presign_params() -> bool:
return bool(request.args.get("X-Amz-Algorithm"))
def _validate_presigned_request(action: str, bucket_name: str, object_key: str) -> Principal:
algorithm = request.args.get("X-Amz-Algorithm")
credential = request.args.get("X-Amz-Credential")
amz_date = request.args.get("X-Amz-Date")
signed_headers = request.args.get("X-Amz-SignedHeaders")
expires = request.args.get("X-Amz-Expires")
signature = request.args.get("X-Amz-Signature")
if not all([algorithm, credential, amz_date, signed_headers, expires, signature]):
raise IamError("Malformed presigned URL")
if algorithm != "AWS4-HMAC-SHA256":
raise IamError("Unsupported signing algorithm")
parts = credential.split("/")
if len(parts) != 5:
raise IamError("Invalid credential scope")
access_key, date_stamp, region, service, terminal = parts
if terminal != "aws4_request":
raise IamError("Invalid credential scope")
config_region = current_app.config["AWS_REGION"]
config_service = current_app.config["AWS_SERVICE"]
if region != config_region or service != config_service:
raise IamError("Credential scope mismatch")
try:
expiry = int(expires)
except ValueError as exc:
raise IamError("Invalid expiration") from exc
min_expiry = current_app.config.get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1)
max_expiry = current_app.config.get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800)
if expiry < min_expiry or expiry > max_expiry:
raise IamError(f"Expiration must be between {min_expiry} second(s) and {max_expiry} seconds")
try:
request_time = datetime.strptime(amz_date, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc)
except ValueError as exc:
raise IamError("Invalid X-Amz-Date") from exc
now = datetime.now(timezone.utc)
tolerance = timedelta(seconds=current_app.config.get("SIGV4_TIMESTAMP_TOLERANCE_SECONDS", 900))
if request_time > now + tolerance:
raise IamError("Request date is too far in the future")
if now > request_time + timedelta(seconds=expiry):
raise IamError("Presigned URL expired")
signed_headers_list = [header.strip().lower() for header in signed_headers.split(";") if header]
signed_headers_list.sort()
canonical_headers = _canonical_headers_from_request(signed_headers_list)
canonical_query = _canonical_query_from_request()
payload_hash = request.args.get("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
canonical_request = "\n".join(
[
request.method,
_canonical_uri(bucket_name, object_key),
canonical_query,
canonical_headers,
";".join(signed_headers_list),
payload_hash,
]
)
hashed_request = hashlib.sha256(canonical_request.encode()).hexdigest()
scope = f"{date_stamp}/{region}/{service}/aws4_request"
string_to_sign = "\n".join([
"AWS4-HMAC-SHA256",
amz_date,
scope,
hashed_request,
])
secret = _iam().secret_for_key(access_key)
signing_key = _derive_signing_key(secret, date_stamp, region, service)
expected = hmac.new(signing_key, string_to_sign.encode(), hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, signature):
raise IamError("Signature mismatch")
return _iam().principal_for_key(access_key)
def _canonical_query_from_request() -> str:
parts = []
for key in sorted(request.args.keys()):
if key == "X-Amz-Signature":
continue
values = request.args.getlist(key)
encoded_key = quote(str(key), safe="-_.~")
for value in sorted(values):
encoded_value = quote(str(value), safe="-_.~")
parts.append(f"{encoded_key}={encoded_value}")
return "&".join(parts)
def _canonical_headers_from_request(headers: list[str]) -> str:
lines = []
for header in headers:
if header == "host":
api_base = current_app.config.get("API_BASE_URL")
if api_base:
value = urlparse(api_base).netloc
else:
value = request.host
else:
value = request.headers.get(header, "")
canonical_value = " ".join(value.strip().split()) if value else ""
lines.append(f"{header}:{canonical_value}")
return "\n".join(lines) + "\n"
def _canonical_uri(bucket_name: str, object_key: str | None) -> str:
@@ -737,8 +608,8 @@ def _generate_presigned_url(
host = parsed.netloc
scheme = parsed.scheme
else:
host = request.headers.get("X-Forwarded-Host", request.host)
scheme = request.headers.get("X-Forwarded-Proto", request.scheme or "http")
host = request.host
scheme = request.scheme or "http"
canonical_headers = f"host:{host}\n"
canonical_request = "\n".join(