From 6e6d6d32bf06dbbe77c5cbec3d27c72661f8d8ae Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 9 Feb 2026 17:01:19 +0800 Subject: [PATCH] Optimize KMS: cache AESGCM instance, remove duplicate get_provider --- app/kms.py | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/app/kms.py b/app/kms.py index dbd07e0..3ac02b8 100644 --- a/app/kms.py +++ b/app/kms.py @@ -160,6 +160,7 @@ class KMSManager: self.generate_data_key_max_bytes = generate_data_key_max_bytes self._keys: Dict[str, KMSKey] = {} self._master_key: bytes | None = None + self._master_aesgcm: AESGCM | None = None self._loaded = False @property @@ -191,6 +192,7 @@ class KMSManager: msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1) else: fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) + self._master_aesgcm = AESGCM(self._master_key) return self._master_key def _load_keys(self) -> None: @@ -231,18 +233,16 @@ class KMSManager: _set_secure_file_permissions(self.keys_path) def _encrypt_key_material(self, key_material: bytes) -> bytes: - """Encrypt key material with the master key.""" - aesgcm = AESGCM(self.master_key) + _ = self.master_key nonce = secrets.token_bytes(12) - ciphertext = aesgcm.encrypt(nonce, key_material, None) + ciphertext = self._master_aesgcm.encrypt(nonce, key_material, None) return nonce + ciphertext - + def _decrypt_key_material(self, encrypted: bytes) -> bytes: - """Decrypt key material with the master key.""" - aesgcm = AESGCM(self.master_key) + _ = self.master_key nonce = encrypted[:12] ciphertext = encrypted[12:] - return aesgcm.decrypt(nonce, ciphertext, None) + return self._master_aesgcm.decrypt(nonce, ciphertext, None) def create_key(self, description: str = "", key_id: str | None = None) -> KMSKey: """Create a new KMS key.""" @@ -404,22 +404,6 @@ class KMSManager: plaintext, _ = self.decrypt(encrypted_key, context) return plaintext - def get_provider(self, key_id: str | None = None) -> KMSEncryptionProvider: - """Get an encryption provider for a specific key.""" - self._load_keys() - - if key_id is None: - if not self._keys: - key = self.create_key("Default KMS Key") - key_id = key.key_id - else: - key_id = next(iter(self._keys.keys())) - - if key_id not in self._keys: - raise EncryptionError(f"Key not found: {key_id}") - - return KMSEncryptionProvider(self, key_id) - def re_encrypt(self, ciphertext: bytes, destination_key_id: str, source_context: Dict[str, str] | None = None, destination_context: Dict[str, str] | None = None) -> bytes: