From 54705ab9c4bda84018cb37827f75df78a2d14d92 Mon Sep 17 00:00:00 2001 From: kqjy Date: Fri, 6 Feb 2026 16:14:35 +0800 Subject: [PATCH 1/4] Fix Content-Length mismatch on range requests (206 Partial Content) --- app/s3_api.py | 3 ++- app/version.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/app/s3_api.py b/app/s3_api.py index 12a72d1..a237195 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -999,7 +999,8 @@ def _apply_object_headers( etag: str, ) -> None: if file_stat is not None: - response.headers["Content-Length"] = str(file_stat.st_size) + if response.status_code != 206: + response.headers["Content-Length"] = str(file_stat.st_size) response.headers["Last-Modified"] = http_date(file_stat.st_mtime) response.headers["ETag"] = f'"{etag}"' response.headers["Accept-Ranges"] = "bytes" diff --git a/app/version.py b/app/version.py index e4ed325..f5bfa82 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.2.6" +APP_VERSION = "0.2.7" def get_version() -> str: From 6e6d6d32bf06dbbe77c5cbec3d27c72661f8d8ae Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 9 Feb 2026 17:01:19 +0800 Subject: [PATCH 2/4] Optimize KMS: cache AESGCM instance, remove duplicate get_provider --- app/kms.py | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/app/kms.py b/app/kms.py index dbd07e0..3ac02b8 100644 --- a/app/kms.py +++ b/app/kms.py @@ -160,6 +160,7 @@ class KMSManager: self.generate_data_key_max_bytes = generate_data_key_max_bytes self._keys: Dict[str, KMSKey] = {} self._master_key: bytes | None = None + self._master_aesgcm: AESGCM | None = None self._loaded = False @property @@ -191,6 +192,7 @@ class KMSManager: msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1) else: fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) + self._master_aesgcm = AESGCM(self._master_key) return self._master_key def _load_keys(self) -> None: @@ -231,18 +233,16 @@ class KMSManager: _set_secure_file_permissions(self.keys_path) def _encrypt_key_material(self, key_material: bytes) -> bytes: - """Encrypt key material with the master key.""" - aesgcm = AESGCM(self.master_key) + _ = self.master_key nonce = secrets.token_bytes(12) - ciphertext = aesgcm.encrypt(nonce, key_material, None) + ciphertext = self._master_aesgcm.encrypt(nonce, key_material, None) return nonce + ciphertext - + def _decrypt_key_material(self, encrypted: bytes) -> bytes: - """Decrypt key material with the master key.""" - aesgcm = AESGCM(self.master_key) + _ = self.master_key nonce = encrypted[:12] ciphertext = encrypted[12:] - return aesgcm.decrypt(nonce, ciphertext, None) + return self._master_aesgcm.decrypt(nonce, ciphertext, None) def create_key(self, description: str = "", key_id: str | None = None) -> KMSKey: """Create a new KMS key.""" @@ -404,22 +404,6 @@ class KMSManager: plaintext, _ = self.decrypt(encrypted_key, context) return plaintext - def get_provider(self, key_id: str | None = None) -> KMSEncryptionProvider: - """Get an encryption provider for a specific key.""" - self._load_keys() - - if key_id is None: - if not self._keys: - key = self.create_key("Default KMS Key") - key_id = key.key_id - else: - key_id = next(iter(self._keys.keys())) - - if key_id not in self._keys: - raise EncryptionError(f"Key not found: {key_id}") - - return KMSEncryptionProvider(self, key_id) - def re_encrypt(self, ciphertext: bytes, destination_key_id: str, source_context: Dict[str, str] | None = None, destination_context: Dict[str, str] | None = None) -> bytes: From aa6d7c4d285c6f643e8fa9482133b6fa36f0caaf Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 9 Feb 2026 18:23:45 +0800 Subject: [PATCH 3/4] Optimize replication failure caching, batch UI auth checks, add bulk download size limit, background parent cleanup --- app/iam.py | 12 +++++++ app/replication.py | 18 ++++++++-- app/storage.py | 13 +++++--- app/ui.py | 83 ++++++++++++++++------------------------------ 4 files changed, 66 insertions(+), 60 deletions(-) diff --git a/app/iam.py b/app/iam.py index 4dd6b68..9e14ee7 100644 --- a/app/iam.py +++ b/app/iam.py @@ -309,6 +309,18 @@ class IamService: if not self._is_allowed(principal, normalized, action): raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'") + def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str]) -> Dict[str, bool]: + self._maybe_reload() + bucket_name = (bucket_name or "*").lower() if bucket_name != "*" else (bucket_name or "*") + normalized_actions = {a: self._normalize_action(a) for a in actions} + results: Dict[str, bool] = {} + for original, canonical in normalized_actions.items(): + if canonical not in ALLOWED_ACTIONS: + results[original] = False + else: + results[original] = self._is_allowed(principal, bucket_name, canonical) + return results + def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]: return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")] diff --git a/app/replication.py b/app/replication.py index 8cde3e8..ec2d113 100644 --- a/app/replication.py +++ b/app/replication.py @@ -176,11 +176,12 @@ class ReplicationFailureStore: self.storage_root = storage_root self.max_failures_per_bucket = max_failures_per_bucket self._lock = threading.Lock() + self._cache: Dict[str, List[ReplicationFailure]] = {} def _get_failures_path(self, bucket_name: str) -> Path: return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "replication_failures.json" - def load_failures(self, bucket_name: str) -> List[ReplicationFailure]: + def _load_from_disk(self, bucket_name: str) -> List[ReplicationFailure]: path = self._get_failures_path(bucket_name) if not path.exists(): return [] @@ -192,7 +193,7 @@ class ReplicationFailureStore: logger.error(f"Failed to load replication failures for {bucket_name}: {e}") return [] - def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None: + def _save_to_disk(self, bucket_name: str, failures: List[ReplicationFailure]) -> None: path = self._get_failures_path(bucket_name) path.parent.mkdir(parents=True, exist_ok=True) data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]} @@ -202,6 +203,18 @@ class ReplicationFailureStore: except OSError as e: logger.error(f"Failed to save replication failures for {bucket_name}: {e}") + def load_failures(self, bucket_name: str) -> List[ReplicationFailure]: + if bucket_name in self._cache: + return list(self._cache[bucket_name]) + failures = self._load_from_disk(bucket_name) + self._cache[bucket_name] = failures + return list(failures) + + def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None: + trimmed = failures[:self.max_failures_per_bucket] + self._cache[bucket_name] = trimmed + self._save_to_disk(bucket_name, trimmed) + def add_failure(self, bucket_name: str, failure: ReplicationFailure) -> None: with self._lock: failures = self.load_failures(bucket_name) @@ -227,6 +240,7 @@ class ReplicationFailureStore: def clear_failures(self, bucket_name: str) -> None: with self._lock: + self._cache.pop(bucket_name, None) path = self._get_failures_path(bucket_name) if path.exists(): path.unlink() diff --git a/app/storage.py b/app/storage.py index ba13c09..e688bea 100644 --- a/app/storage.py +++ b/app/storage.py @@ -11,6 +11,7 @@ import time import unicodedata import uuid from collections import OrderedDict +from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timezone @@ -187,6 +188,7 @@ class ObjectStorage: self._object_cache_max_size = object_cache_max_size self._object_key_max_length_bytes = object_key_max_length_bytes self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {} + self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup") def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: """Get or create a lock for a specific bucket. Reduces global lock contention.""" @@ -544,11 +546,14 @@ class ObjectStorage: return self._read_metadata(bucket_path.name, safe_key) or {} def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None: - """Remove empty parent directories up to (but not including) stop_at. - + """Remove empty parent directories in a background thread. + On Windows/OneDrive, directories may be locked briefly after file deletion. - This method retries with a small delay to handle that case. + Running this in the background avoids blocking the request thread with retries. """ + self._cleanup_executor.submit(self._do_cleanup_empty_parents, path, stop_at) + + def _do_cleanup_empty_parents(self, path: Path, stop_at: Path) -> None: for parent in path.parents: if parent == stop_at: break @@ -556,7 +561,7 @@ class ObjectStorage: try: if parent.exists() and not any(parent.iterdir()): parent.rmdir() - break + break except OSError: if attempt < 2: time.sleep(0.1) diff --git a/app/ui.py b/app/ui.py index 461d006..06141e7 100644 --- a/app/ui.py +++ b/app/ui.py @@ -423,57 +423,25 @@ def bucket_detail(bucket_name: str): }, indent=2, ) - can_edit_policy = False - if principal: - try: - _iam().authorize(principal, bucket_name, "policy") - can_edit_policy = True - except IamError: - can_edit_policy = False + iam = _iam() + bucket_perms = iam.check_permissions( + principal, bucket_name, ["policy", "lifecycle", "cors", "write", "replication"], + ) if principal else {} + admin_perms = iam.check_permissions( + principal, None, ["iam:list_users"], + ) if principal else {} - can_manage_lifecycle = False - if principal: - try: - _iam().authorize(principal, bucket_name, "lifecycle") - can_manage_lifecycle = True - except IamError: - can_manage_lifecycle = False - - can_manage_cors = False - if principal: - try: - _iam().authorize(principal, bucket_name, "cors") - can_manage_cors = True - except IamError: - can_manage_cors = False + can_edit_policy = bucket_perms.get("policy", False) + can_manage_lifecycle = bucket_perms.get("lifecycle", False) + can_manage_cors = bucket_perms.get("cors", False) + can_manage_versioning = bucket_perms.get("write", False) + can_manage_replication = bucket_perms.get("replication", False) + is_replication_admin = admin_perms.get("iam:list_users", False) try: versioning_enabled = storage.is_versioning_enabled(bucket_name) except StorageError: versioning_enabled = False - can_manage_versioning = False - if principal: - try: - _iam().authorize(principal, bucket_name, "write") - can_manage_versioning = True - except IamError: - can_manage_versioning = False - - can_manage_replication = False - if principal: - try: - _iam().authorize(principal, bucket_name, "replication") - can_manage_replication = True - except IamError: - can_manage_replication = False - - is_replication_admin = False - if principal: - try: - _iam().authorize(principal, None, "iam:list_users") - is_replication_admin = True - except IamError: - is_replication_admin = False replication_rule = _replication().get_rule(bucket_name) connections = _connections().list() if (is_replication_admin or replication_rule) else [] @@ -489,12 +457,7 @@ def bucket_detail(bucket_name: str): bucket_quota = storage.get_bucket_quota(bucket_name) bucket_stats = storage.bucket_stats(bucket_name) - can_manage_quota = False - try: - _iam().authorize(principal, None, "iam:list_users") - can_manage_quota = True - except IamError: - pass + can_manage_quota = is_replication_admin objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name) objects_stream_url = url_for("ui.stream_bucket_objects", bucket_name=bucket_name) @@ -1003,21 +966,33 @@ def bulk_download_objects(bucket_name: str): unique_keys = list(dict.fromkeys(cleaned)) storage = _storage() - + try: _authorize_ui(principal, bucket_name, "read") except IamError as exc: return jsonify({"error": str(exc)}), 403 + max_total_bytes = current_app.config.get("BULK_DOWNLOAD_MAX_BYTES", 1024 * 1024 * 1024) + total_size = 0 + for key in unique_keys: + try: + path = storage.get_object_path(bucket_name, key) + total_size += path.stat().st_size + except (StorageError, OSError): + continue + if total_size > max_total_bytes: + limit_mb = max_total_bytes // (1024 * 1024) + return jsonify({"error": f"Total download size exceeds {limit_mb} MB limit. Select fewer objects."}), 400 + buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf: for key in unique_keys: try: _authorize_ui(principal, bucket_name, "read", object_key=key) - + metadata = storage.get_object_metadata(bucket_name, key) is_encrypted = "x-amz-server-side-encryption" in metadata - + if is_encrypted and hasattr(storage, 'get_object_data'): data, _ = storage.get_object_data(bucket_name, key) zf.writestr(key, data) From 4ecd32a554e6c47e8f5e870ad721634001da3884 Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 9 Feb 2026 19:29:50 +0800 Subject: [PATCH 4/4] Fix empty UI on large bucket first load: keep loading row during streaming, add progress indicator, throttle renders --- static/js/bucket-detail-main.js | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index 48dfbcd..0864acc 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -516,6 +516,9 @@ }; }; + let lastStreamRenderTime = 0; + const STREAM_RENDER_THROTTLE_MS = 500; + const flushPendingStreamObjects = () => { if (pendingStreamObjects.length === 0) return; const batch = pendingStreamObjects.splice(0, pendingStreamObjects.length); @@ -532,6 +535,19 @@ loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()}${countText} loading...`; } } + if (objectsLoadingRow && objectsLoadingRow.parentNode) { + const loadingText = objectsLoadingRow.querySelector('p'); + if (loadingText) { + const countText = totalObjectCount > 0 ? ` of ${totalObjectCount.toLocaleString()}` : ''; + loadingText.textContent = `Loading ${loadedObjectCount.toLocaleString()}${countText} objects...`; + } + } + const now = performance.now(); + if (!streamingComplete && now - lastStreamRenderTime < STREAM_RENDER_THROTTLE_MS) { + streamRenderScheduled = false; + return; + } + lastStreamRenderTime = now; refreshVirtualList(); streamRenderScheduled = false; }; @@ -555,6 +571,7 @@ memoizedVisibleItems = null; memoizedInputs = { objectCount: -1, prefix: null, filterTerm: null }; pendingStreamObjects = []; + lastStreamRenderTime = 0; streamAbortController = new AbortController(); @@ -569,7 +586,10 @@ throw new Error(`HTTP ${response.status}`); } - if (objectsLoadingRow) objectsLoadingRow.remove(); + if (objectsLoadingRow) { + const loadingText = objectsLoadingRow.querySelector('p'); + if (loadingText) loadingText.textContent = 'Receiving objects...'; + } const reader = response.body.getReader(); const decoder = new TextDecoder(); @@ -597,6 +617,10 @@ break; case 'count': totalObjectCount = msg.total_count || 0; + if (objectsLoadingRow) { + const loadingText = objectsLoadingRow.querySelector('p'); + if (loadingText) loadingText.textContent = `Loading 0 of ${totalObjectCount.toLocaleString()} objects...`; + } break; case 'object': pendingStreamObjects.push(processStreamObject(msg)); @@ -630,11 +654,15 @@ } catch (e) { } } - flushPendingStreamObjects(); streamingComplete = true; + flushPendingStreamObjects(); hasMoreObjects = false; updateObjectCountBadge(); + if (objectsLoadingRow && objectsLoadingRow.parentNode) { + objectsLoadingRow.remove(); + } + if (loadMoreStatus) { loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} objects`; }