Clean up code comments
This commit is contained in:
@@ -21,12 +21,11 @@ class TestLocalKeyEncryption:
|
||||
|
||||
key_path = tmp_path / "keys" / "master.key"
|
||||
provider = LocalKeyEncryption(key_path)
|
||||
|
||||
# Access master key to trigger creation
|
||||
|
||||
key = provider.master_key
|
||||
|
||||
|
||||
assert key_path.exists()
|
||||
assert len(key) == 32 # 256-bit key
|
||||
assert len(key) == 32
|
||||
|
||||
def test_load_existing_master_key(self, tmp_path):
|
||||
"""Test loading an existing master key."""
|
||||
@@ -49,16 +48,14 @@ class TestLocalKeyEncryption:
|
||||
provider = LocalKeyEncryption(key_path)
|
||||
|
||||
plaintext = b"Hello, World! This is a test message."
|
||||
|
||||
# Encrypt
|
||||
|
||||
result = provider.encrypt(plaintext)
|
||||
|
||||
|
||||
assert result.ciphertext != plaintext
|
||||
assert result.key_id == "local"
|
||||
assert len(result.nonce) == 12
|
||||
assert len(result.encrypted_data_key) > 0
|
||||
|
||||
# Decrypt
|
||||
|
||||
decrypted = provider.decrypt(
|
||||
result.ciphertext,
|
||||
result.nonce,
|
||||
@@ -79,12 +76,9 @@ class TestLocalKeyEncryption:
|
||||
|
||||
result1 = provider.encrypt(plaintext)
|
||||
result2 = provider.encrypt(plaintext)
|
||||
|
||||
# Different encrypted data keys
|
||||
|
||||
assert result1.encrypted_data_key != result2.encrypted_data_key
|
||||
# Different nonces
|
||||
assert result1.nonce != result2.nonce
|
||||
# Different ciphertexts
|
||||
assert result1.ciphertext != result2.ciphertext
|
||||
|
||||
def test_generate_data_key(self, tmp_path):
|
||||
@@ -95,30 +89,26 @@ class TestLocalKeyEncryption:
|
||||
provider = LocalKeyEncryption(key_path)
|
||||
|
||||
plaintext_key, encrypted_key = provider.generate_data_key()
|
||||
|
||||
|
||||
assert len(plaintext_key) == 32
|
||||
assert len(encrypted_key) > 32 # nonce + ciphertext + tag
|
||||
|
||||
# Verify we can decrypt the key
|
||||
assert len(encrypted_key) > 32
|
||||
|
||||
decrypted_key = provider._decrypt_data_key(encrypted_key)
|
||||
assert decrypted_key == plaintext_key
|
||||
|
||||
def test_decrypt_with_wrong_key_fails(self, tmp_path):
|
||||
"""Test that decryption fails with wrong master key."""
|
||||
from app.encryption import LocalKeyEncryption, EncryptionError
|
||||
|
||||
# Create two providers with different keys
|
||||
|
||||
key_path1 = tmp_path / "master1.key"
|
||||
key_path2 = tmp_path / "master2.key"
|
||||
|
||||
|
||||
provider1 = LocalKeyEncryption(key_path1)
|
||||
provider2 = LocalKeyEncryption(key_path2)
|
||||
|
||||
# Encrypt with provider1
|
||||
|
||||
plaintext = b"Secret message"
|
||||
result = provider1.encrypt(plaintext)
|
||||
|
||||
# Try to decrypt with provider2
|
||||
|
||||
with pytest.raises(EncryptionError):
|
||||
provider2.decrypt(
|
||||
result.ciphertext,
|
||||
@@ -195,19 +185,16 @@ class TestStreamingEncryptor:
|
||||
key_path = tmp_path / "master.key"
|
||||
provider = LocalKeyEncryption(key_path)
|
||||
encryptor = StreamingEncryptor(provider, chunk_size=1024)
|
||||
|
||||
# Create test data
|
||||
original_data = b"A" * 5000 + b"B" * 5000 + b"C" * 5000 # 15KB
|
||||
|
||||
original_data = b"A" * 5000 + b"B" * 5000 + b"C" * 5000
|
||||
stream = io.BytesIO(original_data)
|
||||
|
||||
# Encrypt
|
||||
|
||||
encrypted_stream, metadata = encryptor.encrypt_stream(stream)
|
||||
encrypted_data = encrypted_stream.read()
|
||||
|
||||
|
||||
assert encrypted_data != original_data
|
||||
assert metadata.algorithm == "AES256"
|
||||
|
||||
# Decrypt
|
||||
|
||||
encrypted_stream = io.BytesIO(encrypted_data)
|
||||
decrypted_stream = encryptor.decrypt_stream(encrypted_stream, metadata)
|
||||
decrypted_data = decrypted_stream.read()
|
||||
@@ -318,8 +305,7 @@ class TestClientEncryptionHelper:
|
||||
assert "key" in key_info
|
||||
assert key_info["algorithm"] == "AES-256-GCM"
|
||||
assert "created_at" in key_info
|
||||
|
||||
# Verify key is 256 bits
|
||||
|
||||
key = base64.b64decode(key_info["key"])
|
||||
assert len(key) == 32
|
||||
|
||||
@@ -424,8 +410,7 @@ class TestKMSManager:
|
||||
|
||||
assert key is not None
|
||||
assert key.key_id == "test-key"
|
||||
|
||||
# Non-existent key
|
||||
|
||||
assert kms.get_key("non-existent") is None
|
||||
|
||||
def test_enable_disable_key(self, tmp_path):
|
||||
@@ -438,15 +423,12 @@ class TestKMSManager:
|
||||
kms = KMSManager(keys_path, master_key_path)
|
||||
|
||||
kms.create_key("Test key", key_id="test-key")
|
||||
|
||||
# Initially enabled
|
||||
|
||||
assert kms.get_key("test-key").enabled
|
||||
|
||||
# Disable
|
||||
|
||||
kms.disable_key("test-key")
|
||||
assert not kms.get_key("test-key").enabled
|
||||
|
||||
# Enable
|
||||
|
||||
kms.enable_key("test-key")
|
||||
assert kms.get_key("test-key").enabled
|
||||
|
||||
@@ -502,12 +484,10 @@ class TestKMSManager:
|
||||
context = {"bucket": "test-bucket", "key": "test-key"}
|
||||
|
||||
ciphertext = kms.encrypt("test-key", plaintext, context)
|
||||
|
||||
# Decrypt with same context succeeds
|
||||
|
||||
decrypted, _ = kms.decrypt(ciphertext, context)
|
||||
assert decrypted == plaintext
|
||||
|
||||
# Decrypt with different context fails
|
||||
|
||||
with pytest.raises(EncryptionError):
|
||||
kms.decrypt(ciphertext, {"different": "context"})
|
||||
|
||||
@@ -526,8 +506,7 @@ class TestKMSManager:
|
||||
|
||||
assert len(plaintext_key) == 32
|
||||
assert len(encrypted_key) > 0
|
||||
|
||||
# Decrypt the encrypted key
|
||||
|
||||
decrypted_key = kms.decrypt_data_key("test-key", encrypted_key)
|
||||
|
||||
assert decrypted_key == plaintext_key
|
||||
@@ -560,14 +539,9 @@ class TestKMSManager:
|
||||
kms.create_key("Key 2", key_id="key-2")
|
||||
|
||||
plaintext = b"Data to re-encrypt"
|
||||
|
||||
# Encrypt with key-1
|
||||
|
||||
ciphertext1 = kms.encrypt("key-1", plaintext)
|
||||
|
||||
# Re-encrypt with key-2
|
||||
ciphertext2 = kms.re_encrypt(ciphertext1, "key-2")
|
||||
|
||||
# Decrypt with key-2
|
||||
decrypted, key_id = kms.decrypt(ciphertext2)
|
||||
|
||||
assert decrypted == plaintext
|
||||
@@ -587,7 +561,7 @@ class TestKMSManager:
|
||||
|
||||
assert len(random1) == 32
|
||||
assert len(random2) == 32
|
||||
assert random1 != random2 # Very unlikely to be equal
|
||||
assert random1 != random2
|
||||
|
||||
def test_keys_persist_across_instances(self, tmp_path):
|
||||
"""Test that keys persist and can be loaded by new instances."""
|
||||
@@ -595,15 +569,13 @@ class TestKMSManager:
|
||||
|
||||
keys_path = tmp_path / "kms_keys.json"
|
||||
master_key_path = tmp_path / "master.key"
|
||||
|
||||
# Create key with first instance
|
||||
|
||||
kms1 = KMSManager(keys_path, master_key_path)
|
||||
kms1.create_key("Test key", key_id="test-key")
|
||||
|
||||
|
||||
plaintext = b"Persistent encryption test"
|
||||
ciphertext = kms1.encrypt("test-key", plaintext)
|
||||
|
||||
# Create new instance and verify key works
|
||||
|
||||
kms2 = KMSManager(keys_path, master_key_path)
|
||||
|
||||
decrypted, key_id = kms2.decrypt(ciphertext)
|
||||
@@ -664,31 +636,27 @@ class TestEncryptedStorage:
|
||||
encryption = EncryptionManager(config)
|
||||
|
||||
encrypted_storage = EncryptedObjectStorage(storage, encryption)
|
||||
|
||||
# Create bucket with encryption config
|
||||
|
||||
storage.create_bucket("test-bucket")
|
||||
storage.set_bucket_encryption("test-bucket", {
|
||||
"Rules": [{"SSEAlgorithm": "AES256"}]
|
||||
})
|
||||
|
||||
# Put object
|
||||
|
||||
original_data = b"This is secret data that should be encrypted"
|
||||
stream = io.BytesIO(original_data)
|
||||
|
||||
|
||||
meta = encrypted_storage.put_object(
|
||||
"test-bucket",
|
||||
"secret.txt",
|
||||
stream,
|
||||
)
|
||||
|
||||
|
||||
assert meta is not None
|
||||
|
||||
# Verify file on disk is encrypted (not plaintext)
|
||||
|
||||
file_path = storage_root / "test-bucket" / "secret.txt"
|
||||
stored_data = file_path.read_bytes()
|
||||
assert stored_data != original_data
|
||||
|
||||
# Get object - should be decrypted
|
||||
|
||||
data, metadata = encrypted_storage.get_object_data("test-bucket", "secret.txt")
|
||||
|
||||
assert data == original_data
|
||||
@@ -711,14 +679,12 @@ class TestEncryptedStorage:
|
||||
encrypted_storage = EncryptedObjectStorage(storage, encryption)
|
||||
|
||||
storage.create_bucket("test-bucket")
|
||||
# No encryption config
|
||||
|
||||
|
||||
original_data = b"Unencrypted data"
|
||||
stream = io.BytesIO(original_data)
|
||||
|
||||
|
||||
encrypted_storage.put_object("test-bucket", "plain.txt", stream)
|
||||
|
||||
# Verify file on disk is NOT encrypted
|
||||
|
||||
file_path = storage_root / "test-bucket" / "plain.txt"
|
||||
stored_data = file_path.read_bytes()
|
||||
assert stored_data == original_data
|
||||
@@ -744,20 +710,17 @@ class TestEncryptedStorage:
|
||||
|
||||
original_data = b"Explicitly encrypted data"
|
||||
stream = io.BytesIO(original_data)
|
||||
|
||||
# Request encryption explicitly
|
||||
|
||||
encrypted_storage.put_object(
|
||||
"test-bucket",
|
||||
"encrypted.txt",
|
||||
stream,
|
||||
server_side_encryption="AES256",
|
||||
)
|
||||
|
||||
# Verify file is encrypted
|
||||
|
||||
file_path = storage_root / "test-bucket" / "encrypted.txt"
|
||||
stored_data = file_path.read_bytes()
|
||||
assert stored_data != original_data
|
||||
|
||||
# Get object - should be decrypted
|
||||
|
||||
data, _ = encrypted_storage.get_object_data("test-bucket", "encrypted.txt")
|
||||
assert data == original_data
|
||||
|
||||
Reference in New Issue
Block a user