Fix 15 security vulnerabilities across auth, storage, and API modules
This commit is contained in:
@@ -6,6 +6,7 @@ import io
|
||||
import json
|
||||
import os
|
||||
import secrets
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
@@ -15,6 +16,26 @@ from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
|
||||
if sys.platform != "win32":
|
||||
import fcntl
|
||||
|
||||
|
||||
def _set_secure_file_permissions(file_path: Path) -> None:
|
||||
"""Set restrictive file permissions (owner read/write only)."""
|
||||
if sys.platform == "win32":
|
||||
try:
|
||||
username = os.environ.get("USERNAME", "")
|
||||
if username:
|
||||
subprocess.run(
|
||||
["icacls", str(file_path), "/inheritance:r",
|
||||
"/grant:r", f"{username}:F"],
|
||||
check=True, capture_output=True
|
||||
)
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
pass
|
||||
else:
|
||||
os.chmod(file_path, 0o600)
|
||||
|
||||
|
||||
class EncryptionError(Exception):
|
||||
"""Raised when encryption/decryption fails."""
|
||||
@@ -103,22 +124,38 @@ class LocalKeyEncryption(EncryptionProvider):
|
||||
return self._master_key
|
||||
|
||||
def _load_or_create_master_key(self) -> bytes:
|
||||
"""Load master key from file or generate a new one."""
|
||||
if self.master_key_path.exists():
|
||||
try:
|
||||
return base64.b64decode(self.master_key_path.read_text().strip())
|
||||
except Exception as exc:
|
||||
raise EncryptionError(f"Failed to load master key: {exc}") from exc
|
||||
|
||||
key = secrets.token_bytes(32)
|
||||
"""Load master key from file or generate a new one (with file locking)."""
|
||||
lock_path = self.master_key_path.with_suffix(".lock")
|
||||
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
self.master_key_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self.master_key_path.write_text(base64.b64encode(key).decode())
|
||||
if sys.platform != "win32":
|
||||
os.chmod(self.master_key_path, 0o600)
|
||||
with open(lock_path, "w") as lock_file:
|
||||
if sys.platform == "win32":
|
||||
import msvcrt
|
||||
msvcrt.locking(lock_file.fileno(), msvcrt.LK_LOCK, 1)
|
||||
else:
|
||||
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
|
||||
try:
|
||||
if self.master_key_path.exists():
|
||||
try:
|
||||
return base64.b64decode(self.master_key_path.read_text().strip())
|
||||
except Exception as exc:
|
||||
raise EncryptionError(f"Failed to load master key: {exc}") from exc
|
||||
key = secrets.token_bytes(32)
|
||||
try:
|
||||
self.master_key_path.write_text(base64.b64encode(key).decode())
|
||||
_set_secure_file_permissions(self.master_key_path)
|
||||
except OSError as exc:
|
||||
raise EncryptionError(f"Failed to save master key: {exc}") from exc
|
||||
return key
|
||||
finally:
|
||||
if sys.platform == "win32":
|
||||
import msvcrt
|
||||
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
else:
|
||||
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
|
||||
except OSError as exc:
|
||||
raise EncryptionError(f"Failed to save master key: {exc}") from exc
|
||||
return key
|
||||
raise EncryptionError(f"Failed to acquire lock for master key: {exc}") from exc
|
||||
|
||||
def _encrypt_data_key(self, data_key: bytes) -> bytes:
|
||||
"""Encrypt the data key with the master key."""
|
||||
|
||||
Reference in New Issue
Block a user