Add replication setup wizard and site-level sync dashboard for site registry

This commit is contained in:
2026-01-26 21:39:47 +08:00
parent 62c36f7a6c
commit 6b715851b9
4 changed files with 492 additions and 1 deletions

View File

@@ -318,6 +318,9 @@ class ReplicationManager:
def get_rule(self, bucket_name: str) -> Optional[ReplicationRule]:
return self._rules.get(bucket_name)
def list_rules(self) -> List[ReplicationRule]:
return list(self._rules.values())
def set_rule(self, rule: ReplicationRule) -> None:
old_rule = self._rules.get(rule.bucket_name)
was_all_mode = old_rule and old_rule.mode == REPLICATION_MODE_ALL if old_rule else False

195
app/ui.py
View File

@@ -2682,11 +2682,28 @@ def sites_dashboard():
peers = registry.list_peers()
connections = _connections().list()
replication = _replication()
all_rules = replication.list_rules()
peers_with_stats = []
for peer in peers:
buckets_syncing = 0
if peer.connection_id:
for rule in all_rules:
if rule.target_connection_id == peer.connection_id:
buckets_syncing += 1
peers_with_stats.append({
"peer": peer,
"buckets_syncing": buckets_syncing,
"has_connection": bool(peer.connection_id),
})
return render_template(
"sites.html",
principal=principal,
local_site=local_site,
peers=peers,
peers_with_stats=peers_with_stats,
connections=connections,
config_site_id=current_app.config.get("SITE_ID"),
config_site_endpoint=current_app.config.get("SITE_ENDPOINT"),
@@ -2784,6 +2801,9 @@ def add_peer_site():
registry.add_peer(peer)
flash(f"Peer site '{site_id}' added", "success")
if connection_id:
return redirect(url_for("ui.replication_wizard", site_id=site_id))
return redirect(url_for("ui.sites_dashboard"))
@@ -2891,6 +2911,181 @@ def check_peer_site_health(site_id: str):
return jsonify(result)
@ui_bp.get("/sites/peers/<site_id>/replication-wizard")
def replication_wizard(site_id: str):
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:*")
except IamError:
flash("Access denied", "danger")
return redirect(url_for("ui.sites_dashboard"))
registry = _site_registry()
peer = registry.get_peer(site_id)
if not peer:
flash(f"Peer site '{site_id}' not found", "danger")
return redirect(url_for("ui.sites_dashboard"))
if not peer.connection_id:
flash("This peer has no connection configured. Add a connection first to set up replication.", "warning")
return redirect(url_for("ui.sites_dashboard"))
connection = _connections().get(peer.connection_id)
if not connection:
flash(f"Connection '{peer.connection_id}' not found", "danger")
return redirect(url_for("ui.sites_dashboard"))
buckets = _storage().list_buckets()
replication = _replication()
bucket_info = []
for bucket in buckets:
existing_rule = replication.get_rule(bucket.name)
has_rule_for_peer = (
existing_rule and
existing_rule.target_connection_id == peer.connection_id
)
bucket_info.append({
"name": bucket.name,
"has_rule": has_rule_for_peer,
"existing_mode": existing_rule.mode if has_rule_for_peer else None,
"existing_target": existing_rule.target_bucket if has_rule_for_peer else None,
})
return render_template(
"replication_wizard.html",
principal=principal,
peer=peer,
connection=connection,
buckets=bucket_info,
csrf_token=generate_csrf,
)
@ui_bp.post("/sites/peers/<site_id>/replication-rules")
def create_peer_replication_rules(site_id: str):
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:*")
except IamError:
flash("Access denied", "danger")
return redirect(url_for("ui.sites_dashboard"))
registry = _site_registry()
peer = registry.get_peer(site_id)
if not peer or not peer.connection_id:
flash("Invalid peer site or no connection configured", "danger")
return redirect(url_for("ui.sites_dashboard"))
from .replication import REPLICATION_MODE_NEW_ONLY, REPLICATION_MODE_ALL
import time as time_module
selected_buckets = request.form.getlist("buckets")
mode = request.form.get("mode", REPLICATION_MODE_NEW_ONLY)
if not selected_buckets:
flash("No buckets selected", "warning")
return redirect(url_for("ui.sites_dashboard"))
created = 0
failed = 0
replication = _replication()
for bucket_name in selected_buckets:
target_bucket = request.form.get(f"target_{bucket_name}", bucket_name).strip()
if not target_bucket:
target_bucket = bucket_name
try:
rule = ReplicationRule(
bucket_name=bucket_name,
target_connection_id=peer.connection_id,
target_bucket=target_bucket,
enabled=True,
mode=mode,
created_at=time_module.time(),
)
replication.set_rule(rule)
if mode == REPLICATION_MODE_ALL:
replication.replicate_existing_objects(bucket_name)
created += 1
except Exception:
failed += 1
if created > 0:
flash(f"Created {created} replication rule(s) for {peer.display_name or peer.site_id}", "success")
if failed > 0:
flash(f"Failed to create {failed} rule(s)", "danger")
return redirect(url_for("ui.sites_dashboard"))
@ui_bp.get("/sites/peers/<site_id>/sync-stats")
def get_peer_sync_stats(site_id: str):
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:*")
except IamError:
return jsonify({"error": "Access denied"}), 403
registry = _site_registry()
peer = registry.get_peer(site_id)
if not peer:
return jsonify({"error": "Peer not found"}), 404
if not peer.connection_id:
return jsonify({"error": "No connection configured"}), 400
replication = _replication()
all_rules = replication.list_rules()
stats = {
"buckets_syncing": 0,
"objects_synced": 0,
"objects_pending": 0,
"objects_failed": 0,
"bytes_synced": 0,
"last_sync_at": None,
"buckets": [],
}
for rule in all_rules:
if rule.target_connection_id != peer.connection_id:
continue
stats["buckets_syncing"] += 1
bucket_stats = {
"bucket_name": rule.bucket_name,
"target_bucket": rule.target_bucket,
"mode": rule.mode,
"enabled": rule.enabled,
}
if rule.stats:
stats["objects_synced"] += rule.stats.objects_synced
stats["objects_pending"] += rule.stats.objects_pending
stats["bytes_synced"] += rule.stats.bytes_synced
if rule.stats.last_sync_at:
if not stats["last_sync_at"] or rule.stats.last_sync_at > stats["last_sync_at"]:
stats["last_sync_at"] = rule.stats.last_sync_at
bucket_stats["last_sync_at"] = rule.stats.last_sync_at
bucket_stats["objects_synced"] = rule.stats.objects_synced
bucket_stats["objects_pending"] = rule.stats.objects_pending
failure_count = replication.get_failure_count(rule.bucket_name)
stats["objects_failed"] += failure_count
bucket_stats["failures"] = failure_count
stats["buckets"].append(bucket_stats)
return jsonify(stats)
@ui_bp.app_errorhandler(404)
def ui_not_found(error): # type: ignore[override]
prefix = ui_bp.url_prefix or ""