From 37d372c617e7a7bcf40bf1e548ccc9ac8458a105 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 22 Nov 2025 22:00:24 +0800 Subject: [PATCH 01/10] Add missing CreateMultipartUpload in API --- app/s3_api.py | 152 +++++++++++++++++++++++++++++++++++- tests/test_api_multipart.py | 93 ++++++++++++++++++++++ 2 files changed, 244 insertions(+), 1 deletion(-) create mode 100644 tests/test_api_multipart.py diff --git a/app/s3_api.py b/app/s3_api.py index b080346..31c9142 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -1078,12 +1078,23 @@ def bucket_handler(bucket_name: str) -> Response: return _xml_response(root) -@s3_api_bp.route("//", methods=["PUT", "GET", "DELETE", "HEAD"], strict_slashes=False) +@s3_api_bp.route("//", methods=["PUT", "GET", "DELETE", "HEAD", "POST"], strict_slashes=False) @limiter.limit("240 per minute") def object_handler(bucket_name: str, object_key: str): storage = _storage() + # Multipart Uploads + if request.method == "POST": + if "uploads" in request.args: + return _initiate_multipart_upload(bucket_name, object_key) + if "uploadId" in request.args: + return _complete_multipart_upload(bucket_name, object_key) + return _method_not_allowed(["GET", "PUT", "DELETE", "HEAD", "POST"]) + if request.method == "PUT": + if "partNumber" in request.args and "uploadId" in request.args: + return _upload_part(bucket_name, object_key) + _, error = _object_principal("write", bucket_name, object_key) if error: return error @@ -1147,6 +1158,9 @@ def object_handler(bucket_name: str, object_key: str): return response # DELETE + if "uploadId" in request.args: + return _abort_multipart_upload(bucket_name, object_key) + _, error = _object_principal("delete", bucket_name, object_key) if error: return error @@ -1354,3 +1368,139 @@ class AwsChunkedDecoder: self.chunk_remaining = chunk_size return result + + +def _initiate_multipart_upload(bucket_name: str, object_key: str) -> Response: + principal, error = _object_principal("write", bucket_name, object_key) + if error: + return error + + metadata = _extract_request_metadata() + try: + upload_id = _storage().initiate_multipart_upload( + bucket_name, + object_key, + metadata=metadata or None + ) + except StorageError as exc: + return _error_response("NoSuchBucket", str(exc), 404) + + root = Element("InitiateMultipartUploadResult") + SubElement(root, "Bucket").text = bucket_name + SubElement(root, "Key").text = object_key + SubElement(root, "UploadId").text = upload_id + return _xml_response(root) + + +def _upload_part(bucket_name: str, object_key: str) -> Response: + principal, error = _object_principal("write", bucket_name, object_key) + if error: + return error + + upload_id = request.args.get("uploadId") + part_number_str = request.args.get("partNumber") + if not upload_id or not part_number_str: + return _error_response("InvalidArgument", "uploadId and partNumber are required", 400) + + try: + part_number = int(part_number_str) + except ValueError: + return _error_response("InvalidArgument", "partNumber must be an integer", 400) + + stream = request.stream + content_encoding = request.headers.get("Content-Encoding", "").lower() + if "aws-chunked" in content_encoding: + stream = AwsChunkedDecoder(stream) + + try: + etag = _storage().upload_multipart_part(bucket_name, upload_id, part_number, stream) + except StorageError as exc: + if "NoSuchBucket" in str(exc): + return _error_response("NoSuchBucket", str(exc), 404) + if "Multipart upload not found" in str(exc): + return _error_response("NoSuchUpload", str(exc), 404) + return 
_error_response("InvalidArgument", str(exc), 400) + + response = Response(status=200) + response.headers["ETag"] = f'"{etag}"' + return response + + +def _complete_multipart_upload(bucket_name: str, object_key: str) -> Response: + principal, error = _object_principal("write", bucket_name, object_key) + if error: + return error + + upload_id = request.args.get("uploadId") + if not upload_id: + return _error_response("InvalidArgument", "uploadId is required", 400) + + payload = request.get_data(cache=False) or b"" + try: + root = fromstring(payload) + except ParseError: + return _error_response("MalformedXML", "Unable to parse XML document", 400) + + if _strip_ns(root.tag) != "CompleteMultipartUpload": + return _error_response("MalformedXML", "Root element must be CompleteMultipartUpload", 400) + + parts = [] + for part_el in list(root): + if _strip_ns(part_el.tag) != "Part": + continue + part_number_el = part_el.find("{*}PartNumber") + if part_number_el is None: + part_number_el = part_el.find("PartNumber") + + etag_el = part_el.find("{*}ETag") + if etag_el is None: + etag_el = part_el.find("ETag") + + if part_number_el is not None and etag_el is not None: + parts.append({ + "PartNumber": int(part_number_el.text or 0), + "ETag": (etag_el.text or "").strip('"') + }) + + try: + meta = _storage().complete_multipart_upload(bucket_name, upload_id, parts) + except StorageError as exc: + if "NoSuchBucket" in str(exc): + return _error_response("NoSuchBucket", str(exc), 404) + if "Multipart upload not found" in str(exc): + return _error_response("NoSuchUpload", str(exc), 404) + return _error_response("InvalidPart", str(exc), 400) + + # Trigger replication + user_agent = request.headers.get("User-Agent", "") + if "S3ReplicationAgent" not in user_agent: + _replication_manager().trigger_replication(bucket_name, object_key, action="write") + + root = Element("CompleteMultipartUploadResult") + # Use request.host_url to construct full location + location = f"{request.host_url}{bucket_name}/{object_key}" + SubElement(root, "Location").text = location + SubElement(root, "Bucket").text = bucket_name + SubElement(root, "Key").text = object_key + SubElement(root, "ETag").text = f'"{meta.etag}"' + + return _xml_response(root) + + +def _abort_multipart_upload(bucket_name: str, object_key: str) -> Response: + principal, error = _object_principal("delete", bucket_name, object_key) + if error: + return error + + upload_id = request.args.get("uploadId") + if not upload_id: + return _error_response("InvalidArgument", "uploadId is required", 400) + + try: + _storage().abort_multipart_upload(bucket_name, upload_id) + except StorageError as exc: + # Abort is idempotent, but if bucket missing... + if "Bucket does not exist" in str(exc): + return _error_response("NoSuchBucket", str(exc), 404) + + return Response(status=204) diff --git a/tests/test_api_multipart.py b/tests/test_api_multipart.py new file mode 100644 index 0000000..47d6eb3 --- /dev/null +++ b/tests/test_api_multipart.py @@ -0,0 +1,93 @@ +import io +import pytest +from xml.etree.ElementTree import fromstring + +@pytest.fixture +def client(app): + return app.test_client() + +@pytest.fixture +def auth_headers(app): + # Create a test user and return headers + # Using the user defined in conftest.py + return { + "X-Access-Key": "test", + "X-Secret-Key": "secret" + } + +def test_multipart_upload_flow(client, auth_headers): + # 1. Create bucket + client.put("/test-bucket", headers=auth_headers) + + # 2. 
Initiate Multipart Upload + resp = client.post("/test-bucket/large-file.txt?uploads", headers=auth_headers) + assert resp.status_code == 200 + root = fromstring(resp.data) + upload_id = root.find("UploadId").text + assert upload_id + + # 3. Upload Part 1 + resp = client.put( + f"/test-bucket/large-file.txt?partNumber=1&uploadId={upload_id}", + headers=auth_headers, + data=b"part1" + ) + assert resp.status_code == 200 + etag1 = resp.headers["ETag"] + assert etag1 + + # 4. Upload Part 2 + resp = client.put( + f"/test-bucket/large-file.txt?partNumber=2&uploadId={upload_id}", + headers=auth_headers, + data=b"part2" + ) + assert resp.status_code == 200 + etag2 = resp.headers["ETag"] + assert etag2 + + # 5. Complete Multipart Upload + xml_body = f""" + + + 1 + {etag1} + + + 2 + {etag2} + + + """ + resp = client.post( + f"/test-bucket/large-file.txt?uploadId={upload_id}", + headers=auth_headers, + data=xml_body + ) + assert resp.status_code == 200 + root = fromstring(resp.data) + assert root.find("Key").text == "large-file.txt" + + # 6. Verify object content + resp = client.get("/test-bucket/large-file.txt", headers=auth_headers) + assert resp.status_code == 200 + assert resp.data == b"part1part2" + +def test_abort_multipart_upload(client, auth_headers): + client.put("/abort-bucket", headers=auth_headers) + + # Initiate + resp = client.post("/abort-bucket/file.txt?uploads", headers=auth_headers) + upload_id = fromstring(resp.data).find("UploadId").text + + # Abort + resp = client.delete(f"/abort-bucket/file.txt?uploadId={upload_id}", headers=auth_headers) + assert resp.status_code == 204 + + # Try to upload part (should fail) + resp = client.put( + f"/abort-bucket/file.txt?partNumber=1&uploadId={upload_id}", + headers=auth_headers, + data=b"data" + ) + assert resp.status_code == 404 # NoSuchUpload From ef781ae0b1d761587b78f6db9abbca93024161e6 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 22 Nov 2025 23:03:50 +0800 Subject: [PATCH 02/10] Fix hardcoded localhost fallback --- app/ui.py | 3 ++- app/version.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/app/ui.py b/app/ui.py index adf1f6f..120c52f 100644 --- a/app/ui.py +++ b/app/ui.py @@ -224,7 +224,8 @@ def logout(): @ui_bp.get("/docs") def docs_page(): principal = _current_principal() - api_base = current_app.config.get("API_BASE_URL") or "http://127.0.0.1:5000" + # Use the current request's host as the default API base if not configured + api_base = current_app.config.get("API_BASE_URL") or request.host_url api_base = api_base.rstrip("/") parsed = urlparse(api_base) api_host = parsed.netloc or parsed.path or api_base diff --git a/app/version.py b/app/version.py index bda6952..950456f 100644 --- a/app/version.py +++ b/app/version.py @@ -1,7 +1,7 @@ """Central location for the application version string.""" from __future__ import annotations -APP_VERSION = "0.1.1" +APP_VERSION = "0.1.2" def get_version() -> str: From 92cf8825cfe286101a5a594818c8698a204dea30 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 22 Nov 2025 23:18:16 +0800 Subject: [PATCH 03/10] Update docs --- app/ui.py | 13 ++++++------ templates/docs.html | 51 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/app/ui.py b/app/ui.py index 120c52f..9993ddb 100644 --- a/app/ui.py +++ b/app/ui.py @@ -712,17 +712,18 @@ def object_presign(bucket_name: str, object_key: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - api_base = current_app.config.get("API_BASE_URL") - if not api_base: - 
api_base = "http://127.0.0.1:5000" - api_base = api_base.rstrip("/") - - url = f"{api_base}/presign/{bucket_name}/{object_key}" + # Use internal URL for the connection to ensure reliability + # We ignore API_BASE_URL here because that might be set to a public domain + # which is not reachable from within the container (NAT/DNS issues). + connection_url = "http://127.0.0.1:5000" + url = f"{connection_url}/presign/{bucket_name}/{object_key}" headers = _api_headers() # Forward the host so the API knows the public URL + # We also add X-Forwarded-For to ensure ProxyFix middleware processes the headers headers["X-Forwarded-Host"] = request.host headers["X-Forwarded-Proto"] = request.scheme + headers["X-Forwarded-For"] = request.remote_addr or "127.0.0.1" try: response = requests.post(url, headers=headers, json=payload, timeout=5) diff --git a/templates/docs.html b/templates/docs.html index 6f0dd40..ea559c2 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -31,14 +31,61 @@ . .venv/Scripts/activate # PowerShell: .\\.venv\\Scripts\\Activate.ps1 pip install -r requirements.txt -# Run both API and UI +# Run both API and UI (Development) python run.py +# Run in Production (Waitress server) +python run.py --prod + # Or run individually python run.py --mode api python run.py --mode ui -

-Configuration lives in app/config.py; override variables via the shell (e.g., STORAGE_ROOT, API_BASE_URL, SECRET_KEY, MAX_UPLOAD_SIZE).
+<h2>Configuration</h2>
+<p>Configuration defaults live in app/config.py. You can override them using environment variables. This is critical for production deployments behind proxies.</p>
+<table>
+  <thead>
+    <tr><th>Variable</th><th>Default</th><th>Description</th></tr>
+  </thead>
+  <tbody>
+    <tr><td>API_BASE_URL</td><td>http://127.0.0.1:5000</td><td>The public URL of the API. Required if running behind a proxy or if the UI and API are on different domains. Ensures presigned URLs are generated correctly.</td></tr>
+    <tr><td>STORAGE_ROOT</td><td>./data</td><td>Directory for buckets and objects.</td></tr>
+    <tr><td>MAX_UPLOAD_SIZE</td><td>5 GB</td><td>Max request body size.</td></tr>
+    <tr><td>SECRET_KEY</td><td>(Random)</td><td>Flask session key. Set this in production.</td></tr>
+    <tr><td>APP_HOST</td><td>0.0.0.0</td><td>Bind interface.</td></tr>
+    <tr><td>APP_PORT</td><td>5000</td><td>Listen port.</td></tr>
+  </tbody>
+</table>
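The override mechanism the table above describes is plain environment-variable lookup with fallbacks. As a rough sketch (illustrative names and parsing only, not the project's actual app/config.py), the settings could be wired up like this:

```python
import os

def _int_env(name: str, default: int) -> int:
    """Read an integer setting from the environment, falling back to the default."""
    try:
        return int(os.environ.get(name, default))
    except ValueError:
        return default

# Illustrative defaults mirroring the documentation table above.
API_BASE_URL = os.environ.get("API_BASE_URL", "http://127.0.0.1:5000")
STORAGE_ROOT = os.environ.get("STORAGE_ROOT", "./data")
MAX_UPLOAD_SIZE = _int_env("MAX_UPLOAD_SIZE", 5 * 1024 ** 3)  # 5 GB expressed in bytes
SECRET_KEY = os.environ.get("SECRET_KEY") or os.urandom(32).hex()  # random if unset
APP_HOST = os.environ.get("APP_HOST", "0.0.0.0")
APP_PORT = _int_env("APP_PORT", 5000)
```

A deployment behind a reverse proxy would then export API_BASE_URL and SECRET_KEY in the service environment before starting the server.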
From e6ee341b930566a8f48e9379fbfe82284215fb79 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 22 Nov 2025 23:28:00 +0800 Subject: [PATCH 04/10] Undo: Fix hardcoded localhost fallback --- app/ui.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/app/ui.py b/app/ui.py index 9993ddb..9693240 100644 --- a/app/ui.py +++ b/app/ui.py @@ -224,8 +224,7 @@ def logout(): @ui_bp.get("/docs") def docs_page(): principal = _current_principal() - # Use the current request's host as the default API base if not configured - api_base = current_app.config.get("API_BASE_URL") or request.host_url + api_base = current_app.config.get("API_BASE_URL") or "http://127.0.0.1:5000" api_base = api_base.rstrip("/") parsed = urlparse(api_base) api_host = parsed.netloc or parsed.path or api_base @@ -712,15 +711,10 @@ def object_presign(bucket_name: str, object_key: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - # Use internal URL for the connection to ensure reliability - # We ignore API_BASE_URL here because that might be set to a public domain - # which is not reachable from within the container (NAT/DNS issues). connection_url = "http://127.0.0.1:5000" url = f"{connection_url}/presign/{bucket_name}/{object_key}" headers = _api_headers() - # Forward the host so the API knows the public URL - # We also add X-Forwarded-For to ensure ProxyFix middleware processes the headers headers["X-Forwarded-Host"] = request.host headers["X-Forwarded-Proto"] = request.scheme headers["X-Forwarded-For"] = request.remote_addr or "127.0.0.1" @@ -732,13 +726,11 @@ def object_presign(bucket_name: str, object_key: str): try: body = response.json() except ValueError: - # Handle XML error responses from S3 backend text = response.text or "" if text.strip().startswith("<"): import xml.etree.ElementTree as ET try: root = ET.fromstring(text) - # Try to find Message or Code message = root.findtext(".//Message") or root.findtext(".//Code") or "Unknown S3 error" body = {"error": message} except ET.ParseError: @@ -948,7 +940,6 @@ def rotate_iam_secret(access_key: str): return redirect(url_for("ui.iam_dashboard")) try: new_secret = _iam().rotate_secret(access_key) - # If rotating own key, update session immediately so subsequent API calls (like presign) work if principal and principal.access_key == access_key: creds = session.get("credentials", {}) creds["secret_key"] = new_secret @@ -1040,7 +1031,6 @@ def update_iam_policies(access_key: str): policies_raw = request.form.get("policies", "").strip() if not policies_raw: - # Empty policies list is valid (clears permissions) policies = [] else: try: From 85ee5b93882004a83490d5959740fd834da6dd90 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 23 Nov 2025 23:33:53 +0800 Subject: [PATCH 05/10] Add new metrics function --- app/extensions.py | 9 +- app/s3_api.py | 101 ++++++++++++++++++++-- app/storage.py | 58 ++++++++++++- app/ui.py | 52 +++++++++++- requirements.txt | 1 + templates/base.html | 3 + templates/metrics.html | 173 ++++++++++++++++++++++++++++++++++++++ tests/test_security.py | 186 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 569 insertions(+), 14 deletions(-) create mode 100644 templates/metrics.html create mode 100644 tests/test_security.py diff --git a/app/extensions.py b/app/extensions.py index 1f8b71a..0fc97a6 100644 --- a/app/extensions.py +++ b/app/extensions.py @@ -1,10 +1,17 @@ """Application-wide extension instances.""" +from flask import g from flask_limiter import Limiter from flask_limiter.util import 
get_remote_address from flask_wtf import CSRFProtect +def get_rate_limit_key(): + """Generate rate limit key based on authenticated user.""" + if hasattr(g, 'principal') and g.principal: + return g.principal.access_key + return get_remote_address() + # Shared rate limiter instance; configured in app factory. -limiter = Limiter(key_func=get_remote_address) +limiter = Limiter(key_func=get_rate_limit_key) # Global CSRF protection for UI routes. csrf = CSRFProtect() diff --git a/app/s3_api.py b/app/s3_api.py index 31c9142..7d0c64f 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -11,7 +11,7 @@ from typing import Any, Dict from urllib.parse import quote, urlencode, urlparse from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError -from flask import Blueprint, Response, current_app, jsonify, request +from flask import Blueprint, Response, current_app, jsonify, request, g from werkzeug.http import http_date from .bucket_policies import BucketPolicyStore @@ -127,14 +127,33 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: if not amz_date: raise IamError("Missing Date header") + try: + request_time = datetime.strptime(amz_date, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc) + except ValueError: + raise IamError("Invalid X-Amz-Date format") + + now = datetime.now(timezone.utc) + time_diff = abs((now - request_time).total_seconds()) + if time_diff > 900: # 15 minutes + raise IamError("Request timestamp too old or too far in the future") + + required_headers = {'host', 'x-amz-date'} + signed_headers_set = set(signed_headers_str.split(';')) + if not required_headers.issubset(signed_headers_set): + # Some clients might sign 'date' instead of 'x-amz-date' + if 'date' in signed_headers_set: + required_headers.remove('x-amz-date') + required_headers.add('date') + + if not required_headers.issubset(signed_headers_set): + raise IamError("Required headers not signed") + credential_scope = f"{date_stamp}/{region}/{service}/aws4_request" string_to_sign = f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}" - - # Calculate Signature signing_key = _get_signature_key(secret_key, date_stamp, region, service) calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() - if calculated_signature != signature: + if not hmac.compare_digest(calculated_signature, signature): raise IamError("SignatureDoesNotMatch") return _iam().get_principal(access_key) @@ -155,7 +174,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None: except ValueError: raise IamError("Invalid Credential format") - # Check expiration try: req_time = datetime.strptime(amz_date, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc) except ValueError: @@ -190,7 +208,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None: canonical_headers_parts = [] for header in signed_headers_list: val = req.headers.get(header, "").strip() - # Collapse multiple spaces val = " ".join(val.split()) canonical_headers_parts.append(f"{header}:{val}\n") canonical_headers = "".join(canonical_headers_parts) @@ -240,7 +257,6 @@ def _verify_sigv4(req: Any) -> Principal | None: def _require_principal(): - # Try SigV4 first if ("Authorization" in request.headers and request.headers["Authorization"].startswith("AWS4-HMAC-SHA256")) or \ (request.args.get("X-Amz-Algorithm") == "AWS4-HMAC-SHA256"): try: @@ -1132,6 +1148,9 @@ def object_handler(bucket_name: str, object_key: str): return response if request.method in 
{"GET", "HEAD"}: + if request.method == "GET" and "uploadId" in request.args: + return _list_parts(bucket_name, object_key) + _, error = _object_principal("read", bucket_name, object_key) if error: return error @@ -1157,7 +1176,6 @@ def object_handler(bucket_name: str, object_key: str): current_app.logger.info(action, extra={"bucket": bucket_name, "key": object_key, "bytes": logged_bytes}) return response - # DELETE if "uploadId" in request.args: return _abort_multipart_upload(bucket_name, object_key) @@ -1175,6 +1193,51 @@ def object_handler(bucket_name: str, object_key: str): return Response(status=204) +def _list_parts(bucket_name: str, object_key: str) -> Response: + principal, error = _require_principal() + if error: + return error + try: + _authorize_action(principal, bucket_name, "read", object_key=object_key) + except IamError as exc: + return _error_response("AccessDenied", str(exc), 403) + + upload_id = request.args.get("uploadId") + if not upload_id: + return _error_response("InvalidArgument", "uploadId is required", 400) + + try: + parts = _storage().list_multipart_parts(bucket_name, upload_id) + except StorageError as exc: + return _error_response("NoSuchUpload", str(exc), 404) + + root = Element("ListPartsResult") + SubElement(root, "Bucket").text = bucket_name + SubElement(root, "Key").text = object_key + SubElement(root, "UploadId").text = upload_id + + initiator = SubElement(root, "Initiator") + SubElement(initiator, "ID").text = principal.access_key + SubElement(initiator, "DisplayName").text = principal.display_name + + owner = SubElement(root, "Owner") + SubElement(owner, "ID").text = principal.access_key + SubElement(owner, "DisplayName").text = principal.display_name + + SubElement(root, "StorageClass").text = "STANDARD" + SubElement(root, "PartNumberMarker").text = "0" + SubElement(root, "NextPartNumberMarker").text = str(parts[-1]["PartNumber"]) if parts else "0" + SubElement(root, "MaxParts").text = "1000" + SubElement(root, "IsTruncated").text = "false" + + for part in parts: + p = SubElement(root, "Part") + SubElement(p, "PartNumber").text = str(part["PartNumber"]) + SubElement(p, "LastModified").text = part["LastModified"].isoformat() + SubElement(p, "ETag").text = f'"{part["ETag"]}"' + SubElement(p, "Size").text = str(part["Size"]) + + return _xml_response(root) @s3_api_bp.route("/bucket-policy/", methods=["GET", "PUT", "DELETE"]) @@ -1504,3 +1567,25 @@ def _abort_multipart_upload(bucket_name: str, object_key: str) -> Response: return _error_response("NoSuchBucket", str(exc), 404) return Response(status=204) + + +@s3_api_bp.before_request +def resolve_principal(): + g.principal = None + # Try SigV4 + try: + if ("Authorization" in request.headers and request.headers["Authorization"].startswith("AWS4-HMAC-SHA256")) or \ + (request.args.get("X-Amz-Algorithm") == "AWS4-HMAC-SHA256"): + g.principal = _verify_sigv4(request) + return + except Exception: + pass + + # Try simple auth headers (internal/testing) + access_key = request.headers.get("X-Access-Key") + secret_key = request.headers.get("X-Secret-Key") + if access_key and secret_key: + try: + g.principal = _iam().authenticate(access_key, secret_key) + except Exception: + pass diff --git a/app/storage.py b/app/storage.py index 37b31db..6c617e3 100644 --- a/app/storage.py +++ b/app/storage.py @@ -120,10 +120,22 @@ class ObjectStorage: self._system_bucket_root(bucket_path.name).mkdir(parents=True, exist_ok=True) def bucket_stats(self, bucket_name: str) -> dict[str, int]: - """Return object count and total size for 
the bucket without hashing files.""" + """Return object count and total size for the bucket (cached).""" bucket_path = self._bucket_path(bucket_name) if not bucket_path.exists(): raise StorageError("Bucket does not exist") + + # Try to read from cache + cache_path = self._system_bucket_root(bucket_name) / "stats.json" + if cache_path.exists(): + try: + # Check if cache is fresh (e.g., < 60 seconds old) + if time.time() - cache_path.stat().st_mtime < 60: + return json.loads(cache_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + pass + + # Calculate fresh stats object_count = 0 total_bytes = 0 for path in bucket_path.rglob("*"): @@ -134,7 +146,17 @@ class ObjectStorage: stat = path.stat() object_count += 1 total_bytes += stat.st_size - return {"objects": object_count, "bytes": total_bytes} + + stats = {"objects": object_count, "bytes": total_bytes} + + # Write to cache + try: + cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_text(json.dumps(stats), encoding="utf-8") + except OSError: + pass + + return stats def delete_bucket(self, bucket_name: str) -> None: bucket_path = self._bucket_path(bucket_name) @@ -239,7 +261,6 @@ class ObjectStorage: rel = path.relative_to(bucket_path) self._safe_unlink(path) self._delete_metadata(bucket_id, rel) - # Clean up now empty parents inside the bucket. for parent in path.parents: if parent == bucket_path: break @@ -592,6 +613,33 @@ class ObjectStorage: if legacy_root.exists(): shutil.rmtree(legacy_root, ignore_errors=True) + def list_multipart_parts(self, bucket_name: str, upload_id: str) -> List[Dict[str, Any]]: + """List uploaded parts for a multipart upload.""" + bucket_path = self._bucket_path(bucket_name) + manifest, upload_root = self._load_multipart_manifest(bucket_path.name, upload_id) + + parts = [] + parts_map = manifest.get("parts", {}) + for part_num_str, record in parts_map.items(): + part_num = int(part_num_str) + part_filename = record.get("filename") + if not part_filename: + continue + part_path = upload_root / part_filename + if not part_path.exists(): + continue + + stat = part_path.stat() + parts.append({ + "PartNumber": part_num, + "Size": stat.st_size, + "ETag": record.get("etag"), + "LastModified": datetime.fromtimestamp(stat.st_mtime, timezone.utc) + }) + + parts.sort(key=lambda x: x["PartNumber"]) + return parts + # ---------------------- internal helpers ---------------------- def _bucket_path(self, bucket_name: str) -> Path: safe_name = self._sanitize_bucket_name(bucket_name) @@ -886,7 +934,11 @@ class ObjectStorage: normalized = unicodedata.normalize("NFC", object_key) if normalized != object_key: raise StorageError("Object key must use normalized Unicode") + candidate = Path(normalized) + if ".." 
in candidate.parts: + raise StorageError("Object key contains parent directory references") + if candidate.is_absolute(): raise StorageError("Absolute object keys are not allowed") if getattr(candidate, "drive", ""): diff --git a/app/ui.py b/app/ui.py index 9693240..f5a9cfd 100644 --- a/app/ui.py +++ b/app/ui.py @@ -3,6 +3,8 @@ from __future__ import annotations import json import uuid +import psutil +import shutil from typing import Any from urllib.parse import urlparse @@ -469,8 +471,6 @@ def complete_multipart_upload(bucket_name: str, upload_id: str): normalized.append({"part_number": number, "etag": etag}) try: result = _storage().complete_multipart_upload(bucket_name, upload_id, normalized) - - # Trigger replication _replication().trigger_replication(bucket_name, result["key"]) return jsonify(result) @@ -1209,6 +1209,54 @@ def connections_dashboard(): return render_template("connections.html", connections=connections, principal=principal) +@ui_bp.get("/metrics") +def metrics_dashboard(): + principal = _current_principal() + + cpu_percent = psutil.cpu_percent(interval=None) + memory = psutil.virtual_memory() + + storage_root = current_app.config["STORAGE_ROOT"] + disk = psutil.disk_usage(storage_root) + + storage = _storage() + buckets = storage.list_buckets() + total_buckets = len(buckets) + + total_objects = 0 + total_bytes_used = 0 + + # Note: Uses cached stats from storage layer to improve performance + for bucket in buckets: + stats = storage.bucket_stats(bucket.name) + total_objects += stats["objects"] + total_bytes_used += stats["bytes"] + + return render_template( + "metrics.html", + principal=principal, + cpu_percent=cpu_percent, + memory={ + "total": _format_bytes(memory.total), + "available": _format_bytes(memory.available), + "used": _format_bytes(memory.used), + "percent": memory.percent, + }, + disk={ + "total": _format_bytes(disk.total), + "free": _format_bytes(disk.free), + "used": _format_bytes(disk.used), + "percent": disk.percent, + }, + app={ + "buckets": total_buckets, + "objects": total_objects, + "storage_used": _format_bytes(total_bytes_used), + "storage_raw": total_bytes_used, + } + ) + + @ui_bp.app_errorhandler(404) def ui_not_found(error): # type: ignore[override] prefix = ui_bp.url_prefix or "" diff --git a/requirements.txt b/requirements.txt index 43f1ae7..7c2c75d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ pytest>=7.4 requests>=2.31 boto3>=1.34 waitress>=2.1.2 +psutil>=5.9.0 diff --git a/templates/base.html b/templates/base.html index 1bf96ce..d53dbeb 100644 --- a/templates/base.html +++ b/templates/base.html @@ -63,6 +63,9 @@ {% if not can_manage_iam %}Restricted{% endif %} + {% endif %} {% if principal %}
-
+
+02
+<h2>Running in background</h2>
+<p>For production or server deployments, run MyFSIO as a background service so it persists after you close the terminal.</p>
+
+<h3>Quick Start (nohup)</h3>
+<p>Simplest way to run in background—survives terminal close:</p>
+<pre><code># Using Python
+nohup python run.py --prod > /dev/null 2>&1 &
+
+# Using compiled binary
+nohup ./myfsio > /dev/null 2>&1 &
+
+# Check if running
+ps aux | grep myfsio</code></pre>
+
+<h3>Screen / Tmux</h3>
+<p>Attach/detach from a persistent session:</p>
+<pre><code># Start in a detached screen session
+screen -dmS myfsio ./myfsio
+
+# Attach to view logs
+screen -r myfsio
+
+# Detach: press Ctrl+A, then D</code></pre>
+
+<h3>Systemd (Recommended for Production)</h3>
+<p>Create /etc/systemd/system/myfsio.service:</p>
+<pre><code>[Unit]
+Description=MyFSIO S3-Compatible Storage
+After=network.target
+
+[Service]
+Type=simple
+User=myfsio
+WorkingDirectory=/opt/myfsio
+ExecStart=/opt/myfsio/myfsio
+Restart=on-failure
+RestartSec=5
+Environment=MYFSIO_DATA_DIR=/var/lib/myfsio
+Environment=API_BASE_URL=https://s3.example.com
+
+[Install]
+WantedBy=multi-user.target</code></pre>
+<p>Then enable and start:</p>
+<pre><code>sudo systemctl daemon-reload
+sudo systemctl enable myfsio
+sudo systemctl start myfsio
+
+# Check status
+sudo systemctl status myfsio
+sudo journalctl -u myfsio -f   # View logs</code></pre>
+
+ 03

Authenticate & manage IAM

MyFSIO seeds data/.myfsio.sys/config/iam.json with localadmin/localadmin. Sign in once, rotate it, then grant least-privilege access to teammates and tools.

@@ -109,7 +166,7 @@ python run.py --mode ui
-03
+04

Use the console effectively

Each workspace models an S3 workflow so you can administer buckets end-to-end.

@@ -148,7 +205,7 @@ python run.py --mode ui
-04
+05

Automate with CLI & tools

Point standard S3 clients at {{ api_base }} and reuse the same IAM credentials.

@@ -201,7 +258,7 @@ curl -X POST {{ api_base }}/presign/demo/notes.txt \
-05
+06

Key REST endpoints

@@ -268,7 +325,7 @@ curl -X POST {{ api_base }}/presign/demo/notes.txt \
-06
+07

API Examples

Common operations using boto3.

@@ -307,7 +364,7 @@ s3.complete_multipart_upload(
-07
+08

Site Replication

Automatically copy new objects to another MyFSIO instance or S3-compatible service for backup or disaster recovery.

@@ -354,7 +411,7 @@ s3.complete_multipart_upload(
-08
+09

Troubleshooting & tips

@@ -404,10 +461,12 @@ s3.complete_multipart_upload(

On this page
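Once the service is running in the background (via nohup, screen, or systemd as described above), a quick end-to-end check is to point an S3 client at the local endpoint and list buckets. A minimal sketch, assuming the default 127.0.0.1:5000 endpoint and the seeded localadmin credentials mentioned earlier in the docs (rotate them once the check passes):

```python
import boto3
from botocore.client import Config

# Assumes the default bind address/port and the seeded localadmin credentials.
s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:5000",
    aws_access_key_id="localadmin",
    aws_secret_access_key="localadmin",
    region_name="us-east-1",
    config=Config(signature_version="s3v4", s3={"addressing_style": "path"}),
)

# Succeeds (and prints bucket names) only if the background service is reachable
# and the credentials are still valid.
print([b["Name"] for b in s3.list_buckets().get("Buckets", [])])
```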

From aaa230b19b4b4dad4722de9e6ef78f820d526d69 Mon Sep 17 00:00:00 2001 From: kqjy Date: Tue, 25 Nov 2025 23:56:38 +0800 Subject: [PATCH 09/10] Test fix multipart failing upload --- app/storage.py | 30 ++++++++++++++++++++++++++---- tests/test_boto3_multipart.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 4 deletions(-) create mode 100644 tests/test_boto3_multipart.py diff --git a/app/storage.py b/app/storage.py index bf819ef..29e90d5 100644 --- a/app/storage.py +++ b/app/storage.py @@ -634,7 +634,15 @@ class ObjectStorage: if part_number < 1: raise StorageError("part_number must be >= 1") bucket_path = self._bucket_path(bucket_name) - manifest, upload_root = self._load_multipart_manifest(bucket_path.name, upload_id) + + # Get the upload root directory + upload_root = self._multipart_dir(bucket_path.name, upload_id) + if not upload_root.exists(): + upload_root = self._legacy_multipart_dir(bucket_path.name, upload_id) + if not upload_root.exists(): + raise StorageError("Multipart upload not found") + + # Write the part data first (can happen concurrently) checksum = hashlib.md5() part_filename = f"part-{part_number:05d}.part" part_path = upload_root / part_filename @@ -645,9 +653,23 @@ class ObjectStorage: "size": part_path.stat().st_size, "filename": part_filename, } - parts = manifest.setdefault("parts", {}) - parts[str(part_number)] = record - self._write_multipart_manifest(upload_root, manifest) + + # Update manifest with file locking to prevent race conditions + manifest_path = upload_root / self.MULTIPART_MANIFEST + lock_path = upload_root / ".manifest.lock" + + with lock_path.open("w") as lock_file: + with _file_lock(lock_file): + # Re-read manifest under lock to get latest state + try: + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as exc: + raise StorageError("Multipart manifest unreadable") from exc + + parts = manifest.setdefault("parts", {}) + parts[str(part_number)] = record + manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + return record["etag"] def complete_multipart_upload( diff --git a/tests/test_boto3_multipart.py b/tests/test_boto3_multipart.py new file mode 100644 index 0000000..37eaabf --- /dev/null +++ b/tests/test_boto3_multipart.py @@ -0,0 +1,28 @@ +import uuid +import pytest +import boto3 +from botocore.client import Config + +@pytest.mark.integration +def test_boto3_multipart_upload(live_server): + bucket_name = f'mp-test-{uuid.uuid4().hex[:8]}' + object_key = 'large-file.bin' + s3 = boto3.client('s3', endpoint_url=live_server, aws_access_key_id='test', aws_secret_access_key='secret', region_name='us-east-1', use_ssl=False, config=Config(signature_version='s3v4', retries={'max_attempts': 1}, s3={'addressing_style': 'path'})) + s3.create_bucket(Bucket=bucket_name) + try: + response = s3.create_multipart_upload(Bucket=bucket_name, Key=object_key) + upload_id = response['UploadId'] + parts = [] + part1_data = b'A' * 1024 + part2_data = b'B' * 1024 + resp1 = s3.upload_part(Bucket=bucket_name, Key=object_key, PartNumber=1, UploadId=upload_id, Body=part1_data) + parts.append({'PartNumber': 1, 'ETag': resp1['ETag']}) + resp2 = s3.upload_part(Bucket=bucket_name, Key=object_key, PartNumber=2, UploadId=upload_id, Body=part2_data) + parts.append({'PartNumber': 2, 'ETag': resp2['ETag']}) + s3.complete_multipart_upload(Bucket=bucket_name, Key=object_key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + obj = s3.get_object(Bucket=bucket_name, Key=object_key) + content = 
obj['Body'].read() + assert content == part1_data + part2_data + s3.delete_object(Bucket=bucket_name, Key=object_key) + finally: + s3.delete_bucket(Bucket=bucket_name) From 6a31a9082e8172f39e0b7d1fc65febd79b3b5573 Mon Sep 17 00:00:00 2001 From: kqjy Date: Wed, 26 Nov 2025 12:52:53 +0800 Subject: [PATCH 10/10] Fix IAM caching issue --- app/iam.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/iam.py b/app/iam.py index 3ab64c2..1fe8d28 100644 --- a/app/iam.py +++ b/app/iam.py @@ -409,9 +409,11 @@ class IamService: raise IamError("User not found") def get_secret_key(self, access_key: str) -> str | None: + self._maybe_reload() record = self._users.get(access_key) return record["secret_key"] if record else None def get_principal(self, access_key: str) -> Principal | None: + self._maybe_reload() record = self._users.get(access_key) return self._build_principal(access_key, record) if record else None
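The _maybe_reload() calls added in the final patch make credential lookups pick up changes written by another process, such as a secret rotation performed through the UI. The helper itself is not shown in the diff; a common shape for this kind of guard is an mtime check on the backing JSON file. The sketch below is an assumption about that pattern, with hypothetical class and attribute names, not the project's actual IamService code:

```python
import json
from pathlib import Path

class UserStore:
    """Illustrative mtime-guarded reload; names are hypothetical."""

    def __init__(self, path: str) -> None:
        self._path = Path(path)
        self._loaded_mtime = 0.0
        self._users: dict[str, dict] = {}
        self._maybe_reload()

    def _maybe_reload(self) -> None:
        # Re-read the file only when it changed on disk since the last load.
        try:
            mtime = self._path.stat().st_mtime
        except FileNotFoundError:
            return
        if mtime > self._loaded_mtime:
            self._users = json.loads(self._path.read_text(encoding="utf-8"))
            self._loaded_mtime = mtime

    def get_secret_key(self, access_key: str) -> str | None:
        self._maybe_reload()
        record = self._users.get(access_key)
        return record["secret_key"] if record else None
```

Checking only the file's mtime on each read keeps lookups cheap while still noticing external writes promptly.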