UI overhaul; Replication and S3 API improvements
@@ -1,11 +1,13 @@
 """Background replication worker."""
 from __future__ import annotations

 import json
 import logging
 import mimetypes
 import threading
 import time
 from concurrent.futures import ThreadPoolExecutor
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Dict, Optional

@@ -21,6 +23,41 @@ logger = logging.getLogger(__name__)

 REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"

+REPLICATION_MODE_NEW_ONLY = "new_only"
+REPLICATION_MODE_ALL = "all"
+
+
+@dataclass
+class ReplicationStats:
+    """Statistics for replication operations - computed dynamically."""
+    objects_synced: int = 0  # Objects that exist in both source and destination
+    objects_pending: int = 0  # Objects in source but not in destination
+    objects_orphaned: int = 0  # Objects in destination but not in source (will be deleted)
+    bytes_synced: int = 0  # Total bytes synced to destination
+    last_sync_at: Optional[float] = None
+    last_sync_key: Optional[str] = None
+
+    def to_dict(self) -> dict:
+        return {
+            "objects_synced": self.objects_synced,
+            "objects_pending": self.objects_pending,
+            "objects_orphaned": self.objects_orphaned,
+            "bytes_synced": self.bytes_synced,
+            "last_sync_at": self.last_sync_at,
+            "last_sync_key": self.last_sync_key,
+        }
+
+    @classmethod
+    def from_dict(cls, data: dict) -> "ReplicationStats":
+        return cls(
+            objects_synced=data.get("objects_synced", 0),
+            objects_pending=data.get("objects_pending", 0),
+            objects_orphaned=data.get("objects_orphaned", 0),
+            bytes_synced=data.get("bytes_synced", 0),
+            last_sync_at=data.get("last_sync_at"),
+            last_sync_key=data.get("last_sync_key"),
+        )
+
+
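A quick aside on the new ReplicationStats dataclass: to_dict/from_dict exist so the stats survive the JSON round trip through the rules file. A minimal sketch, not part of the diff, with invented values:

import json

stats = ReplicationStats(objects_synced=3, bytes_synced=4096)
payload = json.dumps(stats.to_dict())                       # what lands in the rules file
restored = ReplicationStats.from_dict(json.loads(payload))  # and what comes back out
assert restored == stats  # dataclasses compare field-by-field by default
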
 @dataclass
 class ReplicationRule:
@@ -28,6 +65,32 @@ class ReplicationRule:
     target_connection_id: str
     target_bucket: str
     enabled: bool = True
+    mode: str = REPLICATION_MODE_NEW_ONLY
+    created_at: Optional[float] = None
+    stats: ReplicationStats = field(default_factory=ReplicationStats)
+
+    def to_dict(self) -> dict:
+        return {
+            "bucket_name": self.bucket_name,
+            "target_connection_id": self.target_connection_id,
+            "target_bucket": self.target_bucket,
+            "enabled": self.enabled,
+            "mode": self.mode,
+            "created_at": self.created_at,
+            "stats": self.stats.to_dict(),
+        }
+
+    @classmethod
+    def from_dict(cls, data: dict) -> "ReplicationRule":
+        stats_data = data.pop("stats", {})
+        # Handle old rules without mode/created_at
+        if "mode" not in data:
+            data["mode"] = REPLICATION_MODE_NEW_ONLY
+        if "created_at" not in data:
+            data["created_at"] = None
+        rule = cls(**data)
+        rule.stats = ReplicationStats.from_dict(stats_data) if stats_data else ReplicationStats()
+        return rule
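The defaults in from_dict are what keep rule files written before this commit loading cleanly; a sketch under that assumption, with invented field values:

# A rules entry saved by an older version: no "mode", "created_at" or "stats" keys.
legacy = {
    "bucket_name": "photos",
    "target_connection_id": "conn-1",
    "target_bucket": "photos-replica",
    "enabled": True,
}
rule = ReplicationRule.from_dict(legacy)
assert rule.mode == REPLICATION_MODE_NEW_ONLY  # backfilled default
assert rule.stats.objects_synced == 0          # fresh ReplicationStats

One design note: from_dict mutates the dict it is given (pop plus key assignment), so a caller that keeps a reference to that dict should pass in a copy.
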


 class ReplicationManager:
@@ -36,6 +99,7 @@ class ReplicationManager:
         self.connections = connections
         self.rules_path = rules_path
         self._rules: Dict[str, ReplicationRule] = {}
+        self._stats_lock = threading.Lock()
         self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker")
         self.reload_rules()

@@ -44,17 +108,15 @@
             self._rules = {}
             return
         try:
-            import json
            with open(self.rules_path, "r") as f:
                data = json.load(f)
            for bucket, rule_data in data.items():
-                self._rules[bucket] = ReplicationRule(**rule_data)
+                self._rules[bucket] = ReplicationRule.from_dict(rule_data)
        except (OSError, ValueError) as e:
            logger.error(f"Failed to load replication rules: {e}")

    def save_rules(self) -> None:
-        import json
-        data = {b: rule.__dict__ for b, rule in self._rules.items()}
+        data = {b: rule.to_dict() for b, rule in self._rules.items()}
        self.rules_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.rules_path, "w") as f:
            json.dump(data, f, indent=2)
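With to_dict/from_dict wired into save_rules/reload_rules, the rules file is now plain JSON keyed by bucket name. A small sketch of reading it back, assuming an already constructed manager:

manager.save_rules()
with open(manager.rules_path) as f:
    on_disk = json.load(f)
# Each value is a ReplicationRule.to_dict() payload,
# e.g. on_disk["photos"]["mode"] == "new_only"
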
@@ -70,6 +132,99 @@ class ReplicationManager:
         if bucket_name in self._rules:
             del self._rules[bucket_name]
             self.save_rules()

+    def _update_last_sync(self, bucket_name: str, object_key: str = "") -> None:
+        """Update last sync timestamp after a successful operation."""
+        with self._stats_lock:
+            rule = self._rules.get(bucket_name)
+            if not rule:
+                return
+            rule.stats.last_sync_at = time.time()
+            rule.stats.last_sync_key = object_key
+            self.save_rules()
+
+    def get_sync_status(self, bucket_name: str) -> Optional[ReplicationStats]:
+        """Dynamically compute replication status by comparing source and destination buckets."""
+        rule = self.get_rule(bucket_name)
+        if not rule:
+            return None
+
+        connection = self.connections.get(rule.target_connection_id)
+        if not connection:
+            return rule.stats  # Return cached stats if connection unavailable
+
+        try:
+            # Get source objects
+            source_objects = self.storage.list_objects(bucket_name)
+            source_keys = {obj.key: obj.size for obj in source_objects}
+
+            # Get destination objects
+            s3 = boto3.client(
+                "s3",
+                endpoint_url=connection.endpoint_url,
+                aws_access_key_id=connection.access_key,
+                aws_secret_access_key=connection.secret_key,
+                region_name=connection.region,
+            )
+
+            dest_keys = set()
+            bytes_synced = 0
+            paginator = s3.get_paginator('list_objects_v2')
+            try:
+                for page in paginator.paginate(Bucket=rule.target_bucket):
+                    for obj in page.get('Contents', []):
+                        dest_keys.add(obj['Key'])
+                        if obj['Key'] in source_keys:
+                            bytes_synced += obj.get('Size', 0)
+            except ClientError as e:
+                if e.response['Error']['Code'] == 'NoSuchBucket':
+                    # Destination bucket doesn't exist yet
+                    dest_keys = set()
+                else:
+                    raise
+
+            # Compute stats
+            synced = source_keys.keys() & dest_keys    # Objects in both
+            orphaned = dest_keys - source_keys.keys()  # In dest but not source
+
+            # For "new_only" mode, we can't determine pending since we don't know
+            # which objects existed before replication was enabled. Only "all" mode
+            # should show pending (objects that should be replicated but aren't yet).
+            if rule.mode == REPLICATION_MODE_ALL:
+                pending = source_keys.keys() - dest_keys  # In source but not dest
+            else:
+                pending = set()  # New-only mode: don't show pre-existing as pending
+
+            # Update cached stats with computed values
+            rule.stats.objects_synced = len(synced)
+            rule.stats.objects_pending = len(pending)
+            rule.stats.objects_orphaned = len(orphaned)
+            rule.stats.bytes_synced = bytes_synced
+
+            return rule.stats
+
+        except (ClientError, StorageError) as e:
+            logger.error(f"Failed to compute sync status for {bucket_name}: {e}")
+            return rule.stats  # Return cached stats on error
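The heart of get_sync_status is plain set arithmetic over object keys; a standalone illustration with invented keys:

source = {"a.txt": 10, "b.txt": 20, "c.txt": 30}  # key -> size, as built above
dest = {"a.txt", "b.txt", "z.txt"}

assert source.keys() & dest == {"a.txt", "b.txt"}  # synced: present in both
assert source.keys() - dest == {"c.txt"}           # pending: only meaningful in "all" mode
assert dest - source.keys() == {"z.txt"}           # orphaned: only in destination
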
+
+    def replicate_existing_objects(self, bucket_name: str) -> None:
+        """Trigger replication for all existing objects in a bucket."""
+        rule = self.get_rule(bucket_name)
+        if not rule or not rule.enabled:
+            return
+
+        connection = self.connections.get(rule.target_connection_id)
+        if not connection:
+            logger.warning(f"Cannot replicate existing objects: Connection {rule.target_connection_id} not found")
+            return
+
+        try:
+            objects = self.storage.list_objects(bucket_name)
+            logger.info(f"Starting replication of {len(objects)} existing objects from {bucket_name}")
+            for obj in objects:
+                self._executor.submit(self._replicate_task, bucket_name, obj.key, rule, connection, "write")
+        except StorageError as e:
+            logger.error(f"Failed to list objects for replication: {e}")
+
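A hedged usage sketch for the backfill path, assuming a configured manager and a bucket named "photos":

rule = manager.get_rule("photos")
if rule and rule.mode == REPLICATION_MODE_ALL:
    # Fans out one _replicate_task per existing key onto the 4-worker pool
    manager.replicate_existing_objects("photos")

Because the work goes through the shared ThreadPoolExecutor, the call returns immediately; progress shows up later via get_sync_status().
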
     def create_remote_bucket(self, connection_id: str, bucket_name: str) -> None:
         """Create a bucket on the remote connection."""
@@ -103,6 +258,7 @@ class ReplicationManager:
         self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)

     def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
+        file_size = 0
         try:
             # Using boto3 to upload
             config = Config(user_agent_extra=REPLICATION_USER_AGENT)
@@ -119,21 +275,15 @@
             try:
                 s3.delete_object(Bucket=rule.target_bucket, Key=object_key)
                 logger.info(f"Replicated DELETE {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})")
+                self._update_last_sync(bucket_name, object_key)
             except ClientError as e:
                 logger.error(f"Replication DELETE failed for {bucket_name}/{object_key}: {e}")
             return

             # 1. Get local file path
-            # Note: We are accessing internal storage structure here.
-            # Ideally storage.py should expose a 'get_file_path' or we read the stream.
-            # For efficiency, we'll try to read the file directly if we can, or use storage.get_object
-
-            # We need the file content.
-            # Since ObjectStorage is filesystem based, let's get the stream.
-            # We need to be careful about closing it.
             try:
                 path = self.storage.get_object_path(bucket_name, object_key)
             except StorageError:
                 logger.error(f"Source object not found: {bucket_name}/{object_key}")
                 return

             metadata = self.storage.get_object_metadata(bucket_name, object_key)
@@ -159,7 +309,6 @@
                     Metadata=metadata or {}
                 )
             except (ClientError, S3UploadFailedError) as e:
-                # Check if it's a NoSuchBucket error (either direct or wrapped)
                 is_no_bucket = False
                 if isinstance(e, ClientError):
                     if e.response['Error']['Code'] == 'NoSuchBucket':
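The except clause above has to recognize NoSuchBucket whether it arrives as a raw ClientError or wrapped in boto3's S3UploadFailedError, whose message embeds the original error text. A standalone sketch of that detection, not the commit's exact code:

def is_no_such_bucket(e: Exception) -> bool:
    if isinstance(e, ClientError):
        return e.response['Error']['Code'] == 'NoSuchBucket'
    # S3UploadFailedError carries no response dict; fall back to its message
    return 'NoSuchBucket' in str(e)
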
@@ -189,6 +338,7 @@
                     raise e

             logger.info(f"Replicated {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})")
+            self._update_last_sync(bucket_name, object_key)

         except (ClientError, OSError, ValueError) as e:
             logger.error(f"Replication failed for {bucket_name}/{object_key}: {e}")

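Taken together, a rough end-to-end sketch; the constructor's exact signature is not visible in this diff, so the argument order and names here are assumptions:

from pathlib import Path

# storage and connections stand in for the app's real objects
manager = ReplicationManager(storage, connections, Path("data/replication_rules.json"))

status = manager.get_sync_status("photos")  # live comparison against the target bucket
if status:
    print(status.objects_synced, status.objects_pending, status.objects_orphaned)

manager.replicate_existing_objects("photos")  # backfill for "all"-mode rules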