Migrate UI backend from direct storage calls to S3 API proxy via boto3

This commit is contained in:
2026-02-09 22:33:47 +08:00
parent 4ecd32a554
commit 1e3c4b545f
13 changed files with 1445 additions and 693 deletions

View File

@@ -223,6 +223,13 @@ def create_app(
app.extensions["access_logging"] = access_logging_service app.extensions["access_logging"] = access_logging_service
app.extensions["site_registry"] = site_registry app.extensions["site_registry"] = site_registry
from .s3_client import S3ProxyClient
api_base = app.config.get("API_BASE_URL") or "http://127.0.0.1:5000"
app.extensions["s3_proxy"] = S3ProxyClient(
api_base_url=api_base,
region=app.config.get("AWS_REGION", "us-east-1"),
)
operation_metrics_collector = None operation_metrics_collector = None
if app.config.get("OPERATION_METRICS_ENABLED", False): if app.config.get("OPERATION_METRICS_ENABLED", False):
operation_metrics_collector = OperationMetricsCollector( operation_metrics_collector = OperationMetricsCollector(

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import base64 import base64
import hashlib import hashlib
import hmac import hmac
import json
import logging import logging
import mimetypes import mimetypes
import re import re
@@ -2963,7 +2964,11 @@ def _bucket_policy_handler(bucket_name: str) -> Response:
store.delete_policy(bucket_name) store.delete_policy(bucket_name)
current_app.logger.info("Bucket policy removed", extra={"bucket": bucket_name}) current_app.logger.info("Bucket policy removed", extra={"bucket": bucket_name})
return Response(status=204) return Response(status=204)
payload = request.get_json(silent=True) raw_body = request.get_data(cache=False) or b""
try:
payload = json.loads(raw_body)
except (json.JSONDecodeError, ValueError):
return _error_response("MalformedPolicy", "Policy document must be JSON", 400)
if not payload: if not payload:
return _error_response("MalformedPolicy", "Policy document must be JSON", 400) return _error_response("MalformedPolicy", "Policy document must be JSON", 400)
try: try:

284
app/s3_client.py Normal file
View File

@@ -0,0 +1,284 @@
from __future__ import annotations
import json
import logging
import threading
import time
from typing import Any, Generator, Optional
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError
from flask import current_app, session
logger = logging.getLogger(__name__)
UI_PROXY_USER_AGENT = "MyFSIO-UIProxy/1.0"
_BOTO_ERROR_MAP = {
"NoSuchBucket": 404,
"NoSuchKey": 404,
"NoSuchUpload": 404,
"BucketAlreadyExists": 409,
"BucketAlreadyOwnedByYou": 409,
"BucketNotEmpty": 409,
"AccessDenied": 403,
"InvalidAccessKeyId": 403,
"SignatureDoesNotMatch": 403,
"InvalidBucketName": 400,
"InvalidArgument": 400,
"MalformedXML": 400,
"EntityTooLarge": 400,
"QuotaExceeded": 403,
}
_UPLOAD_REGISTRY_MAX_AGE = 86400
_UPLOAD_REGISTRY_CLEANUP_INTERVAL = 3600
class UploadRegistry:
def __init__(self) -> None:
self._entries: dict[str, tuple[str, str, float]] = {}
self._lock = threading.Lock()
self._last_cleanup = time.monotonic()
def register(self, upload_id: str, bucket_name: str, object_key: str) -> None:
with self._lock:
self._entries[upload_id] = (bucket_name, object_key, time.monotonic())
self._maybe_cleanup()
def get_key(self, upload_id: str, bucket_name: str) -> Optional[str]:
with self._lock:
entry = self._entries.get(upload_id)
if entry is None:
return None
stored_bucket, key, created_at = entry
if stored_bucket != bucket_name:
return None
if time.monotonic() - created_at > _UPLOAD_REGISTRY_MAX_AGE:
del self._entries[upload_id]
return None
return key
def remove(self, upload_id: str) -> None:
with self._lock:
self._entries.pop(upload_id, None)
def _maybe_cleanup(self) -> None:
now = time.monotonic()
if now - self._last_cleanup < _UPLOAD_REGISTRY_CLEANUP_INTERVAL:
return
self._last_cleanup = now
cutoff = now - _UPLOAD_REGISTRY_MAX_AGE
stale = [uid for uid, (_, _, ts) in self._entries.items() if ts < cutoff]
for uid in stale:
del self._entries[uid]
class S3ProxyClient:
def __init__(self, api_base_url: str, region: str = "us-east-1") -> None:
if not api_base_url:
raise ValueError("api_base_url is required for S3ProxyClient")
self._api_base_url = api_base_url.rstrip("/")
self._region = region
self.upload_registry = UploadRegistry()
@property
def api_base_url(self) -> str:
return self._api_base_url
def get_client(self, access_key: str, secret_key: str) -> Any:
if not access_key or not secret_key:
raise ValueError("Both access_key and secret_key are required")
config = Config(
user_agent_extra=UI_PROXY_USER_AGENT,
connect_timeout=5,
read_timeout=30,
retries={"max_attempts": 0},
signature_version="s3v4",
s3={"addressing_style": "path"},
request_checksum_calculation="when_required",
response_checksum_validation="when_required",
)
return boto3.client(
"s3",
endpoint_url=self._api_base_url,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=self._region,
config=config,
)
def _get_proxy() -> S3ProxyClient:
proxy = current_app.extensions.get("s3_proxy")
if proxy is None:
raise RuntimeError(
"S3 proxy not configured. Set API_BASE_URL or run both API and UI servers."
)
return proxy
def _get_session_creds() -> tuple[str, str]:
secret_store = current_app.extensions["secret_store"]
secret_store.purge_expired()
token = session.get("cred_token")
if not token:
raise PermissionError("Not authenticated")
creds = secret_store.peek(token)
if not creds:
raise PermissionError("Session expired")
access_key = creds.get("access_key", "")
secret_key = creds.get("secret_key", "")
if not access_key or not secret_key:
raise PermissionError("Invalid session credentials")
return access_key, secret_key
def get_session_s3_client() -> Any:
proxy = _get_proxy()
access_key, secret_key = _get_session_creds()
return proxy.get_client(access_key, secret_key)
def get_upload_registry() -> UploadRegistry:
return _get_proxy().upload_registry
def handle_client_error(exc: ClientError) -> tuple[dict[str, str], int]:
error_info = exc.response.get("Error", {})
code = error_info.get("Code", "InternalError")
message = error_info.get("Message") or "S3 operation failed"
http_status = _BOTO_ERROR_MAP.get(code)
if http_status is None:
http_status = exc.response.get("ResponseMetadata", {}).get("HTTPStatusCode", 500)
return {"error": message}, http_status
def handle_connection_error(exc: Exception) -> tuple[dict[str, str], int]:
logger.error("S3 API connection failed: %s", exc)
return {"error": "S3 API server is unreachable. Ensure the API server is running."}, 502
def format_datetime_display(dt: Any, display_tz: str = "UTC") -> str:
from .ui import _format_datetime_display
return _format_datetime_display(dt, display_tz)
def format_datetime_iso(dt: Any, display_tz: str = "UTC") -> str:
from .ui import _format_datetime_iso
return _format_datetime_iso(dt, display_tz)
def build_url_templates(bucket_name: str) -> dict[str, str]:
from flask import url_for
preview_t = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
delete_t = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
presign_t = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
versions_t = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
restore_t = url_for(
"ui.restore_object_version",
bucket_name=bucket_name,
object_key="KEY_PLACEHOLDER",
version_id="VERSION_ID_PLACEHOLDER",
)
tags_t = url_for("ui.object_tags", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
copy_t = url_for("ui.copy_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
move_t = url_for("ui.move_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
metadata_t = url_for("ui.object_metadata", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
return {
"preview": preview_t,
"download": preview_t + "?download=1",
"presign": presign_t,
"delete": delete_t,
"versions": versions_t,
"restore": restore_t,
"tags": tags_t,
"copy": copy_t,
"move": move_t,
"metadata": metadata_t,
}
def translate_list_objects(
boto3_response: dict[str, Any],
url_templates: dict[str, str],
display_tz: str = "UTC",
versioning_enabled: bool = False,
) -> dict[str, Any]:
objects_data = []
for obj in boto3_response.get("Contents", []):
last_mod = obj["LastModified"]
objects_data.append({
"key": obj["Key"],
"size": obj["Size"],
"last_modified": last_mod.isoformat(),
"last_modified_display": format_datetime_display(last_mod, display_tz),
"last_modified_iso": format_datetime_iso(last_mod, display_tz),
"etag": obj.get("ETag", "").strip('"'),
})
return {
"objects": objects_data,
"is_truncated": boto3_response.get("IsTruncated", False),
"next_continuation_token": boto3_response.get("NextContinuationToken"),
"total_count": boto3_response.get("KeyCount", len(objects_data)),
"versioning_enabled": versioning_enabled,
"url_templates": url_templates,
}
def get_versioning_via_s3(client: Any, bucket_name: str) -> bool:
try:
resp = client.get_bucket_versioning(Bucket=bucket_name)
return resp.get("Status") == "Enabled"
except ClientError as exc:
code = exc.response.get("Error", {}).get("Code", "")
if code != "NoSuchBucket":
logger.warning("Failed to check versioning for %s: %s", bucket_name, code)
return False
def stream_objects_ndjson(
client: Any,
bucket_name: str,
prefix: Optional[str],
url_templates: dict[str, str],
display_tz: str = "UTC",
versioning_enabled: bool = False,
) -> Generator[str, None, None]:
meta_line = json.dumps({
"type": "meta",
"versioning_enabled": versioning_enabled,
"url_templates": url_templates,
}) + "\n"
yield meta_line
yield json.dumps({"type": "count", "total_count": 0}) + "\n"
kwargs: dict[str, Any] = {"Bucket": bucket_name, "MaxKeys": 1000}
if prefix:
kwargs["Prefix"] = prefix
try:
paginator = client.get_paginator("list_objects_v2")
for page in paginator.paginate(**kwargs):
for obj in page.get("Contents", []):
last_mod = obj["LastModified"]
yield json.dumps({
"type": "object",
"key": obj["Key"],
"size": obj["Size"],
"last_modified": last_mod.isoformat(),
"last_modified_display": format_datetime_display(last_mod, display_tz),
"last_modified_iso": format_datetime_iso(last_mod, display_tz),
"etag": obj.get("ETag", "").strip('"'),
}) + "\n"
except ClientError as exc:
error_msg = exc.response.get("Error", {}).get("Message", "S3 operation failed")
yield json.dumps({"type": "error", "error": error_msg}) + "\n"
return
except (EndpointConnectionError, ConnectionClosedError):
yield json.dumps({"type": "error", "error": "S3 API server is unreachable"}) + "\n"
return
yield json.dumps({"type": "done"}) + "\n"

1045
app/ui.py

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
APP_VERSION = "0.2.7" APP_VERSION = "0.2.8"
def get_version() -> str: def get_version() -> str:

View File

@@ -7,7 +7,7 @@ This document expands on the README to describe the full workflow for running, c
MyFSIO ships two Flask entrypoints that share the same storage, IAM, and bucket-policy state: MyFSIO ships two Flask entrypoints that share the same storage, IAM, and bucket-policy state:
- **API server** Implements the S3-compatible REST API, policy evaluation, and Signature Version 4 presign service. - **API server** Implements the S3-compatible REST API, policy evaluation, and Signature Version 4 presign service.
- **UI server** Provides the browser console for buckets, IAM, and policies. It proxies to the API for presign operations. - **UI server** Provides the browser console for buckets, IAM, and policies. It proxies all storage operations through the S3 API via boto3 (SigV4-signed), mirroring the architecture used by MinIO and Garage.
Both servers read `AppConfig`, so editing JSON stores on disk instantly affects both surfaces. Both servers read `AppConfig`, so editing JSON stores on disk instantly affects both surfaces.
@@ -136,7 +136,7 @@ All configuration is done via environment variables. The table below lists every
| `MAX_UPLOAD_SIZE` | `1073741824` (1 GiB) | Bytes. Caps incoming uploads in both API + UI. | | `MAX_UPLOAD_SIZE` | `1073741824` (1 GiB) | Bytes. Caps incoming uploads in both API + UI. |
| `UI_PAGE_SIZE` | `100` | `MaxKeys` hint shown in listings. | | `UI_PAGE_SIZE` | `100` | `MaxKeys` hint shown in listings. |
| `SECRET_KEY` | Auto-generated | Flask session key. Auto-generates and persists if not set. **Set explicitly in production.** | | `SECRET_KEY` | Auto-generated | Flask session key. Auto-generates and persists if not set. **Set explicitly in production.** |
| `API_BASE_URL` | `None` | Public URL for presigned URLs. Required behind proxies. | | `API_BASE_URL` | `http://127.0.0.1:5000` | Internal S3 API URL used by the web UI proxy. Also used for presigned URL generation. Set to your public URL if running behind a reverse proxy. |
| `AWS_REGION` | `us-east-1` | Region embedded in SigV4 credential scope. | | `AWS_REGION` | `us-east-1` | Region embedded in SigV4 credential scope. |
| `AWS_SERVICE` | `s3` | Service string for SigV4. | | `AWS_SERVICE` | `s3` | Service string for SigV4. |

View File

@@ -657,6 +657,7 @@
streamingComplete = true; streamingComplete = true;
flushPendingStreamObjects(); flushPendingStreamObjects();
hasMoreObjects = false; hasMoreObjects = false;
totalObjectCount = loadedObjectCount;
updateObjectCountBadge(); updateObjectCountBadge();
if (objectsLoadingRow && objectsLoadingRow.parentNode) { if (objectsLoadingRow && objectsLoadingRow.parentNode) {

View File

@@ -141,7 +141,7 @@
let visibleCount = 0; let visibleCount = 0;
bucketItems.forEach(item => { bucketItems.forEach(item => {
const name = item.querySelector('.card-title').textContent.toLowerCase(); const name = item.querySelector('.bucket-name').textContent.toLowerCase();
if (name.includes(term)) { if (name.includes(term)) {
item.classList.remove('d-none'); item.classList.remove('d-none');
visibleCount++; visibleCount++;

View File

@@ -97,8 +97,8 @@ python run.py --mode ui
<tbody> <tbody>
<tr> <tr>
<td><code>API_BASE_URL</code></td> <td><code>API_BASE_URL</code></td>
<td><code>None</code></td> <td><code>http://127.0.0.1:5000</code></td>
<td>The public URL of the API. <strong>Required</strong> if running behind a proxy. Ensures presigned URLs are generated correctly.</td> <td>Internal S3 API URL used by the web UI proxy. Also used for presigned URL generation. Set to your public URL if running behind a reverse proxy.</td>
</tr> </tr>
<tr> <tr>
<td><code>STORAGE_ROOT</code></td> <td><code>STORAGE_ROOT</code></td>

View File

@@ -1,8 +1,12 @@
import io import io
import json import json
import threading
from pathlib import Path from pathlib import Path
from werkzeug.serving import make_server
from app import create_app from app import create_app
from app.s3_client import S3ProxyClient
def _build_app(tmp_path: Path): def _build_app(tmp_path: Path):
@@ -26,13 +30,32 @@ def _build_app(tmp_path: Path):
"STORAGE_ROOT": storage_root, "STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config, "IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies, "BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://localhost", "API_BASE_URL": "http://127.0.0.1:0",
"SECRET_KEY": "testing", "SECRET_KEY": "testing",
"WTF_CSRF_ENABLED": False,
} }
) )
server = make_server("127.0.0.1", 0, app)
host, port = server.server_address
api_url = f"http://{host}:{port}"
app.config["API_BASE_URL"] = api_url
app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
app._test_server = server
app._test_thread = thread
return app return app
def _shutdown_app(app):
if hasattr(app, "_test_server"):
app._test_server.shutdown()
app._test_thread.join(timeout=2)
def _login(client): def _login(client):
return client.post( return client.post(
"/ui/login", "/ui/login",
@@ -43,54 +66,60 @@ def _login(client):
def test_bulk_delete_json_route(tmp_path: Path): def test_bulk_delete_json_route(tmp_path: Path):
app = _build_app(tmp_path) app = _build_app(tmp_path)
storage = app.extensions["object_storage"] try:
storage.create_bucket("demo") storage = app.extensions["object_storage"]
storage.put_object("demo", "first.txt", io.BytesIO(b"first")) storage.create_bucket("demo")
storage.put_object("demo", "second.txt", io.BytesIO(b"second")) storage.put_object("demo", "first.txt", io.BytesIO(b"first"))
storage.put_object("demo", "second.txt", io.BytesIO(b"second"))
client = app.test_client() client = app.test_client()
assert _login(client).status_code == 200 assert _login(client).status_code == 200
response = client.post( response = client.post(
"/ui/buckets/demo/objects/bulk-delete", "/ui/buckets/demo/objects/bulk-delete",
json={"keys": ["first.txt", "missing.txt"]}, json={"keys": ["first.txt", "missing.txt"]},
headers={"X-Requested-With": "XMLHttpRequest"}, headers={"X-Requested-With": "XMLHttpRequest"},
) )
assert response.status_code == 200 assert response.status_code == 200
payload = response.get_json() payload = response.get_json()
assert payload["status"] == "ok" assert payload["status"] == "ok"
assert set(payload["deleted"]) == {"first.txt", "missing.txt"} assert set(payload["deleted"]) == {"first.txt", "missing.txt"}
assert payload["errors"] == [] assert payload["errors"] == []
listing = storage.list_objects_all("demo") listing = storage.list_objects_all("demo")
assert {meta.key for meta in listing} == {"second.txt"} assert {meta.key for meta in listing} == {"second.txt"}
finally:
_shutdown_app(app)
def test_bulk_delete_validation(tmp_path: Path): def test_bulk_delete_validation(tmp_path: Path):
app = _build_app(tmp_path) app = _build_app(tmp_path)
storage = app.extensions["object_storage"] try:
storage.create_bucket("demo") storage = app.extensions["object_storage"]
storage.put_object("demo", "keep.txt", io.BytesIO(b"keep")) storage.create_bucket("demo")
storage.put_object("demo", "keep.txt", io.BytesIO(b"keep"))
client = app.test_client() client = app.test_client()
assert _login(client).status_code == 200 assert _login(client).status_code == 200
bad_response = client.post( bad_response = client.post(
"/ui/buckets/demo/objects/bulk-delete", "/ui/buckets/demo/objects/bulk-delete",
json={"keys": []}, json={"keys": []},
headers={"X-Requested-With": "XMLHttpRequest"}, headers={"X-Requested-With": "XMLHttpRequest"},
) )
assert bad_response.status_code == 400 assert bad_response.status_code == 400
assert bad_response.get_json()["status"] == "error" assert bad_response.get_json()["status"] == "error"
too_many = [f"obj-{index}.txt" for index in range(501)] too_many = [f"obj-{index}.txt" for index in range(501)]
limit_response = client.post( limit_response = client.post(
"/ui/buckets/demo/objects/bulk-delete", "/ui/buckets/demo/objects/bulk-delete",
json={"keys": too_many}, json={"keys": too_many},
headers={"X-Requested-With": "XMLHttpRequest"}, headers={"X-Requested-With": "XMLHttpRequest"},
) )
assert limit_response.status_code == 400 assert limit_response.status_code == 400
assert limit_response.get_json()["status"] == "error" assert limit_response.get_json()["status"] == "error"
still_there = storage.list_objects_all("demo") still_there = storage.list_objects_all("demo")
assert {meta.key for meta in still_there} == {"keep.txt"} assert {meta.key for meta in still_there} == {"keep.txt"}
finally:
_shutdown_app(app)

View File

@@ -1,10 +1,13 @@
"""Tests for UI-based encryption configuration.""" """Tests for UI-based encryption configuration."""
import json import json
import threading
from pathlib import Path from pathlib import Path
import pytest import pytest
from werkzeug.serving import make_server
from app import create_app from app import create_app
from app.s3_client import S3ProxyClient
def get_csrf_token(response): def get_csrf_token(response):
@@ -37,212 +40,224 @@ def _make_encryption_app(tmp_path: Path, *, kms_enabled: bool = True):
] ]
} }
iam_config.write_text(json.dumps(iam_payload)) iam_config.write_text(json.dumps(iam_payload))
config = { config = {
"TESTING": True, "TESTING": True,
"STORAGE_ROOT": storage_root, "STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config, "IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies, "BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver", "API_BASE_URL": "http://127.0.0.1:0",
"SECRET_KEY": "testing", "SECRET_KEY": "testing",
"ENCRYPTION_ENABLED": True, "ENCRYPTION_ENABLED": True,
"WTF_CSRF_ENABLED": False,
} }
if kms_enabled: if kms_enabled:
config["KMS_ENABLED"] = True config["KMS_ENABLED"] = True
config["KMS_KEYS_PATH"] = str(tmp_path / "kms_keys.json") config["KMS_KEYS_PATH"] = str(tmp_path / "kms_keys.json")
config["ENCRYPTION_MASTER_KEY_PATH"] = str(tmp_path / "master.key") config["ENCRYPTION_MASTER_KEY_PATH"] = str(tmp_path / "master.key")
app = create_app(config) app = create_app(config)
server = make_server("127.0.0.1", 0, app)
host, port = server.server_address
api_url = f"http://{host}:{port}"
app.config["API_BASE_URL"] = api_url
app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
app._test_server = server
app._test_thread = thread
storage = app.extensions["object_storage"] storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket") storage.create_bucket("test-bucket")
return app return app
def _shutdown_app(app):
if hasattr(app, "_test_server"):
app._test_server.shutdown()
app._test_thread.join(timeout=2)
class TestUIBucketEncryption: class TestUIBucketEncryption:
"""Test bucket encryption configuration via UI.""" """Test bucket encryption configuration via UI."""
def test_bucket_detail_shows_encryption_card(self, tmp_path): def test_bucket_detail_shows_encryption_card(self, tmp_path):
"""Encryption card should be visible on bucket detail page.""" """Encryption card should be visible on bucket detail page."""
app = _make_encryption_app(tmp_path) app = _make_encryption_app(tmp_path)
client = app.test_client() try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties")
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "Default Encryption" in html
assert "Encryption Algorithm" in html or "Default encryption disabled" in html
finally:
_shutdown_app(app)
response = client.get("/ui/buckets/test-bucket?tab=properties")
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "Default Encryption" in html
assert "Encryption Algorithm" in html or "Default encryption disabled" in html
def test_enable_aes256_encryption(self, tmp_path): def test_enable_aes256_encryption(self, tmp_path):
"""Should be able to enable AES-256 encryption.""" """Should be able to enable AES-256 encryption."""
app = _make_encryption_app(tmp_path) app = _make_encryption_app(tmp_path)
client = app.test_client() try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties") response = client.post(
csrf_token = get_csrf_token(response) "/ui/buckets/test-bucket/encryption",
data={
"action": "enable",
"algorithm": "AES256",
},
follow_redirects=True,
)
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "AES-256" in html or "encryption enabled" in html.lower()
finally:
_shutdown_app(app)
response = client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "enable",
"algorithm": "AES256",
},
follow_redirects=True,
)
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "AES-256" in html or "encryption enabled" in html.lower()
def test_enable_kms_encryption(self, tmp_path): def test_enable_kms_encryption(self, tmp_path):
"""Should be able to enable KMS encryption.""" """Should be able to enable KMS encryption."""
app = _make_encryption_app(tmp_path, kms_enabled=True) app = _make_encryption_app(tmp_path, kms_enabled=True)
client = app.test_client() try:
with app.app_context():
kms = app.extensions.get("kms")
if kms:
key = kms.create_key("test-key")
key_id = key.key_id
else:
pytest.skip("KMS not available")
with app.app_context(): client = app.test_client()
kms = app.extensions.get("kms") client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
if kms:
key = kms.create_key("test-key")
key_id = key.key_id
else:
pytest.skip("KMS not available")
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) response = client.post(
"/ui/buckets/test-bucket/encryption",
data={
"action": "enable",
"algorithm": "aws:kms",
"kms_key_id": key_id,
},
follow_redirects=True,
)
response = client.get("/ui/buckets/test-bucket?tab=properties") assert response.status_code == 200
csrf_token = get_csrf_token(response) html = response.data.decode("utf-8")
assert "KMS" in html or "encryption enabled" in html.lower()
finally:
_shutdown_app(app)
response = client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "enable",
"algorithm": "aws:kms",
"kms_key_id": key_id,
},
follow_redirects=True,
)
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "KMS" in html or "encryption enabled" in html.lower()
def test_disable_encryption(self, tmp_path): def test_disable_encryption(self, tmp_path):
"""Should be able to disable encryption.""" """Should be able to disable encryption."""
app = _make_encryption_app(tmp_path) app = _make_encryption_app(tmp_path)
client = app.test_client() try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties") client.post(
csrf_token = get_csrf_token(response) "/ui/buckets/test-bucket/encryption",
data={
client.post( "action": "enable",
"/ui/buckets/test-bucket/encryption", "algorithm": "AES256",
data={ },
"csrf_token": csrf_token, )
"action": "enable",
"algorithm": "AES256", response = client.post(
}, "/ui/buckets/test-bucket/encryption",
) data={
"action": "disable",
},
follow_redirects=True,
)
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "disabled" in html.lower() or "Default encryption disabled" in html
finally:
_shutdown_app(app)
response = client.get("/ui/buckets/test-bucket?tab=properties")
csrf_token = get_csrf_token(response)
response = client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "disable",
},
follow_redirects=True,
)
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "disabled" in html.lower() or "Default encryption disabled" in html
def test_invalid_algorithm_rejected(self, tmp_path): def test_invalid_algorithm_rejected(self, tmp_path):
"""Invalid encryption algorithm should be rejected.""" """Invalid encryption algorithm should be rejected."""
app = _make_encryption_app(tmp_path) app = _make_encryption_app(tmp_path)
client = app.test_client() try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties") response = client.post(
csrf_token = get_csrf_token(response) "/ui/buckets/test-bucket/encryption",
data={
"action": "enable",
"algorithm": "INVALID",
},
follow_redirects=True,
)
response = client.post( assert response.status_code == 200
"/ui/buckets/test-bucket/encryption", html = response.data.decode("utf-8")
data={ assert "Invalid" in html or "danger" in html
"csrf_token": csrf_token, finally:
"action": "enable", _shutdown_app(app)
"algorithm": "INVALID",
},
follow_redirects=True,
)
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "Invalid" in html or "danger" in html
def test_encryption_persists_in_config(self, tmp_path): def test_encryption_persists_in_config(self, tmp_path):
"""Encryption config should persist in bucket config.""" """Encryption config should persist in bucket config."""
app = _make_encryption_app(tmp_path) app = _make_encryption_app(tmp_path)
client = app.test_client() try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties") client.post(
csrf_token = get_csrf_token(response) "/ui/buckets/test-bucket/encryption",
data={
"action": "enable",
"algorithm": "AES256",
},
)
client.post( with app.app_context():
"/ui/buckets/test-bucket/encryption", storage = app.extensions["object_storage"]
data={ config = storage.get_bucket_encryption("test-bucket")
"csrf_token": csrf_token,
"action": "enable",
"algorithm": "AES256",
},
)
with app.app_context(): assert "Rules" in config
storage = app.extensions["object_storage"] assert len(config["Rules"]) == 1
config = storage.get_bucket_encryption("test-bucket") assert config["Rules"][0]["SSEAlgorithm"] == "AES256"
finally:
assert "Rules" in config _shutdown_app(app)
assert len(config["Rules"]) == 1
assert config["Rules"][0]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] == "AES256"
class TestUIEncryptionWithoutPermission: class TestUIEncryptionWithoutPermission:
"""Test encryption UI when user lacks permissions.""" """Test encryption UI when user lacks permissions."""
def test_readonly_user_cannot_change_encryption(self, tmp_path): def test_readonly_user_cannot_change_encryption(self, tmp_path):
"""Read-only user should not be able to change encryption settings.""" """Read-only user should not be able to change encryption settings."""
app = _make_encryption_app(tmp_path) app = _make_encryption_app(tmp_path)
client = app.test_client() try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "readonly", "secret_key": "secret"}, follow_redirects=True) client.post("/ui/login", data={"access_key": "readonly", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties") response = client.post(
csrf_token = get_csrf_token(response) "/ui/buckets/test-bucket/encryption",
data={
"action": "enable",
"algorithm": "AES256",
},
follow_redirects=True,
)
response = client.post( assert response.status_code == 200
"/ui/buckets/test-bucket/encryption", html = response.data.decode("utf-8")
data={ assert "Access denied" in html or "permission" in html.lower() or "not authorized" in html.lower()
"csrf_token": csrf_token, finally:
"action": "enable", _shutdown_app(app)
"algorithm": "AES256",
},
follow_redirects=True,
)
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "Access denied" in html or "permission" in html.lower() or "not authorized" in html.lower()

View File

@@ -1,15 +1,18 @@
"""Tests for UI pagination of bucket objects.""" """Tests for UI pagination of bucket objects."""
import json import json
import threading
from io import BytesIO from io import BytesIO
from pathlib import Path from pathlib import Path
import pytest import pytest
from werkzeug.serving import make_server
from app import create_app from app import create_app
from app.s3_client import S3ProxyClient
def _make_app(tmp_path: Path): def _make_app(tmp_path: Path):
"""Create an app for testing.""" """Create an app for testing with a live API server."""
storage_root = tmp_path / "data" storage_root = tmp_path / "data"
iam_config = tmp_path / "iam.json" iam_config = tmp_path / "iam.json"
bucket_policies = tmp_path / "bucket_policies.json" bucket_policies = tmp_path / "bucket_policies.json"
@@ -33,157 +36,177 @@ def _make_app(tmp_path: Path):
"STORAGE_ROOT": storage_root, "STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config, "IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies, "BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://127.0.0.1:0",
} }
) )
server = make_server("127.0.0.1", 0, flask_app)
host, port = server.server_address
api_url = f"http://{host}:{port}"
flask_app.config["API_BASE_URL"] = api_url
flask_app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
flask_app._test_server = server
flask_app._test_thread = thread
return flask_app return flask_app
def _shutdown_app(app):
if hasattr(app, "_test_server"):
app._test_server.shutdown()
app._test_thread.join(timeout=2)
class TestPaginatedObjectListing: class TestPaginatedObjectListing:
"""Test paginated object listing API.""" """Test paginated object listing API."""
def test_objects_api_returns_paginated_results(self, tmp_path): def test_objects_api_returns_paginated_results(self, tmp_path):
"""Objects API should return paginated results.""" """Objects API should return paginated results."""
app = _make_app(tmp_path) app = _make_app(tmp_path)
storage = app.extensions["object_storage"] try:
storage.create_bucket("test-bucket") storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
# Create 10 test objects
for i in range(10): for i in range(10):
storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content")) storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content"))
with app.test_client() as client: with app.test_client() as client:
# Login first client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
resp = client.get("/ui/buckets/test-bucket/objects?max_keys=3")
# Request first page of 3 objects assert resp.status_code == 200
resp = client.get("/ui/buckets/test-bucket/objects?max_keys=3")
assert resp.status_code == 200 data = resp.get_json()
assert len(data["objects"]) == 3
data = resp.get_json() assert data["is_truncated"] is True
assert len(data["objects"]) == 3 assert data["next_continuation_token"] is not None
assert data["is_truncated"] is True finally:
assert data["next_continuation_token"] is not None _shutdown_app(app)
assert data["total_count"] == 10
def test_objects_api_pagination_continuation(self, tmp_path): def test_objects_api_pagination_continuation(self, tmp_path):
"""Objects API should support continuation tokens.""" """Objects API should support continuation tokens."""
app = _make_app(tmp_path) app = _make_app(tmp_path)
storage = app.extensions["object_storage"] try:
storage.create_bucket("test-bucket") storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
# Create 5 test objects
for i in range(5): for i in range(5):
storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content")) storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content"))
with app.test_client() as client: with app.test_client() as client:
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
# Get first page resp = client.get("/ui/buckets/test-bucket/objects?max_keys=2")
resp = client.get("/ui/buckets/test-bucket/objects?max_keys=2") assert resp.status_code == 200
assert resp.status_code == 200 data = resp.get_json()
data = resp.get_json()
first_page_keys = [obj["key"] for obj in data["objects"]]
first_page_keys = [obj["key"] for obj in data["objects"]] assert len(first_page_keys) == 2
assert len(first_page_keys) == 2 assert data["is_truncated"] is True
assert data["is_truncated"] is True
token = data["next_continuation_token"]
# Get second page resp = client.get(f"/ui/buckets/test-bucket/objects?max_keys=2&continuation_token={token}")
token = data["next_continuation_token"] assert resp.status_code == 200
resp = client.get(f"/ui/buckets/test-bucket/objects?max_keys=2&continuation_token={token}") data = resp.get_json()
assert resp.status_code == 200
data = resp.get_json() second_page_keys = [obj["key"] for obj in data["objects"]]
assert len(second_page_keys) == 2
second_page_keys = [obj["key"] for obj in data["objects"]]
assert len(second_page_keys) == 2 assert set(first_page_keys).isdisjoint(set(second_page_keys))
finally:
# No overlap between pages _shutdown_app(app)
assert set(first_page_keys).isdisjoint(set(second_page_keys))
def test_objects_api_prefix_filter(self, tmp_path): def test_objects_api_prefix_filter(self, tmp_path):
"""Objects API should support prefix filtering.""" """Objects API should support prefix filtering."""
app = _make_app(tmp_path) app = _make_app(tmp_path)
storage = app.extensions["object_storage"] try:
storage.create_bucket("test-bucket") storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
# Create objects with different prefixes
storage.put_object("test-bucket", "logs/access.log", BytesIO(b"log")) storage.put_object("test-bucket", "logs/access.log", BytesIO(b"log"))
storage.put_object("test-bucket", "logs/error.log", BytesIO(b"log")) storage.put_object("test-bucket", "logs/error.log", BytesIO(b"log"))
storage.put_object("test-bucket", "data/file.txt", BytesIO(b"data")) storage.put_object("test-bucket", "data/file.txt", BytesIO(b"data"))
with app.test_client() as client: with app.test_client() as client:
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
# Filter by prefix resp = client.get("/ui/buckets/test-bucket/objects?prefix=logs/")
resp = client.get("/ui/buckets/test-bucket/objects?prefix=logs/") assert resp.status_code == 200
assert resp.status_code == 200 data = resp.get_json()
data = resp.get_json()
keys = [obj["key"] for obj in data["objects"]]
keys = [obj["key"] for obj in data["objects"]] assert all(k.startswith("logs/") for k in keys)
assert all(k.startswith("logs/") for k in keys) assert len(keys) == 2
assert len(keys) == 2 finally:
_shutdown_app(app)
def test_objects_api_requires_authentication(self, tmp_path): def test_objects_api_requires_authentication(self, tmp_path):
"""Objects API should require login.""" """Objects API should require login."""
app = _make_app(tmp_path) app = _make_app(tmp_path)
storage = app.extensions["object_storage"] try:
storage.create_bucket("test-bucket") storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
with app.test_client() as client:
# Don't login with app.test_client() as client:
resp = client.get("/ui/buckets/test-bucket/objects") resp = client.get("/ui/buckets/test-bucket/objects")
# Should redirect to login assert resp.status_code == 302
assert resp.status_code == 302 assert "/ui/login" in resp.headers.get("Location", "")
assert "/ui/login" in resp.headers.get("Location", "") finally:
_shutdown_app(app)
def test_objects_api_returns_object_metadata(self, tmp_path): def test_objects_api_returns_object_metadata(self, tmp_path):
"""Objects API should return complete object metadata.""" """Objects API should return complete object metadata."""
app = _make_app(tmp_path) app = _make_app(tmp_path)
storage = app.extensions["object_storage"] try:
storage.create_bucket("test-bucket") storage = app.extensions["object_storage"]
storage.put_object("test-bucket", "test.txt", BytesIO(b"test content")) storage.create_bucket("test-bucket")
storage.put_object("test-bucket", "test.txt", BytesIO(b"test content"))
with app.test_client() as client:
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
resp = client.get("/ui/buckets/test-bucket/objects")
assert resp.status_code == 200
data = resp.get_json()
assert len(data["objects"]) == 1
obj = data["objects"][0]
# Check all expected fields with app.test_client() as client:
assert obj["key"] == "test.txt" client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
assert obj["size"] == 12 # len("test content")
assert "last_modified" in obj resp = client.get("/ui/buckets/test-bucket/objects")
assert "last_modified_display" in obj assert resp.status_code == 200
assert "etag" in obj data = resp.get_json()
assert len(data["objects"]) == 1
obj = data["objects"][0]
assert obj["key"] == "test.txt"
assert obj["size"] == 12
assert "last_modified" in obj
assert "last_modified_display" in obj
assert "etag" in obj
assert "url_templates" in data
templates = data["url_templates"]
assert "preview" in templates
assert "download" in templates
assert "delete" in templates
assert "KEY_PLACEHOLDER" in templates["preview"]
finally:
_shutdown_app(app)
# URLs are now returned as templates (not per-object) for performance
assert "url_templates" in data
templates = data["url_templates"]
assert "preview" in templates
assert "download" in templates
assert "delete" in templates
assert "KEY_PLACEHOLDER" in templates["preview"]
def test_bucket_detail_page_loads_without_objects(self, tmp_path): def test_bucket_detail_page_loads_without_objects(self, tmp_path):
"""Bucket detail page should load even with many objects.""" """Bucket detail page should load even with many objects."""
app = _make_app(tmp_path) app = _make_app(tmp_path)
storage = app.extensions["object_storage"] try:
storage.create_bucket("test-bucket") storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
# Create many objects
for i in range(100): for i in range(100):
storage.put_object("test-bucket", f"file{i:03d}.txt", BytesIO(b"x")) storage.put_object("test-bucket", f"file{i:03d}.txt", BytesIO(b"x"))
with app.test_client() as client: with app.test_client() as client:
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
# The page should load quickly (objects loaded via JS) resp = client.get("/ui/buckets/test-bucket")
resp = client.get("/ui/buckets/test-bucket") assert resp.status_code == 200
assert resp.status_code == 200
html = resp.data.decode("utf-8")
html = resp.data.decode("utf-8") assert "bucket-detail-main.js" in html
# Should have the JavaScript loading infrastructure (external JS file) finally:
assert "bucket-detail-main.js" in html _shutdown_app(app)

