diff --git a/app/__init__.py b/app/__init__.py index 636dc7c..b010902 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -223,6 +223,13 @@ def create_app( app.extensions["access_logging"] = access_logging_service app.extensions["site_registry"] = site_registry + from .s3_client import S3ProxyClient + api_base = app.config.get("API_BASE_URL") or "http://127.0.0.1:5000" + app.extensions["s3_proxy"] = S3ProxyClient( + api_base_url=api_base, + region=app.config.get("AWS_REGION", "us-east-1"), + ) + operation_metrics_collector = None if app.config.get("OPERATION_METRICS_ENABLED", False): operation_metrics_collector = OperationMetricsCollector( diff --git a/app/compression.py b/app/compression.py index 28e6899..bf32504 100644 --- a/app/compression.py +++ b/app/compression.py @@ -36,11 +36,11 @@ class GzipMiddleware: content_type = None content_length = None should_compress = False - is_streaming = False + passthrough = False exc_info_holder = [None] def custom_start_response(status: str, headers: List[Tuple[str, str]], exc_info=None): - nonlocal response_started, status_code, response_headers, content_type, content_length, should_compress, is_streaming + nonlocal response_started, status_code, response_headers, content_type, content_length, should_compress, passthrough response_started = True status_code = int(status.split(' ', 1)[0]) response_headers = list(headers) @@ -51,23 +51,29 @@ class GzipMiddleware: if name_lower == 'content-type': content_type = value.split(';')[0].strip().lower() elif name_lower == 'content-length': - content_length = int(value) + try: + content_length = int(value) + except (ValueError, TypeError): + pass elif name_lower == 'content-encoding': - should_compress = False + passthrough = True return start_response(status, headers, exc_info) elif name_lower == 'x-stream-response': - is_streaming = True + passthrough = True return start_response(status, headers, exc_info) if content_type and content_type in COMPRESSIBLE_MIMES: if content_length 
is None or content_length >= self.min_size: should_compress = True + else: + passthrough = True + return start_response(status, headers, exc_info) return None app_iter = self.app(environ, custom_start_response) - if is_streaming: + if passthrough: return app_iter response_body = b''.join(app_iter) diff --git a/app/s3_api.py b/app/s3_api.py index a237195..60898f6 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -3,6 +3,7 @@ from __future__ import annotations import base64 import hashlib import hmac +import json import logging import mimetypes import re @@ -2780,7 +2781,7 @@ def object_handler(bucket_name: str, object_key: str): try: stat = path.stat() file_size = stat.st_size - etag = storage._compute_etag(path) + etag = metadata.get("__etag__") or storage._compute_etag(path) except PermissionError: return _error_response("AccessDenied", "Permission denied accessing object", 403) except OSError as exc: @@ -2828,7 +2829,7 @@ def object_handler(bucket_name: str, object_key: str): try: stat = path.stat() response = Response(status=200) - etag = storage._compute_etag(path) + etag = metadata.get("__etag__") or storage._compute_etag(path) except PermissionError: return _error_response("AccessDenied", "Permission denied accessing object", 403) except OSError as exc: @@ -2963,7 +2964,11 @@ def _bucket_policy_handler(bucket_name: str) -> Response: store.delete_policy(bucket_name) current_app.logger.info("Bucket policy removed", extra={"bucket": bucket_name}) return Response(status=204) - payload = request.get_json(silent=True) + raw_body = request.get_data(cache=False) or b"" + try: + payload = json.loads(raw_body) + except (json.JSONDecodeError, ValueError): + return _error_response("MalformedPolicy", "Policy document must be JSON", 400) if not payload: return _error_response("MalformedPolicy", "Policy document must be JSON", 400) try: diff --git a/app/s3_client.py b/app/s3_client.py new file mode 100644 index 0000000..d5d6978 --- /dev/null +++ b/app/s3_client.py @@ -0,0 
_UPLOAD_REGISTRY_MAX_AGE = 86400
_UPLOAD_REGISTRY_CLEANUP_INTERVAL = 3600


class UploadRegistry:
    """In-memory map of multipart upload ids to their (bucket, key) pair.

    Every access to the underlying mapping happens while holding an
    internal lock. Entries older than ``max_age`` seconds are treated as
    expired; they are dropped lazily on lookup and swept in bulk at most
    once per ``cleanup_interval`` seconds, piggy-backing on ``register``.

    Args:
        max_age: Lifetime of an entry in seconds (monotonic clock).
        cleanup_interval: Minimum number of seconds between bulk sweeps.
    """

    def __init__(
        self,
        max_age: float = _UPLOAD_REGISTRY_MAX_AGE,
        cleanup_interval: float = _UPLOAD_REGISTRY_CLEANUP_INTERVAL,
    ) -> None:
        # upload_id -> (bucket_name, object_key, monotonic registration time)
        self._entries: dict[str, tuple[str, str, float]] = {}
        self._lock = threading.Lock()
        self._max_age = max_age
        self._cleanup_interval = cleanup_interval
        self._last_cleanup = time.monotonic()

    def register(self, upload_id: str, bucket_name: str, object_key: str) -> None:
        """Record (or overwrite) the bucket and key that ``upload_id`` targets."""
        with self._lock:
            self._entries[upload_id] = (bucket_name, object_key, time.monotonic())
            self._maybe_cleanup()

    def get_key(self, upload_id: str, bucket_name: str) -> Optional[str]:
        """Return the object key for ``upload_id``, or ``None``.

        ``None`` is returned when the id is unknown, when it was registered
        for a different bucket, or when the entry has expired (in which case
        the entry is also removed).
        """
        with self._lock:
            entry = self._entries.get(upload_id)
            if entry is None:
                return None
            stored_bucket, key, created_at = entry
            if stored_bucket != bucket_name:
                return None
            if time.monotonic() - created_at > self._max_age:
                del self._entries[upload_id]
                return None
            return key

    def remove(self, upload_id: str) -> None:
        """Forget ``upload_id``; unknown ids are ignored."""
        with self._lock:
            self._entries.pop(upload_id, None)

    def _maybe_cleanup(self) -> None:
        """Sweep expired entries if the cleanup interval has elapsed.

        Must be called with ``self._lock`` held (``register`` does so).
        """
        now = time.monotonic()
        if now - self._last_cleanup < self._cleanup_interval:
            return
        self._last_cleanup = now
        cutoff = now - self._max_age
        # Collect first, delete second: the dict must not change size
        # while it is being iterated.
        stale = [uid for uid, (_, _, ts) in self._entries.items() if ts < cutoff]
        for uid in stale:
            del self._entries[uid]
def handle_client_error(exc: ClientError) -> tuple[dict[str, str], int]:
    """Convert a botocore ``ClientError`` into a ``(json_body, http_status)`` pair.

    The body is ``{"error": <message>}`` where the message comes from the
    error response, defaulting to "S3 operation failed" when it is missing
    or empty. The status is looked up in ``_BOTO_ERROR_MAP`` by error code;
    codes not present there fall back to the ``HTTPStatusCode`` recorded in
    the response metadata, and finally to 500.
    """
    response = exc.response
    details = response.get("Error", {})
    error_code = details.get("Code", "InternalError")
    body = {"error": details.get("Message") or "S3 operation failed"}

    mapped_status = _BOTO_ERROR_MAP.get(error_code)
    if mapped_status is not None:
        return body, mapped_status

    response_meta = response.get("ResponseMetadata", {})
    return body, response_meta.get("HTTPStatusCode", 500)
def build_url_templates(bucket_name: str) -> dict[str, str]:
    """Build the per-object UI URL templates for ``bucket_name``.

    Every template is produced by ``url_for`` with the object key set to the
    literal ``KEY_PLACEHOLDER`` (and, for ``restore``, the version id set to
    ``VERSION_ID_PLACEHOLDER``). The ``download`` template is the ``preview``
    template with ``?download=1`` appended.
    """
    from flask import url_for

    def object_route(endpoint: str, **extra: str) -> str:
        # All templates share the same bucket/key arguments.
        return url_for(endpoint, bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", **extra)

    # (label, endpoint, extra url_for arguments), listed in resolution order.
    routes = (
        ("preview", "ui.object_preview", {}),
        ("delete", "ui.delete_object", {}),
        ("presign", "ui.object_presign", {}),
        ("versions", "ui.object_versions", {}),
        ("restore", "ui.restore_object_version", {"version_id": "VERSION_ID_PLACEHOLDER"}),
        ("tags", "ui.object_tags", {}),
        ("copy", "ui.copy_object", {}),
        ("move", "ui.move_object", {}),
        ("metadata", "ui.object_metadata", {}),
    )
    resolved = {label: object_route(endpoint, **extra) for label, endpoint, extra in routes}

    return {
        "preview": resolved["preview"],
        "download": resolved["preview"] + "?download=1",
        "presign": resolved["presign"],
        "delete": resolved["delete"],
        "versions": resolved["versions"],
        "restore": resolved["restore"],
        "tags": resolved["tags"],
        "copy": resolved["copy"],
        "move": resolved["move"],
        "metadata": resolved["metadata"],
    }
"last_modified": last_mod.isoformat(), + "last_modified_display": format_datetime_display(last_mod, display_tz), + "last_modified_iso": format_datetime_iso(last_mod, display_tz), + "etag": obj.get("ETag", "").strip('"'), + }) + return { + "objects": objects_data, + "is_truncated": boto3_response.get("IsTruncated", False), + "next_continuation_token": boto3_response.get("NextContinuationToken"), + "total_count": boto3_response.get("KeyCount", len(objects_data)), + "versioning_enabled": versioning_enabled, + "url_templates": url_templates, + } + + +def get_versioning_via_s3(client: Any, bucket_name: str) -> bool: + try: + resp = client.get_bucket_versioning(Bucket=bucket_name) + return resp.get("Status") == "Enabled" + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code != "NoSuchBucket": + logger.warning("Failed to check versioning for %s: %s", bucket_name, code) + return False + + +def stream_objects_ndjson( + client: Any, + bucket_name: str, + prefix: Optional[str], + url_templates: dict[str, str], + display_tz: str = "UTC", + versioning_enabled: bool = False, +) -> Generator[str, None, None]: + meta_line = json.dumps({ + "type": "meta", + "versioning_enabled": versioning_enabled, + "url_templates": url_templates, + }) + "\n" + yield meta_line + + yield json.dumps({"type": "count", "total_count": 0}) + "\n" + + kwargs: dict[str, Any] = {"Bucket": bucket_name, "MaxKeys": 1000} + if prefix: + kwargs["Prefix"] = prefix + + try: + paginator = client.get_paginator("list_objects_v2") + for page in paginator.paginate(**kwargs): + for obj in page.get("Contents", []): + last_mod = obj["LastModified"] + yield json.dumps({ + "type": "object", + "key": obj["Key"], + "size": obj["Size"], + "last_modified": last_mod.isoformat(), + "last_modified_display": format_datetime_display(last_mod, display_tz), + "last_modified_iso": format_datetime_iso(last_mod, display_tz), + "etag": obj.get("ETag", "").strip('"'), + }) + "\n" + except 
ClientError as exc: + error_msg = exc.response.get("Error", {}).get("Message", "S3 operation failed") + yield json.dumps({"type": "error", "error": error_msg}) + "\n" + return + except (EndpointConnectionError, ConnectionClosedError): + yield json.dumps({"type": "error", "error": "S3 API server is unreachable"}) + "\n" + return + + yield json.dumps({"type": "done"}) + "\n" diff --git a/app/storage.py b/app/storage.py index e688bea..e3228ce 100644 --- a/app/storage.py +++ b/app/storage.py @@ -188,6 +188,7 @@ class ObjectStorage: self._object_cache_max_size = object_cache_max_size self._object_key_max_length_bytes = object_key_max_length_bytes self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {} + self._meta_index_locks: Dict[str, threading.Lock] = {} self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup") def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: @@ -816,6 +817,10 @@ class ObjectStorage: if not object_path.exists(): raise ObjectNotFoundError("Object does not exist") + entry = self._read_index_entry(bucket_path.name, safe_key) + if entry is not None: + tags = entry.get("tags") + return tags if isinstance(tags, list) else [] for meta_file in (self._metadata_file(bucket_path.name, safe_key), self._legacy_metadata_file(bucket_path.name, safe_key)): if not meta_file.exists(): continue @@ -839,30 +844,31 @@ class ObjectStorage: if not object_path.exists(): raise ObjectNotFoundError("Object does not exist") - meta_file = self._metadata_file(bucket_path.name, safe_key) - - existing_payload: Dict[str, Any] = {} - if meta_file.exists(): - try: - existing_payload = json.loads(meta_file.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError): - pass - + bucket_id = bucket_path.name + existing_entry = self._read_index_entry(bucket_id, safe_key) or {} + if not existing_entry: + meta_file = self._metadata_file(bucket_id, safe_key) + if meta_file.exists(): + try: + existing_entry = 
json.loads(meta_file.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + pass + if tags: - existing_payload["tags"] = tags + existing_entry["tags"] = tags else: - existing_payload.pop("tags", None) - - if existing_payload.get("metadata") or existing_payload.get("tags"): - meta_file.parent.mkdir(parents=True, exist_ok=True) - meta_file.write_text(json.dumps(existing_payload), encoding="utf-8") - elif meta_file.exists(): - meta_file.unlink() - parent = meta_file.parent - meta_root = self._bucket_meta_root(bucket_path.name) - while parent != meta_root and parent.exists() and not any(parent.iterdir()): - parent.rmdir() - parent = parent.parent + existing_entry.pop("tags", None) + + if existing_entry.get("metadata") or existing_entry.get("tags"): + self._write_index_entry(bucket_id, safe_key, existing_entry) + else: + self._delete_index_entry(bucket_id, safe_key) + old_meta = self._metadata_file(bucket_id, safe_key) + try: + if old_meta.exists(): + old_meta.unlink() + except OSError: + pass def delete_object_tags(self, bucket_name: str, object_key: str) -> None: """Delete all tags from an object.""" @@ -1529,7 +1535,7 @@ class ObjectStorage: if entry.is_dir(follow_symlinks=False): if check_newer(entry.path): return True - elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'): + elif entry.is_file(follow_symlinks=False) and (entry.name.endswith('.meta.json') or entry.name == '_index.json'): if entry.stat().st_mtime > index_mtime: return True except OSError: @@ -1543,22 +1549,50 @@ class ObjectStorage: meta_str = str(meta_root) meta_len = len(meta_str) + 1 meta_files: list[tuple[str, str]] = [] - + index_files: list[str] = [] + def collect_meta_files(dir_path: str) -> None: try: with os.scandir(dir_path) as it: for entry in it: if entry.is_dir(follow_symlinks=False): collect_meta_files(entry.path) - elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'): - rel = entry.path[meta_len:] - key = 
rel[:-10].replace(os.sep, '/') - meta_files.append((key, entry.path)) + elif entry.is_file(follow_symlinks=False): + if entry.name == '_index.json': + index_files.append(entry.path) + elif entry.name.endswith('.meta.json'): + rel = entry.path[meta_len:] + key = rel[:-10].replace(os.sep, '/') + meta_files.append((key, entry.path)) except OSError: pass - + collect_meta_files(meta_str) - + + meta_cache = {} + + for idx_path in index_files: + try: + with open(idx_path, 'r', encoding='utf-8') as f: + idx_data = json.load(f) + rel_dir = idx_path[meta_len:] + rel_dir = rel_dir.replace(os.sep, '/') + if rel_dir.endswith('/_index.json'): + dir_prefix = rel_dir[:-len('/_index.json')] + else: + dir_prefix = '' + for entry_name, entry_data in idx_data.items(): + if dir_prefix: + key = f"{dir_prefix}/{entry_name}" + else: + key = entry_name + meta = entry_data.get("metadata", {}) + etag = meta.get("__etag__") + if etag: + meta_cache[key] = etag + except (OSError, json.JSONDecodeError): + pass + def read_meta_file(item: tuple[str, str]) -> tuple[str, str | None]: key, path = item try: @@ -1575,15 +1609,16 @@ class ObjectStorage: return key, None except (OSError, UnicodeDecodeError): return key, None - - if meta_files: - meta_cache = {} - max_workers = min((os.cpu_count() or 4) * 2, len(meta_files), 16) + + legacy_meta_files = [(k, p) for k, p in meta_files if k not in meta_cache] + if legacy_meta_files: + max_workers = min((os.cpu_count() or 4) * 2, len(legacy_meta_files), 16) with ThreadPoolExecutor(max_workers=max_workers) as executor: - for key, etag in executor.map(read_meta_file, meta_files): + for key, etag in executor.map(read_meta_file, legacy_meta_files): if etag: meta_cache[key] = etag - + + if meta_cache: try: etag_index_path.parent.mkdir(parents=True, exist_ok=True) with open(etag_index_path, 'w', encoding='utf-8') as f: @@ -1833,6 +1868,64 @@ class ObjectStorage: meta_rel = Path(key.as_posix() + ".meta.json") return meta_root / meta_rel + def 
def _write_index_entry(self, bucket_name: str, key: Path, entry: Dict[str, Any]) -> None:
    """Insert or replace ``key``'s entry in its directory-level ``_index.json``.

    The read-modify-write cycle runs under the per-index lock. The new
    content is written to a sibling temp file and moved into place with
    ``os.replace`` so that readers, which do not take the lock, never see a
    partially written index.

    An index that is missing, unreadable, undecodable, or whose top-level
    JSON value is not an object is treated as empty (best effort, matching
    the handling of unreadable indexes).

    Raises:
        OSError: If the index directory or file cannot be written.
    """
    index_path, entry_name = self._index_file_for_key(bucket_name, key)
    lock = self._get_meta_index_lock(str(index_path))
    with lock:
        index_path.parent.mkdir(parents=True, exist_ok=True)

        index_data: Dict[str, Any] = {}
        try:
            loaded = json.loads(index_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Missing file, I/O failure, bad encoding or invalid JSON:
            # start from an empty index.
            loaded = None
        if isinstance(loaded, dict):
            index_data = loaded

        index_data[entry_name] = entry

        # The temp name matches neither '_index.json' nor '*.meta.json', so
        # code that looks for those names will not pick it up.
        tmp_path = index_path.with_name(index_path.name + ".tmp")
        try:
            tmp_path.write_text(json.dumps(index_data), encoding="utf-8")
            os.replace(tmp_path, index_path)
        except OSError:
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise
+ try: + index_path.unlink() + except OSError: + pass + def _normalize_metadata(self, metadata: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]: if not metadata: return None @@ -1844,9 +1937,13 @@ class ObjectStorage: if not clean: self._delete_metadata(bucket_name, key) return - meta_file = self._metadata_file(bucket_name, key) - meta_file.parent.mkdir(parents=True, exist_ok=True) - meta_file.write_text(json.dumps({"metadata": clean}), encoding="utf-8") + self._write_index_entry(bucket_name, key, {"metadata": clean}) + old_meta = self._metadata_file(bucket_name, key) + try: + if old_meta.exists(): + old_meta.unlink() + except OSError: + pass def _archive_current_version(self, bucket_name: str, key: Path, *, reason: str) -> None: bucket_path = self._bucket_path(bucket_name) @@ -1873,6 +1970,10 @@ class ObjectStorage: manifest_path.write_text(json.dumps(record), encoding="utf-8") def _read_metadata(self, bucket_name: str, key: Path) -> Dict[str, str]: + entry = self._read_index_entry(bucket_name, key) + if entry is not None: + data = entry.get("metadata") + return data if isinstance(data, dict) else {} for meta_file in (self._metadata_file(bucket_name, key), self._legacy_metadata_file(bucket_name, key)): if not meta_file.exists(): continue @@ -1903,6 +2004,7 @@ class ObjectStorage: raise StorageError(message) from last_error def _delete_metadata(self, bucket_name: str, key: Path) -> None: + self._delete_index_entry(bucket_name, key) locations = ( (self._metadata_file(bucket_name, key), self._bucket_meta_root(bucket_name)), (self._legacy_metadata_file(bucket_name, key), self._legacy_meta_root(bucket_name)), diff --git a/app/ui.py b/app/ui.py index 06141e7..1c23730 100644 --- a/app/ui.py +++ b/app/ui.py @@ -13,7 +13,7 @@ from zoneinfo import ZoneInfo import boto3 import requests -from botocore.exceptions import ClientError +from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError from flask import ( Blueprint, Response, @@ 
-36,7 +36,18 @@ from .extensions import limiter, csrf from .iam import IamError from .kms import KMSManager from .replication import ReplicationManager, ReplicationRule -from .s3_api import _generate_presigned_url +from .s3_client import ( + get_session_s3_client, + get_upload_registry, + handle_client_error, + handle_connection_error, + build_url_templates, + translate_list_objects, + get_versioning_via_s3, + stream_objects_ndjson, + format_datetime_display as _s3_format_display, + format_datetime_iso as _s3_format_iso, +) from .secret_store import EphemeralSecretStore from .site_registry import SiteRegistry, SiteInfo, PeerSite from .storage import ObjectStorage, StorageError @@ -337,19 +348,39 @@ def docs_page(): @ui_bp.get("/") def buckets_overview(): principal = _current_principal() - buckets = _storage().list_buckets() - allowed_names = set(_iam().buckets_for_principal(principal, [b.name for b in buckets])) + try: + client = get_session_s3_client() + resp = client.list_buckets() + bucket_names = [b["Name"] for b in resp.get("Buckets", [])] + bucket_creation = {b["Name"]: b.get("CreationDate") for b in resp.get("Buckets", [])} + except PermissionError: + return redirect(url_for("ui.login")) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + flash(exc.response.get("Error", {}).get("Message", "S3 operation failed"), "danger") + else: + flash("S3 API server is unreachable. 
Ensure the API server is running.", "danger") + return render_template("buckets.html", buckets=[], principal=principal) + + allowed_names = set(_iam().buckets_for_principal(principal, bucket_names)) visible_buckets = [] policy_store = _bucket_policies() - for bucket in buckets: - if bucket.name not in allowed_names: + for name in bucket_names: + if name not in allowed_names: continue - policy = policy_store.get_policy(bucket.name) + policy = policy_store.get_policy(name) cache_ttl = current_app.config.get("BUCKET_STATS_CACHE_TTL", 60) - stats = _storage().bucket_stats(bucket.name, cache_ttl=cache_ttl) + stats = _storage().bucket_stats(name, cache_ttl=cache_ttl) access_label, access_badge = _bucket_access_descriptor(policy) + + class _BucketMeta: + def __init__(self, n, cd): + self.name = n + self.creation_date = cd + meta = _BucketMeta(name, bucket_creation.get(name)) + visible_buckets.append({ - "meta": bucket, + "meta": meta, "summary": { "objects": stats["total_objects"], "total_bytes": stats["total_bytes"], @@ -358,7 +389,7 @@ def buckets_overview(): "access_label": access_label, "access_badge": access_badge, "has_policy": bool(policy), - "detail_url": url_for("ui.bucket_detail", bucket_name=bucket.name), + "detail_url": url_for("ui.bucket_detail", bucket_name=name), }) return render_template("buckets.html", buckets=visible_buckets, principal=principal) @@ -377,14 +408,28 @@ def create_bucket(): return redirect(url_for("ui.buckets_overview")) try: _authorize_ui(principal, bucket_name, "write") - _storage().create_bucket(bucket_name) + client = get_session_s3_client() + client.create_bucket(Bucket=bucket_name) if _wants_json(): return jsonify({"success": True, "message": f"Bucket '{bucket_name}' created", "bucket_name": bucket_name}) flash(f"Bucket '{bucket_name}' created", "success") - except (StorageError, FileExistsError, IamError) as exc: + except PermissionError: + return redirect(url_for("ui.login")) + except IamError as exc: if _wants_json(): return 
def list_bucket_objects(bucket_name: str):
    """Return one page of a bucket's objects as JSON.

    Query parameters read from the request:
        max_keys: Page size; clamped to the range 1..100000, default 1000.
        continuation_token: Token for the next page; omitted when empty.
        prefix: Key prefix filter; omitted when empty.

    Responses:
        200: Payload built by ``translate_list_objects``, sent with
            ``Cache-Control: no-store``.
        400: ``max_keys`` is not an integer.
        401: ``get_session_s3_client`` raised ``PermissionError``.
        403: The principal may not list this bucket.
        other: Status chosen by ``handle_client_error`` or
            ``handle_connection_error``.
    """
    principal = _current_principal()
    try:
        _authorize_ui(principal, bucket_name, "list")
    except IamError as exc:
        return jsonify({"error": str(exc)}), 403

    try:
        max_keys = max(1, min(int(request.args.get("max_keys", 1000)), 100000))
    except ValueError:
        return jsonify({"error": "max_keys must be an integer"}), 400

    continuation_token = request.args.get("continuation_token") or None
    prefix = request.args.get("prefix") or None

    try:
        client = get_session_s3_client()
        kwargs: dict[str, Any] = {"Bucket": bucket_name, "MaxKeys": max_keys}
        if continuation_token:
            kwargs["ContinuationToken"] = continuation_token
        if prefix:
            kwargs["Prefix"] = prefix
        boto_resp = client.list_objects_v2(**kwargs)
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 401
    except ClientError as exc:
        err, status = handle_client_error(exc)
        return jsonify(err), status
    except (EndpointConnectionError, ConnectionClosedError) as exc:
        # handle_connection_error returns (body, status). Splatting that
        # tuple into jsonify() would serialise both as a JSON array and
        # answer 200, so unpack it and return the status separately.
        err, status = handle_connection_error(exc)
        return jsonify(err), status

    versioning_enabled = get_versioning_via_s3(client, bucket_name)
    url_templates = build_url_templates(bucket_name)
    display_tz = current_app.config.get("DISPLAY_TIMEZONE", "UTC")
    data = translate_list_objects(boto_resp, url_templates, display_tz, versioning_enabled)
    response = jsonify(data)
    response.headers["Cache-Control"] = "no-store"
    return response
object_key="KEY_PLACEHOLDER") - metadata_template = url_for("ui.object_metadata", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + versioning_enabled = get_versioning_via_s3(client, bucket_name) + url_templates = build_url_templates(bucket_name) display_tz = current_app.config.get("DISPLAY_TIMEZONE", "UTC") - def generate(): - meta_line = json.dumps({ - "type": "meta", - "versioning_enabled": versioning_enabled, - "url_templates": { - "preview": preview_template, - "download": preview_template + "?download=1", - "presign": presign_template, - "delete": delete_template, - "versions": versions_template, - "restore": restore_template, - "tags": tags_template, - "copy": copy_template, - "move": move_template, - "metadata": metadata_template, - }, - }) + "\n" - yield meta_line - - continuation_token = None - total_count = None - batch_size = 5000 - - while True: - try: - result = storage.list_objects( - bucket_name, - max_keys=batch_size, - continuation_token=continuation_token, - prefix=prefix, - ) - except StorageError as exc: - yield json.dumps({"type": "error", "error": str(exc)}) + "\n" - return - - if total_count is None: - total_count = result.total_count - yield json.dumps({"type": "count", "total_count": total_count}) + "\n" - - for obj in result.objects: - yield json.dumps({ - "type": "object", - "key": obj.key, - "size": obj.size, - "last_modified": obj.last_modified.isoformat(), - "last_modified_display": _format_datetime_display(obj.last_modified, display_tz), - "last_modified_iso": _format_datetime_iso(obj.last_modified, display_tz), - "etag": obj.etag, - }) + "\n" - - if not result.is_truncated: - break - continuation_token = result.next_continuation_token - - yield json.dumps({"type": "done"}) + "\n" - return Response( - generate(), + stream_objects_ndjson( + client, bucket_name, prefix, url_templates, display_tz, versioning_enabled, + ), mimetype='application/x-ndjson', headers={ 'Cache-Control': 'no-cache', @@ -714,15 +656,32 @@ def 
upload_object(bucket_name: str): try: _authorize_ui(principal, bucket_name, "write") - _storage().put_object(bucket_name, object_key, file.stream, metadata=metadata) + client = get_session_s3_client() + put_kwargs: dict[str, Any] = { + "Bucket": bucket_name, + "Key": object_key, + "Body": file.stream, + } + if file.content_type: + put_kwargs["ContentType"] = file.content_type + if metadata: + put_kwargs["Metadata"] = metadata + client.put_object(**put_kwargs) _replication().trigger_replication(bucket_name, object_key) - + message = f"Uploaded '{object_key}'" if metadata: message += " with metadata" return _response(True, message) - except (StorageError, IamError) as exc: + except PermissionError as exc: + return _response(False, str(exc), 401) + except IamError as exc: return _response(False, _friendly_error_message(exc), 400) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return _response(False, err["error"], status) + return _response(False, "S3 API server is unreachable", 502) @ui_bp.post("/buckets//multipart/initiate") @@ -736,6 +695,11 @@ def initiate_multipart_upload(bucket_name: str): object_key = str(payload.get("object_key", "")).strip() if not object_key: return jsonify({"error": "object_key is required"}), 400 + if "\x00" in object_key: + return jsonify({"error": "Object key cannot contain null bytes"}), 400 + max_key_len = current_app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024) + if len(object_key.encode("utf-8")) > max_key_len: + return jsonify({"error": f"Object key exceeds maximum length of {max_key_len} bytes"}), 400 metadata_payload = payload.get("metadata") metadata = None if metadata_payload is not None: @@ -743,10 +707,21 @@ def initiate_multipart_upload(bucket_name: str): return jsonify({"error": "metadata must be an object"}), 400 metadata = {str(k): str(v) for k, v in metadata_payload.items()} try: - upload_id = 
_storage().initiate_multipart_upload(bucket_name, object_key, metadata=metadata) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 - return jsonify({"upload_id": upload_id}) + client = get_session_s3_client() + create_kwargs: dict[str, Any] = {"Bucket": bucket_name, "Key": object_key} + if metadata: + create_kwargs["Metadata"] = metadata + resp = client.create_multipart_upload(**create_kwargs) + upload_id = resp["UploadId"] + get_upload_registry().register(upload_id, bucket_name, object_key) + return jsonify({"upload_id": upload_id}) + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.put("/buckets//multipart//parts") @@ -762,17 +737,30 @@ def upload_multipart_part(bucket_name: str, upload_id: str): part_number = int(request.args.get("partNumber", "0")) except ValueError: return jsonify({"error": "partNumber must be an integer"}), 400 - if part_number < 1: - return jsonify({"error": "partNumber must be >= 1"}), 400 + if part_number < 1 or part_number > 10000: + return jsonify({"error": "partNumber must be between 1 and 10000"}), 400 + object_key = get_upload_registry().get_key(upload_id, bucket_name) + if not object_key: + return jsonify({"error": "Unknown upload ID or upload expired"}), 404 try: data = request.get_data() if not data: return jsonify({"error": "Empty request body"}), 400 - stream = io.BytesIO(data) - etag = _storage().upload_multipart_part(bucket_name, upload_id, part_number, stream) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 - return jsonify({"etag": etag, "part_number": part_number}) + client = get_session_s3_client() + resp = client.upload_part( + Bucket=bucket_name, + Key=object_key, + UploadId=upload_id, + PartNumber=part_number, + 
Body=data, + ) + etag = resp.get("ETag", "").strip('"') + return jsonify({"etag": etag, "part_number": part_number}) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.post("/buckets//multipart//complete") @@ -796,19 +784,37 @@ def complete_multipart_upload(bucket_name: str, upload_id: str): except (TypeError, ValueError): return jsonify({"error": "Each part must include part_number"}), 400 etag = str(part.get("etag") or part.get("ETag") or "").strip() - normalized.append({"part_number": number, "etag": etag}) + normalized.append({"PartNumber": number, "ETag": etag}) + object_key = get_upload_registry().get_key(upload_id, bucket_name) + if not object_key: + return jsonify({"error": "Unknown upload ID or upload expired"}), 404 try: - result = _storage().complete_multipart_upload(bucket_name, upload_id, normalized) - _replication().trigger_replication(bucket_name, result.key) - + client = get_session_s3_client() + resp = client.complete_multipart_upload( + Bucket=bucket_name, + Key=object_key, + UploadId=upload_id, + MultipartUpload={"Parts": normalized}, + ) + get_upload_registry().remove(upload_id) + result_key = resp.get("Key", object_key) + _replication().trigger_replication(bucket_name, result_key) return jsonify({ - "key": result.key, - "size": result.size, - "etag": result.etag, - "last_modified": result.last_modified.isoformat() if result.last_modified else None, + "key": result_key, + "size": 0, + "etag": resp.get("ETag", "").strip('"'), + "last_modified": None, }) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + code = exc.response.get("Error", {}).get("Code", 
"") + if code in ("NoSuchUpload",): + get_upload_registry().remove(upload_id) + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.delete("/buckets//multipart/") @@ -818,10 +824,21 @@ def abort_multipart_upload(bucket_name: str, upload_id: str): _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 + object_key = get_upload_registry().get_key(upload_id, bucket_name) + if not object_key: + return jsonify({"error": "Unknown upload ID or upload expired"}), 404 try: - _storage().abort_multipart_upload(bucket_name, upload_id) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + client = get_session_s3_client() + client.abort_multipart_upload(Bucket=bucket_name, Key=object_key, UploadId=upload_id) + get_upload_registry().remove(upload_id) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + code = exc.response.get("Error", {}).get("Code", "") + if code in ("NoSuchUpload",): + get_upload_registry().remove(upload_id) + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) return jsonify({"status": "aborted"}) @@ -831,16 +848,36 @@ def delete_bucket(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "delete") - _storage().delete_bucket(bucket_name) - _bucket_policies().delete_policy(bucket_name) - _replication_manager().delete_rule(bucket_name) + client = get_session_s3_client() + client.delete_bucket(Bucket=bucket_name) + try: + _bucket_policies().delete_policy(bucket_name) + except Exception: + pass + try: + _replication_manager().delete_rule(bucket_name) + except Exception: + pass if _wants_json(): return jsonify({"success": True, "message": f"Bucket '{bucket_name}' removed"}) flash(f"Bucket '{bucket_name}' removed", "success") - except (StorageError, 
IamError) as exc: + except PermissionError: + return redirect(url_for("ui.login")) + except IamError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + if _wants_json(): + return jsonify(err), status + flash(err["error"], "danger") + else: + msg = "S3 API server is unreachable" + if _wants_json(): + return jsonify({"error": msg}), 502 + flash(msg, "danger") return redirect(url_for("ui.buckets_overview")) @@ -855,16 +892,31 @@ def delete_object(bucket_name: str, object_key: str): _storage().purge_object(bucket_name, object_key) message = f"Permanently deleted '{object_key}' and all versions" else: - _storage().delete_object(bucket_name, object_key) + client = get_session_s3_client() + client.delete_object(Bucket=bucket_name, Key=object_key) _replication_manager().trigger_replication(bucket_name, object_key, action="delete") message = f"Deleted '{object_key}'" if _wants_json(): return jsonify({"success": True, "message": message}) flash(message, "success") - except (IamError, StorageError) as exc: + except PermissionError: + return redirect(url_for("ui.login")) + except IamError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") + except StorageError as exc: + if _wants_json(): + return jsonify({"error": _friendly_error_message(exc)}), 400 + flash(_friendly_error_message(exc), "danger") + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) + if _wants_json(): + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) @@ -908,24 
+960,50 @@ def bulk_delete_objects(bucket_name: str): return _respond(False, f"A maximum of {MAX_KEYS} objects can be deleted per request", status_code=400) unique_keys = list(dict.fromkeys(cleaned)) - storage = _storage() try: _authorize_ui(principal, bucket_name, "delete") except IamError as exc: return _respond(False, _friendly_error_message(exc), status_code=403) - deleted: list[str] = [] - errors: list[dict[str, str]] = [] + authorized_keys = [] + denied_keys = [] for key in unique_keys: try: - if purge_versions: + _authorize_ui(principal, bucket_name, "delete", object_key=key) + authorized_keys.append(key) + except IamError: + denied_keys.append(key) + if not authorized_keys: + return _respond(False, "Access denied for all selected objects", status_code=403) + unique_keys = authorized_keys + + if purge_versions: + storage = _storage() + deleted: list[str] = [] + errors: list[dict[str, str]] = [] + for key in unique_keys: + try: storage.purge_object(bucket_name, key) - else: - storage.delete_object(bucket_name, key) + deleted.append(key) + except StorageError as exc: + errors.append({"key": key, "error": str(exc)}) + else: + try: + client = get_session_s3_client() + objects_to_delete = [{"Key": k} for k in unique_keys] + resp = client.delete_objects( + Bucket=bucket_name, + Delete={"Objects": objects_to_delete, "Quiet": False}, + ) + deleted = [d["Key"] for d in resp.get("Deleted", [])] + errors = [{"key": e["Key"], "error": e.get("Message", e.get("Code", "Unknown error"))} for e in resp.get("Errors", [])] + for key in deleted: _replication_manager().trigger_replication(bucket_name, key, action="delete") - deleted.append(key) - except StorageError as exc: - errors.append({"key": key, "error": str(exc)}) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return _respond(False, err["error"], status_code=status) + return _respond(False, "S3 API server is 
unreachable", status_code=502) if not deleted and errors: return _respond(False, "Unable to delete the selected objects", deleted=deleted, errors=errors, status_code=400) @@ -1038,35 +1116,83 @@ def purge_object_versions(bucket_name: str, object_key: str): @ui_bp.get("/buckets//objects//preview") def object_preview(bucket_name: str, object_key: str) -> Response: + import mimetypes as _mimetypes principal = _current_principal() - storage = _storage() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) - path = storage.get_object_path(bucket_name, object_key) - metadata = storage.get_object_metadata(bucket_name, object_key) - except (StorageError, IamError) as exc: - status = 403 if isinstance(exc, IamError) else 404 - return Response(str(exc), status=status) - + except IamError as exc: + return Response(str(exc), status=403) + download = request.args.get("download") == "1" - - is_encrypted = "x-amz-server-side-encryption" in metadata - if is_encrypted and hasattr(storage, 'get_object_data'): + raw_filename = object_key.rsplit("/", 1)[-1] or object_key + safe_filename = raw_filename.replace('"', "'").replace("\\", "_") + safe_filename = "".join(c for c in safe_filename if c.isprintable() and c not in "\r\n") + if not safe_filename: + safe_filename = "download" + try: + safe_filename.encode("latin-1") + ascii_safe = True + except UnicodeEncodeError: + ascii_safe = False + + range_header = request.headers.get("Range") + + try: + client = get_session_s3_client() + get_kwargs: dict[str, Any] = {"Bucket": bucket_name, "Key": object_key} + if range_header: + get_kwargs["Range"] = range_header + resp = client.get_object(**get_kwargs) + except PermissionError as exc: + return Response(str(exc), status=401) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + status = 404 if code == "NoSuchKey" else 400 + return Response(exc.response.get("Error", {}).get("Message", "S3 operation failed"), status=status) + except 
(EndpointConnectionError, ConnectionClosedError): + return Response("S3 API server is unreachable", status=502) + + content_type = resp.get("ContentType") or _mimetypes.guess_type(object_key)[0] or "application/octet-stream" + content_length = resp.get("ContentLength", 0) + body_stream = resp["Body"] + is_partial = resp.get("ResponseMetadata", {}).get("HTTPStatusCode") == 206 + content_range = resp.get("ContentRange") + + _DANGEROUS_TYPES = { + "text/html", "text/xml", "application/xhtml+xml", + "application/xml", "image/svg+xml", + } + base_ct = content_type.split(";")[0].strip().lower() + if not download and base_ct in _DANGEROUS_TYPES: + content_type = "text/plain; charset=utf-8" + + def generate(): try: - data, _ = storage.get_object_data(bucket_name, object_key) - import io - import mimetypes - mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream" - return send_file( - io.BytesIO(data), - mimetype=mimetype, - as_attachment=download, - download_name=path.name - ) - except StorageError as exc: - return Response(f"Decryption failed: {exc}", status=500) - - return send_file(path, as_attachment=download, download_name=path.name) + for chunk in body_stream.iter_chunks(chunk_size=65536): + yield chunk + finally: + body_stream.close() + + status_code = 206 if is_partial else 200 + headers = { + "Content-Type": content_type, + "X-Content-Type-Options": "nosniff", + "Accept-Ranges": "bytes", + } + if content_length: + headers["Content-Length"] = str(content_length) + if content_range: + headers["Content-Range"] = content_range + disposition = "attachment" if download else "inline" + if ascii_safe: + headers["Content-Disposition"] = f'{disposition}; filename="{safe_filename}"' + else: + from urllib.parse import quote + encoded = quote(safe_filename, safe="") + ascii_fallback = safe_filename.encode("ascii", "replace").decode("ascii").replace("?", "_") + headers["Content-Disposition"] = f'{disposition}; filename="{ascii_fallback}"; 
filename*=UTF-8\'\'{encoded}' + + return Response(generate(), status=status_code, headers=headers) @ui_bp.post("/buckets//objects//presign") @@ -1089,25 +1215,24 @@ def object_presign(bucket_name: str, object_key: str): min_expiry = current_app.config.get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1) max_expiry = current_app.config.get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800) expires = max(min_expiry, min(expires, max_expiry)) - storage = _storage() - if not storage.bucket_exists(bucket_name): - return jsonify({"error": "Bucket does not exist"}), 404 - if action != "write": - try: - storage.get_object_path(bucket_name, object_key) - except StorageError: - return jsonify({"error": "Object not found"}), 404 - secret = _iam().secret_for_key(principal.access_key) - api_base = current_app.config.get("API_BASE_URL") or "http://127.0.0.1:5000" - url = _generate_presigned_url( - principal=principal, - secret_key=secret, - method=method, - bucket_name=bucket_name, - object_key=object_key, - expires_in=expires, - api_base_url=api_base, - ) + + method_to_client_method = {"GET": "get_object", "PUT": "put_object", "DELETE": "delete_object"} + client_method = method_to_client_method[method] + + try: + client = get_session_s3_client() + url = client.generate_presigned_url( + ClientMethod=client_method, + Params={"Bucket": bucket_name, "Key": object_key}, + ExpiresIn=expires, + ) + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) current_app.logger.info( "Presigned URL generated", extra={"bucket": bucket_name, "key": object_key, "method": method}, @@ -1118,15 +1243,31 @@ def object_presign(bucket_name: str, object_key: str): @ui_bp.get("/buckets//objects//metadata") def object_metadata(bucket_name: str, object_key: str): principal 
= _current_principal() - storage = _storage() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) - metadata = storage.get_object_metadata(bucket_name, object_key) - return jsonify({"metadata": metadata}) except IamError as exc: return jsonify({"error": str(exc)}), 403 - except StorageError as exc: - return jsonify({"error": str(exc)}), 404 + try: + client = get_session_s3_client() + resp = client.head_object(Bucket=bucket_name, Key=object_key) + metadata = resp.get("Metadata", {}) + if resp.get("ContentType"): + metadata["Content-Type"] = resp["ContentType"] + if resp.get("ContentLength") is not None: + metadata["Content-Length"] = str(resp["ContentLength"]) + if resp.get("ServerSideEncryption"): + metadata["x-amz-server-side-encryption"] = resp["ServerSideEncryption"] + return jsonify({"metadata": metadata}) + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("NoSuchKey", "404", "NotFound"): + return jsonify({"error": "Object not found"}), 404 + err, status = handle_client_error(exc) + return jsonify(err), status + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) @ui_bp.get("/buckets//objects//versions") @@ -1137,10 +1278,25 @@ def object_versions(bucket_name: str, object_key: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 try: - versions = _storage().list_object_versions(bucket_name, object_key) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 - return jsonify({"versions": versions}) + client = get_session_s3_client() + resp = client.list_object_versions(Bucket=bucket_name, Prefix=object_key, MaxKeys=1000) + versions = [] + for v in resp.get("Versions", []): + if v.get("Key") != object_key: + continue + versions.append({ + "version_id": v.get("VersionId", ""), + "last_modified": v["LastModified"].isoformat() if 
v.get("LastModified") else None, + "size": v.get("Size", 0), + "etag": v.get("ETag", "").strip('"'), + "is_latest": v.get("IsLatest", False), + }) + return jsonify({"versions": versions}) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.get("/buckets//archived") @@ -1206,13 +1362,32 @@ def update_bucket_policy(bucket_name: str): return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) - store = _bucket_policies() + + try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + if _wants_json(): + return jsonify({"error": str(exc)}), 403 + flash(str(exc), "danger") + return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) + if action == "delete": - store.delete_policy(bucket_name) + try: + client.delete_bucket_policy(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) + if _wants_json(): + return jsonify(err), status + flash(err["error"], "danger") + return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) if _wants_json(): return jsonify({"success": True, "message": "Bucket policy removed"}) flash("Bucket policy removed", "info") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) + document = request.form.get("policy_document", "").strip() if not document: if _wants_json(): @@ -1220,15 +1395,25 @@ def update_bucket_policy(bucket_name: str): flash("Provide a JSON policy document", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) try: - payload = json.loads(document) 
- store.set_policy(bucket_name, payload) - if _wants_json(): - return jsonify({"success": True, "message": "Bucket policy saved"}) - flash("Bucket policy saved", "success") - except (json.JSONDecodeError, ValueError) as exc: + json.loads(document) + except json.JSONDecodeError as exc: if _wants_json(): return jsonify({"error": f"Policy error: {exc}"}), 400 flash(f"Policy error: {exc}", "danger") + return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) + try: + client.put_bucket_policy(Bucket=bucket_name, Policy=document) + if _wants_json(): + return jsonify({"success": True, "message": "Bucket policy saved"}) + flash("Bucket policy saved", "success") + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) + if _wants_json(): + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) @@ -1243,13 +1428,26 @@ def update_bucket_versioning(bucket_name: str): flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) state = request.form.get("state", "enable") + if state not in ("enable", "suspend"): + if _wants_json(): + return jsonify({"error": "state must be 'enable' or 'suspend'"}), 400 + flash("Invalid versioning state", "danger") + return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) enable = state == "enable" try: - _storage().set_bucket_versioning(bucket_name, enable) - except StorageError as exc: + client = get_session_s3_client() + client.put_bucket_versioning( + Bucket=bucket_name, + VersioningConfiguration={"Status": "Enabled" if enable else "Suspended"}, + ) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, 
status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) if _wants_json(): - return jsonify({"error": _friendly_error_message(exc)}), 400 - flash(_friendly_error_message(exc), "danger") + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) message = "Versioning enabled" if enable else "Versioning suspended" if _wants_json(): @@ -1344,7 +1542,6 @@ def update_bucket_quota(bucket_name: str): @ui_bp.post("/buckets//encryption") def update_bucket_encryption(bucket_name: str): - """Update bucket default encryption configuration.""" principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") @@ -1358,14 +1555,19 @@ def update_bucket_encryption(bucket_name: str): if action == "disable": try: - _storage().set_bucket_encryption(bucket_name, None) + client = get_session_s3_client() + client.delete_bucket_encryption(Bucket=bucket_name) if _wants_json(): return jsonify({"success": True, "message": "Default encryption disabled", "enabled": False}) flash("Default encryption disabled", "info") - except StorageError as exc: + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) if _wants_json(): - return jsonify({"error": _friendly_error_message(exc)}), 400 - flash(_friendly_error_message(exc), "danger") + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) algorithm = request.form.get("algorithm", "AES256") @@ -1377,21 +1579,18 @@ def update_bucket_encryption(bucket_name: str): flash("Invalid encryption algorithm", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) - encryption_config: dict[str, Any] = { - "Rules": [ - { - 
"ApplyServerSideEncryptionByDefault": { - "SSEAlgorithm": algorithm, - } - } - ] - } - + sse_rule: dict[str, Any] = {"SSEAlgorithm": algorithm} if algorithm == "aws:kms" and kms_key_id: - encryption_config["Rules"][0]["ApplyServerSideEncryptionByDefault"]["KMSMasterKeyID"] = kms_key_id + sse_rule["KMSMasterKeyID"] = kms_key_id try: - _storage().set_bucket_encryption(bucket_name, encryption_config) + client = get_session_s3_client() + client.put_bucket_encryption( + Bucket=bucket_name, + ServerSideEncryptionConfiguration={ + "Rules": [{"ApplyServerSideEncryptionByDefault": sse_rule}] + }, + ) if algorithm == "aws:kms": message = "Default KMS encryption enabled" else: @@ -1399,10 +1598,14 @@ def update_bucket_encryption(bucket_name: str): if _wants_json(): return jsonify({"success": True, "message": message, "enabled": True, "algorithm": algorithm}) flash(message, "success") - except StorageError as exc: + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) if _wants_json(): - return jsonify({"error": _friendly_error_message(exc)}), 400 - flash(_friendly_error_message(exc), "danger") + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) @@ -2314,16 +2517,34 @@ def bucket_lifecycle(bucket_name: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() - if not storage.bucket_exists(bucket_name): - return jsonify({"error": "Bucket does not exist"}), 404 + try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + return jsonify({"error": str(exc)}), 403 if request.method == "GET": - rules = storage.get_bucket_lifecycle(bucket_name) or [] + try: + resp = client.get_bucket_lifecycle_configuration(Bucket=bucket_name) + rules = resp.get("Rules", []) + except 
ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code == "NoSuchLifecycleConfiguration": + rules = [] + else: + err, status = handle_client_error(exc) + return jsonify(err), status + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) return jsonify({"rules": rules}) if request.method == "DELETE": - storage.set_bucket_lifecycle(bucket_name, None) + try: + client.delete_bucket_lifecycle(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) return jsonify({"status": "ok", "message": "Lifecycle configuration deleted"}) payload = request.get_json(silent=True) or {} @@ -2339,8 +2560,11 @@ def bucket_lifecycle(bucket_name: str): "ID": str(rule.get("ID", f"rule-{i+1}")), "Status": "Enabled" if rule.get("Status", "Enabled") == "Enabled" else "Disabled", } + filt = {} if rule.get("Prefix"): - validated["Prefix"] = str(rule["Prefix"]) + filt["Prefix"] = str(rule["Prefix"]) + if filt: + validated["Filter"] = filt if rule.get("Expiration"): exp = rule["Expiration"] if isinstance(exp, dict) and exp.get("Days"): @@ -2355,7 +2579,19 @@ def bucket_lifecycle(bucket_name: str): validated["AbortIncompleteMultipartUpload"] = {"DaysAfterInitiation": int(aimu["DaysAfterInitiation"])} validated_rules.append(validated) - storage.set_bucket_lifecycle(bucket_name, validated_rules if validated_rules else None) + try: + if validated_rules: + client.put_bucket_lifecycle_configuration( + Bucket=bucket_name, + LifecycleConfiguration={"Rules": validated_rules}, + ) + else: + client.delete_bucket_lifecycle(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + 
return jsonify(*handle_connection_error(exc)) return jsonify({"status": "ok", "message": "Lifecycle configuration saved", "rules": validated_rules}) @@ -2398,16 +2634,34 @@ def bucket_cors(bucket_name: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() - if not storage.bucket_exists(bucket_name): - return jsonify({"error": "Bucket does not exist"}), 404 + try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + return jsonify({"error": str(exc)}), 403 if request.method == "GET": - rules = storage.get_bucket_cors(bucket_name) or [] + try: + resp = client.get_bucket_cors(Bucket=bucket_name) + rules = resp.get("CORSRules", []) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code == "NoSuchCORSConfiguration": + rules = [] + else: + err, status = handle_client_error(exc) + return jsonify(err), status + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) return jsonify({"rules": rules}) if request.method == "DELETE": - storage.set_bucket_cors(bucket_name, None) + try: + client.delete_bucket_cors(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) return jsonify({"status": "ok", "message": "CORS configuration deleted"}) payload = request.get_json(silent=True) or {} @@ -2438,7 +2692,19 @@ def bucket_cors(bucket_name: str): pass validated_rules.append(validated) - storage.set_bucket_cors(bucket_name, validated_rules if validated_rules else None) + try: + if validated_rules: + client.put_bucket_cors( + Bucket=bucket_name, + CORSConfiguration={"CORSRules": validated_rules}, + ) + else: + client.delete_bucket_cors(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) 
as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) return jsonify({"status": "ok", "message": "CORS configuration saved", "rules": validated_rules}) @@ -2451,33 +2717,57 @@ def bucket_acl(bucket_name: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() - if not storage.bucket_exists(bucket_name): - return jsonify({"error": "Bucket does not exist"}), 404 + try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + return jsonify({"error": str(exc)}), 403 - acl_service = _acl() owner_id = principal.access_key if principal else "anonymous" if request.method == "GET": try: - acl = acl_service.get_bucket_acl(bucket_name) - if not acl: - acl = create_canned_acl("private", owner_id) + resp = client.get_bucket_acl(Bucket=bucket_name) + owner = resp.get("Owner", {}).get("ID", owner_id) + grants = [] + for grant in resp.get("Grants", []): + grantee = grant.get("Grantee", {}) + grantee_display = grantee.get("DisplayName") or grantee.get("ID", "") + if not grantee_display: + uri = grantee.get("URI", "") + if "AllUsers" in uri: + grantee_display = "Everyone (public)" + elif "AuthenticatedUsers" in uri: + grantee_display = "Authenticated users" + else: + grantee_display = uri or "unknown" + grants.append({ + "grantee": grantee_display, + "permission": grant.get("Permission", ""), + }) return jsonify({ - "owner": acl.owner, - "grants": [g.to_dict() for g in acl.grants], + "owner": owner, + "grants": grants, "canned_acls": list(CANNED_ACLS.keys()), }) - except Exception as exc: - return jsonify({"error": str(exc)}), 500 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) payload = request.get_json(silent=True) or {} 
canned_acl = payload.get("canned_acl") if canned_acl: if canned_acl not in CANNED_ACLS: return jsonify({"error": f"Invalid canned ACL: {canned_acl}"}), 400 - acl_service.set_bucket_canned_acl(bucket_name, canned_acl, owner_id) - return jsonify({"status": "ok", "message": f"ACL set to {canned_acl}"}) + try: + client.put_bucket_acl(Bucket=bucket_name, ACL=canned_acl) + return jsonify({"status": "ok", "message": f"ACL set to {canned_acl}"}) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) return jsonify({"error": "canned_acl is required"}), 400 @@ -2490,14 +2780,24 @@ def object_tags(bucket_name: str, object_key: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() + try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + return jsonify({"error": str(exc)}), 403 if request.method == "GET": try: - tags = storage.get_object_tags(bucket_name, object_key) + resp = client.get_object_tagging(Bucket=bucket_name, Key=object_key) + tags = resp.get("TagSet", []) return jsonify({"tags": tags}) - except StorageError as exc: - return jsonify({"error": str(exc)}), 404 + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code == "NoSuchKey": + return jsonify({"error": "Object not found"}), 404 + err, status = handle_client_error(exc) + return jsonify(err), status + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) try: _authorize_ui(principal, bucket_name, "write", object_key=object_key) @@ -2508,22 +2808,34 @@ def object_tags(bucket_name: str, object_key: str): tags = payload.get("tags", []) if not isinstance(tags, list): return jsonify({"error": "tags must be a list"}), 400 - if len(tags) > 10: - return jsonify({"error": "Maximum 
10 tags allowed"}), 400 + tag_limit = current_app.config.get("OBJECT_TAG_LIMIT", 50) + if len(tags) > tag_limit: + return jsonify({"error": f"Maximum {tag_limit} tags allowed"}), 400 validated_tags = [] - for tag in tags: - if isinstance(tag, dict) and tag.get("Key"): - validated_tags.append({ - "Key": str(tag["Key"]), - "Value": str(tag.get("Value", "")) - }) + for i, tag in enumerate(tags): + if not isinstance(tag, dict) or not tag.get("Key"): + return jsonify({"error": f"Tag at index {i} must have a Key field"}), 400 + validated_tags.append({ + "Key": str(tag["Key"]), + "Value": str(tag.get("Value", "")) + }) try: - storage.set_object_tags(bucket_name, object_key, validated_tags if validated_tags else None) + if validated_tags: + client.put_object_tagging( + Bucket=bucket_name, + Key=object_key, + Tagging={"TagSet": validated_tags}, + ) + else: + client.delete_object_tagging(Bucket=bucket_name, Key=object_key) return jsonify({"status": "ok", "message": "Tags saved", "tags": validated_tags}) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.post("/buckets//folders") @@ -2544,15 +2856,28 @@ def create_folder(bucket_name: str): folder_name = folder_name.rstrip("/") if "/" in folder_name: return jsonify({"error": "Folder name cannot contain /"}), 400 + if "\x00" in folder_name or "\x00" in prefix: + return jsonify({"error": "Null bytes not allowed"}), 400 + if ".." 
in prefix.split("/"): + return jsonify({"error": "Invalid prefix"}), 400 folder_key = f"{prefix}{folder_name}/" if prefix else f"{folder_name}/" - import io + max_key_len = current_app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024) + if len(folder_key.encode("utf-8")) > max_key_len: + return jsonify({"error": f"Key exceeds maximum length of {max_key_len} bytes"}), 400 + try: - _storage().put_object(bucket_name, folder_key, io.BytesIO(b"")) + client = get_session_s3_client() + client.put_object(Bucket=bucket_name, Key=folder_key, Body=b"") return jsonify({"status": "ok", "message": f"Folder '{folder_name}' created", "key": folder_key}) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.post("/buckets//objects//copy") @@ -2569,31 +2894,37 @@ def copy_object(bucket_name: str, object_key: str): if not dest_key: return jsonify({"error": "dest_key is required"}), 400 + if "\x00" in dest_key: + return jsonify({"error": "Destination key cannot contain null bytes"}), 400 + max_key_len = current_app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024) + if len(dest_key.encode("utf-8")) > max_key_len: + return jsonify({"error": f"Destination key exceeds maximum length of {max_key_len} bytes"}), 400 try: _authorize_ui(principal, dest_bucket, "write", object_key=dest_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() - try: - source_path = storage.get_object_path(bucket_name, object_key) - source_metadata = storage.get_object_metadata(bucket_name, object_key) - except StorageError as exc: - return jsonify({"error": str(exc)}), 404 - - try: - with source_path.open("rb") as stream: - 
storage.put_object(dest_bucket, dest_key, stream, metadata=source_metadata or None) + client = get_session_s3_client() + client.copy_object( + Bucket=dest_bucket, + Key=dest_key, + CopySource={"Bucket": bucket_name, "Key": object_key}, + ) return jsonify({ "status": "ok", "message": f"Copied to {dest_bucket}/{dest_key}", "dest_bucket": dest_bucket, "dest_key": dest_key, }) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.post("/buckets//objects//move") @@ -2611,6 +2942,11 @@ def move_object(bucket_name: str, object_key: str): if not dest_key: return jsonify({"error": "dest_key is required"}), 400 + if "\x00" in dest_key: + return jsonify({"error": "Destination key cannot contain null bytes"}), 400 + max_key_len = current_app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024) + if len(dest_key.encode("utf-8")) > max_key_len: + return jsonify({"error": f"Destination key exceeds maximum length of {max_key_len} bytes"}), 400 if dest_bucket == bucket_name and dest_key == object_key: return jsonify({"error": "Cannot move object to the same location"}), 400 @@ -2620,39 +2956,56 @@ def move_object(bucket_name: str, object_key: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() + try: + client = get_session_s3_client() + client.copy_object( + Bucket=dest_bucket, + Key=dest_key, + CopySource={"Bucket": bucket_name, "Key": object_key}, + ) + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + 
return jsonify(*handle_connection_error(exc)) try: - source_path = storage.get_object_path(bucket_name, object_key) - source_metadata = storage.get_object_metadata(bucket_name, object_key) - except StorageError as exc: - return jsonify({"error": str(exc)}), 404 - - try: - import io - with source_path.open("rb") as f: - data = f.read() - storage.put_object(dest_bucket, dest_key, io.BytesIO(data), metadata=source_metadata or None) - storage.delete_object(bucket_name, object_key) + client.delete_object(Bucket=bucket_name, Key=object_key) + except (ClientError, EndpointConnectionError, ConnectionClosedError): return jsonify({ - "status": "ok", - "message": f"Moved to {dest_bucket}/{dest_key}", + "status": "partial", + "message": f"Copied to {dest_bucket}/{dest_key} but failed to delete source", "dest_bucket": dest_bucket, "dest_key": dest_key, - }) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + }), 200 + + return jsonify({ + "status": "ok", + "message": f"Moved to {dest_bucket}/{dest_key}", + "dest_bucket": dest_bucket, + "dest_key": dest_key, + }) @ui_bp.get("/buckets//list-for-copy") def list_buckets_for_copy(bucket_name: str): principal = _current_principal() - buckets = _storage().list_buckets() + try: + client = get_session_s3_client() + resp = client.list_buckets() + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except ClientError as exc: + return jsonify(*handle_client_error(exc)) + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) allowed = [] - for bucket in buckets: + for b in resp.get("Buckets", []): try: - _authorize_ui(principal, bucket.name, "write") - allowed.append(bucket.name) + _authorize_ui(principal, b["Name"], "write") + allowed.append(b["Name"]) except IamError: pass return jsonify({"buckets": allowed}) diff --git a/app/version.py b/app/version.py index f5bfa82..91607b1 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 
+1,6 @@ from __future__ import annotations -APP_VERSION = "0.2.7" +APP_VERSION = "0.2.8" def get_version() -> str: diff --git a/docs.md b/docs.md index d4917d0..4439a92 100644 --- a/docs.md +++ b/docs.md @@ -7,7 +7,7 @@ This document expands on the README to describe the full workflow for running, c MyFSIO ships two Flask entrypoints that share the same storage, IAM, and bucket-policy state: - **API server** – Implements the S3-compatible REST API, policy evaluation, and Signature Version 4 presign service. -- **UI server** – Provides the browser console for buckets, IAM, and policies. It proxies to the API for presign operations. +- **UI server** – Provides the browser console for buckets, IAM, and policies. It proxies all storage operations through the S3 API via boto3 (SigV4-signed), mirroring the architecture used by MinIO and Garage. Both servers read `AppConfig`, so editing JSON stores on disk instantly affects both surfaces. @@ -136,7 +136,7 @@ All configuration is done via environment variables. The table below lists every | `MAX_UPLOAD_SIZE` | `1073741824` (1 GiB) | Bytes. Caps incoming uploads in both API + UI. | | `UI_PAGE_SIZE` | `100` | `MaxKeys` hint shown in listings. | | `SECRET_KEY` | Auto-generated | Flask session key. Auto-generates and persists if not set. **Set explicitly in production.** | -| `API_BASE_URL` | `None` | Public URL for presigned URLs. Required behind proxies. | +| `API_BASE_URL` | `http://127.0.0.1:5000` | Internal S3 API URL used by the web UI proxy. Also used for presigned URL generation. Set to your public URL if running behind a reverse proxy. | | `AWS_REGION` | `us-east-1` | Region embedded in SigV4 credential scope. | | `AWS_SERVICE` | `s3` | Service string for SigV4. 
| diff --git a/static/css/main.css b/static/css/main.css index 89a2c5f..0ab8050 100644 --- a/static/css/main.css +++ b/static/css/main.css @@ -1288,6 +1288,20 @@ html.sidebar-will-collapse .sidebar-user { padding: 2rem 1rem; } +#preview-text { + padding: 1rem 1.125rem; + max-height: 360px; + overflow: auto; + white-space: pre-wrap; + word-break: break-word; + font-family: 'SFMono-Regular', 'Menlo', 'Consolas', 'Liberation Mono', monospace; + font-size: .8rem; + line-height: 1.6; + tab-size: 4; + color: var(--myfsio-text); + background: transparent; +} + .upload-progress-stack { display: flex; flex-direction: column; diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index 0864acc..041298a 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -101,6 +101,7 @@ const previewImage = document.getElementById('preview-image'); const previewVideo = document.getElementById('preview-video'); const previewAudio = document.getElementById('preview-audio'); + const previewText = document.getElementById('preview-text'); const previewIframe = document.getElementById('preview-iframe'); const downloadButton = document.getElementById('downloadButton'); const presignButton = document.getElementById('presignButton'); @@ -657,6 +658,7 @@ streamingComplete = true; flushPendingStreamObjects(); hasMoreObjects = false; + totalObjectCount = loadedObjectCount; updateObjectCountBadge(); if (objectsLoadingRow && objectsLoadingRow.parentNode) { @@ -1894,6 +1896,10 @@ el.setAttribute('src', 'about:blank'); } }); + if (previewText) { + previewText.classList.add('d-none'); + previewText.textContent = ''; + } previewPlaceholder.classList.remove('d-none'); }; @@ -1957,11 +1963,28 @@ previewIframe.style.minHeight = '500px'; previewIframe.classList.remove('d-none'); previewPlaceholder.classList.add('d-none'); - } else if (previewUrl && 
lower.match(/\.(txt|log|json|md|csv|xml|html|htm|js|ts|py|java|c|cpp|h|css|scss|yaml|yml|toml|ini|cfg|conf|sh|bat)$/)) { - previewIframe.src = previewUrl; - previewIframe.style.minHeight = '200px'; - previewIframe.classList.remove('d-none'); + } else if (previewUrl && previewText && lower.match(/\.(txt|log|json|md|csv|xml|html|htm|js|ts|py|java|c|cpp|h|css|scss|yaml|yml|toml|ini|cfg|conf|sh|bat|rs|go|rb|php|sql|r|swift|kt|scala|pl|lua|zig|ex|exs|hs|erl|ps1|psm1|psd1|fish|zsh|env|properties|gradle|makefile|dockerfile|vagrantfile|gitignore|gitattributes|editorconfig|eslintrc|prettierrc)$/)) { + previewText.textContent = 'Loading\u2026'; + previewText.classList.remove('d-none'); previewPlaceholder.classList.add('d-none'); + const currentRow = row; + fetch(previewUrl) + .then((r) => { + if (!r.ok) throw new Error(r.statusText); + const len = parseInt(r.headers.get('Content-Length') || '0', 10); + if (len > 512 * 1024) { + return r.text().then((t) => t.slice(0, 512 * 1024) + '\n\n--- Truncated (file too large for preview) ---'); + } + return r.text(); + }) + .then((text) => { + if (activeRow !== currentRow) return; + previewText.textContent = text; + }) + .catch(() => { + if (activeRow !== currentRow) return; + previewText.textContent = 'Failed to load preview'; + }); } const metadataUrl = row.dataset.metadataUrl; diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html index 2190a11..182e5e7 100644 --- a/templates/bucket_detail.html +++ b/templates/bucket_detail.html @@ -321,7 +321,8 @@ Object preview - +

