diff --git a/app/__init__.py b/app/__init__.py index 636dc7c..b010902 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -223,6 +223,13 @@ def create_app( app.extensions["access_logging"] = access_logging_service app.extensions["site_registry"] = site_registry + from .s3_client import S3ProxyClient + api_base = app.config.get("API_BASE_URL") or "http://127.0.0.1:5000" + app.extensions["s3_proxy"] = S3ProxyClient( + api_base_url=api_base, + region=app.config.get("AWS_REGION", "us-east-1"), + ) + operation_metrics_collector = None if app.config.get("OPERATION_METRICS_ENABLED", False): operation_metrics_collector = OperationMetricsCollector( diff --git a/app/s3_api.py b/app/s3_api.py index a237195..0cdabf3 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -3,6 +3,7 @@ from __future__ import annotations import base64 import hashlib import hmac +import json import logging import mimetypes import re @@ -2963,7 +2964,11 @@ def _bucket_policy_handler(bucket_name: str) -> Response: store.delete_policy(bucket_name) current_app.logger.info("Bucket policy removed", extra={"bucket": bucket_name}) return Response(status=204) - payload = request.get_json(silent=True) + raw_body = request.get_data(cache=False) or b"" + try: + payload = json.loads(raw_body) + except (json.JSONDecodeError, ValueError): + return _error_response("MalformedPolicy", "Policy document must be JSON", 400) if not payload: return _error_response("MalformedPolicy", "Policy document must be JSON", 400) try: diff --git a/app/s3_client.py b/app/s3_client.py new file mode 100644 index 0000000..d5d6978 --- /dev/null +++ b/app/s3_client.py @@ -0,0 +1,284 @@ +from __future__ import annotations + +import json +import logging +import threading +import time +from typing import Any, Generator, Optional + +import boto3 +from botocore.config import Config +from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError +from flask import current_app, session + +logger = 
logging.getLogger(__name__) + +UI_PROXY_USER_AGENT = "MyFSIO-UIProxy/1.0" + +_BOTO_ERROR_MAP = { + "NoSuchBucket": 404, + "NoSuchKey": 404, + "NoSuchUpload": 404, + "BucketAlreadyExists": 409, + "BucketAlreadyOwnedByYou": 409, + "BucketNotEmpty": 409, + "AccessDenied": 403, + "InvalidAccessKeyId": 403, + "SignatureDoesNotMatch": 403, + "InvalidBucketName": 400, + "InvalidArgument": 400, + "MalformedXML": 400, + "EntityTooLarge": 400, + "QuotaExceeded": 403, +} + +_UPLOAD_REGISTRY_MAX_AGE = 86400 +_UPLOAD_REGISTRY_CLEANUP_INTERVAL = 3600 + + +class UploadRegistry: + def __init__(self) -> None: + self._entries: dict[str, tuple[str, str, float]] = {} + self._lock = threading.Lock() + self._last_cleanup = time.monotonic() + + def register(self, upload_id: str, bucket_name: str, object_key: str) -> None: + with self._lock: + self._entries[upload_id] = (bucket_name, object_key, time.monotonic()) + self._maybe_cleanup() + + def get_key(self, upload_id: str, bucket_name: str) -> Optional[str]: + with self._lock: + entry = self._entries.get(upload_id) + if entry is None: + return None + stored_bucket, key, created_at = entry + if stored_bucket != bucket_name: + return None + if time.monotonic() - created_at > _UPLOAD_REGISTRY_MAX_AGE: + del self._entries[upload_id] + return None + return key + + def remove(self, upload_id: str) -> None: + with self._lock: + self._entries.pop(upload_id, None) + + def _maybe_cleanup(self) -> None: + now = time.monotonic() + if now - self._last_cleanup < _UPLOAD_REGISTRY_CLEANUP_INTERVAL: + return + self._last_cleanup = now + cutoff = now - _UPLOAD_REGISTRY_MAX_AGE + stale = [uid for uid, (_, _, ts) in self._entries.items() if ts < cutoff] + for uid in stale: + del self._entries[uid] + + +class S3ProxyClient: + def __init__(self, api_base_url: str, region: str = "us-east-1") -> None: + if not api_base_url: + raise ValueError("api_base_url is required for S3ProxyClient") + self._api_base_url = api_base_url.rstrip("/") + self._region = region + 
self.upload_registry = UploadRegistry() + + @property + def api_base_url(self) -> str: + return self._api_base_url + + def get_client(self, access_key: str, secret_key: str) -> Any: + if not access_key or not secret_key: + raise ValueError("Both access_key and secret_key are required") + config = Config( + user_agent_extra=UI_PROXY_USER_AGENT, + connect_timeout=5, + read_timeout=30, + retries={"max_attempts": 0}, + signature_version="s3v4", + s3={"addressing_style": "path"}, + request_checksum_calculation="when_required", + response_checksum_validation="when_required", + ) + return boto3.client( + "s3", + endpoint_url=self._api_base_url, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=self._region, + config=config, + ) + + +def _get_proxy() -> S3ProxyClient: + proxy = current_app.extensions.get("s3_proxy") + if proxy is None: + raise RuntimeError( + "S3 proxy not configured. Set API_BASE_URL or run both API and UI servers." + ) + return proxy + + +def _get_session_creds() -> tuple[str, str]: + secret_store = current_app.extensions["secret_store"] + secret_store.purge_expired() + token = session.get("cred_token") + if not token: + raise PermissionError("Not authenticated") + creds = secret_store.peek(token) + if not creds: + raise PermissionError("Session expired") + access_key = creds.get("access_key", "") + secret_key = creds.get("secret_key", "") + if not access_key or not secret_key: + raise PermissionError("Invalid session credentials") + return access_key, secret_key + + +def get_session_s3_client() -> Any: + proxy = _get_proxy() + access_key, secret_key = _get_session_creds() + return proxy.get_client(access_key, secret_key) + + +def get_upload_registry() -> UploadRegistry: + return _get_proxy().upload_registry + + +def handle_client_error(exc: ClientError) -> tuple[dict[str, str], int]: + error_info = exc.response.get("Error", {}) + code = error_info.get("Code", "InternalError") + message = error_info.get("Message") or "S3 
operation failed" + http_status = _BOTO_ERROR_MAP.get(code) + if http_status is None: + http_status = exc.response.get("ResponseMetadata", {}).get("HTTPStatusCode", 500) + return {"error": message}, http_status + + +def handle_connection_error(exc: Exception) -> tuple[dict[str, str], int]: + logger.error("S3 API connection failed: %s", exc) + return {"error": "S3 API server is unreachable. Ensure the API server is running."}, 502 + + +def format_datetime_display(dt: Any, display_tz: str = "UTC") -> str: + from .ui import _format_datetime_display + return _format_datetime_display(dt, display_tz) + + +def format_datetime_iso(dt: Any, display_tz: str = "UTC") -> str: + from .ui import _format_datetime_iso + return _format_datetime_iso(dt, display_tz) + + +def build_url_templates(bucket_name: str) -> dict[str, str]: + from flask import url_for + preview_t = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + delete_t = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + presign_t = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + versions_t = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + restore_t = url_for( + "ui.restore_object_version", + bucket_name=bucket_name, + object_key="KEY_PLACEHOLDER", + version_id="VERSION_ID_PLACEHOLDER", + ) + tags_t = url_for("ui.object_tags", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + copy_t = url_for("ui.copy_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + move_t = url_for("ui.move_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + metadata_t = url_for("ui.object_metadata", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + return { + "preview": preview_t, + "download": preview_t + "?download=1", + "presign": presign_t, + "delete": delete_t, + "versions": versions_t, + "restore": restore_t, + "tags": tags_t, + "copy": copy_t, + "move": 
move_t, + "metadata": metadata_t, + } + + +def translate_list_objects( + boto3_response: dict[str, Any], + url_templates: dict[str, str], + display_tz: str = "UTC", + versioning_enabled: bool = False, +) -> dict[str, Any]: + objects_data = [] + for obj in boto3_response.get("Contents", []): + last_mod = obj["LastModified"] + objects_data.append({ + "key": obj["Key"], + "size": obj["Size"], + "last_modified": last_mod.isoformat(), + "last_modified_display": format_datetime_display(last_mod, display_tz), + "last_modified_iso": format_datetime_iso(last_mod, display_tz), + "etag": obj.get("ETag", "").strip('"'), + }) + return { + "objects": objects_data, + "is_truncated": boto3_response.get("IsTruncated", False), + "next_continuation_token": boto3_response.get("NextContinuationToken"), + "total_count": boto3_response.get("KeyCount", len(objects_data)), + "versioning_enabled": versioning_enabled, + "url_templates": url_templates, + } + + +def get_versioning_via_s3(client: Any, bucket_name: str) -> bool: + try: + resp = client.get_bucket_versioning(Bucket=bucket_name) + return resp.get("Status") == "Enabled" + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code != "NoSuchBucket": + logger.warning("Failed to check versioning for %s: %s", bucket_name, code) + return False + + +def stream_objects_ndjson( + client: Any, + bucket_name: str, + prefix: Optional[str], + url_templates: dict[str, str], + display_tz: str = "UTC", + versioning_enabled: bool = False, +) -> Generator[str, None, None]: + meta_line = json.dumps({ + "type": "meta", + "versioning_enabled": versioning_enabled, + "url_templates": url_templates, + }) + "\n" + yield meta_line + + yield json.dumps({"type": "count", "total_count": 0}) + "\n" + + kwargs: dict[str, Any] = {"Bucket": bucket_name, "MaxKeys": 1000} + if prefix: + kwargs["Prefix"] = prefix + + try: + paginator = client.get_paginator("list_objects_v2") + for page in paginator.paginate(**kwargs): + for obj in 
page.get("Contents", []): + last_mod = obj["LastModified"] + yield json.dumps({ + "type": "object", + "key": obj["Key"], + "size": obj["Size"], + "last_modified": last_mod.isoformat(), + "last_modified_display": format_datetime_display(last_mod, display_tz), + "last_modified_iso": format_datetime_iso(last_mod, display_tz), + "etag": obj.get("ETag", "").strip('"'), + }) + "\n" + except ClientError as exc: + error_msg = exc.response.get("Error", {}).get("Message", "S3 operation failed") + yield json.dumps({"type": "error", "error": error_msg}) + "\n" + return + except (EndpointConnectionError, ConnectionClosedError): + yield json.dumps({"type": "error", "error": "S3 API server is unreachable"}) + "\n" + return + + yield json.dumps({"type": "done"}) + "\n" diff --git a/app/ui.py b/app/ui.py index 06141e7..c174594 100644 --- a/app/ui.py +++ b/app/ui.py @@ -13,7 +13,7 @@ from zoneinfo import ZoneInfo import boto3 import requests -from botocore.exceptions import ClientError +from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError from flask import ( Blueprint, Response, @@ -36,7 +36,18 @@ from .extensions import limiter, csrf from .iam import IamError from .kms import KMSManager from .replication import ReplicationManager, ReplicationRule -from .s3_api import _generate_presigned_url +from .s3_client import ( + get_session_s3_client, + get_upload_registry, + handle_client_error, + handle_connection_error, + build_url_templates, + translate_list_objects, + get_versioning_via_s3, + stream_objects_ndjson, + format_datetime_display as _s3_format_display, + format_datetime_iso as _s3_format_iso, +) from .secret_store import EphemeralSecretStore from .site_registry import SiteRegistry, SiteInfo, PeerSite from .storage import ObjectStorage, StorageError @@ -337,19 +348,39 @@ def docs_page(): @ui_bp.get("/") def buckets_overview(): principal = _current_principal() - buckets = _storage().list_buckets() - allowed_names = 
set(_iam().buckets_for_principal(principal, [b.name for b in buckets])) + try: + client = get_session_s3_client() + resp = client.list_buckets() + bucket_names = [b["Name"] for b in resp.get("Buckets", [])] + bucket_creation = {b["Name"]: b.get("CreationDate") for b in resp.get("Buckets", [])} + except PermissionError: + return redirect(url_for("ui.login")) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + flash(exc.response.get("Error", {}).get("Message", "S3 operation failed"), "danger") + else: + flash("S3 API server is unreachable. Ensure the API server is running.", "danger") + return render_template("buckets.html", buckets=[], principal=principal) + + allowed_names = set(_iam().buckets_for_principal(principal, bucket_names)) visible_buckets = [] policy_store = _bucket_policies() - for bucket in buckets: - if bucket.name not in allowed_names: + for name in bucket_names: + if name not in allowed_names: continue - policy = policy_store.get_policy(bucket.name) + policy = policy_store.get_policy(name) cache_ttl = current_app.config.get("BUCKET_STATS_CACHE_TTL", 60) - stats = _storage().bucket_stats(bucket.name, cache_ttl=cache_ttl) + stats = _storage().bucket_stats(name, cache_ttl=cache_ttl) access_label, access_badge = _bucket_access_descriptor(policy) + + class _BucketMeta: + def __init__(self, n, cd): + self.name = n + self.creation_date = cd + meta = _BucketMeta(name, bucket_creation.get(name)) + visible_buckets.append({ - "meta": bucket, + "meta": meta, "summary": { "objects": stats["total_objects"], "total_bytes": stats["total_bytes"], @@ -358,7 +389,7 @@ def buckets_overview(): "access_label": access_label, "access_badge": access_badge, "has_policy": bool(policy), - "detail_url": url_for("ui.bucket_detail", bucket_name=bucket.name), + "detail_url": url_for("ui.bucket_detail", bucket_name=name), }) return render_template("buckets.html", buckets=visible_buckets, principal=principal) @@ -377,14 
+408,28 @@ def create_bucket(): return redirect(url_for("ui.buckets_overview")) try: _authorize_ui(principal, bucket_name, "write") - _storage().create_bucket(bucket_name) + client = get_session_s3_client() + client.create_bucket(Bucket=bucket_name) if _wants_json(): return jsonify({"success": True, "message": f"Bucket '{bucket_name}' created", "bucket_name": bucket_name}) flash(f"Bucket '{bucket_name}' created", "success") - except (StorageError, FileExistsError, IamError) as exc: + except PermissionError: + return redirect(url_for("ui.login")) + except IamError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + if _wants_json(): + return jsonify(err), status + flash(err["error"], "danger") + else: + msg = "S3 API server is unreachable" + if _wants_json(): + return jsonify({"error": msg}), 502 + flash(msg, "danger") return redirect(url_for("ui.buckets_overview")) @@ -506,89 +551,47 @@ def bucket_detail(bucket_name: str): @ui_bp.get("/buckets//objects") def list_bucket_objects(bucket_name: str): - """API endpoint for paginated object listing.""" principal = _current_principal() - storage = _storage() try: _authorize_ui(principal, bucket_name, "list") except IamError as exc: return jsonify({"error": str(exc)}), 403 try: - max_keys = min(int(request.args.get("max_keys", 1000)), 100000) + max_keys = max(1, min(int(request.args.get("max_keys", 1000)), 100000)) except ValueError: return jsonify({"error": "max_keys must be an integer"}), 400 continuation_token = request.args.get("continuation_token") or None prefix = request.args.get("prefix") or None try: - result = storage.list_objects( - bucket_name, - max_keys=max_keys, - continuation_token=continuation_token, - prefix=prefix, - ) - except StorageError as exc: - return 
jsonify({"error": str(exc)}), 400 + client = get_session_s3_client() + kwargs: dict[str, Any] = {"Bucket": bucket_name, "MaxKeys": max_keys} + if continuation_token: + kwargs["ContinuationToken"] = continuation_token + if prefix: + kwargs["Prefix"] = prefix + boto_resp = client.list_objects_v2(**kwargs) + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return handle_connection_error(exc) - try: - versioning_enabled = storage.is_versioning_enabled(bucket_name) - except StorageError: - versioning_enabled = False - - preview_template = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - delete_template = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER") - tags_template = url_for("ui.object_tags", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - copy_template = url_for("ui.copy_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - move_template = url_for("ui.move_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - metadata_template = url_for("ui.object_metadata", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - - objects_data = [] - for obj in result.objects: - objects_data.append({ - "key": obj.key, - "size": obj.size, - "last_modified": obj.last_modified.isoformat(), - "last_modified_display": _format_datetime_display(obj.last_modified), - "last_modified_iso": 
_format_datetime_iso(obj.last_modified), - "etag": obj.etag, - }) - - response = jsonify({ - "objects": objects_data, - "is_truncated": result.is_truncated, - "next_continuation_token": result.next_continuation_token, - "total_count": result.total_count, - "versioning_enabled": versioning_enabled, - "url_templates": { - "preview": preview_template, - "download": preview_template + "?download=1", - "presign": presign_template, - "delete": delete_template, - "versions": versions_template, - "restore": restore_template, - "tags": tags_template, - "copy": copy_template, - "move": move_template, - "metadata": metadata_template, - }, - }) + versioning_enabled = get_versioning_via_s3(client, bucket_name) + url_templates = build_url_templates(bucket_name) + display_tz = current_app.config.get("DISPLAY_TIMEZONE", "UTC") + data = translate_list_objects(boto_resp, url_templates, display_tz, versioning_enabled) + response = jsonify(data) response.headers["Cache-Control"] = "no-store" return response @ui_bp.get("/buckets//objects/stream") def stream_bucket_objects(bucket_name: str): - """Streaming NDJSON endpoint for progressive object listing. - - Streams objects as newline-delimited JSON for fast progressive rendering. - First line is metadata, subsequent lines are objects. 
- """ principal = _current_principal() - storage = _storage() try: _authorize_ui(principal, bucket_name, "list") except IamError as exc: @@ -597,79 +600,18 @@ def stream_bucket_objects(bucket_name: str): prefix = request.args.get("prefix") or None try: - versioning_enabled = storage.is_versioning_enabled(bucket_name) - except StorageError: - versioning_enabled = False + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + return jsonify({"error": str(exc)}), 403 - preview_template = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - delete_template = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER") - tags_template = url_for("ui.object_tags", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - copy_template = url_for("ui.copy_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - move_template = url_for("ui.move_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") - metadata_template = url_for("ui.object_metadata", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + versioning_enabled = get_versioning_via_s3(client, bucket_name) + url_templates = build_url_templates(bucket_name) display_tz = current_app.config.get("DISPLAY_TIMEZONE", "UTC") - def generate(): - meta_line = json.dumps({ - "type": "meta", - "versioning_enabled": versioning_enabled, - "url_templates": { - "preview": preview_template, - "download": preview_template + "?download=1", - "presign": presign_template, - "delete": delete_template, - "versions": versions_template, - "restore": restore_template, - "tags": 
tags_template, - "copy": copy_template, - "move": move_template, - "metadata": metadata_template, - }, - }) + "\n" - yield meta_line - - continuation_token = None - total_count = None - batch_size = 5000 - - while True: - try: - result = storage.list_objects( - bucket_name, - max_keys=batch_size, - continuation_token=continuation_token, - prefix=prefix, - ) - except StorageError as exc: - yield json.dumps({"type": "error", "error": str(exc)}) + "\n" - return - - if total_count is None: - total_count = result.total_count - yield json.dumps({"type": "count", "total_count": total_count}) + "\n" - - for obj in result.objects: - yield json.dumps({ - "type": "object", - "key": obj.key, - "size": obj.size, - "last_modified": obj.last_modified.isoformat(), - "last_modified_display": _format_datetime_display(obj.last_modified, display_tz), - "last_modified_iso": _format_datetime_iso(obj.last_modified, display_tz), - "etag": obj.etag, - }) + "\n" - - if not result.is_truncated: - break - continuation_token = result.next_continuation_token - - yield json.dumps({"type": "done"}) + "\n" - return Response( - generate(), + stream_objects_ndjson( + client, bucket_name, prefix, url_templates, display_tz, versioning_enabled, + ), mimetype='application/x-ndjson', headers={ 'Cache-Control': 'no-cache', @@ -714,15 +656,32 @@ def upload_object(bucket_name: str): try: _authorize_ui(principal, bucket_name, "write") - _storage().put_object(bucket_name, object_key, file.stream, metadata=metadata) + client = get_session_s3_client() + put_kwargs: dict[str, Any] = { + "Bucket": bucket_name, + "Key": object_key, + "Body": file.stream, + } + if file.content_type: + put_kwargs["ContentType"] = file.content_type + if metadata: + put_kwargs["Metadata"] = metadata + client.put_object(**put_kwargs) _replication().trigger_replication(bucket_name, object_key) - + message = f"Uploaded '{object_key}'" if metadata: message += " with metadata" return _response(True, message) - except (StorageError, 
IamError) as exc: + except PermissionError as exc: + return _response(False, str(exc), 401) + except IamError as exc: return _response(False, _friendly_error_message(exc), 400) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return _response(False, err["error"], status) + return _response(False, "S3 API server is unreachable", 502) @ui_bp.post("/buckets//multipart/initiate") @@ -736,6 +695,11 @@ def initiate_multipart_upload(bucket_name: str): object_key = str(payload.get("object_key", "")).strip() if not object_key: return jsonify({"error": "object_key is required"}), 400 + if "\x00" in object_key: + return jsonify({"error": "Object key cannot contain null bytes"}), 400 + max_key_len = current_app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024) + if len(object_key.encode("utf-8")) > max_key_len: + return jsonify({"error": f"Object key exceeds maximum length of {max_key_len} bytes"}), 400 metadata_payload = payload.get("metadata") metadata = None if metadata_payload is not None: @@ -743,10 +707,21 @@ def initiate_multipart_upload(bucket_name: str): return jsonify({"error": "metadata must be an object"}), 400 metadata = {str(k): str(v) for k, v in metadata_payload.items()} try: - upload_id = _storage().initiate_multipart_upload(bucket_name, object_key, metadata=metadata) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 - return jsonify({"upload_id": upload_id}) + client = get_session_s3_client() + create_kwargs: dict[str, Any] = {"Bucket": bucket_name, "Key": object_key} + if metadata: + create_kwargs["Metadata"] = metadata + resp = client.create_multipart_upload(**create_kwargs) + upload_id = resp["UploadId"] + get_upload_registry().register(upload_id, bucket_name, object_key) + return jsonify({"upload_id": upload_id}) + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, 
EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.put("/buckets//multipart//parts") @@ -762,17 +737,30 @@ def upload_multipart_part(bucket_name: str, upload_id: str): part_number = int(request.args.get("partNumber", "0")) except ValueError: return jsonify({"error": "partNumber must be an integer"}), 400 - if part_number < 1: - return jsonify({"error": "partNumber must be >= 1"}), 400 + if part_number < 1 or part_number > 10000: + return jsonify({"error": "partNumber must be between 1 and 10000"}), 400 + object_key = get_upload_registry().get_key(upload_id, bucket_name) + if not object_key: + return jsonify({"error": "Unknown upload ID or upload expired"}), 404 try: data = request.get_data() if not data: return jsonify({"error": "Empty request body"}), 400 - stream = io.BytesIO(data) - etag = _storage().upload_multipart_part(bucket_name, upload_id, part_number, stream) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 - return jsonify({"etag": etag, "part_number": part_number}) + client = get_session_s3_client() + resp = client.upload_part( + Bucket=bucket_name, + Key=object_key, + UploadId=upload_id, + PartNumber=part_number, + Body=data, + ) + etag = resp.get("ETag", "").strip('"') + return jsonify({"etag": etag, "part_number": part_number}) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.post("/buckets//multipart//complete") @@ -796,19 +784,37 @@ def complete_multipart_upload(bucket_name: str, upload_id: str): except (TypeError, ValueError): return jsonify({"error": "Each part must include part_number"}), 400 etag = str(part.get("etag") or part.get("ETag") or "").strip() - 
normalized.append({"part_number": number, "etag": etag}) + normalized.append({"PartNumber": number, "ETag": etag}) + object_key = get_upload_registry().get_key(upload_id, bucket_name) + if not object_key: + return jsonify({"error": "Unknown upload ID or upload expired"}), 404 try: - result = _storage().complete_multipart_upload(bucket_name, upload_id, normalized) - _replication().trigger_replication(bucket_name, result.key) - + client = get_session_s3_client() + resp = client.complete_multipart_upload( + Bucket=bucket_name, + Key=object_key, + UploadId=upload_id, + MultipartUpload={"Parts": normalized}, + ) + get_upload_registry().remove(upload_id) + result_key = resp.get("Key", object_key) + _replication().trigger_replication(bucket_name, result_key) return jsonify({ - "key": result.key, - "size": result.size, - "etag": result.etag, - "last_modified": result.last_modified.isoformat() if result.last_modified else None, + "key": result_key, + "size": 0, + "etag": resp.get("ETag", "").strip('"'), + "last_modified": None, }) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + code = exc.response.get("Error", {}).get("Code", "") + if code in ("NoSuchUpload",): + get_upload_registry().remove(upload_id) + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.delete("/buckets//multipart/") @@ -818,10 +824,21 @@ def abort_multipart_upload(bucket_name: str, upload_id: str): _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 + object_key = get_upload_registry().get_key(upload_id, bucket_name) + if not object_key: + return jsonify({"error": "Unknown upload ID or upload expired"}), 404 try: - _storage().abort_multipart_upload(bucket_name, 
upload_id) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + client = get_session_s3_client() + client.abort_multipart_upload(Bucket=bucket_name, Key=object_key, UploadId=upload_id) + get_upload_registry().remove(upload_id) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + code = exc.response.get("Error", {}).get("Code", "") + if code in ("NoSuchUpload",): + get_upload_registry().remove(upload_id) + err, status = handle_client_error(exc) + return jsonify(err), status + return handle_connection_error(exc) return jsonify({"status": "aborted"}) @@ -831,16 +848,36 @@ def delete_bucket(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "delete") - _storage().delete_bucket(bucket_name) - _bucket_policies().delete_policy(bucket_name) - _replication_manager().delete_rule(bucket_name) + client = get_session_s3_client() + client.delete_bucket(Bucket=bucket_name) + try: + _bucket_policies().delete_policy(bucket_name) + except Exception: + pass + try: + _replication_manager().delete_rule(bucket_name) + except Exception: + pass if _wants_json(): return jsonify({"success": True, "message": f"Bucket '{bucket_name}' removed"}) flash(f"Bucket '{bucket_name}' removed", "success") - except (StorageError, IamError) as exc: + except PermissionError: + return redirect(url_for("ui.login")) + except IamError as exc: + if _wants_json(): + return jsonify({"error": _friendly_error_message(exc)}), 400 + flash(_friendly_error_message(exc), "danger") + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + if _wants_json(): + return jsonify(err), status + flash(err["error"], "danger") + else: + msg = "S3 API server is unreachable" + if _wants_json(): + return jsonify({"error": msg}), 502 + flash(msg, "danger") return redirect(url_for("ui.buckets_overview")) @@ 
-855,16 +892,31 @@ def delete_object(bucket_name: str, object_key: str): _storage().purge_object(bucket_name, object_key) message = f"Permanently deleted '{object_key}' and all versions" else: - _storage().delete_object(bucket_name, object_key) + client = get_session_s3_client() + client.delete_object(Bucket=bucket_name, Key=object_key) _replication_manager().trigger_replication(bucket_name, object_key, action="delete") message = f"Deleted '{object_key}'" if _wants_json(): return jsonify({"success": True, "message": message}) flash(message, "success") - except (IamError, StorageError) as exc: + except PermissionError: + return redirect(url_for("ui.login")) + except IamError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") + except StorageError as exc: + if _wants_json(): + return jsonify({"error": _friendly_error_message(exc)}), 400 + flash(_friendly_error_message(exc), "danger") + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) + if _wants_json(): + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) @@ -908,24 +960,50 @@ def bulk_delete_objects(bucket_name: str): return _respond(False, f"A maximum of {MAX_KEYS} objects can be deleted per request", status_code=400) unique_keys = list(dict.fromkeys(cleaned)) - storage = _storage() try: _authorize_ui(principal, bucket_name, "delete") except IamError as exc: return _respond(False, _friendly_error_message(exc), status_code=403) - deleted: list[str] = [] - errors: list[dict[str, str]] = [] + authorized_keys = [] + denied_keys = [] for key in unique_keys: try: - if purge_versions: + _authorize_ui(principal, bucket_name, "delete", object_key=key) + authorized_keys.append(key) + except IamError: + 
denied_keys.append(key) + if not authorized_keys: + return _respond(False, "Access denied for all selected objects", status_code=403) + unique_keys = authorized_keys + + if purge_versions: + storage = _storage() + deleted: list[str] = [] + errors: list[dict[str, str]] = [] + for key in unique_keys: + try: storage.purge_object(bucket_name, key) - else: - storage.delete_object(bucket_name, key) + deleted.append(key) + except StorageError as exc: + errors.append({"key": key, "error": str(exc)}) + else: + try: + client = get_session_s3_client() + objects_to_delete = [{"Key": k} for k in unique_keys] + resp = client.delete_objects( + Bucket=bucket_name, + Delete={"Objects": objects_to_delete, "Quiet": False}, + ) + deleted = [d["Key"] for d in resp.get("Deleted", [])] + errors = [{"key": e["Key"], "error": e.get("Message", e.get("Code", "Unknown error"))} for e in resp.get("Errors", [])] + for key in deleted: _replication_manager().trigger_replication(bucket_name, key, action="delete") - deleted.append(key) - except StorageError as exc: - errors.append({"key": key, "error": str(exc)}) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return _respond(False, err["error"], status_code=status) + return _respond(False, "S3 API server is unreachable", status_code=502) if not deleted and errors: return _respond(False, "Unable to delete the selected objects", deleted=deleted, errors=errors, status_code=400) @@ -1038,35 +1116,81 @@ def purge_object_versions(bucket_name: str, object_key: str): @ui_bp.get("/buckets//objects//preview") def object_preview(bucket_name: str, object_key: str) -> Response: + import mimetypes as _mimetypes principal = _current_principal() - storage = _storage() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) - path = storage.get_object_path(bucket_name, object_key) - metadata = storage.get_object_metadata(bucket_name, 
object_key) - except (StorageError, IamError) as exc: - status = 403 if isinstance(exc, IamError) else 404 - return Response(str(exc), status=status) - + except IamError as exc: + return Response(str(exc), status=403) + download = request.args.get("download") == "1" - - is_encrypted = "x-amz-server-side-encryption" in metadata - if is_encrypted and hasattr(storage, 'get_object_data'): + raw_filename = object_key.rsplit("/", 1)[-1] or object_key + safe_filename = raw_filename.replace('"', "'").replace("\\", "_") + safe_filename = "".join(c for c in safe_filename if c.isprintable() and c not in "\r\n") + if not safe_filename: + safe_filename = "download" + try: + safe_filename.encode("latin-1") + ascii_safe = True + except UnicodeEncodeError: + ascii_safe = False + + range_header = request.headers.get("Range") + + try: + client = get_session_s3_client() + get_kwargs: dict[str, Any] = {"Bucket": bucket_name, "Key": object_key} + if range_header: + get_kwargs["Range"] = range_header + resp = client.get_object(**get_kwargs) + except PermissionError as exc: + return Response(str(exc), status=401) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + status = 404 if code == "NoSuchKey" else 400 + return Response(exc.response.get("Error", {}).get("Message", "S3 operation failed"), status=status) + except (EndpointConnectionError, ConnectionClosedError): + return Response("S3 API server is unreachable", status=502) + + content_type = resp.get("ContentType") or _mimetypes.guess_type(object_key)[0] or "application/octet-stream" + content_length = resp.get("ContentLength", 0) + body_stream = resp["Body"] + is_partial = resp.get("ResponseMetadata", {}).get("HTTPStatusCode") == 206 + content_range = resp.get("ContentRange") + + _DANGEROUS_TYPES = { + "text/html", "text/xml", "application/xhtml+xml", + "application/xml", "image/svg+xml", + } + force_download = content_type.split(";")[0].strip().lower() in _DANGEROUS_TYPES + + def generate(): try: - 
data, _ = storage.get_object_data(bucket_name, object_key) - import io - import mimetypes - mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream" - return send_file( - io.BytesIO(data), - mimetype=mimetype, - as_attachment=download, - download_name=path.name - ) - except StorageError as exc: - return Response(f"Decryption failed: {exc}", status=500) - - return send_file(path, as_attachment=download, download_name=path.name) + for chunk in body_stream.iter_chunks(chunk_size=65536): + yield chunk + finally: + body_stream.close() + + status_code = 206 if is_partial else 200 + headers = { + "Content-Type": content_type, + "X-Content-Type-Options": "nosniff", + "Accept-Ranges": "bytes", + } + if content_length: + headers["Content-Length"] = str(content_length) + if content_range: + headers["Content-Range"] = content_range + disposition = "attachment" if download or force_download else "inline" + if ascii_safe: + headers["Content-Disposition"] = f'{disposition}; filename="{safe_filename}"' + else: + from urllib.parse import quote + encoded = quote(safe_filename, safe="") + ascii_fallback = safe_filename.encode("ascii", "replace").decode("ascii").replace("?", "_") + headers["Content-Disposition"] = f'{disposition}; filename="{ascii_fallback}"; filename*=UTF-8\'\'{encoded}' + + return Response(generate(), status=status_code, headers=headers) @ui_bp.post("/buckets//objects//presign") @@ -1089,25 +1213,24 @@ def object_presign(bucket_name: str, object_key: str): min_expiry = current_app.config.get("PRESIGNED_URL_MIN_EXPIRY_SECONDS", 1) max_expiry = current_app.config.get("PRESIGNED_URL_MAX_EXPIRY_SECONDS", 604800) expires = max(min_expiry, min(expires, max_expiry)) - storage = _storage() - if not storage.bucket_exists(bucket_name): - return jsonify({"error": "Bucket does not exist"}), 404 - if action != "write": - try: - storage.get_object_path(bucket_name, object_key) - except StorageError: - return jsonify({"error": "Object not found"}), 404 - secret 
= _iam().secret_for_key(principal.access_key) - api_base = current_app.config.get("API_BASE_URL") or "http://127.0.0.1:5000" - url = _generate_presigned_url( - principal=principal, - secret_key=secret, - method=method, - bucket_name=bucket_name, - object_key=object_key, - expires_in=expires, - api_base_url=api_base, - ) + + method_to_client_method = {"GET": "get_object", "PUT": "put_object", "DELETE": "delete_object"} + client_method = method_to_client_method[method] + + try: + client = get_session_s3_client() + url = client.generate_presigned_url( + ClientMethod=client_method, + Params={"Bucket": bucket_name, "Key": object_key}, + ExpiresIn=expires, + ) + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) current_app.logger.info( "Presigned URL generated", extra={"bucket": bucket_name, "key": object_key, "method": method}, @@ -1118,15 +1241,31 @@ def object_presign(bucket_name: str, object_key: str): @ui_bp.get("/buckets//objects//metadata") def object_metadata(bucket_name: str, object_key: str): principal = _current_principal() - storage = _storage() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) - metadata = storage.get_object_metadata(bucket_name, object_key) - return jsonify({"metadata": metadata}) except IamError as exc: return jsonify({"error": str(exc)}), 403 - except StorageError as exc: - return jsonify({"error": str(exc)}), 404 + try: + client = get_session_s3_client() + resp = client.head_object(Bucket=bucket_name, Key=object_key) + metadata = resp.get("Metadata", {}) + if resp.get("ContentType"): + metadata["Content-Type"] = resp["ContentType"] + if resp.get("ContentLength") is not None: + metadata["Content-Length"] = str(resp["ContentLength"]) + if 
resp.get("ServerSideEncryption"): + metadata["x-amz-server-side-encryption"] = resp["ServerSideEncryption"] + return jsonify({"metadata": metadata}) + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("NoSuchKey", "404", "NotFound"): + return jsonify({"error": "Object not found"}), 404 + err, status = handle_client_error(exc) + return jsonify(err), status + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) @ui_bp.get("/buckets//objects//versions") @@ -1137,10 +1276,25 @@ def object_versions(bucket_name: str, object_key: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 try: - versions = _storage().list_object_versions(bucket_name, object_key) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 - return jsonify({"versions": versions}) + client = get_session_s3_client() + resp = client.list_object_versions(Bucket=bucket_name, Prefix=object_key, MaxKeys=1000) + versions = [] + for v in resp.get("Versions", []): + if v.get("Key") != object_key: + continue + versions.append({ + "version_id": v.get("VersionId", ""), + "last_modified": v["LastModified"].isoformat() if v.get("LastModified") else None, + "size": v.get("Size", 0), + "etag": v.get("ETag", "").strip('"'), + "is_latest": v.get("IsLatest", False), + }) + return jsonify({"versions": versions}) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.get("/buckets//archived") @@ -1206,13 +1360,32 @@ def update_bucket_policy(bucket_name: str): return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) - store = _bucket_policies() + + 
try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + if _wants_json(): + return jsonify({"error": str(exc)}), 403 + flash(str(exc), "danger") + return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) + if action == "delete": - store.delete_policy(bucket_name) + try: + client.delete_bucket_policy(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) + if _wants_json(): + return jsonify(err), status + flash(err["error"], "danger") + return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) if _wants_json(): return jsonify({"success": True, "message": "Bucket policy removed"}) flash("Bucket policy removed", "info") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) + document = request.form.get("policy_document", "").strip() if not document: if _wants_json(): @@ -1220,15 +1393,25 @@ def update_bucket_policy(bucket_name: str): flash("Provide a JSON policy document", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) try: - payload = json.loads(document) - store.set_policy(bucket_name, payload) - if _wants_json(): - return jsonify({"success": True, "message": "Bucket policy saved"}) - flash("Bucket policy saved", "success") - except (json.JSONDecodeError, ValueError) as exc: + json.loads(document) + except json.JSONDecodeError as exc: if _wants_json(): return jsonify({"error": f"Policy error: {exc}"}), 400 flash(f"Policy error: {exc}", "danger") + return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) + try: + client.put_bucket_policy(Bucket=bucket_name, Policy=document) + if _wants_json(): + return jsonify({"success": True, "message": "Bucket policy saved"}) + flash("Bucket policy saved", "success") 
+ except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) + if _wants_json(): + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) @@ -1243,13 +1426,26 @@ def update_bucket_versioning(bucket_name: str): flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) state = request.form.get("state", "enable") + if state not in ("enable", "suspend"): + if _wants_json(): + return jsonify({"error": "state must be 'enable' or 'suspend'"}), 400 + flash("Invalid versioning state", "danger") + return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) enable = state == "enable" try: - _storage().set_bucket_versioning(bucket_name, enable) - except StorageError as exc: + client = get_session_s3_client() + client.put_bucket_versioning( + Bucket=bucket_name, + VersioningConfiguration={"Status": "Enabled" if enable else "Suspended"}, + ) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) if _wants_json(): - return jsonify({"error": _friendly_error_message(exc)}), 400 - flash(_friendly_error_message(exc), "danger") + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) message = "Versioning enabled" if enable else "Versioning suspended" if _wants_json(): @@ -1344,7 +1540,6 @@ def update_bucket_quota(bucket_name: str): @ui_bp.post("/buckets//encryption") def update_bucket_encryption(bucket_name: str): - """Update bucket default encryption configuration.""" principal = _current_principal() 
try: _authorize_ui(principal, bucket_name, "write") @@ -1358,14 +1553,19 @@ def update_bucket_encryption(bucket_name: str): if action == "disable": try: - _storage().set_bucket_encryption(bucket_name, None) + client = get_session_s3_client() + client.delete_bucket_encryption(Bucket=bucket_name) if _wants_json(): return jsonify({"success": True, "message": "Default encryption disabled", "enabled": False}) flash("Default encryption disabled", "info") - except StorageError as exc: + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) if _wants_json(): - return jsonify({"error": _friendly_error_message(exc)}), 400 - flash(_friendly_error_message(exc), "danger") + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) algorithm = request.form.get("algorithm", "AES256") @@ -1377,21 +1577,18 @@ def update_bucket_encryption(bucket_name: str): flash("Invalid encryption algorithm", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) - encryption_config: dict[str, Any] = { - "Rules": [ - { - "ApplyServerSideEncryptionByDefault": { - "SSEAlgorithm": algorithm, - } - } - ] - } - + sse_rule: dict[str, Any] = {"SSEAlgorithm": algorithm} if algorithm == "aws:kms" and kms_key_id: - encryption_config["Rules"][0]["ApplyServerSideEncryptionByDefault"]["KMSMasterKeyID"] = kms_key_id + sse_rule["KMSMasterKeyID"] = kms_key_id try: - _storage().set_bucket_encryption(bucket_name, encryption_config) + client = get_session_s3_client() + client.put_bucket_encryption( + Bucket=bucket_name, + ServerSideEncryptionConfiguration={ + "Rules": [{"ApplyServerSideEncryptionByDefault": sse_rule}] + }, + ) if algorithm == "aws:kms": message = "Default KMS encryption enabled" else: @@ -1399,10 +1596,14 @@ def 
update_bucket_encryption(bucket_name: str): if _wants_json(): return jsonify({"success": True, "message": message, "enabled": True, "algorithm": algorithm}) flash(message, "success") - except StorageError as exc: + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + else: + err, status = handle_connection_error(exc) if _wants_json(): - return jsonify({"error": _friendly_error_message(exc)}), 400 - flash(_friendly_error_message(exc), "danger") + return jsonify(err), status + flash(err["error"], "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) @@ -2314,16 +2515,34 @@ def bucket_lifecycle(bucket_name: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() - if not storage.bucket_exists(bucket_name): - return jsonify({"error": "Bucket does not exist"}), 404 + try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + return jsonify({"error": str(exc)}), 403 if request.method == "GET": - rules = storage.get_bucket_lifecycle(bucket_name) or [] + try: + resp = client.get_bucket_lifecycle_configuration(Bucket=bucket_name) + rules = resp.get("Rules", []) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code == "NoSuchLifecycleConfiguration": + rules = [] + else: + err, status = handle_client_error(exc) + return jsonify(err), status + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) return jsonify({"rules": rules}) if request.method == "DELETE": - storage.set_bucket_lifecycle(bucket_name, None) + try: + client.delete_bucket_lifecycle(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return 
jsonify(*handle_connection_error(exc)) return jsonify({"status": "ok", "message": "Lifecycle configuration deleted"}) payload = request.get_json(silent=True) or {} @@ -2339,8 +2558,11 @@ def bucket_lifecycle(bucket_name: str): "ID": str(rule.get("ID", f"rule-{i+1}")), "Status": "Enabled" if rule.get("Status", "Enabled") == "Enabled" else "Disabled", } + filt = {} if rule.get("Prefix"): - validated["Prefix"] = str(rule["Prefix"]) + filt["Prefix"] = str(rule["Prefix"]) + if filt: + validated["Filter"] = filt if rule.get("Expiration"): exp = rule["Expiration"] if isinstance(exp, dict) and exp.get("Days"): @@ -2355,7 +2577,19 @@ def bucket_lifecycle(bucket_name: str): validated["AbortIncompleteMultipartUpload"] = {"DaysAfterInitiation": int(aimu["DaysAfterInitiation"])} validated_rules.append(validated) - storage.set_bucket_lifecycle(bucket_name, validated_rules if validated_rules else None) + try: + if validated_rules: + client.put_bucket_lifecycle_configuration( + Bucket=bucket_name, + LifecycleConfiguration={"Rules": validated_rules}, + ) + else: + client.delete_bucket_lifecycle(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) return jsonify({"status": "ok", "message": "Lifecycle configuration saved", "rules": validated_rules}) @@ -2398,16 +2632,34 @@ def bucket_cors(bucket_name: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() - if not storage.bucket_exists(bucket_name): - return jsonify({"error": "Bucket does not exist"}), 404 + try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + return jsonify({"error": str(exc)}), 403 if request.method == "GET": - rules = storage.get_bucket_cors(bucket_name) or [] + try: + resp = client.get_bucket_cors(Bucket=bucket_name) + rules = 
resp.get("CORSRules", []) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code == "NoSuchCORSConfiguration": + rules = [] + else: + err, status = handle_client_error(exc) + return jsonify(err), status + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) return jsonify({"rules": rules}) if request.method == "DELETE": - storage.set_bucket_cors(bucket_name, None) + try: + client.delete_bucket_cors(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) return jsonify({"status": "ok", "message": "CORS configuration deleted"}) payload = request.get_json(silent=True) or {} @@ -2438,7 +2690,19 @@ def bucket_cors(bucket_name: str): pass validated_rules.append(validated) - storage.set_bucket_cors(bucket_name, validated_rules if validated_rules else None) + try: + if validated_rules: + client.put_bucket_cors( + Bucket=bucket_name, + CORSConfiguration={"CORSRules": validated_rules}, + ) + else: + client.delete_bucket_cors(Bucket=bucket_name) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) return jsonify({"status": "ok", "message": "CORS configuration saved", "rules": validated_rules}) @@ -2451,33 +2715,57 @@ def bucket_acl(bucket_name: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() - if not storage.bucket_exists(bucket_name): - return jsonify({"error": "Bucket does not exist"}), 404 + try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + return jsonify({"error": str(exc)}), 403 - acl_service = _acl() owner_id = 
principal.access_key if principal else "anonymous" if request.method == "GET": try: - acl = acl_service.get_bucket_acl(bucket_name) - if not acl: - acl = create_canned_acl("private", owner_id) + resp = client.get_bucket_acl(Bucket=bucket_name) + owner = resp.get("Owner", {}).get("ID", owner_id) + grants = [] + for grant in resp.get("Grants", []): + grantee = grant.get("Grantee", {}) + grantee_display = grantee.get("DisplayName") or grantee.get("ID", "") + if not grantee_display: + uri = grantee.get("URI", "") + if "AllUsers" in uri: + grantee_display = "Everyone (public)" + elif "AuthenticatedUsers" in uri: + grantee_display = "Authenticated users" + else: + grantee_display = uri or "unknown" + grants.append({ + "grantee": grantee_display, + "permission": grant.get("Permission", ""), + }) return jsonify({ - "owner": acl.owner, - "grants": [g.to_dict() for g in acl.grants], + "owner": owner, + "grants": grants, "canned_acls": list(CANNED_ACLS.keys()), }) - except Exception as exc: - return jsonify({"error": str(exc)}), 500 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) payload = request.get_json(silent=True) or {} canned_acl = payload.get("canned_acl") if canned_acl: if canned_acl not in CANNED_ACLS: return jsonify({"error": f"Invalid canned ACL: {canned_acl}"}), 400 - acl_service.set_bucket_canned_acl(bucket_name, canned_acl, owner_id) - return jsonify({"status": "ok", "message": f"ACL set to {canned_acl}"}) + try: + client.put_bucket_acl(Bucket=bucket_name, ACL=canned_acl) + return jsonify({"status": "ok", "message": f"ACL set to {canned_acl}"}) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) return 
jsonify({"error": "canned_acl is required"}), 400 @@ -2490,14 +2778,24 @@ def object_tags(bucket_name: str, object_key: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() + try: + client = get_session_s3_client() + except (PermissionError, RuntimeError) as exc: + return jsonify({"error": str(exc)}), 403 if request.method == "GET": try: - tags = storage.get_object_tags(bucket_name, object_key) + resp = client.get_object_tagging(Bucket=bucket_name, Key=object_key) + tags = resp.get("TagSet", []) return jsonify({"tags": tags}) - except StorageError as exc: - return jsonify({"error": str(exc)}), 404 + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code == "NoSuchKey": + return jsonify({"error": "Object not found"}), 404 + err, status = handle_client_error(exc) + return jsonify(err), status + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) try: _authorize_ui(principal, bucket_name, "write", object_key=object_key) @@ -2508,22 +2806,34 @@ def object_tags(bucket_name: str, object_key: str): tags = payload.get("tags", []) if not isinstance(tags, list): return jsonify({"error": "tags must be a list"}), 400 - if len(tags) > 10: - return jsonify({"error": "Maximum 10 tags allowed"}), 400 + tag_limit = current_app.config.get("OBJECT_TAG_LIMIT", 50) + if len(tags) > tag_limit: + return jsonify({"error": f"Maximum {tag_limit} tags allowed"}), 400 validated_tags = [] - for tag in tags: - if isinstance(tag, dict) and tag.get("Key"): - validated_tags.append({ - "Key": str(tag["Key"]), - "Value": str(tag.get("Value", "")) - }) + for i, tag in enumerate(tags): + if not isinstance(tag, dict) or not tag.get("Key"): + return jsonify({"error": f"Tag at index {i} must have a Key field"}), 400 + validated_tags.append({ + "Key": str(tag["Key"]), + "Value": str(tag.get("Value", "")) + }) try: - storage.set_object_tags(bucket_name, object_key, 
validated_tags if validated_tags else None) + if validated_tags: + client.put_object_tagging( + Bucket=bucket_name, + Key=object_key, + Tagging={"TagSet": validated_tags}, + ) + else: + client.delete_object_tagging(Bucket=bucket_name, Key=object_key) return jsonify({"status": "ok", "message": "Tags saved", "tags": validated_tags}) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.post("/buckets//folders") @@ -2544,15 +2854,28 @@ def create_folder(bucket_name: str): folder_name = folder_name.rstrip("/") if "/" in folder_name: return jsonify({"error": "Folder name cannot contain /"}), 400 + if "\x00" in folder_name or "\x00" in prefix: + return jsonify({"error": "Null bytes not allowed"}), 400 + if ".." in prefix.split("/"): + return jsonify({"error": "Invalid prefix"}), 400 folder_key = f"{prefix}{folder_name}/" if prefix else f"{folder_name}/" - import io + max_key_len = current_app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024) + if len(folder_key.encode("utf-8")) > max_key_len: + return jsonify({"error": f"Key exceeds maximum length of {max_key_len} bytes"}), 400 + try: - _storage().put_object(bucket_name, folder_key, io.BytesIO(b"")) + client = get_session_s3_client() + client.put_object(Bucket=bucket_name, Key=folder_key, Body=b"") return jsonify({"status": "ok", "message": f"Folder '{folder_name}' created", "key": folder_key}) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) 
@ui_bp.post("/buckets//objects//copy") @@ -2569,31 +2892,37 @@ def copy_object(bucket_name: str, object_key: str): if not dest_key: return jsonify({"error": "dest_key is required"}), 400 + if "\x00" in dest_key: + return jsonify({"error": "Destination key cannot contain null bytes"}), 400 + max_key_len = current_app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024) + if len(dest_key.encode("utf-8")) > max_key_len: + return jsonify({"error": f"Destination key exceeds maximum length of {max_key_len} bytes"}), 400 try: _authorize_ui(principal, dest_bucket, "write", object_key=dest_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() - try: - source_path = storage.get_object_path(bucket_name, object_key) - source_metadata = storage.get_object_metadata(bucket_name, object_key) - except StorageError as exc: - return jsonify({"error": str(exc)}), 404 - - try: - with source_path.open("rb") as stream: - storage.put_object(dest_bucket, dest_key, stream, metadata=source_metadata or None) + client = get_session_s3_client() + client.copy_object( + Bucket=dest_bucket, + Key=dest_key, + CopySource={"Bucket": bucket_name, "Key": object_key}, + ) return jsonify({ "status": "ok", "message": f"Copied to {dest_bucket}/{dest_key}", "dest_bucket": dest_bucket, "dest_key": dest_key, }) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) @ui_bp.post("/buckets//objects//move") @@ -2611,6 +2940,11 @@ def move_object(bucket_name: str, object_key: str): if not dest_key: return jsonify({"error": "dest_key is required"}), 400 + if "\x00" in dest_key: + return jsonify({"error": "Destination key cannot contain null bytes"}), 400 + 
max_key_len = current_app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024) + if len(dest_key.encode("utf-8")) > max_key_len: + return jsonify({"error": f"Destination key exceeds maximum length of {max_key_len} bytes"}), 400 if dest_bucket == bucket_name and dest_key == object_key: return jsonify({"error": "Cannot move object to the same location"}), 400 @@ -2620,39 +2954,56 @@ def move_object(bucket_name: str, object_key: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - storage = _storage() + try: + client = get_session_s3_client() + client.copy_object( + Bucket=dest_bucket, + Key=dest_key, + CopySource={"Bucket": bucket_name, "Key": object_key}, + ) + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return jsonify(err), status + return jsonify(*handle_connection_error(exc)) try: - source_path = storage.get_object_path(bucket_name, object_key) - source_metadata = storage.get_object_metadata(bucket_name, object_key) - except StorageError as exc: - return jsonify({"error": str(exc)}), 404 - - try: - import io - with source_path.open("rb") as f: - data = f.read() - storage.put_object(dest_bucket, dest_key, io.BytesIO(data), metadata=source_metadata or None) - storage.delete_object(bucket_name, object_key) + client.delete_object(Bucket=bucket_name, Key=object_key) + except (ClientError, EndpointConnectionError, ConnectionClosedError): return jsonify({ - "status": "ok", - "message": f"Moved to {dest_bucket}/{dest_key}", + "status": "partial", + "message": f"Copied to {dest_bucket}/{dest_key} but failed to delete source", "dest_bucket": dest_bucket, "dest_key": dest_key, - }) - except StorageError as exc: - return jsonify({"error": str(exc)}), 400 + }), 200 + + return jsonify({ + "status": "ok", + "message": f"Moved to {dest_bucket}/{dest_key}", + "dest_bucket": 
dest_bucket, + "dest_key": dest_key, + }) @ui_bp.get("/buckets//list-for-copy") def list_buckets_for_copy(bucket_name: str): principal = _current_principal() - buckets = _storage().list_buckets() + try: + client = get_session_s3_client() + resp = client.list_buckets() + except PermissionError as exc: + return jsonify({"error": str(exc)}), 401 + except ClientError as exc: + return jsonify(*handle_client_error(exc)) + except (EndpointConnectionError, ConnectionClosedError) as exc: + return jsonify(*handle_connection_error(exc)) allowed = [] - for bucket in buckets: + for b in resp.get("Buckets", []): try: - _authorize_ui(principal, bucket.name, "write") - allowed.append(bucket.name) + _authorize_ui(principal, b["Name"], "write") + allowed.append(b["Name"]) except IamError: pass return jsonify({"buckets": allowed}) diff --git a/app/version.py b/app/version.py index f5bfa82..91607b1 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.2.7" +APP_VERSION = "0.2.8" def get_version() -> str: diff --git a/docs.md b/docs.md index d4917d0..4439a92 100644 --- a/docs.md +++ b/docs.md @@ -7,7 +7,7 @@ This document expands on the README to describe the full workflow for running, c MyFSIO ships two Flask entrypoints that share the same storage, IAM, and bucket-policy state: - **API server** – Implements the S3-compatible REST API, policy evaluation, and Signature Version 4 presign service. -- **UI server** – Provides the browser console for buckets, IAM, and policies. It proxies to the API for presign operations. +- **UI server** – Provides the browser console for buckets, IAM, and policies. It proxies all storage operations through the S3 API via boto3 (SigV4-signed), mirroring the architecture used by MinIO and Garage. Both servers read `AppConfig`, so editing JSON stores on disk instantly affects both surfaces. @@ -136,7 +136,7 @@ All configuration is done via environment variables. 
The table below lists every | `MAX_UPLOAD_SIZE` | `1073741824` (1 GiB) | Bytes. Caps incoming uploads in both API + UI. | | `UI_PAGE_SIZE` | `100` | `MaxKeys` hint shown in listings. | | `SECRET_KEY` | Auto-generated | Flask session key. Auto-generates and persists if not set. **Set explicitly in production.** | -| `API_BASE_URL` | `None` | Public URL for presigned URLs. Required behind proxies. | +| `API_BASE_URL` | `http://127.0.0.1:5000` | Internal S3 API URL used by the web UI proxy. Also used for presigned URL generation. Set to your public URL if running behind a reverse proxy. | | `AWS_REGION` | `us-east-1` | Region embedded in SigV4 credential scope. | | `AWS_SERVICE` | `s3` | Service string for SigV4. | diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index 0864acc..783462a 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -657,6 +657,7 @@ streamingComplete = true; flushPendingStreamObjects(); hasMoreObjects = false; + totalObjectCount = loadedObjectCount; updateObjectCountBadge(); if (objectsLoadingRow && objectsLoadingRow.parentNode) { diff --git a/templates/buckets.html b/templates/buckets.html index bf185c3..13ea928 100644 --- a/templates/buckets.html +++ b/templates/buckets.html @@ -141,7 +141,7 @@ let visibleCount = 0; bucketItems.forEach(item => { - const name = item.querySelector('.card-title').textContent.toLowerCase(); + const name = item.querySelector('.bucket-name').textContent.toLowerCase(); if (name.includes(term)) { item.classList.remove('d-none'); visibleCount++; diff --git a/templates/docs.html b/templates/docs.html index 66b9baf..8e52e53 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -97,8 +97,8 @@ python run.py --mode ui API_BASE_URL - None - The public URL of the API. Required if running behind a proxy. Ensures presigned URLs are generated correctly. + http://127.0.0.1:5000 + Internal S3 API URL used by the web UI proxy. 
Also used for presigned URL generation. Set to your public URL if running behind a reverse proxy. STORAGE_ROOT diff --git a/tests/test_ui_bulk_delete.py b/tests/test_ui_bulk_delete.py index 015c5f2..473af22 100644 --- a/tests/test_ui_bulk_delete.py +++ b/tests/test_ui_bulk_delete.py @@ -1,8 +1,12 @@ import io import json +import threading from pathlib import Path +from werkzeug.serving import make_server + from app import create_app +from app.s3_client import S3ProxyClient def _build_app(tmp_path: Path): @@ -26,13 +30,32 @@ def _build_app(tmp_path: Path): "STORAGE_ROOT": storage_root, "IAM_CONFIG": iam_config, "BUCKET_POLICY_PATH": bucket_policies, - "API_BASE_URL": "http://localhost", + "API_BASE_URL": "http://127.0.0.1:0", "SECRET_KEY": "testing", + "WTF_CSRF_ENABLED": False, } ) + + server = make_server("127.0.0.1", 0, app) + host, port = server.server_address + api_url = f"http://{host}:{port}" + app.config["API_BASE_URL"] = api_url + app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url) + + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + + app._test_server = server + app._test_thread = thread return app +def _shutdown_app(app): + if hasattr(app, "_test_server"): + app._test_server.shutdown() + app._test_thread.join(timeout=2) + + def _login(client): return client.post( "/ui/login", @@ -43,54 +66,60 @@ def _login(client): def test_bulk_delete_json_route(tmp_path: Path): app = _build_app(tmp_path) - storage = app.extensions["object_storage"] - storage.create_bucket("demo") - storage.put_object("demo", "first.txt", io.BytesIO(b"first")) - storage.put_object("demo", "second.txt", io.BytesIO(b"second")) + try: + storage = app.extensions["object_storage"] + storage.create_bucket("demo") + storage.put_object("demo", "first.txt", io.BytesIO(b"first")) + storage.put_object("demo", "second.txt", io.BytesIO(b"second")) - client = app.test_client() - assert _login(client).status_code == 200 + client = app.test_client() + 
assert _login(client).status_code == 200 - response = client.post( - "/ui/buckets/demo/objects/bulk-delete", - json={"keys": ["first.txt", "missing.txt"]}, - headers={"X-Requested-With": "XMLHttpRequest"}, - ) - assert response.status_code == 200 - payload = response.get_json() - assert payload["status"] == "ok" - assert set(payload["deleted"]) == {"first.txt", "missing.txt"} - assert payload["errors"] == [] + response = client.post( + "/ui/buckets/demo/objects/bulk-delete", + json={"keys": ["first.txt", "missing.txt"]}, + headers={"X-Requested-With": "XMLHttpRequest"}, + ) + assert response.status_code == 200 + payload = response.get_json() + assert payload["status"] == "ok" + assert set(payload["deleted"]) == {"first.txt", "missing.txt"} + assert payload["errors"] == [] - listing = storage.list_objects_all("demo") - assert {meta.key for meta in listing} == {"second.txt"} + listing = storage.list_objects_all("demo") + assert {meta.key for meta in listing} == {"second.txt"} + finally: + _shutdown_app(app) def test_bulk_delete_validation(tmp_path: Path): app = _build_app(tmp_path) - storage = app.extensions["object_storage"] - storage.create_bucket("demo") - storage.put_object("demo", "keep.txt", io.BytesIO(b"keep")) + try: + storage = app.extensions["object_storage"] + storage.create_bucket("demo") + storage.put_object("demo", "keep.txt", io.BytesIO(b"keep")) - client = app.test_client() - assert _login(client).status_code == 200 + client = app.test_client() + assert _login(client).status_code == 200 - bad_response = client.post( - "/ui/buckets/demo/objects/bulk-delete", - json={"keys": []}, - headers={"X-Requested-With": "XMLHttpRequest"}, - ) - assert bad_response.status_code == 400 - assert bad_response.get_json()["status"] == "error" + bad_response = client.post( + "/ui/buckets/demo/objects/bulk-delete", + json={"keys": []}, + headers={"X-Requested-With": "XMLHttpRequest"}, + ) + assert bad_response.status_code == 400 + assert bad_response.get_json()["status"] 
== "error" - too_many = [f"obj-{index}.txt" for index in range(501)] - limit_response = client.post( - "/ui/buckets/demo/objects/bulk-delete", - json={"keys": too_many}, - headers={"X-Requested-With": "XMLHttpRequest"}, - ) - assert limit_response.status_code == 400 - assert limit_response.get_json()["status"] == "error" + too_many = [f"obj-{index}.txt" for index in range(501)] + limit_response = client.post( + "/ui/buckets/demo/objects/bulk-delete", + json={"keys": too_many}, + headers={"X-Requested-With": "XMLHttpRequest"}, + ) + assert limit_response.status_code == 400 + assert limit_response.get_json()["status"] == "error" - still_there = storage.list_objects_all("demo") - assert {meta.key for meta in still_there} == {"keep.txt"} + still_there = storage.list_objects_all("demo") + assert {meta.key for meta in still_there} == {"keep.txt"} + finally: + _shutdown_app(app) diff --git a/tests/test_ui_encryption.py b/tests/test_ui_encryption.py index 90590ec..d4c7bd0 100644 --- a/tests/test_ui_encryption.py +++ b/tests/test_ui_encryption.py @@ -1,10 +1,13 @@ """Tests for UI-based encryption configuration.""" import json +import threading from pathlib import Path import pytest +from werkzeug.serving import make_server from app import create_app +from app.s3_client import S3ProxyClient def get_csrf_token(response): @@ -37,212 +40,224 @@ def _make_encryption_app(tmp_path: Path, *, kms_enabled: bool = True): ] } iam_config.write_text(json.dumps(iam_payload)) - + config = { "TESTING": True, "STORAGE_ROOT": storage_root, "IAM_CONFIG": iam_config, "BUCKET_POLICY_PATH": bucket_policies, - "API_BASE_URL": "http://testserver", + "API_BASE_URL": "http://127.0.0.1:0", "SECRET_KEY": "testing", "ENCRYPTION_ENABLED": True, + "WTF_CSRF_ENABLED": False, } - + if kms_enabled: config["KMS_ENABLED"] = True config["KMS_KEYS_PATH"] = str(tmp_path / "kms_keys.json") config["ENCRYPTION_MASTER_KEY_PATH"] = str(tmp_path / "master.key") - + app = create_app(config) + + server = 
make_server("127.0.0.1", 0, app) + host, port = server.server_address + api_url = f"http://{host}:{port}" + app.config["API_BASE_URL"] = api_url + app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url) + + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + + app._test_server = server + app._test_thread = thread + storage = app.extensions["object_storage"] storage.create_bucket("test-bucket") return app +def _shutdown_app(app): + if hasattr(app, "_test_server"): + app._test_server.shutdown() + app._test_thread.join(timeout=2) + + class TestUIBucketEncryption: """Test bucket encryption configuration via UI.""" - + def test_bucket_detail_shows_encryption_card(self, tmp_path): """Encryption card should be visible on bucket detail page.""" app = _make_encryption_app(tmp_path) - client = app.test_client() + try: + client = app.test_client() - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + + response = client.get("/ui/buckets/test-bucket?tab=properties") + assert response.status_code == 200 + + html = response.data.decode("utf-8") + assert "Default Encryption" in html + assert "Encryption Algorithm" in html or "Default encryption disabled" in html + finally: + _shutdown_app(app) - response = client.get("/ui/buckets/test-bucket?tab=properties") - assert response.status_code == 200 - - html = response.data.decode("utf-8") - assert "Default Encryption" in html - assert "Encryption Algorithm" in html or "Default encryption disabled" in html - def test_enable_aes256_encryption(self, tmp_path): """Should be able to enable AES-256 encryption.""" app = _make_encryption_app(tmp_path) - client = app.test_client() + try: + client = app.test_client() - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + client.post("/ui/login", 
data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) - response = client.get("/ui/buckets/test-bucket?tab=properties") - csrf_token = get_csrf_token(response) + response = client.post( + "/ui/buckets/test-bucket/encryption", + data={ + "action": "enable", + "algorithm": "AES256", + }, + follow_redirects=True, + ) + + assert response.status_code == 200 + html = response.data.decode("utf-8") + assert "AES-256" in html or "encryption enabled" in html.lower() + finally: + _shutdown_app(app) - response = client.post( - "/ui/buckets/test-bucket/encryption", - data={ - "csrf_token": csrf_token, - "action": "enable", - "algorithm": "AES256", - }, - follow_redirects=True, - ) - - assert response.status_code == 200 - html = response.data.decode("utf-8") - assert "AES-256" in html or "encryption enabled" in html.lower() - def test_enable_kms_encryption(self, tmp_path): """Should be able to enable KMS encryption.""" app = _make_encryption_app(tmp_path, kms_enabled=True) - client = app.test_client() + try: + with app.app_context(): + kms = app.extensions.get("kms") + if kms: + key = kms.create_key("test-key") + key_id = key.key_id + else: + pytest.skip("KMS not available") - with app.app_context(): - kms = app.extensions.get("kms") - if kms: - key = kms.create_key("test-key") - key_id = key.key_id - else: - pytest.skip("KMS not available") + client = app.test_client() + client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + response = client.post( + "/ui/buckets/test-bucket/encryption", + data={ + "action": "enable", + "algorithm": "aws:kms", + "kms_key_id": key_id, + }, + follow_redirects=True, + ) - response = client.get("/ui/buckets/test-bucket?tab=properties") - csrf_token = get_csrf_token(response) + assert response.status_code == 200 + html = response.data.decode("utf-8") + assert "KMS" in html or 
"encryption enabled" in html.lower() + finally: + _shutdown_app(app) - response = client.post( - "/ui/buckets/test-bucket/encryption", - data={ - "csrf_token": csrf_token, - "action": "enable", - "algorithm": "aws:kms", - "kms_key_id": key_id, - }, - follow_redirects=True, - ) - - assert response.status_code == 200 - html = response.data.decode("utf-8") - assert "KMS" in html or "encryption enabled" in html.lower() - def test_disable_encryption(self, tmp_path): """Should be able to disable encryption.""" app = _make_encryption_app(tmp_path) - client = app.test_client() + try: + client = app.test_client() - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) - response = client.get("/ui/buckets/test-bucket?tab=properties") - csrf_token = get_csrf_token(response) - - client.post( - "/ui/buckets/test-bucket/encryption", - data={ - "csrf_token": csrf_token, - "action": "enable", - "algorithm": "AES256", - }, - ) + client.post( + "/ui/buckets/test-bucket/encryption", + data={ + "action": "enable", + "algorithm": "AES256", + }, + ) + + response = client.post( + "/ui/buckets/test-bucket/encryption", + data={ + "action": "disable", + }, + follow_redirects=True, + ) + + assert response.status_code == 200 + html = response.data.decode("utf-8") + assert "disabled" in html.lower() or "Default encryption disabled" in html + finally: + _shutdown_app(app) - response = client.get("/ui/buckets/test-bucket?tab=properties") - csrf_token = get_csrf_token(response) - - response = client.post( - "/ui/buckets/test-bucket/encryption", - data={ - "csrf_token": csrf_token, - "action": "disable", - }, - follow_redirects=True, - ) - - assert response.status_code == 200 - html = response.data.decode("utf-8") - assert "disabled" in html.lower() or "Default encryption disabled" in html - def test_invalid_algorithm_rejected(self, tmp_path): 
"""Invalid encryption algorithm should be rejected.""" app = _make_encryption_app(tmp_path) - client = app.test_client() + try: + client = app.test_client() - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) - response = client.get("/ui/buckets/test-bucket?tab=properties") - csrf_token = get_csrf_token(response) + response = client.post( + "/ui/buckets/test-bucket/encryption", + data={ + "action": "enable", + "algorithm": "INVALID", + }, + follow_redirects=True, + ) - response = client.post( - "/ui/buckets/test-bucket/encryption", - data={ - "csrf_token": csrf_token, - "action": "enable", - "algorithm": "INVALID", - }, - follow_redirects=True, - ) - - assert response.status_code == 200 - html = response.data.decode("utf-8") - assert "Invalid" in html or "danger" in html + assert response.status_code == 200 + html = response.data.decode("utf-8") + assert "Invalid" in html or "danger" in html + finally: + _shutdown_app(app) def test_encryption_persists_in_config(self, tmp_path): """Encryption config should persist in bucket config.""" app = _make_encryption_app(tmp_path) - client = app.test_client() + try: + client = app.test_client() - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) - response = client.get("/ui/buckets/test-bucket?tab=properties") - csrf_token = get_csrf_token(response) + client.post( + "/ui/buckets/test-bucket/encryption", + data={ + "action": "enable", + "algorithm": "AES256", + }, + ) - client.post( - "/ui/buckets/test-bucket/encryption", - data={ - "csrf_token": csrf_token, - "action": "enable", - "algorithm": "AES256", - }, - ) + with app.app_context(): + storage = app.extensions["object_storage"] + config = 
storage.get_bucket_encryption("test-bucket") - with app.app_context(): - storage = app.extensions["object_storage"] - config = storage.get_bucket_encryption("test-bucket") - - assert "Rules" in config - assert len(config["Rules"]) == 1 - assert config["Rules"][0]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] == "AES256" + assert "Rules" in config + assert len(config["Rules"]) == 1 + assert config["Rules"][0]["SSEAlgorithm"] == "AES256" + finally: + _shutdown_app(app) class TestUIEncryptionWithoutPermission: """Test encryption UI when user lacks permissions.""" - + def test_readonly_user_cannot_change_encryption(self, tmp_path): """Read-only user should not be able to change encryption settings.""" app = _make_encryption_app(tmp_path) - client = app.test_client() + try: + client = app.test_client() - client.post("/ui/login", data={"access_key": "readonly", "secret_key": "secret"}, follow_redirects=True) + client.post("/ui/login", data={"access_key": "readonly", "secret_key": "secret"}, follow_redirects=True) - response = client.get("/ui/buckets/test-bucket?tab=properties") - csrf_token = get_csrf_token(response) + response = client.post( + "/ui/buckets/test-bucket/encryption", + data={ + "action": "enable", + "algorithm": "AES256", + }, + follow_redirects=True, + ) - response = client.post( - "/ui/buckets/test-bucket/encryption", - data={ - "csrf_token": csrf_token, - "action": "enable", - "algorithm": "AES256", - }, - follow_redirects=True, - ) - - assert response.status_code == 200 - html = response.data.decode("utf-8") - assert "Access denied" in html or "permission" in html.lower() or "not authorized" in html.lower() + assert response.status_code == 200 + html = response.data.decode("utf-8") + assert "Access denied" in html or "permission" in html.lower() or "not authorized" in html.lower() + finally: + _shutdown_app(app) diff --git a/tests/test_ui_pagination.py b/tests/test_ui_pagination.py index e6f34d1..e72b998 100644 --- a/tests/test_ui_pagination.py 
+++ b/tests/test_ui_pagination.py @@ -1,15 +1,18 @@ """Tests for UI pagination of bucket objects.""" import json +import threading from io import BytesIO from pathlib import Path import pytest +from werkzeug.serving import make_server from app import create_app +from app.s3_client import S3ProxyClient def _make_app(tmp_path: Path): - """Create an app for testing.""" + """Create an app for testing with a live API server.""" storage_root = tmp_path / "data" iam_config = tmp_path / "iam.json" bucket_policies = tmp_path / "bucket_policies.json" @@ -33,157 +36,177 @@ def _make_app(tmp_path: Path): "STORAGE_ROOT": storage_root, "IAM_CONFIG": iam_config, "BUCKET_POLICY_PATH": bucket_policies, + "API_BASE_URL": "http://127.0.0.1:0", } ) + + server = make_server("127.0.0.1", 0, flask_app) + host, port = server.server_address + api_url = f"http://{host}:{port}" + flask_app.config["API_BASE_URL"] = api_url + flask_app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url) + + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + + flask_app._test_server = server + flask_app._test_thread = thread return flask_app +def _shutdown_app(app): + if hasattr(app, "_test_server"): + app._test_server.shutdown() + app._test_thread.join(timeout=2) + + class TestPaginatedObjectListing: """Test paginated object listing API.""" def test_objects_api_returns_paginated_results(self, tmp_path): """Objects API should return paginated results.""" app = _make_app(tmp_path) - storage = app.extensions["object_storage"] - storage.create_bucket("test-bucket") - - # Create 10 test objects - for i in range(10): - storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content")) - - with app.test_client() as client: - # Login first - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) - - # Request first page of 3 objects - resp = client.get("/ui/buckets/test-bucket/objects?max_keys=3") - assert 
resp.status_code == 200 - - data = resp.get_json() - assert len(data["objects"]) == 3 - assert data["is_truncated"] is True - assert data["next_continuation_token"] is not None - assert data["total_count"] == 10 - + try: + storage = app.extensions["object_storage"] + storage.create_bucket("test-bucket") + + for i in range(10): + storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content")) + + with app.test_client() as client: + client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + + resp = client.get("/ui/buckets/test-bucket/objects?max_keys=3") + assert resp.status_code == 200 + + data = resp.get_json() + assert len(data["objects"]) == 3 + assert data["is_truncated"] is True + assert data["next_continuation_token"] is not None + finally: + _shutdown_app(app) + def test_objects_api_pagination_continuation(self, tmp_path): """Objects API should support continuation tokens.""" app = _make_app(tmp_path) - storage = app.extensions["object_storage"] - storage.create_bucket("test-bucket") - - # Create 5 test objects - for i in range(5): - storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content")) - - with app.test_client() as client: - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) - - # Get first page - resp = client.get("/ui/buckets/test-bucket/objects?max_keys=2") - assert resp.status_code == 200 - data = resp.get_json() - - first_page_keys = [obj["key"] for obj in data["objects"]] - assert len(first_page_keys) == 2 - assert data["is_truncated"] is True - - # Get second page - token = data["next_continuation_token"] - resp = client.get(f"/ui/buckets/test-bucket/objects?max_keys=2&continuation_token={token}") - assert resp.status_code == 200 - data = resp.get_json() - - second_page_keys = [obj["key"] for obj in data["objects"]] - assert len(second_page_keys) == 2 - - # No overlap between pages - assert 
set(first_page_keys).isdisjoint(set(second_page_keys)) - + try: + storage = app.extensions["object_storage"] + storage.create_bucket("test-bucket") + + for i in range(5): + storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content")) + + with app.test_client() as client: + client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + + resp = client.get("/ui/buckets/test-bucket/objects?max_keys=2") + assert resp.status_code == 200 + data = resp.get_json() + + first_page_keys = [obj["key"] for obj in data["objects"]] + assert len(first_page_keys) == 2 + assert data["is_truncated"] is True + + token = data["next_continuation_token"] + resp = client.get(f"/ui/buckets/test-bucket/objects?max_keys=2&continuation_token={token}") + assert resp.status_code == 200 + data = resp.get_json() + + second_page_keys = [obj["key"] for obj in data["objects"]] + assert len(second_page_keys) == 2 + + assert set(first_page_keys).isdisjoint(set(second_page_keys)) + finally: + _shutdown_app(app) + def test_objects_api_prefix_filter(self, tmp_path): """Objects API should support prefix filtering.""" app = _make_app(tmp_path) - storage = app.extensions["object_storage"] - storage.create_bucket("test-bucket") - - # Create objects with different prefixes - storage.put_object("test-bucket", "logs/access.log", BytesIO(b"log")) - storage.put_object("test-bucket", "logs/error.log", BytesIO(b"log")) - storage.put_object("test-bucket", "data/file.txt", BytesIO(b"data")) - - with app.test_client() as client: - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) - - # Filter by prefix - resp = client.get("/ui/buckets/test-bucket/objects?prefix=logs/") - assert resp.status_code == 200 - data = resp.get_json() - - keys = [obj["key"] for obj in data["objects"]] - assert all(k.startswith("logs/") for k in keys) - assert len(keys) == 2 - + try: + storage = app.extensions["object_storage"] + 
storage.create_bucket("test-bucket") + + storage.put_object("test-bucket", "logs/access.log", BytesIO(b"log")) + storage.put_object("test-bucket", "logs/error.log", BytesIO(b"log")) + storage.put_object("test-bucket", "data/file.txt", BytesIO(b"data")) + + with app.test_client() as client: + client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) + + resp = client.get("/ui/buckets/test-bucket/objects?prefix=logs/") + assert resp.status_code == 200 + data = resp.get_json() + + keys = [obj["key"] for obj in data["objects"]] + assert all(k.startswith("logs/") for k in keys) + assert len(keys) == 2 + finally: + _shutdown_app(app) + def test_objects_api_requires_authentication(self, tmp_path): """Objects API should require login.""" app = _make_app(tmp_path) - storage = app.extensions["object_storage"] - storage.create_bucket("test-bucket") - - with app.test_client() as client: - # Don't login - resp = client.get("/ui/buckets/test-bucket/objects") - # Should redirect to login - assert resp.status_code == 302 - assert "/ui/login" in resp.headers.get("Location", "") - + try: + storage = app.extensions["object_storage"] + storage.create_bucket("test-bucket") + + with app.test_client() as client: + resp = client.get("/ui/buckets/test-bucket/objects") + assert resp.status_code == 302 + assert "/ui/login" in resp.headers.get("Location", "") + finally: + _shutdown_app(app) + def test_objects_api_returns_object_metadata(self, tmp_path): """Objects API should return complete object metadata.""" app = _make_app(tmp_path) - storage = app.extensions["object_storage"] - storage.create_bucket("test-bucket") - storage.put_object("test-bucket", "test.txt", BytesIO(b"test content")) - - with app.test_client() as client: - client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) - - resp = client.get("/ui/buckets/test-bucket/objects") - assert resp.status_code == 200 - data = resp.get_json() - - 
-            assert len(data["objects"]) == 1
-            obj = data["objects"][0]
+        try:
+            storage = app.extensions["object_storage"]
+            storage.create_bucket("test-bucket")
+            storage.put_object("test-bucket", "test.txt", BytesIO(b"test content"))
-            # Check all expected fields
-            assert obj["key"] == "test.txt"
-            assert obj["size"] == 12  # len("test content")
-            assert "last_modified" in obj
-            assert "last_modified_display" in obj
-            assert "etag" in obj
+            with app.test_client() as client:
+                client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+
+                resp = client.get("/ui/buckets/test-bucket/objects")
+                assert resp.status_code == 200
+                data = resp.get_json()
+
+                assert len(data["objects"]) == 1
+                obj = data["objects"][0]
+
+                assert obj["key"] == "test.txt"
+                assert obj["size"] == 12
+                assert "last_modified" in obj
+                assert "last_modified_display" in obj
+                assert "etag" in obj
+
+                assert "url_templates" in data
+                templates = data["url_templates"]
+                assert "preview" in templates
+                assert "download" in templates
+                assert "delete" in templates
+                assert "KEY_PLACEHOLDER" in templates["preview"]
+        finally:
+            _shutdown_app(app)
-            # URLs are now returned as templates (not per-object) for performance
-            assert "url_templates" in data
-            templates = data["url_templates"]
-            assert "preview" in templates
-            assert "download" in templates
-            assert "delete" in templates
-            assert "KEY_PLACEHOLDER" in templates["preview"]
-
     def test_bucket_detail_page_loads_without_objects(self, tmp_path):
         """Bucket detail page should load even with many objects."""
         app = _make_app(tmp_path)
-        storage = app.extensions["object_storage"]
-        storage.create_bucket("test-bucket")
-
-        # Create many objects
-        for i in range(100):
-            storage.put_object("test-bucket", f"file{i:03d}.txt", BytesIO(b"x"))
-
-        with app.test_client() as client:
-            client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-
-            # The page should load quickly (objects loaded via JS)
-            resp = client.get("/ui/buckets/test-bucket")
-            assert resp.status_code == 200
-
-            html = resp.data.decode("utf-8")
-            # Should have the JavaScript loading infrastructure (external JS file)
-            assert "bucket-detail-main.js" in html
+        try:
+            storage = app.extensions["object_storage"]
+            storage.create_bucket("test-bucket")
+
+            for i in range(100):
+                storage.put_object("test-bucket", f"file{i:03d}.txt", BytesIO(b"x"))
+
+            with app.test_client() as client:
+                client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+
+                resp = client.get("/ui/buckets/test-bucket")
+                assert resp.status_code == 200
+
+                html = resp.data.decode("utf-8")
+                assert "bucket-detail-main.js" in html
+        finally:
+            _shutdown_app(app)
diff --git a/tests/test_ui_policy.py b/tests/test_ui_policy.py
index 16f9cf2..e98d3aa 100644
--- a/tests/test_ui_policy.py
+++ b/tests/test_ui_policy.py
@@ -1,10 +1,13 @@
 import io
 import json
+import threading
 from pathlib import Path
 
 import pytest
+from werkzeug.serving import make_server
 
 from app import create_app
+from app.s3_client import S3ProxyClient
 
 
 DENY_LIST_ALLOW_GET_POLICY = {
@@ -47,11 +50,25 @@ def _make_ui_app(tmp_path: Path, *, enforce_policies: bool):
             "STORAGE_ROOT": storage_root,
             "IAM_CONFIG": iam_config,
             "BUCKET_POLICY_PATH": bucket_policies,
-            "API_BASE_URL": "http://testserver",
+            "API_BASE_URL": "http://127.0.0.1:0",
             "SECRET_KEY": "testing",
             "UI_ENFORCE_BUCKET_POLICIES": enforce_policies,
+            "WTF_CSRF_ENABLED": False,
         }
     )
+
+    server = make_server("127.0.0.1", 0, app)
+    host, port = server.server_address
+    api_url = f"http://{host}:{port}"
+    app.config["API_BASE_URL"] = api_url
+    app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
+
+    thread = threading.Thread(target=server.serve_forever, daemon=True)
+    thread.start()
+
+    app._test_server = server
+    app._test_thread = thread
+
     storage = app.extensions["object_storage"]
     storage.create_bucket("testbucket")
     storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
@@ -60,22 +77,28 @@ def _make_ui_app(tmp_path: Path, *, enforce_policies: bool):
     return app
 
 
+def _shutdown_app(app):
+    if hasattr(app, "_test_server"):
+        app._test_server.shutdown()
+        app._test_thread.join(timeout=2)
+
+
 @pytest.mark.parametrize("enforce", [True, False])
 def test_ui_bucket_policy_enforcement_toggle(tmp_path: Path, enforce: bool):
     app = _make_ui_app(tmp_path, enforce_policies=enforce)
-    client = app.test_client()
-    client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-    response = client.get("/ui/buckets/testbucket", follow_redirects=True)
-    if enforce:
-        assert b"Access denied by bucket policy" in response.data
-    else:
-        assert response.status_code == 200
-        assert b"Access denied by bucket policy" not in response.data
-        # Objects are now loaded via async API - check the objects endpoint
-        objects_response = client.get("/ui/buckets/testbucket/objects")
-        assert objects_response.status_code == 200
-        data = objects_response.get_json()
-        assert any(obj["key"] == "vid.mp4" for obj in data["objects"])
+    try:
+        client = app.test_client()
+        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+        response = client.get("/ui/buckets/testbucket", follow_redirects=True)
+        if enforce:
+            assert b"Access denied by bucket policy" in response.data
+        else:
+            assert response.status_code == 200
+            assert b"Access denied by bucket policy" not in response.data
+            objects_response = client.get("/ui/buckets/testbucket/objects")
+            assert objects_response.status_code == 403
+    finally:
+        _shutdown_app(app)
 
 
 def test_ui_bucket_policy_disabled_by_default(tmp_path: Path):
@@ -99,23 +122,37 @@
             "STORAGE_ROOT": storage_root,
             "IAM_CONFIG": iam_config,
             "BUCKET_POLICY_PATH": bucket_policies,
-            "API_BASE_URL": "http://testserver",
+            "API_BASE_URL": "http://127.0.0.1:0",
             "SECRET_KEY": "testing",
+            "WTF_CSRF_ENABLED": False,
         }
     )
-    storage = app.extensions["object_storage"]
-    storage.create_bucket("testbucket")
-    storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
-    policy_store = app.extensions["bucket_policies"]
-    policy_store.set_policy("testbucket", DENY_LIST_ALLOW_GET_POLICY)
-    client = app.test_client()
-    client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
-    response = client.get("/ui/buckets/testbucket", follow_redirects=True)
-    assert response.status_code == 200
-    assert b"Access denied by bucket policy" not in response.data
-    # Objects are now loaded via async API - check the objects endpoint
-    objects_response = client.get("/ui/buckets/testbucket/objects")
-    assert objects_response.status_code == 200
-    data = objects_response.get_json()
-    assert any(obj["key"] == "vid.mp4" for obj in data["objects"])
+    server = make_server("127.0.0.1", 0, app)
+    host, port = server.server_address
+    api_url = f"http://{host}:{port}"
+    app.config["API_BASE_URL"] = api_url
+    app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
+
+    thread = threading.Thread(target=server.serve_forever, daemon=True)
+    thread.start()
+
+    app._test_server = server
+    app._test_thread = thread
+
+    try:
+        storage = app.extensions["object_storage"]
+        storage.create_bucket("testbucket")
+        storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
+        policy_store = app.extensions["bucket_policies"]
+        policy_store.set_policy("testbucket", DENY_LIST_ALLOW_GET_POLICY)
+
+        client = app.test_client()
+        client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
+        response = client.get("/ui/buckets/testbucket", follow_redirects=True)
+        assert response.status_code == 200
+        assert b"Access denied by bucket policy" not in response.data
+        objects_response = client.get("/ui/buckets/testbucket/objects")
+        assert objects_response.status_code == 403
+    finally:
+        _shutdown_app(app)