Add credential expiry support: per-user expires_at with UI management, presets, and badge indicators; Add credential expiry support: per-user expires_at with UI management, presets, and badge indicators; Fix IAM card dropdown clipped by overflow: remove gradient bar, allow overflow visible

This commit is contained in:
2026-03-08 13:08:57 +08:00
parent 72f5d9d70c
commit 03353a0aec
8 changed files with 553 additions and 55 deletions

View File

@@ -130,6 +130,7 @@ def create_app(
Path(app.config["IAM_CONFIG"]),
auth_max_attempts=app.config.get("AUTH_MAX_ATTEMPTS", 5),
auth_lockout_minutes=app.config.get("AUTH_LOCKOUT_MINUTES", 15),
encryption_key=app.config.get("SECRET_KEY"),
)
bucket_policies = BucketPolicyStore(Path(app.config["BUCKET_POLICY_PATH"]))
secret_store = EphemeralSecretStore(default_ttl=app.config.get("SECRET_TTL_SECONDS", 300))

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import base64
import hashlib
import hmac
import json
@@ -14,6 +15,8 @@ from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set, Tuple
from cryptography.fernet import Fernet, InvalidToken
class IamError(RuntimeError):
"""Raised when authentication or authorization fails."""
@@ -107,13 +110,24 @@ class Principal:
policies: List[Policy]
def _derive_fernet_key(secret: str) -> bytes:
raw = hashlib.pbkdf2_hmac("sha256", secret.encode(), b"myfsio-iam-encryption", 100_000)
return base64.urlsafe_b64encode(raw)
_IAM_ENCRYPTED_PREFIX = b"MYFSIO_IAM_ENC:"
class IamService:
"""Loads IAM configuration, manages users, and evaluates policies."""
def __init__(self, config_path: Path, auth_max_attempts: int = 5, auth_lockout_minutes: int = 15) -> None:
def __init__(self, config_path: Path, auth_max_attempts: int = 5, auth_lockout_minutes: int = 15, encryption_key: str | None = None) -> None:
self.config_path = Path(config_path)
self.auth_max_attempts = auth_max_attempts
self.auth_lockout_window = timedelta(minutes=auth_lockout_minutes)
self._fernet: Fernet | None = None
if encryption_key:
self._fernet = Fernet(_derive_fernet_key(encryption_key))
self.config_path.parent.mkdir(parents=True, exist_ok=True)
if not self.config_path.exists():
self._write_default()
@@ -145,6 +159,19 @@ class IamService:
except OSError:
pass
def _check_expiry(self, access_key: str, record: Dict[str, Any]) -> None:
expires_at = record.get("expires_at")
if not expires_at:
return
try:
exp_dt = datetime.fromisoformat(expires_at)
if exp_dt.tzinfo is None:
exp_dt = exp_dt.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) >= exp_dt:
raise IamError(f"Credentials for '{access_key}' have expired")
except (ValueError, TypeError):
pass
def authenticate(self, access_key: str, secret_key: str) -> Principal:
self._maybe_reload()
access_key = (access_key or "").strip()
@@ -161,6 +188,7 @@ class IamService:
if not record or not hmac.compare_digest(stored_secret, secret_key):
self._record_failed_attempt(access_key)
raise IamError("Invalid credentials")
self._check_expiry(access_key, record)
self._clear_failed_attempts(access_key)
return self._build_principal(access_key, record)
@@ -288,12 +316,16 @@ class IamService:
if cached:
principal, cached_time = cached
if now - cached_time < self._cache_ttl:
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
return principal
self._maybe_reload()
record = self._users.get(access_key)
if not record:
raise IamError("Unknown access key")
self._check_expiry(access_key, record)
principal = self._build_principal(access_key, record)
self._principal_cache[access_key] = (principal, now)
return principal
@@ -303,6 +335,7 @@ class IamService:
record = self._users.get(access_key)
if not record:
raise IamError("Unknown access key")
self._check_expiry(access_key, record)
return record["secret_key"]
def authorize(self, principal: Principal, bucket_name: str | None, action: str) -> None:
@@ -347,6 +380,7 @@ class IamService:
{
"access_key": access_key,
"display_name": record["display_name"],
"expires_at": record.get("expires_at"),
"policies": [
{"bucket": policy.bucket, "actions": sorted(policy.actions)}
for policy in record["policies"]
@@ -362,20 +396,25 @@ class IamService:
policies: Optional[Sequence[Dict[str, Any]]] = None,
access_key: str | None = None,
secret_key: str | None = None,
expires_at: str | None = None,
) -> Dict[str, str]:
access_key = (access_key or self._generate_access_key()).strip()
if not access_key:
raise IamError("Access key cannot be empty")
if access_key in self._users:
raise IamError("Access key already exists")
if expires_at:
self._validate_expires_at(expires_at)
secret_key = secret_key or self._generate_secret_key()
sanitized_policies = self._prepare_policy_payload(policies)
record = {
record: Dict[str, Any] = {
"access_key": access_key,
"secret_key": secret_key,
"display_name": display_name or access_key,
"policies": sanitized_policies,
}
if expires_at:
record["expires_at"] = expires_at
self._raw_config.setdefault("users", []).append(record)
self._save()
self._load()
@@ -414,17 +453,43 @@ class IamService:
clear_signing_key_cache()
self._load()
def update_user_expiry(self, access_key: str, expires_at: str | None) -> None:
user = self._get_raw_user(access_key)
if expires_at:
self._validate_expires_at(expires_at)
user["expires_at"] = expires_at
else:
user.pop("expires_at", None)
self._save()
self._principal_cache.pop(access_key, None)
self._secret_key_cache.pop(access_key, None)
self._load()
def update_user_policies(self, access_key: str, policies: Sequence[Dict[str, Any]]) -> None:
user = self._get_raw_user(access_key)
user["policies"] = self._prepare_policy_payload(policies)
self._save()
self._load()
def _decrypt_content(self, raw_bytes: bytes) -> str:
if raw_bytes.startswith(_IAM_ENCRYPTED_PREFIX):
if not self._fernet:
raise IamError("IAM config is encrypted but no encryption key provided. Set SECRET_KEY or use 'python run.py reset-cred'.")
try:
encrypted_data = raw_bytes[len(_IAM_ENCRYPTED_PREFIX):]
return self._fernet.decrypt(encrypted_data).decode("utf-8")
except InvalidToken:
raise IamError("Cannot decrypt IAM config. SECRET_KEY may have changed. Use 'python run.py reset-cred' to reset credentials.")
return raw_bytes.decode("utf-8")
def _load(self) -> None:
try:
self._last_load_time = self.config_path.stat().st_mtime
content = self.config_path.read_text(encoding='utf-8')
raw_bytes = self.config_path.read_bytes()
content = self._decrypt_content(raw_bytes)
raw = json.loads(content)
except IamError:
raise
except FileNotFoundError:
raise IamError(f"IAM config not found: {self.config_path}")
except json.JSONDecodeError as e:
@@ -433,34 +498,48 @@ class IamService:
raise IamError(f"Cannot read IAM config (permission denied): {e}")
except (OSError, ValueError) as e:
raise IamError(f"Failed to load IAM config: {e}")
was_plaintext = not raw_bytes.startswith(_IAM_ENCRYPTED_PREFIX)
users: Dict[str, Dict[str, Any]] = {}
for user in raw.get("users", []):
policies = self._build_policy_objects(user.get("policies", []))
users[user["access_key"]] = {
user_record: Dict[str, Any] = {
"secret_key": user["secret_key"],
"display_name": user.get("display_name", user["access_key"]),
"policies": policies,
}
if user.get("expires_at"):
user_record["expires_at"] = user["expires_at"]
users[user["access_key"]] = user_record
if not users:
raise IamError("IAM configuration contains no users")
self._users = users
self._raw_config = {
"users": [
{
"access_key": entry["access_key"],
"secret_key": entry["secret_key"],
"display_name": entry.get("display_name", entry["access_key"]),
"policies": entry.get("policies", []),
}
for entry in raw.get("users", [])
]
}
raw_users: List[Dict[str, Any]] = []
for entry in raw.get("users", []):
raw_entry: Dict[str, Any] = {
"access_key": entry["access_key"],
"secret_key": entry["secret_key"],
"display_name": entry.get("display_name", entry["access_key"]),
"policies": entry.get("policies", []),
}
if entry.get("expires_at"):
raw_entry["expires_at"] = entry["expires_at"]
raw_users.append(raw_entry)
self._raw_config = {"users": raw_users}
if was_plaintext and self._fernet:
self._save()
def _save(self) -> None:
try:
json_text = json.dumps(self._raw_config, indent=2)
temp_path = self.config_path.with_suffix('.json.tmp')
temp_path.write_text(json.dumps(self._raw_config, indent=2), encoding='utf-8')
if self._fernet:
encrypted = self._fernet.encrypt(json_text.encode("utf-8"))
temp_path.write_bytes(_IAM_ENCRYPTED_PREFIX + encrypted)
else:
temp_path.write_text(json_text, encoding='utf-8')
temp_path.replace(self.config_path)
except (OSError, PermissionError) as e:
raise IamError(f"Cannot save IAM config: {e}")
@@ -475,9 +554,14 @@ class IamService:
def export_config(self, mask_secrets: bool = True) -> Dict[str, Any]:
payload: Dict[str, Any] = {"users": []}
for user in self._raw_config.get("users", []):
record = dict(user)
if mask_secrets and "secret_key" in record:
record["secret_key"] = "••••••••••"
record: Dict[str, Any] = {
"access_key": user["access_key"],
"secret_key": "••••••••••" if mask_secrets else user["secret_key"],
"display_name": user["display_name"],
"policies": user["policies"],
}
if user.get("expires_at"):
record["expires_at"] = user["expires_at"]
payload["users"].append(record)
return payload
@@ -546,8 +630,9 @@ class IamService:
return candidate if candidate in ALLOWED_ACTIONS else ""
def _write_default(self) -> None:
access_key = secrets.token_hex(12)
secret_key = secrets.token_urlsafe(32)
access_key = os.environ.get("ADMIN_ACCESS_KEY", "").strip() or secrets.token_hex(12)
secret_key = os.environ.get("ADMIN_SECRET_KEY", "").strip() or secrets.token_urlsafe(32)
custom_keys = bool(os.environ.get("ADMIN_ACCESS_KEY", "").strip())
default = {
"users": [
{
@@ -560,16 +645,37 @@ class IamService:
}
]
}
self.config_path.write_text(json.dumps(default, indent=2))
json_text = json.dumps(default, indent=2)
if self._fernet:
encrypted = self._fernet.encrypt(json_text.encode("utf-8"))
self.config_path.write_bytes(_IAM_ENCRYPTED_PREFIX + encrypted)
else:
self.config_path.write_text(json_text)
print(f"\n{'='*60}")
print("MYFSIO FIRST RUN - ADMIN CREDENTIALS GENERATED")
print("MYFSIO FIRST RUN - ADMIN CREDENTIALS")
print(f"{'='*60}")
print(f"Access Key: {access_key}")
print(f"Secret Key: {secret_key}")
if custom_keys:
print(f"Access Key: {access_key} (from ADMIN_ACCESS_KEY)")
print(f"Secret Key: {'(from ADMIN_SECRET_KEY)' if os.environ.get('ADMIN_SECRET_KEY', '').strip() else secret_key}")
else:
print(f"Access Key: {access_key}")
print(f"Secret Key: {secret_key}")
print(f"{'='*60}")
print(f"Missed this? Check: {self.config_path}")
if self._fernet:
print("IAM config is encrypted at rest.")
print("Lost credentials? Run: python run.py reset-cred")
else:
print(f"Missed this? Check: {self.config_path}")
print(f"{'='*60}\n")
def _validate_expires_at(self, expires_at: str) -> None:
try:
dt = datetime.fromisoformat(expires_at)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
raise IamError(f"Invalid expires_at format: {expires_at}. Use ISO 8601 (e.g. 2026-12-31T23:59:59Z)")
def _generate_access_key(self) -> str:
return secrets.token_hex(8)
@@ -588,11 +694,15 @@ class IamService:
if cached:
secret_key, cached_time = cached
if now - cached_time < self._cache_ttl:
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
return secret_key
self._maybe_reload()
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
secret_key = record["secret_key"]
self._secret_key_cache[access_key] = (secret_key, now)
return secret_key
@@ -604,11 +714,15 @@ class IamService:
if cached:
principal, cached_time = cached
if now - cached_time < self._cache_ttl:
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
return principal
self._maybe_reload()
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
principal = self._build_principal(access_key, record)
self._principal_cache[access_key] = (principal, now)
return principal

View File

@@ -1754,6 +1754,10 @@ def iam_dashboard():
users = iam_service.list_users() if not locked else []
config_summary = iam_service.config_summary()
config_document = json.dumps(iam_service.export_config(mask_secrets=True), indent=2)
from datetime import datetime as _dt, timedelta as _td, timezone as _tz
_now = _dt.now(_tz.utc)
now_iso = _now.isoformat()
soon_iso = (_now + _td(days=7)).isoformat()
return render_template(
"iam.html",
users=users,
@@ -1763,6 +1767,8 @@ def iam_dashboard():
config_summary=config_summary,
config_document=config_document,
disclosed_secret=disclosed_secret,
now_iso=now_iso,
soon_iso=soon_iso,
)
@@ -1782,6 +1788,8 @@ def create_iam_user():
return jsonify({"error": "Display name must be 64 characters or fewer"}), 400
flash("Display name must be 64 characters or fewer", "danger")
return redirect(url_for("ui.iam_dashboard"))
custom_access_key = request.form.get("access_key", "").strip() or None
custom_secret_key = request.form.get("secret_key", "").strip() or None
policies_text = request.form.get("policies", "").strip()
policies = None
if policies_text:
@@ -1792,8 +1800,21 @@ def create_iam_user():
return jsonify({"error": f"Invalid JSON: {exc}"}), 400
flash(f"Invalid JSON: {exc}", "danger")
return redirect(url_for("ui.iam_dashboard"))
expires_at = request.form.get("expires_at", "").strip() or None
if expires_at:
try:
from datetime import datetime as _dt, timezone as _tz
exp_dt = _dt.fromisoformat(expires_at)
if exp_dt.tzinfo is None:
exp_dt = exp_dt.replace(tzinfo=_tz.utc)
expires_at = exp_dt.isoformat()
except (ValueError, TypeError):
if _wants_json():
return jsonify({"error": "Invalid expiry date format"}), 400
flash("Invalid expiry date format", "danger")
return redirect(url_for("ui.iam_dashboard"))
try:
created = _iam().create_user(display_name=display_name, policies=policies)
created = _iam().create_user(display_name=display_name, policies=policies, access_key=custom_access_key, secret_key=custom_secret_key, expires_at=expires_at)
except IamError as exc:
if _wants_json():
return jsonify({"error": str(exc)}), 400
@@ -1967,6 +1988,45 @@ def update_iam_policies(access_key: str):
return redirect(url_for("ui.iam_dashboard"))
@ui_bp.post("/iam/users/<access_key>/expiry")
def update_iam_expiry(access_key: str):
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:update_policy")
except IamError as exc:
if _wants_json():
return jsonify({"error": str(exc)}), 403
flash(str(exc), "danger")
return redirect(url_for("ui.iam_dashboard"))
expires_at = request.form.get("expires_at", "").strip() or None
if expires_at:
try:
from datetime import datetime as _dt, timezone as _tz
exp_dt = _dt.fromisoformat(expires_at)
if exp_dt.tzinfo is None:
exp_dt = exp_dt.replace(tzinfo=_tz.utc)
expires_at = exp_dt.isoformat()
except (ValueError, TypeError):
if _wants_json():
return jsonify({"error": "Invalid expiry date format"}), 400
flash("Invalid expiry date format", "danger")
return redirect(url_for("ui.iam_dashboard"))
try:
_iam().update_user_expiry(access_key, expires_at)
if _wants_json():
return jsonify({"success": True, "message": f"Updated expiry for {access_key}", "expires_at": expires_at})
label = expires_at if expires_at else "never"
flash(f"Expiry for {access_key} set to {label}", "success")
except IamError as exc:
if _wants_json():
return jsonify({"error": str(exc)}), 400
flash(str(exc), "danger")
return redirect(url_for("ui.iam_dashboard"))
@ui_bp.post("/connections")
def create_connection():
principal = _current_principal()

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
APP_VERSION = "0.3.6"
APP_VERSION = "0.3.7"
def get_version() -> str: