from __future__ import annotations import ipaddress import logging import re import socket import time from typing import Any, Dict, Optional, Tuple from urllib.parse import urlparse import requests from flask import Blueprint, Response, current_app, jsonify, request from .connections import ConnectionStore from .extensions import limiter from .iam import IamError, Principal from .replication import ReplicationManager from .site_registry import PeerSite, SiteInfo, SiteRegistry def _is_safe_url(url: str) -> bool: """Check if a URL is safe to make requests to (not internal/private).""" try: parsed = urlparse(url) hostname = parsed.hostname if not hostname: return False blocked_hosts = { "localhost", "127.0.0.1", "0.0.0.0", "::1", "[::1]", "metadata.google.internal", "169.254.169.254", } if hostname.lower() in blocked_hosts: return False try: resolved_ip = socket.gethostbyname(hostname) ip = ipaddress.ip_address(resolved_ip) if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved: return False except (socket.gaierror, ValueError): return False return True except Exception: return False def _validate_endpoint(endpoint: str) -> Optional[str]: """Validate endpoint URL format. Returns error message or None.""" try: parsed = urlparse(endpoint) if not parsed.scheme or parsed.scheme not in ("http", "https"): return "Endpoint must be http or https URL" if not parsed.netloc: return "Endpoint must have a host" return None except Exception: return "Invalid endpoint URL" def _validate_priority(priority: Any) -> Optional[str]: """Validate priority value. Returns error message or None.""" try: p = int(priority) if p < 0 or p > 1000: return "Priority must be between 0 and 1000" return None except (TypeError, ValueError): return "Priority must be an integer" def _validate_region(region: str) -> Optional[str]: """Validate region format. Returns error message or None.""" if not re.match(r"^[a-z]{2,}-[a-z]+-\d+$", region): return "Region must match format like us-east-1" return None logger = logging.getLogger(__name__) admin_api_bp = Blueprint("admin_api", __name__, url_prefix="/admin") def _require_principal() -> Tuple[Optional[Principal], Optional[Tuple[Dict[str, Any], int]]]: from .s3_api import _require_principal as s3_require_principal return s3_require_principal() def _require_admin() -> Tuple[Optional[Principal], Optional[Tuple[Dict[str, Any], int]]]: principal, error = _require_principal() if error: return None, error try: _iam().authorize(principal, None, "iam:*") return principal, None except IamError: return None, _json_error("AccessDenied", "Admin access required", 403) def _site_registry() -> SiteRegistry: return current_app.extensions["site_registry"] def _connections() -> ConnectionStore: return current_app.extensions["connections"] def _replication() -> ReplicationManager: return current_app.extensions["replication"] def _iam(): return current_app.extensions["iam"] def _json_error(code: str, message: str, status: int) -> Tuple[Dict[str, Any], int]: return {"error": {"code": code, "message": message}}, status def _get_admin_rate_limit() -> str: return current_app.config.get("RATE_LIMIT_ADMIN", "60 per minute") @admin_api_bp.route("/site", methods=["GET"]) @limiter.limit(lambda: _get_admin_rate_limit()) def get_local_site(): principal, error = _require_admin() if error: return error registry = _site_registry() local_site = registry.get_local_site() if local_site: return jsonify(local_site.to_dict()) config_site_id = current_app.config.get("SITE_ID") config_endpoint = current_app.config.get("SITE_ENDPOINT") if config_site_id: return jsonify({ "site_id": config_site_id, "endpoint": config_endpoint or "", "region": current_app.config.get("SITE_REGION", "us-east-1"), "priority": current_app.config.get("SITE_PRIORITY", 100), "display_name": config_site_id, "source": "environment", }) return _json_error("NotFound", "Local site not configured", 404) @admin_api_bp.route("/site", methods=["PUT"]) @limiter.limit(lambda: _get_admin_rate_limit()) def update_local_site(): principal, error = _require_admin() if error: return error payload = request.get_json(silent=True) or {} site_id = payload.get("site_id") endpoint = payload.get("endpoint") if not site_id: return _json_error("ValidationError", "site_id is required", 400) registry = _site_registry() existing = registry.get_local_site() site = SiteInfo( site_id=site_id, endpoint=endpoint or "", region=payload.get("region", "us-east-1"), priority=payload.get("priority", 100), display_name=payload.get("display_name", site_id), created_at=existing.created_at if existing else None, ) registry.set_local_site(site) logger.info("Local site updated", extra={"site_id": site_id, "principal": principal.access_key}) return jsonify(site.to_dict()) @admin_api_bp.route("/sites", methods=["GET"]) @limiter.limit(lambda: _get_admin_rate_limit()) def list_all_sites(): principal, error = _require_admin() if error: return error registry = _site_registry() local = registry.get_local_site() peers = registry.list_peers() result = { "local": local.to_dict() if local else None, "peers": [peer.to_dict() for peer in peers], "total_peers": len(peers), } return jsonify(result) @admin_api_bp.route("/sites", methods=["POST"]) @limiter.limit(lambda: _get_admin_rate_limit()) def register_peer_site(): principal, error = _require_admin() if error: return error payload = request.get_json(silent=True) or {} site_id = payload.get("site_id") endpoint = payload.get("endpoint") if not site_id: return _json_error("ValidationError", "site_id is required", 400) if not endpoint: return _json_error("ValidationError", "endpoint is required", 400) endpoint_error = _validate_endpoint(endpoint) if endpoint_error: return _json_error("ValidationError", endpoint_error, 400) region = payload.get("region", "us-east-1") region_error = _validate_region(region) if region_error: return _json_error("ValidationError", region_error, 400) priority = payload.get("priority", 100) priority_error = _validate_priority(priority) if priority_error: return _json_error("ValidationError", priority_error, 400) registry = _site_registry() if registry.get_peer(site_id): return _json_error("AlreadyExists", f"Peer site '{site_id}' already exists", 409) connection_id = payload.get("connection_id") if connection_id: if not _connections().get(connection_id): return _json_error("ValidationError", f"Connection '{connection_id}' not found", 400) peer = PeerSite( site_id=site_id, endpoint=endpoint, region=region, priority=int(priority), display_name=payload.get("display_name", site_id), connection_id=connection_id, ) registry.add_peer(peer) logger.info("Peer site registered", extra={"site_id": site_id, "principal": principal.access_key}) return jsonify(peer.to_dict()), 201 @admin_api_bp.route("/sites/", methods=["GET"]) @limiter.limit(lambda: _get_admin_rate_limit()) def get_peer_site(site_id: str): principal, error = _require_admin() if error: return error registry = _site_registry() peer = registry.get_peer(site_id) if not peer: return _json_error("NotFound", f"Peer site '{site_id}' not found", 404) return jsonify(peer.to_dict()) @admin_api_bp.route("/sites/", methods=["PUT"]) @limiter.limit(lambda: _get_admin_rate_limit()) def update_peer_site(site_id: str): principal, error = _require_admin() if error: return error registry = _site_registry() existing = registry.get_peer(site_id) if not existing: return _json_error("NotFound", f"Peer site '{site_id}' not found", 404) payload = request.get_json(silent=True) or {} peer = PeerSite( site_id=site_id, endpoint=payload.get("endpoint", existing.endpoint), region=payload.get("region", existing.region), priority=payload.get("priority", existing.priority), display_name=payload.get("display_name", existing.display_name), connection_id=payload.get("connection_id", existing.connection_id), created_at=existing.created_at, is_healthy=existing.is_healthy, last_health_check=existing.last_health_check, ) registry.update_peer(peer) logger.info("Peer site updated", extra={"site_id": site_id, "principal": principal.access_key}) return jsonify(peer.to_dict()) @admin_api_bp.route("/sites/", methods=["DELETE"]) @limiter.limit(lambda: _get_admin_rate_limit()) def delete_peer_site(site_id: str): principal, error = _require_admin() if error: return error registry = _site_registry() if not registry.delete_peer(site_id): return _json_error("NotFound", f"Peer site '{site_id}' not found", 404) logger.info("Peer site deleted", extra={"site_id": site_id, "principal": principal.access_key}) return Response(status=204) @admin_api_bp.route("/sites//health", methods=["GET"]) @limiter.limit(lambda: _get_admin_rate_limit()) def check_peer_health(site_id: str): principal, error = _require_admin() if error: return error registry = _site_registry() peer = registry.get_peer(site_id) if not peer: return _json_error("NotFound", f"Peer site '{site_id}' not found", 404) is_healthy = False error_message = None if peer.connection_id: connection = _connections().get(peer.connection_id) if connection: is_healthy = _replication().check_endpoint_health(connection) else: error_message = f"Connection '{peer.connection_id}' not found" else: error_message = "No connection configured for this peer" registry.update_health(site_id, is_healthy) result = { "site_id": site_id, "is_healthy": is_healthy, "checked_at": time.time(), } if error_message: result["error"] = error_message return jsonify(result) @admin_api_bp.route("/topology", methods=["GET"]) @limiter.limit(lambda: _get_admin_rate_limit()) def get_topology(): principal, error = _require_admin() if error: return error registry = _site_registry() local = registry.get_local_site() peers = registry.list_peers() sites = [] if local: sites.append({ **local.to_dict(), "is_local": True, "is_healthy": True, }) for peer in peers: sites.append({ **peer.to_dict(), "is_local": False, }) sites.sort(key=lambda s: s.get("priority", 100)) return jsonify({ "sites": sites, "total": len(sites), "healthy_count": sum(1 for s in sites if s.get("is_healthy")), }) @admin_api_bp.route("/sites//bidirectional-status", methods=["GET"]) @limiter.limit(lambda: _get_admin_rate_limit()) def check_bidirectional_status(site_id: str): principal, error = _require_admin() if error: return error registry = _site_registry() peer = registry.get_peer(site_id) if not peer: return _json_error("NotFound", f"Peer site '{site_id}' not found", 404) local_site = registry.get_local_site() replication = _replication() local_rules = replication.list_rules() local_bidir_rules = [] for rule in local_rules: if rule.target_connection_id == peer.connection_id and rule.mode == "bidirectional": local_bidir_rules.append({ "bucket_name": rule.bucket_name, "target_bucket": rule.target_bucket, "enabled": rule.enabled, }) result = { "site_id": site_id, "local_site_id": local_site.site_id if local_site else None, "local_endpoint": local_site.endpoint if local_site else None, "local_bidirectional_rules": local_bidir_rules, "local_site_sync_enabled": current_app.config.get("SITE_SYNC_ENABLED", False), "remote_status": None, "issues": [], "is_fully_configured": False, } if not local_site or not local_site.site_id: result["issues"].append({ "code": "NO_LOCAL_SITE_ID", "message": "Local site identity not configured", "severity": "error", }) if not local_site or not local_site.endpoint: result["issues"].append({ "code": "NO_LOCAL_ENDPOINT", "message": "Local site endpoint not configured (remote site cannot reach back)", "severity": "error", }) if not peer.connection_id: result["issues"].append({ "code": "NO_CONNECTION", "message": "No connection configured for this peer", "severity": "error", }) return jsonify(result) connection = _connections().get(peer.connection_id) if not connection: result["issues"].append({ "code": "CONNECTION_NOT_FOUND", "message": f"Connection '{peer.connection_id}' not found", "severity": "error", }) return jsonify(result) if not local_bidir_rules: result["issues"].append({ "code": "NO_LOCAL_BIDIRECTIONAL_RULES", "message": "No bidirectional replication rules configured on this site", "severity": "warning", }) if not result["local_site_sync_enabled"]: result["issues"].append({ "code": "SITE_SYNC_DISABLED", "message": "Site sync worker is disabled (SITE_SYNC_ENABLED=false). Pull operations will not work.", "severity": "warning", }) if not replication.check_endpoint_health(connection): result["issues"].append({ "code": "REMOTE_UNREACHABLE", "message": "Remote endpoint is not reachable", "severity": "error", }) return jsonify(result) if not _is_safe_url(peer.endpoint): result["issues"].append({ "code": "ENDPOINT_NOT_ALLOWED", "message": "Peer endpoint points to internal or private address", "severity": "error", }) return jsonify(result) try: admin_url = peer.endpoint.rstrip("/") + "/admin/sites" resp = requests.get( admin_url, timeout=10, headers={ "Accept": "application/json", "X-Access-Key": connection.access_key, "X-Secret-Key": connection.secret_key, }, ) if resp.status_code == 200: try: remote_data = resp.json() if not isinstance(remote_data, dict): raise ValueError("Expected JSON object") remote_local = remote_data.get("local") if remote_local is not None and not isinstance(remote_local, dict): raise ValueError("Expected 'local' to be an object") remote_peers = remote_data.get("peers", []) if not isinstance(remote_peers, list): raise ValueError("Expected 'peers' to be a list") except (ValueError, json.JSONDecodeError) as e: logger.warning("Invalid JSON from remote admin API: %s", e) result["remote_status"] = {"reachable": True, "invalid_response": True} result["issues"].append({ "code": "REMOTE_INVALID_RESPONSE", "message": "Remote admin API returned invalid JSON", "severity": "warning", }) return jsonify(result) result["remote_status"] = { "reachable": True, "local_site": remote_local, "site_sync_enabled": None, "has_peer_for_us": False, "peer_connection_configured": False, "has_bidirectional_rules_for_us": False, } for rp in remote_peers: if not isinstance(rp, dict): continue if local_site and ( rp.get("site_id") == local_site.site_id or rp.get("endpoint") == local_site.endpoint ): result["remote_status"]["has_peer_for_us"] = True result["remote_status"]["peer_connection_configured"] = bool(rp.get("connection_id")) break if not result["remote_status"]["has_peer_for_us"]: result["issues"].append({ "code": "REMOTE_NO_PEER_FOR_US", "message": "Remote site does not have this site registered as a peer", "severity": "error", }) elif not result["remote_status"]["peer_connection_configured"]: result["issues"].append({ "code": "REMOTE_NO_CONNECTION_FOR_US", "message": "Remote site has us as peer but no connection configured (cannot push back)", "severity": "error", }) elif resp.status_code == 401 or resp.status_code == 403: result["remote_status"] = { "reachable": True, "admin_access_denied": True, } result["issues"].append({ "code": "REMOTE_ADMIN_ACCESS_DENIED", "message": "Cannot verify remote configuration (admin access denied)", "severity": "warning", }) else: result["remote_status"] = { "reachable": True, "admin_api_error": resp.status_code, } result["issues"].append({ "code": "REMOTE_ADMIN_API_ERROR", "message": f"Remote admin API returned status {resp.status_code}", "severity": "warning", }) except requests.RequestException as e: logger.warning("Remote admin API unreachable: %s", e) result["remote_status"] = { "reachable": False, "error": "Connection failed", } result["issues"].append({ "code": "REMOTE_ADMIN_UNREACHABLE", "message": "Could not reach remote admin API", "severity": "warning", }) except Exception as e: logger.warning("Error checking remote bidirectional status: %s", e, exc_info=True) result["issues"].append({ "code": "VERIFICATION_ERROR", "message": "Internal error during verification", "severity": "warning", }) error_issues = [i for i in result["issues"] if i["severity"] == "error"] result["is_fully_configured"] = len(error_issues) == 0 and len(local_bidir_rules) > 0 return jsonify(result)