+            
           
         
       
diff --git a/templates/buckets.html b/templates/buckets.html
index bf185c3..13ea928 100644
--- a/templates/buckets.html
+++ b/templates/buckets.html
@@ -141,7 +141,7 @@
         let visibleCount = 0;
 
         bucketItems.forEach(item => {
-          const name = item.querySelector('.card-title').textContent.toLowerCase();
+          const name = item.querySelector('.bucket-name').textContent.toLowerCase();
           if (name.includes(term)) {
             item.classList.remove('d-none');
             visibleCount++;
diff --git a/templates/docs.html b/templates/docs.html
index 66b9baf..8e52e53 100644
--- a/templates/docs.html
+++ b/templates/docs.html
@@ -97,8 +97,8 @@ python run.py --mode ui
             
               
                 API_BASE_URL
-                None
-                The public URL of the API. Required if running behind a proxy. Ensures presigned URLs are generated correctly.
+                http://127.0.0.1:5000
+                Internal S3 API URL used by the web UI proxy. Also used for presigned URL generation. Set to your public URL if running behind a reverse proxy.
               
               
                 STORAGE_ROOT
diff --git a/tests/test_ui_bulk_delete.py b/tests/test_ui_bulk_delete.py
index 015c5f2..473af22 100644
--- a/tests/test_ui_bulk_delete.py
+++ b/tests/test_ui_bulk_delete.py
@@ -1,8 +1,12 @@
 import io
 import json
+import threading
 from pathlib import Path
 
+from werkzeug.serving import make_server
+
 from app import create_app
+from app.s3_client import S3ProxyClient
 
 
 def _build_app(tmp_path: Path):
@@ -26,13 +30,44 @@ def _build_app(tmp_path: Path):
             "STORAGE_ROOT": storage_root,
             "IAM_CONFIG": iam_config,
             "BUCKET_POLICY_PATH": bucket_policies,
-            "API_BASE_URL": "http://localhost",
+            "API_BASE_URL": "http://127.0.0.1:0",  # placeholder; overwritten below once the real port is known
             "SECRET_KEY": "testing",
+            "WTF_CSRF_ENABLED": False,
         }
     )
+
+    server = make_server("127.0.0.1", 0, app)  # port 0: the OS assigns a free port
+    host, port = server.server_address
+    api_url = f"http://{host}:{port}"
+    app.config["API_BASE_URL"] = api_url
+    app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
+
+    thread = threading.Thread(target=server.serve_forever, daemon=True)
+    thread.start()
+
+    app._test_server = server
+    app._test_thread = thread
     return app
 
 
+def _shutdown_app(app):
+    """Stop the background server that ``_build_app`` attached to *app*.
+
+    Does nothing when *app* carries no ``_test_server`` attribute.
+    """
+    server = getattr(app, "_test_server", None)
+    if server is None:
+        return
+    try:
+        # shutdown() only makes serve_forever() return; it does not close
+        # the listening socket.
+        server.shutdown()
+        app._test_thread.join(timeout=2)
+    finally:
+        # Release the socket explicitly so each test does not leak one.
+        server.server_close()
+
+
 def _login(client):
     return client.post(
         "/ui/login",
@@ -43,54 +66,60 @@ def _login(client):
 
 def test_bulk_delete_json_route(tmp_path: Path):
     app = _build_app(tmp_path)
-    storage = app.extensions["object_storage"]
-    storage.create_bucket("demo")
-    storage.put_object("demo", "first.txt", io.BytesIO(b"first"))
-    storage.put_object("demo", "second.txt", io.BytesIO(b"second"))
+    try:  # _build_app started a background server thread; the finally below always stops it
+        storage = app.extensions["object_storage"]
+        storage.create_bucket("demo")
+        storage.put_object("demo", "first.txt", io.BytesIO(b"first"))
+        storage.put_object("demo", "second.txt", io.BytesIO(b"second"))
 
-    client = app.test_client()
-    assert _login(client).status_code == 200
+        client = app.test_client()
+        assert _login(client).status_code == 200
 
-    response = client.post(
-        "/ui/buckets/demo/objects/bulk-delete",
-        json={"keys": ["first.txt", "missing.txt"]},
-        headers={"X-Requested-With": "XMLHttpRequest"},
-    )
-    assert response.status_code == 200
-    payload = response.get_json()
-    assert payload["status"] == "ok"
-    assert set(payload["deleted"]) == {"first.txt", "missing.txt"}
-    assert payload["errors"] == []
+        response = client.post(
+            "/ui/buckets/demo/objects/bulk-delete",
+            json={"keys": ["first.txt", "missing.txt"]},  # one stored key, one key that was never stored
+            headers={"X-Requested-With": "XMLHttpRequest"},
+        )
+        assert response.status_code == 200
+        payload = response.get_json()
+        assert payload["status"] == "ok"
+        assert set(payload["deleted"]) == {"first.txt", "missing.txt"}  # the never-stored key is reported as deleted too
+        assert payload["errors"] == []
 
-    listing = storage.list_objects_all("demo")
-    assert {meta.key for meta in listing} == {"second.txt"}
+        listing = storage.list_objects_all("demo")
+        assert {meta.key for meta in listing} == {"second.txt"}  # only the object that was not requested remains
+    finally:
+        _shutdown_app(app)
 
 
 def test_bulk_delete_validation(tmp_path: Path):
     app = _build_app(tmp_path)
-    storage = app.extensions["object_storage"]
-    storage.create_bucket("demo")
-    storage.put_object("demo", "keep.txt", io.BytesIO(b"keep"))
+    try:  # _build_app started a background server thread; the finally below always stops it
+        storage = app.extensions["object_storage"]
+        storage.create_bucket("demo")
+        storage.put_object("demo", "keep.txt", io.BytesIO(b"keep"))
 
-    client = app.test_client()
-    assert _login(client).status_code == 200
+        client = app.test_client()
+        assert _login(client).status_code == 200
 
-    bad_response = client.post(
-        "/ui/buckets/demo/objects/bulk-delete",
-        json={"keys": []},
-        headers={"X-Requested-With": "XMLHttpRequest"},
-    )
-    assert bad_response.status_code == 400
-    assert bad_response.get_json()["status"] == "error"
+        bad_response = client.post(
+            "/ui/buckets/demo/objects/bulk-delete",
+            json={"keys": []},  # an empty key list must be rejected
+            headers={"X-Requested-With": "XMLHttpRequest"},
+        )
+        assert bad_response.status_code == 400
+        assert bad_response.get_json()["status"] == "error"
 
-    too_many = [f"obj-{index}.txt" for index in range(501)]
-    limit_response = client.post(
-        "/ui/buckets/demo/objects/bulk-delete",
-        json={"keys": too_many},
-        headers={"X-Requested-With": "XMLHttpRequest"},
-    )
-    assert limit_response.status_code == 400
-    assert limit_response.get_json()["status"] == "error"
+        too_many = [f"obj-{index}.txt" for index in range(501)]  # 501 keys; presumably one over the route's limit (confirm)
+        limit_response = client.post(
+            "/ui/buckets/demo/objects/bulk-delete",
+            json={"keys": too_many},
+            headers={"X-Requested-With": "XMLHttpRequest"},
+        )
+        assert limit_response.status_code == 400
+        assert limit_response.get_json()["status"] == "error"
 
-    still_there = storage.list_objects_all("demo")
-    assert {meta.key for meta in still_there} == {"keep.txt"}
+        still_there = storage.list_objects_all("demo")
+        assert {meta.key for meta in still_there} == {"keep.txt"}  # neither rejected request deleted anything
+    finally:
+        _shutdown_app(app)
diff --git a/tests/test_ui_encryption.py b/tests/test_ui_encryption.py
index 90590ec..d4c7bd0 100644
--- a/tests/test_ui_encryption.py
+++ b/tests/test_ui_encryption.py
@@ -1,10 +1,13 @@
 """Tests for UI-based encryption configuration."""
 import json
+import threading
 from pathlib import Path
 
 import pytest
+from werkzeug.serving import make_server
 
 from app import create_app
+from app.s3_client import S3ProxyClient
 
 
 def get_csrf_token(response):
@@ -37,212 +40,236 @@ def _make_encryption_app(tmp_path: Path, *, kms_enabled: bool = True):
         ]
     }
     iam_config.write_text(json.dumps(iam_payload))