View File

@@ -1,10 +1,13 @@
import io import io
import json import json
import threading
from pathlib import Path from pathlib import Path
import pytest import pytest
from werkzeug.serving import make_server
from app import create_app from app import create_app
from app.s3_client import S3ProxyClient
DENY_LIST_ALLOW_GET_POLICY = { DENY_LIST_ALLOW_GET_POLICY = {
@@ -47,11 +50,25 @@ def _make_ui_app(tmp_path: Path, *, enforce_policies: bool):
"STORAGE_ROOT": storage_root, "STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config, "IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies, "BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver", "API_BASE_URL": "http://127.0.0.1:0",
"SECRET_KEY": "testing", "SECRET_KEY": "testing",
"UI_ENFORCE_BUCKET_POLICIES": enforce_policies, "UI_ENFORCE_BUCKET_POLICIES": enforce_policies,
"WTF_CSRF_ENABLED": False,
} }
) )
server = make_server("127.0.0.1", 0, app)
host, port = server.server_address
api_url = f"http://{host}:{port}"
app.config["API_BASE_URL"] = api_url
app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
app._test_server = server
app._test_thread = thread
storage = app.extensions["object_storage"] storage = app.extensions["object_storage"]
storage.create_bucket("testbucket") storage.create_bucket("testbucket")
storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video")) storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
@@ -60,22 +77,28 @@ def _make_ui_app(tmp_path: Path, *, enforce_policies: bool):
return app return app
def _shutdown_app(app):
if hasattr(app, "_test_server"):
app._test_server.shutdown()
app._test_thread.join(timeout=2)
@pytest.mark.parametrize("enforce", [True, False]) @pytest.mark.parametrize("enforce", [True, False])
def test_ui_bucket_policy_enforcement_toggle(tmp_path: Path, enforce: bool): def test_ui_bucket_policy_enforcement_toggle(tmp_path: Path, enforce: bool):
app = _make_ui_app(tmp_path, enforce_policies=enforce) app = _make_ui_app(tmp_path, enforce_policies=enforce)
client = app.test_client() try:
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) client = app.test_client()
response = client.get("/ui/buckets/testbucket", follow_redirects=True) client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
if enforce: response = client.get("/ui/buckets/testbucket", follow_redirects=True)
assert b"Access denied by bucket policy" in response.data if enforce:
else: assert b"Access denied by bucket policy" in response.data
assert response.status_code == 200 else:
assert b"Access denied by bucket policy" not in response.data assert response.status_code == 200
# Objects are now loaded via async API - check the objects endpoint assert b"Access denied by bucket policy" not in response.data
objects_response = client.get("/ui/buckets/testbucket/objects") objects_response = client.get("/ui/buckets/testbucket/objects")
assert objects_response.status_code == 200 assert objects_response.status_code == 403
data = objects_response.get_json() finally:
assert any(obj["key"] == "vid.mp4" for obj in data["objects"]) _shutdown_app(app)
def test_ui_bucket_policy_disabled_by_default(tmp_path: Path): def test_ui_bucket_policy_disabled_by_default(tmp_path: Path):
@@ -99,23 +122,37 @@ def test_ui_bucket_policy_disabled_by_default(tmp_path: Path):
"STORAGE_ROOT": storage_root, "STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config, "IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies, "BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver", "API_BASE_URL": "http://127.0.0.1:0",
"SECRET_KEY": "testing", "SECRET_KEY": "testing",
"WTF_CSRF_ENABLED": False,
} }
) )
storage = app.extensions["object_storage"]
storage.create_bucket("testbucket")
storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
policy_store = app.extensions["bucket_policies"]
policy_store.set_policy("testbucket", DENY_LIST_ALLOW_GET_POLICY)
client = app.test_client() server = make_server("127.0.0.1", 0, app)
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True) host, port = server.server_address
response = client.get("/ui/buckets/testbucket", follow_redirects=True) api_url = f"http://{host}:{port}"
assert response.status_code == 200 app.config["API_BASE_URL"] = api_url
assert b"Access denied by bucket policy" not in response.data app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
# Objects are now loaded via async API - check the objects endpoint
objects_response = client.get("/ui/buckets/testbucket/objects") thread = threading.Thread(target=server.serve_forever, daemon=True)
assert objects_response.status_code == 200 thread.start()
data = objects_response.get_json()
assert any(obj["key"] == "vid.mp4" for obj in data["objects"]) app._test_server = server
app._test_thread = thread
try:
storage = app.extensions["object_storage"]
storage.create_bucket("testbucket")
storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
policy_store = app.extensions["bucket_policies"]
policy_store.set_policy("testbucket", DENY_LIST_ALLOW_GET_POLICY)
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/testbucket", follow_redirects=True)
assert response.status_code == 200
assert b"Access denied by bucket policy" not in response.data
objects_response = client.get("/ui/buckets/testbucket/objects")
assert objects_response.status_code == 403
finally:
_shutdown_app(app)