Improve and standardize error handling

2025-11-25 23:33:01 +08:00
parent b2f4d1b5db
commit 86138636db
7 changed files with 539 additions and 32 deletions

app/errors.py (new file)

@@ -0,0 +1,167 @@
"""Standardized error handling for API and UI responses."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from xml.etree.ElementTree import Element, SubElement, tostring
from flask import Response, jsonify, request, flash, redirect, url_for, g
logger = logging.getLogger(__name__)
@dataclass
class AppError(Exception):
"""Base application error with multi-format response support."""
code: str
message: str
status_code: int = 500
details: Optional[Dict[str, Any]] = field(default=None)
def __post_init__(self):
super().__init__(self.message)
def to_xml_response(self) -> Response:
"""Convert to S3 API XML error response."""
error = Element("Error")
SubElement(error, "Code").text = self.code
SubElement(error, "Message").text = self.message
request_id = getattr(g, 'request_id', None) if g else None
SubElement(error, "RequestId").text = request_id or "unknown"
xml_bytes = tostring(error, encoding="utf-8")
return Response(xml_bytes, status=self.status_code, mimetype="application/xml")
def to_json_response(self) -> tuple[Response, int]:
"""Convert to JSON error response for UI AJAX calls."""
payload: Dict[str, Any] = {
"success": False,
"error": {
"code": self.code,
"message": self.message
}
}
if self.details:
payload["error"]["details"] = self.details
return jsonify(payload), self.status_code
def to_flash_message(self) -> str:
"""Convert to user-friendly flash message."""
return self.message


@dataclass
class BucketNotFoundError(AppError):
    """Bucket does not exist."""
    code: str = "NoSuchBucket"
    message: str = "The specified bucket does not exist"
    status_code: int = 404


@dataclass
class BucketAlreadyExistsError(AppError):
    """Bucket already exists."""
    code: str = "BucketAlreadyExists"
    message: str = "The requested bucket name is not available"
    status_code: int = 409


@dataclass
class BucketNotEmptyError(AppError):
    """Bucket is not empty."""
    code: str = "BucketNotEmpty"
    message: str = "The bucket you tried to delete is not empty"
    status_code: int = 409


@dataclass
class ObjectNotFoundError(AppError):
    """Object does not exist."""
    code: str = "NoSuchKey"
    message: str = "The specified key does not exist"
    status_code: int = 404


@dataclass
class InvalidObjectKeyError(AppError):
    """Invalid object key."""
    code: str = "InvalidKey"
    message: str = "The specified key is not valid"
    status_code: int = 400


@dataclass
class AccessDeniedError(AppError):
    """Access denied."""
    code: str = "AccessDenied"
    message: str = "Access Denied"
    status_code: int = 403


@dataclass
class InvalidCredentialsError(AppError):
    """Invalid credentials."""
    code: str = "InvalidAccessKeyId"
    message: str = "The access key ID you provided does not exist"
    status_code: int = 403


@dataclass
class MalformedRequestError(AppError):
    """Malformed request."""
    code: str = "MalformedXML"
    message: str = "The XML you provided was not well-formed"
    status_code: int = 400


@dataclass
class InvalidArgumentError(AppError):
    """Invalid argument."""
    code: str = "InvalidArgument"
    message: str = "Invalid argument"
    status_code: int = 400


@dataclass
class EntityTooLargeError(AppError):
    """Entity too large."""
    code: str = "EntityTooLarge"
    message: str = "Your proposed upload exceeds the maximum allowed size"
    status_code: int = 413


def handle_app_error(error: AppError) -> Response | tuple[Response, int]:
    """Handle application errors with the response format the client expects."""
    log_extra = {"error_code": error.code}
    if error.details:
        log_extra["details"] = error.details
    logger.error("%s: %s", error.code, error.message, extra=log_extra)
    if request.path.startswith("/ui"):
        wants_json = (
            request.is_json
            or request.headers.get("X-Requested-With") == "XMLHttpRequest"
            or "application/json" in request.accept_mimetypes
        )
        if wants_json:
            return error.to_json_response()
        flash(error.to_flash_message(), "danger")
        # Only follow the referrer when it actually points back at this host.
        referrer = request.referrer
        if referrer and urlparse(referrer).netloc == request.host:
            return redirect(referrer)
        return redirect(url_for("ui.buckets_overview"))
    return error.to_xml_response()


def register_error_handlers(app):
    """Register error handlers with a Flask app."""
    app.register_error_handler(AppError, handle_app_error)
    # Flask resolves handlers via the exception MRO, so the AppError handler
    # already covers the subclasses; register them explicitly as well so a
    # blueprint can still override an individual error type.
    for error_class in [
        BucketNotFoundError, BucketAlreadyExistsError, BucketNotEmptyError,
        ObjectNotFoundError, InvalidObjectKeyError,
        AccessDeniedError, InvalidCredentialsError,
        MalformedRequestError, InvalidArgumentError, EntityTooLargeError,
    ]:
        app.register_error_handler(error_class, handle_app_error)
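
For reviewers, a minimal wiring sketch. The `create_app` factory and the route below are illustrative, not part of this commit; `ui.buckets_overview` is the endpoint referenced above:

from flask import Flask

from app.errors import BucketNotFoundError, register_error_handlers


def create_app() -> Flask:
    app = Flask(__name__)
    register_error_handlers(app)

    @app.get("/<bucket_name>")
    def get_bucket(bucket_name: str):
        # Raising the typed error is enough: handle_app_error renders it as
        # an S3-style XML error here, since the path is not under /ui.
        raise BucketNotFoundError(details={"bucket": bucket_name})

    return app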

@@ -258,9 +258,19 @@ class ReplicationManager:
        self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)

    def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
        if ".." in object_key or object_key.startswith("/") or object_key.startswith("\\"):
            logger.error(f"Invalid object key in replication (path traversal attempt): {object_key}")
            return
        try:
            from .storage import ObjectStorage

            ObjectStorage._sanitize_object_key(object_key)
        except StorageError as e:
            logger.error(f"Object key validation failed in replication: {e}")
            return
        file_size = 0
        try:
            # Using boto3 to upload
            config = Config(user_agent_extra=REPLICATION_USER_AGENT)
            s3 = boto3.client(
                "s3",

@@ -800,6 +800,8 @@ def _maybe_handle_bucket_subresource(bucket_name: str) -> Response | None:
"encryption": _bucket_encryption_handler,
"location": _bucket_location_handler,
"acl": _bucket_acl_handler,
"versions": _bucket_list_versions_handler,
"lifecycle": _bucket_lifecycle_handler,
}
requested = [key for key in handlers if key in request.args]
if not requested:
@@ -1135,6 +1137,268 @@ def _bucket_acl_handler(bucket_name: str) -> Response:
    return _xml_response(root)


def _bucket_list_versions_handler(bucket_name: str) -> Response:
    """Handle ListObjectVersions (GET /<bucket>?versions)."""
    if request.method != "GET":
        return _method_not_allowed(["GET"])
    principal, error = _require_principal()
    try:
        _authorize_action(principal, bucket_name, "list")
    except IamError as exc:
        if error:
            return error
        return _error_response("AccessDenied", str(exc), 403)
    storage = _storage()
    try:
        objects = storage.list_objects(bucket_name)
    except StorageError as exc:
        return _error_response("NoSuchBucket", str(exc), 404)
    prefix = request.args.get("prefix", "")
    delimiter = request.args.get("delimiter", "")
    try:
        max_keys = min(int(request.args.get("max-keys", 1000)), 1000)
    except ValueError:
        return _error_response("InvalidArgument", "max-keys must be an integer", 400)
    key_marker = request.args.get("key-marker", "")
    if prefix:
        objects = [obj for obj in objects if obj.key.startswith(prefix)]
    if key_marker:
        objects = [obj for obj in objects if obj.key > key_marker]
    # Build XML response
    root = Element("ListVersionsResult", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
    SubElement(root, "Name").text = bucket_name
    SubElement(root, "Prefix").text = prefix
    SubElement(root, "KeyMarker").text = key_marker
    SubElement(root, "MaxKeys").text = str(max_keys)
    if delimiter:
        SubElement(root, "Delimiter").text = delimiter
    version_count = 0
    is_truncated = False
    next_key_marker = ""
    for obj in objects:
        if version_count >= max_keys:
            is_truncated = True
            break
        # Current (latest) version; unversioned objects report the "null" version ID
        version = SubElement(root, "Version")
        SubElement(version, "Key").text = obj.key
        SubElement(version, "VersionId").text = "null"
        SubElement(version, "IsLatest").text = "true"
        SubElement(version, "LastModified").text = obj.last_modified.strftime("%Y-%m-%dT%H:%M:%S.000Z")
        SubElement(version, "ETag").text = f'"{obj.etag}"'
        SubElement(version, "Size").text = str(obj.size)
        SubElement(version, "StorageClass").text = "STANDARD"
        owner = SubElement(version, "Owner")
        SubElement(owner, "ID").text = "local-owner"
        SubElement(owner, "DisplayName").text = "Local Owner"
        version_count += 1
        next_key_marker = obj.key
        # Append archived (noncurrent) versions
        try:
            versions = storage.list_object_versions(bucket_name, obj.key)
            for v in versions:
                if version_count >= max_keys:
                    is_truncated = True
                    break
                ver_elem = SubElement(root, "Version")
                SubElement(ver_elem, "Key").text = obj.key
                SubElement(ver_elem, "VersionId").text = v.get("version_id", "unknown")
                SubElement(ver_elem, "IsLatest").text = "false"
                SubElement(ver_elem, "LastModified").text = v.get("archived_at", "")
                SubElement(ver_elem, "ETag").text = f'"{v.get("etag", "")}"'
                SubElement(ver_elem, "Size").text = str(v.get("size", 0))
                SubElement(ver_elem, "StorageClass").text = "STANDARD"
                owner = SubElement(ver_elem, "Owner")
                SubElement(owner, "ID").text = "local-owner"
                SubElement(owner, "DisplayName").text = "Local Owner"
                version_count += 1
        except StorageError:
            pass
    SubElement(root, "IsTruncated").text = "true" if is_truncated else "false"
    if is_truncated and next_key_marker:
        SubElement(root, "NextKeyMarker").text = next_key_marker
    return _xml_response(root)
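
A quick client-side exercise of the new subresource; the endpoint URL and credentials below are placeholders for a local instance:

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:5000",  # placeholder local endpoint
    aws_access_key_id="ACCESS_KEY",        # placeholder credentials
    aws_secret_access_key="SECRET_KEY",
)
resp = s3.list_object_versions(Bucket="my-bucket", Prefix="photos/", MaxKeys=100)
for version in resp.get("Versions", []):
    print(version["Key"], version["VersionId"], version["IsLatest"])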


def _bucket_lifecycle_handler(bucket_name: str) -> Response:
    """Handle bucket lifecycle configuration (GET/PUT/DELETE /<bucket>?lifecycle)."""
    if request.method not in {"GET", "PUT", "DELETE"}:
        return _method_not_allowed(["GET", "PUT", "DELETE"])
    principal, error = _require_principal()
    if error:
        return error
    try:
        _authorize_action(principal, bucket_name, "policy")
    except IamError as exc:
        return _error_response("AccessDenied", str(exc), 403)
    storage = _storage()
    if not storage.bucket_exists(bucket_name):
        return _error_response("NoSuchBucket", "Bucket does not exist", 404)
    if request.method == "GET":
        config = storage.get_bucket_lifecycle(bucket_name)
        if not config:
            return _error_response("NoSuchLifecycleConfiguration", "The lifecycle configuration does not exist", 404)
        return _xml_response(_render_lifecycle_config(config))
    if request.method == "DELETE":
        storage.set_bucket_lifecycle(bucket_name, None)
        current_app.logger.info("Bucket lifecycle deleted", extra={"bucket": bucket_name})
        return Response(status=204)
    # PUT
    payload = request.get_data(cache=False) or b""
    if not payload.strip():
        return _error_response("MalformedXML", "Request body is required", 400)
    try:
        config = _parse_lifecycle_config(payload)
        storage.set_bucket_lifecycle(bucket_name, config)
    except ValueError as exc:
        return _error_response("MalformedXML", str(exc), 400)
    except StorageError as exc:
        return _error_response("NoSuchBucket", str(exc), 404)
    current_app.logger.info("Bucket lifecycle updated", extra={"bucket": bucket_name})
    return Response(status=200)
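
The matching lifecycle round trip (same placeholder client as above); boto3 serializes the rule dict into the LifecycleConfiguration XML this handler parses:

s3.put_bucket_lifecycle_configuration(
    Bucket="my-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-tmp",
                "Filter": {"Prefix": "tmp/"},
                "Status": "Enabled",
                "Expiration": {"Days": 7},
            }
        ]
    },
)
print(s3.get_bucket_lifecycle_configuration(Bucket="my-bucket")["Rules"])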


def _render_lifecycle_config(config: list) -> Element:
    """Render a lifecycle configuration to XML."""
    root = Element("LifecycleConfiguration", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
    for rule in config:
        rule_el = SubElement(root, "Rule")
        SubElement(rule_el, "ID").text = rule.get("ID", "")
        # Filter
        filter_el = SubElement(rule_el, "Filter")
        if rule.get("Prefix"):
            SubElement(filter_el, "Prefix").text = rule.get("Prefix", "")
        SubElement(rule_el, "Status").text = rule.get("Status", "Enabled")
        # Expiration
        if "Expiration" in rule:
            exp = rule["Expiration"]
            exp_el = SubElement(rule_el, "Expiration")
            if "Days" in exp:
                SubElement(exp_el, "Days").text = str(exp["Days"])
            if "Date" in exp:
                SubElement(exp_el, "Date").text = exp["Date"]
            if exp.get("ExpiredObjectDeleteMarker"):
                SubElement(exp_el, "ExpiredObjectDeleteMarker").text = "true"
        # NoncurrentVersionExpiration
        if "NoncurrentVersionExpiration" in rule:
            nve = rule["NoncurrentVersionExpiration"]
            nve_el = SubElement(rule_el, "NoncurrentVersionExpiration")
            if "NoncurrentDays" in nve:
                SubElement(nve_el, "NoncurrentDays").text = str(nve["NoncurrentDays"])
        # AbortIncompleteMultipartUpload
        if "AbortIncompleteMultipartUpload" in rule:
            aimu = rule["AbortIncompleteMultipartUpload"]
            aimu_el = SubElement(rule_el, "AbortIncompleteMultipartUpload")
            if "DaysAfterInitiation" in aimu:
                SubElement(aimu_el, "DaysAfterInitiation").text = str(aimu["DaysAfterInitiation"])
    return root


def _parse_lifecycle_config(payload: bytes) -> list:
    """Parse a lifecycle configuration from XML."""
    try:
        root = fromstring(payload)
    except ParseError as exc:
        raise ValueError(f"Unable to parse XML document: {exc}") from exc
    if _strip_ns(root.tag) != "LifecycleConfiguration":
        raise ValueError("Root element must be LifecycleConfiguration")

    def find_child(parent: Element, tag: str) -> Element | None:
        # Note: a `parent.find("{*}X") or parent.find("X")` chain would be
        # wrong here -- a leaf Element with no children is falsy, so a
        # namespaced match would be silently discarded. Compare against None.
        found = parent.find(f"{{*}}{tag}")
        return found if found is not None else parent.find(tag)

    rules = []
    for rule_el in root.findall("{*}Rule") or root.findall("Rule"):
        rule: dict = {}
        # ID
        id_el = find_child(rule_el, "ID")
        if id_el is not None and id_el.text:
            rule["ID"] = id_el.text.strip()
        # Filter/Prefix
        filter_el = find_child(rule_el, "Filter")
        if filter_el is not None:
            prefix_el = find_child(filter_el, "Prefix")
            if prefix_el is not None and prefix_el.text:
                rule["Prefix"] = prefix_el.text
        # Legacy Prefix (outside Filter)
        if "Prefix" not in rule:
            prefix_el = find_child(rule_el, "Prefix")
            if prefix_el is not None:
                rule["Prefix"] = prefix_el.text or ""
        # Status
        status_el = find_child(rule_el, "Status")
        rule["Status"] = (status_el.text or "Enabled").strip() if status_el is not None else "Enabled"
        # Expiration
        exp_el = find_child(rule_el, "Expiration")
        if exp_el is not None:
            expiration: dict = {}
            days_el = find_child(exp_el, "Days")
            if days_el is not None and days_el.text:
                expiration["Days"] = int(days_el.text.strip())
            date_el = find_child(exp_el, "Date")
            if date_el is not None and date_el.text:
                expiration["Date"] = date_el.text.strip()
            eodm_el = find_child(exp_el, "ExpiredObjectDeleteMarker")
            if eodm_el is not None and (eodm_el.text or "").strip().lower() in {"true", "1"}:
                expiration["ExpiredObjectDeleteMarker"] = True
            if expiration:
                rule["Expiration"] = expiration
        # NoncurrentVersionExpiration
        nve_el = find_child(rule_el, "NoncurrentVersionExpiration")
        if nve_el is not None:
            nve: dict = {}
            days_el = find_child(nve_el, "NoncurrentDays")
            if days_el is not None and days_el.text:
                nve["NoncurrentDays"] = int(days_el.text.strip())
            if nve:
                rule["NoncurrentVersionExpiration"] = nve
        # AbortIncompleteMultipartUpload
        aimu_el = find_child(rule_el, "AbortIncompleteMultipartUpload")
        if aimu_el is not None:
            aimu: dict = {}
            days_el = find_child(aimu_el, "DaysAfterInitiation")
            if days_el is not None and days_el.text:
                aimu["DaysAfterInitiation"] = int(days_el.text.strip())
            if aimu:
                rule["AbortIncompleteMultipartUpload"] = aimu
        rules.append(rule)
    return rules
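
A minimal parse round trip, useful as a regression test for the namespaced-document handling (sample payload only):

SAMPLE = b"""<LifecycleConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Rule>
    <ID>expire-tmp</ID>
    <Filter><Prefix>tmp/</Prefix></Filter>
    <Status>Enabled</Status>
    <Expiration><Days>7</Days></Expiration>
  </Rule>
</LifecycleConfiguration>"""

assert _parse_lifecycle_config(SAMPLE) == [
    {"ID": "expire-tmp", "Prefix": "tmp/", "Status": "Enabled", "Expiration": {"Days": 7}}
]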


def _bulk_delete_handler(bucket_name: str) -> Response:
    principal, error = _require_principal()
    if error:

@@ -116,7 +116,6 @@ class ObjectStorage:
        self.root.mkdir(parents=True, exist_ok=True)
        self._ensure_system_roots()

    # ---------------------- Bucket helpers ----------------------
    def list_buckets(self) -> List[BucketMeta]:
        buckets: List[BucketMeta] = []
        for bucket in sorted(self.root.iterdir()):
@@ -160,17 +159,14 @@ class ObjectStorage:
        if not bucket_path.exists():
            raise StorageError("Bucket does not exist")
        # Try to read from cache
        cache_path = self._system_bucket_root(bucket_name) / "stats.json"
        if cache_path.exists():
            try:
                # Check if cache is fresh
                if time.time() - cache_path.stat().st_mtime < cache_ttl:
                    return json.loads(cache_path.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError):
                pass
        # Calculate fresh stats
        object_count = 0
        total_bytes = 0
        for path in bucket_path.rglob("*"):
@@ -184,7 +180,6 @@ class ObjectStorage:
        stats = {"objects": object_count, "bytes": total_bytes}
        # Write to cache
        try:
            cache_path.parent.mkdir(parents=True, exist_ok=True)
            cache_path.write_text(json.dumps(stats), encoding="utf-8")
@@ -215,7 +210,6 @@ class ObjectStorage:
        self._remove_tree(self._system_bucket_root(bucket_path.name))
        self._remove_tree(self._multipart_bucket_root(bucket_path.name))

    # ---------------------- Object helpers ----------------------
    def list_objects(self, bucket_name: str) -> List[ObjectMeta]:
        bucket_path = self._bucket_path(bucket_name)
        if not bucket_path.exists():
@@ -272,7 +266,6 @@ class ObjectStorage:
        else:
            self._delete_metadata(bucket_id, safe_key)
        # Invalidate bucket stats cache
        self._invalidate_bucket_stats_cache(bucket_id)
        return ObjectMeta(
@@ -309,7 +302,6 @@ class ObjectStorage:
        self._safe_unlink(path)
        self._delete_metadata(bucket_id, rel)
        # Invalidate bucket stats cache
        self._invalidate_bucket_stats_cache(bucket_id)
        for parent in path.parents:
@@ -345,7 +337,6 @@ class ObjectStorage:
            if parent.exists() and not any(parent.iterdir()):
                parent.rmdir()

    # ---------------------- Versioning helpers ----------------------
    def is_versioning_enabled(self, bucket_name: str) -> bool:
        bucket_path = self._bucket_path(bucket_name)
        if not bucket_path.exists():
@@ -358,7 +349,6 @@ class ObjectStorage:
config["versioning_enabled"] = bool(enabled)
self._write_bucket_config(bucket_path.name, config)
# ---------------------- Bucket configuration helpers ----------------------
def get_bucket_tags(self, bucket_name: str) -> List[Dict[str, str]]:
bucket_path = self._require_bucket_path(bucket_name)
config = self._read_bucket_config(bucket_path.name)
@@ -411,7 +401,18 @@ class ObjectStorage:
        bucket_path = self._require_bucket_path(bucket_name)
        self._set_bucket_config_entry(bucket_path.name, "encryption", config_payload or None)

    def get_bucket_lifecycle(self, bucket_name: str) -> Optional[List[Dict[str, Any]]]:
        """Get the lifecycle configuration for a bucket."""
        bucket_path = self._require_bucket_path(bucket_name)
        config = self._read_bucket_config(bucket_path.name)
        lifecycle = config.get("lifecycle")
        return lifecycle if isinstance(lifecycle, list) else None

    def set_bucket_lifecycle(self, bucket_name: str, rules: Optional[List[Dict[str, Any]]]) -> None:
        """Set or clear the lifecycle configuration for a bucket."""
        bucket_path = self._require_bucket_path(bucket_name)
        self._set_bucket_config_entry(bucket_path.name, "lifecycle", rules)

    # ---------------------- Object tagging helpers ----------------------
    def get_object_tags(self, bucket_name: str, object_key: str) -> List[Dict[str, str]]:
        """Get tags for an object."""
        bucket_path = self._bucket_path(bucket_name)
@@ -422,7 +423,6 @@ class ObjectStorage:
        if not object_path.exists():
            raise StorageError("Object does not exist")
        # Tags are stored in the metadata file alongside user metadata
        for meta_file in (self._metadata_file(bucket_path.name, safe_key), self._legacy_metadata_file(bucket_path.name, safe_key)):
            if not meta_file.exists():
                continue
@@ -448,7 +448,6 @@ class ObjectStorage:
        meta_file = self._metadata_file(bucket_path.name, safe_key)
        # Read existing metadata
        existing_payload: Dict[str, Any] = {}
        if meta_file.exists():
            try:
@@ -456,19 +455,16 @@ class ObjectStorage:
            except (OSError, json.JSONDecodeError):
                pass
        # Update tags
        if tags:
            existing_payload["tags"] = tags
        else:
            existing_payload.pop("tags", None)
        # Write back if there's anything to store, otherwise delete
        if existing_payload.get("metadata") or existing_payload.get("tags"):
            meta_file.parent.mkdir(parents=True, exist_ok=True)
            meta_file.write_text(json.dumps(existing_payload), encoding="utf-8")
        elif meta_file.exists():
            meta_file.unlink()
        # Clean up empty parent directories
        parent = meta_file.parent
        meta_root = self._bucket_meta_root(bucket_path.name)
        while parent != meta_root and parent.exists() and not any(parent.iterdir()):
@@ -603,7 +599,6 @@ class ObjectStorage:
        record.pop("_latest_sort", None)
        return sorted(aggregated.values(), key=lambda item: item["key"])

    # ---------------------- Multipart helpers ----------------------
    def initiate_multipart_upload(
        self,
        bucket_name: str,
@@ -695,7 +690,6 @@ class ObjectStorage:
        destination = bucket_path / safe_key
        destination.parent.mkdir(parents=True, exist_ok=True)
        # Use a lock file to prevent concurrent writes to the same destination
        lock_file_path = self._system_bucket_root(bucket_id) / "locks" / f"{safe_key.as_posix().replace('/', '_')}.lock"
        lock_file_path.parent.mkdir(parents=True, exist_ok=True)
@@ -726,7 +720,6 @@ class ObjectStorage:
        except BlockingIOError:
            raise StorageError("Another upload to this key is in progress")
        finally:
            # Clean up lock file
            try:
                lock_file_path.unlink(missing_ok=True)
            except OSError:
@@ -734,7 +727,6 @@ class ObjectStorage:
        shutil.rmtree(upload_root, ignore_errors=True)
        # Invalidate bucket stats cache
        self._invalidate_bucket_stats_cache(bucket_id)
        stat = destination.stat()
@@ -783,7 +775,6 @@ class ObjectStorage:
        parts.sort(key=lambda x: x["PartNumber"])
        return parts

    # ---------------------- internal helpers ----------------------
    def _bucket_path(self, bucket_name: str) -> Path:
        safe_name = self._sanitize_bucket_name(bucket_name)
        return self.root / safe_name
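
Finally, a storage-level smoke test of the new lifecycle persistence. The ObjectStorage constructor call is assumed from the snippets above and may differ; the bucket is assumed to exist (otherwise `_require_bucket_path` raises StorageError):

from pathlib import Path

storage = ObjectStorage(Path("/tmp/s3-data"))  # assumed constructor signature
rules = [{"ID": "expire-tmp", "Prefix": "tmp/", "Status": "Enabled",
          "Expiration": {"Days": 7}}]
storage.set_bucket_lifecycle("my-bucket", rules)
assert storage.get_bucket_lifecycle("my-bucket") == rules
storage.set_bucket_lifecycle("my-bucket", None)  # the DELETE ?lifecycle path
assert storage.get_bucket_lifecycle("my-bucket") is None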