Add missing CreateMultipartUpload in API
This commit is contained in:
152
app/s3_api.py
152
app/s3_api.py
@@ -1078,12 +1078,23 @@ def bucket_handler(bucket_name: str) -> Response:
|
||||
return _xml_response(root)
|
||||
|
||||
|
||||
@s3_api_bp.route("/<bucket_name>/<path:object_key>", methods=["PUT", "GET", "DELETE", "HEAD"], strict_slashes=False)
|
||||
@s3_api_bp.route("/<bucket_name>/<path:object_key>", methods=["PUT", "GET", "DELETE", "HEAD", "POST"], strict_slashes=False)
|
||||
@limiter.limit("240 per minute")
|
||||
def object_handler(bucket_name: str, object_key: str):
|
||||
storage = _storage()
|
||||
|
||||
# Multipart Uploads
|
||||
if request.method == "POST":
|
||||
if "uploads" in request.args:
|
||||
return _initiate_multipart_upload(bucket_name, object_key)
|
||||
if "uploadId" in request.args:
|
||||
return _complete_multipart_upload(bucket_name, object_key)
|
||||
return _method_not_allowed(["GET", "PUT", "DELETE", "HEAD", "POST"])
|
||||
|
||||
if request.method == "PUT":
|
||||
if "partNumber" in request.args and "uploadId" in request.args:
|
||||
return _upload_part(bucket_name, object_key)
|
||||
|
||||
_, error = _object_principal("write", bucket_name, object_key)
|
||||
if error:
|
||||
return error
|
||||
@@ -1147,6 +1158,9 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
return response
|
||||
|
||||
# DELETE
|
||||
if "uploadId" in request.args:
|
||||
return _abort_multipart_upload(bucket_name, object_key)
|
||||
|
||||
_, error = _object_principal("delete", bucket_name, object_key)
|
||||
if error:
|
||||
return error
|
||||
@@ -1354,3 +1368,139 @@ class AwsChunkedDecoder:
|
||||
self.chunk_remaining = chunk_size
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _initiate_multipart_upload(bucket_name: str, object_key: str) -> Response:
    """Handle CreateMultipartUpload (POST ?uploads): open a new upload session.

    Returns an S3-style InitiateMultipartUploadResult XML document carrying
    the storage-issued UploadId, or an error response when authorization or
    the storage call fails.
    """
    _, auth_error = _object_principal("write", bucket_name, object_key)
    if auth_error:
        return auth_error

    meta = _extract_request_metadata()
    try:
        upload_id = _storage().initiate_multipart_upload(
            bucket_name, object_key, metadata=meta or None
        )
    except StorageError as exc:
        return _error_response("NoSuchBucket", str(exc), 404)

    result = Element("InitiateMultipartUploadResult")
    for tag, value in (("Bucket", bucket_name), ("Key", object_key), ("UploadId", upload_id)):
        SubElement(result, tag).text = value
    return _xml_response(result)
|
||||
|
||||
|
||||
def _upload_part(bucket_name: str, object_key: str) -> Response:
    """Handle UploadPart (PUT ?partNumber&uploadId): persist one part's bytes.

    Streams the request body (transparently unwrapping aws-chunked encoding)
    into storage and echoes the part's ETag header on success.
    """
    _, auth_error = _object_principal("write", bucket_name, object_key)
    if auth_error:
        return auth_error

    upload_id = request.args.get("uploadId")
    raw_part_number = request.args.get("partNumber")
    if not (upload_id and raw_part_number):
        return _error_response("InvalidArgument", "uploadId and partNumber are required", 400)

    try:
        part_number = int(raw_part_number)
    except ValueError:
        return _error_response("InvalidArgument", "partNumber must be an integer", 400)

    # SDKs signing streaming uploads send Content-Encoding: aws-chunked;
    # unwrap the chunk framing before handing the stream to storage.
    body = request.stream
    if "aws-chunked" in request.headers.get("Content-Encoding", "").lower():
        body = AwsChunkedDecoder(body)

    try:
        etag = _storage().upload_multipart_part(bucket_name, upload_id, part_number, body)
    except StorageError as exc:
        message = str(exc)
        if "NoSuchBucket" in message:
            return _error_response("NoSuchBucket", message, 404)
        if "Multipart upload not found" in message:
            return _error_response("NoSuchUpload", message, 404)
        return _error_response("InvalidArgument", message, 400)

    response = Response(status=200)
    response.headers["ETag"] = f'"{etag}"'
    return response
