From 93a5aa661882858665b41bc9020f1a904d81b59f Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 5 Jan 2026 00:18:08 +0800 Subject: [PATCH] Add replication failure tracking and lifecycle execution history --- app/__init__.py | 3 +- app/lifecycle.py | 104 +++++++++++- app/replication.py | 202 ++++++++++++++++++++++- app/ui.py | 109 +++++++++++++ templates/bucket_detail.html | 307 ++++++++++++++++++++++++++++++++++- tests/test_replication.py | 4 +- 6 files changed, 720 insertions(+), 9 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 19a890c..2029d46 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -124,7 +124,7 @@ def create_app( ) connections = ConnectionStore(connections_path) - replication = ReplicationManager(storage, connections, replication_rules_path) + replication = ReplicationManager(storage, connections, replication_rules_path, storage_root) encryption_config = { "encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False), @@ -156,6 +156,7 @@ def create_app( lifecycle_manager = LifecycleManager( base_storage, interval_seconds=app.config.get("LIFECYCLE_INTERVAL_SECONDS", 3600), + storage_root=storage_root, ) lifecycle_manager.start() diff --git a/app/lifecycle.py b/app/lifecycle.py index 8ad0636..ed9eb2c 100644 --- a/app/lifecycle.py +++ b/app/lifecycle.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import logging import threading import time @@ -23,13 +24,104 @@ class LifecycleResult: execution_time_seconds: float = 0.0 +@dataclass +class LifecycleExecutionRecord: + timestamp: float + bucket_name: str + objects_deleted: int + versions_deleted: int + uploads_aborted: int + errors: List[str] + execution_time_seconds: float + + def to_dict(self) -> dict: + return { + "timestamp": self.timestamp, + "bucket_name": self.bucket_name, + "objects_deleted": self.objects_deleted, + "versions_deleted": self.versions_deleted, + "uploads_aborted": self.uploads_aborted, + "errors": self.errors, + "execution_time_seconds": self.execution_time_seconds, + } + + @classmethod + def from_dict(cls, data: dict) -> "LifecycleExecutionRecord": + return cls( + timestamp=data["timestamp"], + bucket_name=data["bucket_name"], + objects_deleted=data["objects_deleted"], + versions_deleted=data["versions_deleted"], + uploads_aborted=data["uploads_aborted"], + errors=data.get("errors", []), + execution_time_seconds=data["execution_time_seconds"], + ) + + @classmethod + def from_result(cls, result: LifecycleResult) -> "LifecycleExecutionRecord": + return cls( + timestamp=time.time(), + bucket_name=result.bucket_name, + objects_deleted=result.objects_deleted, + versions_deleted=result.versions_deleted, + uploads_aborted=result.uploads_aborted, + errors=result.errors.copy(), + execution_time_seconds=result.execution_time_seconds, + ) + + +class LifecycleHistoryStore: + MAX_HISTORY_PER_BUCKET = 50 + + def __init__(self, storage_root: Path) -> None: + self.storage_root = storage_root + self._lock = threading.Lock() + + def _get_history_path(self, bucket_name: str) -> Path: + return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "lifecycle_history.json" + + def load_history(self, bucket_name: str) -> List[LifecycleExecutionRecord]: + path = self._get_history_path(bucket_name) + if not path.exists(): + return [] + try: + with open(path, "r") as f: + data = json.load(f) + return [LifecycleExecutionRecord.from_dict(d) for d in data.get("executions", [])] + except (OSError, ValueError, KeyError) as e: + logger.error(f"Failed to load lifecycle history for 
{bucket_name}: {e}") + return [] + + def save_history(self, bucket_name: str, records: List[LifecycleExecutionRecord]) -> None: + path = self._get_history_path(bucket_name) + path.parent.mkdir(parents=True, exist_ok=True) + data = {"executions": [r.to_dict() for r in records[:self.MAX_HISTORY_PER_BUCKET]]} + try: + with open(path, "w") as f: + json.dump(data, f, indent=2) + except OSError as e: + logger.error(f"Failed to save lifecycle history for {bucket_name}: {e}") + + def add_record(self, bucket_name: str, record: LifecycleExecutionRecord) -> None: + with self._lock: + records = self.load_history(bucket_name) + records.insert(0, record) + self.save_history(bucket_name, records) + + def get_history(self, bucket_name: str, limit: int = 50, offset: int = 0) -> List[LifecycleExecutionRecord]: + records = self.load_history(bucket_name) + return records[offset:offset + limit] + + class LifecycleManager: - def __init__(self, storage: ObjectStorage, interval_seconds: int = 3600): + def __init__(self, storage: ObjectStorage, interval_seconds: int = 3600, storage_root: Optional[Path] = None): self.storage = storage self.interval_seconds = interval_seconds + self.storage_root = storage_root self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() + self.history_store = LifecycleHistoryStore(storage_root) if storage_root else None def start(self) -> None: if self._timer is not None: @@ -98,12 +190,15 @@ class LifecycleManager: logger.error(f"Lifecycle enforcement error for {bucket_name}: {e}") result.execution_time_seconds = time.time() - start_time - if result.objects_deleted > 0 or result.versions_deleted > 0 or result.uploads_aborted > 0: + if result.objects_deleted > 0 or result.versions_deleted > 0 or result.uploads_aborted > 0 or result.errors: logger.info( f"Lifecycle enforcement for {bucket_name}: " f"deleted={result.objects_deleted}, versions={result.versions_deleted}, " f"aborted={result.uploads_aborted}, time={result.execution_time_seconds:.2f}s" ) + if self.history_store: + record = LifecycleExecutionRecord.from_result(result) + self.history_store.add_record(bucket_name, record) return result def _enforce_expiration( @@ -233,3 +328,8 @@ class LifecycleManager: if bucket_name: return {bucket_name: self.enforce_rules(bucket_name)} return self.enforce_all_buckets() + + def get_execution_history(self, bucket_name: str, limit: int = 50, offset: int = 0) -> List[LifecycleExecutionRecord]: + if not self.history_store: + return [] + return self.history_store.get_history(bucket_name, limit, offset) diff --git a/app/replication.py b/app/replication.py index 6620fff..4eacdef 100644 --- a/app/replication.py +++ b/app/replication.py @@ -8,7 +8,7 @@ import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional import boto3 from botocore.config import Config @@ -87,6 +87,40 @@ class ReplicationStats: ) +@dataclass +class ReplicationFailure: + object_key: str + error_message: str + timestamp: float + failure_count: int + bucket_name: str + action: str + last_error_code: Optional[str] = None + + def to_dict(self) -> dict: + return { + "object_key": self.object_key, + "error_message": self.error_message, + "timestamp": self.timestamp, + "failure_count": self.failure_count, + "bucket_name": self.bucket_name, + "action": self.action, + "last_error_code": self.last_error_code, + } + + @classmethod + def 
from_dict(cls, data: dict) -> "ReplicationFailure": + return cls( + object_key=data["object_key"], + error_message=data["error_message"], + timestamp=data["timestamp"], + failure_count=data["failure_count"], + bucket_name=data["bucket_name"], + action=data["action"], + last_error_code=data.get("last_error_code"), + ) + + @dataclass class ReplicationRule: bucket_name: str @@ -120,15 +154,86 @@ class ReplicationRule: return rule +class ReplicationFailureStore: + MAX_FAILURES_PER_BUCKET = 50 + + def __init__(self, storage_root: Path) -> None: + self.storage_root = storage_root + self._lock = threading.Lock() + + def _get_failures_path(self, bucket_name: str) -> Path: + return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "replication_failures.json" + + def load_failures(self, bucket_name: str) -> List[ReplicationFailure]: + path = self._get_failures_path(bucket_name) + if not path.exists(): + return [] + try: + with open(path, "r") as f: + data = json.load(f) + return [ReplicationFailure.from_dict(d) for d in data.get("failures", [])] + except (OSError, ValueError, KeyError) as e: + logger.error(f"Failed to load replication failures for {bucket_name}: {e}") + return [] + + def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None: + path = self._get_failures_path(bucket_name) + path.parent.mkdir(parents=True, exist_ok=True) + data = {"failures": [f.to_dict() for f in failures[:self.MAX_FAILURES_PER_BUCKET]]} + try: + with open(path, "w") as f: + json.dump(data, f, indent=2) + except OSError as e: + logger.error(f"Failed to save replication failures for {bucket_name}: {e}") + + def add_failure(self, bucket_name: str, failure: ReplicationFailure) -> None: + with self._lock: + failures = self.load_failures(bucket_name) + existing = next((f for f in failures if f.object_key == failure.object_key), None) + if existing: + existing.failure_count += 1 + existing.timestamp = failure.timestamp + existing.error_message = failure.error_message + existing.last_error_code = failure.last_error_code + else: + failures.insert(0, failure) + self.save_failures(bucket_name, failures) + + def remove_failure(self, bucket_name: str, object_key: str) -> bool: + with self._lock: + failures = self.load_failures(bucket_name) + original_len = len(failures) + failures = [f for f in failures if f.object_key != object_key] + if len(failures) < original_len: + self.save_failures(bucket_name, failures) + return True + return False + + def clear_failures(self, bucket_name: str) -> None: + with self._lock: + path = self._get_failures_path(bucket_name) + if path.exists(): + path.unlink() + + def get_failure(self, bucket_name: str, object_key: str) -> Optional[ReplicationFailure]: + failures = self.load_failures(bucket_name) + return next((f for f in failures if f.object_key == object_key), None) + + def get_failure_count(self, bucket_name: str) -> int: + return len(self.load_failures(bucket_name)) + + class ReplicationManager: - def __init__(self, storage: ObjectStorage, connections: ConnectionStore, rules_path: Path) -> None: + def __init__(self, storage: ObjectStorage, connections: ConnectionStore, rules_path: Path, storage_root: Path) -> None: self.storage = storage self.connections = connections self.rules_path = rules_path + self.storage_root = storage_root self._rules: Dict[str, ReplicationRule] = {} self._stats_lock = threading.Lock() self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker") self._shutdown = False + self.failure_store = 
ReplicationFailureStore(storage_root) self.reload_rules() def shutdown(self, wait: bool = True) -> None: @@ -331,8 +436,19 @@ class ReplicationManager: s3.delete_object(Bucket=rule.target_bucket, Key=object_key) logger.info(f"Replicated DELETE {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})") self._update_last_sync(bucket_name, object_key) + self.failure_store.remove_failure(bucket_name, object_key) except ClientError as e: + error_code = e.response.get('Error', {}).get('Code') logger.error(f"Replication DELETE failed for {bucket_name}/{object_key}: {e}") + self.failure_store.add_failure(bucket_name, ReplicationFailure( + object_key=object_key, + error_message=str(e), + timestamp=time.time(), + failure_count=1, + bucket_name=bucket_name, + action="delete", + last_error_code=error_code, + )) return try: @@ -405,9 +521,89 @@ class ReplicationManager: logger.info(f"Replicated {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})") self._update_last_sync(bucket_name, object_key) + self.failure_store.remove_failure(bucket_name, object_key) except (ClientError, OSError, ValueError) as e: + error_code = None + if isinstance(e, ClientError): + error_code = e.response.get('Error', {}).get('Code') logger.error(f"Replication failed for {bucket_name}/{object_key}: {e}") - except Exception: + self.failure_store.add_failure(bucket_name, ReplicationFailure( + object_key=object_key, + error_message=str(e), + timestamp=time.time(), + failure_count=1, + bucket_name=bucket_name, + action=action, + last_error_code=error_code, + )) + except Exception as e: logger.exception(f"Unexpected error during replication for {bucket_name}/{object_key}") + self.failure_store.add_failure(bucket_name, ReplicationFailure( + object_key=object_key, + error_message=str(e), + timestamp=time.time(), + failure_count=1, + bucket_name=bucket_name, + action=action, + last_error_code=None, + )) + def get_failed_items(self, bucket_name: str, limit: int = 50, offset: int = 0) -> List[ReplicationFailure]: + failures = self.failure_store.load_failures(bucket_name) + return failures[offset:offset + limit] + + def get_failure_count(self, bucket_name: str) -> int: + return self.failure_store.get_failure_count(bucket_name) + + def retry_failed_item(self, bucket_name: str, object_key: str) -> bool: + failure = self.failure_store.get_failure(bucket_name, object_key) + if not failure: + return False + + rule = self.get_rule(bucket_name) + if not rule or not rule.enabled: + return False + + connection = self.connections.get(rule.target_connection_id) + if not connection: + logger.warning(f"Cannot retry: Connection {rule.target_connection_id} not found") + return False + + if not self.check_endpoint_health(connection): + logger.warning(f"Cannot retry: Endpoint {connection.name} is not reachable") + return False + + self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, failure.action) + return True + + def retry_all_failed(self, bucket_name: str) -> Dict[str, int]: + failures = self.failure_store.load_failures(bucket_name) + if not failures: + return {"submitted": 0, "skipped": 0} + + rule = self.get_rule(bucket_name) + if not rule or not rule.enabled: + return {"submitted": 0, "skipped": len(failures)} + + connection = self.connections.get(rule.target_connection_id) + if not connection: + logger.warning(f"Cannot retry: Connection {rule.target_connection_id} not found") + return {"submitted": 0, "skipped": len(failures)} + + if not self.check_endpoint_health(connection): + 
logger.warning(f"Cannot retry: Endpoint {connection.name} is not reachable") + return {"submitted": 0, "skipped": len(failures)} + + submitted = 0 + for failure in failures: + self._executor.submit(self._replicate_task, bucket_name, failure.object_key, rule, connection, failure.action) + submitted += 1 + + return {"submitted": submitted, "skipped": 0} + + def dismiss_failure(self, bucket_name: str, object_key: str) -> bool: + return self.failure_store.remove_failure(bucket_name, object_key) + + def clear_failures(self, bucket_name: str) -> None: + self.failure_store.clear_failures(bucket_name) diff --git a/app/ui.py b/app/ui.py index 05111b5..0dd8b90 100644 --- a/app/ui.py +++ b/app/ui.py @@ -1590,6 +1590,84 @@ def get_replication_status(bucket_name: str): }) +@ui_bp.get("/buckets//replication/failures") +def get_replication_failures(bucket_name: str): + principal = _current_principal() + try: + _authorize_ui(principal, bucket_name, "replication") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + limit = request.args.get("limit", 50, type=int) + offset = request.args.get("offset", 0, type=int) + + failures = _replication().get_failed_items(bucket_name, limit, offset) + total = _replication().get_failure_count(bucket_name) + + return jsonify({ + "failures": [f.to_dict() for f in failures], + "total": total, + "limit": limit, + "offset": offset, + }) + + +@ui_bp.post("/buckets//replication/failures//retry") +def retry_replication_failure(bucket_name: str, object_key: str): + principal = _current_principal() + try: + _authorize_ui(principal, bucket_name, "replication") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + success = _replication().retry_failed_item(bucket_name, object_key) + if success: + return jsonify({"status": "submitted", "object_key": object_key}) + return jsonify({"error": "Failed to submit retry"}), 400 + + +@ui_bp.post("/buckets//replication/failures/retry-all") +def retry_all_replication_failures(bucket_name: str): + principal = _current_principal() + try: + _authorize_ui(principal, bucket_name, "replication") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + result = _replication().retry_all_failed(bucket_name) + return jsonify({ + "status": "submitted", + "submitted": result["submitted"], + "skipped": result["skipped"], + }) + + +@ui_bp.delete("/buckets//replication/failures/") +def dismiss_replication_failure(bucket_name: str, object_key: str): + principal = _current_principal() + try: + _authorize_ui(principal, bucket_name, "replication") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + success = _replication().dismiss_failure(bucket_name, object_key) + if success: + return jsonify({"status": "dismissed", "object_key": object_key}) + return jsonify({"error": "Failure not found"}), 404 + + +@ui_bp.delete("/buckets//replication/failures") +def clear_replication_failures(bucket_name: str): + principal = _current_principal() + try: + _authorize_ui(principal, bucket_name, "replication") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + _replication().clear_failures(bucket_name) + return jsonify({"status": "cleared"}) + + @ui_bp.get("/connections//health") def check_connection_health(connection_id: str): """Check if a connection endpoint is reachable.""" @@ -1742,6 +1820,37 @@ def bucket_lifecycle(bucket_name: str): return jsonify({"status": "ok", "message": "Lifecycle configuration saved", "rules": validated_rules}) +@ui_bp.get("/buckets//lifecycle/history") 
+def get_lifecycle_history(bucket_name: str): + principal = _current_principal() + try: + _authorize_ui(principal, bucket_name, "policy") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + limit = request.args.get("limit", 50, type=int) + offset = request.args.get("offset", 0, type=int) + + lifecycle_manager = current_app.extensions.get("lifecycle") + if not lifecycle_manager: + return jsonify({ + "executions": [], + "total": 0, + "limit": limit, + "offset": offset, + "enabled": False, + }) + + records = lifecycle_manager.get_execution_history(bucket_name, limit, offset) + return jsonify({ + "executions": [r.to_dict() for r in records], + "total": len(lifecycle_manager.get_execution_history(bucket_name, 1000, 0)), + "limit": limit, + "offset": offset, + "enabled": True, + }) + + @ui_bp.route("/buckets//cors", methods=["GET", "POST", "DELETE"]) def bucket_cors(bucket_name: str): principal = _current_principal() diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html index 5d76cb1..8e7c25b 100644 --- a/templates/bucket_detail.html +++ b/templates/bucket_detail.html @@ -1184,11 +1184,65 @@ - Last sync: + Last sync: - + + +
Replication Target
@@ -1526,6 +1580,46 @@
+ +
+
+ + + + + + Execution History +
+
+

Lifecycle rules are evaluated automatically (default: every hour). Recent executions are shown below.

+
+ + + + + + + + + + + + + + + +
+              <th>Executed</th>
+              <th>Deleted</th>
+              <th>Versions</th>
+              <th>Aborted</th>
+              <th>Status</th>
+
+ Loading... +
+
+ +
+
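Note: the Execution History card added in the hunk above is rendered from data persisted by the new LifecycleHistoryStore in app/lifecycle.py. A minimal, hedged round-trip sketch of that store is shown below; it is not part of this patch, and the test name, the pytest tmp_path fixture, and the "demo-bucket" values are illustrative assumptions.

# Hypothetical sketch (not part of this patch): round-trips one record through
# LifecycleHistoryStore, assuming the module layout used in app/lifecycle.py above.
import time
from pathlib import Path

from app.lifecycle import LifecycleExecutionRecord, LifecycleHistoryStore


def test_lifecycle_history_roundtrip(tmp_path: Path) -> None:
    store = LifecycleHistoryStore(tmp_path)
    record = LifecycleExecutionRecord(
        timestamp=time.time(),
        bucket_name="demo-bucket",
        objects_deleted=3,
        versions_deleted=0,
        uploads_aborted=1,
        errors=[],
        execution_time_seconds=0.05,
    )
    store.add_record("demo-bucket", record)

    # Newest record first; persisted under <root>/.myfsio.sys/buckets/<bucket>/lifecycle_history.json
    history = store.get_history("demo-bucket")
    assert history[0].objects_deleted == 3
    assert (tmp_path / ".myfsio.sys" / "buckets" / "demo-bucket" / "lifecycle_history.json").exists()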
@@ -5018,7 +5112,139 @@ if (orphanedEl) orphanedEl.innerHTML = ''; if (bytesEl) bytesEl.innerHTML = ''; loadReplicationStats(); + loadReplicationFailures(); }); + + const failuresCard = document.getElementById('replication-failures-card'); + const failuresBody = document.getElementById('replication-failures-body'); + const failureCountBadge = document.getElementById('replication-failure-count'); + const retryAllBtn = document.getElementById('retry-all-failures-btn'); + const clearFailuresBtn = document.getElementById('clear-failures-btn'); + const showMoreFailuresBtn = document.getElementById('show-more-failures'); + const failuresPagination = document.getElementById('replication-failures-pagination'); + const failuresShownCount = document.getElementById('failures-shown-count'); + + let failuresExpanded = false; + let currentFailures = []; + + const loadReplicationFailures = async () => { + if (!failuresCard) return; + + const endpoint = failuresCard.dataset.failuresEndpoint; + const limit = failuresExpanded ? 50 : 5; + + try { + const resp = await fetch(`${endpoint}?limit=${limit}`); + if (!resp.ok) throw new Error('Failed to fetch failures'); + const data = await resp.json(); + + currentFailures = data.failures; + const total = data.total; + + if (total > 0) { + failuresCard.style.display = ''; + failureCountBadge.textContent = total; + renderFailures(currentFailures); + + if (total > 5 && !failuresExpanded) { + failuresPagination.style.display = ''; + failuresShownCount.textContent = `Showing ${Math.min(5, total)} of ${total}`; + } else { + failuresPagination.style.display = 'none'; + } + } else { + failuresCard.style.display = 'none'; + } + } catch (err) { + console.error('Failed to load replication failures:', err); + } + }; + + const renderFailures = (failures) => { + if (!failuresBody) return; + failuresBody.innerHTML = failures.map(f => ` + + + ${escapeHtml(f.object_key)} + + + ${escapeHtml(f.error_message.length > 60 ? f.error_message.substring(0, 60) + '...' 
: f.error_message)} + + ${new Date(f.timestamp * 1000).toLocaleString()} + ${f.failure_count} + + + + + + `).join(''); + }; + + window.retryFailure = async (objectKey) => { + const endpoint = failuresCard.dataset.retryEndpoint.replace('__KEY__', encodeURIComponent(objectKey)); + try { + const resp = await fetch(endpoint, { method: 'POST' }); + if (resp.ok) { + loadReplicationFailures(); + } + } catch (err) { + console.error('Failed to retry:', err); + } + }; + + window.dismissFailure = async (objectKey) => { + const endpoint = failuresCard.dataset.dismissEndpoint.replace('__KEY__', encodeURIComponent(objectKey)); + try { + const resp = await fetch(endpoint, { method: 'DELETE' }); + if (resp.ok) { + loadReplicationFailures(); + } + } catch (err) { + console.error('Failed to dismiss:', err); + } + }; + + retryAllBtn?.addEventListener('click', async () => { + const endpoint = failuresCard.dataset.retryAllEndpoint; + try { + const resp = await fetch(endpoint, { method: 'POST' }); + if (resp.ok) { + loadReplicationFailures(); + } + } catch (err) { + console.error('Failed to retry all:', err); + } + }); + + clearFailuresBtn?.addEventListener('click', async () => { + if (!confirm('Clear all failure records?')) return; + const endpoint = failuresCard.dataset.clearEndpoint; + try { + const resp = await fetch(endpoint, { method: 'DELETE' }); + if (resp.ok) { + loadReplicationFailures(); + } + } catch (err) { + console.error('Failed to clear failures:', err); + } + }); + + showMoreFailuresBtn?.addEventListener('click', () => { + failuresExpanded = !failuresExpanded; + showMoreFailuresBtn.textContent = failuresExpanded ? 'Show less' : 'Show more...'; + loadReplicationFailures(); + }); + + loadReplicationFailures(); } const algoAes256Radio = document.getElementById('algo_aes256'); @@ -5657,6 +5883,83 @@ }; if (lifecycleCard) loadLifecycleRules(); + + const lifecycleHistoryCard = document.getElementById('lifecycle-history-card'); + const lifecycleHistoryBody = document.getElementById('lifecycle-history-body'); + const lifecycleHistoryPagination = document.getElementById('lifecycle-history-pagination'); + const showMoreHistoryBtn = document.getElementById('show-more-history'); + const historyShownCount = document.getElementById('history-shown-count'); + let historyExpanded = false; + + const loadLifecycleHistory = async () => { + if (!lifecycleHistoryCard || !lifecycleHistoryBody) return; + + const endpoint = lifecycleHistoryCard.dataset.historyEndpoint; + const limit = historyExpanded ? 50 : 5; + + lifecycleHistoryBody.innerHTML = '
Loading...'; + + try { + const resp = await fetch(`${endpoint}?limit=${limit}`); + if (!resp.ok) throw new Error('Failed to fetch history'); + const data = await resp.json(); + + if (!data.enabled) { + lifecycleHistoryBody.innerHTML = 'Lifecycle enforcement is not enabled'; + return; + } + + const executions = data.executions || []; + const total = data.total || 0; + + if (executions.length === 0) { + lifecycleHistoryBody.innerHTML = 'No executions recorded yet'; + lifecycleHistoryPagination.style.display = 'none'; + return; + } + + lifecycleHistoryBody.innerHTML = executions.map(e => { + const date = new Date(e.timestamp * 1000); + const hasErrors = e.errors && e.errors.length > 0; + const hasActivity = e.objects_deleted > 0 || e.versions_deleted > 0 || e.uploads_aborted > 0; + let statusBadge; + if (hasErrors) { + statusBadge = 'Errors'; + } else if (hasActivity) { + statusBadge = 'Success'; + } else { + statusBadge = 'No action'; + } + const errorTooltip = hasErrors ? ` title="${escapeHtml(e.errors.join('; '))}"` : ''; + return ` + ${date.toLocaleString()} + ${e.objects_deleted} + ${e.versions_deleted} + ${e.uploads_aborted} + ${statusBadge} + `; + }).join(''); + + if (total > 5 && !historyExpanded) { + lifecycleHistoryPagination.style.display = ''; + historyShownCount.textContent = `Showing ${Math.min(5, total)} of ${total}`; + } else { + lifecycleHistoryPagination.style.display = 'none'; + } + } catch (err) { + console.error('Failed to load lifecycle history:', err); + lifecycleHistoryBody.innerHTML = 'Failed to load history'; + } + }; + + showMoreHistoryBtn?.addEventListener('click', () => { + historyExpanded = !historyExpanded; + showMoreHistoryBtn.textContent = historyExpanded ? 'Show less' : 'Show more...'; + loadLifecycleHistory(); + }); + + if (lifecycleHistoryCard) loadLifecycleHistory(); + if (corsCard) loadCorsRules(); if (aclCard) loadAcl(); diff --git a/tests/test_replication.py b/tests/test_replication.py index 3cb0c06..d7b0f7c 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -43,7 +43,9 @@ def connections(tmp_path: Path): @pytest.fixture def replication_manager(storage, connections, tmp_path): rules_path = tmp_path / "replication_rules.json" - manager = ReplicationManager(storage, connections, rules_path) + storage_root = tmp_path / "data" + storage_root.mkdir(exist_ok=True) + manager = ReplicationManager(storage, connections, rules_path, storage_root) yield manager manager.shutdown(wait=False)
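The updated fixture above only wires the new storage_root argument into ReplicationManager. As a hedged companion sketch (not part of this patch), a direct round trip through ReplicationFailureStore could look like the following; the test name, tmp_path usage, and the sample key/bucket values are illustrative assumptions in the style of tests/test_replication.py.

# Hypothetical follow-up test sketch, assuming the module layout from app/replication.py above.
import time
from pathlib import Path

from app.replication import ReplicationFailure, ReplicationFailureStore


def test_failure_store_roundtrip(tmp_path: Path) -> None:
    store = ReplicationFailureStore(tmp_path)
    failure = ReplicationFailure(
        object_key="docs/report.pdf",
        error_message="connection refused",
        timestamp=time.time(),
        failure_count=1,
        bucket_name="demo-bucket",
        action="put",
    )

    store.add_failure("demo-bucket", failure)
    assert store.get_failure_count("demo-bucket") == 1

    # A repeat failure for the same key bumps failure_count instead of adding a new row.
    store.add_failure("demo-bucket", failure)
    assert store.load_failures("demo-bucket")[0].failure_count == 2

    # Dismissing removes the record and reports whether anything was deleted.
    assert store.remove_failure("demo-bucket", "docs/report.pdf") is True
    assert store.get_failure_count("demo-bucket") == 0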