Overhaul IAM: granular actions, multi-key users, prefix-scoped policies

This commit is contained in:
2026-03-14 23:50:44 +08:00
parent 6ed4b7d8ea
commit d72e0a347e
8 changed files with 699 additions and 158 deletions

View File

@@ -686,6 +686,107 @@ def _storage():
return current_app.extensions["object_storage"]
def _require_iam_action(action: str):
principal, error = _require_principal()
if error:
return None, error
try:
_iam().authorize(principal, None, action)
return principal, None
except IamError:
return None, _json_error("AccessDenied", f"Requires {action} permission", 403)
@admin_api_bp.route("/iam/users", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def iam_list_users():
principal, error = _require_iam_action("iam:list_users")
if error:
return error
return jsonify({"users": _iam().list_users()})
@admin_api_bp.route("/iam/users/<identifier>", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def iam_get_user(identifier):
principal, error = _require_iam_action("iam:get_user")
if error:
return error
try:
user_id = _iam().resolve_user_id(identifier)
return jsonify(_iam().get_user_by_id(user_id))
except IamError as exc:
return _json_error("NotFound", str(exc), 404)
@admin_api_bp.route("/iam/users/<identifier>/policies", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def iam_get_user_policies(identifier):
principal, error = _require_iam_action("iam:get_policy")
if error:
return error
try:
return jsonify({"policies": _iam().get_user_policies(identifier)})
except IamError as exc:
return _json_error("NotFound", str(exc), 404)
@admin_api_bp.route("/iam/users/<identifier>/keys", methods=["POST"])
@limiter.limit(lambda: _get_admin_rate_limit())
def iam_create_access_key(identifier):
principal, error = _require_iam_action("iam:create_key")
if error:
return error
try:
result = _iam().create_access_key(identifier)
logger.info("Access key created for %s by %s", identifier, principal.access_key)
return jsonify(result), 201
except IamError as exc:
return _json_error("InvalidRequest", str(exc), 400)
@admin_api_bp.route("/iam/users/<identifier>/keys/<access_key>", methods=["DELETE"])
@limiter.limit(lambda: _get_admin_rate_limit())
def iam_delete_access_key(identifier, access_key):
principal, error = _require_iam_action("iam:delete_key")
if error:
return error
try:
_iam().delete_access_key(access_key)
logger.info("Access key %s deleted by %s", access_key, principal.access_key)
return "", 204
except IamError as exc:
return _json_error("InvalidRequest", str(exc), 400)
@admin_api_bp.route("/iam/users/<identifier>/disable", methods=["POST"])
@limiter.limit(lambda: _get_admin_rate_limit())
def iam_disable_user(identifier):
principal, error = _require_iam_action("iam:disable_user")
if error:
return error
try:
_iam().disable_user(identifier)
logger.info("User %s disabled by %s", identifier, principal.access_key)
return jsonify({"status": "disabled"})
except IamError as exc:
return _json_error("InvalidRequest", str(exc), 400)
@admin_api_bp.route("/iam/users/<identifier>/enable", methods=["POST"])
@limiter.limit(lambda: _get_admin_rate_limit())
def iam_enable_user(identifier):
principal, error = _require_iam_action("iam:disable_user")
if error:
return error
try:
_iam().enable_user(identifier)
logger.info("User %s enabled by %s", identifier, principal.access_key)
return jsonify({"status": "enabled"})
except IamError as exc:
return _json_error("InvalidRequest", str(exc), 400)
@admin_api_bp.route("/website-domains", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def list_website_domains():

View File

@@ -10,7 +10,7 @@ import secrets
import threading
import time
from collections import deque
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set, Tuple
@@ -22,16 +22,37 @@ class IamError(RuntimeError):
"""Raised when authentication or authorization fails."""
S3_ACTIONS = {"list", "read", "write", "delete", "share", "policy", "replication", "lifecycle", "cors"}
S3_ACTIONS = {
"list", "read", "write", "delete", "share", "policy",
"replication", "lifecycle", "cors",
"create_bucket", "delete_bucket",
"versioning", "tagging", "encryption", "quota",
"object_lock", "notification", "logging", "website",
}
IAM_ACTIONS = {
"iam:list_users",
"iam:create_user",
"iam:delete_user",
"iam:rotate_key",
"iam:update_policy",
"iam:create_key",
"iam:delete_key",
"iam:get_user",
"iam:get_policy",
"iam:disable_user",
}
ALLOWED_ACTIONS = (S3_ACTIONS | IAM_ACTIONS) | {"iam:*"}
_V1_IMPLIED_ACTIONS = {
"write": {"create_bucket"},
"delete": {"delete_bucket"},
"policy": {
"versioning", "tagging", "encryption", "quota",
"object_lock", "notification", "logging", "website",
"cors", "lifecycle", "replication", "share",
},
}
ACTION_ALIASES = {
"list": "list",
"s3:listbucket": "list",
@@ -45,14 +66,11 @@ ACTION_ALIASES = {
"s3:getobjecttagging": "read",
"s3:getobjectversiontagging": "read",
"s3:getobjectacl": "read",
"s3:getbucketversioning": "read",
"s3:headobject": "read",
"s3:headbucket": "read",
"write": "write",
"s3:putobject": "write",
"s3:createbucket": "write",
"s3:putobjecttagging": "write",
"s3:putbucketversioning": "write",
"s3:createmultipartupload": "write",
"s3:uploadpart": "write",
"s3:completemultipartupload": "write",
@@ -61,8 +79,11 @@ ACTION_ALIASES = {
"delete": "delete",
"s3:deleteobject": "delete",
"s3:deleteobjectversion": "delete",
"s3:deletebucket": "delete",
"s3:deleteobjecttagging": "delete",
"create_bucket": "create_bucket",
"s3:createbucket": "create_bucket",
"delete_bucket": "delete_bucket",
"s3:deletebucket": "delete_bucket",
"share": "share",
"s3:putobjectacl": "share",
"s3:putbucketacl": "share",
@@ -88,11 +109,50 @@ ACTION_ALIASES = {
"s3:getbucketcors": "cors",
"s3:putbucketcors": "cors",
"s3:deletebucketcors": "cors",
"versioning": "versioning",
"s3:getbucketversioning": "versioning",
"s3:putbucketversioning": "versioning",
"tagging": "tagging",
"s3:getbuckettagging": "tagging",
"s3:putbuckettagging": "tagging",
"s3:deletebuckettagging": "tagging",
"encryption": "encryption",
"s3:getencryptionconfiguration": "encryption",
"s3:putencryptionconfiguration": "encryption",
"s3:deleteencryptionconfiguration": "encryption",
"quota": "quota",
"s3:getbucketquota": "quota",
"s3:putbucketquota": "quota",
"s3:deletebucketquota": "quota",
"object_lock": "object_lock",
"s3:getobjectlockconfiguration": "object_lock",
"s3:putobjectlockconfiguration": "object_lock",
"s3:putobjectretention": "object_lock",
"s3:getobjectretention": "object_lock",
"s3:putobjectlegalhold": "object_lock",
"s3:getobjectlegalhold": "object_lock",
"notification": "notification",
"s3:getbucketnotificationconfiguration": "notification",
"s3:putbucketnotificationconfiguration": "notification",
"s3:deletebucketnotificationconfiguration": "notification",
"logging": "logging",
"s3:getbucketlogging": "logging",
"s3:putbucketlogging": "logging",
"s3:deletebucketlogging": "logging",
"website": "website",
"s3:getbucketwebsite": "website",
"s3:putbucketwebsite": "website",
"s3:deletebucketwebsite": "website",
"iam:listusers": "iam:list_users",
"iam:createuser": "iam:create_user",
"iam:deleteuser": "iam:delete_user",
"iam:rotateaccesskey": "iam:rotate_key",
"iam:putuserpolicy": "iam:update_policy",
"iam:createaccesskey": "iam:create_key",
"iam:deleteaccesskey": "iam:delete_key",
"iam:getuser": "iam:get_user",
"iam:getpolicy": "iam:get_policy",
"iam:disableuser": "iam:disable_user",
"iam:*": "iam:*",
}
@@ -101,6 +161,7 @@ ACTION_ALIASES = {
class Policy:
bucket: str
actions: Set[str]
prefix: str = "*"
@dataclass
@@ -117,6 +178,16 @@ def _derive_fernet_key(secret: str) -> bytes:
_IAM_ENCRYPTED_PREFIX = b"MYFSIO_IAM_ENC:"
_CONFIG_VERSION = 2
def _expand_v1_actions(actions: Set[str]) -> Set[str]:
expanded = set(actions)
for action, implied in _V1_IMPLIED_ACTIONS.items():
if action in expanded:
expanded.update(implied)
return expanded
class IamService:
"""Loads IAM configuration, manages users, and evaluates policies."""
@@ -131,7 +202,10 @@ class IamService:
self.config_path.parent.mkdir(parents=True, exist_ok=True)
if not self.config_path.exists():
self._write_default()
self._users: Dict[str, Dict[str, Any]] = {}
self._user_records: Dict[str, Dict[str, Any]] = {}
self._key_index: Dict[str, str] = {}
self._key_secrets: Dict[str, str] = {}
self._key_status: Dict[str, str] = {}
self._raw_config: Dict[str, Any] = {}
self._failed_attempts: Dict[str, Deque[datetime]] = {}
self._last_load_time = 0.0
@@ -146,7 +220,6 @@ class IamService:
self._load_lockout_state()
def _maybe_reload(self) -> None:
"""Reload configuration if the file has changed on disk."""
now = time.time()
if now - self._last_stat_check < self._stat_check_interval:
return
@@ -183,11 +256,20 @@ class IamService:
raise IamError(
f"Access temporarily locked. Try again in {seconds} seconds."
)
record = self._users.get(access_key)
stored_secret = record["secret_key"] if record else secrets.token_urlsafe(24)
if not record or not hmac.compare_digest(stored_secret, secret_key):
user_id = self._key_index.get(access_key)
stored_secret = self._key_secrets.get(access_key, secrets.token_urlsafe(24))
if not user_id or not hmac.compare_digest(stored_secret, secret_key):
self._record_failed_attempt(access_key)
raise IamError("Invalid credentials")
key_status = self._key_status.get(access_key, "active")
if key_status != "active":
raise IamError("Access key is inactive")
record = self._user_records.get(user_id)
if not record:
self._record_failed_attempt(access_key)
raise IamError("Invalid credentials")
if not record.get("enabled", True):
raise IamError("User account is disabled")
self._check_expiry(access_key, record)
self._clear_failed_attempts(access_key)
return self._build_principal(access_key, record)
@@ -215,7 +297,6 @@ class IamService:
return self.config_path.parent / "lockout_state.json"
def _load_lockout_state(self) -> None:
"""Load lockout state from disk."""
try:
if self._lockout_file().exists():
data = json.loads(self._lockout_file().read_text(encoding="utf-8"))
@@ -235,7 +316,6 @@ class IamService:
pass
def _save_lockout_state(self) -> None:
"""Persist lockout state to disk."""
data: Dict[str, Any] = {"failed_attempts": {}}
for key, attempts in self._failed_attempts.items():
data["failed_attempts"][key] = [ts.isoformat() for ts in attempts]
@@ -270,10 +350,9 @@ class IamService:
return int(max(0, self.auth_lockout_window.total_seconds() - elapsed))
def create_session_token(self, access_key: str, duration_seconds: int = 3600) -> str:
"""Create a temporary session token for an access key."""
self._maybe_reload()
record = self._users.get(access_key)
if not record:
user_id = self._key_index.get(access_key)
if not user_id or user_id not in self._user_records:
raise IamError("Unknown access key")
self._cleanup_expired_sessions()
token = secrets.token_urlsafe(32)
@@ -285,7 +364,6 @@ class IamService:
return token
def validate_session_token(self, access_key: str, session_token: str) -> bool:
"""Validate a session token for an access key (thread-safe, constant-time)."""
dummy_key = secrets.token_urlsafe(16)
dummy_token = secrets.token_urlsafe(32)
with self._session_lock:
@@ -304,7 +382,6 @@ class IamService:
return True
def _cleanup_expired_sessions(self) -> None:
"""Remove expired session tokens."""
now = time.time()
expired = [token for token, data in self._sessions.items() if now > data["expires_at"]]
for token in expired:
@@ -316,13 +393,18 @@ class IamService:
if cached:
principal, cached_time = cached
if now - cached_time < self._cache_ttl:
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
user_id = self._key_index.get(access_key)
if user_id:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
return principal
self._maybe_reload()
record = self._users.get(access_key)
user_id = self._key_index.get(access_key)
if not user_id:
raise IamError("Unknown access key")
record = self._user_records.get(user_id)
if not record:
raise IamError("Unknown access key")
self._check_expiry(access_key, record)
@@ -332,22 +414,26 @@ class IamService:
def secret_for_key(self, access_key: str) -> str:
self._maybe_reload()
record = self._users.get(access_key)
if not record:
secret = self._key_secrets.get(access_key)
if not secret:
raise IamError("Unknown access key")
self._check_expiry(access_key, record)
return record["secret_key"]
user_id = self._key_index.get(access_key)
if user_id:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
return secret
def authorize(self, principal: Principal, bucket_name: str | None, action: str) -> None:
def authorize(self, principal: Principal, bucket_name: str | None, action: str, *, object_key: str | None = None) -> None:
action = self._normalize_action(action)
if action not in ALLOWED_ACTIONS:
raise IamError(f"Unknown action '{action}'")
bucket_name = bucket_name or "*"
normalized = bucket_name.lower() if bucket_name != "*" else bucket_name
if not self._is_allowed(principal, normalized, action):
if not self._is_allowed(principal, normalized, action, object_key=object_key):
raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'")
def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str]) -> Dict[str, bool]:
def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str], *, object_key: str | None = None) -> Dict[str, bool]:
self._maybe_reload()
bucket_name = (bucket_name or "*").lower() if bucket_name != "*" else (bucket_name or "*")
normalized_actions = {a: self._normalize_action(a) for a in actions}
@@ -356,37 +442,53 @@ class IamService:
if canonical not in ALLOWED_ACTIONS:
results[original] = False
else:
results[original] = self._is_allowed(principal, bucket_name, canonical)
results[original] = self._is_allowed(principal, bucket_name, canonical, object_key=object_key)
return results
def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]:
return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")]
def _is_allowed(self, principal: Principal, bucket_name: str, action: str) -> bool:
def _is_allowed(self, principal: Principal, bucket_name: str, action: str, *, object_key: str | None = None) -> bool:
bucket_name = bucket_name.lower()
for policy in principal.policies:
if policy.bucket not in {"*", bucket_name}:
continue
if "*" in policy.actions or action in policy.actions:
return True
if "iam:*" in policy.actions and action.startswith("iam:"):
return True
action_match = "*" in policy.actions or action in policy.actions
if not action_match and "iam:*" in policy.actions and action.startswith("iam:"):
action_match = True
if not action_match:
continue
if object_key is not None and policy.prefix != "*":
prefix = policy.prefix.rstrip("*")
if not object_key.startswith(prefix):
continue
return True
return False
def list_users(self) -> List[Dict[str, Any]]:
listing: List[Dict[str, Any]] = []
for access_key, record in self._users.items():
listing.append(
{
"access_key": access_key,
"display_name": record["display_name"],
"expires_at": record.get("expires_at"),
"policies": [
{"bucket": policy.bucket, "actions": sorted(policy.actions)}
for policy in record["policies"]
],
}
)
for user_id, record in self._user_records.items():
access_keys = []
for key_info in record.get("access_keys", []):
access_keys.append({
"access_key": key_info["access_key"],
"status": key_info.get("status", "active"),
"created_at": key_info.get("created_at"),
})
user_entry: Dict[str, Any] = {
"user_id": user_id,
"display_name": record["display_name"],
"enabled": record.get("enabled", True),
"expires_at": record.get("expires_at"),
"access_keys": access_keys,
"policies": [
{"bucket": policy.bucket, "actions": sorted(policy.actions), "prefix": policy.prefix}
for policy in record["policies"]
],
}
if access_keys:
user_entry["access_key"] = access_keys[0]["access_key"]
listing.append(user_entry)
return listing
def create_user(
@@ -397,20 +499,33 @@ class IamService:
access_key: str | None = None,
secret_key: str | None = None,
expires_at: str | None = None,
user_id: str | None = None,
) -> Dict[str, str]:
access_key = (access_key or self._generate_access_key()).strip()
if not access_key:
raise IamError("Access key cannot be empty")
if access_key in self._users:
if access_key in self._key_index:
raise IamError("Access key already exists")
if expires_at:
self._validate_expires_at(expires_at)
secret_key = secret_key or self._generate_secret_key()
sanitized_policies = self._prepare_policy_payload(policies)
user_id = user_id or self._generate_user_id()
if user_id in self._user_records:
raise IamError("User ID already exists")
now_iso = datetime.now(timezone.utc).isoformat()
record: Dict[str, Any] = {
"access_key": access_key,
"secret_key": secret_key,
"user_id": user_id,
"display_name": display_name or access_key,
"enabled": True,
"access_keys": [
{
"access_key": access_key,
"secret_key": secret_key,
"status": "active",
"created_at": now_iso,
}
],
"policies": sanitized_policies,
}
if expires_at:
@@ -418,12 +533,108 @@ class IamService:
self._raw_config.setdefault("users", []).append(record)
self._save()
self._load()
return {"access_key": access_key, "secret_key": secret_key}
return {"user_id": user_id, "access_key": access_key, "secret_key": secret_key}
def create_access_key(self, identifier: str) -> Dict[str, str]:
user_raw, _ = self._resolve_raw_user(identifier)
new_access_key = self._generate_access_key()
new_secret_key = self._generate_secret_key()
now_iso = datetime.now(timezone.utc).isoformat()
key_entry = {
"access_key": new_access_key,
"secret_key": new_secret_key,
"status": "active",
"created_at": now_iso,
}
user_raw.setdefault("access_keys", []).append(key_entry)
self._save()
self._load()
return {"access_key": new_access_key, "secret_key": new_secret_key}
def delete_access_key(self, access_key: str) -> None:
user_raw, _ = self._resolve_raw_user(access_key)
keys = user_raw.get("access_keys", [])
if len(keys) <= 1:
raise IamError("Cannot delete the only access key for a user")
remaining = [k for k in keys if k["access_key"] != access_key]
if len(remaining) == len(keys):
raise IamError("Access key not found")
user_raw["access_keys"] = remaining
self._save()
self._principal_cache.pop(access_key, None)
self._secret_key_cache.pop(access_key, None)
from .s3_api import clear_signing_key_cache
clear_signing_key_cache()
self._load()
def disable_user(self, identifier: str) -> None:
user_raw, _ = self._resolve_raw_user(identifier)
user_raw["enabled"] = False
self._save()
for key_info in user_raw.get("access_keys", []):
ak = key_info["access_key"]
self._principal_cache.pop(ak, None)
self._secret_key_cache.pop(ak, None)
from .s3_api import clear_signing_key_cache
clear_signing_key_cache()
self._load()
def enable_user(self, identifier: str) -> None:
user_raw, _ = self._resolve_raw_user(identifier)
user_raw["enabled"] = True
self._save()
self._load()
def get_user_by_id(self, user_id: str) -> Dict[str, Any]:
record = self._user_records.get(user_id)
if not record:
raise IamError("User not found")
access_keys = []
for key_info in record.get("access_keys", []):
access_keys.append({
"access_key": key_info["access_key"],
"status": key_info.get("status", "active"),
"created_at": key_info.get("created_at"),
})
return {
"user_id": user_id,
"display_name": record["display_name"],
"enabled": record.get("enabled", True),
"expires_at": record.get("expires_at"),
"access_keys": access_keys,
"policies": [
{"bucket": p.bucket, "actions": sorted(p.actions), "prefix": p.prefix}
for p in record["policies"]
],
}
def get_user_policies(self, identifier: str) -> List[Dict[str, Any]]:
_, user_id = self._resolve_raw_user(identifier)
record = self._user_records.get(user_id)
if not record:
raise IamError("User not found")
return [
{"bucket": p.bucket, "actions": sorted(p.actions), "prefix": p.prefix}
for p in record["policies"]
]
def resolve_user_id(self, identifier: str) -> str:
if identifier in self._user_records:
return identifier
user_id = self._key_index.get(identifier)
if user_id:
return user_id
raise IamError("User not found")
def rotate_secret(self, access_key: str) -> str:
user = self._get_raw_user(access_key)
user_raw, _ = self._resolve_raw_user(access_key)
new_secret = self._generate_secret_key()
user["secret_key"] = new_secret
for key_info in user_raw.get("access_keys", []):
if key_info["access_key"] == access_key:
key_info["secret_key"] = new_secret
break
else:
raise IamError("Access key not found")
self._save()
self._principal_cache.pop(access_key, None)
self._secret_key_cache.pop(access_key, None)
@@ -433,8 +644,8 @@ class IamService:
return new_secret
def update_user(self, access_key: str, display_name: str) -> None:
user = self._get_raw_user(access_key)
user["display_name"] = display_name
user_raw, _ = self._resolve_raw_user(access_key)
user_raw["display_name"] = display_name
self._save()
self._load()
@@ -442,32 +653,43 @@ class IamService:
users = self._raw_config.get("users", [])
if len(users) <= 1:
raise IamError("Cannot delete the only user")
remaining = [user for user in users if user["access_key"] != access_key]
if len(remaining) == len(users):
_, target_user_id = self._resolve_raw_user(access_key)
target_user_raw = None
remaining = []
for u in users:
if u.get("user_id") == target_user_id:
target_user_raw = u
else:
remaining.append(u)
if target_user_raw is None:
raise IamError("User not found")
self._raw_config["users"] = remaining
self._save()
self._principal_cache.pop(access_key, None)
self._secret_key_cache.pop(access_key, None)
for key_info in target_user_raw.get("access_keys", []):
ak = key_info["access_key"]
self._principal_cache.pop(ak, None)
self._secret_key_cache.pop(ak, None)
from .s3_api import clear_signing_key_cache
clear_signing_key_cache()
self._load()
def update_user_expiry(self, access_key: str, expires_at: str | None) -> None:
user = self._get_raw_user(access_key)
user_raw, _ = self._resolve_raw_user(access_key)
if expires_at:
self._validate_expires_at(expires_at)
user["expires_at"] = expires_at
user_raw["expires_at"] = expires_at
else:
user.pop("expires_at", None)
user_raw.pop("expires_at", None)
self._save()
self._principal_cache.pop(access_key, None)
self._secret_key_cache.pop(access_key, None)
for key_info in user_raw.get("access_keys", []):
ak = key_info["access_key"]
self._principal_cache.pop(ak, None)
self._secret_key_cache.pop(ak, None)
self._load()
def update_user_policies(self, access_key: str, policies: Sequence[Dict[str, Any]]) -> None:
user = self._get_raw_user(access_key)
user["policies"] = self._prepare_policy_payload(policies)
user_raw, _ = self._resolve_raw_user(access_key)
user_raw["policies"] = self._prepare_policy_payload(policies)
self._save()
self._load()
@@ -482,6 +704,52 @@ class IamService:
raise IamError("Cannot decrypt IAM config. SECRET_KEY may have changed. Use 'python run.py reset-cred' to reset credentials.")
return raw_bytes.decode("utf-8")
def _is_v2_config(self, raw: Dict[str, Any]) -> bool:
return raw.get("version", 1) >= _CONFIG_VERSION
def _migrate_v1_to_v2(self, raw: Dict[str, Any]) -> Dict[str, Any]:
migrated_users = []
now_iso = datetime.now(timezone.utc).isoformat()
for user in raw.get("users", []):
old_policies = user.get("policies", [])
expanded_policies = []
for p in old_policies:
raw_actions = p.get("actions", [])
if isinstance(raw_actions, str):
raw_actions = [raw_actions]
action_set: Set[str] = set()
for a in raw_actions:
canonical = self._normalize_action(a)
if canonical == "*":
action_set = set(ALLOWED_ACTIONS)
break
if canonical:
action_set.add(canonical)
action_set = _expand_v1_actions(action_set)
expanded_policies.append({
"bucket": p.get("bucket", "*"),
"actions": sorted(action_set),
"prefix": p.get("prefix", "*"),
})
migrated_user: Dict[str, Any] = {
"user_id": user["access_key"],
"display_name": user.get("display_name", user["access_key"]),
"enabled": True,
"access_keys": [
{
"access_key": user["access_key"],
"secret_key": user["secret_key"],
"status": "active",
"created_at": now_iso,
}
],
"policies": expanded_policies,
}
if user.get("expires_at"):
migrated_user["expires_at"] = user["expires_at"]
migrated_users.append(migrated_user)
return {"version": _CONFIG_VERSION, "users": migrated_users}
def _load(self) -> None:
try:
self._last_load_time = self.config_path.stat().st_mtime
@@ -500,35 +768,67 @@ class IamService:
raise IamError(f"Failed to load IAM config: {e}")
was_plaintext = not raw_bytes.startswith(_IAM_ENCRYPTED_PREFIX)
was_v1 = not self._is_v2_config(raw)
if was_v1:
raw = self._migrate_v1_to_v2(raw)
user_records: Dict[str, Dict[str, Any]] = {}
key_index: Dict[str, str] = {}
key_secrets: Dict[str, str] = {}
key_status_map: Dict[str, str] = {}
users: Dict[str, Dict[str, Any]] = {}
for user in raw.get("users", []):
user_id = user["user_id"]
policies = self._build_policy_objects(user.get("policies", []))
user_record: Dict[str, Any] = {
"secret_key": user["secret_key"],
"display_name": user.get("display_name", user["access_key"]),
access_keys_raw = user.get("access_keys", [])
access_keys_info = []
for key_entry in access_keys_raw:
ak = key_entry["access_key"]
sk = key_entry["secret_key"]
status = key_entry.get("status", "active")
key_index[ak] = user_id
key_secrets[ak] = sk
key_status_map[ak] = status
access_keys_info.append({
"access_key": ak,
"secret_key": sk,
"status": status,
"created_at": key_entry.get("created_at"),
})
record: Dict[str, Any] = {
"display_name": user.get("display_name", user_id),
"enabled": user.get("enabled", True),
"policies": policies,
"access_keys": access_keys_info,
}
if user.get("expires_at"):
user_record["expires_at"] = user["expires_at"]
users[user["access_key"]] = user_record
if not users:
raise IamError("IAM configuration contains no users")
self._users = users
raw_users: List[Dict[str, Any]] = []
for entry in raw.get("users", []):
raw_entry: Dict[str, Any] = {
"access_key": entry["access_key"],
"secret_key": entry["secret_key"],
"display_name": entry.get("display_name", entry["access_key"]),
"policies": entry.get("policies", []),
}
if entry.get("expires_at"):
raw_entry["expires_at"] = entry["expires_at"]
raw_users.append(raw_entry)
self._raw_config = {"users": raw_users}
record["expires_at"] = user["expires_at"]
user_records[user_id] = record
if was_plaintext and self._fernet:
if not user_records:
raise IamError("IAM configuration contains no users")
self._user_records = user_records
self._key_index = key_index
self._key_secrets = key_secrets
self._key_status = key_status_map
raw_users: List[Dict[str, Any]] = []
for user in raw.get("users", []):
raw_entry: Dict[str, Any] = {
"user_id": user["user_id"],
"display_name": user.get("display_name", user["user_id"]),
"enabled": user.get("enabled", True),
"access_keys": user.get("access_keys", []),
"policies": user.get("policies", []),
}
if user.get("expires_at"):
raw_entry["expires_at"] = user["expires_at"]
raw_users.append(raw_entry)
self._raw_config = {"version": _CONFIG_VERSION, "users": raw_users}
if was_v1 or (was_plaintext and self._fernet):
self._save()
def _save(self) -> None:
@@ -547,19 +847,30 @@ class IamService:
def config_summary(self) -> Dict[str, Any]:
return {
"path": str(self.config_path),
"user_count": len(self._users),
"user_count": len(self._user_records),
"allowed_actions": sorted(ALLOWED_ACTIONS),
}
def export_config(self, mask_secrets: bool = True) -> Dict[str, Any]:
payload: Dict[str, Any] = {"users": []}
payload: Dict[str, Any] = {"version": _CONFIG_VERSION, "users": []}
for user in self._raw_config.get("users", []):
access_keys = []
for key_info in user.get("access_keys", []):
access_keys.append({
"access_key": key_info["access_key"],
"secret_key": "\u2022\u2022\u2022\u2022\u2022\u2022\u2022\u2022\u2022\u2022" if mask_secrets else key_info["secret_key"],
"status": key_info.get("status", "active"),
"created_at": key_info.get("created_at"),
})
record: Dict[str, Any] = {
"access_key": user["access_key"],
"secret_key": "••••••••••" if mask_secrets else user["secret_key"],
"user_id": user["user_id"],
"display_name": user["display_name"],
"enabled": user.get("enabled", True),
"access_keys": access_keys,
"policies": user["policies"],
}
if access_keys:
record["access_key"] = access_keys[0]["access_key"]
if user.get("expires_at"):
record["expires_at"] = user["expires_at"]
payload["users"].append(record)
@@ -569,6 +880,7 @@ class IamService:
entries: List[Policy] = []
for policy in policies:
bucket = str(policy.get("bucket", "*")).lower()
prefix = str(policy.get("prefix", "*"))
raw_actions = policy.get("actions", [])
if isinstance(raw_actions, str):
raw_actions = [raw_actions]
@@ -581,7 +893,7 @@ class IamService:
if canonical:
action_set.add(canonical)
if action_set:
entries.append(Policy(bucket=bucket, actions=action_set))
entries.append(Policy(bucket=bucket, actions=action_set, prefix=prefix))
return entries
def _prepare_policy_payload(self, policies: Optional[Sequence[Dict[str, Any]]]) -> List[Dict[str, Any]]:
@@ -589,12 +901,14 @@ class IamService:
policies = (
{
"bucket": "*",
"actions": ["list", "read", "write", "delete", "share", "policy"],
"actions": ["list", "read", "write", "delete", "share", "policy",
"create_bucket", "delete_bucket"],
},
)
sanitized: List[Dict[str, Any]] = []
for policy in policies:
bucket = str(policy.get("bucket", "*")).lower()
prefix = str(policy.get("prefix", "*"))
raw_actions = policy.get("actions", [])
if isinstance(raw_actions, str):
raw_actions = [raw_actions]
@@ -608,7 +922,10 @@ class IamService:
action_set.add(canonical)
if not action_set:
continue
sanitized.append({"bucket": bucket, "actions": sorted(action_set)})
entry: Dict[str, Any] = {"bucket": bucket, "actions": sorted(action_set)}
if prefix != "*":
entry["prefix"] = prefix
sanitized.append(entry)
if not sanitized:
raise IamError("At least one policy with valid actions is required")
return sanitized
@@ -633,12 +950,23 @@ class IamService:
access_key = os.environ.get("ADMIN_ACCESS_KEY", "").strip() or secrets.token_hex(12)
secret_key = os.environ.get("ADMIN_SECRET_KEY", "").strip() or secrets.token_urlsafe(32)
custom_keys = bool(os.environ.get("ADMIN_ACCESS_KEY", "").strip())
user_id = self._generate_user_id()
now_iso = datetime.now(timezone.utc).isoformat()
default = {
"version": _CONFIG_VERSION,
"users": [
{
"access_key": access_key,
"secret_key": secret_key,
"user_id": user_id,
"display_name": "Local Admin",
"enabled": True,
"access_keys": [
{
"access_key": access_key,
"secret_key": secret_key,
"status": "active",
"created_at": now_iso,
}
],
"policies": [
{"bucket": "*", "actions": list(ALLOWED_ACTIONS)}
],
@@ -660,6 +988,7 @@ class IamService:
else:
print(f"Access Key: {access_key}")
print(f"Secret Key: {secret_key}")
print(f"User ID: {user_id}")
print(f"{'='*60}")
if self._fernet:
print("IAM config is encrypted at rest.")
@@ -682,30 +1011,46 @@ class IamService:
def _generate_secret_key(self) -> str:
return secrets.token_urlsafe(24)
def _get_raw_user(self, access_key: str) -> Dict[str, Any]:
def _generate_user_id(self) -> str:
return f"u-{secrets.token_hex(8)}"
def _resolve_raw_user(self, identifier: str) -> Tuple[Dict[str, Any], str]:
for user in self._raw_config.get("users", []):
if user["access_key"] == access_key:
return user
if user.get("user_id") == identifier:
return user, identifier
for user in self._raw_config.get("users", []):
for key_info in user.get("access_keys", []):
if key_info["access_key"] == identifier:
return user, user["user_id"]
raise IamError("User not found")
def _get_raw_user(self, access_key: str) -> Dict[str, Any]:
user, _ = self._resolve_raw_user(access_key)
return user
def get_secret_key(self, access_key: str) -> str | None:
now = time.time()
cached = self._secret_key_cache.get(access_key)
if cached:
secret_key, cached_time = cached
if now - cached_time < self._cache_ttl:
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
user_id = self._key_index.get(access_key)
if user_id:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
return secret_key
self._maybe_reload()
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
secret_key = record["secret_key"]
self._secret_key_cache[access_key] = (secret_key, now)
return secret_key
secret = self._key_secrets.get(access_key)
if secret:
user_id = self._key_index.get(access_key)
if user_id:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
self._secret_key_cache[access_key] = (secret, now)
return secret
return None
def get_principal(self, access_key: str) -> Principal | None:
@@ -714,16 +1059,20 @@ class IamService:
if cached:
principal, cached_time = cached
if now - cached_time < self._cache_ttl:
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
user_id = self._key_index.get(access_key)
if user_id:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
return principal
self._maybe_reload()
record = self._users.get(access_key)
if record:
self._check_expiry(access_key, record)
principal = self._build_principal(access_key, record)
self._principal_cache[access_key] = (principal, now)
return principal
user_id = self._key_index.get(access_key)
if user_id:
record = self._user_records.get(user_id)
if record:
self._check_expiry(access_key, record)
principal = self._build_principal(access_key, record)
self._principal_cache[access_key] = (principal, now)
return principal
return None

View File

@@ -488,7 +488,7 @@ def _authorize_action(principal: Principal | None, bucket_name: str | None, acti
iam_error: IamError | None = None
if principal is not None:
try:
_iam().authorize(principal, bucket_name, action)
_iam().authorize(principal, bucket_name, action, object_key=object_key)
iam_allowed = True
except IamError as exc:
iam_error = exc
@@ -1135,7 +1135,7 @@ def _bucket_versioning_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "versioning")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
@@ -1182,7 +1182,7 @@ def _bucket_tagging_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "tagging")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
@@ -1347,7 +1347,7 @@ def _bucket_cors_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "cors")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
@@ -1400,7 +1400,7 @@ def _bucket_encryption_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "encryption")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
@@ -1475,7 +1475,7 @@ def _bucket_acl_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "share")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
@@ -1718,12 +1718,12 @@ def _bucket_lifecycle_handler(bucket_name: str) -> Response:
"""Handle bucket lifecycle configuration (GET/PUT/DELETE /<bucket>?lifecycle)."""
if request.method not in {"GET", "PUT", "DELETE"}:
return _method_not_allowed(["GET", "PUT", "DELETE"])
principal, error = _require_principal()
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "lifecycle")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
@@ -1882,12 +1882,12 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
"""Handle bucket quota configuration (GET/PUT/DELETE /<bucket>?quota)."""
if request.method not in {"GET", "PUT", "DELETE"}:
return _method_not_allowed(["GET", "PUT", "DELETE"])
principal, error = _require_principal()
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "quota")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
@@ -1964,7 +1964,7 @@ def _bucket_object_lock_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "object_lock")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
@@ -2010,7 +2010,7 @@ def _bucket_notification_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "notification")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
@@ -2106,7 +2106,7 @@ def _bucket_logging_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "logging")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
@@ -2248,7 +2248,7 @@ def _object_retention_handler(bucket_name: str, object_key: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key)
_authorize_action(principal, bucket_name, "object_lock", object_key=object_key)
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
@@ -2324,7 +2324,7 @@ def _object_legal_hold_handler(bucket_name: str, object_key: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key)
_authorize_action(principal, bucket_name, "object_lock", object_key=object_key)
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
@@ -2657,7 +2657,7 @@ def bucket_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "write")
_authorize_action(principal, bucket_name, "create_bucket")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
try:
@@ -2674,7 +2674,7 @@ def bucket_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "delete")
_authorize_action(principal, bucket_name, "delete_bucket")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
try:
@@ -3229,7 +3229,7 @@ def _bucket_replication_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "replication")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
@@ -3312,7 +3312,7 @@ def _bucket_website_handler(bucket_name: str) -> Response:
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
_authorize_action(principal, bucket_name, "website")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()