diff --git a/app/__init__.py b/app/__init__.py index 98ef1e5..015e16d 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -18,6 +18,8 @@ from flask_cors import CORS from flask_wtf.csrf import CSRFError from werkzeug.middleware.proxy_fix import ProxyFix +import io + from .access_logging import AccessLoggingService from .operation_metrics import OperationMetricsCollector, classify_endpoint from .compression import GzipMiddleware @@ -44,6 +46,64 @@ from .website_domains import WebsiteDomainStore _request_counter = itertools.count(1) +class _ChunkedTransferMiddleware: + + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + if environ.get("REQUEST_METHOD") not in ("PUT", "POST"): + return self.app(environ, start_response) + + transfer_encoding = environ.get("HTTP_TRANSFER_ENCODING", "") + content_length = environ.get("CONTENT_LENGTH") + + if "chunked" in transfer_encoding.lower(): + if content_length: + del environ["HTTP_TRANSFER_ENCODING"] + else: + raw = environ.get("wsgi.input") + if raw: + try: + if hasattr(raw, "seek"): + raw.seek(0) + body = raw.read() + except Exception: + body = b"" + if body: + environ["wsgi.input"] = io.BytesIO(body) + environ["CONTENT_LENGTH"] = str(len(body)) + del environ["HTTP_TRANSFER_ENCODING"] + + content_length = environ.get("CONTENT_LENGTH") + if not content_length or content_length == "0": + sha256 = environ.get("HTTP_X_AMZ_CONTENT_SHA256", "") + decoded_len = environ.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH", "") + content_encoding = environ.get("HTTP_CONTENT_ENCODING", "") + if ("STREAMING" in sha256.upper() or decoded_len + or "aws-chunked" in content_encoding.lower()): + raw = environ.get("wsgi.input") + if raw: + try: + if hasattr(raw, "seek"): + raw.seek(0) + body = raw.read() + except Exception: + body = b"" + if body: + environ["wsgi.input"] = io.BytesIO(body) + environ["CONTENT_LENGTH"] = str(len(body)) + + raw = environ.get("wsgi.input") + if raw and hasattr(raw, "seek"): + try: + 
raw.seek(0)
+            except Exception:
+                pass
+
+        return self.app(environ, start_response)
+
+
 def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path:
     """Migrate config file from legacy locations to the active path.
@@ -107,10 +167,11 @@ def create_app(
     )
     app.wsgi_app = ProxyFix(app.wsgi_app, x_for=num_proxies, x_proto=num_proxies, x_host=num_proxies, x_prefix=num_proxies)
-    # Enable gzip compression for responses (10-20x smaller JSON payloads)
     if app.config.get("ENABLE_GZIP", True):
         app.wsgi_app = GzipMiddleware(app.wsgi_app, compression_level=6)
+    app.wsgi_app = _ChunkedTransferMiddleware(app.wsgi_app)
+
     _configure_cors(app)
     _configure_logging(app)
@@ -678,6 +739,7 @@ def _configure_logging(app: Flask) -> None:
                 },
             )
         response.headers["X-Request-Duration-ms"] = f"{duration_ms:.2f}"
+        response.headers["Server"] = "MyFSIO"
         operation_metrics = app.extensions.get("operation_metrics")
         if operation_metrics:
diff --git a/app/admin_api.py b/app/admin_api.py
index 1d5d975..ccc0408 100644
--- a/app/admin_api.py
+++ b/app/admin_api.py
@@ -686,6 +686,107 @@ def _storage():
     return current_app.extensions["object_storage"]
 
 
+def _require_iam_action(action: str):
+    principal, error = _require_principal()
+    if error:
+        return None, error
+    try:
+        _iam().authorize(principal, None, action)
+        return principal, None
+    except IamError:
+        return None, _json_error("AccessDenied", f"Requires {action} permission", 403)
+
+
+@admin_api_bp.route("/iam/users", methods=["GET"])
+@limiter.limit(lambda: _get_admin_rate_limit())
+def iam_list_users():
+    principal, error = _require_iam_action("iam:list_users")
+    if error:
+        return error
+    return jsonify({"users": _iam().list_users()})
+
+
+@admin_api_bp.route("/iam/users/<identifier>", methods=["GET"])
+@limiter.limit(lambda: _get_admin_rate_limit())
+def iam_get_user(identifier):
+    principal, error = _require_iam_action("iam:get_user")
+    if error:
+        return error
+    try:
+        user_id = _iam().resolve_user_id(identifier)
+        return 
jsonify(_iam().get_user_by_id(user_id))
+    except IamError as exc:
+        return _json_error("NotFound", str(exc), 404)
+
+
+@admin_api_bp.route("/iam/users/<identifier>/policies", methods=["GET"])
+@limiter.limit(lambda: _get_admin_rate_limit())
+def iam_get_user_policies(identifier):
+    principal, error = _require_iam_action("iam:get_policy")
+    if error:
+        return error
+    try:
+        return jsonify({"policies": _iam().get_user_policies(identifier)})
+    except IamError as exc:
+        return _json_error("NotFound", str(exc), 404)
+
+
+@admin_api_bp.route("/iam/users/<identifier>/keys", methods=["POST"])
+@limiter.limit(lambda: _get_admin_rate_limit())
+def iam_create_access_key(identifier):
+    principal, error = _require_iam_action("iam:create_key")
+    if error:
+        return error
+    try:
+        result = _iam().create_access_key(identifier)
+        logger.info("Access key created for %s by %s", identifier, principal.access_key)
+        return jsonify(result), 201
+    except IamError as exc:
+        return _json_error("InvalidRequest", str(exc), 400)
+
+
+@admin_api_bp.route("/iam/users/<identifier>/keys/<access_key>", methods=["DELETE"])
+@limiter.limit(lambda: _get_admin_rate_limit())
+def iam_delete_access_key(identifier, access_key):
+    principal, error = _require_iam_action("iam:delete_key")
+    if error:
+        return error
+    try:
+        _iam().delete_access_key(access_key)
+        logger.info("Access key %s deleted by %s", access_key, principal.access_key)
+        return "", 204
+    except IamError as exc:
+        return _json_error("InvalidRequest", str(exc), 400)
+
+
+@admin_api_bp.route("/iam/users/<identifier>/disable", methods=["POST"])
+@limiter.limit(lambda: _get_admin_rate_limit())
+def iam_disable_user(identifier):
+    principal, error = _require_iam_action("iam:disable_user")
+    if error:
+        return error
+    try:
+        _iam().disable_user(identifier)
+        logger.info("User %s disabled by %s", identifier, principal.access_key)
+        return jsonify({"status": "disabled"})
+    except IamError as exc:
+        return _json_error("InvalidRequest", str(exc), 400)
+
+
+@admin_api_bp.route("/iam/users/<identifier>/enable", 
methods=["POST"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def iam_enable_user(identifier): + principal, error = _require_iam_action("iam:disable_user") + if error: + return error + try: + _iam().enable_user(identifier) + logger.info("User %s enabled by %s", identifier, principal.access_key) + return jsonify({"status": "enabled"}) + except IamError as exc: + return _json_error("InvalidRequest", str(exc), 400) + + @admin_api_bp.route("/website-domains", methods=["GET"]) @limiter.limit(lambda: _get_admin_rate_limit()) def list_website_domains(): @@ -881,3 +982,5 @@ def integrity_history(): offset = int(request.args.get("offset", 0)) records = checker.get_history(limit=limit, offset=offset) return jsonify({"executions": records}) + + diff --git a/app/config.py b/app/config.py index 079fc50..d196397 100644 --- a/app/config.py +++ b/app/config.py @@ -25,7 +25,7 @@ def _calculate_auto_connection_limit() -> int: def _calculate_auto_backlog(connection_limit: int) -> int: - return max(64, min(connection_limit * 2, 4096)) + return max(128, min(connection_limit * 2, 4096)) def _validate_rate_limit(value: str) -> str: @@ -115,6 +115,7 @@ class AppConfig: server_connection_limit: int server_backlog: int server_channel_timeout: int + server_max_buffer_size: int server_threads_auto: bool server_connection_limit_auto: bool server_backlog_auto: bool @@ -293,6 +294,7 @@ class AppConfig: server_backlog_auto = False server_channel_timeout = int(_get("SERVER_CHANNEL_TIMEOUT", 120)) + server_max_buffer_size = int(_get("SERVER_MAX_BUFFER_SIZE", 1024 * 1024 * 128)) site_sync_enabled = str(_get("SITE_SYNC_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} site_sync_interval_seconds = int(_get("SITE_SYNC_INTERVAL_SECONDS", 60)) site_sync_batch_size = int(_get("SITE_SYNC_BATCH_SIZE", 100)) @@ -394,6 +396,7 @@ class AppConfig: server_connection_limit=server_connection_limit, server_backlog=server_backlog, server_channel_timeout=server_channel_timeout, + 
server_max_buffer_size=server_max_buffer_size, server_threads_auto=server_threads_auto, server_connection_limit_auto=server_connection_limit_auto, server_backlog_auto=server_backlog_auto, @@ -504,10 +507,12 @@ class AppConfig: issues.append(f"CRITICAL: SERVER_THREADS={self.server_threads} is outside valid range (1-64). Server cannot start.") if not (10 <= self.server_connection_limit <= 1000): issues.append(f"CRITICAL: SERVER_CONNECTION_LIMIT={self.server_connection_limit} is outside valid range (10-1000). Server cannot start.") - if not (64 <= self.server_backlog <= 4096): - issues.append(f"CRITICAL: SERVER_BACKLOG={self.server_backlog} is outside valid range (64-4096). Server cannot start.") + if not (128 <= self.server_backlog <= 4096): + issues.append(f"CRITICAL: SERVER_BACKLOG={self.server_backlog} is outside valid range (128-4096). Server cannot start.") if not (10 <= self.server_channel_timeout <= 300): issues.append(f"CRITICAL: SERVER_CHANNEL_TIMEOUT={self.server_channel_timeout} is outside valid range (10-300). Server cannot start.") + if self.server_max_buffer_size < 1024 * 1024: + issues.append(f"WARNING: SERVER_MAX_BUFFER_SIZE={self.server_max_buffer_size} is less than 1MB. 
Large uploads will fail.") if sys.platform != "win32": try: @@ -553,6 +558,7 @@ class AppConfig: print(f" CONNECTION_LIMIT: {self.server_connection_limit}{_auto(self.server_connection_limit_auto)}") print(f" BACKLOG: {self.server_backlog}{_auto(self.server_backlog_auto)}") print(f" CHANNEL_TIMEOUT: {self.server_channel_timeout}s") + print(f" MAX_BUFFER_SIZE: {self.server_max_buffer_size // (1024 * 1024)}MB") print("=" * 60) issues = self.validate_and_report() @@ -618,6 +624,7 @@ class AppConfig: "SERVER_CONNECTION_LIMIT": self.server_connection_limit, "SERVER_BACKLOG": self.server_backlog, "SERVER_CHANNEL_TIMEOUT": self.server_channel_timeout, + "SERVER_MAX_BUFFER_SIZE": self.server_max_buffer_size, "SITE_SYNC_ENABLED": self.site_sync_enabled, "SITE_SYNC_INTERVAL_SECONDS": self.site_sync_interval_seconds, "SITE_SYNC_BATCH_SIZE": self.site_sync_batch_size, diff --git a/app/encrypted_storage.py b/app/encrypted_storage.py index a0d3a58..b64e1d1 100644 --- a/app/encrypted_storage.py +++ b/app/encrypted_storage.py @@ -193,6 +193,9 @@ class EncryptedObjectStorage: def list_objects_shallow(self, bucket_name: str, **kwargs): return self.storage.list_objects_shallow(bucket_name, **kwargs) + def iter_objects_shallow(self, bucket_name: str, **kwargs): + return self.storage.iter_objects_shallow(bucket_name, **kwargs) + def search_objects(self, bucket_name: str, query: str, **kwargs): return self.storage.search_objects(bucket_name, query, **kwargs) diff --git a/app/iam.py b/app/iam.py index 61aae45..f4d895d 100644 --- a/app/iam.py +++ b/app/iam.py @@ -10,7 +10,7 @@ import secrets import threading import time from collections import deque -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set, Tuple @@ -22,16 +22,37 @@ class IamError(RuntimeError): """Raised when authentication or authorization fails.""" 
-S3_ACTIONS = {"list", "read", "write", "delete", "share", "policy", "replication", "lifecycle", "cors"} +S3_ACTIONS = { + "list", "read", "write", "delete", "share", "policy", + "replication", "lifecycle", "cors", + "create_bucket", "delete_bucket", + "versioning", "tagging", "encryption", "quota", + "object_lock", "notification", "logging", "website", +} IAM_ACTIONS = { "iam:list_users", "iam:create_user", "iam:delete_user", "iam:rotate_key", "iam:update_policy", + "iam:create_key", + "iam:delete_key", + "iam:get_user", + "iam:get_policy", + "iam:disable_user", } ALLOWED_ACTIONS = (S3_ACTIONS | IAM_ACTIONS) | {"iam:*"} +_V1_IMPLIED_ACTIONS = { + "write": {"create_bucket"}, + "delete": {"delete_bucket"}, + "policy": { + "versioning", "tagging", "encryption", "quota", + "object_lock", "notification", "logging", "website", + "cors", "lifecycle", "replication", "share", + }, +} + ACTION_ALIASES = { "list": "list", "s3:listbucket": "list", @@ -45,14 +66,11 @@ ACTION_ALIASES = { "s3:getobjecttagging": "read", "s3:getobjectversiontagging": "read", "s3:getobjectacl": "read", - "s3:getbucketversioning": "read", "s3:headobject": "read", "s3:headbucket": "read", "write": "write", "s3:putobject": "write", - "s3:createbucket": "write", "s3:putobjecttagging": "write", - "s3:putbucketversioning": "write", "s3:createmultipartupload": "write", "s3:uploadpart": "write", "s3:completemultipartupload": "write", @@ -61,8 +79,11 @@ ACTION_ALIASES = { "delete": "delete", "s3:deleteobject": "delete", "s3:deleteobjectversion": "delete", - "s3:deletebucket": "delete", "s3:deleteobjecttagging": "delete", + "create_bucket": "create_bucket", + "s3:createbucket": "create_bucket", + "delete_bucket": "delete_bucket", + "s3:deletebucket": "delete_bucket", "share": "share", "s3:putobjectacl": "share", "s3:putbucketacl": "share", @@ -88,11 +109,50 @@ ACTION_ALIASES = { "s3:getbucketcors": "cors", "s3:putbucketcors": "cors", "s3:deletebucketcors": "cors", + "versioning": "versioning", + 
"s3:getbucketversioning": "versioning", + "s3:putbucketversioning": "versioning", + "tagging": "tagging", + "s3:getbuckettagging": "tagging", + "s3:putbuckettagging": "tagging", + "s3:deletebuckettagging": "tagging", + "encryption": "encryption", + "s3:getencryptionconfiguration": "encryption", + "s3:putencryptionconfiguration": "encryption", + "s3:deleteencryptionconfiguration": "encryption", + "quota": "quota", + "s3:getbucketquota": "quota", + "s3:putbucketquota": "quota", + "s3:deletebucketquota": "quota", + "object_lock": "object_lock", + "s3:getobjectlockconfiguration": "object_lock", + "s3:putobjectlockconfiguration": "object_lock", + "s3:putobjectretention": "object_lock", + "s3:getobjectretention": "object_lock", + "s3:putobjectlegalhold": "object_lock", + "s3:getobjectlegalhold": "object_lock", + "notification": "notification", + "s3:getbucketnotificationconfiguration": "notification", + "s3:putbucketnotificationconfiguration": "notification", + "s3:deletebucketnotificationconfiguration": "notification", + "logging": "logging", + "s3:getbucketlogging": "logging", + "s3:putbucketlogging": "logging", + "s3:deletebucketlogging": "logging", + "website": "website", + "s3:getbucketwebsite": "website", + "s3:putbucketwebsite": "website", + "s3:deletebucketwebsite": "website", "iam:listusers": "iam:list_users", "iam:createuser": "iam:create_user", "iam:deleteuser": "iam:delete_user", "iam:rotateaccesskey": "iam:rotate_key", "iam:putuserpolicy": "iam:update_policy", + "iam:createaccesskey": "iam:create_key", + "iam:deleteaccesskey": "iam:delete_key", + "iam:getuser": "iam:get_user", + "iam:getpolicy": "iam:get_policy", + "iam:disableuser": "iam:disable_user", "iam:*": "iam:*", } @@ -101,6 +161,7 @@ ACTION_ALIASES = { class Policy: bucket: str actions: Set[str] + prefix: str = "*" @dataclass @@ -117,6 +178,16 @@ def _derive_fernet_key(secret: str) -> bytes: _IAM_ENCRYPTED_PREFIX = b"MYFSIO_IAM_ENC:" +_CONFIG_VERSION = 2 + + +def _expand_v1_actions(actions: 
Set[str]) -> Set[str]: + expanded = set(actions) + for action, implied in _V1_IMPLIED_ACTIONS.items(): + if action in expanded: + expanded.update(implied) + return expanded + class IamService: """Loads IAM configuration, manages users, and evaluates policies.""" @@ -131,7 +202,10 @@ class IamService: self.config_path.parent.mkdir(parents=True, exist_ok=True) if not self.config_path.exists(): self._write_default() - self._users: Dict[str, Dict[str, Any]] = {} + self._user_records: Dict[str, Dict[str, Any]] = {} + self._key_index: Dict[str, str] = {} + self._key_secrets: Dict[str, str] = {} + self._key_status: Dict[str, str] = {} self._raw_config: Dict[str, Any] = {} self._failed_attempts: Dict[str, Deque[datetime]] = {} self._last_load_time = 0.0 @@ -146,7 +220,6 @@ class IamService: self._load_lockout_state() def _maybe_reload(self) -> None: - """Reload configuration if the file has changed on disk.""" now = time.time() if now - self._last_stat_check < self._stat_check_interval: return @@ -183,11 +256,20 @@ class IamService: raise IamError( f"Access temporarily locked. Try again in {seconds} seconds." 
) - record = self._users.get(access_key) - stored_secret = record["secret_key"] if record else secrets.token_urlsafe(24) - if not record or not hmac.compare_digest(stored_secret, secret_key): + user_id = self._key_index.get(access_key) + stored_secret = self._key_secrets.get(access_key, secrets.token_urlsafe(24)) + if not user_id or not hmac.compare_digest(stored_secret, secret_key): self._record_failed_attempt(access_key) raise IamError("Invalid credentials") + key_status = self._key_status.get(access_key, "active") + if key_status != "active": + raise IamError("Access key is inactive") + record = self._user_records.get(user_id) + if not record: + self._record_failed_attempt(access_key) + raise IamError("Invalid credentials") + if not record.get("enabled", True): + raise IamError("User account is disabled") self._check_expiry(access_key, record) self._clear_failed_attempts(access_key) return self._build_principal(access_key, record) @@ -215,7 +297,6 @@ class IamService: return self.config_path.parent / "lockout_state.json" def _load_lockout_state(self) -> None: - """Load lockout state from disk.""" try: if self._lockout_file().exists(): data = json.loads(self._lockout_file().read_text(encoding="utf-8")) @@ -235,7 +316,6 @@ class IamService: pass def _save_lockout_state(self) -> None: - """Persist lockout state to disk.""" data: Dict[str, Any] = {"failed_attempts": {}} for key, attempts in self._failed_attempts.items(): data["failed_attempts"][key] = [ts.isoformat() for ts in attempts] @@ -270,10 +350,9 @@ class IamService: return int(max(0, self.auth_lockout_window.total_seconds() - elapsed)) def create_session_token(self, access_key: str, duration_seconds: int = 3600) -> str: - """Create a temporary session token for an access key.""" self._maybe_reload() - record = self._users.get(access_key) - if not record: + user_id = self._key_index.get(access_key) + if not user_id or user_id not in self._user_records: raise IamError("Unknown access key") 
self._cleanup_expired_sessions() token = secrets.token_urlsafe(32) @@ -285,7 +364,6 @@ class IamService: return token def validate_session_token(self, access_key: str, session_token: str) -> bool: - """Validate a session token for an access key (thread-safe, constant-time).""" dummy_key = secrets.token_urlsafe(16) dummy_token = secrets.token_urlsafe(32) with self._session_lock: @@ -304,7 +382,6 @@ class IamService: return True def _cleanup_expired_sessions(self) -> None: - """Remove expired session tokens.""" now = time.time() expired = [token for token, data in self._sessions.items() if now > data["expires_at"]] for token in expired: @@ -316,13 +393,18 @@ class IamService: if cached: principal, cached_time = cached if now - cached_time < self._cache_ttl: - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) return principal self._maybe_reload() - record = self._users.get(access_key) + user_id = self._key_index.get(access_key) + if not user_id: + raise IamError("Unknown access key") + record = self._user_records.get(user_id) if not record: raise IamError("Unknown access key") self._check_expiry(access_key, record) @@ -332,22 +414,26 @@ class IamService: def secret_for_key(self, access_key: str) -> str: self._maybe_reload() - record = self._users.get(access_key) - if not record: + secret = self._key_secrets.get(access_key) + if not secret: raise IamError("Unknown access key") - self._check_expiry(access_key, record) - return record["secret_key"] + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) + return secret - def authorize(self, principal: Principal, bucket_name: str | None, action: str) -> None: + def authorize(self, principal: Principal, bucket_name: str | None, 
action: str, *, object_key: str | None = None) -> None: action = self._normalize_action(action) if action not in ALLOWED_ACTIONS: raise IamError(f"Unknown action '{action}'") bucket_name = bucket_name or "*" normalized = bucket_name.lower() if bucket_name != "*" else bucket_name - if not self._is_allowed(principal, normalized, action): + if not self._is_allowed(principal, normalized, action, object_key=object_key): raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'") - def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str]) -> Dict[str, bool]: + def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str], *, object_key: str | None = None) -> Dict[str, bool]: self._maybe_reload() bucket_name = (bucket_name or "*").lower() if bucket_name != "*" else (bucket_name or "*") normalized_actions = {a: self._normalize_action(a) for a in actions} @@ -356,37 +442,53 @@ class IamService: if canonical not in ALLOWED_ACTIONS: results[original] = False else: - results[original] = self._is_allowed(principal, bucket_name, canonical) + results[original] = self._is_allowed(principal, bucket_name, canonical, object_key=object_key) return results def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]: return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")] - def _is_allowed(self, principal: Principal, bucket_name: str, action: str) -> bool: + def _is_allowed(self, principal: Principal, bucket_name: str, action: str, *, object_key: str | None = None) -> bool: bucket_name = bucket_name.lower() for policy in principal.policies: if policy.bucket not in {"*", bucket_name}: continue - if "*" in policy.actions or action in policy.actions: - return True - if "iam:*" in policy.actions and action.startswith("iam:"): - return True + action_match = "*" in policy.actions or action in policy.actions + if not action_match and 
"iam:*" in policy.actions and action.startswith("iam:"): + action_match = True + if not action_match: + continue + if object_key is not None and policy.prefix != "*": + prefix = policy.prefix.rstrip("*") + if not object_key.startswith(prefix): + continue + return True return False def list_users(self) -> List[Dict[str, Any]]: listing: List[Dict[str, Any]] = [] - for access_key, record in self._users.items(): - listing.append( - { - "access_key": access_key, - "display_name": record["display_name"], - "expires_at": record.get("expires_at"), - "policies": [ - {"bucket": policy.bucket, "actions": sorted(policy.actions)} - for policy in record["policies"] - ], - } - ) + for user_id, record in self._user_records.items(): + access_keys = [] + for key_info in record.get("access_keys", []): + access_keys.append({ + "access_key": key_info["access_key"], + "status": key_info.get("status", "active"), + "created_at": key_info.get("created_at"), + }) + user_entry: Dict[str, Any] = { + "user_id": user_id, + "display_name": record["display_name"], + "enabled": record.get("enabled", True), + "expires_at": record.get("expires_at"), + "access_keys": access_keys, + "policies": [ + {**{"bucket": policy.bucket, "actions": sorted(policy.actions)}, **({"prefix": policy.prefix} if policy.prefix != "*" else {})} + for policy in record["policies"] + ], + } + if access_keys: + user_entry["access_key"] = access_keys[0]["access_key"] + listing.append(user_entry) return listing def create_user( @@ -397,20 +499,33 @@ class IamService: access_key: str | None = None, secret_key: str | None = None, expires_at: str | None = None, + user_id: str | None = None, ) -> Dict[str, str]: access_key = (access_key or self._generate_access_key()).strip() if not access_key: raise IamError("Access key cannot be empty") - if access_key in self._users: + if access_key in self._key_index: raise IamError("Access key already exists") if expires_at: self._validate_expires_at(expires_at) secret_key = secret_key or 
self._generate_secret_key() sanitized_policies = self._prepare_policy_payload(policies) + user_id = user_id or self._generate_user_id() + if user_id in self._user_records: + raise IamError("User ID already exists") + now_iso = datetime.now(timezone.utc).isoformat() record: Dict[str, Any] = { - "access_key": access_key, - "secret_key": secret_key, + "user_id": user_id, "display_name": display_name or access_key, + "enabled": True, + "access_keys": [ + { + "access_key": access_key, + "secret_key": secret_key, + "status": "active", + "created_at": now_iso, + } + ], "policies": sanitized_policies, } if expires_at: @@ -418,12 +533,108 @@ class IamService: self._raw_config.setdefault("users", []).append(record) self._save() self._load() - return {"access_key": access_key, "secret_key": secret_key} + return {"user_id": user_id, "access_key": access_key, "secret_key": secret_key} + + def create_access_key(self, identifier: str) -> Dict[str, str]: + user_raw, _ = self._resolve_raw_user(identifier) + new_access_key = self._generate_access_key() + new_secret_key = self._generate_secret_key() + now_iso = datetime.now(timezone.utc).isoformat() + key_entry = { + "access_key": new_access_key, + "secret_key": new_secret_key, + "status": "active", + "created_at": now_iso, + } + user_raw.setdefault("access_keys", []).append(key_entry) + self._save() + self._load() + return {"access_key": new_access_key, "secret_key": new_secret_key} + + def delete_access_key(self, access_key: str) -> None: + user_raw, _ = self._resolve_raw_user(access_key) + keys = user_raw.get("access_keys", []) + if len(keys) <= 1: + raise IamError("Cannot delete the only access key for a user") + remaining = [k for k in keys if k["access_key"] != access_key] + if len(remaining) == len(keys): + raise IamError("Access key not found") + user_raw["access_keys"] = remaining + self._save() + self._principal_cache.pop(access_key, None) + self._secret_key_cache.pop(access_key, None) + from .s3_api import 
clear_signing_key_cache + clear_signing_key_cache() + self._load() + + def disable_user(self, identifier: str) -> None: + user_raw, _ = self._resolve_raw_user(identifier) + user_raw["enabled"] = False + self._save() + for key_info in user_raw.get("access_keys", []): + ak = key_info["access_key"] + self._principal_cache.pop(ak, None) + self._secret_key_cache.pop(ak, None) + from .s3_api import clear_signing_key_cache + clear_signing_key_cache() + self._load() + + def enable_user(self, identifier: str) -> None: + user_raw, _ = self._resolve_raw_user(identifier) + user_raw["enabled"] = True + self._save() + self._load() + + def get_user_by_id(self, user_id: str) -> Dict[str, Any]: + record = self._user_records.get(user_id) + if not record: + raise IamError("User not found") + access_keys = [] + for key_info in record.get("access_keys", []): + access_keys.append({ + "access_key": key_info["access_key"], + "status": key_info.get("status", "active"), + "created_at": key_info.get("created_at"), + }) + return { + "user_id": user_id, + "display_name": record["display_name"], + "enabled": record.get("enabled", True), + "expires_at": record.get("expires_at"), + "access_keys": access_keys, + "policies": [ + {"bucket": p.bucket, "actions": sorted(p.actions), "prefix": p.prefix} + for p in record["policies"] + ], + } + + def get_user_policies(self, identifier: str) -> List[Dict[str, Any]]: + _, user_id = self._resolve_raw_user(identifier) + record = self._user_records.get(user_id) + if not record: + raise IamError("User not found") + return [ + {**{"bucket": p.bucket, "actions": sorted(p.actions)}, **({"prefix": p.prefix} if p.prefix != "*" else {})} + for p in record["policies"] + ] + + def resolve_user_id(self, identifier: str) -> str: + if identifier in self._user_records: + return identifier + user_id = self._key_index.get(identifier) + if user_id: + return user_id + raise IamError("User not found") def rotate_secret(self, access_key: str) -> str: - user = 
self._get_raw_user(access_key) + user_raw, _ = self._resolve_raw_user(access_key) new_secret = self._generate_secret_key() - user["secret_key"] = new_secret + for key_info in user_raw.get("access_keys", []): + if key_info["access_key"] == access_key: + key_info["secret_key"] = new_secret + break + else: + raise IamError("Access key not found") self._save() self._principal_cache.pop(access_key, None) self._secret_key_cache.pop(access_key, None) @@ -433,8 +644,8 @@ class IamService: return new_secret def update_user(self, access_key: str, display_name: str) -> None: - user = self._get_raw_user(access_key) - user["display_name"] = display_name + user_raw, _ = self._resolve_raw_user(access_key) + user_raw["display_name"] = display_name self._save() self._load() @@ -442,32 +653,43 @@ class IamService: users = self._raw_config.get("users", []) if len(users) <= 1: raise IamError("Cannot delete the only user") - remaining = [user for user in users if user["access_key"] != access_key] - if len(remaining) == len(users): + _, target_user_id = self._resolve_raw_user(access_key) + target_user_raw = None + remaining = [] + for u in users: + if u.get("user_id") == target_user_id: + target_user_raw = u + else: + remaining.append(u) + if target_user_raw is None: raise IamError("User not found") self._raw_config["users"] = remaining self._save() - self._principal_cache.pop(access_key, None) - self._secret_key_cache.pop(access_key, None) + for key_info in target_user_raw.get("access_keys", []): + ak = key_info["access_key"] + self._principal_cache.pop(ak, None) + self._secret_key_cache.pop(ak, None) from .s3_api import clear_signing_key_cache clear_signing_key_cache() self._load() def update_user_expiry(self, access_key: str, expires_at: str | None) -> None: - user = self._get_raw_user(access_key) + user_raw, _ = self._resolve_raw_user(access_key) if expires_at: self._validate_expires_at(expires_at) - user["expires_at"] = expires_at + user_raw["expires_at"] = expires_at else: - 
user.pop("expires_at", None) + user_raw.pop("expires_at", None) self._save() - self._principal_cache.pop(access_key, None) - self._secret_key_cache.pop(access_key, None) + for key_info in user_raw.get("access_keys", []): + ak = key_info["access_key"] + self._principal_cache.pop(ak, None) + self._secret_key_cache.pop(ak, None) self._load() def update_user_policies(self, access_key: str, policies: Sequence[Dict[str, Any]]) -> None: - user = self._get_raw_user(access_key) - user["policies"] = self._prepare_policy_payload(policies) + user_raw, _ = self._resolve_raw_user(access_key) + user_raw["policies"] = self._prepare_policy_payload(policies) self._save() self._load() @@ -482,6 +704,52 @@ class IamService: raise IamError("Cannot decrypt IAM config. SECRET_KEY may have changed. Use 'python run.py reset-cred' to reset credentials.") return raw_bytes.decode("utf-8") + def _is_v2_config(self, raw: Dict[str, Any]) -> bool: + return raw.get("version", 1) >= _CONFIG_VERSION + + def _migrate_v1_to_v2(self, raw: Dict[str, Any]) -> Dict[str, Any]: + migrated_users = [] + now_iso = datetime.now(timezone.utc).isoformat() + for user in raw.get("users", []): + old_policies = user.get("policies", []) + expanded_policies = [] + for p in old_policies: + raw_actions = p.get("actions", []) + if isinstance(raw_actions, str): + raw_actions = [raw_actions] + action_set: Set[str] = set() + for a in raw_actions: + canonical = self._normalize_action(a) + if canonical == "*": + action_set = set(ALLOWED_ACTIONS) + break + if canonical: + action_set.add(canonical) + action_set = _expand_v1_actions(action_set) + expanded_policies.append({ + "bucket": p.get("bucket", "*"), + "actions": sorted(action_set), + "prefix": p.get("prefix", "*"), + }) + migrated_user: Dict[str, Any] = { + "user_id": user["access_key"], + "display_name": user.get("display_name", user["access_key"]), + "enabled": True, + "access_keys": [ + { + "access_key": user["access_key"], + "secret_key": user["secret_key"], + 
"status": "active", + "created_at": now_iso, + } + ], + "policies": expanded_policies, + } + if user.get("expires_at"): + migrated_user["expires_at"] = user["expires_at"] + migrated_users.append(migrated_user) + return {"version": _CONFIG_VERSION, "users": migrated_users} + def _load(self) -> None: try: self._last_load_time = self.config_path.stat().st_mtime @@ -500,35 +768,67 @@ class IamService: raise IamError(f"Failed to load IAM config: {e}") was_plaintext = not raw_bytes.startswith(_IAM_ENCRYPTED_PREFIX) + was_v1 = not self._is_v2_config(raw) + + if was_v1: + raw = self._migrate_v1_to_v2(raw) + + user_records: Dict[str, Dict[str, Any]] = {} + key_index: Dict[str, str] = {} + key_secrets: Dict[str, str] = {} + key_status_map: Dict[str, str] = {} - users: Dict[str, Dict[str, Any]] = {} for user in raw.get("users", []): + user_id = user["user_id"] policies = self._build_policy_objects(user.get("policies", [])) - user_record: Dict[str, Any] = { - "secret_key": user["secret_key"], - "display_name": user.get("display_name", user["access_key"]), + access_keys_raw = user.get("access_keys", []) + access_keys_info = [] + for key_entry in access_keys_raw: + ak = key_entry["access_key"] + sk = key_entry["secret_key"] + status = key_entry.get("status", "active") + key_index[ak] = user_id + key_secrets[ak] = sk + key_status_map[ak] = status + access_keys_info.append({ + "access_key": ak, + "secret_key": sk, + "status": status, + "created_at": key_entry.get("created_at"), + }) + record: Dict[str, Any] = { + "display_name": user.get("display_name", user_id), + "enabled": user.get("enabled", True), "policies": policies, + "access_keys": access_keys_info, } if user.get("expires_at"): - user_record["expires_at"] = user["expires_at"] - users[user["access_key"]] = user_record - if not users: - raise IamError("IAM configuration contains no users") - self._users = users - raw_users: List[Dict[str, Any]] = [] - for entry in raw.get("users", []): - raw_entry: Dict[str, Any] = { - 
"access_key": entry["access_key"], - "secret_key": entry["secret_key"], - "display_name": entry.get("display_name", entry["access_key"]), - "policies": entry.get("policies", []), - } - if entry.get("expires_at"): - raw_entry["expires_at"] = entry["expires_at"] - raw_users.append(raw_entry) - self._raw_config = {"users": raw_users} + record["expires_at"] = user["expires_at"] + user_records[user_id] = record - if was_plaintext and self._fernet: + if not user_records: + raise IamError("IAM configuration contains no users") + + self._user_records = user_records + self._key_index = key_index + self._key_secrets = key_secrets + self._key_status = key_status_map + + raw_users: List[Dict[str, Any]] = [] + for user in raw.get("users", []): + raw_entry: Dict[str, Any] = { + "user_id": user["user_id"], + "display_name": user.get("display_name", user["user_id"]), + "enabled": user.get("enabled", True), + "access_keys": user.get("access_keys", []), + "policies": user.get("policies", []), + } + if user.get("expires_at"): + raw_entry["expires_at"] = user["expires_at"] + raw_users.append(raw_entry) + self._raw_config = {"version": _CONFIG_VERSION, "users": raw_users} + + if was_v1 or (was_plaintext and self._fernet): self._save() def _save(self) -> None: @@ -547,19 +847,30 @@ class IamService: def config_summary(self) -> Dict[str, Any]: return { "path": str(self.config_path), - "user_count": len(self._users), + "user_count": len(self._user_records), "allowed_actions": sorted(ALLOWED_ACTIONS), } def export_config(self, mask_secrets: bool = True) -> Dict[str, Any]: - payload: Dict[str, Any] = {"users": []} + payload: Dict[str, Any] = {"version": _CONFIG_VERSION, "users": []} for user in self._raw_config.get("users", []): + access_keys = [] + for key_info in user.get("access_keys", []): + access_keys.append({ + "access_key": key_info["access_key"], + "secret_key": "\u2022\u2022\u2022\u2022\u2022\u2022\u2022\u2022\u2022\u2022" if mask_secrets else key_info["secret_key"], + "status": 
key_info.get("status", "active"), + "created_at": key_info.get("created_at"), + }) record: Dict[str, Any] = { - "access_key": user["access_key"], - "secret_key": "••••••••••" if mask_secrets else user["secret_key"], + "user_id": user["user_id"], "display_name": user["display_name"], + "enabled": user.get("enabled", True), + "access_keys": access_keys, "policies": user["policies"], } + if access_keys: + record["access_key"] = access_keys[0]["access_key"] if user.get("expires_at"): record["expires_at"] = user["expires_at"] payload["users"].append(record) @@ -569,6 +880,7 @@ class IamService: entries: List[Policy] = [] for policy in policies: bucket = str(policy.get("bucket", "*")).lower() + prefix = str(policy.get("prefix", "*")) raw_actions = policy.get("actions", []) if isinstance(raw_actions, str): raw_actions = [raw_actions] @@ -581,7 +893,7 @@ class IamService: if canonical: action_set.add(canonical) if action_set: - entries.append(Policy(bucket=bucket, actions=action_set)) + entries.append(Policy(bucket=bucket, actions=action_set, prefix=prefix)) return entries def _prepare_policy_payload(self, policies: Optional[Sequence[Dict[str, Any]]]) -> List[Dict[str, Any]]: @@ -589,12 +901,14 @@ class IamService: policies = ( { "bucket": "*", - "actions": ["list", "read", "write", "delete", "share", "policy"], + "actions": ["list", "read", "write", "delete", "share", "policy", + "create_bucket", "delete_bucket"], }, ) sanitized: List[Dict[str, Any]] = [] for policy in policies: bucket = str(policy.get("bucket", "*")).lower() + prefix = str(policy.get("prefix", "*")) raw_actions = policy.get("actions", []) if isinstance(raw_actions, str): raw_actions = [raw_actions] @@ -608,7 +922,10 @@ class IamService: action_set.add(canonical) if not action_set: continue - sanitized.append({"bucket": bucket, "actions": sorted(action_set)}) + entry: Dict[str, Any] = {"bucket": bucket, "actions": sorted(action_set)} + if prefix != "*": + entry["prefix"] = prefix + sanitized.append(entry) 
if not sanitized: raise IamError("At least one policy with valid actions is required") return sanitized @@ -633,12 +950,23 @@ class IamService: access_key = os.environ.get("ADMIN_ACCESS_KEY", "").strip() or secrets.token_hex(12) secret_key = os.environ.get("ADMIN_SECRET_KEY", "").strip() or secrets.token_urlsafe(32) custom_keys = bool(os.environ.get("ADMIN_ACCESS_KEY", "").strip()) + user_id = self._generate_user_id() + now_iso = datetime.now(timezone.utc).isoformat() default = { + "version": _CONFIG_VERSION, "users": [ { - "access_key": access_key, - "secret_key": secret_key, + "user_id": user_id, "display_name": "Local Admin", + "enabled": True, + "access_keys": [ + { + "access_key": access_key, + "secret_key": secret_key, + "status": "active", + "created_at": now_iso, + } + ], "policies": [ {"bucket": "*", "actions": list(ALLOWED_ACTIONS)} ], @@ -660,6 +988,7 @@ class IamService: else: print(f"Access Key: {access_key}") print(f"Secret Key: {secret_key}") + print(f"User ID: {user_id}") print(f"{'='*60}") if self._fernet: print("IAM config is encrypted at rest.") @@ -682,30 +1011,46 @@ class IamService: def _generate_secret_key(self) -> str: return secrets.token_urlsafe(24) - def _get_raw_user(self, access_key: str) -> Dict[str, Any]: + def _generate_user_id(self) -> str: + return f"u-{secrets.token_hex(8)}" + + def _resolve_raw_user(self, identifier: str) -> Tuple[Dict[str, Any], str]: for user in self._raw_config.get("users", []): - if user["access_key"] == access_key: - return user + if user.get("user_id") == identifier: + return user, identifier + for user in self._raw_config.get("users", []): + for key_info in user.get("access_keys", []): + if key_info["access_key"] == identifier: + return user, user["user_id"] raise IamError("User not found") + def _get_raw_user(self, access_key: str) -> Dict[str, Any]: + user, _ = self._resolve_raw_user(access_key) + return user + def get_secret_key(self, access_key: str) -> str | None: now = time.time() cached = 
self._secret_key_cache.get(access_key) if cached: secret_key, cached_time = cached if now - cached_time < self._cache_ttl: - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) return secret_key self._maybe_reload() - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) - secret_key = record["secret_key"] - self._secret_key_cache[access_key] = (secret_key, now) - return secret_key + secret = self._key_secrets.get(access_key) + if secret: + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) + self._secret_key_cache[access_key] = (secret, now) + return secret return None def get_principal(self, access_key: str) -> Principal | None: @@ -714,16 +1059,20 @@ class IamService: if cached: principal, cached_time = cached if now - cached_time < self._cache_ttl: - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) return principal self._maybe_reload() - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) - principal = self._build_principal(access_key, record) - self._principal_cache[access_key] = (principal, now) - return principal + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) + principal = self._build_principal(access_key, record) + self._principal_cache[access_key] = (principal, now) + return principal return None diff --git a/app/s3_api.py b/app/s3_api.py index af54bed..2c151ee 100644 --- a/app/s3_api.py +++ 
b/app/s3_api.py @@ -301,7 +301,12 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: if _HAS_RUST: query_params = list(req.args.items(multi=True)) - header_values = [(h, req.headers.get(h) or "") for h in signed_headers_str.split(";")] + header_values = [] + for h in signed_headers_str.split(";"): + val = req.headers.get(h) or "" + if h.lower() == "expect" and val == "": + val = "100-continue" + header_values.append((h, val)) if not _rc.verify_sigv4_signature( req.method, canonical_uri, query_params, signed_headers_str, header_values, payload_hash, amz_date, date_stamp, region, @@ -390,7 +395,12 @@ def _verify_sigv4_query(req: Any) -> Principal | None: if _HAS_RUST: query_params = [(k, v) for k, v in req.args.items(multi=True) if k != "X-Amz-Signature"] - header_values = [(h, req.headers.get(h) or "") for h in signed_headers_str.split(";")] + header_values = [] + for h in signed_headers_str.split(";"): + val = req.headers.get(h) or "" + if h.lower() == "expect" and val == "": + val = "100-continue" + header_values.append((h, val)) if not _rc.verify_sigv4_signature( req.method, canonical_uri, query_params, signed_headers_str, header_values, "UNSIGNED-PAYLOAD", amz_date, date_stamp, region, @@ -488,7 +498,7 @@ def _authorize_action(principal: Principal | None, bucket_name: str | None, acti iam_error: IamError | None = None if principal is not None: try: - _iam().authorize(principal, bucket_name, action) + _iam().authorize(principal, bucket_name, action, object_key=object_key) iam_allowed = True except IamError as exc: iam_error = exc @@ -1135,7 +1145,7 @@ def _bucket_versioning_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "versioning") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1182,7 +1192,7 @@ def _bucket_tagging_handler(bucket_name: str) -> Response: if error: 
return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "tagging") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1347,7 +1357,7 @@ def _bucket_cors_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "cors") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1400,7 +1410,7 @@ def _bucket_encryption_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "encryption") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1475,7 +1485,7 @@ def _bucket_acl_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "share") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1718,12 +1728,12 @@ def _bucket_lifecycle_handler(bucket_name: str) -> Response: """Handle bucket lifecycle configuration (GET/PUT/DELETE /?lifecycle).""" if request.method not in {"GET", "PUT", "DELETE"}: return _method_not_allowed(["GET", "PUT", "DELETE"]) - + principal, error = _require_principal() if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "lifecycle") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -1882,12 +1892,12 @@ def _bucket_quota_handler(bucket_name: str) -> Response: """Handle bucket quota configuration (GET/PUT/DELETE /?quota).""" if request.method not in {"GET", "PUT", "DELETE"}: return _method_not_allowed(["GET", "PUT", "DELETE"]) - + principal, error = _require_principal() if error: 
return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "quota") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -1964,7 +1974,7 @@ def _bucket_object_lock_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "object_lock") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2010,7 +2020,7 @@ def _bucket_notification_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "notification") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2106,7 +2116,7 @@ def _bucket_logging_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "logging") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2248,7 +2258,7 @@ def _object_retention_handler(bucket_name: str, object_key: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key) + _authorize_action(principal, bucket_name, "object_lock", object_key=object_key) except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2324,7 +2334,7 @@ def _object_legal_hold_handler(bucket_name: str, object_key: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key) + _authorize_action(principal, bucket_name, "object_lock", object_key=object_key) except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2657,7 +2667,7 @@ def bucket_handler(bucket_name: str) -> Response: if error: 
return error try: - _authorize_action(principal, bucket_name, "write") + _authorize_action(principal, bucket_name, "create_bucket") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) try: @@ -2674,7 +2684,7 @@ def bucket_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "delete") + _authorize_action(principal, bucket_name, "delete_bucket") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) try: @@ -3229,7 +3239,7 @@ def _bucket_replication_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "replication") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -3312,7 +3322,7 @@ def _bucket_website_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "website") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() diff --git a/app/storage.py b/app/storage.py index 88fb303..fa66332 100644 --- a/app/storage.py +++ b/app/storage.py @@ -406,6 +406,10 @@ class ObjectStorage: self._stats_serial[bucket_id] = self._stats_serial.get(bucket_id, 0) + 1 self._stats_mem_time[bucket_id] = time.monotonic() self._stats_dirty.add(bucket_id) + needs_immediate = data["objects"] == 0 and objects_delta < 0 + if needs_immediate: + self._flush_stats() + else: self._schedule_stats_flush() def _schedule_stats_flush(self) -> None: @@ -710,6 +714,73 @@ class ObjectStorage: next_continuation_token=next_token, ) + def iter_objects_shallow( + self, + bucket_name: str, + *, + prefix: str = "", + delimiter: str = "/", + ) -> Generator[tuple[str, ObjectMeta | str], None, None]: + bucket_path = self._bucket_path(bucket_name) + if not bucket_path.exists(): + raise 
BucketNotFoundError("Bucket does not exist") + bucket_id = bucket_path.name + + target_dir = bucket_path + if prefix: + safe_prefix_path = Path(prefix.rstrip("/")) + if ".." in safe_prefix_path.parts: + return + target_dir = bucket_path / safe_prefix_path + try: + resolved = target_dir.resolve() + bucket_resolved = bucket_path.resolve() + if not str(resolved).startswith(str(bucket_resolved) + os.sep) and resolved != bucket_resolved: + return + except (OSError, ValueError): + return + + if not target_dir.exists() or not target_dir.is_dir(): + return + + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" + meta_cache: Dict[str, str] = {} + if etag_index_path.exists(): + try: + with open(etag_index_path, 'r', encoding='utf-8') as f: + meta_cache = json.load(f) + except (OSError, json.JSONDecodeError): + pass + + try: + with os.scandir(str(target_dir)) as it: + for entry in it: + name = entry.name + if name in self.INTERNAL_FOLDERS: + continue + if entry.is_dir(follow_symlinks=False): + yield ("folder", prefix + name + delimiter) + elif entry.is_file(follow_symlinks=False): + key = prefix + name + try: + st = entry.stat() + etag = meta_cache.get(key) + if etag is None: + safe_key = PurePosixPath(key) + meta = self._read_metadata(bucket_id, Path(safe_key)) + etag = meta.get("__etag__") if meta else None + yield ("object", ObjectMeta( + key=key, + size=st.st_size, + last_modified=datetime.fromtimestamp(st.st_mtime, timezone.utc), + etag=etag, + metadata=None, + )) + except OSError: + pass + except OSError: + return + def _shallow_via_full_scan( self, bucket_name: str, diff --git a/app/ui.py b/app/ui.py index 8628616..b1a0947 100644 --- a/app/ui.py +++ b/app/ui.py @@ -618,20 +618,77 @@ def stream_bucket_objects(bucket_name: str): prefix = request.args.get("prefix") or None delimiter = request.args.get("delimiter") or None + storage = _storage() try: - client = get_session_s3_client() - except (PermissionError, RuntimeError) as exc: - return 
jsonify({"error": str(exc)}), 403 - - versioning_enabled = get_versioning_via_s3(client, bucket_name) + versioning_enabled = storage.is_versioning_enabled(bucket_name) + except StorageError: + versioning_enabled = False url_templates = build_url_templates(bucket_name) display_tz = current_app.config.get("DISPLAY_TIMEZONE", "UTC") + def generate(): + yield json.dumps({ + "type": "meta", + "versioning_enabled": versioning_enabled, + "url_templates": url_templates, + }) + "\n" + yield json.dumps({"type": "count", "total_count": 0}) + "\n" + + running_count = 0 + try: + if delimiter: + for item_type, item in storage.iter_objects_shallow( + bucket_name, prefix=prefix or "", delimiter=delimiter, + ): + if item_type == "folder": + yield json.dumps({"type": "folder", "prefix": item}) + "\n" + else: + last_mod = item.last_modified + yield json.dumps({ + "type": "object", + "key": item.key, + "size": item.size, + "last_modified": last_mod.isoformat(), + "last_modified_display": _format_datetime_display(last_mod, display_tz), + "last_modified_iso": _format_datetime_iso(last_mod, display_tz), + "etag": item.etag or "", + }) + "\n" + running_count += 1 + if running_count % 1000 == 0: + yield json.dumps({"type": "count", "total_count": running_count}) + "\n" + else: + continuation_token = None + while True: + result = storage.list_objects( + bucket_name, + max_keys=1000, + continuation_token=continuation_token, + prefix=prefix, + ) + for obj in result.objects: + last_mod = obj.last_modified + yield json.dumps({ + "type": "object", + "key": obj.key, + "size": obj.size, + "last_modified": last_mod.isoformat(), + "last_modified_display": _format_datetime_display(last_mod, display_tz), + "last_modified_iso": _format_datetime_iso(last_mod, display_tz), + "etag": obj.etag or "", + }) + "\n" + running_count += len(result.objects) + yield json.dumps({"type": "count", "total_count": running_count}) + "\n" + if not result.is_truncated: + break + continuation_token = 
result.next_continuation_token + except StorageError as exc: + yield json.dumps({"type": "error", "error": str(exc)}) + "\n" + return + yield json.dumps({"type": "count", "total_count": running_count}) + "\n" + yield json.dumps({"type": "done"}) + "\n" + return Response( - stream_objects_ndjson( - client, bucket_name, prefix, url_templates, display_tz, versioning_enabled, - delimiter=delimiter, - ), + generate(), mimetype='application/x-ndjson', headers={ 'Cache-Control': 'no-cache', @@ -4041,6 +4098,117 @@ def get_peer_sync_stats(site_id: str): return jsonify(stats) +@ui_bp.get("/system") +def system_dashboard(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied: System page requires admin permissions", "danger") + return redirect(url_for("ui.buckets_overview")) + + import platform as _platform + import sys + from app.version import APP_VERSION + + try: + import myfsio_core as _rc + has_rust = True + except ImportError: + has_rust = False + + gc = current_app.extensions.get("gc") + gc_status = gc.get_status() if gc else {"enabled": False} + gc_history_records = [] + if gc: + raw = gc.get_history(limit=10, offset=0) + for rec in raw: + r = rec.get("result", {}) + total_freed = r.get("temp_bytes_freed", 0) + r.get("multipart_bytes_freed", 0) + r.get("orphaned_version_bytes_freed", 0) + rec["bytes_freed_display"] = _format_bytes(total_freed) + rec["timestamp_display"] = datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + gc_history_records.append(rec) + + checker = current_app.extensions.get("integrity") + integrity_status = checker.get_status() if checker else {"enabled": False} + integrity_history_records = [] + if checker: + raw = checker.get_history(limit=10, offset=0) + for rec in raw: + rec["timestamp_display"] = datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + integrity_history_records.append(rec) 
+ + features = [ + {"label": "Encryption (SSE-S3)", "enabled": current_app.config.get("ENCRYPTION_ENABLED", False)}, + {"label": "KMS", "enabled": current_app.config.get("KMS_ENABLED", False)}, + {"label": "Versioning Lifecycle", "enabled": current_app.config.get("LIFECYCLE_ENABLED", False)}, + {"label": "Metrics History", "enabled": current_app.config.get("METRICS_HISTORY_ENABLED", False)}, + {"label": "Operation Metrics", "enabled": current_app.config.get("OPERATION_METRICS_ENABLED", False)}, + {"label": "Site Sync", "enabled": current_app.config.get("SITE_SYNC_ENABLED", False)}, + {"label": "Website Hosting", "enabled": current_app.config.get("WEBSITE_HOSTING_ENABLED", False)}, + {"label": "Garbage Collection", "enabled": current_app.config.get("GC_ENABLED", False)}, + {"label": "Integrity Scanner", "enabled": current_app.config.get("INTEGRITY_ENABLED", False)}, + ] + + return render_template( + "system.html", + principal=principal, + app_version=APP_VERSION, + storage_root=current_app.config.get("STORAGE_ROOT", "./data"), + platform=_platform.platform(), + python_version=sys.version.split()[0], + has_rust=has_rust, + features=features, + gc_status=gc_status, + gc_history=gc_history_records, + integrity_status=integrity_status, + integrity_history=integrity_history_records, + ) + + +@ui_bp.post("/system/gc/run") +def system_gc_run(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + gc = current_app.extensions.get("gc") + if not gc: + return jsonify({"error": "GC is not enabled"}), 400 + + payload = request.get_json(silent=True) or {} + original_dry_run = gc.dry_run + if "dry_run" in payload: + gc.dry_run = bool(payload["dry_run"]) + try: + result = gc.run_now() + finally: + gc.dry_run = original_dry_run + return jsonify(result.to_dict()) + + +@ui_bp.post("/system/integrity/run") +def system_integrity_run(): + principal = _current_principal() + try: + 
_iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + checker = current_app.extensions.get("integrity") + if not checker: + return jsonify({"error": "Integrity checker is not enabled"}), 400 + + payload = request.get_json(silent=True) or {} + result = checker.run_now( + auto_heal=payload.get("auto_heal"), + dry_run=payload.get("dry_run"), + ) + return jsonify(result.to_dict()) + + @ui_bp.app_errorhandler(404) def ui_not_found(error): # type: ignore[override] prefix = ui_bp.url_prefix or "" diff --git a/app/version.py b/app/version.py index d429237..77fc8ae 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.3.9" +APP_VERSION = "0.4.0" def get_version() -> str: diff --git a/docs.md b/docs.md index cc9d41b..ba36f68 100644 --- a/docs.md +++ b/docs.md @@ -180,9 +180,9 @@ All configuration is done via environment variables. The table below lists every | Variable | Default | Notes | | --- | --- | --- | -| `SERVER_THREADS` | `0` (auto) | Waitress worker threads (1-64). Set to `0` for auto-calculation based on CPU cores (×2). | -| `SERVER_CONNECTION_LIMIT` | `0` (auto) | Maximum concurrent connections (10-1000). Set to `0` for auto-calculation based on available RAM. | -| `SERVER_BACKLOG` | `0` (auto) | TCP listen backlog (64-4096). Set to `0` for auto-calculation (connection_limit × 2). | +| `SERVER_THREADS` | `0` (auto) | Granian blocking threads (1-64). Set to `0` for auto-calculation based on CPU cores (×2). | +| `SERVER_CONNECTION_LIMIT` | `0` (auto) | Maximum concurrent requests per worker (10-1000). Set to `0` for auto-calculation based on available RAM. | +| `SERVER_BACKLOG` | `0` (auto) | TCP listen backlog (128-4096). Set to `0` for auto-calculation (connection_limit × 2). | | `SERVER_CHANNEL_TIMEOUT` | `120` | Seconds before idle connections are closed (10-300). 
| ### Logging @@ -339,7 +339,7 @@ Before deploying to production, ensure you: 4. **Enable HTTPS** - Use a reverse proxy (nginx, Cloudflare) with TLS termination 5. **Review rate limits** - Adjust `RATE_LIMIT_DEFAULT` based on your needs 6. **Secure master keys** - Back up `ENCRYPTION_MASTER_KEY_PATH` if using encryption -7. **Use `--prod` flag** - Runs with Waitress instead of Flask dev server +7. **Use `--prod` flag** - Runs with Granian instead of Flask dev server 8. **Set credential expiry** - Assign `expires_at` to non-admin users for time-limited access ### Proxy Configuration @@ -758,7 +758,7 @@ MyFSIO implements a comprehensive Identity and Access Management (IAM) system th - **Create user**: supply a display name, optional JSON inline policy array, and optional credential expiry date. - **Set expiry**: assign an expiration date to any user's credentials. Expired credentials are rejected at authentication time. The UI shows expiry badges and preset durations (1h, 24h, 7d, 30d, 90d). - **Rotate secret**: generates a new secret key; the UI surfaces it once. - - **Policy editor**: select a user, paste an array of objects (`{"bucket": "*", "actions": ["list", "read"]}`), and submit. Alias support includes AWS-style verbs (e.g., `s3:GetObject`). + - **Policy editor**: select a user, paste an array of objects (`{"bucket": "*", "actions": ["list", "read"]}`), and submit. An optional `"prefix"` field restricts object-level actions to a key prefix (e.g., `"uploads/"`). Alias support includes AWS-style verbs (e.g., `s3:GetObject`). 3. Wildcard action `iam:*` is supported for admin user definitions. > **Breaking Change (v0.2.0+):** Previous versions used fixed default credentials (`localadmin/localadmin`). If upgrading from an older version, your existing credentials remain unchanged, but new installations will generate random credentials. @@ -797,13 +797,23 @@ Both layers are evaluated for each request. 
A user must have permission in their | --- | --- | --- | | `list` | List buckets and objects | `s3:ListBucket`, `s3:ListAllMyBuckets`, `s3:ListBucketVersions`, `s3:ListMultipartUploads`, `s3:ListParts` | | `read` | Download objects, get metadata | `s3:GetObject`, `s3:GetObjectVersion`, `s3:GetObjectTagging`, `s3:GetObjectVersionTagging`, `s3:GetObjectAcl`, `s3:GetBucketVersioning`, `s3:HeadObject`, `s3:HeadBucket` | -| `write` | Upload objects, create buckets, manage tags | `s3:PutObject`, `s3:CreateBucket`, `s3:PutObjectTagging`, `s3:PutBucketVersioning`, `s3:CreateMultipartUpload`, `s3:UploadPart`, `s3:CompleteMultipartUpload`, `s3:AbortMultipartUpload`, `s3:CopyObject` | -| `delete` | Remove objects, versions, and buckets | `s3:DeleteObject`, `s3:DeleteObjectVersion`, `s3:DeleteBucket`, `s3:DeleteObjectTagging` | +| `write` | Upload objects, manage object tags | `s3:PutObject`, `s3:PutObjectTagging`, `s3:CreateMultipartUpload`, `s3:UploadPart`, `s3:CompleteMultipartUpload`, `s3:AbortMultipartUpload`, `s3:CopyObject` | +| `delete` | Remove objects and versions | `s3:DeleteObject`, `s3:DeleteObjectVersion`, `s3:DeleteObjectTagging` | +| `create_bucket` | Create new buckets | `s3:CreateBucket` | +| `delete_bucket` | Delete buckets | `s3:DeleteBucket` | | `share` | Manage Access Control Lists (ACLs) | `s3:PutObjectAcl`, `s3:PutBucketAcl`, `s3:GetBucketAcl` | | `policy` | Manage bucket policies | `s3:PutBucketPolicy`, `s3:GetBucketPolicy`, `s3:DeleteBucketPolicy` | +| `versioning` | Manage bucket versioning configuration | `s3:GetBucketVersioning`, `s3:PutBucketVersioning` | +| `tagging` | Manage bucket-level tags | `s3:GetBucketTagging`, `s3:PutBucketTagging`, `s3:DeleteBucketTagging` | +| `encryption` | Manage bucket encryption configuration | `s3:GetEncryptionConfiguration`, `s3:PutEncryptionConfiguration`, `s3:DeleteEncryptionConfiguration` | | `lifecycle` | Manage lifecycle rules | `s3:GetLifecycleConfiguration`, `s3:PutLifecycleConfiguration`, 
`s3:DeleteLifecycleConfiguration`, `s3:GetBucketLifecycle`, `s3:PutBucketLifecycle` | | `cors` | Manage CORS configuration | `s3:GetBucketCors`, `s3:PutBucketCors`, `s3:DeleteBucketCors` | | `replication` | Configure and manage replication | `s3:GetReplicationConfiguration`, `s3:PutReplicationConfiguration`, `s3:DeleteReplicationConfiguration`, `s3:ReplicateObject`, `s3:ReplicateTags`, `s3:ReplicateDelete` | +| `quota` | Manage bucket storage quotas | `s3:GetBucketQuota`, `s3:PutBucketQuota`, `s3:DeleteBucketQuota` | +| `object_lock` | Manage object lock, retention, and legal holds | `s3:GetObjectLockConfiguration`, `s3:PutObjectLockConfiguration`, `s3:PutObjectRetention`, `s3:GetObjectRetention`, `s3:PutObjectLegalHold`, `s3:GetObjectLegalHold` | +| `notification` | Manage bucket event notifications | `s3:GetBucketNotificationConfiguration`, `s3:PutBucketNotificationConfiguration`, `s3:DeleteBucketNotificationConfiguration` | +| `logging` | Manage bucket access logging | `s3:GetBucketLogging`, `s3:PutBucketLogging`, `s3:DeleteBucketLogging` | +| `website` | Manage static website hosting configuration | `s3:GetBucketWebsite`, `s3:PutBucketWebsite`, `s3:DeleteBucketWebsite` | #### IAM Actions (User Management) @@ -814,25 +824,31 @@ Both layers are evaluated for each request. 
A user must have permission in their | `iam:delete_user` | Delete IAM users | `iam:DeleteUser` | | `iam:rotate_key` | Rotate user secret keys | `iam:RotateAccessKey` | | `iam:update_policy` | Modify user policies | `iam:PutUserPolicy` | +| `iam:create_key` | Create additional access keys for a user | `iam:CreateAccessKey` | +| `iam:delete_key` | Delete an access key from a user | `iam:DeleteAccessKey` | +| `iam:get_user` | View user details and access keys | `iam:GetUser` | +| `iam:get_policy` | View user policy configuration | `iam:GetPolicy` | +| `iam:disable_user` | Temporarily disable/enable a user account | `iam:DisableUser` | | `iam:*` | **Admin wildcard** – grants all IAM actions | — | #### Wildcards | Wildcard | Scope | Description | | --- | --- | --- | -| `*` (in actions) | All S3 actions | Grants `list`, `read`, `write`, `delete`, `share`, `policy`, `lifecycle`, `cors`, `replication` | +| `*` (in actions) | All S3 actions | Grants all 19 S3 actions including `list`, `read`, `write`, `delete`, `create_bucket`, `delete_bucket`, `share`, `policy`, `versioning`, `tagging`, `encryption`, `lifecycle`, `cors`, `replication`, `quota`, `object_lock`, `notification`, `logging`, `website` | | `iam:*` | All IAM actions | Grants all `iam:*` actions for user management | | `*` (in bucket) | All buckets | Policy applies to every bucket | ### IAM Policy Structure -User policies are stored as a JSON array of policy objects. Each object specifies a bucket and the allowed actions: +User policies are stored as a JSON array of policy objects. Each object specifies a bucket, the allowed actions, and an optional prefix for object-level scoping: ```json [ { "bucket": "", - "actions": ["", "", ...] + "actions": ["", "", ...], + "prefix": "" } ] ``` @@ -840,12 +856,13 @@ User policies are stored as a JSON array of policy objects. 
Each object specifie **Fields:** - `bucket`: The bucket name (case-insensitive) or `*` for all buckets - `actions`: Array of action strings (simple names or AWS aliases) +- `prefix`: *(optional)* Restrict object-level actions to keys starting with this prefix. Defaults to `*` (all objects). Example: `"uploads/"` restricts to keys under `uploads/` ### Example User Policies **Full Administrator (complete system access):** ```json -[{"bucket": "*", "actions": ["list", "read", "write", "delete", "share", "policy", "lifecycle", "cors", "replication", "iam:*"]}] +[{"bucket": "*", "actions": ["list", "read", "write", "delete", "share", "policy", "create_bucket", "delete_bucket", "versioning", "tagging", "encryption", "lifecycle", "cors", "replication", "quota", "object_lock", "notification", "logging", "website", "iam:*"]}] ``` **Read-Only User (browse and download only):** @@ -858,6 +875,11 @@ User policies are stored as a JSON array of policy objects. Each object specifie [{"bucket": "user-bucket", "actions": ["list", "read", "write", "delete"]}] ``` +**Operator (data operations + bucket management, no config):** +```json +[{"bucket": "*", "actions": ["list", "read", "write", "delete", "create_bucket", "delete_bucket"]}] +``` + **Multiple Bucket Access (different permissions per bucket):** ```json [ @@ -867,9 +889,14 @@ User policies are stored as a JSON array of policy objects. 
Each object specifie ] ``` +**Prefix-Scoped Access (restrict to a folder inside a shared bucket):** +```json +[{"bucket": "shared-data", "actions": ["list", "read", "write", "delete"], "prefix": "team-a/"}] +``` + **IAM Manager (manage users but no data access):** ```json -[{"bucket": "*", "actions": ["iam:list_users", "iam:create_user", "iam:delete_user", "iam:rotate_key", "iam:update_policy"]}] +[{"bucket": "*", "actions": ["iam:list_users", "iam:create_user", "iam:delete_user", "iam:rotate_key", "iam:update_policy", "iam:create_key", "iam:delete_key", "iam:get_user", "iam:get_policy", "iam:disable_user"]}] ``` **Replication Operator (manage replication only):** @@ -889,10 +916,10 @@ User policies are stored as a JSON array of policy objects. Each object specifie **Bucket Administrator (full bucket config, no IAM access):** ```json -[{"bucket": "my-bucket", "actions": ["list", "read", "write", "delete", "policy", "lifecycle", "cors"]}] +[{"bucket": "my-bucket", "actions": ["list", "read", "write", "delete", "create_bucket", "delete_bucket", "share", "policy", "versioning", "tagging", "encryption", "lifecycle", "cors", "replication", "quota", "object_lock", "notification", "logging", "website"]}] ``` -**Upload-Only User (write but cannot read back):** +**Upload-Only User (write but cannot create/delete buckets):** ```json [{"bucket": "drop-box", "actions": ["write"]}] ``` @@ -967,6 +994,30 @@ curl -X POST http://localhost:5000/iam/users//expiry \ # Delete a user (requires iam:delete_user) curl -X DELETE http://localhost:5000/iam/users/ \ -H "X-Access-Key: ..." -H "X-Secret-Key: ..." + +# Get user details (requires iam:get_user) — via Admin API +curl http://localhost:5000/admin/iam/users/ \ + -H "Authorization: AWS4-HMAC-SHA256 ..." + +# Get user policies (requires iam:get_policy) — via Admin API +curl http://localhost:5000/admin/iam/users//policies \ + -H "Authorization: AWS4-HMAC-SHA256 ..." 
+ +# Create additional access key for a user (requires iam:create_key) +curl -X POST http://localhost:5000/admin/iam/users//keys \ + -H "Authorization: AWS4-HMAC-SHA256 ..." + +# Delete an access key (requires iam:delete_key) +curl -X DELETE http://localhost:5000/admin/iam/users//keys/ \ + -H "Authorization: AWS4-HMAC-SHA256 ..." + +# Disable a user account (requires iam:disable_user) +curl -X POST http://localhost:5000/admin/iam/users//disable \ + -H "Authorization: AWS4-HMAC-SHA256 ..." + +# Re-enable a user account (requires iam:disable_user) +curl -X POST http://localhost:5000/admin/iam/users//enable \ + -H "Authorization: AWS4-HMAC-SHA256 ..." ``` ### Permission Precedence diff --git a/requirements.txt b/requirements.txt index 1813b33..8a29238 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ python-dotenv>=1.2.1 pytest>=9.0.2 requests>=2.32.5 boto3>=1.42.14 -waitress>=3.0.2 +granian>=2.2.0 psutil>=7.1.3 cryptography>=46.0.3 defusedxml>=0.7.1 diff --git a/run.py b/run.py index fbe110f..d4ea394 100644 --- a/run.py +++ b/run.py @@ -2,7 +2,9 @@ from __future__ import annotations import argparse +import atexit import os +import signal import sys import warnings import multiprocessing @@ -40,24 +42,42 @@ def _is_frozen() -> bool: return getattr(sys, 'frozen', False) or '__compiled__' in globals() -def serve_api(port: int, prod: bool = False, config: Optional[AppConfig] = None) -> None: - app = create_api_app() - if prod: - from waitress import serve - if config: - serve( - app, - host=_server_host(), - port=port, - ident="MyFSIO", - threads=config.server_threads, - connection_limit=config.server_connection_limit, - backlog=config.server_backlog, - channel_timeout=config.server_channel_timeout, - ) - else: - serve(app, host=_server_host(), port=port, ident="MyFSIO") +def _serve_granian(target: str, port: int, config: Optional[AppConfig] = None) -> None: + from granian import Granian + from granian.constants import Interfaces + from granian.http 
import HTTP1Settings + + kwargs: dict = { + "target": target, + "address": _server_host(), + "port": port, + "interface": Interfaces.WSGI, + "factory": True, + "workers": 1, + } + + if config: + kwargs["blocking_threads"] = config.server_threads + kwargs["backlog"] = config.server_backlog + kwargs["backpressure"] = config.server_connection_limit + kwargs["http1_settings"] = HTTP1Settings( + header_read_timeout=config.server_channel_timeout * 1000, + max_buffer_size=config.server_max_buffer_size, + ) else: + kwargs["http1_settings"] = HTTP1Settings( + max_buffer_size=1024 * 1024 * 128, + ) + + server = Granian(**kwargs) + server.serve() + + +def serve_api(port: int, prod: bool = False, config: Optional[AppConfig] = None) -> None: + if prod: + _serve_granian("app:create_api_app", port, config) + else: + app = create_api_app() debug = _is_debug_enabled() if debug: warnings.warn("DEBUG MODE ENABLED - DO NOT USE IN PRODUCTION", RuntimeWarning) @@ -65,23 +85,10 @@ def serve_api(port: int, prod: bool = False, config: Optional[AppConfig] = None) def serve_ui(port: int, prod: bool = False, config: Optional[AppConfig] = None) -> None: - app = create_ui_app() if prod: - from waitress import serve - if config: - serve( - app, - host=_server_host(), - port=port, - ident="MyFSIO", - threads=config.server_threads, - connection_limit=config.server_connection_limit, - backlog=config.server_backlog, - channel_timeout=config.server_channel_timeout, - ) - else: - serve(app, host=_server_host(), port=port, ident="MyFSIO") + _serve_granian("app:create_ui_app", port, config) else: + app = create_ui_app() debug = _is_debug_enabled() if debug: warnings.warn("DEBUG MODE ENABLED - DO NOT USE IN PRODUCTION", RuntimeWarning) @@ -126,6 +133,7 @@ def reset_credentials() -> None: pass if raw_config and raw_config.get("users"): + is_v2 = raw_config.get("version", 1) >= 2 admin_user = None for user in raw_config["users"]: policies = user.get("policies", []) @@ -139,15 +147,39 @@ def 
reset_credentials() -> None: if not admin_user: admin_user = raw_config["users"][0] - admin_user["access_key"] = access_key - admin_user["secret_key"] = secret_key - else: - raw_config = { - "users": [ - { + if is_v2: + admin_keys = admin_user.get("access_keys", []) + if admin_keys: + admin_keys[0]["access_key"] = access_key + admin_keys[0]["secret_key"] = secret_key + else: + from datetime import datetime as _dt, timezone as _tz + admin_user["access_keys"] = [{ "access_key": access_key, "secret_key": secret_key, + "status": "active", + "created_at": _dt.now(_tz.utc).isoformat(), + }] + else: + admin_user["access_key"] = access_key + admin_user["secret_key"] = secret_key + else: + from datetime import datetime as _dt, timezone as _tz + raw_config = { + "version": 2, + "users": [ + { + "user_id": f"u-{secrets.token_hex(8)}", "display_name": "Local Admin", + "enabled": True, + "access_keys": [ + { + "access_key": access_key, + "secret_key": secret_key, + "status": "active", + "created_at": _dt.now(_tz.utc).isoformat(), + } + ], "policies": [ {"bucket": "*", "actions": list(ALLOWED_ACTIONS)} ], @@ -192,7 +224,7 @@ if __name__ == "__main__": parser.add_argument("--mode", choices=["api", "ui", "both", "reset-cred"], default="both") parser.add_argument("--api-port", type=int, default=5000) parser.add_argument("--ui-port", type=int, default=5100) - parser.add_argument("--prod", action="store_true", help="Run in production mode using Waitress") + parser.add_argument("--prod", action="store_true", help="Run in production mode using Granian") parser.add_argument("--dev", action="store_true", help="Force development mode (Flask dev server)") parser.add_argument("--check-config", action="store_true", help="Validate configuration and exit") parser.add_argument("--show-config", action="store_true", help="Show configuration summary and exit") @@ -235,7 +267,7 @@ if __name__ == "__main__": pass if prod_mode: - print("Running in production mode (Waitress)") + print("Running in 
production mode (Granian)") issues = config.validate_and_report() critical_issues = [i for i in issues if i.startswith("CRITICAL:")] if critical_issues: @@ -248,11 +280,22 @@ if __name__ == "__main__": if args.mode in {"api", "both"}: print(f"Starting API server on port {args.api_port}...") - api_proc = Process(target=serve_api, args=(args.api_port, prod_mode, config), daemon=True) + api_proc = Process(target=serve_api, args=(args.api_port, prod_mode, config)) api_proc.start() else: api_proc = None + def _cleanup_api(): + if api_proc and api_proc.is_alive(): + api_proc.terminate() + api_proc.join(timeout=5) + if api_proc.is_alive(): + api_proc.kill() + + if api_proc: + atexit.register(_cleanup_api) + signal.signal(signal.SIGTERM, lambda *_: sys.exit(0)) + if args.mode in {"ui", "both"}: print(f"Starting UI server on port {args.ui_port}...") serve_ui(args.ui_port, prod_mode, config) diff --git a/scripts/install.sh b/scripts/install.sh index 8fdad2b..50094d2 100644 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -379,29 +379,25 @@ if [[ "$SKIP_SYSTEMD" != true ]]; then echo " ---------------" if systemctl is-active --quiet myfsio; then echo " [OK] MyFSIO is running" - - IAM_FILE="$DATA_DIR/.myfsio.sys/config/iam.json" - if [[ -f "$IAM_FILE" ]]; then - echo "" - echo " ============================================" - echo " ADMIN CREDENTIALS (save these securely!)" - echo " ============================================" - if command -v jq &>/dev/null; then - ACCESS_KEY=$(jq -r '.users[0].access_key' "$IAM_FILE" 2>/dev/null) - SECRET_KEY=$(jq -r '.users[0].secret_key' "$IAM_FILE" 2>/dev/null) - else - ACCESS_KEY=$(grep -o '"access_key"[[:space:]]*:[[:space:]]*"[^"]*"' "$IAM_FILE" | head -1 | sed 's/.*"\([^"]*\)"$/\1/') - SECRET_KEY=$(grep -o '"secret_key"[[:space:]]*:[[:space:]]*"[^"]*"' "$IAM_FILE" | head -1 | sed 's/.*"\([^"]*\)"$/\1/') - fi - if [[ -n "$ACCESS_KEY" && -n "$SECRET_KEY" ]]; then - echo " Access Key: $ACCESS_KEY" - echo " Secret Key: $SECRET_KEY" - 
else - echo " [!] Could not parse credentials from $IAM_FILE" - echo " Check the file manually or view service logs." - fi - echo " ============================================" + echo "" + echo " ============================================" + echo " ADMIN CREDENTIALS (save these securely!)" + echo " ============================================" + CRED_OUTPUT=$(journalctl -u myfsio --no-pager -n 50 2>/dev/null | grep -A 5 "FIRST RUN - ADMIN CREDENTIALS") + ACCESS_KEY=$(echo "$CRED_OUTPUT" | grep "Access Key:" | head -1 | sed 's/.*Access Key: //' | awk '{print $1}') + SECRET_KEY=$(echo "$CRED_OUTPUT" | grep "Secret Key:" | head -1 | sed 's/.*Secret Key: //' | awk '{print $1}') + if [[ -n "$ACCESS_KEY" && "$ACCESS_KEY" != *"from"* && -n "$SECRET_KEY" && "$SECRET_KEY" != *"from"* ]]; then + echo " Access Key: $ACCESS_KEY" + echo " Secret Key: $SECRET_KEY" + else + echo " [!] Could not extract credentials from service logs." + echo " Check startup output: journalctl -u myfsio --no-pager | grep -A 5 'ADMIN CREDENTIALS'" + echo " Or reset credentials: $INSTALL_DIR/myfsio reset-cred" fi + echo " ============================================" + echo "" + echo " NOTE: The IAM config file is encrypted at rest." + echo " Credentials are only shown on first run or after reset." else echo " [WARNING] MyFSIO may not have started correctly" echo " Check logs with: journalctl -u myfsio -f" @@ -427,12 +423,13 @@ echo " API: http://$(hostname -I 2>/dev/null | awk '{print $1}' || echo "local echo " UI: http://$(hostname -I 2>/dev/null | awk '{print $1}' || echo "localhost"):$UI_PORT/ui" echo "" echo "Credentials:" -echo " Admin credentials were shown above (if service was started)." -echo " You can also find them in: $DATA_DIR/.myfsio.sys/config/iam.json" +echo " Admin credentials are shown on first service start (see above)." +echo " The IAM config is encrypted at rest and cannot be read directly." 
+echo " To reset credentials: $INSTALL_DIR/myfsio reset-cred" echo "" echo "Configuration Files:" echo " Environment: $INSTALL_DIR/myfsio.env" -echo " IAM Users: $DATA_DIR/.myfsio.sys/config/iam.json" +echo " IAM Users: $DATA_DIR/.myfsio.sys/config/iam.json (encrypted)" echo " Bucket Policies: $DATA_DIR/.myfsio.sys/config/bucket_policies.json" echo " Secret Key: $DATA_DIR/.myfsio.sys/config/.secret (auto-generated)" echo "" diff --git a/scripts/uninstall.sh b/scripts/uninstall.sh index a920eb2..24fd3e6 100644 --- a/scripts/uninstall.sh +++ b/scripts/uninstall.sh @@ -230,11 +230,14 @@ if [[ "$KEEP_DATA" == true ]]; then echo "" echo "Preserved files include:" echo " - All buckets and objects" - echo " - IAM configuration: $DATA_DIR/.myfsio.sys/config/iam.json" + echo " - IAM configuration: $DATA_DIR/.myfsio.sys/config/iam.json (encrypted at rest)" echo " - Bucket policies: $DATA_DIR/.myfsio.sys/config/bucket_policies.json" echo " - Secret key: $DATA_DIR/.myfsio.sys/config/.secret" echo " - Encryption keys: $DATA_DIR/.myfsio.sys/keys/ (if encryption was enabled)" echo "" + echo "NOTE: The IAM config is encrypted and requires the SECRET_KEY to read." + echo " Keep the .secret file intact for reinstallation." 
+ echo "" echo "To reinstall MyFSIO with existing data:" echo " ./install.sh --data-dir $DATA_DIR" echo "" diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index d96b046..90517fe 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -849,6 +849,11 @@ selectCheckbox.checked = true; row.classList.add('table-active'); } + + if (activeRow && activeRow.dataset.key === row.dataset.key) { + row.classList.add('table-active'); + activeRow = row; + } }); const folderRows = document.querySelectorAll('.folder-row'); diff --git a/static/js/iam-management.js b/static/js/iam-management.js index e75484b..ef5cde7 100644 --- a/static/js/iam-management.js +++ b/static/js/iam-management.js @@ -17,12 +17,20 @@ window.IAMManagement = (function() { var currentDeleteKey = null; var currentExpiryKey = null; - var ALL_S3_ACTIONS = ['list', 'read', 'write', 'delete', 'share', 'policy', 'replication', 'lifecycle', 'cors']; + var ALL_S3_ACTIONS = [ + 'list', 'read', 'write', 'delete', 'share', 'policy', + 'replication', 'lifecycle', 'cors', + 'create_bucket', 'delete_bucket', + 'versioning', 'tagging', 'encryption', 'quota', + 'object_lock', 'notification', 'logging', 'website' + ]; var policyTemplates = { - full: [{ bucket: '*', actions: ['list', 'read', 'write', 'delete', 'share', 'policy', 'replication', 'lifecycle', 'cors', 'iam:*'] }], + full: [{ bucket: '*', actions: ['list', 'read', 'write', 'delete', 'share', 'policy', 'create_bucket', 'delete_bucket', 'replication', 'lifecycle', 'cors', 'versioning', 'tagging', 'encryption', 'quota', 'object_lock', 'notification', 'logging', 'website', 'iam:*'] }], readonly: [{ bucket: '*', actions: ['list', 'read'] }], - writer: [{ bucket: '*', actions: ['list', 'read', 'write'] }] + writer: [{ bucket: '*', actions: ['list', 'read', 'write'] }], + operator: [{ bucket: '*', actions: ['list', 'read', 'write', 'delete', 'create_bucket', 'delete_bucket'] }], + bucketadmin: [{ bucket: '*', 
actions: ['list', 'read', 'write', 'delete', 'share', 'policy', 'create_bucket', 'delete_bucket', 'versioning', 'tagging', 'encryption', 'cors', 'lifecycle', 'quota', 'object_lock', 'notification', 'logging', 'website', 'replication'] }] }; function isAdminUser(policies) { diff --git a/templates/base.html b/templates/base.html index e9e8fb4..445b799 100644 --- a/templates/base.html +++ b/templates/base.html @@ -110,6 +110,14 @@ Domains {% endif %} + {% if can_manage_iam %} + + + + + System + + {% endif %} diff --git a/templates/metrics.html b/templates/metrics.html index 56a41d1..1aa2861 100644 --- a/templates/metrics.html +++ b/templates/metrics.html @@ -210,9 +210,6 @@
{{ app.uptime_days }}d
Uptime -
- v{{ app.version }} -
diff --git a/templates/system.html b/templates/system.html new file mode 100644 index 0000000..3758517 --- /dev/null +++ b/templates/system.html @@ -0,0 +1,503 @@ +{% extends "base.html" %} + +{% block title %}System - MyFSIO Console{% endblock %} + +{% block content %} + + +
+
+
+
+
+ + + + Server Information +
+

Runtime environment and configuration

+
+
+ + + + + + + + +
Version{{ app_version }}
Storage Root{{ storage_root }}
Platform{{ platform }}
Python{{ python_version }}
Rust Extension + {% if has_rust %} + Loaded + {% else %} + Not loaded + {% endif %} +
+
+
+
+ +
+
+
+
+ + + + Feature Flags +
+

Features configured via environment variables

+
+
+ + + {% for feat in features %} + + + + + {% endfor %} + +
{{ feat.label }} + {% if feat.enabled %} + Enabled + {% else %} + Disabled + {% endif %} +
+
+
+
+
+ +
+
+
+
+
+
+
+ + + + Garbage Collection +
+

Clean up temporary files, orphaned uploads, and stale locks

+
+
+ {% if gc_status.enabled %} + Active + {% else %} + Disabled + {% endif %} +
+
+
+
+ {% if gc_status.enabled %} +
+ + +
+ +
+
+
+
+ +
+
+
+
+ +
+
+ + + + Configuration +
+
+
Interval: {{ gc_status.interval_hours }}h
+
Dry run: {{ "Yes" if gc_status.dry_run else "No" }}
+
Temp max age: {{ gc_status.temp_file_max_age_hours }}h
+
Lock max age: {{ gc_status.lock_file_max_age_hours }}h
+
Multipart max age: {{ gc_status.multipart_max_age_days }}d
+
+
+ + {% if gc_history %} +
+ + + + + + Recent Executions +
+
+ + + + + + + + + + + {% for exec in gc_history %} + + + + + + + {% endfor %} + +
TimeCleanedFreedMode
{{ exec.timestamp_display }} + {% set r = exec.result %} + {{ (r.temp_files_deleted|d(0)) + (r.multipart_uploads_deleted|d(0)) + (r.lock_files_deleted|d(0)) + (r.orphaned_metadata_deleted|d(0)) + (r.orphaned_versions_deleted|d(0)) + (r.empty_dirs_removed|d(0)) }} + {{ exec.bytes_freed_display }} + {% if exec.dry_run %} + Dry run + {% else %} + Live + {% endif %} +
+
+ {% else %} +
+

No executions recorded yet.

+
+ {% endif %} + + {% else %} +
+ + + +

Garbage collection is not enabled.

+

Set GC_ENABLED=true to enable automatic cleanup.

+
+ {% endif %} +
+
+
+ +
+
+
+
+
+
+ + + + + Integrity Scanner +
+

Detect and heal corrupted objects, orphaned files, and metadata drift

+
+
+ {% if integrity_status.enabled %} + Active + {% else %} + Disabled + {% endif %} +
+
+
+
+ {% if integrity_status.enabled %} +
+ + + +
+ +
+
+
+
+ +
+
+
+
+ +
+
+ + + + Configuration +
+
+
Interval: {{ integrity_status.interval_hours }}h
+
Dry run: {{ "Yes" if integrity_status.dry_run else "No" }}
+
Batch size: {{ integrity_status.batch_size }}
+
Auto-heal: {{ "Yes" if integrity_status.auto_heal else "No" }}
+
+
+ + {% if integrity_history %} +
+ + + + + + Recent Scans +
+
+ + + + + + + + + + + + {% for exec in integrity_history %} + + + + + + + + {% endfor %} + +
TimeScannedIssuesHealedMode
{{ exec.timestamp_display }}{{ exec.result.objects_scanned|d(0) }} + {% set total_issues = (exec.result.corrupted_objects|d(0)) + (exec.result.orphaned_objects|d(0)) + (exec.result.phantom_metadata|d(0)) + (exec.result.stale_versions|d(0)) + (exec.result.etag_cache_inconsistencies|d(0)) + (exec.result.legacy_metadata_drifts|d(0)) %} + {% if total_issues > 0 %} + {{ total_issues }} + {% else %} + 0 + {% endif %} + {{ exec.result.issues_healed|d(0) }} + {% if exec.dry_run %} + Dry + {% elif exec.auto_heal %} + Heal + {% else %} + Scan + {% endif %} +
+
+ {% else %} +
+

No scans recorded yet.

+
+ {% endif %} + + {% else %} +
+ + + + +

Integrity scanner is not enabled.

+

Set INTEGRITY_ENABLED=true to enable automatic scanning.

+
+ {% endif %} +
+
+
+
+{% endblock %} + +{% block extra_scripts %} + +{% endblock %} diff --git a/tests/conftest.py b/tests/conftest.py index 89bcca1..57d0382 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,7 +27,10 @@ def app(tmp_path: Path): "access_key": "test", "secret_key": "secret", "display_name": "Test User", - "policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy"]}], + "policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", + "create_bucket", "delete_bucket", "share", "versioning", "tagging", + "encryption", "cors", "lifecycle", "replication", "quota", + "object_lock", "notification", "logging", "website"]}], } ] }