"""Tests for newly implemented S3 API endpoints.""" import io import pytest from xml.etree.ElementTree import fromstring def _stream(data: bytes): return io.BytesIO(data) @pytest.fixture def storage(app): """Get the storage instance from the app.""" return app.extensions["object_storage"] class TestListObjectsV2: """Tests for ListObjectsV2 endpoint.""" def test_list_objects_v2_basic(self, client, signer, storage): storage.create_bucket("v2-test") storage.put_object("v2-test", "file1.txt", _stream(b"hello")) storage.put_object("v2-test", "file2.txt", _stream(b"world")) storage.put_object("v2-test", "folder/file3.txt", _stream(b"nested")) headers = signer("GET", "/v2-test?list-type=2") resp = client.get("/v2-test", query_string={"list-type": "2"}, headers=headers) assert resp.status_code == 200 root = fromstring(resp.data) assert root.find("KeyCount").text == "3" assert root.find("IsTruncated").text == "false" keys = [el.find("Key").text for el in root.findall("Contents")] assert "file1.txt" in keys assert "file2.txt" in keys assert "folder/file3.txt" in keys def test_list_objects_v2_with_prefix_and_delimiter(self, client, signer, storage): storage.create_bucket("prefix-test") storage.put_object("prefix-test", "photos/2023/jan.jpg", _stream(b"jan")) storage.put_object("prefix-test", "photos/2023/feb.jpg", _stream(b"feb")) storage.put_object("prefix-test", "photos/2024/mar.jpg", _stream(b"mar")) storage.put_object("prefix-test", "docs/readme.md", _stream(b"readme")) headers = signer("GET", "/prefix-test?list-type=2&prefix=photos/&delimiter=/") resp = client.get( "/prefix-test", query_string={"list-type": "2", "prefix": "photos/", "delimiter": "/"}, headers=headers ) assert resp.status_code == 200 root = fromstring(resp.data) prefixes = [el.find("Prefix").text for el in root.findall("CommonPrefixes")] assert "photos/2023/" in prefixes assert "photos/2024/" in prefixes assert len(root.findall("Contents")) == 0 class TestPutBucketVersioning: """Tests for PutBucketVersioning endpoint.""" def test_put_versioning_enabled(self, client, signer, storage): storage.create_bucket("version-test") payload = b""" Enabled """ headers = signer("PUT", "/version-test?versioning", body=payload) resp = client.put("/version-test", query_string={"versioning": ""}, data=payload, headers=headers) assert resp.status_code == 200 headers = signer("GET", "/version-test?versioning") resp = client.get("/version-test", query_string={"versioning": ""}, headers=headers) root = fromstring(resp.data) assert root.find("Status").text == "Enabled" def test_put_versioning_suspended(self, client, signer, storage): storage.create_bucket("suspend-test") storage.set_bucket_versioning("suspend-test", True) payload = b""" Suspended """ headers = signer("PUT", "/suspend-test?versioning", body=payload) resp = client.put("/suspend-test", query_string={"versioning": ""}, data=payload, headers=headers) assert resp.status_code == 200 headers = signer("GET", "/suspend-test?versioning") resp = client.get("/suspend-test", query_string={"versioning": ""}, headers=headers) root = fromstring(resp.data) assert root.find("Status").text == "Suspended" class TestDeleteBucketTagging: """Tests for DeleteBucketTagging endpoint.""" def test_delete_bucket_tags(self, client, signer, storage): storage.create_bucket("tag-delete-test") storage.set_bucket_tags("tag-delete-test", [{"Key": "env", "Value": "test"}]) headers = signer("DELETE", "/tag-delete-test?tagging") resp = client.delete("/tag-delete-test", query_string={"tagging": ""}, headers=headers) assert resp.status_code == 204 headers = signer("GET", "/tag-delete-test?tagging") resp = client.get("/tag-delete-test", query_string={"tagging": ""}, headers=headers) assert resp.status_code == 404 class TestDeleteBucketCors: """Tests for DeleteBucketCors endpoint.""" def test_delete_bucket_cors(self, client, signer, storage): storage.create_bucket("cors-delete-test") storage.set_bucket_cors("cors-delete-test", [ {"AllowedOrigins": ["*"], "AllowedMethods": ["GET"]} ]) headers = signer("DELETE", "/cors-delete-test?cors") resp = client.delete("/cors-delete-test", query_string={"cors": ""}, headers=headers) assert resp.status_code == 204 headers = signer("GET", "/cors-delete-test?cors") resp = client.get("/cors-delete-test", query_string={"cors": ""}, headers=headers) assert resp.status_code == 404 class TestGetBucketLocation: """Tests for GetBucketLocation endpoint.""" def test_get_bucket_location(self, client, signer, storage): storage.create_bucket("location-test") headers = signer("GET", "/location-test?location") resp = client.get("/location-test", query_string={"location": ""}, headers=headers) assert resp.status_code == 200 root = fromstring(resp.data) assert root.tag == "LocationConstraint" class TestBucketAcl: """Tests for Bucket ACL operations.""" def test_get_bucket_acl(self, client, signer, storage): storage.create_bucket("acl-test") headers = signer("GET", "/acl-test?acl") resp = client.get("/acl-test", query_string={"acl": ""}, headers=headers) assert resp.status_code == 200 root = fromstring(resp.data) assert root.tag == "AccessControlPolicy" assert root.find("Owner/ID") is not None assert root.find(".//Permission").text == "FULL_CONTROL" def test_put_bucket_acl(self, client, signer, storage): storage.create_bucket("acl-put-test") headers = signer("PUT", "/acl-put-test?acl") headers["x-amz-acl"] = "public-read" resp = client.put("/acl-put-test", query_string={"acl": ""}, headers=headers) assert resp.status_code == 200 class TestCopyObject: """Tests for CopyObject operation.""" def test_copy_object_basic(self, client, signer, storage): storage.create_bucket("copy-src") storage.create_bucket("copy-dst") storage.put_object("copy-src", "original.txt", _stream(b"original content")) headers = signer("PUT", "/copy-dst/copied.txt") headers["x-amz-copy-source"] = "/copy-src/original.txt" resp = client.put("/copy-dst/copied.txt", headers=headers) assert resp.status_code == 200 root = fromstring(resp.data) assert root.tag == "CopyObjectResult" assert root.find("ETag") is not None assert root.find("LastModified") is not None path = storage.get_object_path("copy-dst", "copied.txt") assert path.read_bytes() == b"original content" def test_copy_object_with_metadata_replace(self, client, signer, storage): storage.create_bucket("meta-src") storage.create_bucket("meta-dst") storage.put_object("meta-src", "source.txt", _stream(b"data"), metadata={"old": "value"}) headers = signer("PUT", "/meta-dst/target.txt") headers["x-amz-copy-source"] = "/meta-src/source.txt" headers["x-amz-metadata-directive"] = "REPLACE" headers["x-amz-meta-new"] = "metadata" resp = client.put("/meta-dst/target.txt", headers=headers) assert resp.status_code == 200 meta = storage.get_object_metadata("meta-dst", "target.txt") assert "New" in meta or "new" in meta assert "old" not in meta and "Old" not in meta class TestObjectTagging: """Tests for Object tagging operations.""" def test_put_get_delete_object_tags(self, client, signer, storage): storage.create_bucket("obj-tag-test") storage.put_object("obj-tag-test", "tagged.txt", _stream(b"content")) payload = b""" projectdemo envtest """ headers = signer("PUT", "/obj-tag-test/tagged.txt?tagging", body=payload) resp = client.put( "/obj-tag-test/tagged.txt", query_string={"tagging": ""}, data=payload, headers=headers ) assert resp.status_code == 204 headers = signer("GET", "/obj-tag-test/tagged.txt?tagging") resp = client.get("/obj-tag-test/tagged.txt", query_string={"tagging": ""}, headers=headers) assert resp.status_code == 200 root = fromstring(resp.data) tags = {el.find("Key").text: el.find("Value").text for el in root.findall(".//Tag")} assert tags["project"] == "demo" assert tags["env"] == "test" headers = signer("DELETE", "/obj-tag-test/tagged.txt?tagging") resp = client.delete("/obj-tag-test/tagged.txt", query_string={"tagging": ""}, headers=headers) assert resp.status_code == 204 headers = signer("GET", "/obj-tag-test/tagged.txt?tagging") resp = client.get("/obj-tag-test/tagged.txt", query_string={"tagging": ""}, headers=headers) root = fromstring(resp.data) assert len(root.findall(".//Tag")) == 0 def test_object_tags_limit(self, client, signer, storage): storage.create_bucket("tag-limit") storage.put_object("tag-limit", "file.txt", _stream(b"x")) tags = "".join(f"key{i}val{i}" for i in range(11)) payload = f"{tags}".encode() headers = signer("PUT", "/tag-limit/file.txt?tagging", body=payload) resp = client.put( "/tag-limit/file.txt", query_string={"tagging": ""}, data=payload, headers=headers ) assert resp.status_code == 400