From 37d372c617e7a7bcf40bf1e548ccc9ac8458a105 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 22 Nov 2025 22:00:24 +0800 Subject: [PATCH] Add missing CreateMultipartUpload in API --- app/s3_api.py | 152 +++++++++++++++++++++++++++++++++++- tests/test_api_multipart.py | 93 ++++++++++++++++++++++ 2 files changed, 244 insertions(+), 1 deletion(-) create mode 100644 tests/test_api_multipart.py diff --git a/app/s3_api.py b/app/s3_api.py index b080346..31c9142 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -1078,12 +1078,23 @@ def bucket_handler(bucket_name: str) -> Response: return _xml_response(root) -@s3_api_bp.route("/<bucket_name>/<path:object_key>", methods=["PUT", "GET", "DELETE", "HEAD"], strict_slashes=False) +@s3_api_bp.route("/<bucket_name>/<path:object_key>", methods=["PUT", "GET", "DELETE", "HEAD", "POST"], strict_slashes=False) @limiter.limit("240 per minute") def object_handler(bucket_name: str, object_key: str): storage = _storage() + # Multipart Uploads + if request.method == "POST": + if "uploads" in request.args: + return _initiate_multipart_upload(bucket_name, object_key) + if "uploadId" in request.args: + return _complete_multipart_upload(bucket_name, object_key) + return _method_not_allowed(["GET", "PUT", "DELETE", "HEAD", "POST"]) + if request.method == "PUT": + if "partNumber" in request.args and "uploadId" in request.args: + return _upload_part(bucket_name, object_key) + _, error = _object_principal("write", bucket_name, object_key) if error: return error @@ -1147,6 +1158,9 @@ def object_handler(bucket_name: str, object_key: str): return response # DELETE + if "uploadId" in request.args: + return _abort_multipart_upload(bucket_name, object_key) + _, error = _object_principal("delete", bucket_name, object_key) if error: return error @@ -1354,3 +1368,139 @@ class AwsChunkedDecoder: self.chunk_remaining = chunk_size return result + + +def _initiate_multipart_upload(bucket_name: str, object_key: str) -> Response: + principal, error = _object_principal("write", bucket_name, object_key) + if error: + return 
error + + metadata = _extract_request_metadata() + try: + upload_id = _storage().initiate_multipart_upload( + bucket_name, + object_key, + metadata=metadata or None + ) + except StorageError as exc: + return _error_response("NoSuchBucket", str(exc), 404) + + root = Element("InitiateMultipartUploadResult") + SubElement(root, "Bucket").text = bucket_name + SubElement(root, "Key").text = object_key + SubElement(root, "UploadId").text = upload_id + return _xml_response(root) + + +def _upload_part(bucket_name: str, object_key: str) -> Response: + principal, error = _object_principal("write", bucket_name, object_key) + if error: + return error + + upload_id = request.args.get("uploadId") + part_number_str = request.args.get("partNumber") + if not upload_id or not part_number_str: + return _error_response("InvalidArgument", "uploadId and partNumber are required", 400) + + try: + part_number = int(part_number_str) + except ValueError: + return _error_response("InvalidArgument", "partNumber must be an integer", 400) + + stream = request.stream + content_encoding = request.headers.get("Content-Encoding", "").lower() + if "aws-chunked" in content_encoding: + stream = AwsChunkedDecoder(stream) + + try: + etag = _storage().upload_multipart_part(bucket_name, upload_id, part_number, stream) + except StorageError as exc: + if "NoSuchBucket" in str(exc): + return _error_response("NoSuchBucket", str(exc), 404) + if "Multipart upload not found" in str(exc): + return _error_response("NoSuchUpload", str(exc), 404) + return _error_response("InvalidArgument", str(exc), 400) + + response = Response(status=200) + response.headers["ETag"] = f'"{etag}"' + return response + + +def _complete_multipart_upload(bucket_name: str, object_key: str) -> Response: + principal, error = _object_principal("write", bucket_name, object_key) + if error: + return error + + upload_id = request.args.get("uploadId") + if not upload_id: + return _error_response("InvalidArgument", "uploadId is required", 400) + + 
payload = request.get_data(cache=False) or b"" + try: + root = fromstring(payload) + except ParseError: + return _error_response("MalformedXML", "Unable to parse XML document", 400) + + if _strip_ns(root.tag) != "CompleteMultipartUpload": + return _error_response("MalformedXML", "Root element must be CompleteMultipartUpload", 400) + + parts = [] + for part_el in list(root): + if _strip_ns(part_el.tag) != "Part": + continue + part_number_el = part_el.find("{*}PartNumber") + if part_number_el is None: + part_number_el = part_el.find("PartNumber") + + etag_el = part_el.find("{*}ETag") + if etag_el is None: + etag_el = part_el.find("ETag") + + if part_number_el is not None and etag_el is not None: + parts.append({ + "PartNumber": int(part_number_el.text or 0), + "ETag": (etag_el.text or "").strip('"') + }) + + try: + meta = _storage().complete_multipart_upload(bucket_name, upload_id, parts) + except StorageError as exc: + if "NoSuchBucket" in str(exc): + return _error_response("NoSuchBucket", str(exc), 404) + if "Multipart upload not found" in str(exc): + return _error_response("NoSuchUpload", str(exc), 404) + return _error_response("InvalidPart", str(exc), 400) + + # Trigger replication + user_agent = request.headers.get("User-Agent", "") + if "S3ReplicationAgent" not in user_agent: + _replication_manager().trigger_replication(bucket_name, object_key, action="write") + + root = Element("CompleteMultipartUploadResult") + # Use request.host_url to construct full location + location = f"{request.host_url}{bucket_name}/{object_key}" + SubElement(root, "Location").text = location + SubElement(root, "Bucket").text = bucket_name + SubElement(root, "Key").text = object_key + SubElement(root, "ETag").text = f'"{meta.etag}"' + + return _xml_response(root) + + +def _abort_multipart_upload(bucket_name: str, object_key: str) -> Response: + principal, error = _object_principal("delete", bucket_name, object_key) + if error: + return error + + upload_id = request.args.get("uploadId") 
+ if not upload_id: + return _error_response("InvalidArgument", "uploadId is required", 400) + + try: + _storage().abort_multipart_upload(bucket_name, upload_id) + except StorageError as exc: + # Abort is idempotent, but if bucket missing... + if "Bucket does not exist" in str(exc): + return _error_response("NoSuchBucket", str(exc), 404) + + return Response(status=204) diff --git a/tests/test_api_multipart.py b/tests/test_api_multipart.py new file mode 100644 index 0000000..47d6eb3 --- /dev/null +++ b/tests/test_api_multipart.py @@ -0,0 +1,93 @@ +import io +import pytest +from xml.etree.ElementTree import fromstring + +@pytest.fixture +def client(app): + return app.test_client() + +@pytest.fixture +def auth_headers(app): + # Create a test user and return headers + # Using the user defined in conftest.py + return { + "X-Access-Key": "test", + "X-Secret-Key": "secret" + } + +def test_multipart_upload_flow(client, auth_headers): + # 1. Create bucket + client.put("/test-bucket", headers=auth_headers) + + # 2. Initiate Multipart Upload + resp = client.post("/test-bucket/large-file.txt?uploads", headers=auth_headers) + assert resp.status_code == 200 + root = fromstring(resp.data) + upload_id = root.find("UploadId").text + assert upload_id + + # 3. Upload Part 1 + resp = client.put( + f"/test-bucket/large-file.txt?partNumber=1&uploadId={upload_id}", + headers=auth_headers, + data=b"part1" + ) + assert resp.status_code == 200 + etag1 = resp.headers["ETag"] + assert etag1 + + # 4. Upload Part 2 + resp = client.put( + f"/test-bucket/large-file.txt?partNumber=2&uploadId={upload_id}", + headers=auth_headers, + data=b"part2" + ) + assert resp.status_code == 200 + etag2 = resp.headers["ETag"] + assert etag2 + + # 5. 
Complete Multipart Upload + xml_body = f""" + <CompleteMultipartUpload> + <Part> + <PartNumber>1</PartNumber> + <ETag>{etag1}</ETag> + </Part> + <Part> + <PartNumber>2</PartNumber> + <ETag>{etag2}</ETag> + </Part> + </CompleteMultipartUpload> + """ + resp = client.post( + f"/test-bucket/large-file.txt?uploadId={upload_id}", + headers=auth_headers, + data=xml_body + ) + assert resp.status_code == 200 + root = fromstring(resp.data) + assert root.find("Key").text == "large-file.txt" + + # 6. Verify object content + resp = client.get("/test-bucket/large-file.txt", headers=auth_headers) + assert resp.status_code == 200 + assert resp.data == b"part1part2" + +def test_abort_multipart_upload(client, auth_headers): + client.put("/abort-bucket", headers=auth_headers) + + # Initiate + resp = client.post("/abort-bucket/file.txt?uploads", headers=auth_headers) + upload_id = fromstring(resp.data).find("UploadId").text + + # Abort + resp = client.delete(f"/abort-bucket/file.txt?uploadId={upload_id}", headers=auth_headers) + assert resp.status_code == 204 + + # Try to upload part (should fail) + resp = client.put( + f"/abort-bucket/file.txt?partNumber=1&uploadId={upload_id}", + headers=auth_headers, + data=b"data" + ) + assert resp.status_code == 404 # NoSuchUpload