|
||||
|
||||
|
||||
def _complete_multipart_upload(bucket_name: str, object_key: str) -> Response:
    """Handle CompleteMultipartUpload (POST ?uploadId).

    Parses the <CompleteMultipartUpload> XML body into a part list, asks
    storage to assemble the object, triggers replication for non-replication
    traffic, and returns an S3-style CompleteMultipartUploadResult document.

    Errors: 400 InvalidArgument (missing uploadId), 400 MalformedXML (bad
    body), 404 NoSuchBucket / NoSuchUpload, 400 InvalidPart (other storage
    failures).
    """
    _, auth_error = _object_principal("write", bucket_name, object_key)
    if auth_error:
        return auth_error

    upload_id = request.args.get("uploadId")
    if not upload_id:
        return _error_response("InvalidArgument", "uploadId is required", 400)

    payload = request.get_data(cache=False) or b""
    try:
        root = fromstring(payload)
    except ParseError:
        return _error_response("MalformedXML", "Unable to parse XML document", 400)

    if _strip_ns(root.tag) != "CompleteMultipartUpload":
        return _error_response("MalformedXML", "Root element must be CompleteMultipartUpload", 400)

    def _child(parent, tag):
        # "{*}tag" matches the element in any namespace (Python 3.8+);
        # fall back to the bare tag for documents without a namespace.
        el = parent.find("{*}" + tag)
        return el if el is not None else parent.find(tag)

    parts = []
    for part_el in list(root):
        if _strip_ns(part_el.tag) != "Part":
            continue
        part_number_el = _child(part_el, "PartNumber")
        etag_el = _child(part_el, "ETag")
        if part_number_el is None or etag_el is None:
            continue
        try:
            part_number = int(part_number_el.text or 0)
        except ValueError:
            # BUGFIX: a non-numeric <PartNumber> previously escaped as an
            # unhandled ValueError (HTTP 500); report MalformedXML instead.
            return _error_response("MalformedXML", "PartNumber must be an integer", 400)
        parts.append({
            "PartNumber": part_number,
            # Clients may quote ETags; storage compares unquoted values.
            "ETag": (etag_el.text or "").strip('"')
        })

    try:
        meta = _storage().complete_multipart_upload(bucket_name, upload_id, parts)
    except StorageError as exc:
        if "NoSuchBucket" in str(exc):
            return _error_response("NoSuchBucket", str(exc), 404)
        if "Multipart upload not found" in str(exc):
            return _error_response("NoSuchUpload", str(exc), 404)
        return _error_response("InvalidPart", str(exc), 400)

    # Replicate the completed object, but skip writes originating from the
    # replication agent itself to avoid replication loops.
    user_agent = request.headers.get("User-Agent", "")
    if "S3ReplicationAgent" not in user_agent:
        _replication_manager().trigger_replication(bucket_name, object_key, action="write")

    root = Element("CompleteMultipartUploadResult")
    # request.host_url already ends with "/", so no extra separator is needed.
    location = f"{request.host_url}{bucket_name}/{object_key}"
    SubElement(root, "Location").text = location
    SubElement(root, "Bucket").text = bucket_name
    SubElement(root, "Key").text = object_key
    SubElement(root, "ETag").text = f'"{meta.etag}"'

    return _xml_response(root)