-    
+
     config = {
         "TESTING": True,
         "STORAGE_ROOT": storage_root,
         "IAM_CONFIG": iam_config,
         "BUCKET_POLICY_PATH": bucket_policies,
-        "API_BASE_URL": "http://testserver",
+        "API_BASE_URL": "http://127.0.0.1:0",  # placeholder; overwritten below once the real port is known
         "SECRET_KEY": "testing",
         "ENCRYPTION_ENABLED": True,
+        "WTF_CSRF_ENABLED": False,
     }
-    
+
     if kms_enabled:
         config["KMS_ENABLED"] = True
         config["KMS_KEYS_PATH"] = str(tmp_path / "kms_keys.json")
         config["ENCRYPTION_MASTER_KEY_PATH"] = str(tmp_path / "master.key")
-    
+
     app = create_app(config)
+
+    server = make_server("127.0.0.1", 0, app)  # port 0: the OS assigns a free port
+    host, port = server.server_address
+    api_url = f"http://{host}:{port}"
+    app.config["API_BASE_URL"] = api_url
+    app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
+
+    thread = threading.Thread(target=server.serve_forever, daemon=True)
+    thread.start()
+
+    app._test_server = server
+    app._test_thread = thread
+
     storage = app.extensions["object_storage"]
     storage.create_bucket("test-bucket")
     return app
 
 
