Files
MyFSIO/tests/test_security.py

192 lines
7.4 KiB
Python

import hashlib
import hmac
import pytest
from datetime import datetime, timedelta, timezone
from urllib.parse import quote
def _sign(key, msg):
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def _get_signature_key(key, date_stamp, region_name, service_name):
k_date = _sign(("AWS4" + key).encode("utf-8"), date_stamp)
k_region = _sign(k_date, region_name)
k_service = _sign(k_region, service_name)
k_signing = _sign(k_service, "aws4_request")
return k_signing
def create_signed_headers(
method,
path,
headers=None,
body=None,
access_key="test",
secret_key="secret",
region="us-east-1",
service="s3",
timestamp=None
):
if headers is None:
headers = {}
if timestamp is None:
now = datetime.now(timezone.utc)
else:
now = timestamp
amz_date = now.strftime("%Y%m%dT%H%M%SZ")
date_stamp = now.strftime("%Y%m%d")
headers["X-Amz-Date"] = amz_date
headers["Host"] = "testserver"
canonical_uri = quote(path, safe="/-_.~")
canonical_query_string = ""
canonical_headers = ""
signed_headers_list = []
for k, v in sorted(headers.items(), key=lambda x: x[0].lower()):
canonical_headers += f"{k.lower()}:{v.strip()}\n"
signed_headers_list.append(k.lower())
signed_headers = ";".join(signed_headers_list)
payload_hash = hashlib.sha256(body or b"").hexdigest()
headers["X-Amz-Content-Sha256"] = payload_hash
canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers}\n{payload_hash}"
credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
string_to_sign = f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"
signing_key = _get_signature_key(secret_key, date_stamp, region, service)
signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
headers["Authorization"] = (
f"AWS4-HMAC-SHA256 Credential={access_key}/{credential_scope}, "
f"SignedHeaders={signed_headers}, Signature={signature}"
)
return headers
def test_sigv4_old_date(client):
# Test with a date 20 minutes in the past
old_time = datetime.now(timezone.utc) - timedelta(minutes=20)
headers = create_signed_headers("GET", "/", timestamp=old_time)
response = client.get("/", headers=headers)
assert response.status_code == 403
assert b"Request timestamp too old" in response.data
def test_sigv4_future_date(client):
# Test with a date 20 minutes in the future
future_time = datetime.now(timezone.utc) + timedelta(minutes=20)
headers = create_signed_headers("GET", "/", timestamp=future_time)
response = client.get("/", headers=headers)
assert response.status_code == 403
assert b"Request timestamp too old" in response.data # The error message is the same
def test_path_traversal_in_key(client, signer):
headers = signer("PUT", "/test-bucket")
client.put("/test-bucket", headers=headers)
# Try to upload with .. in key
headers = signer("PUT", "/test-bucket/../secret.txt", body=b"attack")
response = client.put("/test-bucket/../secret.txt", headers=headers, data=b"attack")
# Should be rejected by storage layer or flask routing
# Flask might normalize it before it reaches the app, but if it reaches, it should fail.
# If Flask normalizes /test-bucket/../secret.txt to /secret.txt, then it hits 404 (bucket not found) or 403.
# But we want to test the storage layer check.
# We can try to encode the dots?
# If we use a key that doesn't get normalized by Flask routing easily.
# But wait, the route is /<bucket_name>/<path:object_key>
# If I send /test-bucket/folder/../file.txt, Flask might pass "folder/../file.txt" as object_key?
# Let's try.
headers = signer("PUT", "/test-bucket/folder/../file.txt", body=b"attack")
response = client.put("/test-bucket/folder/../file.txt", headers=headers, data=b"attack")
# If Flask normalizes it, it becomes /test-bucket/file.txt.
# If it doesn't, it hits our check.
# Let's try to call the storage method directly to verify the check works,
# because testing via client depends on Flask's URL handling.
pass
def test_storage_path_traversal(app):
storage = app.extensions["object_storage"]
from app.storage import StorageError, ObjectStorage
from app.encrypted_storage import EncryptedObjectStorage
# Get the underlying ObjectStorage if wrapped
if isinstance(storage, EncryptedObjectStorage):
storage = storage.storage
with pytest.raises(StorageError, match="Object key contains parent directory references"):
storage._sanitize_object_key("folder/../file.txt")
with pytest.raises(StorageError, match="Object key contains parent directory references"):
storage._sanitize_object_key("..")
def test_head_bucket(client, signer):
headers = signer("PUT", "/head-test")
client.put("/head-test", headers=headers)
headers = signer("HEAD", "/head-test")
response = client.head("/head-test", headers=headers)
assert response.status_code == 200
headers = signer("HEAD", "/non-existent")
response = client.head("/non-existent", headers=headers)
assert response.status_code == 404
def test_head_object(client, signer):
headers = signer("PUT", "/head-obj-test")
client.put("/head-obj-test", headers=headers)
headers = signer("PUT", "/head-obj-test/obj", body=b"content")
client.put("/head-obj-test/obj", headers=headers, data=b"content")
headers = signer("HEAD", "/head-obj-test/obj")
response = client.head("/head-obj-test/obj", headers=headers)
assert response.status_code == 200
assert response.headers["ETag"]
assert response.headers["Content-Length"] == "7"
headers = signer("HEAD", "/head-obj-test/missing")
response = client.head("/head-obj-test/missing", headers=headers)
assert response.status_code == 404
def test_list_parts(client, signer):
# Create bucket
headers = signer("PUT", "/multipart-test")
client.put("/multipart-test", headers=headers)
# Initiate multipart upload
headers = signer("POST", "/multipart-test/obj?uploads")
response = client.post("/multipart-test/obj?uploads", headers=headers)
assert response.status_code == 200
from xml.etree.ElementTree import fromstring
upload_id = fromstring(response.data).find("UploadId").text
# Upload part 1
headers = signer("PUT", f"/multipart-test/obj?partNumber=1&uploadId={upload_id}", body=b"part1")
client.put(f"/multipart-test/obj?partNumber=1&uploadId={upload_id}", headers=headers, data=b"part1")
# Upload part 2
headers = signer("PUT", f"/multipart-test/obj?partNumber=2&uploadId={upload_id}", body=b"part2")
client.put(f"/multipart-test/obj?partNumber=2&uploadId={upload_id}", headers=headers, data=b"part2")
# List parts
headers = signer("GET", f"/multipart-test/obj?uploadId={upload_id}")
response = client.get(f"/multipart-test/obj?uploadId={upload_id}", headers=headers)
assert response.status_code == 200
root = fromstring(response.data)
assert root.tag == "ListPartsResult"
parts = root.findall("Part")
assert len(parts) == 2
assert parts[0].find("PartNumber").text == "1"
assert parts[1].find("PartNumber").text == "2"