Add site registry UI and update documentation for geo-distribution

This commit is contained in:
2026-01-26 19:49:23 +08:00
parent b32f1f94f7
commit 62c36f7a6c
8 changed files with 1382 additions and 9 deletions

View File

@@ -31,6 +31,7 @@ from .notifications import NotificationService
from .object_lock import ObjectLockService
from .replication import ReplicationManager
from .secret_store import EphemeralSecretStore
from .site_registry import SiteRegistry, SiteInfo
from .storage import ObjectStorage
from .version import get_version
@@ -151,7 +152,17 @@ def create_app(
streaming_threshold_bytes=app.config.get("REPLICATION_STREAMING_THRESHOLD_BYTES", 10 * 1024 * 1024),
max_failures_per_bucket=app.config.get("REPLICATION_MAX_FAILURES_PER_BUCKET", 50),
)
site_registry_path = config_dir / "site_registry.json"
site_registry = SiteRegistry(site_registry_path)
if app.config.get("SITE_ID") and not site_registry.get_local_site():
site_registry.set_local_site(SiteInfo(
site_id=app.config["SITE_ID"],
endpoint=app.config.get("SITE_ENDPOINT") or "",
region=app.config.get("SITE_REGION", "us-east-1"),
priority=app.config.get("SITE_PRIORITY", 100),
))
encryption_config = {
"encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False),
"encryption_master_key_path": app.config.get("ENCRYPTION_MASTER_KEY_PATH"),
@@ -207,6 +218,7 @@ def create_app(
app.extensions["object_lock"] = object_lock_service
app.extensions["notifications"] = notification_service
app.extensions["access_logging"] = access_logging_service
app.extensions["site_registry"] = site_registry
operation_metrics_collector = None
if app.config.get("OPERATION_METRICS_ENABLED", False):
@@ -313,11 +325,14 @@ def create_app(
if include_api:
from .s3_api import s3_api_bp
from .kms_api import kms_api_bp
from .admin_api import admin_api_bp
app.register_blueprint(s3_api_bp)
app.register_blueprint(kms_api_bp)
app.register_blueprint(admin_api_bp)
csrf.exempt(s3_api_bp)
csrf.exempt(kms_api_bp)
csrf.exempt(admin_api_bp)
if include_ui:
from .ui import ui_bp

320
app/admin_api.py Normal file
View File

@@ -0,0 +1,320 @@
from __future__ import annotations
import logging
import time
from typing import Any, Dict, Optional, Tuple
from flask import Blueprint, Response, current_app, jsonify, request
from .connections import ConnectionStore
from .extensions import limiter
from .iam import IamError, Principal
from .replication import ReplicationManager
from .site_registry import PeerSite, SiteInfo, SiteRegistry
logger = logging.getLogger(__name__)
admin_api_bp = Blueprint("admin_api", __name__, url_prefix="/admin")
def _require_principal() -> Tuple[Optional[Principal], Optional[Tuple[Dict[str, Any], int]]]:
from .s3_api import _require_principal as s3_require_principal
return s3_require_principal()
def _require_admin() -> Tuple[Optional[Principal], Optional[Tuple[Dict[str, Any], int]]]:
principal, error = _require_principal()
if error:
return None, error
try:
_iam().authorize(principal, None, "iam:*")
return principal, None
except IamError:
return None, _json_error("AccessDenied", "Admin access required", 403)
def _site_registry() -> SiteRegistry:
return current_app.extensions["site_registry"]
def _connections() -> ConnectionStore:
return current_app.extensions["connections"]
def _replication() -> ReplicationManager:
return current_app.extensions["replication"]
def _iam():
return current_app.extensions["iam"]
def _json_error(code: str, message: str, status: int) -> Tuple[Dict[str, Any], int]:
return {"error": {"code": code, "message": message}}, status
def _get_admin_rate_limit() -> str:
return current_app.config.get("RATE_LIMIT_ADMIN", "60 per minute")
@admin_api_bp.route("/site", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def get_local_site():
principal, error = _require_admin()
if error:
return error
registry = _site_registry()
local_site = registry.get_local_site()
if local_site:
return jsonify(local_site.to_dict())
config_site_id = current_app.config.get("SITE_ID")
config_endpoint = current_app.config.get("SITE_ENDPOINT")
if config_site_id:
return jsonify({
"site_id": config_site_id,
"endpoint": config_endpoint or "",
"region": current_app.config.get("SITE_REGION", "us-east-1"),
"priority": current_app.config.get("SITE_PRIORITY", 100),
"display_name": config_site_id,
"source": "environment",
})
return _json_error("NotFound", "Local site not configured", 404)
@admin_api_bp.route("/site", methods=["PUT"])
@limiter.limit(lambda: _get_admin_rate_limit())
def update_local_site():
principal, error = _require_admin()
if error:
return error
payload = request.get_json(silent=True) or {}
site_id = payload.get("site_id")
endpoint = payload.get("endpoint")
if not site_id:
return _json_error("ValidationError", "site_id is required", 400)
registry = _site_registry()
existing = registry.get_local_site()
site = SiteInfo(
site_id=site_id,
endpoint=endpoint or "",
region=payload.get("region", "us-east-1"),
priority=payload.get("priority", 100),
display_name=payload.get("display_name", site_id),
created_at=existing.created_at if existing else None,
)
registry.set_local_site(site)
logger.info("Local site updated", extra={"site_id": site_id, "principal": principal.access_key})
return jsonify(site.to_dict())
@admin_api_bp.route("/sites", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def list_all_sites():
principal, error = _require_admin()
if error:
return error
registry = _site_registry()
local = registry.get_local_site()
peers = registry.list_peers()
result = {
"local": local.to_dict() if local else None,
"peers": [peer.to_dict() for peer in peers],
"total_peers": len(peers),
}
return jsonify(result)
@admin_api_bp.route("/sites", methods=["POST"])
@limiter.limit(lambda: _get_admin_rate_limit())
def register_peer_site():
principal, error = _require_admin()
if error:
return error
payload = request.get_json(silent=True) or {}
site_id = payload.get("site_id")
endpoint = payload.get("endpoint")
if not site_id:
return _json_error("ValidationError", "site_id is required", 400)
if not endpoint:
return _json_error("ValidationError", "endpoint is required", 400)
registry = _site_registry()
if registry.get_peer(site_id):
return _json_error("AlreadyExists", f"Peer site '{site_id}' already exists", 409)
connection_id = payload.get("connection_id")
if connection_id:
if not _connections().get(connection_id):
return _json_error("ValidationError", f"Connection '{connection_id}' not found", 400)
peer = PeerSite(
site_id=site_id,
endpoint=endpoint,
region=payload.get("region", "us-east-1"),
priority=payload.get("priority", 100),
display_name=payload.get("display_name", site_id),
connection_id=connection_id,
)
registry.add_peer(peer)
logger.info("Peer site registered", extra={"site_id": site_id, "principal": principal.access_key})
return jsonify(peer.to_dict()), 201
@admin_api_bp.route("/sites/<site_id>", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def get_peer_site(site_id: str):
principal, error = _require_admin()
if error:
return error
registry = _site_registry()
peer = registry.get_peer(site_id)
if not peer:
return _json_error("NotFound", f"Peer site '{site_id}' not found", 404)
return jsonify(peer.to_dict())
@admin_api_bp.route("/sites/<site_id>", methods=["PUT"])
@limiter.limit(lambda: _get_admin_rate_limit())
def update_peer_site(site_id: str):
principal, error = _require_admin()
if error:
return error
registry = _site_registry()
existing = registry.get_peer(site_id)
if not existing:
return _json_error("NotFound", f"Peer site '{site_id}' not found", 404)
payload = request.get_json(silent=True) or {}
peer = PeerSite(
site_id=site_id,
endpoint=payload.get("endpoint", existing.endpoint),
region=payload.get("region", existing.region),
priority=payload.get("priority", existing.priority),
display_name=payload.get("display_name", existing.display_name),
connection_id=payload.get("connection_id", existing.connection_id),
created_at=existing.created_at,
is_healthy=existing.is_healthy,
last_health_check=existing.last_health_check,
)
registry.update_peer(peer)
logger.info("Peer site updated", extra={"site_id": site_id, "principal": principal.access_key})
return jsonify(peer.to_dict())
@admin_api_bp.route("/sites/<site_id>", methods=["DELETE"])
@limiter.limit(lambda: _get_admin_rate_limit())
def delete_peer_site(site_id: str):
principal, error = _require_admin()
if error:
return error
registry = _site_registry()
if not registry.delete_peer(site_id):
return _json_error("NotFound", f"Peer site '{site_id}' not found", 404)
logger.info("Peer site deleted", extra={"site_id": site_id, "principal": principal.access_key})
return Response(status=204)
@admin_api_bp.route("/sites/<site_id>/health", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def check_peer_health(site_id: str):
principal, error = _require_admin()
if error:
return error
registry = _site_registry()
peer = registry.get_peer(site_id)
if not peer:
return _json_error("NotFound", f"Peer site '{site_id}' not found", 404)
is_healthy = False
error_message = None
if peer.connection_id:
connection = _connections().get(peer.connection_id)
if connection:
is_healthy = _replication().check_endpoint_health(connection)
else:
error_message = f"Connection '{peer.connection_id}' not found"
else:
error_message = "No connection configured for this peer"
registry.update_health(site_id, is_healthy)
result = {
"site_id": site_id,
"is_healthy": is_healthy,
"checked_at": time.time(),
}
if error_message:
result["error"] = error_message
return jsonify(result)
@admin_api_bp.route("/topology", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def get_topology():
principal, error = _require_admin()
if error:
return error
registry = _site_registry()
local = registry.get_local_site()
peers = registry.list_peers()
sites = []
if local:
sites.append({
**local.to_dict(),
"is_local": True,
"is_healthy": True,
})
for peer in peers:
sites.append({
**peer.to_dict(),
"is_local": False,
})
sites.sort(key=lambda s: s.get("priority", 100))
return jsonify({
"sites": sites,
"total": len(sites),
"healthy_count": sum(1 for s in sites if s.get("is_healthy")),
})

View File

@@ -141,6 +141,11 @@ class AppConfig:
kms_generate_data_key_min_bytes: int
kms_generate_data_key_max_bytes: int
lifecycle_max_history_per_bucket: int
site_id: Optional[str]
site_endpoint: Optional[str]
site_region: str
site_priority: int
ratelimit_admin: str
@classmethod
def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
@@ -298,6 +303,14 @@ class AppConfig:
kms_generate_data_key_max_bytes = int(_get("KMS_GENERATE_DATA_KEY_MAX_BYTES", 1024))
lifecycle_max_history_per_bucket = int(_get("LIFECYCLE_MAX_HISTORY_PER_BUCKET", 50))
site_id_raw = _get("SITE_ID", None)
site_id = str(site_id_raw).strip() if site_id_raw else None
site_endpoint_raw = _get("SITE_ENDPOINT", None)
site_endpoint = str(site_endpoint_raw).strip() if site_endpoint_raw else None
site_region = str(_get("SITE_REGION", "us-east-1"))
site_priority = int(_get("SITE_PRIORITY", 100))
ratelimit_admin = _validate_rate_limit(str(_get("RATE_LIMIT_ADMIN", "60 per minute")))
return cls(storage_root=storage_root,
max_upload_size=max_upload_size,
ui_page_size=ui_page_size,
@@ -375,7 +388,12 @@ class AppConfig:
encryption_chunk_size_bytes=encryption_chunk_size_bytes,
kms_generate_data_key_min_bytes=kms_generate_data_key_min_bytes,
kms_generate_data_key_max_bytes=kms_generate_data_key_max_bytes,
lifecycle_max_history_per_bucket=lifecycle_max_history_per_bucket)
lifecycle_max_history_per_bucket=lifecycle_max_history_per_bucket,
site_id=site_id,
site_endpoint=site_endpoint,
site_region=site_region,
site_priority=site_priority,
ratelimit_admin=ratelimit_admin)
def validate_and_report(self) -> list[str]:
"""Validate configuration and return a list of warnings/issues.
@@ -575,4 +593,9 @@ class AppConfig:
"KMS_GENERATE_DATA_KEY_MIN_BYTES": self.kms_generate_data_key_min_bytes,
"KMS_GENERATE_DATA_KEY_MAX_BYTES": self.kms_generate_data_key_max_bytes,
"LIFECYCLE_MAX_HISTORY_PER_BUCKET": self.lifecycle_max_history_per_bucket,
"SITE_ID": self.site_id,
"SITE_ENDPOINT": self.site_endpoint,
"SITE_REGION": self.site_region,
"SITE_PRIORITY": self.site_priority,
"RATE_LIMIT_ADMIN": self.ratelimit_admin,
}

177
app/site_registry.py Normal file
View File

@@ -0,0 +1,177 @@
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
@dataclass
class SiteInfo:
site_id: str
endpoint: str
region: str = "us-east-1"
priority: int = 100
display_name: str = ""
created_at: Optional[float] = None
updated_at: Optional[float] = None
def __post_init__(self) -> None:
if not self.display_name:
self.display_name = self.site_id
if self.created_at is None:
self.created_at = time.time()
def to_dict(self) -> Dict[str, Any]:
return {
"site_id": self.site_id,
"endpoint": self.endpoint,
"region": self.region,
"priority": self.priority,
"display_name": self.display_name,
"created_at": self.created_at,
"updated_at": self.updated_at,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> SiteInfo:
return cls(
site_id=data["site_id"],
endpoint=data.get("endpoint", ""),
region=data.get("region", "us-east-1"),
priority=data.get("priority", 100),
display_name=data.get("display_name", ""),
created_at=data.get("created_at"),
updated_at=data.get("updated_at"),
)
@dataclass
class PeerSite:
site_id: str
endpoint: str
region: str = "us-east-1"
priority: int = 100
display_name: str = ""
created_at: Optional[float] = None
updated_at: Optional[float] = None
connection_id: Optional[str] = None
is_healthy: Optional[bool] = None
last_health_check: Optional[float] = None
def __post_init__(self) -> None:
if not self.display_name:
self.display_name = self.site_id
if self.created_at is None:
self.created_at = time.time()
def to_dict(self) -> Dict[str, Any]:
return {
"site_id": self.site_id,
"endpoint": self.endpoint,
"region": self.region,
"priority": self.priority,
"display_name": self.display_name,
"created_at": self.created_at,
"updated_at": self.updated_at,
"connection_id": self.connection_id,
"is_healthy": self.is_healthy,
"last_health_check": self.last_health_check,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> PeerSite:
return cls(
site_id=data["site_id"],
endpoint=data.get("endpoint", ""),
region=data.get("region", "us-east-1"),
priority=data.get("priority", 100),
display_name=data.get("display_name", ""),
created_at=data.get("created_at"),
updated_at=data.get("updated_at"),
connection_id=data.get("connection_id"),
is_healthy=data.get("is_healthy"),
last_health_check=data.get("last_health_check"),
)
class SiteRegistry:
def __init__(self, config_path: Path) -> None:
self.config_path = config_path
self._local_site: Optional[SiteInfo] = None
self._peers: Dict[str, PeerSite] = {}
self.reload()
def reload(self) -> None:
if not self.config_path.exists():
self._local_site = None
self._peers = {}
return
try:
with open(self.config_path, "r", encoding="utf-8") as f:
data = json.load(f)
if data.get("local"):
self._local_site = SiteInfo.from_dict(data["local"])
else:
self._local_site = None
self._peers = {}
for peer_data in data.get("peers", []):
peer = PeerSite.from_dict(peer_data)
self._peers[peer.site_id] = peer
except (OSError, json.JSONDecodeError, KeyError):
self._local_site = None
self._peers = {}
def save(self) -> None:
self.config_path.parent.mkdir(parents=True, exist_ok=True)
data = {
"local": self._local_site.to_dict() if self._local_site else None,
"peers": [peer.to_dict() for peer in self._peers.values()],
}
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
def get_local_site(self) -> Optional[SiteInfo]:
return self._local_site
def set_local_site(self, site: SiteInfo) -> None:
site.updated_at = time.time()
self._local_site = site
self.save()
def list_peers(self) -> List[PeerSite]:
return list(self._peers.values())
def get_peer(self, site_id: str) -> Optional[PeerSite]:
return self._peers.get(site_id)
def add_peer(self, peer: PeerSite) -> None:
peer.created_at = peer.created_at or time.time()
self._peers[peer.site_id] = peer
self.save()
def update_peer(self, peer: PeerSite) -> None:
if peer.site_id not in self._peers:
raise ValueError(f"Peer {peer.site_id} not found")
peer.updated_at = time.time()
self._peers[peer.site_id] = peer
self.save()
def delete_peer(self, site_id: str) -> bool:
if site_id in self._peers:
del self._peers[site_id]
self.save()
return True
return False
def update_health(self, site_id: str, is_healthy: bool) -> None:
peer = self._peers.get(site_id)
if peer:
peer.is_healthy = is_healthy
peer.last_health_check = time.time()
self.save()

228
app/ui.py
View File

@@ -38,6 +38,7 @@ from .kms import KMSManager
from .replication import ReplicationManager, ReplicationRule
from .s3_api import _generate_presigned_url
from .secret_store import EphemeralSecretStore
from .site_registry import SiteRegistry, SiteInfo, PeerSite
from .storage import ObjectStorage, StorageError
ui_bp = Blueprint("ui", __name__, template_folder="../templates", url_prefix="/ui")
@@ -145,6 +146,10 @@ def _operation_metrics():
return current_app.extensions.get("operation_metrics")
def _site_registry() -> SiteRegistry:
return current_app.extensions["site_registry"]
def _format_bytes(num: int) -> str:
step = 1024
units = ["B", "KB", "MB", "GB", "TB", "PB"]
@@ -2663,6 +2668,229 @@ def list_buckets_for_copy(bucket_name: str):
return jsonify({"buckets": allowed})
@ui_bp.get("/sites")
def sites_dashboard():
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:*")
except IamError:
flash("Access denied: Site management requires admin permissions", "danger")
return redirect(url_for("ui.buckets_overview"))
registry = _site_registry()
local_site = registry.get_local_site()
peers = registry.list_peers()
connections = _connections().list()
return render_template(
"sites.html",
principal=principal,
local_site=local_site,
peers=peers,
connections=connections,
config_site_id=current_app.config.get("SITE_ID"),
config_site_endpoint=current_app.config.get("SITE_ENDPOINT"),
config_site_region=current_app.config.get("SITE_REGION", "us-east-1"),
)
@ui_bp.post("/sites/local")
def update_local_site():
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:*")
except IamError:
flash("Access denied", "danger")
return redirect(url_for("ui.sites_dashboard"))
site_id = request.form.get("site_id", "").strip()
endpoint = request.form.get("endpoint", "").strip()
region = request.form.get("region", "us-east-1").strip()
priority = request.form.get("priority", "100")
display_name = request.form.get("display_name", "").strip()
if not site_id:
flash("Site ID is required", "danger")
return redirect(url_for("ui.sites_dashboard"))
try:
priority_int = int(priority)
except ValueError:
priority_int = 100
registry = _site_registry()
existing = registry.get_local_site()
site = SiteInfo(
site_id=site_id,
endpoint=endpoint,
region=region,
priority=priority_int,
display_name=display_name or site_id,
created_at=existing.created_at if existing else None,
)
registry.set_local_site(site)
flash("Local site configuration updated", "success")
return redirect(url_for("ui.sites_dashboard"))
@ui_bp.post("/sites/peers")
def add_peer_site():
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:*")
except IamError:
flash("Access denied", "danger")
return redirect(url_for("ui.sites_dashboard"))
site_id = request.form.get("site_id", "").strip()
endpoint = request.form.get("endpoint", "").strip()
region = request.form.get("region", "us-east-1").strip()
priority = request.form.get("priority", "100")
display_name = request.form.get("display_name", "").strip()
connection_id = request.form.get("connection_id", "").strip() or None
if not site_id:
flash("Site ID is required", "danger")
return redirect(url_for("ui.sites_dashboard"))
if not endpoint:
flash("Endpoint is required", "danger")
return redirect(url_for("ui.sites_dashboard"))
try:
priority_int = int(priority)
except ValueError:
priority_int = 100
registry = _site_registry()
if registry.get_peer(site_id):
flash(f"Peer site '{site_id}' already exists", "danger")
return redirect(url_for("ui.sites_dashboard"))
if connection_id and not _connections().get(connection_id):
flash(f"Connection '{connection_id}' not found", "danger")
return redirect(url_for("ui.sites_dashboard"))
peer = PeerSite(
site_id=site_id,
endpoint=endpoint,
region=region,
priority=priority_int,
display_name=display_name or site_id,
connection_id=connection_id,
)
registry.add_peer(peer)
flash(f"Peer site '{site_id}' added", "success")
return redirect(url_for("ui.sites_dashboard"))
@ui_bp.post("/sites/peers/<site_id>/update")
def update_peer_site(site_id: str):
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:*")
except IamError:
flash("Access denied", "danger")
return redirect(url_for("ui.sites_dashboard"))
registry = _site_registry()
existing = registry.get_peer(site_id)
if not existing:
flash(f"Peer site '{site_id}' not found", "danger")
return redirect(url_for("ui.sites_dashboard"))
endpoint = request.form.get("endpoint", existing.endpoint).strip()
region = request.form.get("region", existing.region).strip()
priority = request.form.get("priority", str(existing.priority))
display_name = request.form.get("display_name", existing.display_name).strip()
connection_id = request.form.get("connection_id", "").strip() or existing.connection_id
try:
priority_int = int(priority)
except ValueError:
priority_int = existing.priority
if connection_id and not _connections().get(connection_id):
flash(f"Connection '{connection_id}' not found", "danger")
return redirect(url_for("ui.sites_dashboard"))
peer = PeerSite(
site_id=site_id,
endpoint=endpoint,
region=region,
priority=priority_int,
display_name=display_name or site_id,
connection_id=connection_id,
created_at=existing.created_at,
is_healthy=existing.is_healthy,
last_health_check=existing.last_health_check,
)
registry.update_peer(peer)
flash(f"Peer site '{site_id}' updated", "success")
return redirect(url_for("ui.sites_dashboard"))
@ui_bp.post("/sites/peers/<site_id>/delete")
def delete_peer_site(site_id: str):
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:*")
except IamError:
flash("Access denied", "danger")
return redirect(url_for("ui.sites_dashboard"))
registry = _site_registry()
if registry.delete_peer(site_id):
flash(f"Peer site '{site_id}' deleted", "success")
else:
flash(f"Peer site '{site_id}' not found", "danger")
return redirect(url_for("ui.sites_dashboard"))
@ui_bp.get("/sites/peers/<site_id>/health")
def check_peer_site_health(site_id: str):
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:*")
except IamError:
return jsonify({"error": "Access denied"}), 403
registry = _site_registry()
peer = registry.get_peer(site_id)
if not peer:
return jsonify({"error": f"Peer site '{site_id}' not found"}), 404
is_healthy = False
error_message = None
if peer.connection_id:
connection = _connections().get(peer.connection_id)
if connection:
is_healthy = _replication().check_endpoint_health(connection)
else:
error_message = f"Connection '{peer.connection_id}' not found"
else:
error_message = "No connection configured for this peer"
registry.update_health(site_id, is_healthy)
result = {
"site_id": site_id,
"is_healthy": is_healthy,
}
if error_message:
result["error"] = error_message
return jsonify(result)
@ui_bp.app_errorhandler(404)
def ui_not_found(error): # type: ignore[override]
prefix = ui_bp.url_prefix or ""