From 912a7dc74ff2437555003d9c2d25b4e5ae85c87c Mon Sep 17 00:00:00 2001 From: kqjy Date: Tue, 20 Jan 2026 00:00:31 +0800 Subject: [PATCH 1/6] Add background collection for system metrics --- app/__init__.py | 11 +++ app/system_metrics.py | 215 ++++++++++++++++++++++++++++++++++++++++++ app/ui.py | 79 +--------------- app/version.py | 2 +- 4 files changed, 230 insertions(+), 77 deletions(-) create mode 100644 app/system_metrics.py diff --git a/app/__init__.py b/app/__init__.py index ad555e1..02c6472 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -197,6 +197,17 @@ def create_app( ) app.extensions["operation_metrics"] = operation_metrics_collector + system_metrics_collector = None + if app.config.get("METRICS_HISTORY_ENABLED", False): + from .system_metrics import SystemMetricsCollector + system_metrics_collector = SystemMetricsCollector( + storage_root, + interval_minutes=app.config.get("METRICS_HISTORY_INTERVAL_MINUTES", 5), + retention_hours=app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24), + ) + system_metrics_collector.set_storage(storage) + app.extensions["system_metrics"] = system_metrics_collector + @app.errorhandler(500) def internal_error(error): return render_template('500.html'), 500 diff --git a/app/system_metrics.py b/app/system_metrics.py new file mode 100644 index 0000000..235710b --- /dev/null +++ b/app/system_metrics.py @@ -0,0 +1,215 @@ +from __future__ import annotations + +import json +import logging +import threading +import time +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, TYPE_CHECKING + +import psutil + +if TYPE_CHECKING: + from .storage import ObjectStorage + +logger = logging.getLogger(__name__) + + +@dataclass +class SystemMetricsSnapshot: + timestamp: datetime + cpu_percent: float + memory_percent: float + disk_percent: float + storage_bytes: int + + def to_dict(self) -> Dict[str, Any]: + return { + "timestamp": 
self.timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"), + "cpu_percent": round(self.cpu_percent, 2), + "memory_percent": round(self.memory_percent, 2), + "disk_percent": round(self.disk_percent, 2), + "storage_bytes": self.storage_bytes, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "SystemMetricsSnapshot": + timestamp_str = data["timestamp"] + if timestamp_str.endswith("Z"): + timestamp_str = timestamp_str[:-1] + "+00:00" + return cls( + timestamp=datetime.fromisoformat(timestamp_str), + cpu_percent=data.get("cpu_percent", 0.0), + memory_percent=data.get("memory_percent", 0.0), + disk_percent=data.get("disk_percent", 0.0), + storage_bytes=data.get("storage_bytes", 0), + ) + + +class SystemMetricsCollector: + def __init__( + self, + storage_root: Path, + interval_minutes: int = 5, + retention_hours: int = 24, + ): + self.storage_root = storage_root + self.interval_seconds = interval_minutes * 60 + self.retention_hours = retention_hours + self._lock = threading.Lock() + self._shutdown = threading.Event() + self._snapshots: List[SystemMetricsSnapshot] = [] + self._storage_ref: Optional["ObjectStorage"] = None + + self._load_history() + + self._snapshot_thread = threading.Thread( + target=self._snapshot_loop, + name="system-metrics-snapshot", + daemon=True, + ) + self._snapshot_thread.start() + + def set_storage(self, storage: "ObjectStorage") -> None: + with self._lock: + self._storage_ref = storage + + def _config_path(self) -> Path: + return self.storage_root / ".myfsio.sys" / "config" / "metrics_history.json" + + def _load_history(self) -> None: + config_path = self._config_path() + if not config_path.exists(): + return + try: + data = json.loads(config_path.read_text(encoding="utf-8")) + history_data = data.get("history", []) + self._snapshots = [SystemMetricsSnapshot.from_dict(s) for s in history_data] + self._prune_old_snapshots() + except (json.JSONDecodeError, OSError, KeyError) as e: + logger.warning(f"Failed to load system metrics history: {e}") 
+ + def _save_history(self) -> None: + config_path = self._config_path() + config_path.parent.mkdir(parents=True, exist_ok=True) + try: + data = {"history": [s.to_dict() for s in self._snapshots]} + config_path.write_text(json.dumps(data, indent=2), encoding="utf-8") + except OSError as e: + logger.warning(f"Failed to save system metrics history: {e}") + + def _prune_old_snapshots(self) -> None: + if not self._snapshots: + return + cutoff = datetime.now(timezone.utc).timestamp() - (self.retention_hours * 3600) + self._snapshots = [ + s for s in self._snapshots if s.timestamp.timestamp() > cutoff + ] + + def _snapshot_loop(self) -> None: + while not self._shutdown.is_set(): + self._shutdown.wait(timeout=self.interval_seconds) + if not self._shutdown.is_set(): + self._take_snapshot() + + def _take_snapshot(self) -> None: + try: + cpu_percent = psutil.cpu_percent(interval=0.1) + memory = psutil.virtual_memory() + disk = psutil.disk_usage(str(self.storage_root)) + + storage_bytes = 0 + with self._lock: + storage = self._storage_ref + if storage: + try: + buckets = storage.list_buckets() + for bucket in buckets: + stats = storage.bucket_stats(bucket.name, cache_ttl=60) + storage_bytes += stats.get("total_bytes", stats.get("bytes", 0)) + except Exception as e: + logger.warning(f"Failed to collect bucket stats: {e}") + + snapshot = SystemMetricsSnapshot( + timestamp=datetime.now(timezone.utc), + cpu_percent=cpu_percent, + memory_percent=memory.percent, + disk_percent=disk.percent, + storage_bytes=storage_bytes, + ) + + with self._lock: + self._snapshots.append(snapshot) + self._prune_old_snapshots() + self._save_history() + + logger.debug(f"System metrics snapshot taken: CPU={cpu_percent:.1f}%, Memory={memory.percent:.1f}%") + except Exception as e: + logger.warning(f"Failed to take system metrics snapshot: {e}") + + def get_current(self) -> Dict[str, Any]: + cpu_percent = psutil.cpu_percent(interval=0.1) + memory = psutil.virtual_memory() + disk = 
psutil.disk_usage(str(self.storage_root)) + boot_time = psutil.boot_time() + uptime_seconds = time.time() - boot_time + uptime_days = int(uptime_seconds / 86400) + + total_buckets = 0 + total_objects = 0 + total_bytes_used = 0 + total_versions = 0 + + with self._lock: + storage = self._storage_ref + if storage: + try: + buckets = storage.list_buckets() + total_buckets = len(buckets) + for bucket in buckets: + stats = storage.bucket_stats(bucket.name, cache_ttl=60) + total_objects += stats.get("total_objects", stats.get("objects", 0)) + total_bytes_used += stats.get("total_bytes", stats.get("bytes", 0)) + total_versions += stats.get("version_count", 0) + except Exception as e: + logger.warning(f"Failed to collect current bucket stats: {e}") + + return { + "cpu_percent": round(cpu_percent, 2), + "memory": { + "total": memory.total, + "available": memory.available, + "used": memory.used, + "percent": round(memory.percent, 2), + }, + "disk": { + "total": disk.total, + "free": disk.free, + "used": disk.used, + "percent": round(disk.percent, 2), + }, + "app": { + "buckets": total_buckets, + "objects": total_objects, + "versions": total_versions, + "storage_bytes": total_bytes_used, + "uptime_days": uptime_days, + }, + } + + def get_history(self, hours: Optional[int] = None) -> List[Dict[str, Any]]: + with self._lock: + snapshots = list(self._snapshots) + + if hours: + cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600) + snapshots = [s for s in snapshots if s.timestamp.timestamp() > cutoff] + + return [s.to_dict() for s in snapshots] + + def shutdown(self) -> None: + self._shutdown.set() + self._take_snapshot() + self._snapshot_thread.join(timeout=5.0) diff --git a/app/ui.py b/app/ui.py index 3808912..1cc2b00 100644 --- a/app/ui.py +++ b/app/ui.py @@ -158,69 +158,6 @@ def _format_bytes(num: int) -> str: return f"{value:.1f} PB" -_metrics_last_save_time: float = 0.0 - - -def _get_metrics_history_path() -> Path: - storage_root = 
Path(current_app.config["STORAGE_ROOT"]) - return storage_root / ".myfsio.sys" / "config" / "metrics_history.json" - - -def _load_metrics_history() -> dict: - path = _get_metrics_history_path() - if not path.exists(): - return {"history": []} - try: - return json.loads(path.read_text(encoding="utf-8")) - except (json.JSONDecodeError, OSError): - return {"history": []} - - -def _save_metrics_snapshot(cpu_percent: float, memory_percent: float, disk_percent: float, storage_bytes: int) -> None: - global _metrics_last_save_time - - if not current_app.config.get("METRICS_HISTORY_ENABLED", False): - return - - import time - from datetime import datetime, timezone - - interval_minutes = current_app.config.get("METRICS_HISTORY_INTERVAL_MINUTES", 5) - now_ts = time.time() - if now_ts - _metrics_last_save_time < interval_minutes * 60: - return - - path = _get_metrics_history_path() - path.parent.mkdir(parents=True, exist_ok=True) - - data = _load_metrics_history() - history = data.get("history", []) - retention_hours = current_app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24) - - now = datetime.now(timezone.utc) - snapshot = { - "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"), - "cpu_percent": round(cpu_percent, 2), - "memory_percent": round(memory_percent, 2), - "disk_percent": round(disk_percent, 2), - "storage_bytes": storage_bytes, - } - history.append(snapshot) - - cutoff = now.timestamp() - (retention_hours * 3600) - history = [ - h for h in history - if datetime.fromisoformat(h["timestamp"].replace("Z", "+00:00")).timestamp() > cutoff - ] - - data["history"] = history - try: - path.write_text(json.dumps(data, indent=2), encoding="utf-8") - _metrics_last_save_time = now_ts - except OSError: - pass - - def _friendly_error_message(exc: Exception) -> str: message = str(exc) or "An unexpected error occurred" if isinstance(exc, IamError): @@ -2240,8 +2177,6 @@ def metrics_api(): uptime_seconds = time.time() - boot_time uptime_days = int(uptime_seconds / 86400) - 
_save_metrics_snapshot(cpu_percent, memory.percent, disk.percent, total_bytes_used) - return jsonify({ "cpu_percent": round(cpu_percent, 2), "memory": { @@ -2276,23 +2211,15 @@ def metrics_history(): except IamError: return jsonify({"error": "Access denied"}), 403 - if not current_app.config.get("METRICS_HISTORY_ENABLED", False): + system_metrics = current_app.extensions.get("system_metrics") + if not system_metrics: return jsonify({"enabled": False, "history": []}) hours = request.args.get("hours", type=int) if hours is None: hours = current_app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24) - data = _load_metrics_history() - history = data.get("history", []) - - if hours: - from datetime import datetime, timezone - cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600) - history = [ - h for h in history - if datetime.fromisoformat(h["timestamp"].replace("Z", "+00:00")).timestamp() > cutoff - ] + history = system_metrics.get_history(hours=hours) return jsonify({ "enabled": True, diff --git a/app/version.py b/app/version.py index ecf1394..998adc1 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.2.2" +APP_VERSION = "0.2.3" def get_version() -> str: From c0603c592bd0edd79468517d2911017fb80ed784 Mon Sep 17 00:00:00 2001 From: kqjy Date: Thu, 22 Jan 2026 10:58:44 +0800 Subject: [PATCH 2/6] Add configurable server threads and connections --- app/config.py | 53 ++++++++++++++++++++++++++++++++++++++++-- docs.md | 9 ++++++++ run.py | 56 +++++++++++++++++++++++++++++++++------------ templates/docs.html | 23 +++++++++++++++++++ 4 files changed, 124 insertions(+), 17 deletions(-) diff --git a/app/config.py b/app/config.py index 2778963..8e04441 100644 --- a/app/config.py +++ b/app/config.py @@ -90,6 +90,10 @@ class AppConfig: operation_metrics_enabled: bool operation_metrics_interval_minutes: int operation_metrics_retention_hours: int + server_threads: int + server_connection_limit: int + 
server_backlog: int + server_channel_timeout: int @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -193,6 +197,11 @@ class AppConfig: operation_metrics_interval_minutes = int(_get("OPERATION_METRICS_INTERVAL_MINUTES", 5)) operation_metrics_retention_hours = int(_get("OPERATION_METRICS_RETENTION_HOURS", 24)) + server_threads = int(_get("SERVER_THREADS", 4)) + server_connection_limit = int(_get("SERVER_CONNECTION_LIMIT", 100)) + server_backlog = int(_get("SERVER_BACKLOG", 1024)) + server_channel_timeout = int(_get("SERVER_CHANNEL_TIMEOUT", 120)) + return cls(storage_root=storage_root, max_upload_size=max_upload_size, ui_page_size=ui_page_size, @@ -236,7 +245,11 @@ class AppConfig: metrics_history_interval_minutes=metrics_history_interval_minutes, operation_metrics_enabled=operation_metrics_enabled, operation_metrics_interval_minutes=operation_metrics_interval_minutes, - operation_metrics_retention_hours=operation_metrics_retention_hours) + operation_metrics_retention_hours=operation_metrics_retention_hours, + server_threads=server_threads, + server_connection_limit=server_connection_limit, + server_backlog=server_backlog, + server_channel_timeout=server_channel_timeout) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. @@ -296,7 +309,35 @@ class AppConfig: if "*" in self.cors_origins: issues.append("INFO: CORS_ORIGINS is set to '*'. Consider restricting to specific domains in production.") - + + if not (1 <= self.server_threads <= 64): + issues.append(f"CRITICAL: SERVER_THREADS={self.server_threads} is outside valid range (1-64). Server cannot start.") + if not (10 <= self.server_connection_limit <= 1000): + issues.append(f"CRITICAL: SERVER_CONNECTION_LIMIT={self.server_connection_limit} is outside valid range (10-1000). 
Server cannot start.") + if not (64 <= self.server_backlog <= 4096): + issues.append(f"CRITICAL: SERVER_BACKLOG={self.server_backlog} is outside valid range (64-4096). Server cannot start.") + if not (10 <= self.server_channel_timeout <= 300): + issues.append(f"CRITICAL: SERVER_CHANNEL_TIMEOUT={self.server_channel_timeout} is outside valid range (10-300). Server cannot start.") + + if sys.platform != "win32": + try: + import resource + soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE) + threshold = int(soft_limit * 0.8) + if self.server_connection_limit > threshold: + issues.append(f"WARNING: SERVER_CONNECTION_LIMIT={self.server_connection_limit} exceeds 80% of system file descriptor limit (soft={soft_limit}). Consider running 'ulimit -n {self.server_connection_limit + 100}'.") + except (ImportError, OSError): + pass + + try: + import psutil + available_mb = psutil.virtual_memory().available / (1024 * 1024) + estimated_mb = self.server_threads * 50 + if estimated_mb > available_mb * 0.5: + issues.append(f"WARNING: SERVER_THREADS={self.server_threads} may require ~{estimated_mb}MB memory, exceeding 50% of available RAM ({int(available_mb)}MB).") + except ImportError: + pass + return issues def print_startup_summary(self) -> None: @@ -314,6 +355,10 @@ class AppConfig: print(f" ENCRYPTION: Enabled (Master key: {self.encryption_master_key_path})") if self.kms_enabled: print(f" KMS: Enabled (Keys: {self.kms_keys_path})") + print(f" SERVER_THREADS: {self.server_threads}") + print(f" CONNECTION_LIMIT: {self.server_connection_limit}") + print(f" BACKLOG: {self.server_backlog}") + print(f" CHANNEL_TIMEOUT: {self.server_channel_timeout}s") print("=" * 60) issues = self.validate_and_report() @@ -371,4 +416,8 @@ class AppConfig: "OPERATION_METRICS_ENABLED": self.operation_metrics_enabled, "OPERATION_METRICS_INTERVAL_MINUTES": self.operation_metrics_interval_minutes, "OPERATION_METRICS_RETENTION_HOURS": self.operation_metrics_retention_hours, + "SERVER_THREADS": 
self.server_threads, + "SERVER_CONNECTION_LIMIT": self.server_connection_limit, + "SERVER_BACKLOG": self.server_backlog, + "SERVER_CHANNEL_TIMEOUT": self.server_channel_timeout, } diff --git a/docs.md b/docs.md index c14d40a..54f956b 100644 --- a/docs.md +++ b/docs.md @@ -168,6 +168,15 @@ All configuration is done via environment variables. The table below lists every | `RATE_LIMIT_DEFAULT` | `200 per minute` | Default rate limit for API endpoints. | | `RATE_LIMIT_STORAGE_URI` | `memory://` | Storage backend for rate limits. Use `redis://host:port` for distributed setups. | +### Server Configuration + +| Variable | Default | Notes | +| --- | --- | --- | +| `SERVER_THREADS` | `4` | Waitress worker threads (1-64). More threads handle more concurrent requests but use more memory. | +| `SERVER_CONNECTION_LIMIT` | `100` | Maximum concurrent connections (10-1000). Ensure OS file descriptor limits support this value. | +| `SERVER_BACKLOG` | `1024` | TCP listen backlog (64-4096). Connections queue here when all threads are busy. | +| `SERVER_CHANNEL_TIMEOUT` | `120` | Seconds before idle connections are closed (10-300). 
| + ### Logging | Variable | Default | Notes | diff --git a/run.py b/run.py index 3de61c4..013f79c 100644 --- a/run.py +++ b/run.py @@ -18,6 +18,8 @@ for _env_file in [ if _env_file.exists(): load_dotenv(_env_file, override=True) +from typing import Optional + from app import create_api_app, create_ui_app from app.config import AppConfig @@ -36,11 +38,23 @@ def _is_frozen() -> bool: return getattr(sys, 'frozen', False) or '__compiled__' in globals() -def serve_api(port: int, prod: bool = False) -> None: +def serve_api(port: int, prod: bool = False, config: Optional[AppConfig] = None) -> None: app = create_api_app() if prod: from waitress import serve - serve(app, host=_server_host(), port=port, ident="MyFSIO") + if config: + serve( + app, + host=_server_host(), + port=port, + ident="MyFSIO", + threads=config.server_threads, + connection_limit=config.server_connection_limit, + backlog=config.server_backlog, + channel_timeout=config.server_channel_timeout, + ) + else: + serve(app, host=_server_host(), port=port, ident="MyFSIO") else: debug = _is_debug_enabled() if debug: @@ -48,11 +62,23 @@ def serve_api(port: int, prod: bool = False) -> None: app.run(host=_server_host(), port=port, debug=debug) -def serve_ui(port: int, prod: bool = False) -> None: +def serve_ui(port: int, prod: bool = False, config: Optional[AppConfig] = None) -> None: app = create_ui_app() if prod: from waitress import serve - serve(app, host=_server_host(), port=port, ident="MyFSIO") + if config: + serve( + app, + host=_server_host(), + port=port, + ident="MyFSIO", + threads=config.server_threads, + connection_limit=config.server_connection_limit, + backlog=config.server_backlog, + channel_timeout=config.server_channel_timeout, + ) + else: + serve(app, host=_server_host(), port=port, ident="MyFSIO") else: debug = _is_debug_enabled() if debug: @@ -71,7 +97,6 @@ if __name__ == "__main__": parser.add_argument("--show-config", action="store_true", help="Show configuration summary and exit") args = 
parser.parse_args() - # Handle config check/show modes if args.check_config or args.show_config: config = AppConfig.from_env() config.print_startup_summary() @@ -81,49 +106,50 @@ if __name__ == "__main__": sys.exit(1 if critical else 0) sys.exit(0) - # Default to production mode when running as compiled binary - # unless --dev is explicitly passed prod_mode = args.prod or (_is_frozen() and not args.dev) - # Validate configuration before starting config = AppConfig.from_env() - # Show startup summary only on first run (when marker file doesn't exist) first_run_marker = config.storage_root / ".myfsio.sys" / ".initialized" is_first_run = not first_run_marker.exists() if is_first_run: config.print_startup_summary() - # Check for critical issues that should prevent startup issues = config.validate_and_report() critical_issues = [i for i in issues if i.startswith("CRITICAL:")] if critical_issues: - print("ABORTING: Critical configuration issues detected. Fix them before starting.") + print("ABORTING: Critical configuration issues detected. Please fix them before starting.") sys.exit(1) - # Create the marker file to indicate successful first run try: first_run_marker.parent.mkdir(parents=True, exist_ok=True) first_run_marker.write_text(f"Initialized on {__import__('datetime').datetime.now().isoformat()}\n") except OSError: - pass # Non-critical, just skip marker creation + pass if prod_mode: print("Running in production mode (Waitress)") + issues = config.validate_and_report() + critical_issues = [i for i in issues if i.startswith("CRITICAL:")] + if critical_issues: + for issue in critical_issues: + print(f" {issue}") + print("ABORTING: Critical configuration issues detected. 
Please fix them before starting.") + sys.exit(1) else: print("Running in development mode (Flask dev server)") if args.mode in {"api", "both"}: print(f"Starting API server on port {args.api_port}...") - api_proc = Process(target=serve_api, args=(args.api_port, prod_mode), daemon=True) + api_proc = Process(target=serve_api, args=(args.api_port, prod_mode, config), daemon=True) api_proc.start() else: api_proc = None if args.mode in {"ui", "both"}: print(f"Starting UI server on port {args.ui_port}...") - serve_ui(args.ui_port, prod_mode) + serve_ui(args.ui_port, prod_mode, config) elif api_proc: try: api_proc.join() diff --git a/templates/docs.html b/templates/docs.html index fb12407..e0794c9 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -157,6 +157,29 @@ python run.py --mode ui 200 per minute Default API rate limit. + + Server Settings + + + SERVER_THREADS + 4 + Waitress worker threads (1-64). + + + SERVER_CONNECTION_LIMIT + 100 + Max concurrent connections (10-1000). + + + SERVER_BACKLOG + 1024 + TCP listen backlog (64-4096). + + + SERVER_CHANNEL_TIMEOUT + 120 + Idle connection timeout in seconds (10-300). + Encryption Settings From 71327bcbf1525383b676f936d4ff0725b93b9b87 Mon Sep 17 00:00:00 2001 From: kqjy Date: Thu, 22 Jan 2026 11:06:53 +0800 Subject: [PATCH 3/6] Add dynamic updates to System Health section on metrics page --- templates/metrics.html | 81 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/templates/metrics.html b/templates/metrics.html index f108213..6425531 100644 --- a/templates/metrics.html +++ b/templates/metrics.html @@ -218,10 +218,10 @@
{% set has_issues = (cpu_percent > 80) or (memory.percent > 85) or (disk.percent > 90) %} -
+
- + {% if has_issues %} @@ -232,8 +232,8 @@
- - + + {% if has_issues %} {% else %} @@ -244,22 +244,24 @@

System Health

- {% if has_issues %} -
    - {% if cpu_percent > 80 %}
  • CPU usage is high ({{ cpu_percent }}%)
  • {% endif %} - {% if memory.percent > 85 %}
  • Memory usage is high ({{ memory.percent }}%)
  • {% endif %} - {% if disk.percent > 90 %}
  • Disk space is critically low ({{ disk.percent }}% used)
  • {% endif %} -
- {% else %} -

All resources are within normal operating parameters.

- {% endif %} +
+ {% if has_issues %} +
    + {% if cpu_percent > 80 %}
  • CPU usage is high ({{ cpu_percent }}%)
  • {% endif %} + {% if memory.percent > 85 %}
  • Memory usage is high ({{ memory.percent }}%)
  • {% endif %} + {% if disk.percent > 90 %}
  • Disk space is critically low ({{ disk.percent }}% used)
  • {% endif %} +
+ {% else %} +

All resources are within normal operating parameters.

+ {% endif %} +
-
{{ app.uptime_days }}d
+
{{ app.uptime_days }}d
Uptime
-
{{ app.buckets }}
+
{{ app.buckets }}
Active Buckets
@@ -480,6 +482,55 @@ el = document.querySelector('[data-metric="objects_count"]'); if (el) el.textContent = data.app.objects; + var cpuHigh = data.cpu_percent > 80; + var memHigh = data.memory.percent > 85; + var diskHigh = data.disk.percent > 90; + var hasIssues = cpuHigh || memHigh || diskHigh; + + var healthCard = document.getElementById('systemHealthCard'); + if (healthCard) { + healthCard.style.background = hasIssues + ? 'linear-gradient(135deg, #ef4444 0%, #f97316 100%)' + : 'linear-gradient(135deg, #3b82f6 0%, #8b5cf6 100%)'; + } + + var healthIcon = document.getElementById('healthIcon'); + if (healthIcon) { + healthIcon.innerHTML = hasIssues + ? '' + : ''; + } + + var healthBadge = document.getElementById('healthBadge'); + if (healthBadge) { + healthBadge.className = 'badge bg-white fw-semibold px-3 py-2 ' + (hasIssues ? 'text-danger' : 'text-primary'); + } + + var healthBadgeIcon = document.getElementById('healthBadgeIcon'); + if (healthBadgeIcon) { + healthBadgeIcon.innerHTML = hasIssues + ? '' + : ''; + } + + var healthContent = document.getElementById('healthContent'); + if (healthContent) { + if (hasIssues) { + var issues = []; + if (cpuHigh) issues.push('
  • CPU usage is high (' + data.cpu_percent.toFixed(1) + '%)
  • '); + if (memHigh) issues.push('
  • Memory usage is high (' + data.memory.percent.toFixed(1) + '%)
  • '); + if (diskHigh) issues.push('
  • Disk space is critically low (' + data.disk.percent.toFixed(1) + '% used)
  • '); + healthContent.innerHTML = '
      ' + issues.join('') + '
    '; + } else { + healthContent.innerHTML = '

    All resources are within normal operating parameters.

    '; + } + } + + el = document.querySelector('[data-metric="health_uptime"]'); + if (el) el.textContent = data.app.uptime_days + 'd'; + el = document.querySelector('[data-metric="health_buckets"]'); + if (el) el.textContent = data.app.buckets; + countdown = 5; }) .catch(function(err) { From 7a8acfb93389141decde6465f950c883ffa21b27 Mon Sep 17 00:00:00 2001 From: kqjy Date: Thu, 22 Jan 2026 11:12:23 +0800 Subject: [PATCH 4/6] Add missing lifecycle and cors actions to Full control template --- static/js/iam-management.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/static/js/iam-management.js b/static/js/iam-management.js index 11b41fb..8f58133 100644 --- a/static/js/iam-management.js +++ b/static/js/iam-management.js @@ -16,7 +16,7 @@ window.IAMManagement = (function() { var currentDeleteKey = null; var policyTemplates = { - full: [{ bucket: '*', actions: ['list', 'read', 'write', 'delete', 'share', 'policy', 'replication', 'iam:list_users', 'iam:*'] }], + full: [{ bucket: '*', actions: ['list', 'read', 'write', 'delete', 'share', 'policy', 'replication', 'lifecycle', 'cors', 'iam:*'] }], readonly: [{ bucket: '*', actions: ['list', 'read'] }], writer: [{ bucket: '*', actions: ['list', 'read', 'write'] }] }; From 23ea164215f8353b9e5095d0c007fa4014c75916 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 24 Jan 2026 19:38:17 +0800 Subject: [PATCH 5/6] Add bi-directional site replication with LWW conflict resolution --- app/__init__.py | 14 ++ app/config.py | 14 +- app/replication.py | 15 +- app/s3_api.py | 11 +- app/site_sync.py | 396 ++++++++++++++++++++++++++++++++++ tests/test_site_sync.py | 461 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 902 insertions(+), 9 deletions(-) create mode 100644 app/site_sync.py create mode 100644 tests/test_site_sync.py diff --git a/app/__init__.py b/app/__init__.py index 02c6472..2968c03 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -208,6 +208,20 @@ def create_app( 
system_metrics_collector.set_storage(storage) app.extensions["system_metrics"] = system_metrics_collector + site_sync_worker = None + if app.config.get("SITE_SYNC_ENABLED", False): + from .site_sync import SiteSyncWorker + site_sync_worker = SiteSyncWorker( + storage=storage, + connections=connections, + replication_manager=replication, + storage_root=storage_root, + interval_seconds=app.config.get("SITE_SYNC_INTERVAL_SECONDS", 60), + batch_size=app.config.get("SITE_SYNC_BATCH_SIZE", 100), + ) + site_sync_worker.start() + app.extensions["site_sync"] = site_sync_worker + @app.errorhandler(500) def internal_error(error): return render_template('500.html'), 500 diff --git a/app/config.py b/app/config.py index 8e04441..b39000f 100644 --- a/app/config.py +++ b/app/config.py @@ -94,6 +94,9 @@ class AppConfig: server_connection_limit: int server_backlog: int server_channel_timeout: int + site_sync_enabled: bool + site_sync_interval_seconds: int + site_sync_batch_size: int @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -201,6 +204,9 @@ class AppConfig: server_connection_limit = int(_get("SERVER_CONNECTION_LIMIT", 100)) server_backlog = int(_get("SERVER_BACKLOG", 1024)) server_channel_timeout = int(_get("SERVER_CHANNEL_TIMEOUT", 120)) + site_sync_enabled = str(_get("SITE_SYNC_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} + site_sync_interval_seconds = int(_get("SITE_SYNC_INTERVAL_SECONDS", 60)) + site_sync_batch_size = int(_get("SITE_SYNC_BATCH_SIZE", 100)) return cls(storage_root=storage_root, max_upload_size=max_upload_size, @@ -249,7 +255,10 @@ class AppConfig: server_threads=server_threads, server_connection_limit=server_connection_limit, server_backlog=server_backlog, - server_channel_timeout=server_channel_timeout) + server_channel_timeout=server_channel_timeout, + site_sync_enabled=site_sync_enabled, + site_sync_interval_seconds=site_sync_interval_seconds, + site_sync_batch_size=site_sync_batch_size) def 
validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. @@ -420,4 +429,7 @@ class AppConfig: "SERVER_CONNECTION_LIMIT": self.server_connection_limit, "SERVER_BACKLOG": self.server_backlog, "SERVER_CHANNEL_TIMEOUT": self.server_channel_timeout, + "SITE_SYNC_ENABLED": self.site_sync_enabled, + "SITE_SYNC_INTERVAL_SECONDS": self.site_sync_interval_seconds, + "SITE_SYNC_BATCH_SIZE": self.site_sync_batch_size, } diff --git a/app/replication.py b/app/replication.py index 4eacdef..9cab869 100644 --- a/app/replication.py +++ b/app/replication.py @@ -27,6 +27,7 @@ STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024 REPLICATION_MODE_NEW_ONLY = "new_only" REPLICATION_MODE_ALL = "all" +REPLICATION_MODE_BIDIRECTIONAL = "bidirectional" def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any: @@ -127,10 +128,12 @@ class ReplicationRule: target_connection_id: str target_bucket: str enabled: bool = True - mode: str = REPLICATION_MODE_NEW_ONLY + mode: str = REPLICATION_MODE_NEW_ONLY created_at: Optional[float] = None stats: ReplicationStats = field(default_factory=ReplicationStats) - + sync_deletions: bool = True + last_pull_at: Optional[float] = None + def to_dict(self) -> dict: return { "bucket_name": self.bucket_name, @@ -140,8 +143,10 @@ class ReplicationRule: "mode": self.mode, "created_at": self.created_at, "stats": self.stats.to_dict(), + "sync_deletions": self.sync_deletions, + "last_pull_at": self.last_pull_at, } - + @classmethod def from_dict(cls, data: dict) -> "ReplicationRule": stats_data = data.pop("stats", {}) @@ -149,6 +154,10 @@ class ReplicationRule: data["mode"] = REPLICATION_MODE_NEW_ONLY if "created_at" not in data: data["created_at"] = None + if "sync_deletions" not in data: + data["sync_deletions"] = True + if "last_pull_at" not in data: + data["last_pull_at"] = None rule = cls(**data) rule.stats = ReplicationStats.from_dict(stats_data) if stats_data else ReplicationStats() return rule 
diff --git a/app/s3_api.py b/app/s3_api.py index f576c32..1f49e15 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -2446,7 +2446,8 @@ def object_handler(bucket_name: str, object_key: str): operation="Put", ) - if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""): + user_agent = request.headers.get("User-Agent", "") + if "S3ReplicationAgent" not in user_agent and "SiteSyncAgent" not in user_agent: _replication_manager().trigger_replication(bucket_name, object_key, action="write") return response @@ -2592,7 +2593,7 @@ def object_handler(bucket_name: str, object_key: str): ) user_agent = request.headers.get("User-Agent", "") - if "S3ReplicationAgent" not in user_agent: + if "S3ReplicationAgent" not in user_agent and "SiteSyncAgent" not in user_agent: _replication_manager().trigger_replication(bucket_name, object_key, action="delete") return Response(status=204) @@ -2826,9 +2827,9 @@ def _copy_object(dest_bucket: str, dest_key: str, copy_source: str) -> Response: ) user_agent = request.headers.get("User-Agent", "") - if "S3ReplicationAgent" not in user_agent: + if "S3ReplicationAgent" not in user_agent and "SiteSyncAgent" not in user_agent: _replication_manager().trigger_replication(dest_bucket, dest_key, action="write") - + root = Element("CopyObjectResult") SubElement(root, "LastModified").text = meta.last_modified.isoformat() if meta.etag: @@ -3040,7 +3041,7 @@ def _complete_multipart_upload(bucket_name: str, object_key: str) -> Response: return _error_response("InvalidPart", str(exc), 400) user_agent = request.headers.get("User-Agent", "") - if "S3ReplicationAgent" not in user_agent: + if "S3ReplicationAgent" not in user_agent and "SiteSyncAgent" not in user_agent: _replication_manager().trigger_replication(bucket_name, object_key, action="write") root = Element("CompleteMultipartUploadResult") diff --git a/app/site_sync.py b/app/site_sync.py new file mode 100644 index 0000000..306ac28 --- /dev/null +++ b/app/site_sync.py @@ -0,0 +1,396 @@ +from 
__future__ import annotations + +import json +import logging +import tempfile +import threading +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, TYPE_CHECKING + +import boto3 +from botocore.config import Config +from botocore.exceptions import ClientError + +if TYPE_CHECKING: + from .connections import ConnectionStore, RemoteConnection + from .replication import ReplicationManager, ReplicationRule + from .storage import ObjectStorage + +logger = logging.getLogger(__name__) + +SITE_SYNC_USER_AGENT = "SiteSyncAgent/1.0" +SITE_SYNC_CONNECT_TIMEOUT = 10 +SITE_SYNC_READ_TIMEOUT = 120 +CLOCK_SKEW_TOLERANCE_SECONDS = 1.0 + + +@dataclass +class SyncedObjectInfo: + last_synced_at: float + remote_etag: str + source: str + + def to_dict(self) -> Dict[str, Any]: + return { + "last_synced_at": self.last_synced_at, + "remote_etag": self.remote_etag, + "source": self.source, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "SyncedObjectInfo": + return cls( + last_synced_at=data["last_synced_at"], + remote_etag=data["remote_etag"], + source=data["source"], + ) + + +@dataclass +class SyncState: + synced_objects: Dict[str, SyncedObjectInfo] = field(default_factory=dict) + last_full_sync: Optional[float] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "synced_objects": {k: v.to_dict() for k, v in self.synced_objects.items()}, + "last_full_sync": self.last_full_sync, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "SyncState": + synced_objects = {} + for k, v in data.get("synced_objects", {}).items(): + synced_objects[k] = SyncedObjectInfo.from_dict(v) + return cls( + synced_objects=synced_objects, + last_full_sync=data.get("last_full_sync"), + ) + + +@dataclass +class SiteSyncStats: + last_sync_at: Optional[float] = None + objects_pulled: int = 0 + objects_skipped: int = 0 + conflicts_resolved: int = 0 + 
deletions_applied: int = 0 + errors: int = 0 + + def to_dict(self) -> Dict[str, Any]: + return { + "last_sync_at": self.last_sync_at, + "objects_pulled": self.objects_pulled, + "objects_skipped": self.objects_skipped, + "conflicts_resolved": self.conflicts_resolved, + "deletions_applied": self.deletions_applied, + "errors": self.errors, + } + + +@dataclass +class RemoteObjectMeta: + key: str + size: int + last_modified: datetime + etag: str + + @classmethod + def from_s3_object(cls, obj: Dict[str, Any]) -> "RemoteObjectMeta": + return cls( + key=obj["Key"], + size=obj.get("Size", 0), + last_modified=obj["LastModified"], + etag=obj.get("ETag", "").strip('"'), + ) + + +def _create_sync_client(connection: "RemoteConnection") -> Any: + config = Config( + user_agent_extra=SITE_SYNC_USER_AGENT, + connect_timeout=SITE_SYNC_CONNECT_TIMEOUT, + read_timeout=SITE_SYNC_READ_TIMEOUT, + retries={"max_attempts": 2}, + signature_version="s3v4", + s3={"addressing_style": "path"}, + request_checksum_calculation="when_required", + response_checksum_validation="when_required", + ) + return boto3.client( + "s3", + endpoint_url=connection.endpoint_url, + aws_access_key_id=connection.access_key, + aws_secret_access_key=connection.secret_key, + region_name=connection.region or "us-east-1", + config=config, + ) + + +class SiteSyncWorker: + def __init__( + self, + storage: "ObjectStorage", + connections: "ConnectionStore", + replication_manager: "ReplicationManager", + storage_root: Path, + interval_seconds: int = 60, + batch_size: int = 100, + ): + self.storage = storage + self.connections = connections + self.replication_manager = replication_manager + self.storage_root = storage_root + self.interval_seconds = interval_seconds + self.batch_size = batch_size + self._lock = threading.Lock() + self._shutdown = threading.Event() + self._sync_thread: Optional[threading.Thread] = None + self._bucket_stats: Dict[str, SiteSyncStats] = {} + + def start(self) -> None: + if self._sync_thread is not 
None and self._sync_thread.is_alive(): + return + self._shutdown.clear() + self._sync_thread = threading.Thread( + target=self._sync_loop, name="site-sync-worker", daemon=True + ) + self._sync_thread.start() + logger.info("Site sync worker started (interval=%ds)", self.interval_seconds) + + def shutdown(self) -> None: + self._shutdown.set() + if self._sync_thread is not None: + self._sync_thread.join(timeout=10.0) + logger.info("Site sync worker shut down") + + def trigger_sync(self, bucket_name: str) -> Optional[SiteSyncStats]: + from .replication import REPLICATION_MODE_BIDIRECTIONAL + rule = self.replication_manager.get_rule(bucket_name) + if not rule or rule.mode != REPLICATION_MODE_BIDIRECTIONAL or not rule.enabled: + return None + return self._sync_bucket(rule) + + def get_stats(self, bucket_name: str) -> Optional[SiteSyncStats]: + with self._lock: + return self._bucket_stats.get(bucket_name) + + def _sync_loop(self) -> None: + while not self._shutdown.is_set(): + self._shutdown.wait(timeout=self.interval_seconds) + if self._shutdown.is_set(): + break + self._run_sync_cycle() + + def _run_sync_cycle(self) -> None: + from .replication import REPLICATION_MODE_BIDIRECTIONAL + for bucket_name, rule in list(self.replication_manager._rules.items()): + if self._shutdown.is_set(): + break + if rule.mode != REPLICATION_MODE_BIDIRECTIONAL or not rule.enabled: + continue + try: + stats = self._sync_bucket(rule) + with self._lock: + self._bucket_stats[bucket_name] = stats + except Exception as e: + logger.exception("Site sync failed for bucket %s: %s", bucket_name, e) + + def _sync_bucket(self, rule: "ReplicationRule") -> SiteSyncStats: + stats = SiteSyncStats() + connection = self.connections.get(rule.target_connection_id) + if not connection: + logger.warning("Connection %s not found for bucket %s", rule.target_connection_id, rule.bucket_name) + stats.errors += 1 + return stats + + try: + local_objects = self._list_local_objects(rule.bucket_name) + except Exception as 
e: + logger.error("Failed to list local objects for %s: %s", rule.bucket_name, e) + stats.errors += 1 + return stats + + try: + remote_objects = self._list_remote_objects(rule, connection) + except Exception as e: + logger.error("Failed to list remote objects for %s: %s", rule.bucket_name, e) + stats.errors += 1 + return stats + + sync_state = self._load_sync_state(rule.bucket_name) + local_keys = set(local_objects.keys()) + remote_keys = set(remote_objects.keys()) + + to_pull = [] + for key in remote_keys: + remote_meta = remote_objects[key] + local_meta = local_objects.get(key) + if local_meta is None: + to_pull.append(key) + else: + resolution = self._resolve_conflict(local_meta, remote_meta) + if resolution == "pull": + to_pull.append(key) + stats.conflicts_resolved += 1 + else: + stats.objects_skipped += 1 + + pulled_count = 0 + for key in to_pull: + if self._shutdown.is_set(): + break + if pulled_count >= self.batch_size: + break + remote_meta = remote_objects[key] + success = self._pull_object(rule, key, connection, remote_meta) + if success: + stats.objects_pulled += 1 + pulled_count += 1 + sync_state.synced_objects[key] = SyncedObjectInfo( + last_synced_at=time.time(), + remote_etag=remote_meta.etag, + source="remote", + ) + else: + stats.errors += 1 + + if rule.sync_deletions: + for key in list(sync_state.synced_objects.keys()): + if key not in remote_keys and key in local_keys: + tracked = sync_state.synced_objects[key] + if tracked.source == "remote": + local_meta = local_objects.get(key) + if local_meta and local_meta.last_modified.timestamp() <= tracked.last_synced_at: + success = self._apply_remote_deletion(rule.bucket_name, key) + if success: + stats.deletions_applied += 1 + del sync_state.synced_objects[key] + + sync_state.last_full_sync = time.time() + self._save_sync_state(rule.bucket_name, sync_state) + + with self.replication_manager._stats_lock: + rule.last_pull_at = time.time() + self.replication_manager.save_rules() + + stats.last_sync_at = 
time.time() + logger.info( + "Site sync completed for %s: pulled=%d, skipped=%d, conflicts=%d, deletions=%d, errors=%d", + rule.bucket_name, + stats.objects_pulled, + stats.objects_skipped, + stats.conflicts_resolved, + stats.deletions_applied, + stats.errors, + ) + return stats + + def _list_local_objects(self, bucket_name: str) -> Dict[str, Any]: + from .storage import ObjectMeta + objects = self.storage.list_objects_all(bucket_name) + return {obj.key: obj for obj in objects} + + def _list_remote_objects(self, rule: "ReplicationRule", connection: "RemoteConnection") -> Dict[str, RemoteObjectMeta]: + s3 = _create_sync_client(connection) + result: Dict[str, RemoteObjectMeta] = {} + paginator = s3.get_paginator("list_objects_v2") + try: + for page in paginator.paginate(Bucket=rule.target_bucket): + for obj in page.get("Contents", []): + meta = RemoteObjectMeta.from_s3_object(obj) + result[meta.key] = meta + except ClientError as e: + if e.response["Error"]["Code"] == "NoSuchBucket": + return {} + raise + return result + + def _resolve_conflict(self, local_meta: Any, remote_meta: RemoteObjectMeta) -> str: + local_ts = local_meta.last_modified.timestamp() + remote_ts = remote_meta.last_modified.timestamp() + + if abs(remote_ts - local_ts) < CLOCK_SKEW_TOLERANCE_SECONDS: + local_etag = local_meta.etag or "" + if remote_meta.etag == local_etag: + return "skip" + return "pull" if remote_meta.etag > local_etag else "keep" + + return "pull" if remote_ts > local_ts else "keep" + + def _pull_object( + self, + rule: "ReplicationRule", + object_key: str, + connection: "RemoteConnection", + remote_meta: RemoteObjectMeta, + ) -> bool: + s3 = _create_sync_client(connection) + tmp_path = None + try: + tmp_dir = self.storage_root / ".myfsio.sys" / "tmp" + tmp_dir.mkdir(parents=True, exist_ok=True) + with tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False) as tmp_file: + tmp_path = Path(tmp_file.name) + + s3.download_file(rule.target_bucket, object_key, str(tmp_path)) + + 
head_response = s3.head_object(Bucket=rule.target_bucket, Key=object_key) + user_metadata = head_response.get("Metadata", {}) + + with open(tmp_path, "rb") as f: + self.storage.put_object( + rule.bucket_name, + object_key, + f, + metadata=user_metadata if user_metadata else None, + ) + + logger.debug("Pulled object %s/%s from remote", rule.bucket_name, object_key) + return True + + except ClientError as e: + logger.error("Failed to pull %s/%s: %s", rule.bucket_name, object_key, e) + return False + except Exception as e: + logger.error("Failed to store pulled object %s/%s: %s", rule.bucket_name, object_key, e) + return False + finally: + if tmp_path and tmp_path.exists(): + try: + tmp_path.unlink() + except OSError: + pass + + def _apply_remote_deletion(self, bucket_name: str, object_key: str) -> bool: + try: + self.storage.delete_object(bucket_name, object_key) + logger.debug("Applied remote deletion for %s/%s", bucket_name, object_key) + return True + except Exception as e: + logger.error("Failed to apply remote deletion for %s/%s: %s", bucket_name, object_key, e) + return False + + def _sync_state_path(self, bucket_name: str) -> Path: + return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "site_sync_state.json" + + def _load_sync_state(self, bucket_name: str) -> SyncState: + path = self._sync_state_path(bucket_name) + if not path.exists(): + return SyncState() + try: + data = json.loads(path.read_text(encoding="utf-8")) + return SyncState.from_dict(data) + except (json.JSONDecodeError, OSError, KeyError) as e: + logger.warning("Failed to load sync state for %s: %s", bucket_name, e) + return SyncState() + + def _save_sync_state(self, bucket_name: str, state: SyncState) -> None: + path = self._sync_state_path(bucket_name) + path.parent.mkdir(parents=True, exist_ok=True) + try: + path.write_text(json.dumps(state.to_dict(), indent=2), encoding="utf-8") + except OSError as e: + logger.warning("Failed to save sync state for %s: %s", bucket_name, e) diff 
--git a/tests/test_site_sync.py b/tests/test_site_sync.py new file mode 100644 index 0000000..4975375 --- /dev/null +++ b/tests/test_site_sync.py @@ -0,0 +1,461 @@ +import io +import json +import time +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from app.connections import ConnectionStore, RemoteConnection +from app.replication import ( + ReplicationManager, + ReplicationRule, + REPLICATION_MODE_BIDIRECTIONAL, + REPLICATION_MODE_NEW_ONLY, +) +from app.site_sync import ( + SiteSyncWorker, + SyncState, + SyncedObjectInfo, + SiteSyncStats, + RemoteObjectMeta, + CLOCK_SKEW_TOLERANCE_SECONDS, +) +from app.storage import ObjectStorage + + +@pytest.fixture +def storage(tmp_path: Path): + storage_root = tmp_path / "data" + storage_root.mkdir(parents=True) + return ObjectStorage(storage_root) + + +@pytest.fixture +def connections(tmp_path: Path): + connections_path = tmp_path / "connections.json" + store = ConnectionStore(connections_path) + conn = RemoteConnection( + id="test-conn", + name="Test Remote", + endpoint_url="http://localhost:9000", + access_key="remote-access", + secret_key="remote-secret", + region="us-east-1", + ) + store.add(conn) + return store + + +@pytest.fixture +def replication_manager(storage, connections, tmp_path): + rules_path = tmp_path / "replication_rules.json" + storage_root = tmp_path / "data" + storage_root.mkdir(exist_ok=True) + manager = ReplicationManager(storage, connections, rules_path, storage_root) + yield manager + manager.shutdown(wait=False) + + +@pytest.fixture +def site_sync_worker(storage, connections, replication_manager, tmp_path): + storage_root = tmp_path / "data" + worker = SiteSyncWorker( + storage=storage, + connections=connections, + replication_manager=replication_manager, + storage_root=storage_root, + interval_seconds=60, + batch_size=100, + ) + yield worker + worker.shutdown() + + +class TestSyncedObjectInfo: + def test_to_dict(self): 
+ info = SyncedObjectInfo( + last_synced_at=1234567890.0, + remote_etag="abc123", + source="remote", + ) + result = info.to_dict() + assert result["last_synced_at"] == 1234567890.0 + assert result["remote_etag"] == "abc123" + assert result["source"] == "remote" + + def test_from_dict(self): + data = { + "last_synced_at": 9876543210.0, + "remote_etag": "def456", + "source": "local", + } + info = SyncedObjectInfo.from_dict(data) + assert info.last_synced_at == 9876543210.0 + assert info.remote_etag == "def456" + assert info.source == "local" + + +class TestSyncState: + def test_to_dict(self): + state = SyncState( + synced_objects={ + "test.txt": SyncedObjectInfo( + last_synced_at=1000.0, + remote_etag="etag1", + source="remote", + ) + }, + last_full_sync=2000.0, + ) + result = state.to_dict() + assert "test.txt" in result["synced_objects"] + assert result["synced_objects"]["test.txt"]["remote_etag"] == "etag1" + assert result["last_full_sync"] == 2000.0 + + def test_from_dict(self): + data = { + "synced_objects": { + "file.txt": { + "last_synced_at": 3000.0, + "remote_etag": "etag2", + "source": "remote", + } + }, + "last_full_sync": 4000.0, + } + state = SyncState.from_dict(data) + assert "file.txt" in state.synced_objects + assert state.synced_objects["file.txt"].remote_etag == "etag2" + assert state.last_full_sync == 4000.0 + + def test_from_dict_empty(self): + state = SyncState.from_dict({}) + assert state.synced_objects == {} + assert state.last_full_sync is None + + +class TestSiteSyncStats: + def test_to_dict(self): + stats = SiteSyncStats( + last_sync_at=1234567890.0, + objects_pulled=10, + objects_skipped=5, + conflicts_resolved=2, + deletions_applied=1, + errors=0, + ) + result = stats.to_dict() + assert result["objects_pulled"] == 10 + assert result["objects_skipped"] == 5 + assert result["conflicts_resolved"] == 2 + assert result["deletions_applied"] == 1 + assert result["errors"] == 0 + + +class TestRemoteObjectMeta: + def test_from_s3_object(self): + 
obj = { + "Key": "test/file.txt", + "Size": 1024, + "LastModified": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + "ETag": '"abc123def456"', + } + meta = RemoteObjectMeta.from_s3_object(obj) + assert meta.key == "test/file.txt" + assert meta.size == 1024 + assert meta.last_modified == datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + assert meta.etag == "abc123def456" + + +class TestReplicationRuleBidirectional: + def test_rule_with_bidirectional_mode(self): + rule = ReplicationRule( + bucket_name="sync-bucket", + target_connection_id="test-conn", + target_bucket="remote-bucket", + enabled=True, + mode=REPLICATION_MODE_BIDIRECTIONAL, + sync_deletions=True, + ) + assert rule.mode == REPLICATION_MODE_BIDIRECTIONAL + assert rule.sync_deletions is True + assert rule.last_pull_at is None + + def test_rule_to_dict_includes_new_fields(self): + rule = ReplicationRule( + bucket_name="sync-bucket", + target_connection_id="test-conn", + target_bucket="remote-bucket", + mode=REPLICATION_MODE_BIDIRECTIONAL, + sync_deletions=False, + last_pull_at=1234567890.0, + ) + result = rule.to_dict() + assert result["mode"] == REPLICATION_MODE_BIDIRECTIONAL + assert result["sync_deletions"] is False + assert result["last_pull_at"] == 1234567890.0 + + def test_rule_from_dict_with_new_fields(self): + data = { + "bucket_name": "sync-bucket", + "target_connection_id": "test-conn", + "target_bucket": "remote-bucket", + "mode": REPLICATION_MODE_BIDIRECTIONAL, + "sync_deletions": False, + "last_pull_at": 1234567890.0, + } + rule = ReplicationRule.from_dict(data) + assert rule.mode == REPLICATION_MODE_BIDIRECTIONAL + assert rule.sync_deletions is False + assert rule.last_pull_at == 1234567890.0 + + def test_rule_from_dict_defaults_new_fields(self): + data = { + "bucket_name": "sync-bucket", + "target_connection_id": "test-conn", + "target_bucket": "remote-bucket", + } + rule = ReplicationRule.from_dict(data) + assert rule.sync_deletions is True + assert rule.last_pull_at is None + + 
+class TestSiteSyncWorker: + def test_start_and_shutdown(self, site_sync_worker): + site_sync_worker.start() + assert site_sync_worker._sync_thread is not None + assert site_sync_worker._sync_thread.is_alive() + site_sync_worker.shutdown() + assert not site_sync_worker._sync_thread.is_alive() + + def test_trigger_sync_no_rule(self, site_sync_worker): + result = site_sync_worker.trigger_sync("nonexistent-bucket") + assert result is None + + def test_trigger_sync_wrong_mode(self, site_sync_worker, replication_manager): + rule = ReplicationRule( + bucket_name="new-only-bucket", + target_connection_id="test-conn", + target_bucket="remote-bucket", + mode=REPLICATION_MODE_NEW_ONLY, + enabled=True, + ) + replication_manager.set_rule(rule) + result = site_sync_worker.trigger_sync("new-only-bucket") + assert result is None + + def test_trigger_sync_disabled_rule(self, site_sync_worker, replication_manager): + rule = ReplicationRule( + bucket_name="disabled-bucket", + target_connection_id="test-conn", + target_bucket="remote-bucket", + mode=REPLICATION_MODE_BIDIRECTIONAL, + enabled=False, + ) + replication_manager.set_rule(rule) + result = site_sync_worker.trigger_sync("disabled-bucket") + assert result is None + + def test_get_stats_no_sync(self, site_sync_worker): + stats = site_sync_worker.get_stats("nonexistent") + assert stats is None + + def test_resolve_conflict_remote_newer(self, site_sync_worker): + local_meta = MagicMock() + local_meta.last_modified = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + local_meta.etag = "local123" + + remote_meta = RemoteObjectMeta( + key="test.txt", + size=100, + last_modified=datetime(2025, 1, 2, 12, 0, 0, tzinfo=timezone.utc), + etag="remote456", + ) + + result = site_sync_worker._resolve_conflict(local_meta, remote_meta) + assert result == "pull" + + def test_resolve_conflict_local_newer(self, site_sync_worker): + local_meta = MagicMock() + local_meta.last_modified = datetime(2025, 1, 2, 12, 0, 0, tzinfo=timezone.utc) + 
local_meta.etag = "local123" + + remote_meta = RemoteObjectMeta( + key="test.txt", + size=100, + last_modified=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + etag="remote456", + ) + + result = site_sync_worker._resolve_conflict(local_meta, remote_meta) + assert result == "keep" + + def test_resolve_conflict_same_time_same_etag(self, site_sync_worker): + ts = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + local_meta = MagicMock() + local_meta.last_modified = ts + local_meta.etag = "same123" + + remote_meta = RemoteObjectMeta( + key="test.txt", + size=100, + last_modified=ts, + etag="same123", + ) + + result = site_sync_worker._resolve_conflict(local_meta, remote_meta) + assert result == "skip" + + def test_resolve_conflict_same_time_different_etag(self, site_sync_worker): + ts = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + local_meta = MagicMock() + local_meta.last_modified = ts + local_meta.etag = "aaa" + + remote_meta = RemoteObjectMeta( + key="test.txt", + size=100, + last_modified=ts, + etag="zzz", + ) + + result = site_sync_worker._resolve_conflict(local_meta, remote_meta) + assert result == "pull" + + def test_sync_state_persistence(self, site_sync_worker, tmp_path): + bucket_name = "test-bucket" + state = SyncState( + synced_objects={ + "file1.txt": SyncedObjectInfo( + last_synced_at=time.time(), + remote_etag="etag1", + source="remote", + ) + }, + last_full_sync=time.time(), + ) + + site_sync_worker._save_sync_state(bucket_name, state) + + loaded = site_sync_worker._load_sync_state(bucket_name) + assert "file1.txt" in loaded.synced_objects + assert loaded.synced_objects["file1.txt"].remote_etag == "etag1" + + def test_load_sync_state_nonexistent(self, site_sync_worker): + state = site_sync_worker._load_sync_state("nonexistent-bucket") + assert state.synced_objects == {} + assert state.last_full_sync is None + + @patch("app.site_sync._create_sync_client") + def test_list_remote_objects(self, mock_create_client, site_sync_worker, 
connections, replication_manager): + mock_client = MagicMock() + mock_paginator = MagicMock() + mock_paginator.paginate.return_value = [ + { + "Contents": [ + { + "Key": "file1.txt", + "Size": 100, + "LastModified": datetime(2025, 1, 1, tzinfo=timezone.utc), + "ETag": '"etag1"', + }, + { + "Key": "file2.txt", + "Size": 200, + "LastModified": datetime(2025, 1, 2, tzinfo=timezone.utc), + "ETag": '"etag2"', + }, + ] + } + ] + mock_client.get_paginator.return_value = mock_paginator + mock_create_client.return_value = mock_client + + rule = ReplicationRule( + bucket_name="local-bucket", + target_connection_id="test-conn", + target_bucket="remote-bucket", + mode=REPLICATION_MODE_BIDIRECTIONAL, + ) + conn = connections.get("test-conn") + + result = site_sync_worker._list_remote_objects(rule, conn) + + assert "file1.txt" in result + assert "file2.txt" in result + assert result["file1.txt"].size == 100 + assert result["file2.txt"].size == 200 + + def test_list_local_objects(self, site_sync_worker, storage): + storage.create_bucket("test-bucket") + storage.put_object("test-bucket", "file1.txt", io.BytesIO(b"content1")) + storage.put_object("test-bucket", "file2.txt", io.BytesIO(b"content2")) + + result = site_sync_worker._list_local_objects("test-bucket") + + assert "file1.txt" in result + assert "file2.txt" in result + + @patch("app.site_sync._create_sync_client") + def test_sync_bucket_connection_not_found(self, mock_create_client, site_sync_worker, replication_manager): + rule = ReplicationRule( + bucket_name="test-bucket", + target_connection_id="missing-conn", + target_bucket="remote-bucket", + mode=REPLICATION_MODE_BIDIRECTIONAL, + enabled=True, + ) + replication_manager.set_rule(rule) + + stats = site_sync_worker._sync_bucket(rule) + assert stats.errors == 1 + + +class TestSiteSyncIntegration: + @patch("app.site_sync._create_sync_client") + def test_full_sync_cycle(self, mock_create_client, site_sync_worker, storage, connections, replication_manager): + 
storage.create_bucket("sync-bucket") + storage.put_object("sync-bucket", "local-only.txt", io.BytesIO(b"local content")) + + mock_client = MagicMock() + mock_paginator = MagicMock() + mock_paginator.paginate.return_value = [ + { + "Contents": [ + { + "Key": "remote-only.txt", + "Size": 100, + "LastModified": datetime(2025, 1, 15, tzinfo=timezone.utc), + "ETag": '"remoteetag"', + }, + ] + } + ] + mock_client.get_paginator.return_value = mock_paginator + mock_client.head_object.return_value = {"Metadata": {}} + + def mock_download(bucket, key, path): + Path(path).write_bytes(b"remote content") + + mock_client.download_file.side_effect = mock_download + mock_create_client.return_value = mock_client + + rule = ReplicationRule( + bucket_name="sync-bucket", + target_connection_id="test-conn", + target_bucket="remote-bucket", + mode=REPLICATION_MODE_BIDIRECTIONAL, + enabled=True, + ) + replication_manager.set_rule(rule) + + stats = site_sync_worker._sync_bucket(rule) + + assert stats.objects_pulled == 1 + assert stats.errors == 0 + + objects = site_sync_worker._list_local_objects("sync-bucket") + assert "local-only.txt" in objects + assert "remote-only.txt" in objects From 87c7f1bc7d1b746111218f5ab190f7868698b6ef Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 25 Jan 2026 12:35:14 +0800 Subject: [PATCH 6/6] Add bidirectional mode option to replication panel UI --- app/ui.py | 2 + docs.md | 119 ++++++++++++++++++++++- templates/bucket_detail.html | 25 +++-- templates/docs.html | 178 ++++++++++++++++++++++++++++++++--- 4 files changed, 299 insertions(+), 25 deletions(-) diff --git a/app/ui.py b/app/ui.py index 1cc2b00..738521c 100644 --- a/app/ui.py +++ b/app/ui.py @@ -470,6 +470,7 @@ def bucket_detail(bucket_name: str): kms_enabled = current_app.config.get("KMS_ENABLED", False) encryption_enabled = current_app.config.get("ENCRYPTION_ENABLED", False) lifecycle_enabled = current_app.config.get("LIFECYCLE_ENABLED", False) + site_sync_enabled = 
current_app.config.get("SITE_SYNC_ENABLED", False) can_manage_encryption = can_manage_versioning bucket_quota = storage.get_bucket_quota(bucket_name) @@ -522,6 +523,7 @@ def bucket_detail(bucket_name: str): bucket_quota=bucket_quota, bucket_stats=bucket_stats, can_manage_quota=can_manage_quota, + site_sync_enabled=site_sync_enabled, ) diff --git a/docs.md b/docs.md index 54f956b..68cf69a 100644 --- a/docs.md +++ b/docs.md @@ -1248,12 +1248,22 @@ Replication uses a two-tier permission system: This separation allows administrators to pre-configure where data should replicate, while allowing authorized users to toggle replication on/off without accessing connection credentials. +### Replication Modes + +| Mode | Behavior | +|------|----------| +| `new_only` | Only replicate new/modified objects (default) | +| `all` | Sync all existing objects when rule is enabled | +| `bidirectional` | Two-way sync with Last-Write-Wins conflict resolution | + ### Architecture - **Source Instance**: The MyFSIO instance where you upload files. It runs the replication worker. - **Target Instance**: Another MyFSIO instance (or any S3-compatible service like AWS S3, MinIO) that receives the copies. -Replication is **asynchronous** (happens in the background) and **one-way** (Source -> Target). +For `new_only` and `all` modes, replication is **asynchronous** (happens in the background) and **one-way** (Source -> Target). + +For `bidirectional` mode, replication is **two-way** with automatic conflict resolution. ### Setup Guide @@ -1355,16 +1365,117 @@ When paused, new objects uploaded to the source will not replicate until replica > **Note:** Only admins can create new replication rules, change the target connection/bucket, or delete rules entirely. 
-### Bidirectional Replication (Active-Active) +### Bidirectional Site Replication -To set up two-way replication (Server A ↔ Server B): +For true two-way synchronization with automatic conflict resolution, use the `bidirectional` replication mode. This enables a background sync worker that periodically pulls changes from the remote site. + +> **Important:** Both sites must be configured to sync with each other. Each site pushes its changes and pulls from the other. You must set up connections and replication rules on both ends. + +#### Step 1: Enable Site Sync on Both Sites + +Set these environment variables on **both** Site A and Site B: + +```bash +SITE_SYNC_ENABLED=true +SITE_SYNC_INTERVAL_SECONDS=60 # How often to pull changes (default: 60) +SITE_SYNC_BATCH_SIZE=100 # Max objects per sync cycle (default: 100) +``` + +#### Step 2: Create IAM Users for Cross-Site Access + +On each site, create an IAM user that the other site will use to connect: + +| Site | Create User For | Required Permissions | +|------|-----------------|---------------------| +| Site A | Site B to connect | `read`, `write`, `list`, `delete` on target bucket | +| Site B | Site A to connect | `read`, `write`, `list`, `delete` on target bucket | + +Example policy for the replication user: +```json +[{"bucket": "my-bucket", "actions": ["read", "write", "list", "delete"]}] +``` + +#### Step 3: Create Connections + +On each site, add a connection pointing to the other: + +**On Site A:** +- Go to **Connections** and add a connection to Site B +- Endpoint: `https://site-b.example.com` +- Credentials: Site B's IAM user (created in Step 2) + +**On Site B:** +- Go to **Connections** and add a connection to Site A +- Endpoint: `https://site-a.example.com` +- Credentials: Site A's IAM user (created in Step 2) + +#### Step 4: Enable Bidirectional Replication + +On each site, go to the bucket's **Replication** tab and enable with mode `bidirectional`: + +**On Site A:** +- Source bucket: `my-bucket` +- 
Target connection: Site B connection +- Target bucket: `my-bucket` +- Mode: **Bidirectional sync** + +**On Site B:** +- Source bucket: `my-bucket` +- Target connection: Site A connection +- Target bucket: `my-bucket` +- Mode: **Bidirectional sync** + +#### How It Works + +- **PUSH**: Local changes replicate to remote immediately on write/delete +- **PULL**: Background worker fetches remote changes every `SITE_SYNC_INTERVAL_SECONDS` +- **Loop Prevention**: `S3ReplicationAgent` and `SiteSyncAgent` User-Agents prevent infinite sync loops + +#### Conflict Resolution (Last-Write-Wins) + +When the same object exists on both sites, the system uses Last-Write-Wins (LWW) based on `last_modified` timestamps: + +- **Remote newer**: Pull the remote version +- **Local newer**: Keep the local version +- **Same timestamp**: Use ETag as tiebreaker (higher ETag wins) + +A 1-second clock skew tolerance prevents false conflicts from minor time differences. + +#### Deletion Synchronization + +When `sync_deletions=true` (default), remote deletions propagate locally only if: +1. The object was previously synced FROM remote (tracked in sync state) +2. The local version hasn't been modified since last sync + +This prevents accidental deletion of local-only objects. + +#### Sync State Storage + +Sync state is stored at: `data/.myfsio.sys/buckets/<bucket_name>/site_sync_state.json` + +```json +{ + "synced_objects": { + "path/to/file.txt": { + "last_synced_at": 1706100000.0, + "remote_etag": "abc123", + "source": "remote" + } + }, + "last_full_sync": 1706100000.0 +} +``` + +### Legacy Bidirectional Setup (Manual) + +For simpler use cases without the site sync worker, you can manually configure two one-way rules: 1. Follow the steps above to replicate **A → B**. 2. Repeat the process on Server B to replicate **B → A**: - Create a connection on Server B pointing to Server A. - Enable replication on the target bucket on Server B.
-**Loop Prevention**: The system automatically detects replication traffic using a custom User-Agent (`S3ReplicationAgent`). This prevents infinite loops where an object replicated from A to B is immediately replicated back to A. +**Loop Prevention**: The system automatically detects replication traffic using custom User-Agents (`S3ReplicationAgent` and `SiteSyncAgent`). This prevents infinite loops where an object replicated from A to B is immediately replicated back to A. **Deletes**: Deleting an object on one server will propagate the deletion to the other server. diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html index 8564438..ea0f9ae 100644 --- a/templates/bucket_detail.html +++ b/templates/bucket_detail.html @@ -1065,8 +1065,10 @@
    - Replication Active — - {% if replication_rule.mode == 'all' %} + Replication Active — + {% if replication_rule.mode == 'bidirectional' %} + Bi-directional sync enabled with LWW conflict resolution. + {% elif replication_rule.mode == 'all' %} All objects (existing + new) are being replicated. {% else %} New uploads to this bucket are automatically replicated. @@ -1159,7 +1161,7 @@
    Mode
    - {% if replication_rule.mode == 'all' %}All Objects{% else %}New Only{% endif %} + {% if replication_rule.mode == 'bidirectional' %}Bidirectional{% elif replication_rule.mode == 'all' %}All Objects{% else %}New Only{% endif %}
    @@ -1310,7 +1312,9 @@
    Replication Paused

    Replication is configured but currently paused. New uploads will not be replicated until resumed.

    - {% if replication_rule.mode == 'all' %} + {% if replication_rule.mode == 'bidirectional' %} +

    Tip: When you resume, bi-directional sync will continue and any missed changes will be reconciled using LWW conflict resolution.

    + {% elif replication_rule.mode == 'all' %}

    Tip: When you resume, any objects uploaded while paused will be automatically synced to the target.

    {% else %}

    Note: Objects uploaded while paused will not be synced (mode: new_only). Consider switching to "All Objects" mode if you need to sync missed uploads.

    @@ -1435,17 +1439,26 @@
    Only replicate objects uploaded after enabling replication. Existing objects will not be copied.
    -
    +
    + {% if site_sync_enabled %} +
    + + +
    + {% endif %}
    - +