+def _shutdown_app(app):
+    """Stop the background server that ``_make_encryption_app`` attached to *app*.
+
+    Does nothing when *app* carries no ``_test_server`` attribute.
+    """
+    server = getattr(app, "_test_server", None)
+    if server is None:
+        return
+    try:
+        # shutdown() only makes serve_forever() return; it does not close
+        # the listening socket.
+        server.shutdown()
+        app._test_thread.join(timeout=2)
+    finally:
+        # Release the socket explicitly so each test does not leak one.
+        server.server_close()
+
+
 class TestUIBucketEncryption:
     """Test bucket encryption configuration via UI."""
-    
+
     def test_bucket_detail_shows_encryption_card(self, tmp_path):
         """Encryption card should be visible on bucket detail page."""
         app = _make_encryption_app(tmp_path)
-        client = app.test_client()
+        try:  # the app comes with a background server thread; the finally below always stops it
+            client = app.test_client()
 
-        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+
+            response = client.get("/ui/buckets/test-bucket?tab=properties")
+            assert response.status_code == 200
+
+            html = response.data.decode("utf-8")
+            assert "Default Encryption" in html
+            assert "Encryption Algorithm" in html or "Default encryption disabled" in html  # accepts either card state
+        finally:
+            _shutdown_app(app)
 
