diff --git a/app/__init__.py b/app/__init__.py index 6cafe1c..d301c83 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -31,6 +31,7 @@ from .notifications import NotificationService from .object_lock import ObjectLockService from .replication import ReplicationManager from .secret_store import EphemeralSecretStore +from .site_registry import SiteRegistry, SiteInfo from .storage import ObjectStorage from .version import get_version @@ -151,7 +152,17 @@ def create_app( streaming_threshold_bytes=app.config.get("REPLICATION_STREAMING_THRESHOLD_BYTES", 10 * 1024 * 1024), max_failures_per_bucket=app.config.get("REPLICATION_MAX_FAILURES_PER_BUCKET", 50), ) - + + site_registry_path = config_dir / "site_registry.json" + site_registry = SiteRegistry(site_registry_path) + if app.config.get("SITE_ID") and not site_registry.get_local_site(): + site_registry.set_local_site(SiteInfo( + site_id=app.config["SITE_ID"], + endpoint=app.config.get("SITE_ENDPOINT") or "", + region=app.config.get("SITE_REGION", "us-east-1"), + priority=app.config.get("SITE_PRIORITY", 100), + )) + encryption_config = { "encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False), "encryption_master_key_path": app.config.get("ENCRYPTION_MASTER_KEY_PATH"), @@ -207,6 +218,7 @@ def create_app( app.extensions["object_lock"] = object_lock_service app.extensions["notifications"] = notification_service app.extensions["access_logging"] = access_logging_service + app.extensions["site_registry"] = site_registry operation_metrics_collector = None if app.config.get("OPERATION_METRICS_ENABLED", False): @@ -313,11 +325,14 @@ def create_app( if include_api: from .s3_api import s3_api_bp from .kms_api import kms_api_bp + from .admin_api import admin_api_bp app.register_blueprint(s3_api_bp) app.register_blueprint(kms_api_bp) + app.register_blueprint(admin_api_bp) csrf.exempt(s3_api_bp) csrf.exempt(kms_api_bp) + csrf.exempt(admin_api_bp) if include_ui: from .ui import ui_bp diff --git a/app/admin_api.py 
# app/admin_api.py -- new module introduced by this patch.
"""Admin REST API for the site registry, mounted under ``/admin``.

Every endpoint requires a principal that is authorized for ``iam:*`` and is
rate limited by the ``RATE_LIMIT_ADMIN`` setting (default ``60 per minute``).
Errors are returned as ``{"error": {"code": ..., "message": ...}}``.
"""

import logging
import time
from typing import Any, Dict, Optional, Tuple

from flask import Blueprint, Response, current_app, jsonify, request

from .connections import ConnectionStore
from .extensions import limiter
from .iam import IamError, Principal
from .replication import ReplicationManager
from .site_registry import PeerSite, SiteInfo, SiteRegistry

logger = logging.getLogger(__name__)

admin_api_bp = Blueprint("admin_api", __name__, url_prefix="/admin")

# (body, status) pair that Flask turns into a JSON error response.
ErrorResponse = Tuple[Dict[str, Any], int]


def _require_principal() -> Tuple[Optional[Principal], Optional[ErrorResponse]]:
    """Authenticate the request by delegating to the S3 API's principal check."""
    # Imported inside the function, as in the original code.
    from .s3_api import _require_principal as s3_require_principal
    return s3_require_principal()


def _require_admin() -> Tuple[Optional[Principal], Optional[ErrorResponse]]:
    """Return ``(principal, None)`` for an admin caller, else ``(None, error)``.

    A caller counts as admin when IAM authorizes it for ``iam:*``.
    """
    principal, error = _require_principal()
    if error:
        return None, error

    try:
        _iam().authorize(principal, None, "iam:*")
        return principal, None
    except IamError:
        return None, _json_error("AccessDenied", "Admin access required", 403)


def _site_registry() -> SiteRegistry:
    """Return the site registry stored in the app extensions."""
    return current_app.extensions["site_registry"]


def _connections() -> ConnectionStore:
    """Return the connection store stored in the app extensions."""
    return current_app.extensions["connections"]


def _replication() -> ReplicationManager:
    """Return the replication manager stored in the app extensions."""
    return current_app.extensions["replication"]


def _iam():
    """Return the IAM service stored in the app extensions."""
    return current_app.extensions["iam"]


def _json_error(code: str, message: str, status: int) -> ErrorResponse:
    """Build the ``(body, status)`` pair used for every error response."""
    return {"error": {"code": code, "message": message}}, status


def _get_admin_rate_limit() -> str:
    """Return the rate limit string applied to all admin endpoints."""
    return current_app.config.get("RATE_LIMIT_ADMIN", "60 per minute")


def _json_object() -> Tuple[Optional[Dict[str, Any]], Optional[ErrorResponse]]:
    """Return the request body as a dict, or a 400 error if it is not an object.

    A missing, unparsable or falsy body is treated as ``{}`` (as before).  A
    JSON array, string or number used to reach ``payload.get`` and fail with an
    ``AttributeError`` (HTTP 500); it is now rejected explicitly.
    """
    payload = request.get_json(silent=True) or {}
    if not isinstance(payload, dict):
        return None, _json_error("ValidationError", "Request body must be a JSON object", 400)
    return payload, None


def _is_valid_priority(value: Any) -> bool:
    """Return True when ``value`` is a real integer (``bool`` is rejected)."""
    return isinstance(value, int) and not isinstance(value, bool)


@admin_api_bp.route("/site", methods=["GET"])
@limiter.limit(_get_admin_rate_limit)
def get_local_site():
    """Return the local site, falling back to the ``SITE_*`` config values.

    Responds 404 when neither the registry nor ``SITE_ID`` defines a site.
    """
    principal, error = _require_admin()
    if error:
        return error

    local_site = _site_registry().get_local_site()
    if local_site:
        return jsonify(local_site.to_dict())

    config_site_id = current_app.config.get("SITE_ID")
    config_endpoint = current_app.config.get("SITE_ENDPOINT")

    if config_site_id:
        return jsonify({
            "site_id": config_site_id,
            "endpoint": config_endpoint or "",
            "region": current_app.config.get("SITE_REGION", "us-east-1"),
            "priority": current_app.config.get("SITE_PRIORITY", 100),
            "display_name": config_site_id,
            "source": "environment",
        })

    return _json_error("NotFound", "Local site not configured", 404)


@admin_api_bp.route("/site", methods=["PUT"])
@limiter.limit(_get_admin_rate_limit)
def update_local_site():
    """Create or replace the local site from a JSON body.

    ``site_id`` is required; ``created_at`` of an existing local site is kept.
    """
    principal, error = _require_admin()
    if error:
        return error

    payload, error = _json_object()
    if error:
        return error

    site_id = payload.get("site_id")
    endpoint = payload.get("endpoint")

    if not site_id:
        return _json_error("ValidationError", "site_id is required", 400)
    if not isinstance(site_id, str):
        return _json_error("ValidationError", "site_id must be a string", 400)

    priority = payload.get("priority", 100)
    if not _is_valid_priority(priority):
        return _json_error("ValidationError", "priority must be an integer", 400)

    registry = _site_registry()
    existing = registry.get_local_site()

    site = SiteInfo(
        site_id=site_id,
        endpoint=endpoint or "",
        region=payload.get("region", "us-east-1"),
        priority=priority,
        display_name=payload.get("display_name", site_id),
        created_at=existing.created_at if existing else None,
    )

    registry.set_local_site(site)

    logger.info("Local site updated", extra={"site_id": site_id, "principal": principal.access_key})
    return jsonify(site.to_dict())


@admin_api_bp.route("/sites", methods=["GET"])
@limiter.limit(_get_admin_rate_limit)
def list_all_sites():
    """Return the local site (or null), all peers, and the peer count."""
    principal, error = _require_admin()
    if error:
        return error

    registry = _site_registry()
    local = registry.get_local_site()
    peers = registry.list_peers()

    return jsonify({
        "local": local.to_dict() if local else None,
        "peers": [peer.to_dict() for peer in peers],
        "total_peers": len(peers),
    })


@admin_api_bp.route("/sites", methods=["POST"])
@limiter.limit(_get_admin_rate_limit)
def register_peer_site():
    """Register a new peer site; responds 201 with the stored peer.

    Responds 400 on invalid input (including an unknown ``connection_id``)
    and 409 when a peer with the same ``site_id`` already exists.
    """
    principal, error = _require_admin()
    if error:
        return error

    payload, error = _json_object()
    if error:
        return error

    site_id = payload.get("site_id")
    endpoint = payload.get("endpoint")

    if not site_id:
        return _json_error("ValidationError", "site_id is required", 400)
    # The peer is later addressed through a URL segment, which is always a string.
    if not isinstance(site_id, str):
        return _json_error("ValidationError", "site_id must be a string", 400)
    if not endpoint:
        return _json_error("ValidationError", "endpoint is required", 400)

    priority = payload.get("priority", 100)
    if not _is_valid_priority(priority):
        return _json_error("ValidationError", "priority must be an integer", 400)

    registry = _site_registry()

    if registry.get_peer(site_id):
        return _json_error("AlreadyExists", f"Peer site '{site_id}' already exists", 409)

    connection_id = payload.get("connection_id")
    if connection_id and not _connections().get(connection_id):
        return _json_error("ValidationError", f"Connection '{connection_id}' not found", 400)

    peer = PeerSite(
        site_id=site_id,
        endpoint=endpoint,
        region=payload.get("region", "us-east-1"),
        priority=priority,
        display_name=payload.get("display_name", site_id),
        connection_id=connection_id,
    )

    registry.add_peer(peer)

    logger.info("Peer site registered", extra={"site_id": site_id, "principal": principal.access_key})
    return jsonify(peer.to_dict()), 201


# The "<site_id>" placeholders below were missing from the route rules, so the
# views could never receive their ``site_id`` argument.
@admin_api_bp.route("/sites/<site_id>", methods=["GET"])
@limiter.limit(_get_admin_rate_limit)
def get_peer_site(site_id: str):
    """Return one peer site, or 404 when it is not registered."""
    principal, error = _require_admin()
    if error:
        return error

    peer = _site_registry().get_peer(site_id)
    if not peer:
        return _json_error("NotFound", f"Peer site '{site_id}' not found", 404)

    return jsonify(peer.to_dict())


@admin_api_bp.route("/sites/<site_id>", methods=["PUT"])
@limiter.limit(_get_admin_rate_limit)
def update_peer_site(site_id: str):
    """Update a peer site; fields missing from the body keep their values.

    ``created_at`` and the stored health state are always carried over.
    """
    principal, error = _require_admin()
    if error:
        return error

    registry = _site_registry()
    existing = registry.get_peer(site_id)

    if not existing:
        return _json_error("NotFound", f"Peer site '{site_id}' not found", 404)

    payload, error = _json_object()
    if error:
        return error

    # Same rules as registration: an update must not blank out the endpoint.
    endpoint = payload.get("endpoint", existing.endpoint)
    if not endpoint:
        return _json_error("ValidationError", "endpoint is required", 400)

    priority = payload.get("priority", existing.priority)
    if not _is_valid_priority(priority):
        return _json_error("ValidationError", "priority must be an integer", 400)

    connection_id = payload.get("connection_id", existing.connection_id)
    # Only a connection named in this request is checked, so a peer whose old
    # connection has since been removed can still be edited.
    if "connection_id" in payload and connection_id and not _connections().get(connection_id):
        return _json_error("ValidationError", f"Connection '{connection_id}' not found", 400)

    peer = PeerSite(
        site_id=site_id,
        endpoint=endpoint,
        region=payload.get("region", existing.region),
        priority=priority,
        display_name=payload.get("display_name", existing.display_name),
        connection_id=connection_id,
        created_at=existing.created_at,
        is_healthy=existing.is_healthy,
        last_health_check=existing.last_health_check,
    )

    registry.update_peer(peer)

    logger.info("Peer site updated", extra={"site_id": site_id, "principal": principal.access_key})
    return jsonify(peer.to_dict())


@admin_api_bp.route("/sites/<site_id>", methods=["DELETE"])
@limiter.limit(_get_admin_rate_limit)
def delete_peer_site(site_id: str):
    """Delete a peer site; responds 204 on success and 404 if it is unknown."""
    principal, error = _require_admin()
    if error:
        return error

    if not _site_registry().delete_peer(site_id):
        return _json_error("NotFound", f"Peer site '{site_id}' not found", 404)

    logger.info("Peer site deleted", extra={"site_id": site_id, "principal": principal.access_key})
    return Response(status=204)


@admin_api_bp.route("/sites/<site_id>/health", methods=["GET"])
@limiter.limit(_get_admin_rate_limit)
def check_peer_health(site_id: str):
    """Run a health check for a peer and record the result in the registry.

    The check goes through the peer's linked connection.  Without a usable
    connection the peer is recorded as unhealthy and ``error`` explains why.
    """
    principal, error = _require_admin()
    if error:
        return error

    registry = _site_registry()
    peer = registry.get_peer(site_id)

    if not peer:
        return _json_error("NotFound", f"Peer site '{site_id}' not found", 404)

    is_healthy = False
    error_message = None

    if peer.connection_id:
        connection = _connections().get(peer.connection_id)
        if connection:
            is_healthy = _replication().check_endpoint_health(connection)
        else:
            error_message = f"Connection '{peer.connection_id}' not found"
    else:
        error_message = "No connection configured for this peer"

    registry.update_health(site_id, is_healthy)

    result = {
        "site_id": site_id,
        "is_healthy": is_healthy,
        "checked_at": time.time(),
    }
    if error_message:
        result["error"] = error_message

    return jsonify(result)


@admin_api_bp.route("/topology", methods=["GET"])
@limiter.limit(_get_admin_rate_limit)
def get_topology():
    """Return the local site and all peers as one list sorted by priority.

    The local site is always reported as healthy; peers report their stored
    ``is_healthy`` value.
    """
    principal, error = _require_admin()
    if error:
        return error

    registry = _site_registry()
    local = registry.get_local_site()

    sites = []
    if local:
        sites.append({**local.to_dict(), "is_local": True, "is_healthy": True})
    sites.extend({**peer.to_dict(), "is_local": False} for peer in registry.list_peers())

    sites.sort(key=lambda s: s.get("priority", 100))

    return jsonify({
        "sites": sites,
        "total": len(sites),
        "healthy_count": sum(1 for s in sites if s.get("is_healthy")),
    })


# ---------------------------------------------------------------------------
# Patch context carried over from the collapsed diff: the first part of the
# app/config.py hunk shared these lines.  AppConfig is only partially visible,
# so its added lines are recorded here unchanged rather than edited.
#
#   class AppConfig:  (new fields)
#   +    site_id: Optional[str]
#   +    site_endpoint: Optional[str]
#   +    site_region: str
#   +    site_priority: int
#   +    ratelimit_admin: str
#
#   class AppConfig:  (before ``return cls(...)``)
#   +    site_id_raw = _get("SITE_ID", None)
#   +    site_id = str(site_id_raw).strip() if site_id_raw else None
#   +    site_endpoint_raw = _get("SITE_ENDPOINT", None)
#   +    site_endpoint = str(site_endpoint_raw).strip() if site_endpoint_raw else None
#   +    site_region = str(_get("SITE_REGION", "us-east-1"))
#   +    site_priority = int(_get("SITE_PRIORITY", 100))
#   +    ratelimit_admin = _validate_rate_limit(str(_get("RATE_LIMIT_ADMIN", "60 per minute")))
# ---------------------------------------------------------------------------
# app/site_registry.py -- new module introduced by this patch.
#
# Patch context carried over from the collapsed diff: the tail of the
# app/config.py hunk shared these lines.  AppConfig is only partially visible,
# so its changed lines are recorded here unchanged rather than edited.
#
#   class AppConfig:  (end of the ``cls(...)`` call)
#   -    lifecycle_max_history_per_bucket=lifecycle_max_history_per_bucket)
#   +    lifecycle_max_history_per_bucket=lifecycle_max_history_per_bucket,
#   +    site_id=site_id,
#   +    site_endpoint=site_endpoint,
#   +    site_region=site_region,
#   +    site_priority=site_priority,
#   +    ratelimit_admin=ratelimit_admin)
#
#   class AppConfig:  (config mapping)
#   +    "SITE_ID": self.site_id,
#   +    "SITE_ENDPOINT": self.site_endpoint,
#   +    "SITE_REGION": self.site_region,
#   +    "SITE_PRIORITY": self.site_priority,
#   +    "RATE_LIMIT_ADMIN": self.ratelimit_admin,
"""JSON-backed registry of the local site and its peer sites."""

import json
import logging
import os
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class SiteInfo:
    """Identity of the local site.

    ``display_name`` defaults to ``site_id`` and ``created_at`` to the current
    time when they are not supplied.
    """

    site_id: str
    endpoint: str
    region: str = "us-east-1"
    priority: int = 100
    display_name: str = ""
    created_at: Optional[float] = None
    updated_at: Optional[float] = None

    def __post_init__(self) -> None:
        if not self.display_name:
            self.display_name = self.site_id
        if self.created_at is None:
            self.created_at = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all fields."""
        return {
            "site_id": self.site_id,
            "endpoint": self.endpoint,
            "region": self.region,
            "priority": self.priority,
            "display_name": self.display_name,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SiteInfo":
        """Build a ``SiteInfo`` from a dict; only ``site_id`` is mandatory.

        Raises:
            KeyError: if ``site_id`` is missing.
        """
        return cls(
            site_id=data["site_id"],
            endpoint=data.get("endpoint", ""),
            region=data.get("region", "us-east-1"),
            priority=data.get("priority", 100),
            display_name=data.get("display_name", ""),
            created_at=data.get("created_at"),
            updated_at=data.get("updated_at"),
        )


@dataclass
class PeerSite:
    """A remote site, with an optional linked connection and health state.

    ``is_healthy`` is ``None`` until a health check has been recorded.
    """

    site_id: str
    endpoint: str
    region: str = "us-east-1"
    priority: int = 100
    display_name: str = ""
    created_at: Optional[float] = None
    updated_at: Optional[float] = None
    connection_id: Optional[str] = None
    is_healthy: Optional[bool] = None
    last_health_check: Optional[float] = None

    def __post_init__(self) -> None:
        if not self.display_name:
            self.display_name = self.site_id
        if self.created_at is None:
            self.created_at = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all fields."""
        return {
            "site_id": self.site_id,
            "endpoint": self.endpoint,
            "region": self.region,
            "priority": self.priority,
            "display_name": self.display_name,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "connection_id": self.connection_id,
            "is_healthy": self.is_healthy,
            "last_health_check": self.last_health_check,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "PeerSite":
        """Build a ``PeerSite`` from a dict; only ``site_id`` is mandatory.

        Raises:
            KeyError: if ``site_id`` is missing.
        """
        return cls(
            site_id=data["site_id"],
            endpoint=data.get("endpoint", ""),
            region=data.get("region", "us-east-1"),
            priority=data.get("priority", 100),
            display_name=data.get("display_name", ""),
            created_at=data.get("created_at"),
            updated_at=data.get("updated_at"),
            connection_id=data.get("connection_id"),
            is_healthy=data.get("is_healthy"),
            last_health_check=data.get("last_health_check"),
        )


class SiteRegistry:
    """Registry of the local site and peer sites, persisted as one JSON file.

    The file has the shape ``{"local": {...} | null, "peers": [{...}, ...]}``.
    Every mutating method writes the whole file back immediately.
    """

    def __init__(self, config_path: Path) -> None:
        self.config_path = config_path
        self._local_site: Optional[SiteInfo] = None
        self._peers: Dict[str, PeerSite] = {}
        self.reload()

    def reload(self) -> None:
        """Replace the in-memory state with the contents of the file.

        A missing, unreadable or malformed file yields an empty registry.
        Malformed now also covers a non-object JSON root, ``"peers": null``
        and non-dict entries, which previously raised out of the constructor.
        """
        self._local_site = None
        self._peers = {}

        if not self.config_path.exists():
            return

        try:
            with open(self.config_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            if not isinstance(data, dict):
                raise ValueError("registry root must be a JSON object")

            local_data = data.get("local")
            local_site = SiteInfo.from_dict(local_data) if local_data else None

            peers: Dict[str, PeerSite] = {}
            for peer_data in data.get("peers") or []:
                peer = PeerSite.from_dict(peer_data)
                peers[peer.site_id] = peer
        except (OSError, ValueError, KeyError, TypeError) as exc:
            # json.JSONDecodeError is a ValueError.  The registry stays empty,
            # as before, but the problem is no longer silent.
            logger.warning("Ignoring unreadable site registry %s: %r", self.config_path, exc)
            return

        # Assigned only after everything parsed, so a failure never leaves a
        # half-loaded registry.
        self._local_site = local_site
        self._peers = peers

    def save(self) -> None:
        """Write the registry to ``config_path``, replacing the file atomically.

        The data goes to a temporary file in the same directory, which is then
        moved over the target with ``os.replace``.  An interrupted or failed
        write therefore leaves the previous file intact instead of truncated
        JSON, which ``reload`` would treat as an empty registry.
        """
        directory = self.config_path.parent
        directory.mkdir(parents=True, exist_ok=True)
        data = {
            "local": self._local_site.to_dict() if self._local_site else None,
            "peers": [peer.to_dict() for peer in self._peers.values()],
        }

        fd, tmp_name = tempfile.mkstemp(
            prefix=self.config_path.name + ".", suffix=".tmp", dir=directory
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_name, self.config_path)
        except BaseException:
            # Do not leave a stray temp file behind; the original error wins.
            try:
                os.unlink(tmp_name)
            except OSError:
                pass
            raise

    def get_local_site(self) -> Optional[SiteInfo]:
        """Return the local site, or ``None`` when none is configured."""
        return self._local_site

    def set_local_site(self, site: SiteInfo) -> None:
        """Store ``site`` as the local site, stamp ``updated_at`` and save."""
        site.updated_at = time.time()
        self._local_site = site
        self.save()

    def list_peers(self) -> List[PeerSite]:
        """Return all peers as a new list."""
        return list(self._peers.values())

    def get_peer(self, site_id: str) -> Optional[PeerSite]:
        """Return the peer with ``site_id``, or ``None`` if unknown."""
        return self._peers.get(site_id)

    def add_peer(self, peer: PeerSite) -> None:
        """Store ``peer`` under its ``site_id`` (replacing any existing one) and save."""
        peer.created_at = peer.created_at or time.time()
        self._peers[peer.site_id] = peer
        self.save()

    def update_peer(self, peer: PeerSite) -> None:
        """Replace an existing peer, stamp ``updated_at`` and save.

        Raises:
            ValueError: if no peer with ``peer.site_id`` is registered.
        """
        if peer.site_id not in self._peers:
            raise ValueError(f"Peer {peer.site_id} not found")
        peer.updated_at = time.time()
        self._peers[peer.site_id] = peer
        self.save()

    def delete_peer(self, site_id: str) -> bool:
        """Remove a peer and save; return ``False`` when it did not exist."""
        if site_id in self._peers:
            del self._peers[site_id]
            self.save()
            return True
        return False

    def update_health(self, site_id: str, is_healthy: bool) -> None:
        """Record a health check result for a peer; unknown peers are ignored."""
        peer = self._peers.get(site_id)
        if peer:
            peer.is_healthy = is_healthy
            peer.last_health_check = time.time()
            self.save()
# app/ui.py -- site-management additions from this patch.
#
# Only the lines this patch adds to app/ui.py are reproduced below.  The
# unchanged context lines of the hunks (existing imports, ``ui_bp``,
# ``_operation_metrics``, ``_format_bytes``, ``list_buckets_for_copy`` and
# ``ui_not_found``) are fragments of definitions that are not fully visible.
# The views rely on names defined elsewhere in app/ui.py: ``ui_bp``,
# ``_current_principal``, ``_iam``, ``_connections``, ``_replication``,
# ``IamError`` and the Flask helpers.
from .site_registry import SiteRegistry, SiteInfo, PeerSite


def _site_registry() -> SiteRegistry:
    """Return the site registry stored in the app extensions."""
    return current_app.extensions["site_registry"]


def _site_form_priority(raw: str, fallback: int) -> int:
    """Parse a priority form field, returning ``fallback`` if it is not an integer."""
    try:
        return int(raw)
    except ValueError:
        return fallback


@ui_bp.get("/sites")
def sites_dashboard():
    """Render the Sites page; non-admins are redirected to the bucket overview."""
    principal = _current_principal()
    try:
        _iam().authorize(principal, None, "iam:*")
    except IamError:
        flash("Access denied: Site management requires admin permissions", "danger")
        return redirect(url_for("ui.buckets_overview"))

    registry = _site_registry()

    return render_template(
        "sites.html",
        principal=principal,
        local_site=registry.get_local_site(),
        peers=registry.list_peers(),
        connections=_connections().list(),
        config_site_id=current_app.config.get("SITE_ID"),
        config_site_endpoint=current_app.config.get("SITE_ENDPOINT"),
        config_site_region=current_app.config.get("SITE_REGION", "us-east-1"),
    )


@ui_bp.post("/sites/local")
def update_local_site():
    """Create or replace the local site from the submitted form.

    ``created_at`` of an existing local site is kept.  A non-integer priority
    falls back to 100.
    """
    principal = _current_principal()
    try:
        _iam().authorize(principal, None, "iam:*")
    except IamError:
        flash("Access denied", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    site_id = request.form.get("site_id", "").strip()
    endpoint = request.form.get("endpoint", "").strip()
    region = request.form.get("region", "us-east-1").strip()
    display_name = request.form.get("display_name", "").strip()

    if not site_id:
        flash("Site ID is required", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    registry = _site_registry()
    existing = registry.get_local_site()

    site = SiteInfo(
        site_id=site_id,
        endpoint=endpoint,
        region=region,
        priority=_site_form_priority(request.form.get("priority", "100"), 100),
        display_name=display_name or site_id,
        created_at=existing.created_at if existing else None,
    )
    registry.set_local_site(site)

    flash("Local site configuration updated", "success")
    return redirect(url_for("ui.sites_dashboard"))


@ui_bp.post("/sites/peers")
def add_peer_site():
    """Register a new peer site from the submitted form.

    Site ID and endpoint are required, the site ID must be unused, and a
    selected connection must exist.
    """
    principal = _current_principal()
    try:
        _iam().authorize(principal, None, "iam:*")
    except IamError:
        flash("Access denied", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    site_id = request.form.get("site_id", "").strip()
    endpoint = request.form.get("endpoint", "").strip()
    region = request.form.get("region", "us-east-1").strip()
    display_name = request.form.get("display_name", "").strip()
    connection_id = request.form.get("connection_id", "").strip() or None

    if not site_id:
        flash("Site ID is required", "danger")
        return redirect(url_for("ui.sites_dashboard"))
    if not endpoint:
        flash("Endpoint is required", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    registry = _site_registry()

    if registry.get_peer(site_id):
        flash(f"Peer site '{site_id}' already exists", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    if connection_id and not _connections().get(connection_id):
        flash(f"Connection '{connection_id}' not found", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    peer = PeerSite(
        site_id=site_id,
        endpoint=endpoint,
        region=region,
        priority=_site_form_priority(request.form.get("priority", "100"), 100),
        display_name=display_name or site_id,
        connection_id=connection_id,
    )
    registry.add_peer(peer)

    flash(f"Peer site '{site_id}' added", "success")
    return redirect(url_for("ui.sites_dashboard"))


# The "<site_id>" placeholders below were missing from the route rules, so the
# views could never receive their ``site_id`` argument.
@ui_bp.post("/sites/peers/<site_id>/update")
def update_peer_site(site_id: str):
    """Update a peer site from the submitted form.

    Missing form fields keep their stored values; ``created_at`` and the
    stored health state are always carried over.
    """
    principal = _current_principal()
    try:
        _iam().authorize(principal, None, "iam:*")
    except IamError:
        flash("Access denied", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    registry = _site_registry()
    existing = registry.get_peer(site_id)

    if not existing:
        flash(f"Peer site '{site_id}' not found", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    endpoint = request.form.get("endpoint", existing.endpoint).strip()
    region = request.form.get("region", existing.region).strip()
    display_name = request.form.get("display_name", existing.display_name).strip()
    # NOTE(review): an empty connection_id keeps the existing link, so a peer
    # cannot be unlinked from this form -- confirm against the edit form in
    # sites.html whether that is intended.
    connection_id = request.form.get("connection_id", "").strip() or existing.connection_id

    # Same rule as add_peer_site: an update must not blank out the endpoint.
    if not endpoint:
        flash("Endpoint is required", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    if connection_id and not _connections().get(connection_id):
        flash(f"Connection '{connection_id}' not found", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    peer = PeerSite(
        site_id=site_id,
        endpoint=endpoint,
        region=region,
        priority=_site_form_priority(
            request.form.get("priority", str(existing.priority)), existing.priority
        ),
        display_name=display_name or site_id,
        connection_id=connection_id,
        created_at=existing.created_at,
        is_healthy=existing.is_healthy,
        last_health_check=existing.last_health_check,
    )
    registry.update_peer(peer)

    flash(f"Peer site '{site_id}' updated", "success")
    return redirect(url_for("ui.sites_dashboard"))


@ui_bp.post("/sites/peers/<site_id>/delete")
def delete_peer_site(site_id: str):
    """Delete a peer site and report the outcome via a flash message."""
    principal = _current_principal()
    try:
        _iam().authorize(principal, None, "iam:*")
    except IamError:
        flash("Access denied", "danger")
        return redirect(url_for("ui.sites_dashboard"))

    if _site_registry().delete_peer(site_id):
        flash(f"Peer site '{site_id}' deleted", "success")
    else:
        flash(f"Peer site '{site_id}' not found", "danger")

    return redirect(url_for("ui.sites_dashboard"))


@ui_bp.get("/sites/peers/<site_id>/health")
def check_peer_site_health(site_id: str):
    """Run a health check for a peer and return the result as JSON.

    The result is also recorded in the registry.  Without a usable connection
    the peer is recorded as unhealthy and ``error`` explains why.
    """
    principal = _current_principal()
    try:
        _iam().authorize(principal, None, "iam:*")
    except IamError:
        return jsonify({"error": "Access denied"}), 403

    registry = _site_registry()
    peer = registry.get_peer(site_id)

    if not peer:
        return jsonify({"error": f"Peer site '{site_id}' not found"}), 404

    is_healthy = False
    error_message = None

    if peer.connection_id:
        connection = _connections().get(peer.connection_id)
        if connection:
            is_healthy = _replication().check_endpoint_health(connection)
        else:
            error_message = f"Connection '{peer.connection_id}' not found"
    else:
        error_message = "No connection configured for this peer"

    registry.update_health(site_id, is_healthy)

    result = {
        "site_id": site_id,
        "is_healthy": is_healthy,
    }
    if error_message:
        result["error"] = error_message

    return jsonify(result)


# ---------------------------------------------------------------------------
# Patch context carried over from the collapsed diff: the end of this span
# held the templates/base.html hunk (index 146acc7..7cd24be), which adds a
# "Sites" entry after "Metrics" inside an existing ``{% if %}`` block.  Its
# markup was stripped during extraction, so it is only noted here.
# ---------------------------------------------------------------------------
+
09 +

Site Registry

+
+

Track cluster membership and site identity for geo-distributed deployments. The site registry stores local site identity and peer site information.

+ +

Connections vs Sites

+

Understanding the difference between Connections and Sites is key to configuring geo-distribution:

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Aspect | Connections | Sites
Purpose | Store credentials to authenticate with remote S3 endpoints | Track cluster membership and site identity
Contains | Endpoint URL, access key, secret key, region | Site ID, endpoint, region, priority, display name
Used by | Replication rules, site sync workers | Geo-distribution awareness, cluster topology
Analogy | "How do I log in to that server?" | "Who are the members of my cluster?"
+
+

Sites can optionally link to a Connection (via connection_id) to perform health checks against peer sites.

+ +

Configuration

+

Set environment variables to bootstrap local site identity on startup:

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Variable | Default | Description
SITE_ID | None | Unique identifier for this site (e.g., us-west-1)
SITE_ENDPOINT | None | Public URL for this site (e.g., https://s3.us-west-1.example.com)
SITE_REGION | us-east-1 | AWS-style region identifier
SITE_PRIORITY | 100 | Routing priority (lower = preferred)
+
+
# Example: Configure site identity
+export SITE_ID=us-west-1
+export SITE_ENDPOINT=https://s3.us-west-1.example.com
+export SITE_REGION=us-west-1
+export SITE_PRIORITY=100
+python run.py
+ +

Using the Sites UI

+

Navigate to Sites in the sidebar to manage site configuration:

+
+
+
+
Local Site Identity
+
+
    +
  • Configure this site's ID, endpoint, region, and priority
  • +
  • Display name for easier identification
  • +
  • Changes persist to site_registry.json
  • +
+
+
+
+
+
+
Peer Sites
+
+
    +
  • Register remote sites in your cluster
  • +
  • Link to a Connection for health checks
  • +
  • View health status (green/red/unknown)
  • +
  • Edit or delete peers as needed
  • +
+
+
+
+
+ +

Admin API Endpoints

+

The /admin API provides programmatic access to the site registry:

+
# Get local site configuration
+curl {{ api_base }}/admin/site \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# Update local site
+curl -X PUT {{ api_base }}/admin/site \
+  -H "Content-Type: application/json" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+  -d '{"site_id": "us-west-1", "endpoint": "https://s3.example.com", "region": "us-west-1"}'
+
+# List the local site and all peer sites
+curl {{ api_base }}/admin/sites \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# Add a peer site
+curl -X POST {{ api_base }}/admin/sites \
+  -H "Content-Type: application/json" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+  -d '{"site_id": "us-east-1", "endpoint": "https://s3.us-east-1.example.com"}'
+
+# Check peer health
+curl {{ api_base }}/admin/sites/us-east-1/health \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# Get cluster topology
+curl {{ api_base }}/admin/topology \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+ +

Storage Location

+

Site registry data is stored at:

+ data/.myfsio.sys/config/site_registry.json + +
+
+ + + + +
+ Planned: The site registry lays the groundwork for features like automatic failover, intelligent routing, and multi-site consistency. Currently it provides cluster awareness and health monitoring. +
+
+
+
+
+
+
+
+ 10

Object Versioning

Keep multiple versions of objects to protect against accidental deletions and overwrites. Restore previous versions at any time.

@@ -1046,7 +1211,7 @@ curl "{{ api_base }}/<bucket>/<key>?versionId=<version-id>" \
- 10 + 11

Bucket Quotas

Limit how much data a bucket can hold using storage quotas. Quotas are enforced on uploads and multipart completions.

@@ -1114,7 +1279,7 @@ curl -X PUT "{{ api_base }}/bucket/<bucket>?quota" \
- 11 + 12

Encryption

Protect data at rest with server-side encryption using AES-256-GCM. Objects are encrypted before being written to disk and decrypted transparently on read.

@@ -1208,7 +1373,7 @@ curl -X DELETE "{{ api_base }}/kms/keys/{key-id}?waiting_period_days=30" \
- 12 + 13

Lifecycle Rules

Automatically delete expired objects, clean up old versions, and abort incomplete multipart uploads using time-based lifecycle rules.

@@ -1290,7 +1455,7 @@ curl "{{ api_base }}/<bucket>?lifecycle" \
- 13 + 14

Metrics History

Track CPU, memory, and disk usage over time with optional metrics history. Disabled by default to minimize overhead.

@@ -1374,7 +1539,7 @@ curl -X PUT "{{ api_base | replace('/api', '/ui') }}/metrics/settings" \
- 14 + 15

Operation Metrics

Track API request statistics including request counts, latency, error rates, and bandwidth usage. Provides real-time visibility into API operations.

@@ -1481,7 +1646,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
- 15 + 16

Troubleshooting & tips

@@ -1543,6 +1708,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
  • REST endpoints
  • API Examples
  • Site Replication & Sync
  • +
  • Site Registry
  • Object Versioning
  • Bucket Quotas
  • Encryption
  • diff --git a/templates/sites.html b/templates/sites.html new file mode 100644 index 0000000..608a777 --- /dev/null +++ b/templates/sites.html @@ -0,0 +1,432 @@ +{% extends "base.html" %} + +{% block title %}Sites - S3 Compatible Storage{% endblock %} + +{% block content %} + + +
    +
    +
    +
    +
    + + + + Local Site Identity +
    +

    This site's configuration

    +
    +
    +
    + +
    + + +
    Unique identifier for this site
    +
    +
    + + +
    Public URL for this site
    +
    +
    + + +
    +
    +
    + + +
    Lower = preferred
    +
    +
    + + +
    +
    +
    + +
    +
    +
    +
    + +
    +
    +
    + + + + Add Peer Site +
    +

    Register a remote site

    +
    +
    +
    + +
    + + +
    +
    + + +
    +
    + + +
    +
    +
    + + +
    +
    + + +
    +
    +
    + + +
    Link to a remote connection for health checks
    +
    +
    + +
    +
    +
    +
    +
    + +
    +
    +
    +
    +
    + + + + Peer Sites +
    +

    Known remote sites in the cluster

    +
    +
    +
    + {% if peers %} +
    + + + + + + + + + + + + + {% for peer in peers %} + + + + + + + + + {% endfor %} + +
    HealthSite IDEndpointRegionPriorityActions
    + + {% if peer.is_healthy == true %} + + + + {% elif peer.is_healthy == false %} + + + + {% else %} + + + + + {% endif %} + + +
    +
    + + + +
    +
    + {{ peer.display_name or peer.site_id }} + {% if peer.display_name and peer.display_name != peer.site_id %} +
    {{ peer.site_id }} + {% endif %} +
    +
    +
    + {{ peer.endpoint }} + {{ peer.region }}{{ peer.priority }} +
    + + + +
    +
    +
    + {% else %} +
    +
    + + + +
    +
    No peer sites yet
    +

    Add peer sites to enable geo-distribution and site-to-site replication.

    +
    + {% endif %} +
    +
    +
    +
    + + + + + + +{% endblock %}