From d878134ebf8180060f29863df90fef7decedc575 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 14 Mar 2026 18:17:39 +0800 Subject: [PATCH 01/19] Switch from Waitress to Granian (Rust/hyper WSGI server) for improved concurrency --- app/__init__.py | 2 +- app/config.py | 6 ++--- docs.md | 8 +++--- requirements.txt | 2 +- run.py | 64 ++++++++++++++++++++++----------------------- templates/docs.html | 4 +-- 6 files changed, 43 insertions(+), 43 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 98ef1e5..e9e7b71 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -107,7 +107,6 @@ def create_app( ) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=num_proxies, x_proto=num_proxies, x_host=num_proxies, x_prefix=num_proxies) - # Enable gzip compression for responses (10-20x smaller JSON payloads) if app.config.get("ENABLE_GZIP", True): app.wsgi_app = GzipMiddleware(app.wsgi_app, compression_level=6) @@ -678,6 +677,7 @@ def _configure_logging(app: Flask) -> None: }, ) response.headers["X-Request-Duration-ms"] = f"{duration_ms:.2f}" + response.headers["Server"] = "MyFSIO" operation_metrics = app.extensions.get("operation_metrics") if operation_metrics: diff --git a/app/config.py b/app/config.py index 079fc50..b8db352 100644 --- a/app/config.py +++ b/app/config.py @@ -25,7 +25,7 @@ def _calculate_auto_connection_limit() -> int: def _calculate_auto_backlog(connection_limit: int) -> int: - return max(64, min(connection_limit * 2, 4096)) + return max(128, min(connection_limit * 2, 4096)) def _validate_rate_limit(value: str) -> str: @@ -504,8 +504,8 @@ class AppConfig: issues.append(f"CRITICAL: SERVER_THREADS={self.server_threads} is outside valid range (1-64). Server cannot start.") if not (10 <= self.server_connection_limit <= 1000): issues.append(f"CRITICAL: SERVER_CONNECTION_LIMIT={self.server_connection_limit} is outside valid range (10-1000). Server cannot start.") - if not (64 <= self.server_backlog <= 4096): - issues.append(f"CRITICAL: SERVER_BACKLOG={self.server_backlog} is outside valid range (64-4096). Server cannot start.") + if not (128 <= self.server_backlog <= 4096): + issues.append(f"CRITICAL: SERVER_BACKLOG={self.server_backlog} is outside valid range (128-4096). Server cannot start.") if not (10 <= self.server_channel_timeout <= 300): issues.append(f"CRITICAL: SERVER_CHANNEL_TIMEOUT={self.server_channel_timeout} is outside valid range (10-300). Server cannot start.") diff --git a/docs.md b/docs.md index cc9d41b..daa8b24 100644 --- a/docs.md +++ b/docs.md @@ -180,9 +180,9 @@ All configuration is done via environment variables. The table below lists every | Variable | Default | Notes | | --- | --- | --- | -| `SERVER_THREADS` | `0` (auto) | Waitress worker threads (1-64). Set to `0` for auto-calculation based on CPU cores (×2). | -| `SERVER_CONNECTION_LIMIT` | `0` (auto) | Maximum concurrent connections (10-1000). Set to `0` for auto-calculation based on available RAM. | -| `SERVER_BACKLOG` | `0` (auto) | TCP listen backlog (64-4096). Set to `0` for auto-calculation (connection_limit × 2). | +| `SERVER_THREADS` | `0` (auto) | Granian blocking threads (1-64). Set to `0` for auto-calculation based on CPU cores (×2). | +| `SERVER_CONNECTION_LIMIT` | `0` (auto) | Maximum concurrent requests per worker (10-1000). Set to `0` for auto-calculation based on available RAM. | +| `SERVER_BACKLOG` | `0` (auto) | TCP listen backlog (128-4096). Set to `0` for auto-calculation (connection_limit × 2). | | `SERVER_CHANNEL_TIMEOUT` | `120` | Seconds before idle connections are closed (10-300). | ### Logging @@ -339,7 +339,7 @@ Before deploying to production, ensure you: 4. **Enable HTTPS** - Use a reverse proxy (nginx, Cloudflare) with TLS termination 5. **Review rate limits** - Adjust `RATE_LIMIT_DEFAULT` based on your needs 6. **Secure master keys** - Back up `ENCRYPTION_MASTER_KEY_PATH` if using encryption -7. **Use `--prod` flag** - Runs with Waitress instead of Flask dev server +7. **Use `--prod` flag** - Runs with Granian instead of Flask dev server 8. **Set credential expiry** - Assign `expires_at` to non-admin users for time-limited access ### Proxy Configuration diff --git a/requirements.txt b/requirements.txt index 1813b33..8a29238 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ python-dotenv>=1.2.1 pytest>=9.0.2 requests>=2.32.5 boto3>=1.42.14 -waitress>=3.0.2 +granian>=2.2.0 psutil>=7.1.3 cryptography>=46.0.3 defusedxml>=0.7.1 diff --git a/run.py b/run.py index fbe110f..4e9c3ef 100644 --- a/run.py +++ b/run.py @@ -40,24 +40,37 @@ def _is_frozen() -> bool: return getattr(sys, 'frozen', False) or '__compiled__' in globals() +def _serve_granian(target: str, port: int, config: Optional[AppConfig] = None) -> None: + from granian import Granian + from granian.constants import Interfaces + from granian.http import HTTP1Settings + + kwargs: dict = { + "target": target, + "address": _server_host(), + "port": port, + "interface": Interfaces.WSGI, + "factory": True, + "workers": 1, + } + + if config: + kwargs["blocking_threads"] = config.server_threads + kwargs["backlog"] = config.server_backlog + kwargs["backpressure"] = config.server_connection_limit + kwargs["http1_settings"] = HTTP1Settings( + header_read_timeout=config.server_channel_timeout * 1000, + ) + + server = Granian(**kwargs) + server.serve() + + def serve_api(port: int, prod: bool = False, config: Optional[AppConfig] = None) -> None: - app = create_api_app() if prod: - from waitress import serve - if config: - serve( - app, - host=_server_host(), - port=port, - ident="MyFSIO", - threads=config.server_threads, - connection_limit=config.server_connection_limit, - backlog=config.server_backlog, - channel_timeout=config.server_channel_timeout, - ) - else: - serve(app, host=_server_host(), port=port, ident="MyFSIO") + _serve_granian("app:create_api_app", port, config) else: + app = create_api_app() debug = _is_debug_enabled() if debug: warnings.warn("DEBUG MODE ENABLED - DO NOT USE IN PRODUCTION", RuntimeWarning) @@ -65,23 +78,10 @@ def serve_api(port: int, prod: bool = False, config: Optional[AppConfig] = None) def serve_ui(port: int, prod: bool = False, config: Optional[AppConfig] = None) -> None: - app = create_ui_app() if prod: - from waitress import serve - if config: - serve( - app, - host=_server_host(), - port=port, - ident="MyFSIO", - threads=config.server_threads, - connection_limit=config.server_connection_limit, - backlog=config.server_backlog, - channel_timeout=config.server_channel_timeout, - ) - else: - serve(app, host=_server_host(), port=port, ident="MyFSIO") + _serve_granian("app:create_ui_app", port, config) else: + app = create_ui_app() debug = _is_debug_enabled() if debug: warnings.warn("DEBUG MODE ENABLED - DO NOT USE IN PRODUCTION", RuntimeWarning) @@ -192,7 +192,7 @@ if __name__ == "__main__": parser.add_argument("--mode", choices=["api", "ui", "both", "reset-cred"], default="both") parser.add_argument("--api-port", type=int, default=5000) parser.add_argument("--ui-port", type=int, default=5100) - parser.add_argument("--prod", action="store_true", help="Run in production mode using Waitress") + parser.add_argument("--prod", action="store_true", help="Run in production mode using Granian") parser.add_argument("--dev", action="store_true", help="Force development mode (Flask dev server)") parser.add_argument("--check-config", action="store_true", help="Validate configuration and exit") parser.add_argument("--show-config", action="store_true", help="Show configuration summary and exit") @@ -235,7 +235,7 @@ if __name__ == "__main__": pass if prod_mode: - print("Running in production mode (Waitress)") + print("Running in production mode (Granian)") issues = config.validate_and_report() critical_issues = [i for i in issues if i.startswith("CRITICAL:")] if critical_issues: diff --git a/templates/docs.html b/templates/docs.html index 145c90d..cae57e1 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -84,7 +84,7 @@ pip install -r requirements.txt # Run both API and UI (Development) python run.py -# Run in Production (Waitress server) +# Run in Production (Granian server) python run.py --prod # Or run individually @@ -220,7 +220,7 @@ python run.py --mode ui SERVER_THREADS 0 (auto) - Waitress worker threads (1-64). 0 = auto (CPU cores × 2). + Granian blocking threads (1-64). 0 = auto (CPU cores × 2). SERVER_CONNECTION_LIMIT From 31ebbea680492a1ddfb14eea311a33604190e0d1 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 14 Mar 2026 18:31:12 +0800 Subject: [PATCH 02/19] Fix Docker healthcheck failure: Granian cannot run inside daemon process --- run.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/run.py b/run.py index 4e9c3ef..477bbbb 100644 --- a/run.py +++ b/run.py @@ -2,7 +2,9 @@ from __future__ import annotations import argparse +import atexit import os +import signal import sys import warnings import multiprocessing @@ -248,11 +250,22 @@ if __name__ == "__main__": if args.mode in {"api", "both"}: print(f"Starting API server on port {args.api_port}...") - api_proc = Process(target=serve_api, args=(args.api_port, prod_mode, config), daemon=True) + api_proc = Process(target=serve_api, args=(args.api_port, prod_mode, config)) api_proc.start() else: api_proc = None + def _cleanup_api(): + if api_proc and api_proc.is_alive(): + api_proc.terminate() + api_proc.join(timeout=5) + if api_proc.is_alive(): + api_proc.kill() + + if api_proc: + atexit.register(_cleanup_api) + signal.signal(signal.SIGTERM, lambda *_: sys.exit(0)) + if args.mode in {"ui", "both"}: print(f"Starting UI server on port {args.ui_port}...") serve_ui(args.ui_port, prod_mode, config) From 6ed4b7d8ea9119e18c9b0d7ecc138d963a4af98c Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 14 Mar 2026 20:27:57 +0800 Subject: [PATCH 03/19] Add System page: server info, feature flags, GC and integrity scanner UI --- app/ui.py | 111 +++++++++ app/version.py | 2 +- templates/base.html | 16 ++ templates/metrics.html | 3 - templates/system.html | 502 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 630 insertions(+), 4 deletions(-) create mode 100644 templates/system.html diff --git a/app/ui.py b/app/ui.py index 8628616..9fb832b 100644 --- a/app/ui.py +++ b/app/ui.py @@ -4041,6 +4041,117 @@ def get_peer_sync_stats(site_id: str): return jsonify(stats) +@ui_bp.get("/system") +def system_dashboard(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + flash("Access denied: System page requires admin permissions", "danger") + return redirect(url_for("ui.buckets_overview")) + + import platform as _platform + import sys + from app.version import APP_VERSION + + try: + import myfsio_core as _rc + has_rust = True + except ImportError: + has_rust = False + + gc = current_app.extensions.get("gc") + gc_status = gc.get_status() if gc else {"enabled": False} + gc_history_records = [] + if gc: + raw = gc.get_history(limit=10, offset=0) + for rec in raw: + r = rec.get("result", {}) + total_freed = r.get("temp_bytes_freed", 0) + r.get("multipart_bytes_freed", 0) + r.get("orphaned_version_bytes_freed", 0) + rec["bytes_freed_display"] = _format_bytes(total_freed) + rec["timestamp_display"] = datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + gc_history_records.append(rec) + + checker = current_app.extensions.get("integrity") + integrity_status = checker.get_status() if checker else {"enabled": False} + integrity_history_records = [] + if checker: + raw = checker.get_history(limit=10, offset=0) + for rec in raw: + rec["timestamp_display"] = datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + integrity_history_records.append(rec) + + features = [ + {"label": "Encryption (SSE-S3)", "enabled": current_app.config.get("ENCRYPTION_ENABLED", False)}, + {"label": "KMS", "enabled": current_app.config.get("KMS_ENABLED", False)}, + {"label": "Versioning Lifecycle", "enabled": current_app.config.get("LIFECYCLE_ENABLED", False)}, + {"label": "Metrics History", "enabled": current_app.config.get("METRICS_HISTORY_ENABLED", False)}, + {"label": "Operation Metrics", "enabled": current_app.config.get("OPERATION_METRICS_ENABLED", False)}, + {"label": "Site Sync", "enabled": current_app.config.get("SITE_SYNC_ENABLED", False)}, + {"label": "Website Hosting", "enabled": current_app.config.get("WEBSITE_HOSTING_ENABLED", False)}, + {"label": "Garbage Collection", "enabled": current_app.config.get("GC_ENABLED", False)}, + {"label": "Integrity Scanner", "enabled": current_app.config.get("INTEGRITY_ENABLED", False)}, + ] + + return render_template( + "system.html", + principal=principal, + app_version=APP_VERSION, + storage_root=current_app.config.get("STORAGE_ROOT", "./data"), + platform=_platform.platform(), + python_version=sys.version.split()[0], + has_rust=has_rust, + features=features, + gc_status=gc_status, + gc_history=gc_history_records, + integrity_status=integrity_status, + integrity_history=integrity_history_records, + ) + + +@ui_bp.post("/system/gc/run") +def system_gc_run(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + gc = current_app.extensions.get("gc") + if not gc: + return jsonify({"error": "GC is not enabled"}), 400 + + payload = request.get_json(silent=True) or {} + original_dry_run = gc.dry_run + if "dry_run" in payload: + gc.dry_run = bool(payload["dry_run"]) + try: + result = gc.run_now() + finally: + gc.dry_run = original_dry_run + return jsonify(result.to_dict()) + + +@ui_bp.post("/system/integrity/run") +def system_integrity_run(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + checker = current_app.extensions.get("integrity") + if not checker: + return jsonify({"error": "Integrity checker is not enabled"}), 400 + + payload = request.get_json(silent=True) or {} + result = checker.run_now( + auto_heal=payload.get("auto_heal"), + dry_run=payload.get("dry_run"), + ) + return jsonify(result.to_dict()) + + @ui_bp.app_errorhandler(404) def ui_not_found(error): # type: ignore[override] prefix = ui_bp.url_prefix or "" diff --git a/app/version.py b/app/version.py index d429237..77fc8ae 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.3.9" +APP_VERSION = "0.4.0" def get_version() -> str: diff --git a/templates/base.html b/templates/base.html index e9e8fb4..445b799 100644 --- a/templates/base.html +++ b/templates/base.html @@ -110,6 +110,14 @@ Domains {% endif %} + {% if can_manage_iam %} + + + + + System + + {% endif %} -
- v{{ app.version }} -
diff --git a/templates/system.html b/templates/system.html new file mode 100644 index 0000000..39158d9 --- /dev/null +++ b/templates/system.html @@ -0,0 +1,502 @@ +{% extends "base.html" %} + +{% block title %}System - MyFSIO Console{% endblock %} + +{% block content %} + + +
+
+
+
+
+ + + + Server Information +
+