-        response = client.get("/ui/buckets/test-bucket?tab=properties")
-        assert response.status_code == 200
-        
-        html = response.data.decode("utf-8")
-        assert "Default Encryption" in html
-        assert "Encryption Algorithm" in html or "Default encryption disabled" in html
-    
     def test_enable_aes256_encryption(self, tmp_path):
         """Should be able to enable AES-256 encryption."""
         app = _make_encryption_app(tmp_path)
-        client = app.test_client()
+        try:  # the app comes with a background server thread; the finally below always stops it
+            client = app.test_client()
 
-        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
 
-        response = client.get("/ui/buckets/test-bucket?tab=properties")
-        csrf_token = get_csrf_token(response)
+            response = client.post(
+                "/ui/buckets/test-bucket/encryption",
+                data={  # no csrf_token field: the test app is configured with WTF_CSRF_ENABLED=False
+                    "action": "enable",
+                    "algorithm": "AES256",
+                },
+                follow_redirects=True,
+            )
+
+            assert response.status_code == 200
+            html = response.data.decode("utf-8")
+            assert "AES-256" in html or "encryption enabled" in html.lower()
+        finally:
+            _shutdown_app(app)
 
-        response = client.post(
-            "/ui/buckets/test-bucket/encryption",
-            data={
-                "csrf_token": csrf_token,
-                "action": "enable",
-                "algorithm": "AES256",
-            },
-            follow_redirects=True,
-        )
-        
-        assert response.status_code == 200
-        html = response.data.decode("utf-8")
-        assert "AES-256" in html or "encryption enabled" in html.lower()
-    
     def test_enable_kms_encryption(self, tmp_path):
         """Should be able to enable KMS encryption."""
         app = _make_encryption_app(tmp_path, kms_enabled=True)
