"""Authenticated HTML UI for browsing buckets and objects.""" from __future__ import annotations import json import uuid import psutil import shutil from typing import Any from urllib.parse import urlparse import boto3 import requests from botocore.exceptions import ClientError from flask import ( Blueprint, Response, current_app, flash, jsonify, redirect, render_template, request, send_file, session, url_for, ) from flask_wtf.csrf import generate_csrf from .bucket_policies import BucketPolicyStore from .connections import ConnectionStore, RemoteConnection from .extensions import limiter from .iam import IamError from .replication import ReplicationManager, ReplicationRule from .secret_store import EphemeralSecretStore from .storage import ObjectStorage, StorageError ui_bp = Blueprint("ui", __name__, template_folder="../templates", url_prefix="/ui") def _storage() -> ObjectStorage: return current_app.extensions["object_storage"] def _replication_manager() -> ReplicationManager: return current_app.extensions["replication"] def _iam(): return current_app.extensions["iam"] def _bucket_policies() -> BucketPolicyStore: store: BucketPolicyStore = current_app.extensions["bucket_policies"] store.maybe_reload() return store def _connections() -> ConnectionStore: return current_app.extensions["connections"] def _replication() -> ReplicationManager: return current_app.extensions["replication"] def _secret_store() -> EphemeralSecretStore: store: EphemeralSecretStore = current_app.extensions["secret_store"] store.purge_expired() return store def _format_bytes(num: int) -> str: step = 1024 units = ["B", "KB", "MB", "GB", "TB", "PB"] value = float(num) for unit in units: if value < step or unit == units[-1]: if unit == "B": return f"{int(value)} B" return f"{value:.1f} {unit}" value /= step return f"{value:.1f} PB" def _friendly_error_message(exc: Exception) -> str: message = str(exc) or "An unexpected error occurred" if isinstance(exc, IamError): return f"Access issue: {message}" if isinstance(exc, StorageError): return f"Storage issue: {message}" return message def _policy_allows_public_read(policy: dict[str, Any]) -> bool: statements = policy.get("Statement", []) if isinstance(statements, dict): statements = [statements] list_allowed = False get_allowed = False for statement in statements: if not isinstance(statement, dict): continue if statement.get("Effect") != "Allow": continue if statement.get("Condition"): continue principal = statement.get("Principal") principal_all = principal == "*" or ( isinstance(principal, dict) and any(value == "*" or value == ["*"] for value in principal.values()) ) if not principal_all: continue actions = statement.get("Action", []) if isinstance(actions, str): actions = [actions] normalized = {action.lower() for action in actions} if not list_allowed: list_allowed = any(action in {"*", "s3:*", "s3:listbucket"} for action in normalized) if not get_allowed: get_allowed = any(action in {"*", "s3:*", "s3:getobject"} for action in normalized) if list_allowed and get_allowed: return True return False def _bucket_access_descriptor(policy: dict[str, Any] | None) -> tuple[str, str]: if not policy: return ("IAM only", "text-bg-secondary") if _policy_allows_public_read(policy): return ("Public read", "text-bg-warning") return ("Custom policy", "text-bg-info") def _current_principal(): creds = session.get("credentials") if not creds: return None try: return _iam().authenticate(creds["access_key"], creds["secret_key"]) except IamError: session.pop("credentials", None) return None def _authorize_ui(principal, bucket_name: str | None, action: str, *, object_key: str | None = None) -> None: iam_allowed = True iam_error: IamError | None = None try: _iam().authorize(principal, bucket_name, action) except IamError as exc: iam_allowed = False iam_error = exc decision = None enforce_bucket_policies = current_app.config.get("UI_ENFORCE_BUCKET_POLICIES", True) if bucket_name and enforce_bucket_policies: access_key = principal.access_key if principal else None decision = _bucket_policies().evaluate(access_key, bucket_name, object_key, action) if decision == "deny": raise IamError("Access denied by bucket policy") if not iam_allowed and decision != "allow": raise iam_error or IamError("Access denied") def _api_headers() -> dict[str, str]: creds = session.get("credentials") or {} return { "X-Access-Key": creds.get("access_key", ""), "X-Secret-Key": creds.get("secret_key", ""), } @ui_bp.app_context_processor def inject_nav_state() -> dict[str, Any]: principal = _current_principal() can_manage = False if principal: try: _iam().authorize(principal, None, "iam:list_users") can_manage = True except IamError: can_manage = False return { "principal": principal, "can_manage_iam": can_manage, "csrf_token": generate_csrf, } @ui_bp.before_request def ensure_authenticated(): exempt = {"ui.login"} if request.endpoint in exempt or request.endpoint is None: return None if _current_principal() is None: return redirect(url_for("ui.login")) return None @ui_bp.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": access_key = request.form.get("access_key", "").strip() secret_key = request.form.get("secret_key", "").strip() try: principal = _iam().authenticate(access_key, secret_key) except IamError as exc: flash(_friendly_error_message(exc), "danger") return render_template("login.html") session["credentials"] = {"access_key": access_key, "secret_key": secret_key} session.permanent = True flash(f"Welcome back, {principal.display_name}", "success") return redirect(url_for("ui.buckets_overview")) return render_template("login.html") @ui_bp.post("/logout") def logout(): session.pop("credentials", None) flash("Signed out", "info") return redirect(url_for("ui.login")) @ui_bp.get("/docs") def docs_page(): principal = _current_principal() api_base = current_app.config.get("API_BASE_URL") or "http://127.0.0.1:5000" api_base = api_base.rstrip("/") parsed = urlparse(api_base) api_host = parsed.netloc or parsed.path or api_base return render_template( "docs.html", principal=principal, api_base=api_base, api_host=api_host, ) @ui_bp.get("/") def buckets_overview(): principal = _current_principal() buckets = _storage().list_buckets() allowed_names = set(_iam().buckets_for_principal(principal, [b.name for b in buckets])) visible_buckets = [] policy_store = _bucket_policies() for bucket in buckets: if bucket.name not in allowed_names: continue policy = policy_store.get_policy(bucket.name) stats = _storage().bucket_stats(bucket.name) access_label, access_badge = _bucket_access_descriptor(policy) visible_buckets.append({ "meta": bucket, "summary": { "objects": stats["objects"], "total_bytes": stats["bytes"], "human_size": _format_bytes(stats["bytes"]), }, "access_label": access_label, "access_badge": access_badge, "has_policy": bool(policy), "detail_url": url_for("ui.bucket_detail", bucket_name=bucket.name), }) return render_template("buckets.html", buckets=visible_buckets, principal=principal) @ui_bp.post("/buckets") def create_bucket(): principal = _current_principal() bucket_name = request.form.get("bucket_name", "").strip() if not bucket_name: flash("Bucket name is required", "danger") return redirect(url_for("ui.buckets_overview")) try: _authorize_ui(principal, bucket_name, "write") _storage().create_bucket(bucket_name) flash(f"Bucket '{bucket_name}' created", "success") except (StorageError, FileExistsError, IamError) as exc: flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.buckets_overview")) @ui_bp.get("/buckets/") def bucket_detail(bucket_name: str): principal = _current_principal() storage = _storage() try: _authorize_ui(principal, bucket_name, "list") objects = storage.list_objects(bucket_name) except (StorageError, IamError) as exc: flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.buckets_overview")) bucket_policy = _bucket_policies().get_policy(bucket_name) policy_text = json.dumps(bucket_policy, indent=2) if bucket_policy else "" default_policy = json.dumps( { "Version": "2012-10-17", "Statement": [ { "Sid": "AllowList", "Effect": "Allow", "Principal": "*", "Action": ["s3:ListBucket"], "Resource": [f"arn:aws:s3:::{bucket_name}"], }, { "Sid": "AllowRead", "Effect": "Allow", "Principal": "*", "Action": ["s3:GetObject"], "Resource": [f"arn:aws:s3:::{bucket_name}/*"], }, ], }, indent=2, ) can_edit_policy = False if principal: try: _iam().authorize(principal, bucket_name, "policy") can_edit_policy = True except IamError: can_edit_policy = False try: versioning_enabled = storage.is_versioning_enabled(bucket_name) except StorageError: versioning_enabled = False can_manage_versioning = False if principal: try: _iam().authorize(principal, bucket_name, "write") can_manage_versioning = True except IamError: can_manage_versioning = False # Replication info replication_rule = _replication().get_rule(bucket_name) connections = _connections().list() return render_template( "bucket_detail.html", bucket_name=bucket_name, objects=objects, principal=principal, bucket_policy_text=policy_text, bucket_policy=bucket_policy, can_edit_policy=can_edit_policy, can_manage_versioning=can_manage_versioning, default_policy=default_policy, versioning_enabled=versioning_enabled, replication_rule=replication_rule, connections=connections, ) @ui_bp.post("/buckets//upload") @limiter.limit("30 per minute") def upload_object(bucket_name: str): principal = _current_principal() file = request.files.get("object") object_key = request.form.get("object_key") metadata_raw = (request.form.get("metadata") or "").strip() wants_json = request.headers.get("X-Requested-With") == "XMLHttpRequest" def _response(success: bool, message: str, status: int = 200): if wants_json: payload = {"status": "ok" if success else "error", "message": message} return jsonify(payload), status flash(message, "success" if success else "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="objects")) if file and not object_key: object_key = file.filename if not object_key: return _response(False, "Object key is required", 400) if not file: return _response(False, "Choose a file to upload", 400) metadata = None if metadata_raw: try: parsed = json.loads(metadata_raw) if not isinstance(parsed, dict): raise ValueError metadata = {str(k): str(v) for k, v in parsed.items()} except ValueError: return _response(False, "Metadata must be a JSON object", 400) try: _authorize_ui(principal, bucket_name, "write") _storage().put_object(bucket_name, object_key, file.stream, metadata=metadata) # Trigger replication _replication().trigger_replication(bucket_name, object_key) message = f"Uploaded '{object_key}'" if metadata: message += " with metadata" return _response(True, message) except (StorageError, IamError) as exc: return _response(False, _friendly_error_message(exc), 400) @ui_bp.post("/buckets//multipart/initiate") def initiate_multipart_upload(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 payload = request.get_json(silent=True) or {} object_key = str(payload.get("object_key", "")).strip() if not object_key: return jsonify({"error": "object_key is required"}), 400 metadata_payload = payload.get("metadata") metadata = None if metadata_payload is not None: if not isinstance(metadata_payload, dict): return jsonify({"error": "metadata must be an object"}), 400 metadata = {str(k): str(v) for k, v in metadata_payload.items()} try: upload_id = _storage().initiate_multipart_upload(bucket_name, object_key, metadata=metadata) except StorageError as exc: return jsonify({"error": str(exc)}), 400 return jsonify({"upload_id": upload_id}) @ui_bp.put("/buckets//multipart//parts") def upload_multipart_part(bucket_name: str, upload_id: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 try: part_number = int(request.args.get("partNumber", "0")) except ValueError: return jsonify({"error": "partNumber must be an integer"}), 400 if part_number < 1: return jsonify({"error": "partNumber must be >= 1"}), 400 try: etag = _storage().upload_multipart_part(bucket_name, upload_id, part_number, request.stream) except StorageError as exc: return jsonify({"error": str(exc)}), 400 return jsonify({"etag": etag, "part_number": part_number}) @ui_bp.post("/buckets//multipart//complete") def complete_multipart_upload(bucket_name: str, upload_id: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 payload = request.get_json(silent=True) or {} parts_payload = payload.get("parts") if not isinstance(parts_payload, list) or not parts_payload: return jsonify({"error": "parts array required"}), 400 normalized = [] for part in parts_payload: if not isinstance(part, dict): return jsonify({"error": "Each part must be an object"}), 400 raw_number = part.get("part_number") or part.get("PartNumber") try: number = int(raw_number) except (TypeError, ValueError): return jsonify({"error": "Each part must include part_number"}), 400 etag = str(part.get("etag") or part.get("ETag") or "").strip() normalized.append({"part_number": number, "etag": etag}) try: result = _storage().complete_multipart_upload(bucket_name, upload_id, normalized) _replication().trigger_replication(bucket_name, result["key"]) return jsonify(result) except StorageError as exc: return jsonify({"error": str(exc)}), 400 @ui_bp.delete("/buckets//multipart/") def abort_multipart_upload(bucket_name: str, upload_id: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: return jsonify({"error": str(exc)}), 403 try: _storage().abort_multipart_upload(bucket_name, upload_id) except StorageError as exc: return jsonify({"error": str(exc)}), 400 return jsonify({"status": "aborted"}) @ui_bp.post("/buckets//delete") @limiter.limit("20 per minute") def delete_bucket(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "delete") _storage().delete_bucket(bucket_name) _bucket_policies().delete_policy(bucket_name) _replication_manager().delete_rule(bucket_name) flash(f"Bucket '{bucket_name}' removed", "success") except (StorageError, IamError) as exc: flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.buckets_overview")) @ui_bp.post("/buckets//objects//delete") @limiter.limit("60 per minute") def delete_object(bucket_name: str, object_key: str): principal = _current_principal() purge_versions = request.form.get("purge_versions") == "1" try: _authorize_ui(principal, bucket_name, "delete", object_key=object_key) if purge_versions: _storage().purge_object(bucket_name, object_key) flash(f"Permanently deleted '{object_key}' and all versions", "success") else: _storage().delete_object(bucket_name, object_key) _replication_manager().trigger_replication(bucket_name, object_key, action="delete") flash(f"Deleted '{object_key}'", "success") except (IamError, StorageError) as exc: flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) @ui_bp.post("/buckets//objects/bulk-delete") @limiter.limit("40 per minute") def bulk_delete_objects(bucket_name: str): principal = _current_principal() wants_json = request.headers.get("X-Requested-With") == "XMLHttpRequest" or request.is_json payload = request.get_json(silent=True) or {} keys_payload = payload.get("keys") purge_versions = bool(payload.get("purge_versions")) def _respond(success: bool, message: str, *, deleted=None, errors=None, status_code: int = 200): if wants_json: body = { "status": "ok" if success else "partial", "message": message, "deleted": deleted or [], "errors": errors or [], } if not success and not errors: body["status"] = "error" return jsonify(body), status_code flash(message, "success" if success and not errors else "warning") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) if not isinstance(keys_payload, list): return _respond(False, "keys must be provided as a JSON array", status_code=400) cleaned: list[str] = [] for entry in keys_payload: if isinstance(entry, str): candidate = entry.strip() if candidate: cleaned.append(candidate) if not cleaned: return _respond(False, "Select at least one object to delete", status_code=400) MAX_KEYS = current_app.config.get("BULK_DELETE_MAX_KEYS", 500) if len(cleaned) > MAX_KEYS: return _respond(False, f"A maximum of {MAX_KEYS} objects can be deleted per request", status_code=400) unique_keys = list(dict.fromkeys(cleaned)) storage = _storage() try: _authorize_ui(principal, bucket_name, "delete") except IamError as exc: return _respond(False, _friendly_error_message(exc), status_code=403) deleted: list[str] = [] errors: list[dict[str, str]] = [] for key in unique_keys: try: if purge_versions: storage.purge_object(bucket_name, key) else: storage.delete_object(bucket_name, key) _replication_manager().trigger_replication(bucket_name, key, action="delete") deleted.append(key) except StorageError as exc: errors.append({"key": key, "error": str(exc)}) if not deleted and errors: return _respond(False, "Unable to delete the selected objects", deleted=deleted, errors=errors, status_code=400) message = f"Deleted {len(deleted)} object{'s' if len(deleted) != 1 else ''}" if purge_versions and deleted: message += " (including archived versions)" if errors: message += f"; {len(errors)} failed" return _respond(not errors, message, deleted=deleted, errors=errors) @ui_bp.post("/buckets//objects/bulk-download") @limiter.limit("10 per minute") def bulk_download_objects(bucket_name: str): import io import zipfile principal = _current_principal() payload = request.get_json(silent=True) or {} keys_payload = payload.get("keys") if not isinstance(keys_payload, list): return jsonify({"error": "keys must be provided as a JSON array"}), 400 cleaned: list[str] = [] for entry in keys_payload: if isinstance(entry, str): candidate = entry.strip() if candidate: cleaned.append(candidate) if not cleaned: return jsonify({"error": "Select at least one object to download"}), 400 MAX_KEYS = current_app.config.get("BULK_DELETE_MAX_KEYS", 500) # Reuse same limit for now if len(cleaned) > MAX_KEYS: return jsonify({"error": f"A maximum of {MAX_KEYS} objects can be downloaded per request"}), 400 unique_keys = list(dict.fromkeys(cleaned)) storage = _storage() # Check permissions for all keys first (or at least bucket read) # We'll check bucket read once, then object read for each if needed? # _authorize_ui checks bucket level if object_key is None, but we need to check each object if fine-grained policies exist. # For simplicity/performance, we check bucket list/read. try: _authorize_ui(principal, bucket_name, "read") except IamError as exc: return jsonify({"error": str(exc)}), 403 # Create ZIP buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf: for key in unique_keys: try: # Verify individual object permission if needed? # _authorize_ui(principal, bucket_name, "read", object_key=key) # This might be slow for many objects. Assuming bucket read is enough for now or we accept the overhead. # Let's skip individual check for bulk speed, assuming bucket read implies object read unless denied. # But strictly we should check. Let's check. _authorize_ui(principal, bucket_name, "read", object_key=key) path = storage.get_object_path(bucket_name, key) # Use the key as the filename in the zip zf.write(path, arcname=key) except (StorageError, IamError): # Skip files we can't read or don't exist continue buffer.seek(0) return send_file( buffer, as_attachment=True, download_name=f"{bucket_name}-download.zip", mimetype="application/zip" ) @ui_bp.post("/buckets//objects//purge") @limiter.limit("30 per minute") def purge_object_versions(bucket_name: str, object_key: str): principal = _current_principal() wants_json = request.headers.get("X-Requested-With") == "XMLHttpRequest" try: _authorize_ui(principal, bucket_name, "delete", object_key=object_key) _storage().purge_object(bucket_name, object_key) except IamError as exc: if wants_json: return jsonify({"error": str(exc)}), 403 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) except StorageError as exc: if wants_json: return jsonify({"error": str(exc)}), 400 flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) message = f"Removed archived versions for '{object_key}'" if wants_json: return jsonify({"status": "ok", "message": message}) flash(message, "success") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) @ui_bp.get("/buckets//objects//preview") def object_preview(bucket_name: str, object_key: str) -> Response: principal = _current_principal() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) path = _storage().get_object_path(bucket_name, object_key) except (StorageError, IamError) as exc: status = 403 if isinstance(exc, IamError) else 404 return Response(str(exc), status=status) download = request.args.get("download") == "1" return send_file(path, as_attachment=download, download_name=path.name) @ui_bp.post("/buckets//objects//presign") def object_presign(bucket_name: str, object_key: str): principal = _current_principal() payload = request.get_json(silent=True) or {} method = str(payload.get("method", "GET")).upper() action = "read" if method == "GET" else ("delete" if method == "DELETE" else "write") try: _authorize_ui(principal, bucket_name, action, object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 connection_url = "http://127.0.0.1:5000" url = f"{connection_url}/presign/{bucket_name}/{object_key}" headers = _api_headers() headers["X-Forwarded-Host"] = request.host headers["X-Forwarded-Proto"] = request.scheme headers["X-Forwarded-For"] = request.remote_addr or "127.0.0.1" try: response = requests.post(url, headers=headers, json=payload, timeout=5) except requests.RequestException as exc: return jsonify({"error": f"API unavailable: {exc}"}), 502 try: body = response.json() except ValueError: text = response.text or "" if text.strip().startswith("<"): import xml.etree.ElementTree as ET try: root = ET.fromstring(text) message = root.findtext(".//Message") or root.findtext(".//Code") or "Unknown S3 error" body = {"error": message} except ET.ParseError: body = {"error": text or "API returned an empty response"} else: body = {"error": text or "API returned an empty response"} return jsonify(body), response.status_code @ui_bp.get("/buckets//objects//versions") def object_versions(bucket_name: str, object_key: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "read", object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 try: versions = _storage().list_object_versions(bucket_name, object_key) except StorageError as exc: return jsonify({"error": str(exc)}), 400 return jsonify({"versions": versions}) @ui_bp.get("/buckets//archived") def archived_objects(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "list") except IamError as exc: return jsonify({"error": str(exc)}), 403 try: entries = _storage().list_orphaned_objects(bucket_name) except StorageError as exc: return jsonify({"error": str(exc)}), 400 payload: list[dict[str, Any]] = [] for entry in entries: latest = entry.get("latest") or {} restore_url = None if latest.get("version_id"): restore_url = url_for( "ui.restore_object_version", bucket_name=bucket_name, object_key=entry["key"], version_id=latest["version_id"], ) purge_url = url_for("ui.purge_object_versions", bucket_name=bucket_name, object_key=entry["key"]) payload.append( { "key": entry["key"], "versions": entry.get("versions", 0), "total_size": entry.get("total_size", 0), "latest": entry.get("latest"), "restore_url": restore_url, "purge_url": purge_url, } ) return jsonify({"objects": payload}) @ui_bp.post("/buckets//objects//versions//restore") def restore_object_version(bucket_name: str, object_key: str, version_id: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write", object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 try: meta = _storage().restore_object_version(bucket_name, object_key, version_id) except StorageError as exc: return jsonify({"error": str(exc)}), 400 message = f"Restored '{meta.key}'" if meta else "Object restored" return jsonify({"status": "ok", "message": message}) @ui_bp.post("/buckets//policy") @limiter.limit("10 per minute") def update_bucket_policy(bucket_name: str): principal = _current_principal() action = request.form.get("mode", "upsert") try: _authorize_ui(principal, bucket_name, "policy") except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name)) store = _bucket_policies() if action == "delete": store.delete_policy(bucket_name) flash("Bucket policy removed", "info") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) document = request.form.get("policy_document", "").strip() if not document: flash("Provide a JSON policy document", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) try: payload = json.loads(document) store.set_policy(bucket_name, payload) flash("Bucket policy saved", "success") except (json.JSONDecodeError, ValueError) as exc: flash(f"Policy error: {exc}", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="permissions")) @ui_bp.post("/buckets//versioning") def update_bucket_versioning(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) state = request.form.get("state", "enable") enable = state == "enable" try: _storage().set_bucket_versioning(bucket_name, enable) except StorageError as exc: flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) flash("Versioning enabled" if enable else "Versioning suspended", "success") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) @ui_bp.get("/iam") def iam_dashboard(): principal = _current_principal() iam_service = _iam() secret_token = request.args.get("secret_token") disclosed_secret: dict[str, str] | None = None if secret_token: payload = _secret_store().pop(secret_token) if isinstance(payload, dict): access_key = str(payload.get("access_key", "")) secret_key = payload.get("secret_key") if secret_key: disclosed_secret = { "access_key": access_key, "secret_key": str(secret_key), "operation": str(payload.get("operation", "create")), } locked = False locked_reason = None try: iam_service.authorize(principal, None, "iam:list_users") except IamError as exc: locked = True locked_reason = str(exc) users = iam_service.list_users() if not locked else [] config_summary = iam_service.config_summary() config_document = json.dumps(iam_service.export_config(mask_secrets=True), indent=2) return render_template( "iam.html", users=users, principal=principal, iam_locked=locked, locked_reason=locked_reason, config_summary=config_summary, config_document=config_document, disclosed_secret=disclosed_secret, ) @ui_bp.post("/iam/users") def create_iam_user(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:create_user") except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) display_name = request.form.get("display_name", "").strip() or "Unnamed" if len(display_name) > 64: flash("Display name must be 64 characters or fewer", "danger") return redirect(url_for("ui.iam_dashboard")) policies_text = request.form.get("policies", "").strip() policies = None if policies_text: try: policies = json.loads(policies_text) except json.JSONDecodeError as exc: flash(f"Invalid JSON: {exc}", "danger") return redirect(url_for("ui.iam_dashboard")) try: created = _iam().create_user(display_name=display_name, policies=policies) except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) token = _secret_store().remember( { "access_key": created["access_key"], "secret_key": created["secret_key"], "operation": "create", } ) flash(f"Created user {created['access_key']}. Copy the secret below.", "success") return redirect(url_for("ui.iam_dashboard", secret_token=token)) @ui_bp.post("/iam/users//rotate") def rotate_iam_secret(access_key: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:rotate_key") except IamError as exc: if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html: return jsonify({"error": str(exc)}), 403 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) try: new_secret = _iam().rotate_secret(access_key) if principal and principal.access_key == access_key: creds = session.get("credentials", {}) creds["secret_key"] = new_secret session["credentials"] = creds session.modified = True except IamError as exc: if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html: return jsonify({"error": str(exc)}), 400 flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html: return jsonify({ "access_key": access_key, "secret_key": new_secret, "message": f"Secret rotated for {access_key}", }) token = _secret_store().remember( { "access_key": access_key, "secret_key": new_secret, "operation": "rotate", } ) flash(f"Rotated secret for {access_key}. Copy the secret below.", "info") return redirect(url_for("ui.iam_dashboard", secret_token=token)) @ui_bp.post("/iam/users//update") def update_iam_user(access_key: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:create_user") except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) display_name = request.form.get("display_name", "").strip() if display_name: if len(display_name) > 64: flash("Display name must be 64 characters or fewer", "danger") else: try: _iam().update_user(access_key, display_name) flash(f"Updated user {access_key}", "success") except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) @ui_bp.post("/iam/users//delete") def delete_iam_user(access_key: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:delete_user") except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) if access_key == principal.access_key: # Self-deletion try: _iam().delete_user(access_key) session.pop("credentials", None) flash("Your account has been deleted.", "info") return redirect(url_for("ui.login")) except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) try: _iam().delete_user(access_key) flash(f"Deleted user {access_key}", "success") except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) @ui_bp.post("/iam/users//policies") def update_iam_policies(access_key: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:update_policy") except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) policies_raw = request.form.get("policies", "").strip() if not policies_raw: policies = [] else: try: policies = json.loads(policies_raw) if not isinstance(policies, list): raise ValueError("Policies must be a list") except (ValueError, json.JSONDecodeError): flash("Invalid JSON format for policies", "danger") return redirect(url_for("ui.iam_dashboard")) try: _iam().update_user_policies(access_key, policies) flash(f"Updated policies for {access_key}", "success") except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.iam_dashboard")) @ui_bp.post("/connections") def create_connection(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: flash("Access denied", "danger") return redirect(url_for("ui.buckets_overview")) name = request.form.get("name", "").strip() endpoint = request.form.get("endpoint_url", "").strip() access_key = request.form.get("access_key", "").strip() secret_key = request.form.get("secret_key", "").strip() region = request.form.get("region", "us-east-1").strip() if not all([name, endpoint, access_key, secret_key]): flash("All fields are required", "danger") return redirect(url_for("ui.connections_dashboard")) conn = RemoteConnection( id=str(uuid.uuid4()), name=name, endpoint_url=endpoint, access_key=access_key, secret_key=secret_key, region=region ) _connections().add(conn) flash(f"Connection '{name}' created", "success") return redirect(url_for("ui.connections_dashboard")) @ui_bp.post("/connections/test") def test_connection(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: return jsonify({"status": "error", "message": "Access denied"}), 403 data = request.get_json(silent=True) or request.form endpoint = data.get("endpoint_url", "").strip() access_key = data.get("access_key", "").strip() secret_key = data.get("secret_key", "").strip() region = data.get("region", "us-east-1").strip() if not all([endpoint, access_key, secret_key]): return jsonify({"status": "error", "message": "Missing credentials"}), 400 try: s3 = boto3.client( "s3", endpoint_url=endpoint, aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region, ) # Try to list buckets to verify credentials and endpoint s3.list_buckets() return jsonify({"status": "ok", "message": "Connection successful"}) except Exception as e: return jsonify({"status": "error", "message": str(e)}), 400 @ui_bp.post("/connections//update") def update_connection(connection_id: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: flash("Access denied", "danger") return redirect(url_for("ui.buckets_overview")) conn = _connections().get(connection_id) if not conn: flash("Connection not found", "danger") return redirect(url_for("ui.connections_dashboard")) name = request.form.get("name", "").strip() endpoint = request.form.get("endpoint_url", "").strip() access_key = request.form.get("access_key", "").strip() secret_key = request.form.get("secret_key", "").strip() region = request.form.get("region", "us-east-1").strip() if not all([name, endpoint, access_key, secret_key]): flash("All fields are required", "danger") return redirect(url_for("ui.connections_dashboard")) conn.name = name conn.endpoint_url = endpoint conn.access_key = access_key conn.secret_key = secret_key conn.region = region _connections().save() flash(f"Connection '{name}' updated", "success") return redirect(url_for("ui.connections_dashboard")) @ui_bp.post("/connections//delete") def delete_connection(connection_id: str): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: flash("Access denied", "danger") return redirect(url_for("ui.buckets_overview")) _connections().delete(connection_id) flash("Connection deleted", "success") return redirect(url_for("ui.connections_dashboard")) @ui_bp.post("/buckets//replication") def update_bucket_replication(bucket_name: str): principal = _current_principal() try: _authorize_ui(principal, bucket_name, "write") except IamError as exc: flash(str(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication")) action = request.form.get("action") if action == "delete": _replication().delete_rule(bucket_name) flash("Replication disabled", "info") else: target_conn_id = request.form.get("target_connection_id") target_bucket = request.form.get("target_bucket", "").strip() if not target_conn_id or not target_bucket: flash("Target connection and bucket are required", "danger") else: rule = ReplicationRule( bucket_name=bucket_name, target_connection_id=target_conn_id, target_bucket=target_bucket, enabled=True ) _replication().set_rule(rule) flash("Replication configured", "success") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication")) @ui_bp.get("/connections") def connections_dashboard(): principal = _current_principal() try: _iam().authorize(principal, None, "iam:list_users") except IamError: flash("Access denied", "danger") return redirect(url_for("ui.buckets_overview")) connections = _connections().list() return render_template("connections.html", connections=connections, principal=principal) @ui_bp.get("/metrics") def metrics_dashboard(): principal = _current_principal() cpu_percent = psutil.cpu_percent(interval=0.1) memory = psutil.virtual_memory() storage_root = current_app.config["STORAGE_ROOT"] disk = psutil.disk_usage(storage_root) storage = _storage() buckets = storage.list_buckets() total_buckets = len(buckets) total_objects = 0 total_bytes_used = 0 # Note: Uses cached stats from storage layer to improve performance for bucket in buckets: stats = storage.bucket_stats(bucket.name) total_objects += stats["objects"] total_bytes_used += stats["bytes"] return render_template( "metrics.html", principal=principal, cpu_percent=cpu_percent, memory={ "total": _format_bytes(memory.total), "available": _format_bytes(memory.available), "used": _format_bytes(memory.used), "percent": memory.percent, }, disk={ "total": _format_bytes(disk.total), "free": _format_bytes(disk.free), "used": _format_bytes(disk.used), "percent": disk.percent, }, app={ "buckets": total_buckets, "objects": total_objects, "storage_used": _format_bytes(total_bytes_used), "storage_raw": total_bytes_used, } ) @ui_bp.app_errorhandler(404) def ui_not_found(error): # type: ignore[override] prefix = ui_bp.url_prefix or "" path = request.path or "" wants_html = request.accept_mimetypes.accept_html if wants_html and (not prefix or path.startswith(prefix)): return render_template("404.html"), 404 return error