"""Background replication worker.""" from __future__ import annotations import logging import mimetypes import threading from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from pathlib import Path from typing import Dict, Optional import boto3 from botocore.config import Config from botocore.exceptions import ClientError from boto3.exceptions import S3UploadFailedError from .connections import ConnectionStore, RemoteConnection from .storage import ObjectStorage, StorageError logger = logging.getLogger(__name__) REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0" @dataclass class ReplicationRule: bucket_name: str target_connection_id: str target_bucket: str enabled: bool = True class ReplicationManager: def __init__(self, storage: ObjectStorage, connections: ConnectionStore, rules_path: Path) -> None: self.storage = storage self.connections = connections self.rules_path = rules_path self._rules: Dict[str, ReplicationRule] = {} self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker") self.reload_rules() def reload_rules(self) -> None: if not self.rules_path.exists(): self._rules = {} return try: import json with open(self.rules_path, "r") as f: data = json.load(f) for bucket, rule_data in data.items(): self._rules[bucket] = ReplicationRule(**rule_data) except (OSError, ValueError) as e: logger.error(f"Failed to load replication rules: {e}") def save_rules(self) -> None: import json data = {b: rule.__dict__ for b, rule in self._rules.items()} self.rules_path.parent.mkdir(parents=True, exist_ok=True) with open(self.rules_path, "w") as f: json.dump(data, f, indent=2) def get_rule(self, bucket_name: str) -> Optional[ReplicationRule]: return self._rules.get(bucket_name) def set_rule(self, rule: ReplicationRule) -> None: self._rules[rule.bucket_name] = rule self.save_rules() def delete_rule(self, bucket_name: str) -> None: if bucket_name in self._rules: del self._rules[bucket_name] self.save_rules() def create_remote_bucket(self, connection_id: str, bucket_name: str) -> None: """Create a bucket on the remote connection.""" connection = self.connections.get(connection_id) if not connection: raise ValueError(f"Connection {connection_id} not found") try: s3 = boto3.client( "s3", endpoint_url=connection.endpoint_url, aws_access_key_id=connection.access_key, aws_secret_access_key=connection.secret_key, region_name=connection.region, ) s3.create_bucket(Bucket=bucket_name) except ClientError as e: logger.error(f"Failed to create remote bucket {bucket_name}: {e}") raise def trigger_replication(self, bucket_name: str, object_key: str, action: str = "write") -> None: rule = self.get_rule(bucket_name) if not rule or not rule.enabled: return connection = self.connections.get(rule.target_connection_id) if not connection: logger.warning(f"Replication skipped for {bucket_name}/{object_key}: Connection {rule.target_connection_id} not found") return self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action) def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None: try: # Using boto3 to upload config = Config(user_agent_extra=REPLICATION_USER_AGENT) s3 = boto3.client( "s3", endpoint_url=conn.endpoint_url, aws_access_key_id=conn.access_key, aws_secret_access_key=conn.secret_key, region_name=conn.region, config=config, ) if action == "delete": try: s3.delete_object(Bucket=rule.target_bucket, Key=object_key) logger.info(f"Replicated DELETE 
{bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})") except ClientError as e: logger.error(f"Replication DELETE failed for {bucket_name}/{object_key}: {e}") return # 1. Get local file path # Note: We are accessing internal storage structure here. # Ideally storage.py should expose a 'get_file_path' or we read the stream. # For efficiency, we'll try to read the file directly if we can, or use storage.get_object # We need the file content. # Since ObjectStorage is filesystem based, let's get the stream. # We need to be careful about closing it. try: path = self.storage.get_object_path(bucket_name, object_key) except StorageError: return metadata = self.storage.get_object_metadata(bucket_name, object_key) extra_args = {} if metadata: extra_args["Metadata"] = metadata # Guess content type to prevent corruption/wrong handling content_type, _ = mimetypes.guess_type(path) file_size = path.stat().st_size logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}") try: with path.open("rb") as f: s3.put_object( Bucket=rule.target_bucket, Key=object_key, Body=f, ContentLength=file_size, ContentType=content_type or "application/octet-stream", Metadata=metadata or {} ) except (ClientError, S3UploadFailedError) as e: # Check if it's a NoSuchBucket error (either direct or wrapped) is_no_bucket = False if isinstance(e, ClientError): if e.response['Error']['Code'] == 'NoSuchBucket': is_no_bucket = True elif isinstance(e, S3UploadFailedError): if "NoSuchBucket" in str(e): is_no_bucket = True if is_no_bucket: logger.info(f"Target bucket {rule.target_bucket} not found. Attempting to create it.") try: s3.create_bucket(Bucket=rule.target_bucket) # Retry upload with path.open("rb") as f: s3.put_object( Bucket=rule.target_bucket, Key=object_key, Body=f, ContentLength=file_size, ContentType=content_type or "application/octet-stream", Metadata=metadata or {} ) except Exception as create_err: logger.error(f"Failed to create target bucket {rule.target_bucket}: {create_err}") raise e # Raise original error else: raise e logger.info(f"Replicated {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})") except (ClientError, OSError, ValueError) as e: logger.error(f"Replication failed for {bucket_name}/{object_key}: {e}") except Exception: logger.exception(f"Unexpected error during replication for {bucket_name}/{object_key}")
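

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The ObjectStorage / ConnectionStore
# constructor calls below are assumptions, not the actual signatures defined
# in storage.py and connections.py:
#
#     storage = ObjectStorage(Path("./data"))
#     connections = ConnectionStore(Path("./connections.json"))
#     manager = ReplicationManager(storage, connections, Path("./replication_rules.json"))
#
#     manager.set_rule(ReplicationRule(
#         bucket_name="photos",
#         target_connection_id="backup-site",
#         target_bucket="photos-replica",
#     ))
#
#     # Called by the API layer after a successful local PUT or DELETE:
#     manager.trigger_replication("photos", "2024/01/cat.jpg", action="write")
#     manager.trigger_replication("photos", "2024/01/cat.jpg", action="delete")
# ---------------------------------------------------------------------------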