-        client = app.test_client()
+        try:  # the app comes with a background server thread; the finally below always stops it
+            with app.app_context():
+                kms = app.extensions.get("kms")
+                if kms:
+                    key = kms.create_key("test-key")
+                    key_id = key.key_id
+                else:
+                    pytest.skip("KMS not available")  # raised inside the try, so the finally still stops the server
 
-        with app.app_context():
-            kms = app.extensions.get("kms")
-            if kms:
-                key = kms.create_key("test-key")
-                key_id = key.key_id
-            else:
-                pytest.skip("KMS not available")
+            client = app.test_client()
+            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
 
-        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+            response = client.post(
+                "/ui/buckets/test-bucket/encryption",
+                data={  # no csrf_token field: the test app is configured with WTF_CSRF_ENABLED=False
+                    "action": "enable",
+                    "algorithm": "aws:kms",
+                    "kms_key_id": key_id,
+                },
+                follow_redirects=True,
+            )
 
-        response = client.get("/ui/buckets/test-bucket?tab=properties")
-        csrf_token = get_csrf_token(response)
+            assert response.status_code == 200
+            html = response.data.decode("utf-8")
+            assert "KMS" in html or "encryption enabled" in html.lower()
+        finally:
+            _shutdown_app(app)
 
-        response = client.post(
-            "/ui/buckets/test-bucket/encryption",
-            data={
-                "csrf_token": csrf_token,
-                "action": "enable",
-                "algorithm": "aws:kms",
-                "kms_key_id": key_id,
-            },
-            follow_redirects=True,
-        )
-        
-        assert response.status_code == 200
-        html = response.data.decode("utf-8")
-        assert "KMS" in html or "encryption enabled" in html.lower()
-    
     def test_disable_encryption(self, tmp_path):
         """Should be able to disable encryption."""
         app = _make_encryption_app(tmp_path)
