Fix 17 security vulnerabilities across encryption, auth, and API modules

This commit is contained in:
2026-01-29 12:05:35 +08:00
parent ae26d22388
commit 0ea54457e8
6 changed files with 146 additions and 62 deletions

View File

@@ -2,7 +2,10 @@ from __future__ import annotations
import base64
import json
import logging
import os
import secrets
import sys
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
@@ -13,6 +16,8 @@ from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from .encryption import EncryptionError, EncryptionProvider, EncryptionResult
logger = logging.getLogger(__name__)
@dataclass
class KMSKey:
@@ -74,11 +79,11 @@ class KMSEncryptionProvider(EncryptionProvider):
def encrypt(self, plaintext: bytes, context: Dict[str, str] | None = None) -> EncryptionResult:
"""Encrypt data using envelope encryption with KMS."""
data_key, encrypted_data_key = self.generate_data_key()
aesgcm = AESGCM(data_key)
nonce = secrets.token_bytes(12)
ciphertext = aesgcm.encrypt(nonce, plaintext,
json.dumps(context).encode() if context else None)
ciphertext = aesgcm.encrypt(nonce, plaintext,
json.dumps(context, sort_keys=True).encode() if context else None)
return EncryptionResult(
ciphertext=ciphertext,
@@ -90,15 +95,17 @@ class KMSEncryptionProvider(EncryptionProvider):
def decrypt(self, ciphertext: bytes, nonce: bytes, encrypted_data_key: bytes,
key_id: str, context: Dict[str, str] | None = None) -> bytes:
"""Decrypt data using envelope encryption with KMS."""
# Note: Data key is encrypted without context (AAD), so we decrypt without context
data_key = self.kms.decrypt_data_key(key_id, encrypted_data_key, context=None)
if len(data_key) != 32:
raise EncryptionError("Invalid data key size")
aesgcm = AESGCM(data_key)
try:
return aesgcm.decrypt(nonce, ciphertext,
json.dumps(context).encode() if context else None)
json.dumps(context, sort_keys=True).encode() if context else None)
except Exception as exc:
raise EncryptionError(f"Failed to decrypt data: {exc}") from exc
logger.debug("KMS decryption failed: %s", exc)
raise EncryptionError("Failed to decrypt data") from exc
class KMSManager:
@@ -137,6 +144,8 @@ class KMSManager:
self.master_key_path.write_text(
base64.b64encode(self._master_key).decode()
)
if sys.platform != "win32":
os.chmod(self.master_key_path, 0o600)
return self._master_key
def _load_keys(self) -> None:
@@ -153,8 +162,10 @@ class KMSManager:
encrypted = base64.b64decode(key_data["EncryptedKeyMaterial"])
key.key_material = self._decrypt_key_material(encrypted)
self._keys[key.key_id] = key
except Exception:
pass
except json.JSONDecodeError as exc:
logger.error("Failed to parse KMS keys file: %s", exc)
except (ValueError, KeyError) as exc:
logger.error("Invalid KMS key data: %s", exc)
self._loaded = True
@@ -277,7 +288,7 @@ class KMSManager:
aesgcm = AESGCM(key.key_material)
nonce = secrets.token_bytes(12)
aad = json.dumps(context).encode() if context else None
aad = json.dumps(context, sort_keys=True).encode() if context else None
ciphertext = aesgcm.encrypt(nonce, plaintext, aad)
key_id_bytes = key_id.encode("utf-8")
@@ -306,17 +317,24 @@ class KMSManager:
encrypted = rest[12:]
aesgcm = AESGCM(key.key_material)
aad = json.dumps(context).encode() if context else None
aad = json.dumps(context, sort_keys=True).encode() if context else None
try:
plaintext = aesgcm.decrypt(nonce, encrypted, aad)
return plaintext, key_id
except Exception as exc:
raise EncryptionError(f"Decryption failed: {exc}") from exc
logger.debug("KMS decrypt operation failed: %s", exc)
raise EncryptionError("Decryption failed") from exc
def generate_data_key(self, key_id: str,
context: Dict[str, str] | None = None) -> tuple[bytes, bytes]:
context: Dict[str, str] | None = None,
key_spec: str = "AES_256") -> tuple[bytes, bytes]:
"""Generate a data key and return both plaintext and encrypted versions.
Args:
key_id: The KMS key ID to use for encryption
context: Optional encryption context
key_spec: Key specification - AES_128 or AES_256 (default)
Returns:
Tuple of (plaintext_key, encrypted_key)
"""
@@ -326,11 +344,12 @@ class KMSManager:
raise EncryptionError(f"Key not found: {key_id}")
if not key.enabled:
raise EncryptionError(f"Key is disabled: {key_id}")
plaintext_key = secrets.token_bytes(32)
key_bytes = 32 if key_spec == "AES_256" else 16
plaintext_key = secrets.token_bytes(key_bytes)
encrypted_key = self.encrypt(key_id, plaintext_key, context)
return plaintext_key, encrypted_key
def decrypt_data_key(self, key_id: str, encrypted_key: bytes,