Runtime environment and configuration

+
+
+ + + + + + + + +
Version{{ app_version }}
Storage Root{{ storage_root }}
Platform{{ platform }}
Python{{ python_version }}
Rust Extension + {% if has_rust %} + Loaded + {% else %} + Not loaded + {% endif %} +
+
+
+
+ +
+
+
+
+ + + + Feature Flags +
+

Features configured via environment variables

+
+
+ + + {% for feat in features %} + + + + + {% endfor %} + +
{{ feat.label }} + {% if feat.enabled %} + Enabled + {% else %} + Disabled + {% endif %} +
+
+
+
+
+ +
+
+
+
+
+
+
+ + + + Garbage Collection +
+

Clean up temporary files, orphaned uploads, and stale locks

+
+
+ {% if gc_status.enabled %} + Active + {% else %} + Disabled + {% endif %} +
+
+
+
+ {% if gc_status.enabled %} +
+ + +
+ +
+
+
+
+ +
+
+
+
+ +
+
+ + + + Configuration +
+
+
Interval: {{ gc_status.interval_hours }}h
+
Dry run: {{ "Yes" if gc_status.dry_run else "No" }}
+
Temp max age: {{ gc_status.temp_file_max_age_hours }}h
+
Lock max age: {{ gc_status.lock_file_max_age_hours }}h
+
Multipart max age: {{ gc_status.multipart_max_age_days }}d
+
+
+ + {% if gc_history %} +
+ + + + + + Recent Executions +
+
+ + + + + + + + + + + {% for exec in gc_history %} + + + + + + + {% endfor %} + +
TimeCleanedFreedMode
{{ exec.timestamp_display }} + {% set r = exec.result %} + {{ (r.temp_files_deleted|d(0)) + (r.multipart_uploads_deleted|d(0)) + (r.lock_files_deleted|d(0)) + (r.orphaned_metadata_deleted|d(0)) + (r.orphaned_versions_deleted|d(0)) + (r.empty_dirs_removed|d(0)) }} + {{ exec.bytes_freed_display }} + {% if exec.dry_run %} + Dry run + {% else %} + Live + {% endif %} +
+
+ {% else %} +
+

No executions recorded yet.

+
+ {% endif %} + + {% else %} +
+ + + +

Garbage collection is not enabled.

+

Set GC_ENABLED=true to enable automatic cleanup.

+
+ {% endif %} +
+
+
+ +
+
+
+
+
+
+ + + + + Integrity Scanner +
+

Detect and heal corrupted objects, orphaned files, and metadata drift

+
+
+ {% if integrity_status.enabled %} + Active + {% else %} + Disabled + {% endif %} +
+
+
+
+ {% if integrity_status.enabled %} +
+ + + +
+ +
+
+
+
+ +
+
+
+
+ +
+
+ + + + Configuration +
+
+
Interval: {{ integrity_status.interval_hours }}h
+
Dry run: {{ "Yes" if integrity_status.dry_run else "No" }}
+
Batch size: {{ integrity_status.batch_size }}
+
Auto-heal: {{ "Yes" if integrity_status.auto_heal else "No" }}
+
+
+ + {% if integrity_history %} +
+ + + + + + Recent Scans +
+
+ + + + + + + + + + + + {% for exec in integrity_history %} + + + + + + + + {% endfor %} + +
TimeScannedIssuesHealedMode
{{ exec.timestamp_display }}{{ exec.result.objects_scanned|d(0) }} + {% set total_issues = (exec.result.corrupted_objects|d(0)) + (exec.result.orphaned_objects|d(0)) + (exec.result.phantom_metadata|d(0)) + (exec.result.stale_versions|d(0)) + (exec.result.etag_cache_inconsistencies|d(0)) + (exec.result.legacy_metadata_drifts|d(0)) %} + {% if total_issues > 0 %} + {{ total_issues }} + {% else %} + 0 + {% endif %} + {{ exec.result.issues_healed|d(0) }} + {% if exec.dry_run %} + Dry + {% elif exec.auto_heal %} + Heal + {% else %} + Scan + {% endif %} +
+
+ {% else %} +
+

No scans recorded yet.

+
+ {% endif %} + + {% else %} +
+ + + + +

Integrity scanner is not enabled.

+

Set INTEGRITY_ENABLED=true to enable automatic scanning.

