diff --git a/app/replication.py b/app/replication.py index a48296a..9361ca2 100644 --- a/app/replication.py +++ b/app/replication.py @@ -308,7 +308,8 @@ class ReplicationManager: logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}") - try: + def do_put_object() -> None: + """Helper to upload object.""" with path.open("rb") as f: s3.put_object( Bucket=rule.target_bucket, @@ -318,32 +319,37 @@ class ReplicationManager: ContentType=content_type or "application/octet-stream", Metadata=metadata or {} ) + + try: + do_put_object() except (ClientError, S3UploadFailedError) as e: - is_no_bucket = False + error_code = None if isinstance(e, ClientError): - if e.response['Error']['Code'] == 'NoSuchBucket': - is_no_bucket = True + error_code = e.response['Error']['Code'] elif isinstance(e, S3UploadFailedError): if "NoSuchBucket" in str(e): - is_no_bucket = True + error_code = 'NoSuchBucket' - if is_no_bucket: + # Handle NoSuchBucket - create bucket and retry + if error_code == 'NoSuchBucket': logger.info(f"Target bucket {rule.target_bucket} not found. Attempting to create it.") + bucket_ready = False try: s3.create_bucket(Bucket=rule.target_bucket) - # Retry upload - with path.open("rb") as f: - s3.put_object( - Bucket=rule.target_bucket, - Key=object_key, - Body=f, - ContentLength=file_size, - ContentType=content_type or "application/octet-stream", - Metadata=metadata or {} - ) - except Exception as create_err: - logger.error(f"Failed to create target bucket {rule.target_bucket}: {create_err}") - raise e # Raise original error + bucket_ready = True + logger.info(f"Created target bucket {rule.target_bucket}") + except ClientError as bucket_err: + # BucketAlreadyExists or BucketAlreadyOwnedByYou means another thread created it - that's OK! + if bucket_err.response['Error']['Code'] in ('BucketAlreadyExists', 'BucketAlreadyOwnedByYou'): + logger.debug(f"Bucket {rule.target_bucket} already exists (created by another thread)") + bucket_ready = True + else: + logger.error(f"Failed to create target bucket {rule.target_bucket}: {bucket_err}") + raise e # Raise original NoSuchBucket error + + if bucket_ready: + # Retry the upload now that bucket exists + do_put_object() else: raise e @@ -354,3 +360,4 @@ class ReplicationManager: logger.error(f"Replication failed for {bucket_name}/{object_key}: {e}") except Exception: logger.exception(f"Unexpected error during replication for {bucket_name}/{object_key}") +