MyFSIO v0.2.7 Release #19
30
app/kms.py
30
app/kms.py
@@ -160,6 +160,7 @@ class KMSManager:
|
|||||||
self.generate_data_key_max_bytes = generate_data_key_max_bytes
|
self.generate_data_key_max_bytes = generate_data_key_max_bytes
|
||||||
self._keys: Dict[str, KMSKey] = {}
|
self._keys: Dict[str, KMSKey] = {}
|
||||||
self._master_key: bytes | None = None
|
self._master_key: bytes | None = None
|
||||||
|
self._master_aesgcm: AESGCM | None = None
|
||||||
self._loaded = False
|
self._loaded = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -191,6 +192,7 @@ class KMSManager:
|
|||||||
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
|
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
|
||||||
else:
|
else:
|
||||||
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
|
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
|
||||||
|
self._master_aesgcm = AESGCM(self._master_key)
|
||||||
return self._master_key
|
return self._master_key
|
||||||
|
|
||||||
def _load_keys(self) -> None:
|
def _load_keys(self) -> None:
|
||||||
@@ -231,18 +233,16 @@ class KMSManager:
|
|||||||
_set_secure_file_permissions(self.keys_path)
|
_set_secure_file_permissions(self.keys_path)
|
||||||
|
|
||||||
def _encrypt_key_material(self, key_material: bytes) -> bytes:
|
def _encrypt_key_material(self, key_material: bytes) -> bytes:
|
||||||
"""Encrypt key material with the master key."""
|
_ = self.master_key
|
||||||
aesgcm = AESGCM(self.master_key)
|
|
||||||
nonce = secrets.token_bytes(12)
|
nonce = secrets.token_bytes(12)
|
||||||
ciphertext = aesgcm.encrypt(nonce, key_material, None)
|
ciphertext = self._master_aesgcm.encrypt(nonce, key_material, None)
|
||||||
return nonce + ciphertext
|
return nonce + ciphertext
|
||||||
|
|
||||||
def _decrypt_key_material(self, encrypted: bytes) -> bytes:
|
def _decrypt_key_material(self, encrypted: bytes) -> bytes:
|
||||||
"""Decrypt key material with the master key."""
|
_ = self.master_key
|
||||||
aesgcm = AESGCM(self.master_key)
|
|
||||||
nonce = encrypted[:12]
|
nonce = encrypted[:12]
|
||||||
ciphertext = encrypted[12:]
|
ciphertext = encrypted[12:]
|
||||||
return aesgcm.decrypt(nonce, ciphertext, None)
|
return self._master_aesgcm.decrypt(nonce, ciphertext, None)
|
||||||
|
|
||||||
def create_key(self, description: str = "", key_id: str | None = None) -> KMSKey:
|
def create_key(self, description: str = "", key_id: str | None = None) -> KMSKey:
|
||||||
"""Create a new KMS key."""
|
"""Create a new KMS key."""
|
||||||
@@ -404,22 +404,6 @@ class KMSManager:
|
|||||||
plaintext, _ = self.decrypt(encrypted_key, context)
|
plaintext, _ = self.decrypt(encrypted_key, context)
|
||||||
return plaintext
|
return plaintext
|
||||||
|
|
||||||
def get_provider(self, key_id: str | None = None) -> KMSEncryptionProvider:
|
|
||||||
"""Get an encryption provider for a specific key."""
|
|
||||||
self._load_keys()
|
|
||||||
|
|
||||||
if key_id is None:
|
|
||||||
if not self._keys:
|
|
||||||
key = self.create_key("Default KMS Key")
|
|
||||||
key_id = key.key_id
|
|
||||||
else:
|
|
||||||
key_id = next(iter(self._keys.keys()))
|
|
||||||
|
|
||||||
if key_id not in self._keys:
|
|
||||||
raise EncryptionError(f"Key not found: {key_id}")
|
|
||||||
|
|
||||||
return KMSEncryptionProvider(self, key_id)
|
|
||||||
|
|
||||||
def re_encrypt(self, ciphertext: bytes, destination_key_id: str,
|
def re_encrypt(self, ciphertext: bytes, destination_key_id: str,
|
||||||
source_context: Dict[str, str] | None = None,
|
source_context: Dict[str, str] | None = None,
|
||||||
destination_context: Dict[str, str] | None = None) -> bytes:
|
destination_context: Dict[str, str] | None = None) -> bytes:
|
||||||
|
|||||||
Reference in New Issue
Block a user