+
+ {% endif %} +
+
+
+
+{% endblock %} + +{% block extra_scripts %} + +{% endblock %} From d72e0a347e4ba7a07e1c13de0bf76caf8ad1d990 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 14 Mar 2026 23:50:44 +0800 Subject: [PATCH 04/19] Overhaul IAM: granular actions, multi-key users, prefix-scoped policies --- app/admin_api.py | 101 +++++++ app/iam.py | 585 ++++++++++++++++++++++++++++-------- app/s3_api.py | 38 +-- docs.md | 71 ++++- run.py | 37 ++- static/js/iam-management.js | 14 +- templates/iam.html | 6 +- tests/conftest.py | 5 +- 8 files changed, 699 insertions(+), 158 deletions(-) diff --git a/app/admin_api.py b/app/admin_api.py index 1d5d975..5b3eec3 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -686,6 +686,107 @@ def _storage(): return current_app.extensions["object_storage"] +def _require_iam_action(action: str): + principal, error = _require_principal() + if error: + return None, error + try: + _iam().authorize(principal, None, action) + return principal, None + except IamError: + return None, _json_error("AccessDenied", f"Requires {action} permission", 403) + + +@admin_api_bp.route("/iam/users", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def iam_list_users(): + principal, error = _require_iam_action("iam:list_users") + if error: + return error + return jsonify({"users": _iam().list_users()}) + + +@admin_api_bp.route("/iam/users/", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def iam_get_user(identifier): + principal, error = _require_iam_action("iam:get_user") + if error: + return error + try: + user_id = _iam().resolve_user_id(identifier) + return jsonify(_iam().get_user_by_id(user_id)) + except IamError as exc: + return _json_error("NotFound", str(exc), 404) + + +@admin_api_bp.route("/iam/users//policies", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def iam_get_user_policies(identifier): + principal, error = _require_iam_action("iam:get_policy") + if error: + return error + try: + return jsonify({"policies": _iam().get_user_policies(identifier)}) + except IamError as exc: + return _json_error("NotFound", str(exc), 404) + + +@admin_api_bp.route("/iam/users//keys", methods=["POST"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def iam_create_access_key(identifier): + principal, error = _require_iam_action("iam:create_key") + if error: + return error + try: + result = _iam().create_access_key(identifier) + logger.info("Access key created for %s by %s", identifier, principal.access_key) + return jsonify(result), 201 + except IamError as exc: + return _json_error("InvalidRequest", str(exc), 400) + + +@admin_api_bp.route("/iam/users//keys/", methods=["DELETE"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def iam_delete_access_key(identifier, access_key): + principal, error = _require_iam_action("iam:delete_key") + if error: + return error + try: + _iam().delete_access_key(access_key) + logger.info("Access key %s deleted by %s", access_key, principal.access_key) + return "", 204 + except IamError as exc: + return _json_error("InvalidRequest", str(exc), 400) + + +@admin_api_bp.route("/iam/users//disable", methods=["POST"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def iam_disable_user(identifier): + principal, error = _require_iam_action("iam:disable_user") + if error: + return error + try: + _iam().disable_user(identifier) + logger.info("User %s disabled by %s", identifier, principal.access_key) + return jsonify({"status": "disabled"}) + except IamError as exc: + return _json_error("InvalidRequest", str(exc), 400) + + +@admin_api_bp.route("/iam/users//enable", methods=["POST"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def iam_enable_user(identifier): + principal, error = _require_iam_action("iam:disable_user") + if error: + return error + try: + _iam().enable_user(identifier) + logger.info("User %s enabled by %s", identifier, principal.access_key) + return jsonify({"status": "enabled"}) + except IamError as exc: + return _json_error("InvalidRequest", str(exc), 400) + + @admin_api_bp.route("/website-domains", methods=["GET"]) @limiter.limit(lambda: _get_admin_rate_limit()) def list_website_domains(): diff --git a/app/iam.py b/app/iam.py index 61aae45..702d209 100644 --- a/app/iam.py +++ b/app/iam.py @@ -10,7 +10,7 @@ import secrets import threading import time from collections import deque -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set, Tuple @@ -22,16 +22,37 @@ class IamError(RuntimeError): """Raised when authentication or authorization fails.""" -S3_ACTIONS = {"list", "read", "write", "delete", "share", "policy", "replication", "lifecycle", "cors"} +S3_ACTIONS = { + "list", "read", "write", "delete", "share", "policy", + "replication", "lifecycle", "cors", + "create_bucket", "delete_bucket", + "versioning", "tagging", "encryption", "quota", + "object_lock", "notification", "logging", "website", +} IAM_ACTIONS = { "iam:list_users", "iam:create_user", "iam:delete_user", "iam:rotate_key", "iam:update_policy", + "iam:create_key", + "iam:delete_key", + "iam:get_user", + "iam:get_policy", + "iam:disable_user", } ALLOWED_ACTIONS = (S3_ACTIONS | IAM_ACTIONS) | {"iam:*"} +_V1_IMPLIED_ACTIONS = { + "write": {"create_bucket"}, + "delete": {"delete_bucket"}, + "policy": { + "versioning", "tagging", "encryption", "quota", + "object_lock", "notification", "logging", "website", + "cors", "lifecycle", "replication", "share", + }, +} + ACTION_ALIASES = { "list": "list", "s3:listbucket": "list", @@ -45,14 +66,11 @@ ACTION_ALIASES = { "s3:getobjecttagging": "read", "s3:getobjectversiontagging": "read", "s3:getobjectacl": "read", - "s3:getbucketversioning": "read", "s3:headobject": "read", "s3:headbucket": "read", "write": "write", "s3:putobject": "write", - "s3:createbucket": "write", "s3:putobjecttagging": "write", - "s3:putbucketversioning": "write", "s3:createmultipartupload": "write", "s3:uploadpart": "write", "s3:completemultipartupload": "write", @@ -61,8 +79,11 @@ ACTION_ALIASES = { "delete": "delete", "s3:deleteobject": "delete", "s3:deleteobjectversion": "delete", - "s3:deletebucket": "delete", "s3:deleteobjecttagging": "delete", + "create_bucket": "create_bucket", + "s3:createbucket": "create_bucket", + "delete_bucket": "delete_bucket", + "s3:deletebucket": "delete_bucket", "share": "share", "s3:putobjectacl": "share", "s3:putbucketacl": "share", @@ -88,11 +109,50 @@ ACTION_ALIASES = { "s3:getbucketcors": "cors", "s3:putbucketcors": "cors", "s3:deletebucketcors": "cors", + "versioning": "versioning", + "s3:getbucketversioning": "versioning", + "s3:putbucketversioning": "versioning", + "tagging": "tagging", + "s3:getbuckettagging": "tagging", + "s3:putbuckettagging": "tagging", + "s3:deletebuckettagging": "tagging", + "encryption": "encryption", + "s3:getencryptionconfiguration": "encryption", + "s3:putencryptionconfiguration": "encryption", + "s3:deleteencryptionconfiguration": "encryption", + "quota": "quota", + "s3:getbucketquota": "quota", + "s3:putbucketquota": "quota", + "s3:deletebucketquota": "quota", + "object_lock": "object_lock", + "s3:getobjectlockconfiguration": "object_lock", + "s3:putobjectlockconfiguration": "object_lock", + "s3:putobjectretention": "object_lock", + "s3:getobjectretention": "object_lock", + "s3:putobjectlegalhold": "object_lock", + "s3:getobjectlegalhold": "object_lock", + "notification": "notification", + "s3:getbucketnotificationconfiguration": "notification", + "s3:putbucketnotificationconfiguration": "notification", + "s3:deletebucketnotificationconfiguration": "notification", + "logging": "logging", + "s3:getbucketlogging": "logging", + "s3:putbucketlogging": "logging", + "s3:deletebucketlogging": "logging", + "website": "website", + "s3:getbucketwebsite": "website", + "s3:putbucketwebsite": "website", + "s3:deletebucketwebsite": "website", "iam:listusers": "iam:list_users", "iam:createuser": "iam:create_user", "iam:deleteuser": "iam:delete_user", "iam:rotateaccesskey": "iam:rotate_key", "iam:putuserpolicy": "iam:update_policy", + "iam:createaccesskey": "iam:create_key", + "iam:deleteaccesskey": "iam:delete_key", + "iam:getuser": "iam:get_user", + "iam:getpolicy": "iam:get_policy", + "iam:disableuser": "iam:disable_user", "iam:*": "iam:*", } @@ -101,6 +161,7 @@ ACTION_ALIASES = { class Policy: bucket: str actions: Set[str] + prefix: str = "*" @dataclass @@ -117,6 +178,16 @@ def _derive_fernet_key(secret: str) -> bytes: _IAM_ENCRYPTED_PREFIX = b"MYFSIO_IAM_ENC:" +_CONFIG_VERSION = 2 + + +def _expand_v1_actions(actions: Set[str]) -> Set[str]: + expanded = set(actions) + for action, implied in _V1_IMPLIED_ACTIONS.items(): + if action in expanded: + expanded.update(implied) + return expanded + class IamService: """Loads IAM configuration, manages users, and evaluates policies.""" @@ -131,7 +202,10 @@ class IamService: self.config_path.parent.mkdir(parents=True, exist_ok=True) if not self.config_path.exists(): self._write_default() - self._users: Dict[str, Dict[str, Any]] = {} + self._user_records: Dict[str, Dict[str, Any]] = {} + self._key_index: Dict[str, str] = {} + self._key_secrets: Dict[str, str] = {} + self._key_status: Dict[str, str] = {} self._raw_config: Dict[str, Any] = {} self._failed_attempts: Dict[str, Deque[datetime]] = {} self._last_load_time = 0.0 @@ -146,7 +220,6 @@ class IamService: self._load_lockout_state() def _maybe_reload(self) -> None: - """Reload configuration if the file has changed on disk.""" now = time.time() if now - self._last_stat_check < self._stat_check_interval: return @@ -183,11 +256,20 @@ class IamService: raise IamError( f"Access temporarily locked. Try again in {seconds} seconds." ) - record = self._users.get(access_key) - stored_secret = record["secret_key"] if record else secrets.token_urlsafe(24) - if not record or not hmac.compare_digest(stored_secret, secret_key): + user_id = self._key_index.get(access_key) + stored_secret = self._key_secrets.get(access_key, secrets.token_urlsafe(24)) + if not user_id or not hmac.compare_digest(stored_secret, secret_key): self._record_failed_attempt(access_key) raise IamError("Invalid credentials") + key_status = self._key_status.get(access_key, "active") + if key_status != "active": + raise IamError("Access key is inactive") + record = self._user_records.get(user_id) + if not record: + self._record_failed_attempt(access_key) + raise IamError("Invalid credentials") + if not record.get("enabled", True): + raise IamError("User account is disabled") self._check_expiry(access_key, record) self._clear_failed_attempts(access_key) return self._build_principal(access_key, record) @@ -215,7 +297,6 @@ class IamService: return self.config_path.parent / "lockout_state.json" def _load_lockout_state(self) -> None: - """Load lockout state from disk.""" try: if self._lockout_file().exists(): data = json.loads(self._lockout_file().read_text(encoding="utf-8")) @@ -235,7 +316,6 @@ class IamService: pass def _save_lockout_state(self) -> None: - """Persist lockout state to disk.""" data: Dict[str, Any] = {"failed_attempts": {}} for key, attempts in self._failed_attempts.items(): data["failed_attempts"][key] = [ts.isoformat() for ts in attempts] @@ -270,10 +350,9 @@ class IamService: return int(max(0, self.auth_lockout_window.total_seconds() - elapsed)) def create_session_token(self, access_key: str, duration_seconds: int = 3600) -> str: - """Create a temporary session token for an access key.""" self._maybe_reload() - record = self._users.get(access_key) - if not record: + user_id = self._key_index.get(access_key) + if not user_id or user_id not in self._user_records: raise IamError("Unknown access key") self._cleanup_expired_sessions() token = secrets.token_urlsafe(32) @@ -285,7 +364,6 @@ class IamService: return token def validate_session_token(self, access_key: str, session_token: str) -> bool: - """Validate a session token for an access key (thread-safe, constant-time).""" dummy_key = secrets.token_urlsafe(16) dummy_token = secrets.token_urlsafe(32) with self._session_lock: @@ -304,7 +382,6 @@ class IamService: return True def _cleanup_expired_sessions(self) -> None: - """Remove expired session tokens.""" now = time.time() expired = [token for token, data in self._sessions.items() if now > data["expires_at"]] for token in expired: @@ -316,13 +393,18 @@ class IamService: if cached: principal, cached_time = cached if now - cached_time < self._cache_ttl: - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) return principal self._maybe_reload() - record = self._users.get(access_key) + user_id = self._key_index.get(access_key) + if not user_id: + raise IamError("Unknown access key") + record = self._user_records.get(user_id) if not record: raise IamError("Unknown access key") self._check_expiry(access_key, record) @@ -332,22 +414,26 @@ class IamService: def secret_for_key(self, access_key: str) -> str: self._maybe_reload() - record = self._users.get(access_key) - if not record: + secret = self._key_secrets.get(access_key) + if not secret: raise IamError("Unknown access key") - self._check_expiry(access_key, record) - return record["secret_key"] + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) + return secret - def authorize(self, principal: Principal, bucket_name: str | None, action: str) -> None: + def authorize(self, principal: Principal, bucket_name: str | None, action: str, *, object_key: str | None = None) -> None: action = self._normalize_action(action) if action not in ALLOWED_ACTIONS: raise IamError(f"Unknown action '{action}'") bucket_name = bucket_name or "*" normalized = bucket_name.lower() if bucket_name != "*" else bucket_name - if not self._is_allowed(principal, normalized, action): + if not self._is_allowed(principal, normalized, action, object_key=object_key): raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'") - def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str]) -> Dict[str, bool]: + def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str], *, object_key: str | None = None) -> Dict[str, bool]: self._maybe_reload() bucket_name = (bucket_name or "*").lower() if bucket_name != "*" else (bucket_name or "*") normalized_actions = {a: self._normalize_action(a) for a in actions} @@ -356,37 +442,53 @@ class IamService: if canonical not in ALLOWED_ACTIONS: results[original] = False else: - results[original] = self._is_allowed(principal, bucket_name, canonical) + results[original] = self._is_allowed(principal, bucket_name, canonical, object_key=object_key) return results def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]: return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")] - def _is_allowed(self, principal: Principal, bucket_name: str, action: str) -> bool: + def _is_allowed(self, principal: Principal, bucket_name: str, action: str, *, object_key: str | None = None) -> bool: bucket_name = bucket_name.lower() for policy in principal.policies: if policy.bucket not in {"*", bucket_name}: continue - if "*" in policy.actions or action in policy.actions: - return True - if "iam:*" in policy.actions and action.startswith("iam:"): - return True + action_match = "*" in policy.actions or action in policy.actions + if not action_match and "iam:*" in policy.actions and action.startswith("iam:"): + action_match = True + if not action_match: + continue + if object_key is not None and policy.prefix != "*": + prefix = policy.prefix.rstrip("*") + if not object_key.startswith(prefix): + continue + return True return False def list_users(self) -> List[Dict[str, Any]]: listing: List[Dict[str, Any]] = [] - for access_key, record in self._users.items(): - listing.append( - { - "access_key": access_key, - "display_name": record["display_name"], - "expires_at": record.get("expires_at"), - "policies": [ - {"bucket": policy.bucket, "actions": sorted(policy.actions)} - for policy in record["policies"] - ], - } - ) + for user_id, record in self._user_records.items(): + access_keys = [] + for key_info in record.get("access_keys", []): + access_keys.append({ + "access_key": key_info["access_key"], + "status": key_info.get("status", "active"), + "created_at": key_info.get("created_at"), + }) + user_entry: Dict[str, Any] = { + "user_id": user_id, + "display_name": record["display_name"], + "enabled": record.get("enabled", True), + "expires_at": record.get("expires_at"), + "access_keys": access_keys, + "policies": [ + {"bucket": policy.bucket, "actions": sorted(policy.actions), "prefix": policy.prefix} + for policy in record["policies"] + ], + } + if access_keys: + user_entry["access_key"] = access_keys[0]["access_key"] + listing.append(user_entry) return listing def create_user( @@ -397,20 +499,33 @@ class IamService: access_key: str | None = None, secret_key: str | None = None, expires_at: str | None = None, + user_id: str | None = None, ) -> Dict[str, str]: access_key = (access_key or self._generate_access_key()).strip() if not access_key: raise IamError("Access key cannot be empty") - if access_key in self._users: + if access_key in self._key_index: raise IamError("Access key already exists") if expires_at: self._validate_expires_at(expires_at) secret_key = secret_key or self._generate_secret_key() sanitized_policies = self._prepare_policy_payload(policies) + user_id = user_id or self._generate_user_id() + if user_id in self._user_records: + raise IamError("User ID already exists") + now_iso = datetime.now(timezone.utc).isoformat() record: Dict[str, Any] = { - "access_key": access_key, - "secret_key": secret_key, + "user_id": user_id, "display_name": display_name or access_key, + "enabled": True, + "access_keys": [ + { + "access_key": access_key, + "secret_key": secret_key, + "status": "active", + "created_at": now_iso, + } + ], "policies": sanitized_policies, } if expires_at: @@ -418,12 +533,108 @@ class IamService: self._raw_config.setdefault("users", []).append(record) self._save() self._load() - return {"access_key": access_key, "secret_key": secret_key} + return {"user_id": user_id, "access_key": access_key, "secret_key": secret_key} + + def create_access_key(self, identifier: str) -> Dict[str, str]: + user_raw, _ = self._resolve_raw_user(identifier) + new_access_key = self._generate_access_key() + new_secret_key = self._generate_secret_key() + now_iso = datetime.now(timezone.utc).isoformat() + key_entry = { + "access_key": new_access_key, + "secret_key": new_secret_key, + "status": "active", + "created_at": now_iso, + } + user_raw.setdefault("access_keys", []).append(key_entry) + self._save() + self._load() + return {"access_key": new_access_key, "secret_key": new_secret_key} + + def delete_access_key(self, access_key: str) -> None: + user_raw, _ = self._resolve_raw_user(access_key) + keys = user_raw.get("access_keys", []) + if len(keys) <= 1: + raise IamError("Cannot delete the only access key for a user") + remaining = [k for k in keys if k["access_key"] != access_key] + if len(remaining) == len(keys): + raise IamError("Access key not found") + user_raw["access_keys"] = remaining + self._save() + self._principal_cache.pop(access_key, None) + self._secret_key_cache.pop(access_key, None) + from .s3_api import clear_signing_key_cache + clear_signing_key_cache() + self._load() + + def disable_user(self, identifier: str) -> None: + user_raw, _ = self._resolve_raw_user(identifier) + user_raw["enabled"] = False + self._save() + for key_info in user_raw.get("access_keys", []): + ak = key_info["access_key"] + self._principal_cache.pop(ak, None) + self._secret_key_cache.pop(ak, None) + from .s3_api import clear_signing_key_cache + clear_signing_key_cache() + self._load() + + def enable_user(self, identifier: str) -> None: + user_raw, _ = self._resolve_raw_user(identifier) + user_raw["enabled"] = True + self._save() + self._load() + + def get_user_by_id(self, user_id: str) -> Dict[str, Any]: + record = self._user_records.get(user_id) + if not record: + raise IamError("User not found") + access_keys = [] + for key_info in record.get("access_keys", []): + access_keys.append({ + "access_key": key_info["access_key"], + "status": key_info.get("status", "active"), + "created_at": key_info.get("created_at"), + }) + return { + "user_id": user_id, + "display_name": record["display_name"], + "enabled": record.get("enabled", True), + "expires_at": record.get("expires_at"), + "access_keys": access_keys, + "policies": [ + {"bucket": p.bucket, "actions": sorted(p.actions), "prefix": p.prefix} + for p in record["policies"] + ], + } + + def get_user_policies(self, identifier: str) -> List[Dict[str, Any]]: + _, user_id = self._resolve_raw_user(identifier) + record = self._user_records.get(user_id) + if not record: + raise IamError("User not found") + return [ + {"bucket": p.bucket, "actions": sorted(p.actions), "prefix": p.prefix} + for p in record["policies"] + ] + + def resolve_user_id(self, identifier: str) -> str: + if identifier in self._user_records: + return identifier + user_id = self._key_index.get(identifier) + if user_id: + return user_id + raise IamError("User not found") def rotate_secret(self, access_key: str) -> str: - user = self._get_raw_user(access_key) + user_raw, _ = self._resolve_raw_user(access_key) new_secret = self._generate_secret_key() - user["secret_key"] = new_secret + for key_info in user_raw.get("access_keys", []): + if key_info["access_key"] == access_key: + key_info["secret_key"] = new_secret + break + else: + raise IamError("Access key not found") self._save() self._principal_cache.pop(access_key, None) self._secret_key_cache.pop(access_key, None) @@ -433,8 +644,8 @@ class IamService: return new_secret def update_user(self, access_key: str, display_name: str) -> None: - user = self._get_raw_user(access_key) - user["display_name"] = display_name + user_raw, _ = self._resolve_raw_user(access_key) + user_raw["display_name"] = display_name self._save() self._load() @@ -442,32 +653,43 @@ class IamService: users = self._raw_config.get("users", []) if len(users) <= 1: raise IamError("Cannot delete the only user") - remaining = [user for user in users if user["access_key"] != access_key] - if len(remaining) == len(users): + _, target_user_id = self._resolve_raw_user(access_key) + target_user_raw = None + remaining = [] + for u in users: + if u.get("user_id") == target_user_id: + target_user_raw = u + else: + remaining.append(u) + if target_user_raw is None: raise IamError("User not found") self._raw_config["users"] = remaining self._save() - self._principal_cache.pop(access_key, None) - self._secret_key_cache.pop(access_key, None) + for key_info in target_user_raw.get("access_keys", []): + ak = key_info["access_key"] + self._principal_cache.pop(ak, None) + self._secret_key_cache.pop(ak, None) from .s3_api import clear_signing_key_cache clear_signing_key_cache() self._load() def update_user_expiry(self, access_key: str, expires_at: str | None) -> None: - user = self._get_raw_user(access_key) + user_raw, _ = self._resolve_raw_user(access_key) if expires_at: self._validate_expires_at(expires_at) - user["expires_at"] = expires_at + user_raw["expires_at"] = expires_at else: - user.pop("expires_at", None) + user_raw.pop("expires_at", None) self._save() - self._principal_cache.pop(access_key, None) - self._secret_key_cache.pop(access_key, None) + for key_info in user_raw.get("access_keys", []): + ak = key_info["access_key"] + self._principal_cache.pop(ak, None) + self._secret_key_cache.pop(ak, None) self._load() def update_user_policies(self, access_key: str, policies: Sequence[Dict[str, Any]]) -> None: - user = self._get_raw_user(access_key) - user["policies"] = self._prepare_policy_payload(policies) + user_raw, _ = self._resolve_raw_user(access_key) + user_raw["policies"] = self._prepare_policy_payload(policies) self._save() self._load() @@ -482,6 +704,52 @@ class IamService: raise IamError("Cannot decrypt IAM config. SECRET_KEY may have changed. Use 'python run.py reset-cred' to reset credentials.") return raw_bytes.decode("utf-8") + def _is_v2_config(self, raw: Dict[str, Any]) -> bool: + return raw.get("version", 1) >= _CONFIG_VERSION + + def _migrate_v1_to_v2(self, raw: Dict[str, Any]) -> Dict[str, Any]: + migrated_users = [] + now_iso = datetime.now(timezone.utc).isoformat() + for user in raw.get("users", []): + old_policies = user.get("policies", []) + expanded_policies = [] + for p in old_policies: + raw_actions = p.get("actions", []) + if isinstance(raw_actions, str): + raw_actions = [raw_actions] + action_set: Set[str] = set() + for a in raw_actions: + canonical = self._normalize_action(a) + if canonical == "*": + action_set = set(ALLOWED_ACTIONS) + break + if canonical: + action_set.add(canonical) + action_set = _expand_v1_actions(action_set) + expanded_policies.append({ + "bucket": p.get("bucket", "*"), + "actions": sorted(action_set), + "prefix": p.get("prefix", "*"), + }) + migrated_user: Dict[str, Any] = { + "user_id": user["access_key"], + "display_name": user.get("display_name", user["access_key"]), + "enabled": True, + "access_keys": [ + { + "access_key": user["access_key"], + "secret_key": user["secret_key"], + "status": "active", + "created_at": now_iso, + } + ], + "policies": expanded_policies, + } + if user.get("expires_at"): + migrated_user["expires_at"] = user["expires_at"] + migrated_users.append(migrated_user) + return {"version": _CONFIG_VERSION, "users": migrated_users} + def _load(self) -> None: try: self._last_load_time = self.config_path.stat().st_mtime @@ -500,35 +768,67 @@ class IamService: raise IamError(f"Failed to load IAM config: {e}") was_plaintext = not raw_bytes.startswith(_IAM_ENCRYPTED_PREFIX) + was_v1 = not self._is_v2_config(raw) + + if was_v1: + raw = self._migrate_v1_to_v2(raw) + + user_records: Dict[str, Dict[str, Any]] = {} + key_index: Dict[str, str] = {} + key_secrets: Dict[str, str] = {} + key_status_map: Dict[str, str] = {} - users: Dict[str, Dict[str, Any]] = {} for user in raw.get("users", []): + user_id = user["user_id"] policies = self._build_policy_objects(user.get("policies", [])) - user_record: Dict[str, Any] = { - "secret_key": user["secret_key"], - "display_name": user.get("display_name", user["access_key"]), + access_keys_raw = user.get("access_keys", []) + access_keys_info = [] + for key_entry in access_keys_raw: + ak = key_entry["access_key"] + sk = key_entry["secret_key"] + status = key_entry.get("status", "active") + key_index[ak] = user_id + key_secrets[ak] = sk + key_status_map[ak] = status + access_keys_info.append({ + "access_key": ak, + "secret_key": sk, + "status": status, + "created_at": key_entry.get("created_at"), + }) + record: Dict[str, Any] = { + "display_name": user.get("display_name", user_id), + "enabled": user.get("enabled", True), "policies": policies, + "access_keys": access_keys_info, } if user.get("expires_at"): - user_record["expires_at"] = user["expires_at"] - users[user["access_key"]] = user_record - if not users: - raise IamError("IAM configuration contains no users") - self._users = users - raw_users: List[Dict[str, Any]] = [] - for entry in raw.get("users", []): - raw_entry: Dict[str, Any] = { - "access_key": entry["access_key"], - "secret_key": entry["secret_key"], - "display_name": entry.get("display_name", entry["access_key"]), - "policies": entry.get("policies", []), - } - if entry.get("expires_at"): - raw_entry["expires_at"] = entry["expires_at"] - raw_users.append(raw_entry) - self._raw_config = {"users": raw_users} + record["expires_at"] = user["expires_at"] + user_records[user_id] = record - if was_plaintext and self._fernet: + if not user_records: + raise IamError("IAM configuration contains no users") + + self._user_records = user_records + self._key_index = key_index + self._key_secrets = key_secrets + self._key_status = key_status_map + + raw_users: List[Dict[str, Any]] = [] + for user in raw.get("users", []): + raw_entry: Dict[str, Any] = { + "user_id": user["user_id"], + "display_name": user.get("display_name", user["user_id"]), + "enabled": user.get("enabled", True), + "access_keys": user.get("access_keys", []), + "policies": user.get("policies", []), + } + if user.get("expires_at"): + raw_entry["expires_at"] = user["expires_at"] + raw_users.append(raw_entry) + self._raw_config = {"version": _CONFIG_VERSION, "users": raw_users} + + if was_v1 or (was_plaintext and self._fernet): self._save() def _save(self) -> None: @@ -547,19 +847,30 @@ class IamService: def config_summary(self) -> Dict[str, Any]: return { "path": str(self.config_path), - "user_count": len(self._users), + "user_count": len(self._user_records), "allowed_actions": sorted(ALLOWED_ACTIONS), } def export_config(self, mask_secrets: bool = True) -> Dict[str, Any]: - payload: Dict[str, Any] = {"users": []} + payload: Dict[str, Any] = {"version": _CONFIG_VERSION, "users": []} for user in self._raw_config.get("users", []): + access_keys = [] + for key_info in user.get("access_keys", []): + access_keys.append({ + "access_key": key_info["access_key"], + "secret_key": "\u2022\u2022\u2022\u2022\u2022\u2022\u2022\u2022\u2022\u2022" if mask_secrets else key_info["secret_key"], + "status": key_info.get("status", "active"), + "created_at": key_info.get("created_at"), + }) record: Dict[str, Any] = { - "access_key": user["access_key"], - "secret_key": "••••••••••" if mask_secrets else user["secret_key"], + "user_id": user["user_id"], "display_name": user["display_name"], + "enabled": user.get("enabled", True), + "access_keys": access_keys, "policies": user["policies"], } + if access_keys: + record["access_key"] = access_keys[0]["access_key"] if user.get("expires_at"): record["expires_at"] = user["expires_at"] payload["users"].append(record) @@ -569,6 +880,7 @@ class IamService: entries: List[Policy] = [] for policy in policies: bucket = str(policy.get("bucket", "*")).lower() + prefix = str(policy.get("prefix", "*")) raw_actions = policy.get("actions", []) if isinstance(raw_actions, str): raw_actions = [raw_actions] @@ -581,7 +893,7 @@ class IamService: if canonical: action_set.add(canonical) if action_set: - entries.append(Policy(bucket=bucket, actions=action_set)) + entries.append(Policy(bucket=bucket, actions=action_set, prefix=prefix)) return entries def _prepare_policy_payload(self, policies: Optional[Sequence[Dict[str, Any]]]) -> List[Dict[str, Any]]: @@ -589,12 +901,14 @@ class IamService: policies = ( { "bucket": "*", - "actions": ["list", "read", "write", "delete", "share", "policy"], + "actions": ["list", "read", "write", "delete", "share", "policy", + "create_bucket", "delete_bucket"], }, ) sanitized: List[Dict[str, Any]] = [] for policy in policies: bucket = str(policy.get("bucket", "*")).lower() + prefix = str(policy.get("prefix", "*")) raw_actions = policy.get("actions", []) if isinstance(raw_actions, str): raw_actions = [raw_actions] @@ -608,7 +922,10 @@ class IamService: action_set.add(canonical) if not action_set: continue - sanitized.append({"bucket": bucket, "actions": sorted(action_set)}) + entry: Dict[str, Any] = {"bucket": bucket, "actions": sorted(action_set)} + if prefix != "*": + entry["prefix"] = prefix + sanitized.append(entry) if not sanitized: raise IamError("At least one policy with valid actions is required") return sanitized @@ -633,12 +950,23 @@ class IamService: access_key = os.environ.get("ADMIN_ACCESS_KEY", "").strip() or secrets.token_hex(12) secret_key = os.environ.get("ADMIN_SECRET_KEY", "").strip() or secrets.token_urlsafe(32) custom_keys = bool(os.environ.get("ADMIN_ACCESS_KEY", "").strip()) + user_id = self._generate_user_id() + now_iso = datetime.now(timezone.utc).isoformat() default = { + "version": _CONFIG_VERSION, "users": [ { - "access_key": access_key, - "secret_key": secret_key, + "user_id": user_id, "display_name": "Local Admin", + "enabled": True, + "access_keys": [ + { + "access_key": access_key, + "secret_key": secret_key, + "status": "active", + "created_at": now_iso, + } + ], "policies": [ {"bucket": "*", "actions": list(ALLOWED_ACTIONS)} ], @@ -660,6 +988,7 @@ class IamService: else: print(f"Access Key: {access_key}") print(f"Secret Key: {secret_key}") + print(f"User ID: {user_id}") print(f"{'='*60}") if self._fernet: print("IAM config is encrypted at rest.") @@ -682,30 +1011,46 @@ class IamService: def _generate_secret_key(self) -> str: return secrets.token_urlsafe(24) - def _get_raw_user(self, access_key: str) -> Dict[str, Any]: + def _generate_user_id(self) -> str: + return f"u-{secrets.token_hex(8)}" + + def _resolve_raw_user(self, identifier: str) -> Tuple[Dict[str, Any], str]: for user in self._raw_config.get("users", []): - if user["access_key"] == access_key: - return user + if user.get("user_id") == identifier: + return user, identifier + for user in self._raw_config.get("users", []): + for key_info in user.get("access_keys", []): + if key_info["access_key"] == identifier: + return user, user["user_id"] raise IamError("User not found") + def _get_raw_user(self, access_key: str) -> Dict[str, Any]: + user, _ = self._resolve_raw_user(access_key) + return user + def get_secret_key(self, access_key: str) -> str | None: now = time.time() cached = self._secret_key_cache.get(access_key) if cached: secret_key, cached_time = cached if now - cached_time < self._cache_ttl: - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) return secret_key self._maybe_reload() - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) - secret_key = record["secret_key"] - self._secret_key_cache[access_key] = (secret_key, now) - return secret_key + secret = self._key_secrets.get(access_key) + if secret: + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) + self._secret_key_cache[access_key] = (secret, now) + return secret return None def get_principal(self, access_key: str) -> Principal | None: @@ -714,16 +1059,20 @@ class IamService: if cached: principal, cached_time = cached if now - cached_time < self._cache_ttl: - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) return principal self._maybe_reload() - record = self._users.get(access_key) - if record: - self._check_expiry(access_key, record) - principal = self._build_principal(access_key, record) - self._principal_cache[access_key] = (principal, now) - return principal + user_id = self._key_index.get(access_key) + if user_id: + record = self._user_records.get(user_id) + if record: + self._check_expiry(access_key, record) + principal = self._build_principal(access_key, record) + self._principal_cache[access_key] = (principal, now) + return principal return None diff --git a/app/s3_api.py b/app/s3_api.py index af54bed..6a77017 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -488,7 +488,7 @@ def _authorize_action(principal: Principal | None, bucket_name: str | None, acti iam_error: IamError | None = None if principal is not None: try: - _iam().authorize(principal, bucket_name, action) + _iam().authorize(principal, bucket_name, action, object_key=object_key) iam_allowed = True except IamError as exc: iam_error = exc @@ -1135,7 +1135,7 @@ def _bucket_versioning_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "versioning") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1182,7 +1182,7 @@ def _bucket_tagging_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "tagging") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1347,7 +1347,7 @@ def _bucket_cors_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "cors") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1400,7 +1400,7 @@ def _bucket_encryption_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "encryption") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1475,7 +1475,7 @@ def _bucket_acl_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "share") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -1718,12 +1718,12 @@ def _bucket_lifecycle_handler(bucket_name: str) -> Response: """Handle bucket lifecycle configuration (GET/PUT/DELETE /?lifecycle).""" if request.method not in {"GET", "PUT", "DELETE"}: return _method_not_allowed(["GET", "PUT", "DELETE"]) - + principal, error = _require_principal() if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "lifecycle") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -1882,12 +1882,12 @@ def _bucket_quota_handler(bucket_name: str) -> Response: """Handle bucket quota configuration (GET/PUT/DELETE /?quota).""" if request.method not in {"GET", "PUT", "DELETE"}: return _method_not_allowed(["GET", "PUT", "DELETE"]) - + principal, error = _require_principal() if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "quota") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -1964,7 +1964,7 @@ def _bucket_object_lock_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "object_lock") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2010,7 +2010,7 @@ def _bucket_notification_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "notification") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2106,7 +2106,7 @@ def _bucket_logging_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "logging") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2248,7 +2248,7 @@ def _object_retention_handler(bucket_name: str, object_key: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key) + _authorize_action(principal, bucket_name, "object_lock", object_key=object_key) except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2324,7 +2324,7 @@ def _object_legal_hold_handler(bucket_name: str, object_key: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key) + _authorize_action(principal, bucket_name, "object_lock", object_key=object_key) except IamError as exc: return _error_response("AccessDenied", str(exc), 403) @@ -2657,7 +2657,7 @@ def bucket_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "write") + _authorize_action(principal, bucket_name, "create_bucket") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) try: @@ -2674,7 +2674,7 @@ def bucket_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "delete") + _authorize_action(principal, bucket_name, "delete_bucket") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) try: @@ -3229,7 +3229,7 @@ def _bucket_replication_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "replication") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() @@ -3312,7 +3312,7 @@ def _bucket_website_handler(bucket_name: str) -> Response: if error: return error try: - _authorize_action(principal, bucket_name, "policy") + _authorize_action(principal, bucket_name, "website") except IamError as exc: return _error_response("AccessDenied", str(exc), 403) storage = _storage() diff --git a/docs.md b/docs.md index daa8b24..ba36f68 100644 --- a/docs.md +++ b/docs.md @@ -758,7 +758,7 @@ MyFSIO implements a comprehensive Identity and Access Management (IAM) system th - **Create user**: supply a display name, optional JSON inline policy array, and optional credential expiry date. - **Set expiry**: assign an expiration date to any user's credentials. Expired credentials are rejected at authentication time. The UI shows expiry badges and preset durations (1h, 24h, 7d, 30d, 90d). - **Rotate secret**: generates a new secret key; the UI surfaces it once. - - **Policy editor**: select a user, paste an array of objects (`{"bucket": "*", "actions": ["list", "read"]}`), and submit. Alias support includes AWS-style verbs (e.g., `s3:GetObject`). + - **Policy editor**: select a user, paste an array of objects (`{"bucket": "*", "actions": ["list", "read"]}`), and submit. An optional `"prefix"` field restricts object-level actions to a key prefix (e.g., `"uploads/"`). Alias support includes AWS-style verbs (e.g., `s3:GetObject`). 3. Wildcard action `iam:*` is supported for admin user definitions. > **Breaking Change (v0.2.0+):** Previous versions used fixed default credentials (`localadmin/localadmin`). If upgrading from an older version, your existing credentials remain unchanged, but new installations will generate random credentials. @@ -797,13 +797,23 @@ Both layers are evaluated for each request. A user must have permission in their | --- | --- | --- | | `list` | List buckets and objects | `s3:ListBucket`, `s3:ListAllMyBuckets`, `s3:ListBucketVersions`, `s3:ListMultipartUploads`, `s3:ListParts` | | `read` | Download objects, get metadata | `s3:GetObject`, `s3:GetObjectVersion`, `s3:GetObjectTagging`, `s3:GetObjectVersionTagging`, `s3:GetObjectAcl`, `s3:GetBucketVersioning`, `s3:HeadObject`, `s3:HeadBucket` | -| `write` | Upload objects, create buckets, manage tags | `s3:PutObject`, `s3:CreateBucket`, `s3:PutObjectTagging`, `s3:PutBucketVersioning`, `s3:CreateMultipartUpload`, `s3:UploadPart`, `s3:CompleteMultipartUpload`, `s3:AbortMultipartUpload`, `s3:CopyObject` | -| `delete` | Remove objects, versions, and buckets | `s3:DeleteObject`, `s3:DeleteObjectVersion`, `s3:DeleteBucket`, `s3:DeleteObjectTagging` | +| `write` | Upload objects, manage object tags | `s3:PutObject`, `s3:PutObjectTagging`, `s3:CreateMultipartUpload`, `s3:UploadPart`, `s3:CompleteMultipartUpload`, `s3:AbortMultipartUpload`, `s3:CopyObject` | +| `delete` | Remove objects and versions | `s3:DeleteObject`, `s3:DeleteObjectVersion`, `s3:DeleteObjectTagging` | +| `create_bucket` | Create new buckets | `s3:CreateBucket` | +| `delete_bucket` | Delete buckets | `s3:DeleteBucket` | | `share` | Manage Access Control Lists (ACLs) | `s3:PutObjectAcl`, `s3:PutBucketAcl`, `s3:GetBucketAcl` | | `policy` | Manage bucket policies | `s3:PutBucketPolicy`, `s3:GetBucketPolicy`, `s3:DeleteBucketPolicy` | +| `versioning` | Manage bucket versioning configuration | `s3:GetBucketVersioning`, `s3:PutBucketVersioning` | +| `tagging` | Manage bucket-level tags | `s3:GetBucketTagging`, `s3:PutBucketTagging`, `s3:DeleteBucketTagging` | +| `encryption` | Manage bucket encryption configuration | `s3:GetEncryptionConfiguration`, `s3:PutEncryptionConfiguration`, `s3:DeleteEncryptionConfiguration` | | `lifecycle` | Manage lifecycle rules | `s3:GetLifecycleConfiguration`, `s3:PutLifecycleConfiguration`, `s3:DeleteLifecycleConfiguration`, `s3:GetBucketLifecycle`, `s3:PutBucketLifecycle` | | `cors` | Manage CORS configuration | `s3:GetBucketCors`, `s3:PutBucketCors`, `s3:DeleteBucketCors` | | `replication` | Configure and manage replication | `s3:GetReplicationConfiguration`, `s3:PutReplicationConfiguration`, `s3:DeleteReplicationConfiguration`, `s3:ReplicateObject`, `s3:ReplicateTags`, `s3:ReplicateDelete` | +| `quota` | Manage bucket storage quotas | `s3:GetBucketQuota`, `s3:PutBucketQuota`, `s3:DeleteBucketQuota` | +| `object_lock` | Manage object lock, retention, and legal holds | `s3:GetObjectLockConfiguration`, `s3:PutObjectLockConfiguration`, `s3:PutObjectRetention`, `s3:GetObjectRetention`, `s3:PutObjectLegalHold`, `s3:GetObjectLegalHold` | +| `notification` | Manage bucket event notifications | `s3:GetBucketNotificationConfiguration`, `s3:PutBucketNotificationConfiguration`, `s3:DeleteBucketNotificationConfiguration` | +| `logging` | Manage bucket access logging | `s3:GetBucketLogging`, `s3:PutBucketLogging`, `s3:DeleteBucketLogging` | +| `website` | Manage static website hosting configuration | `s3:GetBucketWebsite`, `s3:PutBucketWebsite`, `s3:DeleteBucketWebsite` | #### IAM Actions (User Management) @@ -814,25 +824,31 @@ Both layers are evaluated for each request. A user must have permission in their | `iam:delete_user` | Delete IAM users | `iam:DeleteUser` | | `iam:rotate_key` | Rotate user secret keys | `iam:RotateAccessKey` | | `iam:update_policy` | Modify user policies | `iam:PutUserPolicy` | +| `iam:create_key` | Create additional access keys for a user | `iam:CreateAccessKey` | +| `iam:delete_key` | Delete an access key from a user | `iam:DeleteAccessKey` | +| `iam:get_user` | View user details and access keys | `iam:GetUser` | +| `iam:get_policy` | View user policy configuration | `iam:GetPolicy` | +| `iam:disable_user` | Temporarily disable/enable a user account | `iam:DisableUser` | | `iam:*` | **Admin wildcard** – grants all IAM actions | — | #### Wildcards | Wildcard | Scope | Description | | --- | --- | --- | -| `*` (in actions) | All S3 actions | Grants `list`, `read`, `write`, `delete`, `share`, `policy`, `lifecycle`, `cors`, `replication` | +| `*` (in actions) | All S3 actions | Grants all 19 S3 actions including `list`, `read`, `write`, `delete`, `create_bucket`, `delete_bucket`, `share`, `policy`, `versioning`, `tagging`, `encryption`, `lifecycle`, `cors`, `replication`, `quota`, `object_lock`, `notification`, `logging`, `website` | | `iam:*` | All IAM actions | Grants all `iam:*` actions for user management | | `*` (in bucket) | All buckets | Policy applies to every bucket | ### IAM Policy Structure -User policies are stored as a JSON array of policy objects. Each object specifies a bucket and the allowed actions: +User policies are stored as a JSON array of policy objects. Each object specifies a bucket, the allowed actions, and an optional prefix for object-level scoping: ```json [ { "bucket": "", - "actions": ["", "", ...] + "actions": ["", "", ...], + "prefix": "" } ] ``` @@ -840,12 +856,13 @@ User policies are stored as a JSON array of policy objects. Each object specifie **Fields:** - `bucket`: The bucket name (case-insensitive) or `*` for all buckets - `actions`: Array of action strings (simple names or AWS aliases) +- `prefix`: *(optional)* Restrict object-level actions to keys starting with this prefix. Defaults to `*` (all objects). Example: `"uploads/"` restricts to keys under `uploads/` ### Example User Policies **Full Administrator (complete system access):** ```json -[{"bucket": "*", "actions": ["list", "read", "write", "delete", "share", "policy", "lifecycle", "cors", "replication", "iam:*"]}] +[{"bucket": "*", "actions": ["list", "read", "write", "delete", "share", "policy", "create_bucket", "delete_bucket", "versioning", "tagging", "encryption", "lifecycle", "cors", "replication", "quota", "object_lock", "notification", "logging", "website", "iam:*"]}] ``` **Read-Only User (browse and download only):** @@ -858,6 +875,11 @@ User policies are stored as a JSON array of policy objects. Each object specifie [{"bucket": "user-bucket", "actions": ["list", "read", "write", "delete"]}] ``` +**Operator (data operations + bucket management, no config):** +```json +[{"bucket": "*", "actions": ["list", "read", "write", "delete", "create_bucket", "delete_bucket"]}] +``` + **Multiple Bucket Access (different permissions per bucket):** ```json [ @@ -867,9 +889,14 @@ User policies are stored as a JSON array of policy objects. Each object specifie ] ``` +**Prefix-Scoped Access (restrict to a folder inside a shared bucket):** +```json +[{"bucket": "shared-data", "actions": ["list", "read", "write", "delete"], "prefix": "team-a/"}] +``` + **IAM Manager (manage users but no data access):** ```json -[{"bucket": "*", "actions": ["iam:list_users", "iam:create_user", "iam:delete_user", "iam:rotate_key", "iam:update_policy"]}] +[{"bucket": "*", "actions": ["iam:list_users", "iam:create_user", "iam:delete_user", "iam:rotate_key", "iam:update_policy", "iam:create_key", "iam:delete_key", "iam:get_user", "iam:get_policy", "iam:disable_user"]}] ``` **Replication Operator (manage replication only):** @@ -889,10 +916,10 @@ User policies are stored as a JSON array of policy objects. Each object specifie **Bucket Administrator (full bucket config, no IAM access):** ```json -[{"bucket": "my-bucket", "actions": ["list", "read", "write", "delete", "policy", "lifecycle", "cors"]}] +[{"bucket": "my-bucket", "actions": ["list", "read", "write", "delete", "create_bucket", "delete_bucket", "share", "policy", "versioning", "tagging", "encryption", "lifecycle", "cors", "replication", "quota", "object_lock", "notification", "logging", "website"]}] ``` -**Upload-Only User (write but cannot read back):** +**Upload-Only User (write but cannot create/delete buckets):** ```json [{"bucket": "drop-box", "actions": ["write"]}] ``` @@ -967,6 +994,30 @@ curl -X POST http://localhost:5000/iam/users//expiry \ # Delete a user (requires iam:delete_user) curl -X DELETE http://localhost:5000/iam/users/ \ -H "X-Access-Key: ..." -H "X-Secret-Key: ..." + +# Get user details (requires iam:get_user) — via Admin API +curl http://localhost:5000/admin/iam/users/ \ + -H "Authorization: AWS4-HMAC-SHA256 ..." + +# Get user policies (requires iam:get_policy) — via Admin API +curl http://localhost:5000/admin/iam/users//policies \ + -H "Authorization: AWS4-HMAC-SHA256 ..." + +# Create additional access key for a user (requires iam:create_key) +curl -X POST http://localhost:5000/admin/iam/users//keys \ + -H "Authorization: AWS4-HMAC-SHA256 ..." + +# Delete an access key (requires iam:delete_key) +curl -X DELETE http://localhost:5000/admin/iam/users//keys/ \ + -H "Authorization: AWS4-HMAC-SHA256 ..." + +# Disable a user account (requires iam:disable_user) +curl -X POST http://localhost:5000/admin/iam/users//disable \ + -H "Authorization: AWS4-HMAC-SHA256 ..." + +# Re-enable a user account (requires iam:disable_user) +curl -X POST http://localhost:5000/admin/iam/users//enable \ + -H "Authorization: AWS4-HMAC-SHA256 ..." ``` ### Permission Precedence diff --git a/run.py b/run.py index 477bbbb..656cce6 100644 --- a/run.py +++ b/run.py @@ -128,6 +128,7 @@ def reset_credentials() -> None: pass if raw_config and raw_config.get("users"): + is_v2 = raw_config.get("version", 1) >= 2 admin_user = None for user in raw_config["users"]: policies = user.get("policies", []) @@ -141,15 +142,39 @@ def reset_credentials() -> None: if not admin_user: admin_user = raw_config["users"][0] - admin_user["access_key"] = access_key - admin_user["secret_key"] = secret_key - else: - raw_config = { - "users": [ - { + if is_v2: + admin_keys = admin_user.get("access_keys", []) + if admin_keys: + admin_keys[0]["access_key"] = access_key + admin_keys[0]["secret_key"] = secret_key + else: + from datetime import datetime as _dt, timezone as _tz + admin_user["access_keys"] = [{ "access_key": access_key, "secret_key": secret_key, + "status": "active", + "created_at": _dt.now(_tz.utc).isoformat(), + }] + else: + admin_user["access_key"] = access_key + admin_user["secret_key"] = secret_key + else: + from datetime import datetime as _dt, timezone as _tz + raw_config = { + "version": 2, + "users": [ + { + "user_id": f"u-{secrets.token_hex(8)}", "display_name": "Local Admin", + "enabled": True, + "access_keys": [ + { + "access_key": access_key, + "secret_key": secret_key, + "status": "active", + "created_at": _dt.now(_tz.utc).isoformat(), + } + ], "policies": [ {"bucket": "*", "actions": list(ALLOWED_ACTIONS)} ], diff --git a/static/js/iam-management.js b/static/js/iam-management.js index e75484b..ef5cde7 100644 --- a/static/js/iam-management.js +++ b/static/js/iam-management.js @@ -17,12 +17,20 @@ window.IAMManagement = (function() { var currentDeleteKey = null; var currentExpiryKey = null; - var ALL_S3_ACTIONS = ['list', 'read', 'write', 'delete', 'share', 'policy', 'replication', 'lifecycle', 'cors']; + var ALL_S3_ACTIONS = [ + 'list', 'read', 'write', 'delete', 'share', 'policy', + 'replication', 'lifecycle', 'cors', + 'create_bucket', 'delete_bucket', + 'versioning', 'tagging', 'encryption', 'quota', + 'object_lock', 'notification', 'logging', 'website' + ]; var policyTemplates = { - full: [{ bucket: '*', actions: ['list', 'read', 'write', 'delete', 'share', 'policy', 'replication', 'lifecycle', 'cors', 'iam:*'] }], + full: [{ bucket: '*', actions: ['list', 'read', 'write', 'delete', 'share', 'policy', 'create_bucket', 'delete_bucket', 'replication', 'lifecycle', 'cors', 'versioning', 'tagging', 'encryption', 'quota', 'object_lock', 'notification', 'logging', 'website', 'iam:*'] }], readonly: [{ bucket: '*', actions: ['list', 'read'] }], - writer: [{ bucket: '*', actions: ['list', 'read', 'write'] }] + writer: [{ bucket: '*', actions: ['list', 'read', 'write'] }], + operator: [{ bucket: '*', actions: ['list', 'read', 'write', 'delete', 'create_bucket', 'delete_bucket'] }], + bucketadmin: [{ bucket: '*', actions: ['list', 'read', 'write', 'delete', 'share', 'policy', 'create_bucket', 'delete_bucket', 'versioning', 'tagging', 'encryption', 'cors', 'lifecycle', 'quota', 'object_lock', 'notification', 'logging', 'website', 'replication'] }] }; function isAdminUser(policies) { diff --git a/templates/iam.html b/templates/iam.html index 5ffc151..e685b9f 100644 --- a/templates/iam.html +++ b/templates/iam.html @@ -235,7 +235,7 @@ {% set bucket_label = 'All Buckets' if policy.bucket == '*' else policy.bucket %} {% if '*' in policy.actions %} {% set perm_label = 'Full Access' %} - {% elif policy.actions|length >= 9 %} + {% elif policy.actions|length >= 19 %} {% set perm_label = 'Full Access' %} {% elif 'list' in policy.actions and 'read' in policy.actions and 'write' in policy.actions and 'delete' in policy.actions %} {% set perm_label = 'Read + Write + Delete' %} @@ -354,6 +354,8 @@ + + diff --git a/tests/conftest.py b/tests/conftest.py index 89bcca1..57d0382 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,7 +27,10 @@ def app(tmp_path: Path): "access_key": "test", "secret_key": "secret", "display_name": "Test User", - "policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy"]}], + "policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", + "create_bucket", "delete_bucket", "share", "versioning", "tagging", + "encryption", "cors", "lifecycle", "replication", "quota", + "object_lock", "notification", "logging", "website"]}], } ] } From df4f27ca2e0279b9d83492673d2a206c9f747dc1 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 15 Mar 2026 16:04:35 +0800 Subject: [PATCH 05/19] Fix IAM policy editor injecting prefix on existing policies without one --- app/iam.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/iam.py b/app/iam.py index 702d209..f4d895d 100644 --- a/app/iam.py +++ b/app/iam.py @@ -482,7 +482,7 @@ class IamService: "expires_at": record.get("expires_at"), "access_keys": access_keys, "policies": [ - {"bucket": policy.bucket, "actions": sorted(policy.actions), "prefix": policy.prefix} + {**{"bucket": policy.bucket, "actions": sorted(policy.actions)}, **({"prefix": policy.prefix} if policy.prefix != "*" else {})} for policy in record["policies"] ], } @@ -614,7 +614,7 @@ class IamService: if not record: raise IamError("User not found") return [ - {"bucket": p.bucket, "actions": sorted(p.actions), "prefix": p.prefix} + {**{"bucket": p.bucket, "actions": sorted(p.actions)}, **({"prefix": p.prefix} if p.prefix != "*" else {})} for p in record["policies"] ] From a496862902d22fbe64a0b3f44f71c8345b90c8f0 Mon Sep 17 00:00:00 2001 From: kqjy Date: Tue, 17 Mar 2026 23:25:30 +0800 Subject: [PATCH 06/19] Fix stale object count on dashboard after deleting all objects in bucket --- app/storage.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/app/storage.py b/app/storage.py index 88fb303..21e920a 100644 --- a/app/storage.py +++ b/app/storage.py @@ -406,6 +406,10 @@ class ObjectStorage: self._stats_serial[bucket_id] = self._stats_serial.get(bucket_id, 0) + 1 self._stats_mem_time[bucket_id] = time.monotonic() self._stats_dirty.add(bucket_id) + needs_immediate = data["objects"] == 0 and objects_delta < 0 + if needs_immediate: + self._flush_stats() + else: self._schedule_stats_flush() def _schedule_stats_flush(self) -> None: From 14786151e59ae5bc3c26de0b74c05dee9cfe8f6e Mon Sep 17 00:00:00 2001 From: kqjy Date: Fri, 20 Mar 2026 12:10:26 +0800 Subject: [PATCH 07/19] Fix selected object losing highlight on scroll in virtual list --- static/js/bucket-detail-main.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index d96b046..90517fe 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -849,6 +849,11 @@ selectCheckbox.checked = true; row.classList.add('table-active'); } + + if (activeRow && activeRow.dataset.key === row.dataset.key) { + row.classList.add('table-active'); + activeRow = row; + } }); const folderRows = document.querySelectorAll('.folder-row'); From aa4f9f5566e34b72e26e3c4ee732b48a5f4c38bd Mon Sep 17 00:00:00 2001 From: kqjy Date: Fri, 20 Mar 2026 17:35:10 +0800 Subject: [PATCH 08/19] =?UTF-8?q?Bypass=20boto3=20proxy=20for=20object=20s?= =?UTF-8?q?treaming,=20read=20directly=20from=20storage=20layer;=20Add=20s?= =?UTF-8?q?treaming=20object=20iterator=20to=20eliminate=20O(n=C2=B2)=20di?= =?UTF-8?q?rectory=20rescanning=20on=20large=20buckets;=20Add=20iter=5Fobj?= =?UTF-8?q?ects=5Fshallow=20delegation=20to=20EncryptedObjectStorage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/encrypted_storage.py | 3 ++ app/storage.py | 67 +++++++++++++++++++++++++++++++++++ app/ui.py | 75 +++++++++++++++++++++++++++++++++++----- 3 files changed, 136 insertions(+), 9 deletions(-) diff --git a/app/encrypted_storage.py b/app/encrypted_storage.py index a0d3a58..b64e1d1 100644 --- a/app/encrypted_storage.py +++ b/app/encrypted_storage.py @@ -193,6 +193,9 @@ class EncryptedObjectStorage: def list_objects_shallow(self, bucket_name: str, **kwargs): return self.storage.list_objects_shallow(bucket_name, **kwargs) + def iter_objects_shallow(self, bucket_name: str, **kwargs): + return self.storage.iter_objects_shallow(bucket_name, **kwargs) + def search_objects(self, bucket_name: str, query: str, **kwargs): return self.storage.search_objects(bucket_name, query, **kwargs) diff --git a/app/storage.py b/app/storage.py index 21e920a..fa66332 100644 --- a/app/storage.py +++ b/app/storage.py @@ -714,6 +714,73 @@ class ObjectStorage: next_continuation_token=next_token, ) + def iter_objects_shallow( + self, + bucket_name: str, + *, + prefix: str = "", + delimiter: str = "/", + ) -> Generator[tuple[str, ObjectMeta | str], None, None]: + bucket_path = self._bucket_path(bucket_name) + if not bucket_path.exists(): + raise BucketNotFoundError("Bucket does not exist") + bucket_id = bucket_path.name + + target_dir = bucket_path + if prefix: + safe_prefix_path = Path(prefix.rstrip("/")) + if ".." in safe_prefix_path.parts: + return + target_dir = bucket_path / safe_prefix_path + try: + resolved = target_dir.resolve() + bucket_resolved = bucket_path.resolve() + if not str(resolved).startswith(str(bucket_resolved) + os.sep) and resolved != bucket_resolved: + return + except (OSError, ValueError): + return + + if not target_dir.exists() or not target_dir.is_dir(): + return + + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" + meta_cache: Dict[str, str] = {} + if etag_index_path.exists(): + try: + with open(etag_index_path, 'r', encoding='utf-8') as f: + meta_cache = json.load(f) + except (OSError, json.JSONDecodeError): + pass + + try: + with os.scandir(str(target_dir)) as it: + for entry in it: + name = entry.name + if name in self.INTERNAL_FOLDERS: + continue + if entry.is_dir(follow_symlinks=False): + yield ("folder", prefix + name + delimiter) + elif entry.is_file(follow_symlinks=False): + key = prefix + name + try: + st = entry.stat() + etag = meta_cache.get(key) + if etag is None: + safe_key = PurePosixPath(key) + meta = self._read_metadata(bucket_id, Path(safe_key)) + etag = meta.get("__etag__") if meta else None + yield ("object", ObjectMeta( + key=key, + size=st.st_size, + last_modified=datetime.fromtimestamp(st.st_mtime, timezone.utc), + etag=etag, + metadata=None, + )) + except OSError: + pass + except OSError: + return + def _shallow_via_full_scan( self, bucket_name: str, diff --git a/app/ui.py b/app/ui.py index 9fb832b..b1a0947 100644 --- a/app/ui.py +++ b/app/ui.py @@ -618,20 +618,77 @@ def stream_bucket_objects(bucket_name: str): prefix = request.args.get("prefix") or None delimiter = request.args.get("delimiter") or None + storage = _storage() try: - client = get_session_s3_client() - except (PermissionError, RuntimeError) as exc: - return jsonify({"error": str(exc)}), 403 - - versioning_enabled = get_versioning_via_s3(client, bucket_name) + versioning_enabled = storage.is_versioning_enabled(bucket_name) + except StorageError: + versioning_enabled = False url_templates = build_url_templates(bucket_name) display_tz = current_app.config.get("DISPLAY_TIMEZONE", "UTC") + def generate(): + yield json.dumps({ + "type": "meta", + "versioning_enabled": versioning_enabled, + "url_templates": url_templates, + }) + "\n" + yield json.dumps({"type": "count", "total_count": 0}) + "\n" + + running_count = 0 + try: + if delimiter: + for item_type, item in storage.iter_objects_shallow( + bucket_name, prefix=prefix or "", delimiter=delimiter, + ): + if item_type == "folder": + yield json.dumps({"type": "folder", "prefix": item}) + "\n" + else: + last_mod = item.last_modified + yield json.dumps({ + "type": "object", + "key": item.key, + "size": item.size, + "last_modified": last_mod.isoformat(), + "last_modified_display": _format_datetime_display(last_mod, display_tz), + "last_modified_iso": _format_datetime_iso(last_mod, display_tz), + "etag": item.etag or "", + }) + "\n" + running_count += 1 + if running_count % 1000 == 0: + yield json.dumps({"type": "count", "total_count": running_count}) + "\n" + else: + continuation_token = None + while True: + result = storage.list_objects( + bucket_name, + max_keys=1000, + continuation_token=continuation_token, + prefix=prefix, + ) + for obj in result.objects: + last_mod = obj.last_modified + yield json.dumps({ + "type": "object", + "key": obj.key, + "size": obj.size, + "last_modified": last_mod.isoformat(), + "last_modified_display": _format_datetime_display(last_mod, display_tz), + "last_modified_iso": _format_datetime_iso(last_mod, display_tz), + "etag": obj.etag or "", + }) + "\n" + running_count += len(result.objects) + yield json.dumps({"type": "count", "total_count": running_count}) + "\n" + if not result.is_truncated: + break + continuation_token = result.next_continuation_token + except StorageError as exc: + yield json.dumps({"type": "error", "error": str(exc)}) + "\n" + return + yield json.dumps({"type": "count", "total_count": running_count}) + "\n" + yield json.dumps({"type": "done"}) + "\n" + return Response( - stream_objects_ndjson( - client, bucket_name, prefix, url_templates, display_tz, versioning_enabled, - delimiter=delimiter, - ), + generate(), mimetype='application/x-ndjson', headers={ 'Cache-Control': 'no-cache', From c807bb238815de7f3fe4102cd4b4268d3b81f114 Mon Sep 17 00:00:00 2001 From: kqjy Date: Fri, 20 Mar 2026 17:51:00 +0800 Subject: [PATCH 09/19] Update install/uninstall scripts for encrypted IAM config --- scripts/install.sh | 47 +++++++++++++++++++++----------------------- scripts/uninstall.sh | 5 ++++- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/scripts/install.sh b/scripts/install.sh index 8fdad2b..50094d2 100644 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -379,29 +379,25 @@ if [[ "$SKIP_SYSTEMD" != true ]]; then echo " ---------------" if systemctl is-active --quiet myfsio; then echo " [OK] MyFSIO is running" - - IAM_FILE="$DATA_DIR/.myfsio.sys/config/iam.json" - if [[ -f "$IAM_FILE" ]]; then - echo "" - echo " ============================================" - echo " ADMIN CREDENTIALS (save these securely!)" - echo " ============================================" - if command -v jq &>/dev/null; then - ACCESS_KEY=$(jq -r '.users[0].access_key' "$IAM_FILE" 2>/dev/null) - SECRET_KEY=$(jq -r '.users[0].secret_key' "$IAM_FILE" 2>/dev/null) - else - ACCESS_KEY=$(grep -o '"access_key"[[:space:]]*:[[:space:]]*"[^"]*"' "$IAM_FILE" | head -1 | sed 's/.*"\([^"]*\)"$/\1/') - SECRET_KEY=$(grep -o '"secret_key"[[:space:]]*:[[:space:]]*"[^"]*"' "$IAM_FILE" | head -1 | sed 's/.*"\([^"]*\)"$/\1/') - fi - if [[ -n "$ACCESS_KEY" && -n "$SECRET_KEY" ]]; then - echo " Access Key: $ACCESS_KEY" - echo " Secret Key: $SECRET_KEY" - else - echo " [!] Could not parse credentials from $IAM_FILE" - echo " Check the file manually or view service logs." - fi - echo " ============================================" + echo "" + echo " ============================================" + echo " ADMIN CREDENTIALS (save these securely!)" + echo " ============================================" + CRED_OUTPUT=$(journalctl -u myfsio --no-pager -n 50 2>/dev/null | grep -A 5 "FIRST RUN - ADMIN CREDENTIALS") + ACCESS_KEY=$(echo "$CRED_OUTPUT" | grep "Access Key:" | head -1 | sed 's/.*Access Key: //' | awk '{print $1}') + SECRET_KEY=$(echo "$CRED_OUTPUT" | grep "Secret Key:" | head -1 | sed 's/.*Secret Key: //' | awk '{print $1}') + if [[ -n "$ACCESS_KEY" && "$ACCESS_KEY" != *"from"* && -n "$SECRET_KEY" && "$SECRET_KEY" != *"from"* ]]; then + echo " Access Key: $ACCESS_KEY" + echo " Secret Key: $SECRET_KEY" + else + echo " [!] Could not extract credentials from service logs." + echo " Check startup output: journalctl -u myfsio --no-pager | grep -A 5 'ADMIN CREDENTIALS'" + echo " Or reset credentials: $INSTALL_DIR/myfsio reset-cred" fi + echo " ============================================" + echo "" + echo " NOTE: The IAM config file is encrypted at rest." + echo " Credentials are only shown on first run or after reset." else echo " [WARNING] MyFSIO may not have started correctly" echo " Check logs with: journalctl -u myfsio -f" @@ -427,12 +423,13 @@ echo " API: http://$(hostname -I 2>/dev/null | awk '{print $1}' || echo "local echo " UI: http://$(hostname -I 2>/dev/null | awk '{print $1}' || echo "localhost"):$UI_PORT/ui" echo "" echo "Credentials:" -echo " Admin credentials were shown above (if service was started)." -echo " You can also find them in: $DATA_DIR/.myfsio.sys/config/iam.json" +echo " Admin credentials are shown on first service start (see above)." +echo " The IAM config is encrypted at rest and cannot be read directly." +echo " To reset credentials: $INSTALL_DIR/myfsio reset-cred" echo "" echo "Configuration Files:" echo " Environment: $INSTALL_DIR/myfsio.env" -echo " IAM Users: $DATA_DIR/.myfsio.sys/config/iam.json" +echo " IAM Users: $DATA_DIR/.myfsio.sys/config/iam.json (encrypted)" echo " Bucket Policies: $DATA_DIR/.myfsio.sys/config/bucket_policies.json" echo " Secret Key: $DATA_DIR/.myfsio.sys/config/.secret (auto-generated)" echo "" diff --git a/scripts/uninstall.sh b/scripts/uninstall.sh index a920eb2..24fd3e6 100644 --- a/scripts/uninstall.sh +++ b/scripts/uninstall.sh @@ -230,11 +230,14 @@ if [[ "$KEEP_DATA" == true ]]; then echo "" echo "Preserved files include:" echo " - All buckets and objects" - echo " - IAM configuration: $DATA_DIR/.myfsio.sys/config/iam.json" + echo " - IAM configuration: $DATA_DIR/.myfsio.sys/config/iam.json (encrypted at rest)" echo " - Bucket policies: $DATA_DIR/.myfsio.sys/config/bucket_policies.json" echo " - Secret key: $DATA_DIR/.myfsio.sys/config/.secret" echo " - Encryption keys: $DATA_DIR/.myfsio.sys/keys/ (if encryption was enabled)" echo "" + echo "NOTE: The IAM config is encrypted and requires the SECRET_KEY to read." + echo " Keep the .secret file intact for reinstallation." + echo "" echo "To reinstall MyFSIO with existing data:" echo " ./install.sh --data-dir $DATA_DIR" echo "" From afd7173ba08ec052add6d42acdb8f2800720677c Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 21 Mar 2026 14:51:43 +0800 Subject: [PATCH 10/19] Fix buttons all showing Running state when only one action is triggered --- templates/system.html | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/templates/system.html b/templates/system.html index 39158d9..3758517 100644 --- a/templates/system.html +++ b/templates/system.html @@ -348,14 +348,14 @@ (function () { var csrfToken = document.querySelector('meta[name="csrf-token"]')?.getAttribute('content') || ''; - function setLoading(btnId, loading) { + function setLoading(btnId, loading, spinnerOnly) { var btn = document.getElementById(btnId); if (!btn) return; btn.disabled = loading; - if (loading) { + if (loading && !spinnerOnly) { btn.dataset.originalHtml = btn.innerHTML; btn.innerHTML = 'Running...'; - } else if (btn.dataset.originalHtml) { + } else if (!loading && btn.dataset.originalHtml) { btn.innerHTML = btn.dataset.originalHtml; } } @@ -370,8 +370,8 @@ } window.runGC = function (dryRun) { - setLoading('gcRunBtn', true); - setLoading('gcDryRunBtn', true); + setLoading(dryRun ? 'gcDryRunBtn' : 'gcRunBtn', true); + setLoading(dryRun ? 'gcRunBtn' : 'gcDryRunBtn', true, true); fetch('{{ url_for("ui.system_gc_run") }}', { method: 'POST', @@ -432,9 +432,10 @@ }; window.runIntegrity = function (dryRun, autoHeal) { - setLoading('integrityRunBtn', true); - setLoading('integrityHealBtn', true); - setLoading('integrityDryRunBtn', true); + var activeBtn = dryRun ? 'integrityDryRunBtn' : (autoHeal ? 'integrityHealBtn' : 'integrityRunBtn'); + ['integrityRunBtn', 'integrityHealBtn', 'integrityDryRunBtn'].forEach(function (id) { + setLoading(id, true, id !== activeBtn); + }); fetch('{{ url_for("ui.system_integrity_run") }}', { method: 'POST', From a059f0502de0d2f8672cf47a438d4f420d966db3 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 21 Mar 2026 22:57:48 +0800 Subject: [PATCH 11/19] Fix 0-byte uploads caused by Granian default buffer size; Add SERVER_MAX_BUFFER_SIZE config --- app/config.py | 7 +++++++ run.py | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/app/config.py b/app/config.py index b8db352..d196397 100644 --- a/app/config.py +++ b/app/config.py @@ -115,6 +115,7 @@ class AppConfig: server_connection_limit: int server_backlog: int server_channel_timeout: int + server_max_buffer_size: int server_threads_auto: bool server_connection_limit_auto: bool server_backlog_auto: bool @@ -293,6 +294,7 @@ class AppConfig: server_backlog_auto = False server_channel_timeout = int(_get("SERVER_CHANNEL_TIMEOUT", 120)) + server_max_buffer_size = int(_get("SERVER_MAX_BUFFER_SIZE", 1024 * 1024 * 128)) site_sync_enabled = str(_get("SITE_SYNC_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} site_sync_interval_seconds = int(_get("SITE_SYNC_INTERVAL_SECONDS", 60)) site_sync_batch_size = int(_get("SITE_SYNC_BATCH_SIZE", 100)) @@ -394,6 +396,7 @@ class AppConfig: server_connection_limit=server_connection_limit, server_backlog=server_backlog, server_channel_timeout=server_channel_timeout, + server_max_buffer_size=server_max_buffer_size, server_threads_auto=server_threads_auto, server_connection_limit_auto=server_connection_limit_auto, server_backlog_auto=server_backlog_auto, @@ -508,6 +511,8 @@ class AppConfig: issues.append(f"CRITICAL: SERVER_BACKLOG={self.server_backlog} is outside valid range (128-4096). Server cannot start.") if not (10 <= self.server_channel_timeout <= 300): issues.append(f"CRITICAL: SERVER_CHANNEL_TIMEOUT={self.server_channel_timeout} is outside valid range (10-300). Server cannot start.") + if self.server_max_buffer_size < 1024 * 1024: + issues.append(f"WARNING: SERVER_MAX_BUFFER_SIZE={self.server_max_buffer_size} is less than 1MB. Large uploads will fail.") if sys.platform != "win32": try: @@ -553,6 +558,7 @@ class AppConfig: print(f" CONNECTION_LIMIT: {self.server_connection_limit}{_auto(self.server_connection_limit_auto)}") print(f" BACKLOG: {self.server_backlog}{_auto(self.server_backlog_auto)}") print(f" CHANNEL_TIMEOUT: {self.server_channel_timeout}s") + print(f" MAX_BUFFER_SIZE: {self.server_max_buffer_size // (1024 * 1024)}MB") print("=" * 60) issues = self.validate_and_report() @@ -618,6 +624,7 @@ class AppConfig: "SERVER_CONNECTION_LIMIT": self.server_connection_limit, "SERVER_BACKLOG": self.server_backlog, "SERVER_CHANNEL_TIMEOUT": self.server_channel_timeout, + "SERVER_MAX_BUFFER_SIZE": self.server_max_buffer_size, "SITE_SYNC_ENABLED": self.site_sync_enabled, "SITE_SYNC_INTERVAL_SECONDS": self.site_sync_interval_seconds, "SITE_SYNC_BATCH_SIZE": self.site_sync_batch_size, diff --git a/run.py b/run.py index 656cce6..d4ea394 100644 --- a/run.py +++ b/run.py @@ -62,6 +62,11 @@ def _serve_granian(target: str, port: int, config: Optional[AppConfig] = None) - kwargs["backpressure"] = config.server_connection_limit kwargs["http1_settings"] = HTTP1Settings( header_read_timeout=config.server_channel_timeout * 1000, + max_buffer_size=config.server_max_buffer_size, + ) + else: + kwargs["http1_settings"] = HTTP1Settings( + max_buffer_size=1024 * 1024 * 128, ) server = Granian(**kwargs) From e84f1f18519ae4374092599c369b3fc5d2f4cd90 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 21 Mar 2026 23:48:19 +0800 Subject: [PATCH 12/19] Fix SigV4 SignatureDoesNotMatch when Expect header is stripped by WSGI server --- app/s3_api.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/app/s3_api.py b/app/s3_api.py index 6a77017..2c151ee 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -301,7 +301,12 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: if _HAS_RUST: query_params = list(req.args.items(multi=True)) - header_values = [(h, req.headers.get(h) or "") for h in signed_headers_str.split(";")] + header_values = [] + for h in signed_headers_str.split(";"): + val = req.headers.get(h) or "" + if h.lower() == "expect" and val == "": + val = "100-continue" + header_values.append((h, val)) if not _rc.verify_sigv4_signature( req.method, canonical_uri, query_params, signed_headers_str, header_values, payload_hash, amz_date, date_stamp, region, @@ -390,7 +395,12 @@ def _verify_sigv4_query(req: Any) -> Principal | None: if _HAS_RUST: query_params = [(k, v) for k, v in req.args.items(multi=True) if k != "X-Amz-Signature"] - header_values = [(h, req.headers.get(h) or "") for h in signed_headers_str.split(";")] + header_values = [] + for h in signed_headers_str.split(";"): + val = req.headers.get(h) or "" + if h.lower() == "expect" and val == "": + val = "100-continue" + header_values.append((h, val)) if not _rc.verify_sigv4_signature( req.method, canonical_uri, query_params, signed_headers_str, header_values, "UNSIGNED-PAYLOAD", amz_date, date_stamp, region, From 966d524dca3cbc7e16b38a6d74d3d8fc4caba74d Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 22 Mar 2026 00:04:55 +0800 Subject: [PATCH 13/19] Fix 0-byte uploads caused by Granian stripping Expect header and missing CONTENT_LENGTH for chunked transfers --- app/__init__.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/app/__init__.py b/app/__init__.py index e9e7b71..50f85f4 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -18,6 +18,8 @@ from flask_cors import CORS from flask_wtf.csrf import CSRFError from werkzeug.middleware.proxy_fix import ProxyFix +import io + from .access_logging import AccessLoggingService from .operation_metrics import OperationMetricsCollector, classify_endpoint from .compression import GzipMiddleware @@ -44,6 +46,20 @@ from .website_domains import WebsiteDomainStore _request_counter = itertools.count(1) +class _ChunkedTransferMiddleware: + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + transfer_encoding = environ.get("HTTP_TRANSFER_ENCODING", "") + if "chunked" in transfer_encoding.lower() and "CONTENT_LENGTH" not in environ: + raw = environ["wsgi.input"] + body = raw.read() + environ["wsgi.input"] = io.BytesIO(body) + environ["CONTENT_LENGTH"] = str(len(body)) + return self.app(environ, start_response) + + def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path: """Migrate config file from legacy locations to the active path. @@ -110,6 +126,8 @@ def create_app( if app.config.get("ENABLE_GZIP", True): app.wsgi_app = GzipMiddleware(app.wsgi_app, compression_level=6) + app.wsgi_app = _ChunkedTransferMiddleware(app.wsgi_app) + _configure_cors(app) _configure_logging(app) From 7612cb054a66a8b376f483f49b167890d8eba534 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 22 Mar 2026 00:16:30 +0800 Subject: [PATCH 14/19] further fixes --- app/__init__.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 50f85f4..a708553 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -51,12 +51,21 @@ class _ChunkedTransferMiddleware: self.app = app def __call__(self, environ, start_response): - transfer_encoding = environ.get("HTTP_TRANSFER_ENCODING", "") - if "chunked" in transfer_encoding.lower() and "CONTENT_LENGTH" not in environ: - raw = environ["wsgi.input"] - body = raw.read() - environ["wsgi.input"] = io.BytesIO(body) - environ["CONTENT_LENGTH"] = str(len(body)) + has_content_length = environ.get("CONTENT_LENGTH") + if not has_content_length and environ.get("REQUEST_METHOD") in ("PUT", "POST"): + transfer_chunked = "chunked" in environ.get("HTTP_TRANSFER_ENCODING", "").lower() + has_body_hint = bool(environ.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH")) + content_encoding = environ.get("HTTP_CONTENT_ENCODING", "") + has_aws_chunked = "aws-chunked" in content_encoding.lower() + if transfer_chunked or has_body_hint or has_aws_chunked: + try: + raw = environ["wsgi.input"] + body = raw.read() + except Exception: + body = b"" + if body: + environ["wsgi.input"] = io.BytesIO(body) + environ["CONTENT_LENGTH"] = str(len(body)) return self.app(environ, start_response) From 366f8ce60d7a8f996295fa19212a9c6fe33d1297 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 22 Mar 2026 00:24:04 +0800 Subject: [PATCH 15/19] the middleware now also triggers when Content-Length is '0' but X-Amz-Decoded-Content-Length or aws-chunked headers indicate a body should be present --- app/__init__.py | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index a708553..45c41a3 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -51,21 +51,26 @@ class _ChunkedTransferMiddleware: self.app = app def __call__(self, environ, start_response): - has_content_length = environ.get("CONTENT_LENGTH") - if not has_content_length and environ.get("REQUEST_METHOD") in ("PUT", "POST"): - transfer_chunked = "chunked" in environ.get("HTTP_TRANSFER_ENCODING", "").lower() - has_body_hint = bool(environ.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH")) - content_encoding = environ.get("HTTP_CONTENT_ENCODING", "") - has_aws_chunked = "aws-chunked" in content_encoding.lower() - if transfer_chunked or has_body_hint or has_aws_chunked: - try: - raw = environ["wsgi.input"] - body = raw.read() - except Exception: - body = b"" - if body: - environ["wsgi.input"] = io.BytesIO(body) - environ["CONTENT_LENGTH"] = str(len(body)) + if environ.get("REQUEST_METHOD") not in ("PUT", "POST"): + return self.app(environ, start_response) + + content_length = environ.get("CONTENT_LENGTH") + body_expected = ( + environ.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH") + or "chunked" in environ.get("HTTP_TRANSFER_ENCODING", "").lower() + or "aws-chunked" in environ.get("HTTP_CONTENT_ENCODING", "").lower() + ) + + if body_expected and (not content_length or content_length == "0"): + try: + raw = environ["wsgi.input"] + body = raw.read() + except Exception: + body = b"" + if body: + environ["wsgi.input"] = io.BytesIO(body) + environ["CONTENT_LENGTH"] = str(len(body)) + return self.app(environ, start_response) From 532cf95d59be498e4fa8c527d116be753ebdcc28 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 22 Mar 2026 11:14:32 +0800 Subject: [PATCH 16/19] Debug s3 api issues on Granian --- app/__init__.py | 27 ++++++++++++++++++++++----- app/admin_api.py | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 45c41a3..95801e7 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -47,6 +47,8 @@ _request_counter = itertools.count(1) class _ChunkedTransferMiddleware: + _logger = logging.getLogger("chunked_middleware") + def __init__(self, app): self.app = app @@ -54,14 +56,29 @@ class _ChunkedTransferMiddleware: if environ.get("REQUEST_METHOD") not in ("PUT", "POST"): return self.app(environ, start_response) + sha256 = environ.get("HTTP_X_AMZ_CONTENT_SHA256", "") + decoded_len = environ.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH", "") + content_encoding = environ.get("HTTP_CONTENT_ENCODING", "") + transfer_encoding = environ.get("HTTP_TRANSFER_ENCODING", "") content_length = environ.get("CONTENT_LENGTH") - body_expected = ( - environ.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH") - or "chunked" in environ.get("HTTP_TRANSFER_ENCODING", "").lower() - or "aws-chunked" in environ.get("HTTP_CONTENT_ENCODING", "").lower() + + is_streaming = ( + "STREAMING" in sha256.upper() + or decoded_len + or "aws-chunked" in content_encoding.lower() + or "chunked" in transfer_encoding.lower() ) - if body_expected and (not content_length or content_length == "0"): + if not is_streaming: + return self.app(environ, start_response) + + cl_int = 0 + try: + cl_int = int(content_length) if content_length else 0 + except (ValueError, TypeError): + pass + + if cl_int <= 0: try: raw = environ["wsgi.input"] body = raw.read() diff --git a/app/admin_api.py b/app/admin_api.py index 5b3eec3..c6738db 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -982,3 +982,42 @@ def integrity_history(): offset = int(request.args.get("offset", 0)) records = checker.get_history(limit=limit, offset=offset) return jsonify({"executions": records}) + + +@admin_api_bp.route("/debug/upload", methods=["PUT", "POST"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def debug_upload(): + principal, error = _require_admin() + if error: + return error + + env = request.environ + info = { + "CONTENT_LENGTH": env.get("CONTENT_LENGTH"), + "CONTENT_TYPE": env.get("CONTENT_TYPE"), + "HTTP_TRANSFER_ENCODING": env.get("HTTP_TRANSFER_ENCODING"), + "HTTP_CONTENT_ENCODING": env.get("HTTP_CONTENT_ENCODING"), + "HTTP_EXPECT": env.get("HTTP_EXPECT"), + "HTTP_X_AMZ_CONTENT_SHA256": env.get("HTTP_X_AMZ_CONTENT_SHA256"), + "HTTP_X_AMZ_DECODED_CONTENT_LENGTH": env.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH"), + "REQUEST_METHOD": env.get("REQUEST_METHOD"), + "SERVER_PROTOCOL": env.get("SERVER_PROTOCOL"), + "wsgi.input_type": type(env.get("wsgi.input")).__name__, + "request.content_length": request.content_length, + } + + try: + body = request.get_data(cache=False) + info["body_length"] = len(body) + if body: + info["body_preview_hex"] = body[:200].hex() + except Exception as e: + info["body_error"] = str(e) + + all_http = {} + for k, v in env.items(): + if k.startswith("HTTP_") and isinstance(v, str): + all_http[k] = v + info["all_http_headers"] = all_http + + return jsonify(info) From bd20ca86ab0144691c17fe36498a3aeb75e5d705 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 22 Mar 2026 11:22:24 +0800 Subject: [PATCH 17/19] Further debugging on s3 api issues on Granian --- app/__init__.py | 11 ++++++++--- app/admin_api.py | 26 ++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 95801e7..359f459 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -47,7 +47,6 @@ _request_counter = itertools.count(1) class _ChunkedTransferMiddleware: - _logger = logging.getLogger("chunked_middleware") def __init__(self, app): self.app = app @@ -72,6 +71,13 @@ class _ChunkedTransferMiddleware: if not is_streaming: return self.app(environ, start_response) + raw = environ.get("wsgi.input") + if raw and hasattr(raw, "seek"): + try: + raw.seek(0) + except Exception: + pass + cl_int = 0 try: cl_int = int(content_length) if content_length else 0 @@ -80,8 +86,7 @@ class _ChunkedTransferMiddleware: if cl_int <= 0: try: - raw = environ["wsgi.input"] - body = raw.read() + body = raw.read() if raw else b"" except Exception: body = b"" if body: diff --git a/app/admin_api.py b/app/admin_api.py index c6738db..faa0fe0 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -1006,6 +1006,32 @@ def debug_upload(): "request.content_length": request.content_length, } + raw = env.get("wsgi.input") + if raw: + try: + info["wsgi_input_pos"] = raw.tell() if hasattr(raw, "tell") else "N/A" + except Exception: + info["wsgi_input_pos"] = "error" + if hasattr(raw, "getvalue"): + try: + full = raw.getvalue() + info["wsgi_input_getvalue_len"] = len(full) + if full: + info["wsgi_input_getvalue_hex"] = full[:200].hex() + except Exception as e: + info["wsgi_input_getvalue_error"] = str(e) + try: + if hasattr(raw, "seek"): + raw.seek(0) + seek_body = raw.read() + info["wsgi_input_after_seek0_len"] = len(seek_body) + if seek_body: + info["wsgi_input_after_seek0_hex"] = seek_body[:200].hex() + if hasattr(raw, "seek"): + raw.seek(0) + except Exception as e: + info["wsgi_input_seek_read_error"] = str(e) + try: body = request.get_data(cache=False) info["body_length"] = len(body) From 7a3202c99619dc3686407e14d776bdbf16ebb10b Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 22 Mar 2026 11:27:52 +0800 Subject: [PATCH 18/19] Possible fix for the issue --- app/__init__.py | 60 ++++++++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 359f459..015e16d 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -55,21 +55,44 @@ class _ChunkedTransferMiddleware: if environ.get("REQUEST_METHOD") not in ("PUT", "POST"): return self.app(environ, start_response) - sha256 = environ.get("HTTP_X_AMZ_CONTENT_SHA256", "") - decoded_len = environ.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH", "") - content_encoding = environ.get("HTTP_CONTENT_ENCODING", "") transfer_encoding = environ.get("HTTP_TRANSFER_ENCODING", "") content_length = environ.get("CONTENT_LENGTH") - is_streaming = ( - "STREAMING" in sha256.upper() - or decoded_len - or "aws-chunked" in content_encoding.lower() - or "chunked" in transfer_encoding.lower() - ) + if "chunked" in transfer_encoding.lower(): + if content_length: + del environ["HTTP_TRANSFER_ENCODING"] + else: + raw = environ.get("wsgi.input") + if raw: + try: + if hasattr(raw, "seek"): + raw.seek(0) + body = raw.read() + except Exception: + body = b"" + if body: + environ["wsgi.input"] = io.BytesIO(body) + environ["CONTENT_LENGTH"] = str(len(body)) + del environ["HTTP_TRANSFER_ENCODING"] - if not is_streaming: - return self.app(environ, start_response) + content_length = environ.get("CONTENT_LENGTH") + if not content_length or content_length == "0": + sha256 = environ.get("HTTP_X_AMZ_CONTENT_SHA256", "") + decoded_len = environ.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH", "") + content_encoding = environ.get("HTTP_CONTENT_ENCODING", "") + if ("STREAMING" in sha256.upper() or decoded_len + or "aws-chunked" in content_encoding.lower()): + raw = environ.get("wsgi.input") + if raw: + try: + if hasattr(raw, "seek"): + raw.seek(0) + body = raw.read() + except Exception: + body = b"" + if body: + environ["wsgi.input"] = io.BytesIO(body) + environ["CONTENT_LENGTH"] = str(len(body)) raw = environ.get("wsgi.input") if raw and hasattr(raw, "seek"): @@ -78,21 +101,6 @@ class _ChunkedTransferMiddleware: except Exception: pass - cl_int = 0 - try: - cl_int = int(content_length) if content_length else 0 - except (ValueError, TypeError): - pass - - if cl_int <= 0: - try: - body = raw.read() if raw else b"" - except Exception: - body = b"" - if body: - environ["wsgi.input"] = io.BytesIO(body) - environ["CONTENT_LENGTH"] = str(len(body)) - return self.app(environ, start_response) From 4a553555d371f4174d43524f9cd387f875f82e93 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 22 Mar 2026 11:38:29 +0800 Subject: [PATCH 19/19] Clean up debug code --- app/admin_api.py | 63 ------------------------------------------------ 1 file changed, 63 deletions(-) diff --git a/app/admin_api.py b/app/admin_api.py index faa0fe0..ccc0408 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -984,66 +984,3 @@ def integrity_history(): return jsonify({"executions": records}) -@admin_api_bp.route("/debug/upload", methods=["PUT", "POST"]) -@limiter.limit(lambda: _get_admin_rate_limit()) -def debug_upload(): - principal, error = _require_admin() - if error: - return error - - env = request.environ - info = { - "CONTENT_LENGTH": env.get("CONTENT_LENGTH"), - "CONTENT_TYPE": env.get("CONTENT_TYPE"), - "HTTP_TRANSFER_ENCODING": env.get("HTTP_TRANSFER_ENCODING"), - "HTTP_CONTENT_ENCODING": env.get("HTTP_CONTENT_ENCODING"), - "HTTP_EXPECT": env.get("HTTP_EXPECT"), - "HTTP_X_AMZ_CONTENT_SHA256": env.get("HTTP_X_AMZ_CONTENT_SHA256"), - "HTTP_X_AMZ_DECODED_CONTENT_LENGTH": env.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH"), - "REQUEST_METHOD": env.get("REQUEST_METHOD"), - "SERVER_PROTOCOL": env.get("SERVER_PROTOCOL"), - "wsgi.input_type": type(env.get("wsgi.input")).__name__, - "request.content_length": request.content_length, - } - - raw = env.get("wsgi.input") - if raw: - try: - info["wsgi_input_pos"] = raw.tell() if hasattr(raw, "tell") else "N/A" - except Exception: - info["wsgi_input_pos"] = "error" - if hasattr(raw, "getvalue"): - try: - full = raw.getvalue() - info["wsgi_input_getvalue_len"] = len(full) - if full: - info["wsgi_input_getvalue_hex"] = full[:200].hex() - except Exception as e: - info["wsgi_input_getvalue_error"] = str(e) - try: - if hasattr(raw, "seek"): - raw.seek(0) - seek_body = raw.read() - info["wsgi_input_after_seek0_len"] = len(seek_body) - if seek_body: - info["wsgi_input_after_seek0_hex"] = seek_body[:200].hex() - if hasattr(raw, "seek"): - raw.seek(0) - except Exception as e: - info["wsgi_input_seek_read_error"] = str(e) - - try: - body = request.get_data(cache=False) - info["body_length"] = len(body) - if body: - info["body_preview_hex"] = body[:200].hex() - except Exception as e: - info["body_error"] = str(e) - - all_http = {} - for k, v in env.items(): - if k.startswith("HTTP_") and isinstance(v, str): - all_http[k] = v - info["all_http_headers"] = all_http - - return jsonify(info)