Add static website hosting

This commit is contained in:
2026-02-15 20:57:02 +08:00
parent 01e79e6993
commit 67f057ca1c
16 changed files with 1704 additions and 7 deletions

View File

@@ -1,6 +1,8 @@
from __future__ import annotations
import html as html_module
import logging
import mimetypes
import shutil
import sys
import time
@@ -10,7 +12,7 @@ from pathlib import Path
from datetime import timedelta
from typing import Any, Dict, List, Optional
from flask import Flask, g, has_request_context, redirect, render_template, request, url_for
from flask import Flask, Response, g, has_request_context, redirect, render_template, request, url_for
from flask_cors import CORS
from flask_wtf.csrf import CSRFError
from werkzeug.middleware.proxy_fix import ProxyFix
@@ -32,8 +34,9 @@ from .object_lock import ObjectLockService
from .replication import ReplicationManager
from .secret_store import EphemeralSecretStore
from .site_registry import SiteRegistry, SiteInfo
from .storage import ObjectStorage
from .storage import ObjectStorage, StorageError
from .version import get_version
from .website_domains import WebsiteDomainStore
def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path:
@@ -223,6 +226,12 @@ def create_app(
app.extensions["access_logging"] = access_logging_service
app.extensions["site_registry"] = site_registry
website_domains_store = None
if app.config.get("WEBSITE_HOSTING_ENABLED", False):
website_domains_path = config_dir / "website_domains.json"
website_domains_store = WebsiteDomainStore(website_domains_path)
app.extensions["website_domains"] = website_domains_store
from .s3_client import S3ProxyClient
api_base = app.config.get("API_BASE_URL") or "http://127.0.0.1:5000"
app.extensions["s3_proxy"] = S3ProxyClient(
@@ -472,6 +481,128 @@ def _configure_logging(app: Flask) -> None:
extra={"path": request.path, "method": request.method, "remote_addr": request.remote_addr},
)
@app.before_request
def _maybe_serve_website():
if not app.config.get("WEBSITE_HOSTING_ENABLED"):
return None
if request.method not in {"GET", "HEAD"}:
return None
host = request.host
if ":" in host:
host = host.rsplit(":", 1)[0]
host = host.lower()
store = app.extensions.get("website_domains")
if not store:
return None
bucket = store.get_bucket(host)
if not bucket:
return None
storage = app.extensions["object_storage"]
if not storage.bucket_exists(bucket):
return _website_error_response(404, "Not Found")
website_config = storage.get_bucket_website(bucket)
if not website_config:
return _website_error_response(404, "Not Found")
index_doc = website_config.get("index_document", "index.html")
error_doc = website_config.get("error_document")
req_path = request.path.lstrip("/")
if not req_path or req_path.endswith("/"):
object_key = req_path + index_doc
else:
object_key = req_path
try:
obj_path = storage.get_object_path(bucket, object_key)
except (StorageError, OSError):
if object_key == req_path:
try:
obj_path = storage.get_object_path(bucket, req_path + "/" + index_doc)
object_key = req_path + "/" + index_doc
except (StorageError, OSError):
return _serve_website_error(storage, bucket, error_doc, 404)
else:
return _serve_website_error(storage, bucket, error_doc, 404)
content_type = mimetypes.guess_type(object_key)[0] or "application/octet-stream"
is_encrypted = False
try:
metadata = storage.get_object_metadata(bucket, object_key)
is_encrypted = "x-amz-server-side-encryption" in metadata
except (StorageError, OSError):
pass
if request.method == "HEAD":
response = Response(status=200)
if is_encrypted and hasattr(storage, "get_object_data"):
try:
data, _ = storage.get_object_data(bucket, object_key)
response.headers["Content-Length"] = len(data)
except (StorageError, OSError):
return _website_error_response(500, "Internal Server Error")
else:
try:
stat = obj_path.stat()
response.headers["Content-Length"] = stat.st_size
except OSError:
return _website_error_response(500, "Internal Server Error")
response.headers["Content-Type"] = content_type
return response
if is_encrypted and hasattr(storage, "get_object_data"):
try:
data, _ = storage.get_object_data(bucket, object_key)
response = Response(data, mimetype=content_type)
response.headers["Content-Length"] = len(data)
return response
except (StorageError, OSError):
return _website_error_response(500, "Internal Server Error")
def _stream(file_path):
with file_path.open("rb") as f:
while True:
chunk = f.read(65536)
if not chunk:
break
yield chunk
try:
stat = obj_path.stat()
response = Response(_stream(obj_path), mimetype=content_type, direct_passthrough=True)
response.headers["Content-Length"] = stat.st_size
return response
except OSError:
return _website_error_response(500, "Internal Server Error")
def _serve_website_error(storage, bucket, error_doc_key, status_code):
if not error_doc_key:
return _website_error_response(status_code, "Not Found" if status_code == 404 else "Error")
try:
obj_path = storage.get_object_path(bucket, error_doc_key)
except (StorageError, OSError):
return _website_error_response(status_code, "Not Found")
content_type = mimetypes.guess_type(error_doc_key)[0] or "text/html"
is_encrypted = False
try:
metadata = storage.get_object_metadata(bucket, error_doc_key)
is_encrypted = "x-amz-server-side-encryption" in metadata
except (StorageError, OSError):
pass
if is_encrypted and hasattr(storage, "get_object_data"):
try:
data, _ = storage.get_object_data(bucket, error_doc_key)
response = Response(data, status=status_code, mimetype=content_type)
response.headers["Content-Length"] = len(data)
return response
except (StorageError, OSError):
return _website_error_response(status_code, "Not Found")
try:
data = obj_path.read_bytes()
response = Response(data, status=status_code, mimetype=content_type)
response.headers["Content-Length"] = len(data)
return response
except OSError:
return _website_error_response(status_code, "Not Found")
def _website_error_response(status_code, message):
safe_msg = html_module.escape(str(message))
safe_code = html_module.escape(str(status_code))
body = f"<html><head><title>{safe_code} {safe_msg}</title></head><body><h1>{safe_code} {safe_msg}</h1></body></html>"
return Response(body, status=status_code, mimetype="text/html")
@app.after_request
def _log_request_end(response):
duration_ms = 0.0

View File

@@ -17,6 +17,7 @@ from .extensions import limiter
from .iam import IamError, Principal
from .replication import ReplicationManager
from .site_registry import PeerSite, SiteInfo, SiteRegistry
from .website_domains import WebsiteDomainStore
def _is_safe_url(url: str, allow_internal: bool = False) -> bool:
@@ -673,3 +674,98 @@ def check_bidirectional_status(site_id: str):
result["is_fully_configured"] = len(error_issues) == 0 and len(local_bidir_rules) > 0
return jsonify(result)
def _website_domains() -> WebsiteDomainStore:
return current_app.extensions["website_domains"]
def _storage():
return current_app.extensions["object_storage"]
@admin_api_bp.route("/website-domains", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def list_website_domains():
principal, error = _require_admin()
if error:
return error
if not current_app.config.get("WEBSITE_HOSTING_ENABLED", False):
return _json_error("InvalidRequest", "Website hosting is not enabled", 400)
return jsonify(_website_domains().list_all())
@admin_api_bp.route("/website-domains", methods=["POST"])
@limiter.limit(lambda: _get_admin_rate_limit())
def create_website_domain():
principal, error = _require_admin()
if error:
return error
if not current_app.config.get("WEBSITE_HOSTING_ENABLED", False):
return _json_error("InvalidRequest", "Website hosting is not enabled", 400)
payload = request.get_json(silent=True) or {}
domain = (payload.get("domain") or "").strip().lower()
bucket = (payload.get("bucket") or "").strip()
if not domain:
return _json_error("ValidationError", "domain is required", 400)
if not bucket:
return _json_error("ValidationError", "bucket is required", 400)
storage = _storage()
if not storage.bucket_exists(bucket):
return _json_error("NoSuchBucket", f"Bucket '{bucket}' does not exist", 404)
store = _website_domains()
existing = store.get_bucket(domain)
if existing:
return _json_error("Conflict", f"Domain '{domain}' is already mapped to bucket '{existing}'", 409)
store.set_mapping(domain, bucket)
logger.info("Website domain mapping created: %s -> %s", domain, bucket)
return jsonify({"domain": domain, "bucket": bucket}), 201
@admin_api_bp.route("/website-domains/<domain>", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def get_website_domain(domain: str):
principal, error = _require_admin()
if error:
return error
if not current_app.config.get("WEBSITE_HOSTING_ENABLED", False):
return _json_error("InvalidRequest", "Website hosting is not enabled", 400)
bucket = _website_domains().get_bucket(domain)
if not bucket:
return _json_error("NotFound", f"No mapping found for domain '{domain}'", 404)
return jsonify({"domain": domain.lower(), "bucket": bucket})
@admin_api_bp.route("/website-domains/<domain>", methods=["PUT"])
@limiter.limit(lambda: _get_admin_rate_limit())
def update_website_domain(domain: str):
principal, error = _require_admin()
if error:
return error
if not current_app.config.get("WEBSITE_HOSTING_ENABLED", False):
return _json_error("InvalidRequest", "Website hosting is not enabled", 400)
payload = request.get_json(silent=True) or {}
bucket = (payload.get("bucket") or "").strip()
if not bucket:
return _json_error("ValidationError", "bucket is required", 400)
storage = _storage()
if not storage.bucket_exists(bucket):
return _json_error("NoSuchBucket", f"Bucket '{bucket}' does not exist", 404)
store = _website_domains()
store.set_mapping(domain, bucket)
logger.info("Website domain mapping updated: %s -> %s", domain, bucket)
return jsonify({"domain": domain.lower(), "bucket": bucket})
@admin_api_bp.route("/website-domains/<domain>", methods=["DELETE"])
@limiter.limit(lambda: _get_admin_rate_limit())
def delete_website_domain(domain: str):
principal, error = _require_admin()
if error:
return error
if not current_app.config.get("WEBSITE_HOSTING_ENABLED", False):
return _json_error("InvalidRequest", "Website hosting is not enabled", 400)
if not _website_domains().delete_mapping(domain):
return _json_error("NotFound", f"No mapping found for domain '{domain}'", 404)
logger.info("Website domain mapping deleted: %s", domain)
return Response(status=204)

View File

@@ -149,6 +149,7 @@ class AppConfig:
num_trusted_proxies: int
allowed_redirect_hosts: list[str]
allow_internal_endpoints: bool
website_hosting_enabled: bool
@classmethod
def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
@@ -317,6 +318,7 @@ class AppConfig:
allowed_redirect_hosts_raw = _get("ALLOWED_REDIRECT_HOSTS", "")
allowed_redirect_hosts = [h.strip() for h in str(allowed_redirect_hosts_raw).split(",") if h.strip()]
allow_internal_endpoints = str(_get("ALLOW_INTERNAL_ENDPOINTS", "0")).lower() in {"1", "true", "yes", "on"}
website_hosting_enabled = str(_get("WEBSITE_HOSTING_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
return cls(storage_root=storage_root,
max_upload_size=max_upload_size,
@@ -403,7 +405,8 @@ class AppConfig:
ratelimit_admin=ratelimit_admin,
num_trusted_proxies=num_trusted_proxies,
allowed_redirect_hosts=allowed_redirect_hosts,
allow_internal_endpoints=allow_internal_endpoints)
allow_internal_endpoints=allow_internal_endpoints,
website_hosting_enabled=website_hosting_enabled)
def validate_and_report(self) -> list[str]:
"""Validate configuration and return a list of warnings/issues.
@@ -509,6 +512,8 @@ class AppConfig:
print(f" ENCRYPTION: Enabled (Master key: {self.encryption_master_key_path})")
if self.kms_enabled:
print(f" KMS: Enabled (Keys: {self.kms_keys_path})")
if self.website_hosting_enabled:
print(f" WEBSITE_HOSTING: Enabled")
def _auto(flag: bool) -> str:
return " (auto)" if flag else ""
print(f" SERVER_THREADS: {self.server_threads}{_auto(self.server_threads_auto)}")
@@ -611,4 +616,5 @@ class AppConfig:
"NUM_TRUSTED_PROXIES": self.num_trusted_proxies,
"ALLOWED_REDIRECT_HOSTS": self.allowed_redirect_hosts,
"ALLOW_INTERNAL_ENDPOINTS": self.allow_internal_endpoints,
"WEBSITE_HOSTING_ENABLED": self.website_hosting_enabled,
}

View File

@@ -270,9 +270,15 @@ class EncryptedObjectStorage:
def get_bucket_quota(self, bucket_name: str):
return self.storage.get_bucket_quota(bucket_name)
def set_bucket_quota(self, bucket_name: str, *, max_bytes=None, max_objects=None):
return self.storage.set_bucket_quota(bucket_name, max_bytes=max_bytes, max_objects=max_objects)
def get_bucket_website(self, bucket_name: str):
return self.storage.get_bucket_website(bucket_name)
def set_bucket_website(self, bucket_name: str, website_config):
return self.storage.set_bucket_website(bucket_name, website_config)
def _compute_etag(self, path: Path) -> str:
return self.storage._compute_etag(path)

View File

@@ -1027,6 +1027,7 @@ def _maybe_handle_bucket_subresource(bucket_name: str) -> Response | None:
"uploads": _bucket_uploads_handler,
"policy": _bucket_policy_handler,
"replication": _bucket_replication_handler,
"website": _bucket_website_handler,
}
requested = [key for key in handlers if key in request.args]
if not requested:
@@ -3060,6 +3061,79 @@ def _parse_replication_config(bucket_name: str, payload: bytes):
)
def _bucket_website_handler(bucket_name: str) -> Response:
if request.method not in {"GET", "PUT", "DELETE"}:
return _method_not_allowed(["GET", "PUT", "DELETE"])
if not current_app.config.get("WEBSITE_HOSTING_ENABLED", False):
return _error_response("InvalidRequest", "Website hosting is not enabled", 400)
principal, error = _require_principal()
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
if request.method == "GET":
try:
config = storage.get_bucket_website(bucket_name)
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
if not config:
return _error_response("NoSuchWebsiteConfiguration", "The specified bucket does not have a website configuration", 404)
root = Element("WebsiteConfiguration")
root.set("xmlns", S3_NS)
index_doc = config.get("index_document")
if index_doc:
idx_el = SubElement(root, "IndexDocument")
SubElement(idx_el, "Suffix").text = index_doc
error_doc = config.get("error_document")
if error_doc:
err_el = SubElement(root, "ErrorDocument")
SubElement(err_el, "Key").text = error_doc
return _xml_response(root)
if request.method == "DELETE":
try:
storage.set_bucket_website(bucket_name, None)
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
current_app.logger.info("Bucket website config deleted", extra={"bucket": bucket_name})
return Response(status=204)
ct_error = _require_xml_content_type()
if ct_error:
return ct_error
payload = request.get_data(cache=False) or b""
if not payload.strip():
return _error_response("MalformedXML", "Request body is required", 400)
try:
root = _parse_xml_with_limit(payload)
except ParseError:
return _error_response("MalformedXML", "Unable to parse XML document", 400)
if _strip_ns(root.tag) != "WebsiteConfiguration":
return _error_response("MalformedXML", "Root element must be WebsiteConfiguration", 400)
index_el = _find_element(root, "IndexDocument")
if index_el is None:
return _error_response("InvalidArgument", "IndexDocument is required", 400)
suffix_el = _find_element(index_el, "Suffix")
if suffix_el is None or not (suffix_el.text or "").strip():
return _error_response("InvalidArgument", "IndexDocument Suffix is required", 400)
index_suffix = suffix_el.text.strip()
if "/" in index_suffix:
return _error_response("InvalidArgument", "IndexDocument Suffix must not contain '/'", 400)
website_config: Dict[str, Any] = {"index_document": index_suffix}
error_el = _find_element(root, "ErrorDocument")
if error_el is not None:
key_el = _find_element(error_el, "Key")
if key_el is not None and (key_el.text or "").strip():
website_config["error_document"] = key_el.text.strip()
try:
storage.set_bucket_website(bucket_name, website_config)
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
current_app.logger.info("Bucket website config updated", extra={"bucket": bucket_name, "index": index_suffix})
return Response(status=200)
def _parse_destination_arn(arn: str) -> tuple:
if not arn.startswith("arn:aws:s3:::"):
raise ValueError(f"Invalid ARN format: {arn}")

View File

@@ -688,10 +688,19 @@ class ObjectStorage:
return lifecycle if isinstance(lifecycle, list) else None
def set_bucket_lifecycle(self, bucket_name: str, rules: Optional[List[Dict[str, Any]]]) -> None:
"""Set lifecycle configuration for bucket."""
bucket_path = self._require_bucket_path(bucket_name)
self._set_bucket_config_entry(bucket_path.name, "lifecycle", rules)
def get_bucket_website(self, bucket_name: str) -> Optional[Dict[str, Any]]:
bucket_path = self._require_bucket_path(bucket_name)
config = self._read_bucket_config(bucket_path.name)
website = config.get("website")
return website if isinstance(website, dict) else None
def set_bucket_website(self, bucket_name: str, website_config: Optional[Dict[str, Any]]) -> None:
bucket_path = self._require_bucket_path(bucket_name)
self._set_bucket_config_entry(bucket_path.name, "website", website_config)
def get_bucket_quota(self, bucket_name: str) -> Dict[str, Any]:
"""Get quota configuration for bucket.

219
app/ui.py
View File

@@ -286,7 +286,8 @@ def inject_nav_state() -> dict[str, Any]:
return {
"principal": principal,
"can_manage_iam": can_manage,
"can_view_metrics": can_manage,
"can_view_metrics": can_manage,
"website_hosting_nav": can_manage and current_app.config.get("WEBSITE_HOSTING_ENABLED", False),
"csrf_token": generate_csrf,
}
@@ -498,12 +499,20 @@ def bucket_detail(bucket_name: str):
encryption_enabled = current_app.config.get("ENCRYPTION_ENABLED", False)
lifecycle_enabled = current_app.config.get("LIFECYCLE_ENABLED", False)
site_sync_enabled = current_app.config.get("SITE_SYNC_ENABLED", False)
website_hosting_enabled = current_app.config.get("WEBSITE_HOSTING_ENABLED", False)
can_manage_encryption = can_manage_versioning
bucket_quota = storage.get_bucket_quota(bucket_name)
bucket_stats = storage.bucket_stats(bucket_name)
can_manage_quota = is_replication_admin
website_config = None
if website_hosting_enabled:
try:
website_config = storage.get_bucket_website(bucket_name)
except StorageError:
website_config = None
objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
objects_stream_url = url_for("ui.stream_bucket_objects", bucket_name=bucket_name)
@@ -546,6 +555,9 @@ def bucket_detail(bucket_name: str):
bucket_stats=bucket_stats,
can_manage_quota=can_manage_quota,
site_sync_enabled=site_sync_enabled,
website_hosting_enabled=website_hosting_enabled,
website_config=website_config,
can_manage_website=can_edit_policy,
)
@@ -1610,6 +1622,75 @@ def update_bucket_encryption(bucket_name: str):
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
@ui_bp.post("/buckets/<bucket_name>/website")
def update_bucket_website(bucket_name: str):
principal = _current_principal()
try:
_authorize_ui(principal, bucket_name, "policy")
except IamError as exc:
if _wants_json():
return jsonify({"error": _friendly_error_message(exc)}), 403
flash(_friendly_error_message(exc), "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
if not current_app.config.get("WEBSITE_HOSTING_ENABLED", False):
if _wants_json():
return jsonify({"error": "Website hosting is not enabled"}), 400
flash("Website hosting is not enabled", "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
action = request.form.get("action", "enable")
if action == "disable":
try:
_storage().set_bucket_website(bucket_name, None)
if _wants_json():
return jsonify({"success": True, "message": "Static website hosting disabled", "enabled": False})
flash("Static website hosting disabled", "info")
except StorageError as exc:
if _wants_json():
return jsonify({"error": _friendly_error_message(exc)}), 400
flash(_friendly_error_message(exc), "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
index_document = request.form.get("index_document", "").strip()
error_document = request.form.get("error_document", "").strip()
if not index_document:
if _wants_json():
return jsonify({"error": "Index document is required"}), 400
flash("Index document is required", "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
if "/" in index_document:
if _wants_json():
return jsonify({"error": "Index document must not contain '/'"}), 400
flash("Index document must not contain '/'", "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
website_cfg: dict[str, Any] = {"index_document": index_document}
if error_document:
website_cfg["error_document"] = error_document
try:
_storage().set_bucket_website(bucket_name, website_cfg)
if _wants_json():
return jsonify({
"success": True,
"message": "Static website hosting enabled",
"enabled": True,
"index_document": index_document,
"error_document": error_document,
})
flash("Static website hosting enabled", "success")
except StorageError as exc:
if _wants_json():
return jsonify({"error": _friendly_error_message(exc)}), 400
flash(_friendly_error_message(exc), "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
@ui_bp.get("/iam")
def iam_dashboard():
principal = _current_principal()
@@ -2275,6 +2356,142 @@ def connections_dashboard():
return render_template("connections.html", connections=connections, principal=principal)
@ui_bp.get("/website-domains")
def website_domains_dashboard():
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:list_users")
except IamError:
flash("Access denied", "danger")
return redirect(url_for("ui.buckets_overview"))
if not current_app.config.get("WEBSITE_HOSTING_ENABLED", False):
flash("Website hosting is not enabled", "warning")
return redirect(url_for("ui.buckets_overview"))
store = current_app.extensions.get("website_domains")
mappings = store.list_all() if store else []
storage = _storage()
buckets = [b.name for b in storage.list_buckets()]
return render_template(
"website_domains.html",
mappings=mappings,
buckets=buckets,
principal=principal,
can_manage_iam=True,
)
@ui_bp.post("/website-domains/create")
def create_website_domain():
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:list_users")
except IamError:
if _wants_json():
return jsonify({"error": "Access denied"}), 403
flash("Access denied", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
if not current_app.config.get("WEBSITE_HOSTING_ENABLED", False):
if _wants_json():
return jsonify({"error": "Website hosting is not enabled"}), 400
flash("Website hosting is not enabled", "warning")
return redirect(url_for("ui.buckets_overview"))
domain = (request.form.get("domain") or "").strip().lower()
bucket = (request.form.get("bucket") or "").strip()
if not domain:
if _wants_json():
return jsonify({"error": "Domain is required"}), 400
flash("Domain is required", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
if not bucket:
if _wants_json():
return jsonify({"error": "Bucket is required"}), 400
flash("Bucket is required", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
storage = _storage()
if not storage.bucket_exists(bucket):
if _wants_json():
return jsonify({"error": f"Bucket '{bucket}' does not exist"}), 404
flash(f"Bucket '{bucket}' does not exist", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
store = current_app.extensions.get("website_domains")
if store.get_bucket(domain):
if _wants_json():
return jsonify({"error": f"Domain '{domain}' is already mapped"}), 409
flash(f"Domain '{domain}' is already mapped", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
store.set_mapping(domain, bucket)
if _wants_json():
return jsonify({"success": True, "domain": domain, "bucket": bucket}), 201
flash(f"Domain '{domain}' mapped to bucket '{bucket}'", "success")
return redirect(url_for("ui.website_domains_dashboard"))
@ui_bp.post("/website-domains/<domain>/update")
def update_website_domain(domain: str):
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:list_users")
except IamError:
if _wants_json():
return jsonify({"error": "Access denied"}), 403
flash("Access denied", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
bucket = (request.form.get("bucket") or "").strip()
if not bucket:
if _wants_json():
return jsonify({"error": "Bucket is required"}), 400
flash("Bucket is required", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
storage = _storage()
if not storage.bucket_exists(bucket):
if _wants_json():
return jsonify({"error": f"Bucket '{bucket}' does not exist"}), 404
flash(f"Bucket '{bucket}' does not exist", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
store = current_app.extensions.get("website_domains")
store.set_mapping(domain, bucket)
if _wants_json():
return jsonify({"success": True, "domain": domain.lower(), "bucket": bucket})
flash(f"Domain '{domain}' updated to bucket '{bucket}'", "success")
return redirect(url_for("ui.website_domains_dashboard"))
@ui_bp.post("/website-domains/<domain>/delete")
def delete_website_domain(domain: str):
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:list_users")
except IamError:
if _wants_json():
return jsonify({"error": "Access denied"}), 403
flash("Access denied", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
store = current_app.extensions.get("website_domains")
if not store.delete_mapping(domain):
if _wants_json():
return jsonify({"error": f"No mapping for domain '{domain}'"}), 404
flash(f"No mapping for domain '{domain}'", "danger")
return redirect(url_for("ui.website_domains_dashboard"))
if _wants_json():
return jsonify({"success": True})
flash(f"Domain '{domain}' mapping deleted", "success")
return redirect(url_for("ui.website_domains_dashboard"))
@ui_bp.get("/metrics")
def metrics_dashboard():
principal = _current_principal()

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
APP_VERSION = "0.2.8"
APP_VERSION = "0.2.9"
def get_version() -> str:

55
app/website_domains.py Normal file
View File

@@ -0,0 +1,55 @@
from __future__ import annotations
import json
import threading
from pathlib import Path
from typing import Dict, List, Optional
class WebsiteDomainStore:
def __init__(self, config_path: Path) -> None:
self.config_path = config_path
self._lock = threading.Lock()
self._domains: Dict[str, str] = {}
self.reload()
def reload(self) -> None:
if not self.config_path.exists():
self._domains = {}
return
try:
with open(self.config_path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict):
self._domains = {k.lower(): v for k, v in data.items()}
else:
self._domains = {}
except (OSError, json.JSONDecodeError):
self._domains = {}
def _save(self) -> None:
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(self._domains, f, indent=2)
def list_all(self) -> List[Dict[str, str]]:
with self._lock:
return [{"domain": d, "bucket": b} for d, b in self._domains.items()]
def get_bucket(self, domain: str) -> Optional[str]:
with self._lock:
return self._domains.get(domain.lower())
def set_mapping(self, domain: str, bucket: str) -> None:
with self._lock:
self._domains[domain.lower()] = bucket
self._save()
def delete_mapping(self, domain: str) -> bool:
with self._lock:
key = domain.lower()
if key not in self._domains:
return False
del self._domains[key]
self._save()
return True