from __future__ import annotations import io import json import uuid import psutil import shutil from datetime import datetime, timezone as dt_timezone from pathlib import Path from typing import Any from urllib.parse import quote, urlparse from zoneinfo import ZoneInfo import boto3 import requests from botocore.exceptions import ClientError from flask import ( Blueprint, Response, current_app, flash, jsonify, redirect, render_template, request, send_file, session, url_for, ) from flask_wtf.csrf import generate_csrf from .acl import AclService, create_canned_acl, CANNED_ACLS from .bucket_policies import BucketPolicyStore from .connections import ConnectionStore, RemoteConnection from .extensions import limiter, csrf from .iam import IamError from .kms import KMSManager from .replication import ReplicationManager, ReplicationRule from .s3_api import _generate_presigned_url from .secret_store import EphemeralSecretStore from .storage import ObjectStorage, StorageError ui_bp = Blueprint("ui", __name__, template_folder="../templates", url_prefix="/ui") def _convert_to_display_tz(dt: datetime, display_tz: str | None = None) -> datetime: """Convert a datetime to the configured display timezone. Args: dt: The datetime to convert display_tz: Optional timezone string. If not provided, reads from current_app.config. """ if display_tz is None: display_tz = current_app.config.get("DISPLAY_TIMEZONE", "UTC") if display_tz and display_tz != "UTC": try: tz = ZoneInfo(display_tz) if dt.tzinfo is None: dt = dt.replace(tzinfo=dt_timezone.utc) dt = dt.astimezone(tz) except (KeyError, ValueError): pass return dt def _format_datetime_display(dt: datetime, display_tz: str | None = None) -> str: """Format a datetime for display using the configured timezone. Args: dt: The datetime to format display_tz: Optional timezone string. If not provided, reads from current_app.config. """ dt = _convert_to_display_tz(dt, display_tz) tz_abbr = dt.strftime("%Z") or "UTC" return f"{dt.strftime('%b %d, %Y %H:%M')} ({tz_abbr})" def _format_datetime_iso(dt: datetime, display_tz: str | None = None) -> str: """Format a datetime as ISO format using the configured timezone. Args: dt: The datetime to format display_tz: Optional timezone string. If not provided, reads from current_app.config. """ dt = _convert_to_display_tz(dt, display_tz) return dt.isoformat() def _storage() -> ObjectStorage: return current_app.extensions["object_storage"] def _replication_manager() -> ReplicationManager: return current_app.extensions["replication"] def _iam(): return current_app.extensions["iam"] def _kms() -> KMSManager | None: return current_app.extensions.get("kms") def _bucket_policies() -> BucketPolicyStore: store: BucketPolicyStore = current_app.extensions["bucket_policies"] store.maybe_reload() return store def _build_policy_context() -> dict[str, Any]: ctx: dict[str, Any] = {} if request.headers.get("Referer"): ctx["aws:Referer"] = request.headers.get("Referer") if request.access_route: ctx["aws:SourceIp"] = request.access_route[0] elif request.remote_addr: ctx["aws:SourceIp"] = request.remote_addr ctx["aws:SecureTransport"] = str(request.is_secure).lower() if request.headers.get("User-Agent"): ctx["aws:UserAgent"] = request.headers.get("User-Agent") return ctx def _connections() -> ConnectionStore: return current_app.extensions["connections"] def _replication() -> ReplicationManager: return current_app.extensions["replication"] def _secret_store() -> EphemeralSecretStore: store: EphemeralSecretStore = current_app.extensions["secret_store"] store.purge_expired() return store def _acl() -> AclService: return current_app.extensions["acl"] def _format_bytes(num: int) -> str: step = 1024 units = ["B", "KB", "MB", "GB", "TB", "PB"] value = float(num) for unit in units: if value < step or unit == units[-1]: if unit == "B": return f"{int(value)} B" return f"{value:.1f} {unit}" value /= step return f"{value:.1f} PB" _metrics_last_save_time: float = 0.0 def _get_metrics_history_path() -> Path: storage_root = Path(current_app.config["STORAGE_ROOT"]) return storage_root / ".myfsio.sys" / "config" / "metrics_history.json" def _load_metrics_history() -> dict: path = _get_metrics_history_path() if not path.exists(): return {"history": []} try: return json.loads(path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return {"history": []} def _save_metrics_snapshot(cpu_percent: float, memory_percent: float, disk_percent: float, storage_bytes: int) -> None: global _metrics_last_save_time if not current_app.config.get("METRICS_HISTORY_ENABLED", False): return import time from datetime import datetime, timezone interval_minutes = current_app.config.get("METRICS_HISTORY_INTERVAL_MINUTES", 5) now_ts = time.time() if now_ts - _metrics_last_save_time < interval_minutes * 60: return path = _get_metrics_history_path() path.parent.mkdir(parents=True, exist_ok=True) data = _load_metrics_history() history = data.get("history", []) retention_hours = current_app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24) now = datetime.now(timezone.utc) snapshot = { "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"), "cpu_percent": round(cpu_percent, 2), "memory_percent": round(memory_percent, 2), "disk_percent": round(disk_percent, 2), "storage_bytes": storage_bytes, } history.append(snapshot) cutoff = now.timestamp() - (retention_hours * 3600) history = [ h for h in history if datetime.fromisoformat(h["timestamp"].replace("Z", "+00:00")).timestamp() > cutoff ] data["history"] = history try: path.write_text(json.dumps(data, indent=2), encoding="utf-8") _metrics_last_save_time = now_ts except OSError: pass def _friendly_error_message(exc: Exception) -> str: message = str(exc) or "An unexpected error occurred" if isinstance(exc, IamError): return f"Access issue: {message}" if isinstance(exc, StorageError): return f"Storage issue: {message}" return message def _wants_json() -> bool: return request.accept_mimetypes.best_match( ["application/json", "text/html"] ) == "application/json" def _policy_allows_public_read(policy: dict[str, Any]) -> bool: statements = policy.get("Statement", []) if isinstance(statements, dict): statements = [statements] list_allowed = False get_allowed = False for statement in statements: if not isinstance(statement, dict): continue if statement.get("Effect") != "Allow": continue if statement.get("Condition"): continue principal = statement.get("Principal") principal_all = principal == "*" or ( isinstance(principal, dict) and any(value == "*" or value == ["*"] for value in principal.values()) ) if not principal_all: continue actions = statement.get("Action", []) if isinstance(actions, str): actions = [actions] normalized = {action.lower() for action in actions} if not list_allowed: list_allowed = any(action in {"*", "s3:*", "s3:listbucket"} for action in normalized) if not get_allowed: get_allowed = any(action in {"*", "s3:*", "s3:getobject"} for action in normalized) if list_allowed and get_allowed: return True return False def _bucket_access_descriptor(policy: dict[str, Any] | None) -> tuple[str, str]: if not policy: return ("IAM only", "text-bg-secondary") if _policy_allows_public_read(policy): return ("Public read", "text-bg-warning") return ("Custom policy", "text-bg-info") def _current_principal(): creds = session.get("credentials") if not creds: return None try: return _iam().authenticate(creds["access_key"], creds["secret_key"]) except IamError: session.pop("credentials", None) return None def _authorize_ui(principal, bucket_name: str | None, action: str, *, object_key: str | None = None) -> None: iam_allowed = True iam_error: IamError | None = None try: _iam().authorize(principal, bucket_name, action) except IamError as exc: iam_allowed = False iam_error = exc decision = None enforce_bucket_policies = current_app.config.get("UI_ENFORCE_BUCKET_POLICIES", True) if bucket_name and enforce_bucket_policies: access_key = principal.access_key if principal else None policy_context = _build_policy_context() decision = _bucket_policies().evaluate(access_key, bucket_name, object_key, action, policy_context) if decision == "deny": raise IamError("Access denied by bucket policy") if not iam_allowed and decision != "allow": raise iam_error or IamError("Access denied") def _api_headers() -> dict[str, str]: creds = session.get("credentials") or {} return { "X-Access-Key": creds.get("access_key", ""), "X-Secret-Key": creds.get("secret_key", ""), } @ui_bp.app_context_processor def inject_nav_state() -> dict[str, Any]: principal = _current_principal() can_manage = False if principal: try: _iam().authorize(principal, None, "iam:list_users") can_manage = True except IamError: can_manage = False return { "principal": principal, "can_manage_iam": can_manage, "can_view_metrics": can_manage, "csrf_token": generate_csrf, } @ui_bp.before_request def ensure_authenticated(): exempt = {"ui.login"} if request.endpoint in exempt or request.endpoint is None: return None if _current_principal() is None: return redirect(url_for("ui.login")) return None @ui_bp.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": access_key = request.form.get("access_key", "").strip() secret_key = request.form.get("secret_key", "").strip() try: principal = _iam().authenticate(access_key, secret_key) except IamError as exc: flash(_friendly_error_message(exc), "danger") return render_template("login.html") session["credentials"] = {"access_key": access_key, "secret_key": secret_key} session.permanent = True flash(f"Welcome back, {principal.display_name}", "success") return redirect(url_for("ui.buckets_overview")) return render_template("login.html") @ui_bp.post("/logout") def logout(): session.pop("credentials", None) flash("Signed out", "info") return redirect(url_for("ui.login")) @ui_bp.get("/docs") def docs_page(): principal = _current_principal() api_base = current_app.config.get("API_BASE_URL") or "http://127.0.0.1:5000" api_base = api_base.rstrip("/") parsed = urlparse(api_base) api_host = parsed.netloc or parsed.path or api_base return render_template( "docs.html", principal=principal, api_base=api_base, api_host=api_host, ) @ui_bp.get("/") def buckets_overview(): principal = _current_principal() buckets = _storage().list_buckets() allowed_names = set(_iam().buckets_for_principal(principal, [b.name for b in buckets])) visible_buckets = [] policy_store = _bucket_policies() for bucket in buckets: if bucket.name not in allowed_names: continue policy = policy_store.get_policy(bucket.name) cache_ttl = current_app.config.get("BUCKET_STATS_CACHE_TTL", 60) stats = _storage().bucket_stats(bucket.name, cache_ttl=cache_ttl) access_label, access_badge = _bucket_access_descriptor(policy) visible_buckets.append({ "meta": bucket, "summary": { "objects": stats["total_objects"], "total_bytes": stats["total_bytes"], "human_size": _format_bytes(stats["total_bytes"]), }, "access_label": access_label, "access_badge": access_badge, "has_policy": bool(policy), "detail_url": url_for("ui.bucket_detail", bucket_name=bucket.name), }) return render_template("buckets.html", buckets=visible_buckets, principal=principal) @ui_bp.get("/buckets") def buckets_redirect(): return redirect(url_for("ui.buckets_overview")) @ui_bp.post("/buckets") def create_bucket(): principal = _current_principal() bucket_name = request.form.get("bucket_name", "").strip() if not bucket_name: if _wants_json(): return jsonify({"error": "Bucket name is required"}), 400 flash("Bucket name is required", "danger") return redirect(url_for("ui.buckets_overview")) try: _authorize_ui(principal, bucket_name, "write") _storage().create_bucket(bucket_name) if _wants_json(): return jsonify({"success": True, "message": f"Bucket '{bucket_name}' created", "bucket_name": bucket_name}) flash(f"Bucket '{bucket_name}' created", "success") except (StorageError, FileExistsError, IamError) as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.buckets_overview")) @ui_bp.get("/buckets/") def bucket_detail(bucket_name: str): principal = _current_principal() storage = _storage() try: _authorize_ui(principal, bucket_name, "list") if not storage.bucket_exists(bucket_name): raise StorageError("Bucket does not exist") except (StorageError, IamError) as exc: flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.buckets_overview")) bucket_policy = _bucket_policies().get_policy(bucket_name) policy_text = json.dumps(bucket_policy, indent=2) if bucket_policy else "" default_policy = json.dumps( { "Version": "2012-10-17", "Statement": [ { "Sid": "AllowList", "Effect": "Allow", "Principal": "*", "Action": ["s3:ListBucket"], "Resource": [f"arn:aws:s3:::{bucket_name}"], }, { "Sid": "AllowRead", "Effect": "Allow", "Principal": "*", "Action": ["s3:GetObject"], "Resource": [f"arn:aws:s3:::{bucket_name}/*"], }, ], }, indent=2, ) can_edit_policy = False if principal: try: _iam().authorize(principal, bucket_name, "policy") can_edit_policy = True except IamError: can_edit_policy = False can_manage_lifecycle = False if principal: try: _iam().authorize(principal, bucket_name, "lifecycle") can_manage_lifecycle = True except IamError: can_manage_lifecycle = False can_manage_cors = False if principal: try: _iam().authorize(principal, bucket_name, "cors") can_manage_cors = True except IamError: can_manage_cors = False try: versioning_enabled = storage.is_versioning_enabled(bucket_name) except StorageError: versioning_enabled = False can_manage_versioning = False if principal: try: _iam().authorize(principal, bucket_name, "write") can_manage_versioning = True except IamError: can_manage_versioning = False can_manage_replication = False if principal: try: _iam().authorize(principal, bucket_name, "replication") can_manage_replication = True except IamError: can_manage_replication = False is_replication_admin = False if principal: try: _iam().authorize(principal, None, "iam:list_users") is_replication_admin = True except IamError: is_replication_admin = False replication_rule = _replication().get_rule(bucket_name) connections = _connections().list() if (is_replication_admin or replication_rule) else [] encryption_config = storage.get_bucket_encryption(bucket_name) kms_manager = _kms() kms_keys = kms_manager.list_keys() if kms_manager else [] kms_enabled = current_app.config.get("KMS_ENABLED", False) encryption_enabled = current_app.config.get("ENCRYPTION_ENABLED", False) lifecycle_enabled = current_app.config.get("LIFECYCLE_ENABLED", False) can_manage_encryption = can_manage_versioning bucket_quota = storage.get_bucket_quota(bucket_name) bucket_stats = storage.bucket_stats(bucket_name) can_manage_quota = False try: _iam().authorize(principal, None, "iam:list_users") can_manage_quota = True except IamError: pass objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name) objects_stream_url = url_for("ui.stream_bucket_objects", bucket_name=bucket_name) lifecycle_url = url_for("ui.bucket_lifecycle", bucket_name=bucket_name) cors_url = url_for("ui.bucket_cors", bucket_name=bucket_name) acl_url = url_for("ui.bucket_acl", bucket_name=bucket_name) folders_url = url_for("ui.create_folder", bucket_name=bucket_name) buckets_for_copy_url = url_for("ui.list_buckets_for_copy", bucket_name=bucket_name) return render_template( "bucket_detail.html", bucket_name=bucket_name, objects_api_url=objects_api_url, objects_stream_url=objects_stream_url, lifecycle_url=lifecycle_url, cors_url=cors_url, acl_url=acl_url, folders_url=folders_url, buckets_for_copy_url=buckets_for_copy_url, principal=principal, bucket_policy_text=policy_text, bucket_policy=bucket_policy, can_edit_policy=can_edit_policy, can_manage_lifecycle=can_manage_lifecycle, can_manage_cors=can_manage_cors, can_manage_versioning=can_manage_versioning, can_manage_replication=can_manage_replication, can_manage_encryption=can_manage_encryption, is_replication_admin=is_replication_admin, default_policy=default_policy, versioning_enabled=versioning_enabled, replication_rule=replication_rule, connections=connections, encryption_config=encryption_config, kms_keys=kms_keys, kms_enabled=kms_enabled, encryption_enabled=encryption_enabled, lifecycle_enabled=lifecycle_enabled, bucket_quota=bucket_quota, bucket_stats=bucket_stats, can_manage_quota=can_manage_quota, ) @ui_bp.get("/buckets//objects") def list_bucket_objects(bucket_name: str): """API endpoint for paginated object listing.""" principal = _current_principal() storage = _storage() try: _authorize_ui(principal, bucket_name, "list") except IamError as exc: return jsonify({"error": str(exc)}), 403 max_keys = min(int(request.args.get("max_keys", 1000)), 100000) continuation_token = request.args.get("continuation_token") or None prefix = request.args.get("prefix") or None try: result = storage.list_objects( bucket_name, max_keys=max_keys, continuation_token=continuation_token, prefix=prefix, ) except StorageError as exc: return jsonify({"error": str(exc)}), 400 try: versioning_enabled = storage.is_versioning_enabled(bucket_name) except StorageError: versioning_enabled = False preview_template = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") delete_template = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER") tags_template = url_for("ui.object_tags", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") copy_template = url_for("ui.copy_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") move_template = url_for("ui.move_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") metadata_template = url_for("ui.object_metadata", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") objects_data = [] for obj in result.objects: objects_data.append({ "key": obj.key, "size": obj.size, "last_modified": obj.last_modified.isoformat(), "last_modified_display": _format_datetime_display(obj.last_modified), "last_modified_iso": _format_datetime_iso(obj.last_modified), "etag": obj.etag, }) return jsonify({ "objects": objects_data, "is_truncated": result.is_truncated, "next_continuation_token": result.next_continuation_token, "total_count": result.total_count, "versioning_enabled": versioning_enabled, "url_templates": { "preview": preview_template, "download": preview_template + "?download=1", "presign": presign_template, "delete": delete_template, "versions": versions_template, "restore": restore_template, "tags": tags_template, "copy": copy_template, "move": move_template, "metadata": metadata_template, }, }) @ui_bp.get("/buckets//objects/stream") def stream_bucket_objects(bucket_name: str): """Streaming NDJSON endpoint for progressive object listing. Streams objects as newline-delimited JSON for fast progressive rendering. First line is metadata, subsequent lines are objects. """ principal = _current_principal() storage = _storage() try: _authorize_ui(principal, bucket_name, "list") except IamError as exc: return jsonify({"error": str(exc)}), 403 prefix = request.args.get("prefix") or None try: versioning_enabled = storage.is_versioning_enabled(bucket_name) except StorageError: versioning_enabled = False preview_template = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") delete_template = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER") tags_template = url_for("ui.object_tags", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") copy_template = url_for("ui.copy_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") move_template = url_for("ui.move_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") metadata_template = url_for("ui.object_metadata", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") display_tz = current_app.config.get("DISPLAY_TIMEZONE", "UTC") def generate(): meta_line = json.dumps({ "type": "meta", "versioning_enabled": versioning_enabled, "url_templates": { "preview": preview_template, "download": preview_template + "?download=1", "presign": presign_template, "delete": delete_template, "versions": versions_template, "restore": restore_template, "tags": tags_template, "copy": copy_template, "move": move_template, "metadata": metadata_template, }, }) + "\n" yield meta_line continuation_token = None total_count = None batch_size = 5000 while True: try: result = storage.list_objects( bucket_name, max_keys=batch_size, continuation_token=continuation_token, prefix=prefix, ) except StorageError as exc: yield json.dumps({"type": "error", "error": str(exc)}) + "\n" return if total_count is None: total_count = result.total_count yield json.dumps({"type": "count", "total_count": total_count}) + "\n" for obj in result.objects: yield json.dumps({ "type": "object", "key": obj.key, "size": obj.size, "last_modified": obj.last_modified.isoformat(), "last_modified_display": _format_datetime_display(obj.last_modified, display_tz), "last_modified_iso": _format_datetime_iso(obj.last_modified, display_tz), "etag": obj.etag, }) + "\n" if not result.is_truncated: break continuation_token = result.next_continuation_token yield json.dumps({"type": "done"}) + "\n" return Response( generate(), mimetype='application/x-ndjson', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', } ) @ui_bp.post("/buckets//upload") @limiter.limit("30 per minute") def upload_object(bucket_name: str): principal = _current_principal() file = request.files.get("object") object_key = request.form.get("object_key") metadata_raw = (request.form.get("metadata") or "").strip() wants_json = request.headers.get("X-Requested-With") == "XMLHttpRequest" def _response(success: bool, message: str, status: int = 200): if wants_json: payload = {"status": "ok" if success else "error", "message": message} return jsonify(payload), status flash(message, "success" if success else "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="objects")) if file and not object_key: object_key = file.filename if not object_key: return _response(False, "Object key is required", 400) if not file: return _response(False, "Choose a file to upload", 400) metadata = None if metadata_raw: try: parsed = json.loads(metadata_raw) if not isinstance(parsed, dict): raise ValueError metadata = {str(k): str(v) for k, v in parsed.items()} except ValueError: return _response(False, "Metadata must be a JSON object", 400) try: _authorize_ui(principal, bucket_name, "write") _storage().put_object(bucket_name, object_key, file.stream, metadata=metadata) _replication().trigger_replication(bucket_name, object_key) message = f"Uploaded '{object_key}'" if metadata: message += " with metadata" return _response(True, message) except (StorageError, IamError) as exc: return _response(False, _friendly_error_message(exc), 400) @ui_bp.post("/buckets//multipart/initiate") def initiate_multipart_upload(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 payload = request.get_json(silent=True) or {} object_key = str(payload.get("object_key", "")).strip() if not object_key: return jsonify({"error": "object_key is required"}), 400 metadata_payload = payload.get("metadata") metadata = None if metadata_payload is not None: if not isinstance(metadata_payload, dict): return jsonify({"error": "metadata must be an object"}), 400 metadata = {str(k): str(v) for k, v in metadata_payload.items()} try: upload_id = _storage().initiate_multipart_upload(bucket_name, object_key, metadata=metadata) except StorageError as exc: return jsonify({"error": str(exc)}), 400 return jsonify({"upload_id": upload_id}) @ui_bp.put("/buckets//multipart//parts") @limiter.exempt @csrf.exempt def upload_multipart_part(bucket_name: str, upload_id: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 try: part_number = int(request.args.get("partNumber", "0")) except ValueError: return jsonify({"error": "partNumber must be an integer"}), 400 if part_number < 1: return jsonify({"error": "partNumber must be >= 1"}), 400 try: data = request.get_data() if not data: return jsonify({"error": "Empty request body"}), 400 stream = io.BytesIO(data) etag = _storage().upload_multipart_part(bucket_name, upload_id, part_number, stream) except StorageError as exc: return jsonify({"error": str(exc)}), 400 return jsonify({"etag": etag, "part_number": part_number}) @ui_bp.post("/buckets//multipart//complete") def complete_multipart_upload(bucket_name: str, upload_id: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 payload = request.get_json(silent=True) or {} parts_payload = payload.get("parts") if not isinstance(parts_payload, list) or not parts_payload: return jsonify({"error": "parts array required"}), 400 normalized = [] for part in parts_payload: if not isinstance(part, dict): return jsonify({"error": "Each part must be an object"}), 400 raw_number = part.get("part_number") or part.get("PartNumber") try: number = int(raw_number) except (TypeError, ValueError): return jsonify({"error": "Each part must include part_number"}), 400 etag = str(part.get("etag") or part.get("ETag") or "").strip() normalized.append({"part_number": number, "etag": etag}) try: result = _storage().complete_multipart_upload(bucket_name, upload_id, normalized) _replication().trigger_replication(bucket_name, result.key) return jsonify({ "key": result.key, "size": result.size, "etag": result.etag, "last_modified": result.last_modified.isoformat() if result.last_modified else None, }) except StorageError as exc: return jsonify({"error": str(exc)}), 400 @ui_bp.delete("/buckets//multipart/") def abort_multipart_upload(bucket_name: str, upload_id: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 try: _storage().abort_multipart_upload(bucket_name, upload_id) except StorageError as exc: return jsonify({"error": str(exc)}), 400 return jsonify({"status": "aborted"}) @ui_bp.post("/buckets//delete") @limiter.limit("20 per minute") def delete_bucket(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "delete") _storage().delete_bucket(bucket_name) _bucket_policies().delete_policy(bucket_name) _replication_manager().delete_rule(bucket_name) if _wants_json(): return jsonify({"success": True, "message": f"Bucket '{bucket_name}' removed"}) flash(f"Bucket '{bucket_name}' removed", "success") except (StorageError, IamError) as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.buckets_overview")) @ui_bp.post("/buckets//objects//delete") @limiter.limit("60 per minute") def delete_object(bucket_name: str, object_key: str): principal = _current_principal() purge_versions = request.form.get("purge_versions") == "1" try: _authorize_ui(principal, bucket_name, "delete", object_key=object_key) if purge_versions: _storage().purge_object(bucket_name, object_key) message = f"Permanently deleted '{object_key}' and all versions" else: _storage().delete_object(bucket_name, object_key) _replication_manager().trigger_replication(bucket_name, object_key, action="delete") message = f"Deleted '{object_key}'" if _wants_json(): return jsonify({"success": True, "message": message}) flash(message, "success") except (IamError, StorageError) as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) @ui_bp.post("/buckets//objects/bulk-delete") @limiter.limit("40 per minute") def bulk_delete_objects(bucket_name: str): principal = _current_principal() wants_json = request.headers.get("X-Requested-With") == "XMLHttpRequest" or request.is_json payload = request.get_json(silent=True) or {} keys_payload = payload.get("keys") purge_versions = bool(payload.get("purge_versions")) def _respond(success: bool, message: str, *, deleted=None, errors=None, status_code: int = 200): if wants_json: body = { "status": "ok" if success else "partial", "message": message, "deleted": deleted or [], "errors": errors or [], } if not success and not errors: body["status"] = "error" return jsonify(body), status_code flash(message, "success" if success and not errors else "warning") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) if not isinstance(keys_payload, list): return _respond(False, "keys must be provided as a JSON array", status_code=400) cleaned: list[str] = [] for entry in keys_payload: if isinstance(entry, str): candidate = entry.strip() if candidate: cleaned.append(candidate) if not cleaned: return _respond(False, "Select at least one object to delete", status_code=400) MAX_KEYS = current_app.config.get("BULK_DELETE_MAX_KEYS", 500) if len(cleaned) > MAX_KEYS: return _respond(False, f"A maximum of {MAX_KEYS} objects can be deleted per request", status_code=400) unique_keys = list(dict.fromkeys(cleaned)) storage = _storage() try: _authorize_ui(principal, bucket_name, "delete") except IamError as exc: return _respond(False, _friendly_error_message(exc), status_code=403) deleted: list[str] = [] errors: list[dict[str, str]] = [] for key in unique_keys: try: if purge_versions: storage.purge_object(bucket_name, key) else: storage.delete_object(bucket_name, key) _replication_manager().trigger_replication(bucket_name, key, action="delete") deleted.append(key) except StorageError as exc: errors.append({"key": key, "error": str(exc)}) if not deleted and errors: return _respond(False, "Unable to delete the selected objects", deleted=deleted, errors=errors, status_code=400) message = f"Deleted {len(deleted)} object{'s' if len(deleted) != 1 else ''}" if purge_versions and deleted: message += " (including archived versions)" if errors: message += f"; {len(errors)} failed" return _respond(not errors, message, deleted=deleted, errors=errors) @ui_bp.post("/buckets//objects/bulk-download") @limiter.limit("10 per minute") def bulk_download_objects(bucket_name: str): import io import zipfile principal = _current_principal() payload = request.get_json(silent=True) or {} keys_payload = payload.get("keys") if not isinstance(keys_payload, list): return jsonify({"error": "keys must be provided as a JSON array"}), 400 cleaned: list[str] = [] for entry in keys_payload: if isinstance(entry, str): candidate = entry.strip() if candidate: cleaned.append(candidate) if not cleaned: return jsonify({"error": "Select at least one object to download"}), 400 MAX_KEYS = current_app.config.get("BULK_DELETE_MAX_KEYS", 500) if len(cleaned) > MAX_KEYS: return jsonify({"error": f"A maximum of {MAX_KEYS} objects can be downloaded per request"}), 400 unique_keys = list(dict.fromkeys(cleaned)) storage = _storage() try: _authorize_ui(principal, bucket_name, "read") except IamError as exc: return jsonify({"error": str(exc)}), 403 buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf: for key in unique_keys: try: _authorize_ui(principal, bucket_name, "read", object_key=key) metadata = storage.get_object_metadata(bucket_name, key) is_encrypted = "x-amz-server-side-encryption" in metadata if is_encrypted and hasattr(storage, 'get_object_data'): data, _ = storage.get_object_data(bucket_name, key) zf.writestr(key, data) else: path = storage.get_object_path(bucket_name, key) zf.write(path, arcname=key) except (StorageError, IamError): continue buffer.seek(0) return send_file( buffer, as_attachment=True, download_name=f"{bucket_name}-download.zip", mimetype="application/zip" ) @ui_bp.post("/buckets//objects//purge") @limiter.limit("30 per minute") def purge_object_versions(bucket_name: str, object_key: str): principal = _current_principal() wants_json = request.headers.get("X-Requested-With") == "XMLHttpRequest" try: _authorize_ui(principal, bucket_name, "delete", object_key=object_key) _storage().purge_object(bucket_name, object_key) except IamError as exc: if wants_json: return jsonify({"error": str(exc)}), 403 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) except StorageError as exc: if wants_json: return jsonify({"error": str(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) message = f"Removed archived versions for '{object_key}'" if wants_json: return jsonify({"status": "ok", "message": message}) flash(message, "success") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) @ui_bp.get("/buckets//objects//preview") def object_preview(bucket_name: str, object_key: str) -> Response: principal = _current_principal() storage = _storage() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) path = storage.get_object_path(bucket_name, object_key) metadata = storage.get_object_metadata(bucket_name, object_key) except (StorageError, IamError) as exc: status = 403 if isinstance(exc, IamError) else 404 return Response(str(exc), status=status) download = request.args.get("download") == "1" is_encrypted = "x-amz-server-side-encryption" in metadata if is_encrypted and hasattr(storage, 'get_object_data'): try: data, _ = storage.get_object_data(bucket_name, object_key) import io import mimetypes mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream" return send_file( io.BytesIO(data), mimetype=mimetype, as_attachment=download, download_name=path.name ) except StorageError as exc: return Response(f"Decryption failed: {exc}", status=500) return send_file(path, as_attachment=download, download_name=path.name) @ui_bp.post("/buckets//objects//presign") def object_presign(bucket_name: str, object_key: str): principal = _current_principal() payload = request.get_json(silent=True) or {} method = str(payload.get("method", "GET")).upper() allowed_methods = {"GET", "PUT", "DELETE"} if method not in allowed_methods: return jsonify({"error": "Method must be GET, PUT, or DELETE"}), 400 action = "read" if method == "GET" else ("delete" if method == "DELETE" else "write") try: _authorize_ui(principal, bucket_name, action, object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 try: expires = int(payload.get("expires_in", 900)) except (TypeError, ValueError): return jsonify({"error": "expires_in must be an integer"}), 400 expires = max(1, min(expires, 7 * 24 * 3600)) storage = _storage() if not storage.bucket_exists(bucket_name): return jsonify({"error": "Bucket does not exist"}), 404 if action != "write": try: storage.get_object_path(bucket_name, object_key) except StorageError: return jsonify({"error": "Object not found"}), 404 secret = _iam().secret_for_key(principal.access_key) api_base = current_app.config.get("API_BASE_URL") or "http://127.0.0.1:5000" url = _generate_presigned_url( principal=principal, secret_key=secret, method=method, bucket_name=bucket_name, object_key=object_key, expires_in=expires, api_base_url=api_base, ) current_app.logger.info( "Presigned URL generated", extra={"bucket": bucket_name, "key": object_key, "method": method}, ) return jsonify({"url": url, "method": method, "expires_in": expires}) @ui_bp.get("/buckets//objects//metadata") def object_metadata(bucket_name: str, object_key: str): principal = _current_principal() storage = _storage() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) metadata = storage.get_object_metadata(bucket_name, object_key) return jsonify({"metadata": metadata}) except IamError as exc: return jsonify({"error": str(exc)}), 403 except StorageError as exc: return jsonify({"error": str(exc)}), 404 @ui_bp.get("/buckets//objects//versions") def object_versions(bucket_name: str, object_key: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 try: versions = _storage().list_object_versions(bucket_name, object_key) except StorageError as exc: return jsonify({"error": str(exc)}), 400 return jsonify({"versions": versions}) @ui_bp.get("/buckets//archived") def archived_objects(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "list") except IamError as exc: return jsonify({"error": str(exc)}), 403 try: entries = _storage().list_orphaned_objects(bucket_name) except StorageError as exc: return jsonify({"error": str(exc)}), 400 payload: list[dict[str, Any]] = [] for entry in entries: latest = entry.get("latest") or {} restore_url = None if latest.get("version_id"): restore_url = url_for( "ui.restore_object_version", bucket_name=bucket_name, object_key=entry["key"], version_id=latest["version_id"], ) purge_url = url_for("ui.purge_object_versions", bucket_name=bucket_name, object_key=entry["key"]) payload.append( { "key": entry["key"], "versions": entry.get("versions", 0), "total_size": entry.get("total_size", 0), "latest": entry.get("latest"), "restore_url": restore_url, "purge_url": purge_url, } ) return jsonify({"objects": payload}) @ui_bp.post("/buckets//objects//versions//restore") def restore_object_version(bucket_name: str, object_key: str, version_id: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write", object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 try: meta = _storage().restore_object_version(bucket_name, object_key, version_id) except StorageError as exc: return jsonify({"error": str(exc)}), 400 message = f"Restored '{meta.key}'" if meta else "Object restored" return jsonify({"status": "ok", "message": message}) @ui_bp.post("/buckets//policy") @limiter.limit("10 per minute") def update_bucket_policy(bucket_name: str): principal = _current_principal() action = request.form.get("mode", "upsert") try: _authorize_ui(principal, bucket_name, "policy") except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) store = _bucket_policies() if action == "delete": store.delete_policy(bucket_name) if _wants_json(): return jsonify({"success": True, "message": "Bucket policy removed"}) flash("Bucket policy removed", "info") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) document = request.form.get("policy_document", "").strip() if not document: if _wants_json(): return jsonify({"error": "Provide a JSON policy document"}), 400 flash("Provide a JSON policy document", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) try: payload = json.loads(document) store.set_policy(bucket_name, payload) if _wants_json(): return jsonify({"success": True, "message": "Bucket policy saved"}) flash("Bucket policy saved", "success") except (json.JSONDecodeError, ValueError) as exc: if _wants_json(): return jsonify({"error": f"Policy error: {exc}"}), 400 flash(f"Policy error: {exc}", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) @ui_bp.post("/buckets//versioning") def update_bucket_versioning(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 403 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) state = request.form.get("state", "enable") enable = state == "enable" try: _storage().set_bucket_versioning(bucket_name, enable) except StorageError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) message = "Versioning enabled" if enable else "Versioning suspended" if _wants_json(): return jsonify({"success": True, "message": message, "enabled": enable}) flash(message, "success") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) @ui_bp.post("/buckets//quota") def update_bucket_quota(bucket_name: str): """Update bucket quota configuration (admin only).""" principal = _current_principal() is_admin = False try: _iam().authorize(principal, None, "iam:list_users") is_admin = True except IamError: pass if not is_admin: if _wants_json(): return jsonify({"error": "Only administrators can manage bucket quotas"}), 403 flash("Only administrators can manage bucket quotas", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) action = request.form.get("action", "set") if action == "remove": try: _storage().set_bucket_quota(bucket_name, max_bytes=None, max_objects=None) if _wants_json(): return jsonify({"success": True, "message": "Bucket quota removed"}) flash("Bucket quota removed", "info") except StorageError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) max_mb_str = request.form.get("max_mb", "").strip() max_objects_str = request.form.get("max_objects", "").strip() max_bytes = None max_objects = None if max_mb_str: try: max_mb = int(max_mb_str) if max_mb < 1: raise ValueError("Size must be at least 1 MB") max_bytes = max_mb * 1024 * 1024 except ValueError as exc: if _wants_json(): return jsonify({"error": f"Invalid size value: {exc}"}), 400 flash(f"Invalid size value: {exc}", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) if max_objects_str: try: max_objects = int(max_objects_str) if max_objects < 0: raise ValueError("Object count must be non-negative") except ValueError as exc: if _wants_json(): return jsonify({"error": f"Invalid object count: {exc}"}), 400 flash(f"Invalid object count: {exc}", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) try: _storage().set_bucket_quota(bucket_name, max_bytes=max_bytes, max_objects=max_objects) if max_bytes is None and max_objects is None: message = "Bucket quota removed" else: message = "Bucket quota updated" if _wants_json(): return jsonify({ "success": True, "message": message, "max_bytes": max_bytes, "max_objects": max_objects, "has_quota": max_bytes is not None or max_objects is not None }) flash(message, "success" if max_bytes or max_objects else "info") except StorageError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) @ui_bp.post("/buckets//encryption") def update_bucket_encryption(bucket_name: str): """Update bucket default encryption configuration.""" principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 403 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) action = request.form.get("action", "enable") if action == "disable": try: _storage().set_bucket_encryption(bucket_name, None) if _wants_json(): return jsonify({"success": True, "message": "Default encryption disabled", "enabled": False}) flash("Default encryption disabled", "info") except StorageError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) algorithm = request.form.get("algorithm", "AES256") kms_key_id = request.form.get("kms_key_id", "").strip() or None if algorithm not in ("AES256", "aws:kms"): if _wants_json(): return jsonify({"error": "Invalid encryption algorithm"}), 400 flash("Invalid encryption algorithm", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) encryption_config: dict[str, Any] = { "Rules": [ { "ApplyServerSideEncryptionByDefault": { "SSEAlgorithm": algorithm, } } ] } if algorithm == "aws:kms" and kms_key_id: encryption_config["Rules"][0]["ApplyServerSideEncryptionByDefault"]["KMSMasterKeyID"] = kms_key_id try: _storage().set_bucket_encryption(bucket_name, encryption_config) if algorithm == "aws:kms": message = "Default KMS encryption enabled" else: message = "Default AES-256 encryption enabled" if _wants_json(): return jsonify({"success": True, "message": message, "enabled": True, "algorithm": algorithm}) flash(message, "success") except StorageError as exc: if _wants_json(): return jsonify({"error": _friendly_error_message(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) @ui_bp.get("/iam") def iam_dashboard(): principal = _current_principal() iam_service = _iam() secret_token = request.args.get("secret_token") disclosed_secret: dict[str, str] | None = None if secret_token: payload = _secret_store().pop(secret_token) if isinstance(payload, dict): access_key = str(payload.get("access_key", "")) secret_key = payload.get("secret_key") if secret_key: disclosed_secret = { "access_key": access_key, "secret_key": str(secret_key), "operation": str(payload.get("operation", "create")), } locked = False locked_reason = None try: iam_service.authorize(principal, None, "iam:list_users") except IamError as exc: locked = True locked_reason = str(exc) users = iam_service.list_users() if not locked else [] config_summary = iam_service.config_summary() config_document = json.dumps(iam_service.export_config(mask_secrets=True), indent=2) return render_template( "iam.html", users=users, principal=principal, iam_locked=locked, locked_reason=locked_reason, config_summary=config_summary, config_document=config_document, disclosed_secret=disclosed_secret, ) @ui_bp.post("/iam/users") def create_iam_user(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:create_user") except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) display_name = request.form.get("display_name", "").strip() or "Unnamed" if len(display_name) > 64: if _wants_json(): return jsonify({"error": "Display name must be 64 characters or fewer"}), 400 flash("Display name must be 64 characters or fewer", "danger") return redirect(url_for("ui.iam_dashboard")) policies_text = request.form.get("policies", "").strip() policies = None if policies_text: try: policies = json.loads(policies_text) except json.JSONDecodeError as exc: if _wants_json(): return jsonify({"error": f"Invalid JSON: {exc}"}), 400 flash(f"Invalid JSON: {exc}", "danger") return redirect(url_for("ui.iam_dashboard")) try: created = _iam().create_user(display_name=display_name, policies=policies) except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 400 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) token = _secret_store().remember( { "access_key": created["access_key"], "secret_key": created["secret_key"], "operation": "create", } ) if _wants_json(): return jsonify({ "success": True, "message": f"Created user {created['access_key']}", "access_key": created["access_key"], "secret_key": created["secret_key"], "display_name": display_name, "policies": policies or [] }) flash(f"Created user {created['access_key']}. Copy the secret below.", "success") return redirect(url_for("ui.iam_dashboard", secret_token=token)) @ui_bp.post("/iam/users//rotate") def rotate_iam_secret(access_key: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:rotate_key") except IamError as exc: if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html: return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) try: new_secret = _iam().rotate_secret(access_key) if principal and principal.access_key == access_key: creds = session.get("credentials", {}) creds["secret_key"] = new_secret session["credentials"] = creds session.modified = True except IamError as exc: if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html: return jsonify({"error": str(exc)}), 400 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html: return jsonify({ "access_key": access_key, "secret_key": new_secret, "message": f"Secret rotated for {access_key}", }) token = _secret_store().remember( { "access_key": access_key, "secret_key": new_secret, "operation": "rotate", } ) flash(f"Rotated secret for {access_key}. Copy the secret below.", "info") return redirect(url_for("ui.iam_dashboard", secret_token=token)) @ui_bp.post("/iam/users//update") def update_iam_user(access_key: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:create_user") except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) display_name = request.form.get("display_name", "").strip() if display_name: if len(display_name) > 64: if _wants_json(): return jsonify({"error": "Display name must be 64 characters or fewer"}), 400 flash("Display name must be 64 characters or fewer", "danger") else: try: _iam().update_user(access_key, display_name) if _wants_json(): return jsonify({"success": True, "message": f"Updated user {access_key}", "display_name": display_name}) flash(f"Updated user {access_key}", "success") except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 400 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) @ui_bp.post("/iam/users//delete") def delete_iam_user(access_key: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:delete_user") except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) if access_key == principal.access_key: try: _iam().delete_user(access_key) session.pop("credentials", None) if _wants_json(): return jsonify({"success": True, "message": "Your account has been deleted", "redirect": url_for("ui.login")}) flash("Your account has been deleted.", "info") return redirect(url_for("ui.login")) except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 400 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) try: _iam().delete_user(access_key) if _wants_json(): return jsonify({"success": True, "message": f"Deleted user {access_key}"}) flash(f"Deleted user {access_key}", "success") except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 400 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) @ui_bp.post("/iam/users//policies") def update_iam_policies(access_key: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:update_policy") except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) policies_raw = request.form.get("policies", "").strip() if not policies_raw: policies = [] else: try: policies = json.loads(policies_raw) if not isinstance(policies, list): raise ValueError("Policies must be a list") except (ValueError, json.JSONDecodeError): if _wants_json(): return jsonify({"error": "Invalid JSON format for policies"}), 400 flash("Invalid JSON format for policies", "danger") return redirect(url_for("ui.iam_dashboard")) try: _iam().update_user_policies(access_key, policies) if _wants_json(): return jsonify({"success": True, "message": f"Updated policies for {access_key}", "policies": policies}) flash(f"Updated policies for {access_key}", "success") except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 400 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) @ui_bp.post("/connections") def create_connection(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: if _wants_json(): return jsonify({"error": "Access denied"}), 403 flash("Access denied", "danger") return redirect(url_for("ui.buckets_overview")) name = request.form.get("name", "").strip() endpoint = request.form.get("endpoint_url", "").strip() access_key = request.form.get("access_key", "").strip() secret_key = request.form.get("secret_key", "").strip() region = request.form.get("region", "us-east-1").strip() if not all([name, endpoint, access_key, secret_key]): if _wants_json(): return jsonify({"error": "All fields are required"}), 400 flash("All fields are required", "danger") return redirect(url_for("ui.connections_dashboard")) conn = RemoteConnection( id=str(uuid.uuid4()), name=name, endpoint_url=endpoint, access_key=access_key, secret_key=secret_key, region=region ) _connections().add(conn) if _wants_json(): return jsonify({"success": True, "message": f"Connection '{name}' created", "connection_id": conn.id}) flash(f"Connection '{name}' created", "success") return redirect(url_for("ui.connections_dashboard")) @ui_bp.post("/connections/test") def test_connection(): from botocore.config import Config as BotoConfig from botocore.exceptions import ConnectTimeoutError, EndpointConnectionError, ReadTimeoutError principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: return jsonify({"status": "error", "message": "Access denied"}), 403 data = request.get_json(silent=True) or request.form endpoint = data.get("endpoint_url", "").strip() access_key = data.get("access_key", "").strip() secret_key = data.get("secret_key", "").strip() region = data.get("region", "us-east-1").strip() if not all([endpoint, access_key, secret_key]): return jsonify({"status": "error", "message": "Missing credentials"}), 400 try: config = BotoConfig( connect_timeout=5, read_timeout=10, retries={'max_attempts': 1} ) s3 = boto3.client( "s3", endpoint_url=endpoint, aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region, config=config, ) s3.list_buckets() return jsonify({"status": "ok", "message": "Connection successful"}) except (ConnectTimeoutError, ReadTimeoutError): return jsonify({"status": "error", "message": f"Connection timed out - endpoint may be down or unreachable: {endpoint}"}), 400 except EndpointConnectionError: return jsonify({"status": "error", "message": f"Could not connect to endpoint: {endpoint}"}), 400 except ClientError as e: error_code = e.response.get('Error', {}).get('Code', 'Unknown') error_msg = e.response.get('Error', {}).get('Message', str(e)) return jsonify({"status": "error", "message": f"Connection failed ({error_code}): {error_msg}"}), 400 except Exception as e: return jsonify({"status": "error", "message": f"Connection failed: {str(e)}"}), 400 @ui_bp.post("/connections//update") def update_connection(connection_id: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: if _wants_json(): return jsonify({"error": "Access denied"}), 403 flash("Access denied", "danger") return redirect(url_for("ui.buckets_overview")) conn = _connections().get(connection_id) if not conn: if _wants_json(): return jsonify({"error": "Connection not found"}), 404 flash("Connection not found", "danger") return redirect(url_for("ui.connections_dashboard")) name = request.form.get("name", "").strip() endpoint = request.form.get("endpoint_url", "").strip() access_key = request.form.get("access_key", "").strip() secret_key = request.form.get("secret_key", "").strip() region = request.form.get("region", "us-east-1").strip() if not all([name, endpoint, access_key, secret_key]): if _wants_json(): return jsonify({"error": "All fields are required"}), 400 flash("All fields are required", "danger") return redirect(url_for("ui.connections_dashboard")) conn.name = name conn.endpoint_url = endpoint conn.access_key = access_key conn.secret_key = secret_key conn.region = region _connections().save() if _wants_json(): return jsonify({ "success": True, "message": f"Connection '{name}' updated", "connection": { "id": connection_id, "name": name, "endpoint_url": endpoint, "access_key": access_key, "region": region } }) flash(f"Connection '{name}' updated", "success") return redirect(url_for("ui.connections_dashboard")) @ui_bp.post("/connections//delete") def delete_connection(connection_id: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: if _wants_json(): return jsonify({"error": "Access denied"}), 403 flash("Access denied", "danger") return redirect(url_for("ui.buckets_overview")) _connections().delete(connection_id) if _wants_json(): return jsonify({"success": True, "message": "Connection deleted"}) flash("Connection deleted", "success") return redirect(url_for("ui.connections_dashboard")) @ui_bp.post("/buckets//replication") def update_bucket_replication(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "replication") except IamError as exc: if _wants_json(): return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication")) is_admin = False try: _iam().authorize(principal, None, "iam:list_users") is_admin = True except IamError: is_admin = False action = request.form.get("action") if action == "delete": if not is_admin: if _wants_json(): return jsonify({"error": "Only administrators can remove replication configuration"}), 403 flash("Only administrators can remove replication configuration", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication")) _replication().delete_rule(bucket_name) if _wants_json(): return jsonify({"success": True, "message": "Replication configuration removed", "action": "delete"}) flash("Replication configuration removed", "info") elif action == "pause": rule = _replication().get_rule(bucket_name) if rule: rule.enabled = False _replication().set_rule(rule) if _wants_json(): return jsonify({"success": True, "message": "Replication paused", "action": "pause", "enabled": False}) flash("Replication paused", "info") else: if _wants_json(): return jsonify({"error": "No replication configuration to pause"}), 404 flash("No replication configuration to pause", "warning") elif action == "resume": from .replication import REPLICATION_MODE_ALL rule = _replication().get_rule(bucket_name) if rule: rule.enabled = True _replication().set_rule(rule) if rule.mode == REPLICATION_MODE_ALL: _replication().replicate_existing_objects(bucket_name) message = "Replication resumed. Syncing pending objects in background." else: message = "Replication resumed" if _wants_json(): return jsonify({"success": True, "message": message, "action": "resume", "enabled": True}) flash(message, "success") else: if _wants_json(): return jsonify({"error": "No replication configuration to resume"}), 404 flash("No replication configuration to resume", "warning") elif action == "create": if not is_admin: if _wants_json(): return jsonify({"error": "Only administrators can configure replication settings"}), 403 flash("Only administrators can configure replication settings", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication")) from .replication import REPLICATION_MODE_NEW_ONLY, REPLICATION_MODE_ALL import time target_conn_id = request.form.get("target_connection_id") target_bucket = request.form.get("target_bucket", "").strip() replication_mode = request.form.get("replication_mode", REPLICATION_MODE_NEW_ONLY) if not target_conn_id or not target_bucket: if _wants_json(): return jsonify({"error": "Target connection and bucket are required"}), 400 flash("Target connection and bucket are required", "danger") else: rule = ReplicationRule( bucket_name=bucket_name, target_connection_id=target_conn_id, target_bucket=target_bucket, enabled=True, mode=replication_mode, created_at=time.time(), ) _replication().set_rule(rule) if replication_mode == REPLICATION_MODE_ALL: _replication().replicate_existing_objects(bucket_name) message = "Replication configured. Existing objects are being replicated in the background." else: message = "Replication configured. Only new uploads will be replicated." if _wants_json(): return jsonify({"success": True, "message": message, "action": "create", "enabled": True}) flash(message, "success") else: if _wants_json(): return jsonify({"error": "Invalid action"}), 400 flash("Invalid action", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication")) @ui_bp.get("/buckets//replication/status") def get_replication_status(bucket_name: str): """Async endpoint to fetch replication sync status without blocking page load.""" principal = _current_principal() try: _authorize_ui(principal, bucket_name, "replication") except IamError: return jsonify({"error": "Access denied"}), 403 rule = _replication().get_rule(bucket_name) if not rule: return jsonify({"error": "No replication rule"}), 404 connection = _connections().get(rule.target_connection_id) endpoint_healthy = False endpoint_error = None if connection: endpoint_healthy = _replication().check_endpoint_health(connection) if not endpoint_healthy: endpoint_error = f"Cannot reach endpoint: {connection.endpoint_url}" else: endpoint_error = "Target connection not found" stats = None if endpoint_healthy: stats = _replication().get_sync_status(bucket_name) if not stats: return jsonify({ "objects_synced": 0, "objects_pending": 0, "objects_orphaned": 0, "bytes_synced": 0, "last_sync_at": rule.stats.last_sync_at if rule.stats else None, "last_sync_key": rule.stats.last_sync_key if rule.stats else None, "endpoint_healthy": endpoint_healthy, "endpoint_error": endpoint_error, }) return jsonify({ "objects_synced": stats.objects_synced, "objects_pending": stats.objects_pending, "objects_orphaned": stats.objects_orphaned, "bytes_synced": stats.bytes_synced, "last_sync_at": stats.last_sync_at, "last_sync_key": stats.last_sync_key, "endpoint_healthy": endpoint_healthy, "endpoint_error": endpoint_error, }) @ui_bp.get("/buckets//replication/failures") def get_replication_failures(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "replication") except IamError: return jsonify({"error": "Access denied"}), 403 limit = request.args.get("limit", 50, type=int) offset = request.args.get("offset", 0, type=int) failures = _replication().get_failed_items(bucket_name, limit, offset) total = _replication().get_failure_count(bucket_name) return jsonify({ "failures": [f.to_dict() for f in failures], "total": total, "limit": limit, "offset": offset, }) @ui_bp.post("/buckets//replication/failures//retry") def retry_replication_failure(bucket_name: str, object_key: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "replication") except IamError: return jsonify({"error": "Access denied"}), 403 success = _replication().retry_failed_item(bucket_name, object_key) if success: return jsonify({"status": "submitted", "object_key": object_key}) return jsonify({"error": "Failed to submit retry"}), 400 @ui_bp.post("/buckets//replication/failures/retry-all") def retry_all_replication_failures(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "replication") except IamError: return jsonify({"error": "Access denied"}), 403 result = _replication().retry_all_failed(bucket_name) return jsonify({ "status": "submitted", "submitted": result["submitted"], "skipped": result["skipped"], }) @ui_bp.delete("/buckets//replication/failures/") def dismiss_replication_failure(bucket_name: str, object_key: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "replication") except IamError: return jsonify({"error": "Access denied"}), 403 success = _replication().dismiss_failure(bucket_name, object_key) if success: return jsonify({"status": "dismissed", "object_key": object_key}) return jsonify({"error": "Failure not found"}), 404 @ui_bp.delete("/buckets//replication/failures") def clear_replication_failures(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "replication") except IamError: return jsonify({"error": "Access denied"}), 403 _replication().clear_failures(bucket_name) return jsonify({"status": "cleared"}) @ui_bp.get("/connections//health") def check_connection_health(connection_id: str): """Check if a connection endpoint is reachable.""" principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: return jsonify({"error": "Access denied"}), 403 conn = _connections().get(connection_id) if not conn: return jsonify({"healthy": False, "error": "Connection not found"}), 404 healthy = _replication().check_endpoint_health(conn) return jsonify({ "healthy": healthy, "error": None if healthy else f"Cannot reach endpoint: {conn.endpoint_url}" }) @ui_bp.get("/connections") def connections_dashboard(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: flash("Access denied", "danger") return redirect(url_for("ui.buckets_overview")) connections = _connections().list() return render_template("connections.html", connections=connections, principal=principal) @ui_bp.get("/metrics") def metrics_dashboard(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: flash("Access denied: Metrics require admin permissions", "danger") return redirect(url_for("ui.buckets_overview")) from app.version import APP_VERSION import time cpu_percent = psutil.cpu_percent(interval=0.1) memory = psutil.virtual_memory() storage_root = current_app.config["STORAGE_ROOT"] disk = psutil.disk_usage(storage_root) storage = _storage() buckets = storage.list_buckets() total_buckets = len(buckets) total_objects = 0 total_bytes_used = 0 total_versions = 0 cache_ttl = current_app.config.get("BUCKET_STATS_CACHE_TTL", 60) for bucket in buckets: stats = storage.bucket_stats(bucket.name, cache_ttl=cache_ttl) total_objects += stats.get("total_objects", stats.get("objects", 0)) total_bytes_used += stats.get("total_bytes", stats.get("bytes", 0)) total_versions += stats.get("version_count", 0) boot_time = psutil.boot_time() uptime_seconds = time.time() - boot_time uptime_days = int(uptime_seconds / 86400) return render_template( "metrics.html", principal=principal, cpu_percent=round(cpu_percent, 2), memory={ "total": _format_bytes(memory.total), "available": _format_bytes(memory.available), "used": _format_bytes(memory.used), "percent": round(memory.percent, 2), }, disk={ "total": _format_bytes(disk.total), "free": _format_bytes(disk.free), "used": _format_bytes(disk.used), "percent": round(disk.percent, 2), }, app={ "buckets": total_buckets, "objects": total_objects, "versions": total_versions, "storage_used": _format_bytes(total_bytes_used), "storage_raw": total_bytes_used, "version": APP_VERSION, "uptime_days": uptime_days, }, metrics_history_enabled=current_app.config.get("METRICS_HISTORY_ENABLED", False), ) @ui_bp.route("/metrics/api") def metrics_api(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: return jsonify({"error": "Access denied"}), 403 import time cpu_percent = psutil.cpu_percent(interval=0.1) memory = psutil.virtual_memory() storage_root = current_app.config["STORAGE_ROOT"] disk = psutil.disk_usage(storage_root) storage = _storage() buckets = storage.list_buckets() total_buckets = len(buckets) total_objects = 0 total_bytes_used = 0 total_versions = 0 cache_ttl = current_app.config.get("BUCKET_STATS_CACHE_TTL", 60) for bucket in buckets: stats = storage.bucket_stats(bucket.name, cache_ttl=cache_ttl) total_objects += stats.get("total_objects", stats.get("objects", 0)) total_bytes_used += stats.get("total_bytes", stats.get("bytes", 0)) total_versions += stats.get("version_count", 0) boot_time = psutil.boot_time() uptime_seconds = time.time() - boot_time uptime_days = int(uptime_seconds / 86400) _save_metrics_snapshot(cpu_percent, memory.percent, disk.percent, total_bytes_used) return jsonify({ "cpu_percent": round(cpu_percent, 2), "memory": { "total": _format_bytes(memory.total), "available": _format_bytes(memory.available), "used": _format_bytes(memory.used), "percent": round(memory.percent, 2), }, "disk": { "total": _format_bytes(disk.total), "free": _format_bytes(disk.free), "used": _format_bytes(disk.used), "percent": round(disk.percent, 2), }, "app": { "buckets": total_buckets, "objects": total_objects, "versions": total_versions, "storage_used": _format_bytes(total_bytes_used), "storage_raw": total_bytes_used, "uptime_days": uptime_days, } }) @ui_bp.route("/metrics/history") def metrics_history(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: return jsonify({"error": "Access denied"}), 403 if not current_app.config.get("METRICS_HISTORY_ENABLED", False): return jsonify({"enabled": False, "history": []}) hours = request.args.get("hours", type=int) if hours is None: hours = current_app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24) data = _load_metrics_history() history = data.get("history", []) if hours: from datetime import datetime, timezone cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600) history = [ h for h in history if datetime.fromisoformat(h["timestamp"].replace("Z", "+00:00")).timestamp() > cutoff ] return jsonify({ "enabled": True, "retention_hours": current_app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24), "interval_minutes": current_app.config.get("METRICS_HISTORY_INTERVAL_MINUTES", 5), "history": history, }) @ui_bp.route("/metrics/settings", methods=["GET", "PUT"]) def metrics_settings(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: return jsonify({"error": "Access denied"}), 403 if request.method == "GET": return jsonify({ "enabled": current_app.config.get("METRICS_HISTORY_ENABLED", False), "retention_hours": current_app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24), "interval_minutes": current_app.config.get("METRICS_HISTORY_INTERVAL_MINUTES", 5), }) data = request.get_json() or {} if "enabled" in data: current_app.config["METRICS_HISTORY_ENABLED"] = bool(data["enabled"]) if "retention_hours" in data: current_app.config["METRICS_HISTORY_RETENTION_HOURS"] = max(1, int(data["retention_hours"])) if "interval_minutes" in data: current_app.config["METRICS_HISTORY_INTERVAL_MINUTES"] = max(1, int(data["interval_minutes"])) return jsonify({ "enabled": current_app.config.get("METRICS_HISTORY_ENABLED", False), "retention_hours": current_app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24), "interval_minutes": current_app.config.get("METRICS_HISTORY_INTERVAL_MINUTES", 5), }) @ui_bp.route("/buckets//lifecycle", methods=["GET", "POST", "DELETE"]) def bucket_lifecycle(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "lifecycle") except IamError as exc: return jsonify({"error": str(exc)}), 403 storage = _storage() if not storage.bucket_exists(bucket_name): return jsonify({"error": "Bucket does not exist"}), 404 if request.method == "GET": rules = storage.get_bucket_lifecycle(bucket_name) or [] return jsonify({"rules": rules}) if request.method == "DELETE": storage.set_bucket_lifecycle(bucket_name, None) return jsonify({"status": "ok", "message": "Lifecycle configuration deleted"}) payload = request.get_json(silent=True) or {} rules = payload.get("rules", []) if not isinstance(rules, list): return jsonify({"error": "rules must be a list"}), 400 validated_rules = [] for i, rule in enumerate(rules): if not isinstance(rule, dict): return jsonify({"error": f"Rule {i} must be an object"}), 400 validated = { "ID": str(rule.get("ID", f"rule-{i+1}")), "Status": "Enabled" if rule.get("Status", "Enabled") == "Enabled" else "Disabled", } if rule.get("Prefix"): validated["Prefix"] = str(rule["Prefix"]) if rule.get("Expiration"): exp = rule["Expiration"] if isinstance(exp, dict) and exp.get("Days"): validated["Expiration"] = {"Days": int(exp["Days"])} if rule.get("NoncurrentVersionExpiration"): nve = rule["NoncurrentVersionExpiration"] if isinstance(nve, dict) and nve.get("NoncurrentDays"): validated["NoncurrentVersionExpiration"] = {"NoncurrentDays": int(nve["NoncurrentDays"])} if rule.get("AbortIncompleteMultipartUpload"): aimu = rule["AbortIncompleteMultipartUpload"] if isinstance(aimu, dict) and aimu.get("DaysAfterInitiation"): validated["AbortIncompleteMultipartUpload"] = {"DaysAfterInitiation": int(aimu["DaysAfterInitiation"])} validated_rules.append(validated) storage.set_bucket_lifecycle(bucket_name, validated_rules if validated_rules else None) return jsonify({"status": "ok", "message": "Lifecycle configuration saved", "rules": validated_rules}) @ui_bp.get("/buckets//lifecycle/history") def get_lifecycle_history(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "lifecycle") except IamError: return jsonify({"error": "Access denied"}), 403 limit = request.args.get("limit", 50, type=int) offset = request.args.get("offset", 0, type=int) lifecycle_manager = current_app.extensions.get("lifecycle") if not lifecycle_manager: return jsonify({ "executions": [], "total": 0, "limit": limit, "offset": offset, "enabled": False, }) records = lifecycle_manager.get_execution_history(bucket_name, limit, offset) return jsonify({ "executions": [r.to_dict() for r in records], "total": len(lifecycle_manager.get_execution_history(bucket_name, 1000, 0)), "limit": limit, "offset": offset, "enabled": True, }) @ui_bp.route("/buckets//cors", methods=["GET", "POST", "DELETE"]) def bucket_cors(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "cors") except IamError as exc: return jsonify({"error": str(exc)}), 403 storage = _storage() if not storage.bucket_exists(bucket_name): return jsonify({"error": "Bucket does not exist"}), 404 if request.method == "GET": rules = storage.get_bucket_cors(bucket_name) or [] return jsonify({"rules": rules}) if request.method == "DELETE": storage.set_bucket_cors(bucket_name, None) return jsonify({"status": "ok", "message": "CORS configuration deleted"}) payload = request.get_json(silent=True) or {} rules = payload.get("rules", []) if not isinstance(rules, list): return jsonify({"error": "rules must be a list"}), 400 validated_rules = [] for i, rule in enumerate(rules): if not isinstance(rule, dict): return jsonify({"error": f"Rule {i} must be an object"}), 400 origins = rule.get("AllowedOrigins", []) methods = rule.get("AllowedMethods", []) if not origins or not methods: return jsonify({"error": f"Rule {i} must have AllowedOrigins and AllowedMethods"}), 400 validated = { "AllowedOrigins": [str(o) for o in origins if o], "AllowedMethods": [str(m).upper() for m in methods if m], } if rule.get("AllowedHeaders"): validated["AllowedHeaders"] = [str(h) for h in rule["AllowedHeaders"] if h] if rule.get("ExposeHeaders"): validated["ExposeHeaders"] = [str(h) for h in rule["ExposeHeaders"] if h] if rule.get("MaxAgeSeconds") is not None: try: validated["MaxAgeSeconds"] = int(rule["MaxAgeSeconds"]) except (ValueError, TypeError): pass validated_rules.append(validated) storage.set_bucket_cors(bucket_name, validated_rules if validated_rules else None) return jsonify({"status": "ok", "message": "CORS configuration saved", "rules": validated_rules}) @ui_bp.route("/buckets//acl", methods=["GET", "POST"]) def bucket_acl(bucket_name: str): principal = _current_principal() action = "read" if request.method == "GET" else "write" try: _authorize_ui(principal, bucket_name, action) except IamError as exc: return jsonify({"error": str(exc)}), 403 storage = _storage() if not storage.bucket_exists(bucket_name): return jsonify({"error": "Bucket does not exist"}), 404 acl_service = _acl() owner_id = principal.access_key if principal else "anonymous" if request.method == "GET": try: acl = acl_service.get_bucket_acl(bucket_name) if not acl: acl = create_canned_acl("private", owner_id) return jsonify({ "owner": acl.owner, "grants": [g.to_dict() for g in acl.grants], "canned_acls": list(CANNED_ACLS.keys()), }) except Exception as exc: return jsonify({"error": str(exc)}), 500 payload = request.get_json(silent=True) or {} canned_acl = payload.get("canned_acl") if canned_acl: if canned_acl not in CANNED_ACLS: return jsonify({"error": f"Invalid canned ACL: {canned_acl}"}), 400 acl_service.set_bucket_canned_acl(bucket_name, canned_acl, owner_id) return jsonify({"status": "ok", "message": f"ACL set to {canned_acl}"}) return jsonify({"error": "canned_acl is required"}), 400 @ui_bp.route("/buckets//objects//tags", methods=["GET", "POST"]) def object_tags(bucket_name: str, object_key: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 storage = _storage() if request.method == "GET": try: tags = storage.get_object_tags(bucket_name, object_key) return jsonify({"tags": tags}) except StorageError as exc: return jsonify({"error": str(exc)}), 404 try: _authorize_ui(principal, bucket_name, "write", object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 payload = request.get_json(silent=True) or {} tags = payload.get("tags", []) if not isinstance(tags, list): return jsonify({"error": "tags must be a list"}), 400 if len(tags) > 10: return jsonify({"error": "Maximum 10 tags allowed"}), 400 validated_tags = [] for tag in tags: if isinstance(tag, dict) and tag.get("Key"): validated_tags.append({ "Key": str(tag["Key"]), "Value": str(tag.get("Value", "")) }) try: storage.set_object_tags(bucket_name, object_key, validated_tags if validated_tags else None) return jsonify({"status": "ok", "message": "Tags saved", "tags": validated_tags}) except StorageError as exc: return jsonify({"error": str(exc)}), 400 @ui_bp.post("/buckets//folders") def create_folder(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 payload = request.get_json(silent=True) or {} folder_name = str(payload.get("folder_name", "")).strip() prefix = str(payload.get("prefix", "")).strip() if not folder_name: return jsonify({"error": "folder_name is required"}), 400 folder_name = folder_name.rstrip("/") if "/" in folder_name: return jsonify({"error": "Folder name cannot contain /"}), 400 folder_key = f"{prefix}{folder_name}/" if prefix else f"{folder_name}/" import io try: _storage().put_object(bucket_name, folder_key, io.BytesIO(b"")) return jsonify({"status": "ok", "message": f"Folder '{folder_name}' created", "key": folder_key}) except StorageError as exc: return jsonify({"error": str(exc)}), 400 @ui_bp.post("/buckets//objects//copy") def copy_object(bucket_name: str, object_key: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 payload = request.get_json(silent=True) or {} dest_bucket = str(payload.get("dest_bucket", bucket_name)).strip() dest_key = str(payload.get("dest_key", "")).strip() if not dest_key: return jsonify({"error": "dest_key is required"}), 400 try: _authorize_ui(principal, dest_bucket, "write", object_key=dest_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 storage = _storage() try: source_path = storage.get_object_path(bucket_name, object_key) source_metadata = storage.get_object_metadata(bucket_name, object_key) except StorageError as exc: return jsonify({"error": str(exc)}), 404 try: with source_path.open("rb") as stream: storage.put_object(dest_bucket, dest_key, stream, metadata=source_metadata or None) return jsonify({ "status": "ok", "message": f"Copied to {dest_bucket}/{dest_key}", "dest_bucket": dest_bucket, "dest_key": dest_key, }) except StorageError as exc: return jsonify({"error": str(exc)}), 400 @ui_bp.post("/buckets//objects//move") def move_object(bucket_name: str, object_key: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) _authorize_ui(principal, bucket_name, "delete", object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 payload = request.get_json(silent=True) or {} dest_bucket = str(payload.get("dest_bucket", bucket_name)).strip() dest_key = str(payload.get("dest_key", "")).strip() if not dest_key: return jsonify({"error": "dest_key is required"}), 400 if dest_bucket == bucket_name and dest_key == object_key: return jsonify({"error": "Cannot move object to the same location"}), 400 try: _authorize_ui(principal, dest_bucket, "write", object_key=dest_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 storage = _storage() try: source_path = storage.get_object_path(bucket_name, object_key) source_metadata = storage.get_object_metadata(bucket_name, object_key) except StorageError as exc: return jsonify({"error": str(exc)}), 404 try: import io with source_path.open("rb") as f: data = f.read() storage.put_object(dest_bucket, dest_key, io.BytesIO(data), metadata=source_metadata or None) storage.delete_object(bucket_name, object_key) return jsonify({ "status": "ok", "message": f"Moved to {dest_bucket}/{dest_key}", "dest_bucket": dest_bucket, "dest_key": dest_key, }) except StorageError as exc: return jsonify({"error": str(exc)}), 400 @ui_bp.get("/buckets//list-for-copy") def list_buckets_for_copy(bucket_name: str): principal = _current_principal() buckets = _storage().list_buckets() allowed = [] for bucket in buckets: try: _authorize_ui(principal, bucket.name, "write") allowed.append(bucket.name) except IamError: pass return jsonify({"buckets": allowed}) @ui_bp.app_errorhandler(404) def ui_not_found(error): # type: ignore[override] prefix = ui_bp.url_prefix or "" path = request.path or "" wants_html = request.accept_mimetypes.accept_html if wants_html and (not prefix or path.startswith(prefix)): return render_template("404.html"), 404 return error