-        client = app.test_client()
+        try:  # the app comes with a background server thread; the finally below always stops it
+            client = app.test_client()
 
-        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
 
-        response = client.get("/ui/buckets/test-bucket?tab=properties")
-        csrf_token = get_csrf_token(response)
-        
-        client.post(
-            "/ui/buckets/test-bucket/encryption",
-            data={
-                "csrf_token": csrf_token,
-                "action": "enable",
-                "algorithm": "AES256",
-            },
-        )
+            client.post(  # setup only: enable encryption first; this response is not checked
+                "/ui/buckets/test-bucket/encryption",
+                data={
+                    "action": "enable",
+                    "algorithm": "AES256",
+                },
+            )
+
+            response = client.post(
+                "/ui/buckets/test-bucket/encryption",
+                data={
+                    "action": "disable",
+                },
+                follow_redirects=True,
+            )
+
+            assert response.status_code == 200
+            html = response.data.decode("utf-8")
+            assert "disabled" in html.lower() or "Default encryption disabled" in html
+        finally:
+            _shutdown_app(app)
 
-        response = client.get("/ui/buckets/test-bucket?tab=properties")
-        csrf_token = get_csrf_token(response)
-        
-        response = client.post(
-            "/ui/buckets/test-bucket/encryption",
-            data={
-                "csrf_token": csrf_token,
-                "action": "disable",
-            },
-            follow_redirects=True,
-        )
-        
-        assert response.status_code == 200
-        html = response.data.decode("utf-8")
-        assert "disabled" in html.lower() or "Default encryption disabled" in html
-    
     def test_invalid_algorithm_rejected(self, tmp_path):
         """Invalid encryption algorithm should be rejected."""
         app = _make_encryption_app(tmp_path)
-        client = app.test_client()
+        try:
+            client = app.test_client()
 
-        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
 
-        response = client.get("/ui/buckets/test-bucket?tab=properties")
-        csrf_token = get_csrf_token(response)
+            response = client.post(
+                "/ui/buckets/test-bucket/encryption",
+                data={
+                    "action": "enable",
+                    "algorithm": "INVALID",
+                },
+                follow_redirects=True,
+            )
 
-        response = client.post(
-            "/ui/buckets/test-bucket/encryption",
-            data={
-                "csrf_token": csrf_token,
-                "action": "enable",
-                "algorithm": "INVALID",
-            },
-            follow_redirects=True,
-        )
-
-        assert response.status_code == 200
-        html = response.data.decode("utf-8")
-        assert "Invalid" in html or "danger" in html
+            assert response.status_code == 200
+            html = response.data.decode("utf-8")
+            assert "Invalid" in html or "danger" in html
+        finally:
+            _shutdown_app(app)
 
     def test_encryption_persists_in_config(self, tmp_path):
         """Encryption config should persist in bucket config."""
         app = _make_encryption_app(tmp_path)
-        client = app.test_client()
+        try:
+            client = app.test_client()
 
-        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
 
-        response = client.get("/ui/buckets/test-bucket?tab=properties")
-        csrf_token = get_csrf_token(response)
+            client.post(
+                "/ui/buckets/test-bucket/encryption",
+                data={
+                    "action": "enable",
+                    "algorithm": "AES256",
+                },
+            )
 
-        client.post(
-            "/ui/buckets/test-bucket/encryption",
-            data={
-                "csrf_token": csrf_token,
-                "action": "enable",
-                "algorithm": "AES256",
-            },
-        )
+            with app.app_context():
+                storage = app.extensions["object_storage"]
+                config = storage.get_bucket_encryption("test-bucket")
 
-        with app.app_context():
-            storage = app.extensions["object_storage"]
-            config = storage.get_bucket_encryption("test-bucket")
-            
-            assert "Rules" in config
-            assert len(config["Rules"]) == 1
-            assert config["Rules"][0]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] == "AES256"
+                assert "Rules" in config
+                assert len(config["Rules"]) == 1
+                assert config["Rules"][0]["SSEAlgorithm"] == "AES256"
+        finally:
+            _shutdown_app(app)
 
 
 class TestUIEncryptionWithoutPermission:
     """Test encryption UI when user lacks permissions."""
-    
+
     def test_readonly_user_cannot_change_encryption(self, tmp_path):
         """Read-only user should not be able to change encryption settings."""
         app = _make_encryption_app(tmp_path)
-        client = app.test_client()
+        try:
+            client = app.test_client()
 
-        client.post("/ui/login", data={"access_key": "readonly", "secret_key": "secret"}, follow_redirects=True)
+            client.post("/ui/login", data={"access_key": "readonly", "secret_key": "secret"}, follow_redirects=True)
 
-        response = client.get("/ui/buckets/test-bucket?tab=properties")
-        csrf_token = get_csrf_token(response)
+            response = client.post(
+                "/ui/buckets/test-bucket/encryption",
+                data={
+                    "action": "enable",
+                    "algorithm": "AES256",
+                },
+                follow_redirects=True,
+            )
 
-        response = client.post(
-            "/ui/buckets/test-bucket/encryption",
-            data={
-                "csrf_token": csrf_token,
-                "action": "enable",
-                "algorithm": "AES256",
-            },
-            follow_redirects=True,
-        )
-
-        assert response.status_code == 200
-        html = response.data.decode("utf-8")
-        assert "Access denied" in html or "permission" in html.lower() or "not authorized" in html.lower()
+            assert response.status_code == 200
+            html = response.data.decode("utf-8")
+            assert "Access denied" in html or "permission" in html.lower() or "not authorized" in html.lower()
+        finally:
+            _shutdown_app(app)
diff --git a/tests/test_ui_pagination.py b/tests/test_ui_pagination.py
index e6f34d1..e72b998 100644
--- a/tests/test_ui_pagination.py
+++ b/tests/test_ui_pagination.py
@@ -1,15 +1,18 @@
 """Tests for UI pagination of bucket objects."""
 import json
+import threading
 from io import BytesIO
 from pathlib import Path
 
 import pytest
+from werkzeug.serving import make_server
 
 from app import create_app
+from app.s3_client import S3ProxyClient
 
 
 def _make_app(tmp_path: Path):
-    """Create an app for testing."""
+    """Create an app for testing with a live API server."""
     storage_root = tmp_path / "data"
     iam_config = tmp_path / "iam.json"
     bucket_policies = tmp_path / "bucket_policies.json"
@@ -33,157 +36,177 @@ def _make_app(tmp_path: Path):
             "STORAGE_ROOT": storage_root,
             "IAM_CONFIG": iam_config,
             "BUCKET_POLICY_PATH": bucket_policies,
+            "API_BASE_URL": "http://127.0.0.1:0",
         }
     )
+
+    server = make_server("127.0.0.1", 0, flask_app)
+    host, port = server.server_address
+    api_url = f"http://{host}:{port}"
+    flask_app.config["API_BASE_URL"] = api_url
+    flask_app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
+
+    thread = threading.Thread(target=server.serve_forever, daemon=True)
+    thread.start()
+
+    flask_app._test_server = server
+    flask_app._test_thread = thread
     return flask_app
 
 
+def _shutdown_app(app):
+    if hasattr(app, "_test_server"):
+        app._test_server.shutdown()
+        app._test_thread.join(timeout=2)
+
+
 class TestPaginatedObjectListing:
     """Test paginated object listing API."""
 
     def test_objects_api_returns_paginated_results(self, tmp_path):
         """Objects API should return paginated results."""
         app = _make_app(tmp_path)
-        storage = app.extensions["object_storage"]
-        storage.create_bucket("test-bucket")
-        
-        # Create 10 test objects
-        for i in range(10):
-            storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content"))
-        
-        with app.test_client() as client:
-            # Login first
-            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-            
-            # Request first page of 3 objects
-            resp = client.get("/ui/buckets/test-bucket/objects?max_keys=3")
-            assert resp.status_code == 200
-            
-            data = resp.get_json()
-            assert len(data["objects"]) == 3
-            assert data["is_truncated"] is True
-            assert data["next_continuation_token"] is not None
-            assert data["total_count"] == 10
-    
+        try:
+            storage = app.extensions["object_storage"]
+            storage.create_bucket("test-bucket")
+
+            for i in range(10):
+                storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content"))
+
+            with app.test_client() as client:
+                client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+
+                resp = client.get("/ui/buckets/test-bucket/objects?max_keys=3")
+                assert resp.status_code == 200
+
+                data = resp.get_json()
+                assert len(data["objects"]) == 3
+                assert data["is_truncated"] is True
+                assert data["next_continuation_token"] is not None
+        finally:
+            _shutdown_app(app)
+
     def test_objects_api_pagination_continuation(self, tmp_path):
         """Objects API should support continuation tokens."""
         app = _make_app(tmp_path)
