From 6b715851b94d3edf431f84c06ba88e143b93e897 Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 26 Jan 2026 21:39:47 +0800 Subject: [PATCH] Add replication setup wizard and site-level sync dashboard for site registry --- app/replication.py | 3 + app/ui.py | 195 ++++++++++++++++++++++++++ templates/replication_wizard.html | 226 ++++++++++++++++++++++++++++++ templates/sites.html | 69 ++++++++- 4 files changed, 492 insertions(+), 1 deletion(-) create mode 100644 templates/replication_wizard.html diff --git a/app/replication.py b/app/replication.py index d8022ba..c0023dd 100644 --- a/app/replication.py +++ b/app/replication.py @@ -318,6 +318,9 @@ class ReplicationManager: def get_rule(self, bucket_name: str) -> Optional[ReplicationRule]: return self._rules.get(bucket_name) + def list_rules(self) -> List[ReplicationRule]: + return list(self._rules.values()) + def set_rule(self, rule: ReplicationRule) -> None: old_rule = self._rules.get(rule.bucket_name) was_all_mode = old_rule and old_rule.mode == REPLICATION_MODE_ALL if old_rule else False diff --git a/app/ui.py b/app/ui.py index 172295d..4f0bb58 100644 --- a/app/ui.py +++ b/app/ui.py @@ -2682,11 +2682,28 @@ def sites_dashboard(): peers = registry.list_peers() connections = _connections().list() + replication = _replication() + all_rules = replication.list_rules() + + peers_with_stats = [] + for peer in peers: + buckets_syncing = 0 + if peer.connection_id: + for rule in all_rules: + if rule.target_connection_id == peer.connection_id: + buckets_syncing += 1 + peers_with_stats.append({ + "peer": peer, + "buckets_syncing": buckets_syncing, + "has_connection": bool(peer.connection_id), + }) + return render_template( "sites.html", principal=principal, local_site=local_site, peers=peers, + peers_with_stats=peers_with_stats, connections=connections, config_site_id=current_app.config.get("SITE_ID"), config_site_endpoint=current_app.config.get("SITE_ENDPOINT"), @@ -2784,6 +2801,9 @@ def add_peer_site(): registry.add_peer(peer) flash(f"Peer site '{site_id}' added", "success") + + if connection_id: + return redirect(url_for("ui.replication_wizard", site_id=site_id)) return redirect(url_for("ui.sites_dashboard")) @@ -2891,6 +2911,181 @@ def check_peer_site_health(site_id: str): return jsonify(result) +@ui_bp.get("/sites/peers//replication-wizard") +def replication_wizard(site_id: str): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied", "danger") + return redirect(url_for("ui.sites_dashboard")) + + registry = _site_registry() + peer = registry.get_peer(site_id) + if not peer: + flash(f"Peer site '{site_id}' not found", "danger") + return redirect(url_for("ui.sites_dashboard")) + + if not peer.connection_id: + flash("This peer has no connection configured. Add a connection first to set up replication.", "warning") + return redirect(url_for("ui.sites_dashboard")) + + connection = _connections().get(peer.connection_id) + if not connection: + flash(f"Connection '{peer.connection_id}' not found", "danger") + return redirect(url_for("ui.sites_dashboard")) + + buckets = _storage().list_buckets() + replication = _replication() + + bucket_info = [] + for bucket in buckets: + existing_rule = replication.get_rule(bucket.name) + has_rule_for_peer = ( + existing_rule and + existing_rule.target_connection_id == peer.connection_id + ) + bucket_info.append({ + "name": bucket.name, + "has_rule": has_rule_for_peer, + "existing_mode": existing_rule.mode if has_rule_for_peer else None, + "existing_target": existing_rule.target_bucket if has_rule_for_peer else None, + }) + + return render_template( + "replication_wizard.html", + principal=principal, + peer=peer, + connection=connection, + buckets=bucket_info, + csrf_token=generate_csrf, + ) + + +@ui_bp.post("/sites/peers//replication-rules") +def create_peer_replication_rules(site_id: str): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied", "danger") + return redirect(url_for("ui.sites_dashboard")) + + registry = _site_registry() + peer = registry.get_peer(site_id) + if not peer or not peer.connection_id: + flash("Invalid peer site or no connection configured", "danger") + return redirect(url_for("ui.sites_dashboard")) + + from .replication import REPLICATION_MODE_NEW_ONLY, REPLICATION_MODE_ALL + import time as time_module + + selected_buckets = request.form.getlist("buckets") + mode = request.form.get("mode", REPLICATION_MODE_NEW_ONLY) + + if not selected_buckets: + flash("No buckets selected", "warning") + return redirect(url_for("ui.sites_dashboard")) + + created = 0 + failed = 0 + replication = _replication() + + for bucket_name in selected_buckets: + target_bucket = request.form.get(f"target_{bucket_name}", bucket_name).strip() + if not target_bucket: + target_bucket = bucket_name + + try: + rule = ReplicationRule( + bucket_name=bucket_name, + target_connection_id=peer.connection_id, + target_bucket=target_bucket, + enabled=True, + mode=mode, + created_at=time_module.time(), + ) + replication.set_rule(rule) + + if mode == REPLICATION_MODE_ALL: + replication.replicate_existing_objects(bucket_name) + + created += 1 + except Exception: + failed += 1 + + if created > 0: + flash(f"Created {created} replication rule(s) for {peer.display_name or peer.site_id}", "success") + if failed > 0: + flash(f"Failed to create {failed} rule(s)", "danger") + + return redirect(url_for("ui.sites_dashboard")) + + +@ui_bp.get("/sites/peers//sync-stats") +def get_peer_sync_stats(site_id: str): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + registry = _site_registry() + peer = registry.get_peer(site_id) + if not peer: + return jsonify({"error": "Peer not found"}), 404 + + if not peer.connection_id: + return jsonify({"error": "No connection configured"}), 400 + + replication = _replication() + all_rules = replication.list_rules() + + stats = { + "buckets_syncing": 0, + "objects_synced": 0, + "objects_pending": 0, + "objects_failed": 0, + "bytes_synced": 0, + "last_sync_at": None, + "buckets": [], + } + + for rule in all_rules: + if rule.target_connection_id != peer.connection_id: + continue + + stats["buckets_syncing"] += 1 + + bucket_stats = { + "bucket_name": rule.bucket_name, + "target_bucket": rule.target_bucket, + "mode": rule.mode, + "enabled": rule.enabled, + } + + if rule.stats: + stats["objects_synced"] += rule.stats.objects_synced + stats["objects_pending"] += rule.stats.objects_pending + stats["bytes_synced"] += rule.stats.bytes_synced + + if rule.stats.last_sync_at: + if not stats["last_sync_at"] or rule.stats.last_sync_at > stats["last_sync_at"]: + stats["last_sync_at"] = rule.stats.last_sync_at + + bucket_stats["last_sync_at"] = rule.stats.last_sync_at + bucket_stats["objects_synced"] = rule.stats.objects_synced + bucket_stats["objects_pending"] = rule.stats.objects_pending + + failure_count = replication.get_failure_count(rule.bucket_name) + stats["objects_failed"] += failure_count + bucket_stats["failures"] = failure_count + + stats["buckets"].append(bucket_stats) + + return jsonify(stats) + + @ui_bp.app_errorhandler(404) def ui_not_found(error): # type: ignore[override] prefix = ui_bp.url_prefix or "" diff --git a/templates/replication_wizard.html b/templates/replication_wizard.html new file mode 100644 index 0000000..24489e6 --- /dev/null +++ b/templates/replication_wizard.html @@ -0,0 +1,226 @@ +{% extends "base.html" %} + +{% block title %}Set Up Replication - S3 Compatible Storage{% endblock %} + +{% block content %} + + +
+
+
+
+
+ + + + Peer Site +
+
+
+
+
Site ID
+
{{ peer.site_id }}
+
Endpoint
+
{{ peer.endpoint }}
+
Region
+
{{ peer.region }}
+
Connection
+
{{ connection.name }}
+
+
+
+ +
+
+
+ + + + Replication Modes +
+
+
+

New Only: Only replicate new objects uploaded after the rule is created.

+

All Objects: Replicate all existing objects plus new uploads.

+

Bidirectional: Two-way sync between sites. Changes on either side are synchronized.

+
+
+
+ +
+
+
+
+ + + + Select Buckets to Replicate +
+

Choose which buckets should be replicated to this peer site

+
+
+ {% if buckets %} +
+ + +
+ + +
+ +
+ + + + + + + + + + + {% for bucket in buckets %} + + + + + + + {% endfor %} + +
+ + Local BucketTarget Bucket NameStatus
+ + +
+ + + + {{ bucket.name }} +
+
+ + + {% if bucket.has_rule %} + + Already configured ({{ bucket.existing_mode }}) + + {% else %} + + Not configured + + {% endif %} +
+
+ +
+ + + Skip for Now + +
+
+ {% else %} +
+
+ + + +
+
No buckets yet
+

Create some buckets first, then come back to set up replication.

+ + Go to Buckets + +
+ {% endif %} +
+
+
+
+ + +{% endblock %} diff --git a/templates/sites.html b/templates/sites.html index 608a777..ef7d14c 100644 --- a/templates/sites.html +++ b/templates/sites.html @@ -163,11 +163,13 @@ Endpoint Region Priority + Sync Status Actions - {% for peer in peers %} + {% for item in peers_with_stats %} + {% set peer = item.peer %} @@ -207,8 +209,37 @@ {{ peer.region }} {{ peer.priority }} + + {% if item.has_connection %} +
+ {{ item.buckets_syncing }} bucket{{ 's' if item.buckets_syncing != 1 else '' }} + {% if item.buckets_syncing > 0 %} + + {% endif %} +
+
+ +
+ {% else %} + No connection + {% endif %} +
+ + + + + +