Fix race condition in replication
This commit is contained in:
@@ -308,7 +308,8 @@ class ReplicationManager:
|
|||||||
|
|
||||||
logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
|
logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
|
||||||
|
|
||||||
try:
|
def do_put_object() -> None:
|
||||||
|
"""Helper to upload object."""
|
||||||
with path.open("rb") as f:
|
with path.open("rb") as f:
|
||||||
s3.put_object(
|
s3.put_object(
|
||||||
Bucket=rule.target_bucket,
|
Bucket=rule.target_bucket,
|
||||||
@@ -318,32 +319,37 @@ class ReplicationManager:
|
|||||||
ContentType=content_type or "application/octet-stream",
|
ContentType=content_type or "application/octet-stream",
|
||||||
Metadata=metadata or {}
|
Metadata=metadata or {}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
do_put_object()
|
||||||
except (ClientError, S3UploadFailedError) as e:
|
except (ClientError, S3UploadFailedError) as e:
|
||||||
is_no_bucket = False
|
error_code = None
|
||||||
if isinstance(e, ClientError):
|
if isinstance(e, ClientError):
|
||||||
if e.response['Error']['Code'] == 'NoSuchBucket':
|
error_code = e.response['Error']['Code']
|
||||||
is_no_bucket = True
|
|
||||||
elif isinstance(e, S3UploadFailedError):
|
elif isinstance(e, S3UploadFailedError):
|
||||||
if "NoSuchBucket" in str(e):
|
if "NoSuchBucket" in str(e):
|
||||||
is_no_bucket = True
|
error_code = 'NoSuchBucket'
|
||||||
|
|
||||||
if is_no_bucket:
|
# Handle NoSuchBucket - create bucket and retry
|
||||||
|
if error_code == 'NoSuchBucket':
|
||||||
logger.info(f"Target bucket {rule.target_bucket} not found. Attempting to create it.")
|
logger.info(f"Target bucket {rule.target_bucket} not found. Attempting to create it.")
|
||||||
|
bucket_ready = False
|
||||||
try:
|
try:
|
||||||
s3.create_bucket(Bucket=rule.target_bucket)
|
s3.create_bucket(Bucket=rule.target_bucket)
|
||||||
# Retry upload
|
bucket_ready = True
|
||||||
with path.open("rb") as f:
|
logger.info(f"Created target bucket {rule.target_bucket}")
|
||||||
s3.put_object(
|
except ClientError as bucket_err:
|
||||||
Bucket=rule.target_bucket,
|
# BucketAlreadyExists or BucketAlreadyOwnedByYou means another thread created it - that's OK!
|
||||||
Key=object_key,
|
if bucket_err.response['Error']['Code'] in ('BucketAlreadyExists', 'BucketAlreadyOwnedByYou'):
|
||||||
Body=f,
|
logger.debug(f"Bucket {rule.target_bucket} already exists (created by another thread)")
|
||||||
ContentLength=file_size,
|
bucket_ready = True
|
||||||
ContentType=content_type or "application/octet-stream",
|
else:
|
||||||
Metadata=metadata or {}
|
logger.error(f"Failed to create target bucket {rule.target_bucket}: {bucket_err}")
|
||||||
)
|
raise e # Raise original NoSuchBucket error
|
||||||
except Exception as create_err:
|
|
||||||
logger.error(f"Failed to create target bucket {rule.target_bucket}: {create_err}")
|
if bucket_ready:
|
||||||
raise e # Raise original error
|
# Retry the upload now that bucket exists
|
||||||
|
do_put_object()
|
||||||
else:
|
else:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
@@ -354,3 +360,4 @@ class ReplicationManager:
|
|||||||
logger.error(f"Replication failed for {bucket_name}/{object_key}: {e}")
|
logger.error(f"Replication failed for {bucket_name}/{object_key}: {e}")
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(f"Unexpected error during replication for {bucket_name}/{object_key}")
|
logger.exception(f"Unexpected error during replication for {bucket_name}/{object_key}")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user