-        storage = app.extensions["object_storage"]
-        storage.create_bucket("test-bucket")
-        
-        # Create 5 test objects
-        for i in range(5):
-            storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content"))
-        
-        with app.test_client() as client:
-            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-            
-            # Get first page
-            resp = client.get("/ui/buckets/test-bucket/objects?max_keys=2")
-            assert resp.status_code == 200
-            data = resp.get_json()
-            
-            first_page_keys = [obj["key"] for obj in data["objects"]]
-            assert len(first_page_keys) == 2
-            assert data["is_truncated"] is True
-            
-            # Get second page
-            token = data["next_continuation_token"]
-            resp = client.get(f"/ui/buckets/test-bucket/objects?max_keys=2&continuation_token={token}")
-            assert resp.status_code == 200
-            data = resp.get_json()
-            
-            second_page_keys = [obj["key"] for obj in data["objects"]]
-            assert len(second_page_keys) == 2
-            
-            # No overlap between pages
-            assert set(first_page_keys).isdisjoint(set(second_page_keys))
-    
+        try:
+            storage = app.extensions["object_storage"]
+            storage.create_bucket("test-bucket")
+
+            for i in range(5):
+                storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content"))
+
+            with app.test_client() as client:
+                client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+
+                resp = client.get("/ui/buckets/test-bucket/objects?max_keys=2")
+                assert resp.status_code == 200
+                data = resp.get_json()
+
+                first_page_keys = [obj["key"] for obj in data["objects"]]
+                assert len(first_page_keys) == 2
+                assert data["is_truncated"] is True
+
+                token = data["next_continuation_token"]
+                resp = client.get(f"/ui/buckets/test-bucket/objects?max_keys=2&continuation_token={token}")
+                assert resp.status_code == 200
+                data = resp.get_json()
+
+                second_page_keys = [obj["key"] for obj in data["objects"]]
+                assert len(second_page_keys) == 2
+
+                assert set(first_page_keys).isdisjoint(set(second_page_keys))
+        finally:
+            _shutdown_app(app)
+
     def test_objects_api_prefix_filter(self, tmp_path):
         """Objects API should support prefix filtering."""
         app = _make_app(tmp_path)
-        storage = app.extensions["object_storage"]
-        storage.create_bucket("test-bucket")
-        
-        # Create objects with different prefixes
-        storage.put_object("test-bucket", "logs/access.log", BytesIO(b"log"))
-        storage.put_object("test-bucket", "logs/error.log", BytesIO(b"log"))
-        storage.put_object("test-bucket", "data/file.txt", BytesIO(b"data"))
-        
-        with app.test_client() as client:
-            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-            
-            # Filter by prefix
-            resp = client.get("/ui/buckets/test-bucket/objects?prefix=logs/")
-            assert resp.status_code == 200
-            data = resp.get_json()
-            
-            keys = [obj["key"] for obj in data["objects"]]
-            assert all(k.startswith("logs/") for k in keys)
-            assert len(keys) == 2
-    
+        try:
+            storage = app.extensions["object_storage"]
+            storage.create_bucket("test-bucket")
+
+            storage.put_object("test-bucket", "logs/access.log", BytesIO(b"log"))
+            storage.put_object("test-bucket", "logs/error.log", BytesIO(b"log"))
+            storage.put_object("test-bucket", "data/file.txt", BytesIO(b"data"))
+
+            with app.test_client() as client:
+                client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+
+                resp = client.get("/ui/buckets/test-bucket/objects?prefix=logs/")
+                assert resp.status_code == 200
+                data = resp.get_json()
+
+                keys = [obj["key"] for obj in data["objects"]]
+                assert all(k.startswith("logs/") for k in keys)
+                assert len(keys) == 2
+        finally:
+            _shutdown_app(app)
+
     def test_objects_api_requires_authentication(self, tmp_path):
         """Objects API should require login."""
         app = _make_app(tmp_path)
-        storage = app.extensions["object_storage"]
-        storage.create_bucket("test-bucket")
-        
-        with app.test_client() as client:
-            # Don't login
-            resp = client.get("/ui/buckets/test-bucket/objects")
-            # Should redirect to login
-            assert resp.status_code == 302
-            assert "/ui/login" in resp.headers.get("Location", "")
-    
+        try:
+            storage = app.extensions["object_storage"]
+            storage.create_bucket("test-bucket")
+
+            with app.test_client() as client:
+                resp = client.get("/ui/buckets/test-bucket/objects")
+                assert resp.status_code == 302
+                assert "/ui/login" in resp.headers.get("Location", "")
+        finally:
+            _shutdown_app(app)
+
     def test_objects_api_returns_object_metadata(self, tmp_path):
         """Objects API should return complete object metadata."""
         app = _make_app(tmp_path)
-        storage = app.extensions["object_storage"]
-        storage.create_bucket("test-bucket")
-        storage.put_object("test-bucket", "test.txt", BytesIO(b"test content"))
-        
-        with app.test_client() as client:
-            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-            
-            resp = client.get("/ui/buckets/test-bucket/objects")
-            assert resp.status_code == 200
-            data = resp.get_json()
-            
-            assert len(data["objects"]) == 1
-            obj = data["objects"][0]
+        try:
+            storage = app.extensions["object_storage"]
+            storage.create_bucket("test-bucket")
+            storage.put_object("test-bucket", "test.txt", BytesIO(b"test content"))
 
-            # Check all expected fields
-            assert obj["key"] == "test.txt"
-            assert obj["size"] == 12  # len("test content")
-            assert "last_modified" in obj
-            assert "last_modified_display" in obj
-            assert "etag" in obj
+            with app.test_client() as client:
+                client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+
+                resp = client.get("/ui/buckets/test-bucket/objects")
+                assert resp.status_code == 200
+                data = resp.get_json()
+
+                assert len(data["objects"]) == 1
+                obj = data["objects"][0]
+
+                assert obj["key"] == "test.txt"
+                assert obj["size"] == 12
+                assert "last_modified" in obj
+                assert "last_modified_display" in obj
+                assert "etag" in obj
+
+                assert "url_templates" in data
+                templates = data["url_templates"]
+                assert "preview" in templates
+                assert "download" in templates
+                assert "delete" in templates
+                assert "KEY_PLACEHOLDER" in templates["preview"]
+        finally:
+            _shutdown_app(app)
 
-            # URLs are now returned as templates (not per-object) for performance
-            assert "url_templates" in data
-            templates = data["url_templates"]
-            assert "preview" in templates
-            assert "download" in templates
-            assert "delete" in templates
-            assert "KEY_PLACEHOLDER" in templates["preview"]
-    
     def test_bucket_detail_page_loads_without_objects(self, tmp_path):
         """Bucket detail page should load even with many objects."""
         app = _make_app(tmp_path)
-        storage = app.extensions["object_storage"]
-        storage.create_bucket("test-bucket")
-        
-        # Create many objects
-        for i in range(100):
-            storage.put_object("test-bucket", f"file{i:03d}.txt", BytesIO(b"x"))
-        
-        with app.test_client() as client:
-            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-            
-            # The page should load quickly (objects loaded via JS)
-            resp = client.get("/ui/buckets/test-bucket")
-            assert resp.status_code == 200
-            
-            html = resp.data.decode("utf-8")
-            # Should have the JavaScript loading infrastructure (external JS file)
-            assert "bucket-detail-main.js" in html
+        try:
+            storage = app.extensions["object_storage"]
+            storage.create_bucket("test-bucket")
+
+            for i in range(100):
+                storage.put_object("test-bucket", f"file{i:03d}.txt", BytesIO(b"x"))
+
+            with app.test_client() as client:
+                client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+
+                resp = client.get("/ui/buckets/test-bucket")
+                assert resp.status_code == 200
+
+                html = resp.data.decode("utf-8")
+                assert "bucket-detail-main.js" in html
+        finally:
+            _shutdown_app(app)
diff --git a/tests/test_ui_policy.py b/tests/test_ui_policy.py
index 16f9cf2..e98d3aa 100644
--- a/tests/test_ui_policy.py
+++ b/tests/test_ui_policy.py
@@ -1,10 +1,13 @@
 import io
 import json
+import threading
 from pathlib import Path
 
 import pytest
+from werkzeug.serving import make_server
 
 from app import create_app
+from app.s3_client import S3ProxyClient
 
 
 DENY_LIST_ALLOW_GET_POLICY = {
@@ -47,11 +50,25 @@ def _make_ui_app(tmp_path: Path, *, enforce_policies: bool):
             "STORAGE_ROOT": storage_root,
             "IAM_CONFIG": iam_config,
             "BUCKET_POLICY_PATH": bucket_policies,
-            "API_BASE_URL": "http://testserver",
+            "API_BASE_URL": "http://127.0.0.1:0",
             "SECRET_KEY": "testing",
             "UI_ENFORCE_BUCKET_POLICIES": enforce_policies,
+            "WTF_CSRF_ENABLED": False,
         }
     )
+
+    server = make_server("127.0.0.1", 0, app)
+    host, port = server.server_address
+    api_url = f"http://{host}:{port}"
+    app.config["API_BASE_URL"] = api_url
+    app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
+
+    thread = threading.Thread(target=server.serve_forever, daemon=True)
+    thread.start()
+
+    app._test_server = server
+    app._test_thread = thread
+
     storage = app.extensions["object_storage"]
     storage.create_bucket("testbucket")
     storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
@@ -60,22 +77,28 @@ def _make_ui_app(tmp_path: Path, *, enforce_policies: bool):
     return app
 
 
+def _shutdown_app(app):
+    if hasattr(app, "_test_server"):
+        app._test_server.shutdown()
+        app._test_thread.join(timeout=2)
+
+
 @pytest.mark.parametrize("enforce", [True, False])
 def test_ui_bucket_policy_enforcement_toggle(tmp_path: Path, enforce: bool):
     app = _make_ui_app(tmp_path, enforce_policies=enforce)
-    client = app.test_client()
-    client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-    response = client.get("/ui/buckets/testbucket", follow_redirects=True)
-    if enforce:
-        assert b"Access denied by bucket policy" in response.data
-    else:
-        assert response.status_code == 200
-        assert b"Access denied by bucket policy" not in response.data
-        # Objects are now loaded via async API - check the objects endpoint
-        objects_response = client.get("/ui/buckets/testbucket/objects")
-        assert objects_response.status_code == 200
-        data = objects_response.get_json()
-        assert any(obj["key"] == "vid.mp4" for obj in data["objects"])
+    try:
+        client = app.test_client()
+        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+        response = client.get("/ui/buckets/testbucket", follow_redirects=True)
+        if enforce:
+            assert b"Access denied by bucket policy" in response.data
+        else:
+            assert response.status_code == 200
+            assert b"Access denied by bucket policy" not in response.data
+            objects_response = client.get("/ui/buckets/testbucket/objects")
+            assert objects_response.status_code == 403
+    finally:
+        _shutdown_app(app)
 
 
 def test_ui_bucket_policy_disabled_by_default(tmp_path: Path):
@@ -99,23 +122,37 @@ def test_ui_bucket_policy_disabled_by_default(tmp_path: Path):
             "STORAGE_ROOT": storage_root,
             "IAM_CONFIG": iam_config,
             "BUCKET_POLICY_PATH": bucket_policies,
-            "API_BASE_URL": "http://testserver",
+            "API_BASE_URL": "http://127.0.0.1:0",
             "SECRET_KEY": "testing",
+            "WTF_CSRF_ENABLED": False,
         }
     )
-    storage = app.extensions["object_storage"]
-    storage.create_bucket("testbucket")
-    storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
-    policy_store = app.extensions["bucket_policies"]
-    policy_store.set_policy("testbucket", DENY_LIST_ALLOW_GET_POLICY)
 
-    client = app.test_client()
-    client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-    response = client.get("/ui/buckets/testbucket", follow_redirects=True)
-    assert response.status_code == 200
-    assert b"Access denied by bucket policy" not in response.data
-    # Objects are now loaded via async API - check the objects endpoint
-    objects_response = client.get("/ui/buckets/testbucket/objects")
-    assert objects_response.status_code == 200
-    data = objects_response.get_json()
-    assert any(obj["key"] == "vid.mp4" for obj in data["objects"])
+    server = make_server("127.0.0.1", 0, app)
+    host, port = server.server_address
+    api_url = f"http://{host}:{port}"
+    app.config["API_BASE_URL"] = api_url
+    app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
+
+    thread = threading.Thread(target=server.serve_forever, daemon=True)
+    thread.start()
+
+    app._test_server = server
+    app._test_thread = thread
+
+    try:
+        storage = app.extensions["object_storage"]
+        storage.create_bucket("testbucket")
+        storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
+        policy_store = app.extensions["bucket_policies"]
+        policy_store.set_policy("testbucket", DENY_LIST_ALLOW_GET_POLICY)
+
+        client = app.test_client()
+        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+        response = client.get("/ui/buckets/testbucket", follow_redirects=True)
+        assert response.status_code == 200
+        assert b"Access denied by bucket policy" not in response.data
+        objects_response = client.get("/ui/buckets/testbucket/objects")
+        assert objects_response.status_code == 403
+    finally:
+        _shutdown_app(app)