|
||||
|
||||
|
||||
def _abort_multipart_upload(bucket_name: str, object_key: str) -> Response:
    """Handle AbortMultipartUpload (DELETE ?uploadId): discard an open upload."""
    _, auth_error = _object_principal("delete", bucket_name, object_key)
    if auth_error:
        return auth_error

    upload_id = request.args.get("uploadId")
    if not upload_id:
        return _error_response("InvalidArgument", "uploadId is required", 400)

    try:
        _storage().abort_multipart_upload(bucket_name, upload_id)
    except StorageError as exc:
        # Abort is idempotent — an unknown/already-aborted upload still
        # yields 204 — but a missing bucket is reported.
        if "Bucket does not exist" in str(exc):
            return _error_response("NoSuchBucket", str(exc), 404)

    return Response(status=204)
|
||||
|
||||
93
tests/test_api_multipart.py
Normal file
93
tests/test_api_multipart.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import io
|
||||
import pytest
|
||||
from xml.etree.ElementTree import fromstring
|
||||
|
||||
@pytest.fixture
def client(app):
    """Flask test client bound to the application fixture."""
    return app.test_client()
|
||||
|
||||
@pytest.fixture
def auth_headers(app):
    """Credential headers for the test user declared in conftest.py."""
    credentials = {"X-Access-Key": "test"}
    credentials["X-Secret-Key"] = "secret"
    return credentials
|
||||
|
||||
def test_multipart_upload_flow(client, auth_headers):
    """End-to-end multipart flow: initiate, upload two parts, complete, read back."""
    # Bucket to upload into.
    client.put("/test-bucket", headers=auth_headers)

    # Initiate the upload and capture the server-issued upload id.
    init_resp = client.post("/test-bucket/large-file.txt?uploads", headers=auth_headers)
    assert init_resp.status_code == 200
    upload_id = fromstring(init_resp.data).find("UploadId").text
    assert upload_id

    def upload_part(number, payload):
        # Upload a single part and return its ETag response header.
        part_resp = client.put(
            f"/test-bucket/large-file.txt?partNumber={number}&uploadId={upload_id}",
            headers=auth_headers,
            data=payload
        )
        assert part_resp.status_code == 200
        part_etag = part_resp.headers["ETag"]
        assert part_etag
        return part_etag

    etag1 = upload_part(1, b"part1")
    etag2 = upload_part(2, b"part2")

    # Complete the upload, referencing both parts by number and ETag.
    xml_body = f"""
    <CompleteMultipartUpload>
        <Part>
            <PartNumber>1</PartNumber>
            <ETag>{etag1}</ETag>
        </Part>
        <Part>
            <PartNumber>2</PartNumber>
            <ETag>{etag2}</ETag>
        </Part>
    </CompleteMultipartUpload>
    """
    done_resp = client.post(
        f"/test-bucket/large-file.txt?uploadId={upload_id}",
        headers=auth_headers,
        data=xml_body
    )
    assert done_resp.status_code == 200
    assert fromstring(done_resp.data).find("Key").text == "large-file.txt"

    # The assembled object is the concatenation of the parts, in order.
    get_resp = client.get("/test-bucket/large-file.txt", headers=auth_headers)
    assert get_resp.status_code == 200
    assert get_resp.data == b"part1part2"
|
||||
|
||||
def test_abort_multipart_upload(client, auth_headers):
    """Aborting an upload invalidates its uploadId for subsequent part uploads."""
    client.put("/abort-bucket", headers=auth_headers)

    # Initiate an upload to abort.
    init_resp = client.post("/abort-bucket/file.txt?uploads", headers=auth_headers)
    upload_id = fromstring(init_resp.data).find("UploadId").text

    # Abort it; the endpoint responds 204 No Content.
    abort_resp = client.delete(f"/abort-bucket/file.txt?uploadId={upload_id}", headers=auth_headers)
    assert abort_resp.status_code == 204

    # A part upload against the aborted id must fail with NoSuchUpload.
    part_resp = client.put(
        f"/abort-bucket/file.txt?partNumber=1&uploadId={upload_id}",
        headers=auth_headers,
        data=b"data"
    )
    assert part_resp.status_code == 404  # NoSuchUpload
|
||||
Reference in New Issue
Block a user