Test fix multipart failing upload
This commit is contained in:
28
tests/test_boto3_multipart.py
Normal file
28
tests/test_boto3_multipart.py
Normal file
@@ -0,0 +1,28 @@
|
||||
import uuid
|
||||
import pytest
|
||||
import boto3
|
||||
from botocore.client import Config
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_boto3_multipart_upload(live_server):
|
||||
bucket_name = f'mp-test-{uuid.uuid4().hex[:8]}'
|
||||
object_key = 'large-file.bin'
|
||||
s3 = boto3.client('s3', endpoint_url=live_server, aws_access_key_id='test', aws_secret_access_key='secret', region_name='us-east-1', use_ssl=False, config=Config(signature_version='s3v4', retries={'max_attempts': 1}, s3={'addressing_style': 'path'}))
|
||||
s3.create_bucket(Bucket=bucket_name)
|
||||
try:
|
||||
response = s3.create_multipart_upload(Bucket=bucket_name, Key=object_key)
|
||||
upload_id = response['UploadId']
|
||||
parts = []
|
||||
part1_data = b'A' * 1024
|
||||
part2_data = b'B' * 1024
|
||||
resp1 = s3.upload_part(Bucket=bucket_name, Key=object_key, PartNumber=1, UploadId=upload_id, Body=part1_data)
|
||||
parts.append({'PartNumber': 1, 'ETag': resp1['ETag']})
|
||||
resp2 = s3.upload_part(Bucket=bucket_name, Key=object_key, PartNumber=2, UploadId=upload_id, Body=part2_data)
|
||||
parts.append({'PartNumber': 2, 'ETag': resp2['ETag']})
|
||||
s3.complete_multipart_upload(Bucket=bucket_name, Key=object_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
|
||||
obj = s3.get_object(Bucket=bucket_name, Key=object_key)
|
||||
content = obj['Body'].read()
|
||||
assert content == part1_data + part2_data
|
||||
s3.delete_object(Bucket=bucket_name, Key=object_key)
|
||||
finally:
|
||||
s3.delete_bucket(Bucket=bucket_name)
|
||||
Reference in New Issue
Block a user