From 9898167f8dba69b9d28ea4172318625acb9b1e66 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 22 Mar 2026 14:17:43 +0800 Subject: [PATCH 01/11] Make integrity scan async with progress indicator in UI --- app/admin_api.py | 6 +- app/integrity.py | 102 ++++++++++++++++---------- app/ui.py | 39 +++++++++- app/version.py | 2 +- templates/system.html | 154 +++++++++++++++++++++++++++++----------- tests/test_integrity.py | 23 +++++- 6 files changed, 237 insertions(+), 89 deletions(-) diff --git a/app/admin_api.py b/app/admin_api.py index ccc0408..d09658d 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -961,12 +961,14 @@ def integrity_run_now(): payload = request.get_json(silent=True) or {} override_dry_run = payload.get("dry_run") override_auto_heal = payload.get("auto_heal") - result = checker.run_now( + started = checker.run_async( auto_heal=override_auto_heal if override_auto_heal is not None else None, dry_run=override_dry_run if override_dry_run is not None else None, ) logger.info("Integrity manual run by %s", principal.access_key) - return jsonify(result.to_dict()) + if not started: + return _json_error("Conflict", "A scan is already in progress", 409) + return jsonify({"status": "started"}) @admin_api_bp.route("/integrity/history", methods=["GET"]) diff --git a/app/integrity.py b/app/integrity.py index 4dc15d2..02b996b 100644 --- a/app/integrity.py +++ b/app/integrity.py @@ -189,6 +189,8 @@ class IntegrityChecker: self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() + self._scanning = False + self._scan_start_time: Optional[float] = None self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history) def start(self) -> None: @@ -229,52 +231,70 @@ class IntegrityChecker: self._schedule_next() def run_now(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> IntegrityResult: - effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal - effective_dry_run = dry_run if dry_run is not None else self.dry_run + if not self._lock.acquire(blocking=False): + raise RuntimeError("Integrity scan is already in progress") - start = time.time() - result = IntegrityResult() + try: + self._scanning = True + self._scan_start_time = time.time() - bucket_names = self._list_bucket_names() + effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal + effective_dry_run = dry_run if dry_run is not None else self.dry_run - for bucket_name in bucket_names: - if result.objects_scanned >= self.batch_size: - break - result.buckets_scanned += 1 - self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + start = self._scan_start_time + result = IntegrityResult() - result.execution_time_seconds = time.time() - start + bucket_names = self._list_bucket_names() - if result.has_issues or result.errors: - logger.info( - "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, " - "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s", - result.execution_time_seconds, - 
result.corrupted_objects, - result.orphaned_objects, - result.phantom_metadata, - result.stale_versions, - result.etag_cache_inconsistencies, - result.legacy_metadata_drifts, - result.issues_healed, - len(result.errors), - " (dry run)" if effective_dry_run else "", + for bucket_name in bucket_names: + if result.objects_scanned >= self.batch_size: + break + result.buckets_scanned += 1 + self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + + result.execution_time_seconds = time.time() - start + + if result.has_issues or result.errors: + logger.info( + "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, " + "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s", + result.execution_time_seconds, + result.corrupted_objects, + result.orphaned_objects, + result.phantom_metadata, + result.stale_versions, + result.etag_cache_inconsistencies, + result.legacy_metadata_drifts, + result.issues_healed, + len(result.errors), + " (dry run)" if effective_dry_run else "", + ) + + record = IntegrityExecutionRecord( + timestamp=time.time(), + result=result.to_dict(), + dry_run=effective_dry_run, + auto_heal=effective_auto_heal, ) + self.history_store.add(record) - record = IntegrityExecutionRecord( - timestamp=time.time(), - result=result.to_dict(), - dry_run=effective_dry_run, - auto_heal=effective_auto_heal, - ) - self.history_store.add(record) + return result + finally: + self._scanning = False + self._scan_start_time = None + self._lock.release() - return result + def run_async(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> bool: + if self._scanning: + return False + t = threading.Thread(target=self.run_now, args=(auto_heal, dry_run), daemon=True) + t.start() + return True def _system_path(self) -> Path: return self.storage_root / self.SYSTEM_ROOT @@ -728,11 +748,15 @@ class IntegrityChecker: return [r.to_dict() for r in records] def get_status(self) -> dict: - return { + status: Dict[str, Any] = { "enabled": not self._shutdown or self._timer is not None, "running": self._timer is not None and not self._shutdown, + "scanning": self._scanning, "interval_hours": self.interval_seconds / 3600.0, "batch_size": self.batch_size, "auto_heal": self.auto_heal, "dry_run": self.dry_run, } + if self._scanning and self._scan_start_time is not None: + status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1) + return status diff --git a/app/ui.py b/app/ui.py index b1a0947..aff4fa4 100644 --- a/app/ui.py +++ b/app/ui.py @@ -4202,11 +4202,46 @@ def system_integrity_run(): return jsonify({"error": "Integrity checker is not enabled"}), 400 payload = request.get_json(silent=True) or {} - result = checker.run_now( + started = checker.run_async( auto_heal=payload.get("auto_heal"), dry_run=payload.get("dry_run"), ) - return jsonify(result.to_dict()) + if not started: + return jsonify({"error": "A scan is already in progress"}), 409 + return jsonify({"status": "started"}) + + +@ui_bp.get("/system/integrity/status") +def system_integrity_status(): + 
principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + checker = current_app.extensions.get("integrity") + if not checker: + return jsonify({"error": "Integrity checker is not enabled"}), 400 + + return jsonify(checker.get_status()) + + +@ui_bp.get("/system/integrity/history") +def system_integrity_history(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + checker = current_app.extensions.get("integrity") + if not checker: + return jsonify({"executions": []}) + + limit = min(int(request.args.get("limit", 10)), 200) + offset = int(request.args.get("offset", 0)) + records = checker.get_history(limit=limit, offset=offset) + return jsonify({"executions": records}) @ui_bp.app_errorhandler(404) diff --git a/app/version.py b/app/version.py index 77fc8ae..5c24c4a 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.4.0" +APP_VERSION = "0.4.1" def get_version() -> str: diff --git a/templates/system.html b/templates/system.html index 3758517..ed3908d 100644 --- a/templates/system.html +++ b/templates/system.html @@ -233,21 +233,28 @@
{% if integrity_status.enabled %}
-<button class="btn btn-sm btn-outline-primary" id="integrityRunBtn" onclick="runIntegrity(false, false)"><span class="spinner-border spinner-border-sm d-none me-1"></span>Run Scan</button>
-<button class="btn btn-sm btn-outline-warning" id="integrityHealBtn" onclick="runIntegrity(false, true)"><span class="spinner-border spinner-border-sm d-none me-1"></span>Scan &amp; Heal</button>
-<button class="btn btn-sm btn-outline-secondary" id="integrityDryRunBtn" onclick="runIntegrity(true, false)"><span class="spinner-border spinner-border-sm d-none me-1"></span>Dry Run</button>
+<button class="btn btn-sm btn-outline-primary" id="integrityRunBtn" onclick="runIntegrity(false, false)">Run Scan</button>
+<button class="btn btn-sm btn-outline-warning" id="integrityHealBtn" onclick="runIntegrity(false, true)">Scan &amp; Heal</button>
+<button class="btn btn-sm btn-outline-secondary" id="integrityDryRunBtn" onclick="runIntegrity(true, false)">Dry Run</button>
+<div id="integrityScanningBanner" class="alert alert-info d-flex align-items-center d-none mb-0 small">
+  <span class="spinner-border spinner-border-sm me-2" role="status" aria-hidden="true"></span>
+  Scan in progress<span id="integrityScanElapsed"></span>
+</div>
@@ -431,11 +438,95 @@ }); }; - window.runIntegrity = function (dryRun, autoHeal) { - var activeBtn = dryRun ? 'integrityDryRunBtn' : (autoHeal ? 'integrityHealBtn' : 'integrityRunBtn'); - ['integrityRunBtn', 'integrityHealBtn', 'integrityDryRunBtn'].forEach(function (id) { - setLoading(id, true, id !== activeBtn); + var _integrityPollTimer = null; + var _integrityLastMode = {dryRun: false, autoHeal: false}; + + function _integritySetScanning(scanning) { + var banner = document.getElementById('integrityScanningBanner'); + var btns = ['integrityRunBtn', 'integrityHealBtn', 'integrityDryRunBtn']; + if (scanning) { + banner.classList.remove('d-none'); + btns.forEach(function (id) { + var el = document.getElementById(id); + if (el) el.disabled = true; + }); + } else { + banner.classList.add('d-none'); + document.getElementById('integrityScanElapsed').textContent = ''; + btns.forEach(function (id) { + var el = document.getElementById(id); + if (el) el.disabled = false; + }); + } + } + + function _integrityShowResult(data, dryRun, autoHeal) { + var container = document.getElementById('integrityResult'); + var alert = document.getElementById('integrityResultAlert'); + var title = document.getElementById('integrityResultTitle'); + var body = document.getElementById('integrityResultBody'); + container.classList.remove('d-none'); + + var totalIssues = (data.corrupted_objects || 0) + (data.orphaned_objects || 0) + + (data.phantom_metadata || 0) + (data.stale_versions || 0) + + (data.etag_cache_inconsistencies || 0) + (data.legacy_metadata_drifts || 0); + + var prefix = dryRun ? '[Dry Run] ' : (autoHeal ? '[Heal] ' : ''); + alert.className = totalIssues > 0 ? 'alert alert-warning mb-0 small' : 'alert alert-success mb-0 small'; + title.textContent = prefix + 'Completed in ' + (data.execution_time_seconds || 0).toFixed(2) + 's'; + + var lines = []; + lines.push('Scanned: ' + (data.objects_scanned || 0) + ' objects in ' + (data.buckets_scanned || 0) + ' buckets'); + if (totalIssues === 0) { + lines.push('No issues found.'); + } else { + if (data.corrupted_objects) lines.push('Corrupted objects: ' + data.corrupted_objects); + if (data.orphaned_objects) lines.push('Orphaned objects: ' + data.orphaned_objects); + if (data.phantom_metadata) lines.push('Phantom metadata: ' + data.phantom_metadata); + if (data.stale_versions) lines.push('Stale versions: ' + data.stale_versions); + if (data.etag_cache_inconsistencies) lines.push('ETag inconsistencies: ' + data.etag_cache_inconsistencies); + if (data.legacy_metadata_drifts) lines.push('Legacy metadata drifts: ' + data.legacy_metadata_drifts); + if (data.issues_healed) lines.push('Issues healed: ' + data.issues_healed); + } + if (data.errors && data.errors.length > 0) lines.push('Errors: ' + data.errors.join(', ')); + + body.innerHTML = lines.join('
'); + } + + function _integrityPoll() { + fetch('{{ url_for("ui.system_integrity_status") }}', { + headers: {'X-CSRFToken': csrfToken} + }) + .then(function (r) { return r.json(); }) + .then(function (status) { + if (status.scanning) { + var elapsed = status.scan_elapsed_seconds || 0; + document.getElementById('integrityScanElapsed').textContent = ' (' + elapsed.toFixed(0) + 's)'; + _integrityPollTimer = setTimeout(_integrityPoll, 2000); + } else { + _integritySetScanning(false); + fetch('{{ url_for("ui.system_integrity_history") }}?limit=1', { + headers: {'X-CSRFToken': csrfToken} + }) + .then(function (r) { return r.json(); }) + .then(function (hist) { + if (hist.executions && hist.executions.length > 0) { + var latest = hist.executions[0]; + _integrityShowResult(latest.result, latest.dry_run, latest.auto_heal); + } + }) + .catch(function () {}); + } + }) + .catch(function () { + _integrityPollTimer = setTimeout(_integrityPoll, 3000); }); + } + + window.runIntegrity = function (dryRun, autoHeal) { + _integrityLastMode = {dryRun: dryRun, autoHeal: autoHeal}; + document.getElementById('integrityResult').classList.add('d-none'); + _integritySetScanning(true); fetch('{{ url_for("ui.system_integrity_run") }}', { method: 'POST', @@ -444,45 +535,22 @@ }) .then(function (r) { return r.json(); }) .then(function (data) { - var container = document.getElementById('integrityResult'); - var alert = document.getElementById('integrityResultAlert'); - var title = document.getElementById('integrityResultTitle'); - var body = document.getElementById('integrityResultBody'); - container.classList.remove('d-none'); - if (data.error) { + _integritySetScanning(false); + var container = document.getElementById('integrityResult'); + var alert = document.getElementById('integrityResultAlert'); + var title = document.getElementById('integrityResultTitle'); + var body = document.getElementById('integrityResultBody'); + container.classList.remove('d-none'); alert.className = 'alert alert-danger mb-0 small'; title.textContent = 'Error'; body.textContent = data.error; return; } - - var totalIssues = (data.corrupted_objects || 0) + (data.orphaned_objects || 0) + - (data.phantom_metadata || 0) + (data.stale_versions || 0) + - (data.etag_cache_inconsistencies || 0) + (data.legacy_metadata_drifts || 0); - - var prefix = dryRun ? '[Dry Run] ' : (autoHeal ? '[Heal] ' : ''); - alert.className = totalIssues > 0 ? 
'alert alert-warning mb-0 small' : 'alert alert-success mb-0 small'; - title.textContent = prefix + 'Completed in ' + (data.execution_time_seconds || 0).toFixed(2) + 's'; - - var lines = []; - lines.push('Scanned: ' + (data.objects_scanned || 0) + ' objects in ' + (data.buckets_scanned || 0) + ' buckets'); - if (totalIssues === 0) { - lines.push('No issues found.'); - } else { - if (data.corrupted_objects) lines.push('Corrupted objects: ' + data.corrupted_objects); - if (data.orphaned_objects) lines.push('Orphaned objects: ' + data.orphaned_objects); - if (data.phantom_metadata) lines.push('Phantom metadata: ' + data.phantom_metadata); - if (data.stale_versions) lines.push('Stale versions: ' + data.stale_versions); - if (data.etag_cache_inconsistencies) lines.push('ETag inconsistencies: ' + data.etag_cache_inconsistencies); - if (data.legacy_metadata_drifts) lines.push('Legacy metadata drifts: ' + data.legacy_metadata_drifts); - if (data.issues_healed) lines.push('Issues healed: ' + data.issues_healed); - } - if (data.errors && data.errors.length > 0) lines.push('Errors: ' + data.errors.join(', ')); - - body.innerHTML = lines.join('
'); + _integrityPollTimer = setTimeout(_integrityPoll, 2000); }) .catch(function (err) { + _integritySetScanning(false); var container = document.getElementById('integrityResult'); var alert = document.getElementById('integrityResultAlert'); var title = document.getElementById('integrityResultTitle'); @@ -491,13 +559,13 @@ alert.className = 'alert alert-danger mb-0 small'; title.textContent = 'Error'; body.textContent = err.message; - }) - .finally(function () { - setLoading('integrityRunBtn', false); - setLoading('integrityHealBtn', false); - setLoading('integrityDryRunBtn', false); }); }; + + {% if integrity_status.scanning %} + _integritySetScanning(true); + _integrityPollTimer = setTimeout(_integrityPoll, 2000); + {% endif %} })(); {% endblock %} diff --git a/tests/test_integrity.py b/tests/test_integrity.py index f4b4299..6460b11 100644 --- a/tests/test_integrity.py +++ b/tests/test_integrity.py @@ -2,6 +2,7 @@ import hashlib import json import os import sys +import time from pathlib import Path import pytest @@ -11,6 +12,17 @@ sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from app.integrity import IntegrityChecker, IntegrityResult +def _wait_scan_done(client, headers, timeout=10): + deadline = time.time() + timeout + while time.time() < deadline: + resp = client.get("/admin/integrity/status", headers=headers) + data = resp.get_json() + if not data.get("scanning"): + return + time.sleep(0.1) + raise TimeoutError("scan did not complete") + + def _md5(data: bytes) -> str: return hashlib.md5(data).hexdigest() @@ -413,8 +425,13 @@ class TestAdminAPI: resp = client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={}) assert resp.status_code == 200 data = resp.get_json() - assert "corrupted_objects" in data - assert "objects_scanned" in data + assert data["status"] == "started" + _wait_scan_done(client, AUTH_HEADERS) + resp = client.get("/admin/integrity/history?limit=1", headers=AUTH_HEADERS) + hist = resp.get_json() + assert len(hist["executions"]) >= 1 + assert "corrupted_objects" in hist["executions"][0]["result"] + assert "objects_scanned" in hist["executions"][0]["result"] def test_run_with_overrides(self, integrity_app): client = integrity_app.test_client() @@ -424,10 +441,12 @@ class TestAdminAPI: json={"dry_run": True, "auto_heal": True}, ) assert resp.status_code == 200 + _wait_scan_done(client, AUTH_HEADERS) def test_history_endpoint(self, integrity_app): client = integrity_app.test_client() client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={}) + _wait_scan_done(client, AUTH_HEADERS) resp = client.get("/admin/integrity/history", headers=AUTH_HEADERS) assert resp.status_code == 200 data = resp.get_json() From 5e32cef7927394d9a851ed06d9d034a3d1b6c02c Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 23 Mar 2026 11:36:38 +0800 Subject: [PATCH 02/11] Add I/O throttling to GC and integrity scanner to prevent HDD starvation --- app/__init__.py | 2 ++ app/config.py | 10 +++++++++- app/gc.py | 34 ++++++++++++++++++++++++++++++++++ app/integrity.py | 24 +++++++++++++++++++++++- 4 files changed, 68 insertions(+), 2 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 015e16d..9961fff 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -293,6 +293,7 @@ def create_app( multipart_max_age_days=app.config.get("GC_MULTIPART_MAX_AGE_DAYS", 7), lock_file_max_age_hours=app.config.get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0), dry_run=app.config.get("GC_DRY_RUN", False), + io_throttle_ms=app.config.get("GC_IO_THROTTLE_MS", 10), ) gc_collector.start() @@ -304,6 
+305,7 @@ def create_app( batch_size=app.config.get("INTEGRITY_BATCH_SIZE", 1000), auto_heal=app.config.get("INTEGRITY_AUTO_HEAL", False), dry_run=app.config.get("INTEGRITY_DRY_RUN", False), + io_throttle_ms=app.config.get("INTEGRITY_IO_THROTTLE_MS", 10), ) integrity_checker.start() diff --git a/app/config.py b/app/config.py index d196397..04cae7a 100644 --- a/app/config.py +++ b/app/config.py @@ -157,11 +157,13 @@ class AppConfig: gc_multipart_max_age_days: int gc_lock_file_max_age_hours: float gc_dry_run: bool + gc_io_throttle_ms: int integrity_enabled: bool integrity_interval_hours: float integrity_batch_size: int integrity_auto_heal: bool integrity_dry_run: bool + integrity_io_throttle_ms: int @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -338,11 +340,13 @@ class AppConfig: gc_multipart_max_age_days = int(_get("GC_MULTIPART_MAX_AGE_DAYS", 7)) gc_lock_file_max_age_hours = float(_get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0)) gc_dry_run = str(_get("GC_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"} + gc_io_throttle_ms = int(_get("GC_IO_THROTTLE_MS", 10)) integrity_enabled = str(_get("INTEGRITY_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} integrity_interval_hours = float(_get("INTEGRITY_INTERVAL_HOURS", 24.0)) integrity_batch_size = int(_get("INTEGRITY_BATCH_SIZE", 1000)) integrity_auto_heal = str(_get("INTEGRITY_AUTO_HEAL", "0")).lower() in {"1", "true", "yes", "on"} integrity_dry_run = str(_get("INTEGRITY_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"} + integrity_io_throttle_ms = int(_get("INTEGRITY_IO_THROTTLE_MS", 10)) return cls(storage_root=storage_root, max_upload_size=max_upload_size, @@ -438,11 +442,13 @@ class AppConfig: gc_multipart_max_age_days=gc_multipart_max_age_days, gc_lock_file_max_age_hours=gc_lock_file_max_age_hours, gc_dry_run=gc_dry_run, + gc_io_throttle_ms=gc_io_throttle_ms, integrity_enabled=integrity_enabled, integrity_interval_hours=integrity_interval_hours, integrity_batch_size=integrity_batch_size, integrity_auto_heal=integrity_auto_heal, - integrity_dry_run=integrity_dry_run) + integrity_dry_run=integrity_dry_run, + integrity_io_throttle_ms=integrity_io_throttle_ms) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. 
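
Note on the throttling pattern: the GC_IO_THROTTLE_MS and INTEGRITY_IO_THROTTLE_MS knobs wired through AppConfig above feed a single per-entry sleep inside the scanners below. A standalone Python sketch of that idea follows; ThrottledWalker and walk() are illustrative names, while _throttle() and the 10 ms default mirror the patch.

import time
from pathlib import Path
from typing import Iterator


class ThrottledWalker:
    """Walks a directory tree while ceding the disk between reads."""

    def __init__(self, io_throttle_ms: int = 10) -> None:
        self._io_throttle = max(0, io_throttle_ms) / 1000.0
        self._shutdown = False

    def _throttle(self) -> bool:
        # Sleep between I/O operations and double as a shutdown check,
        # so a long walk can be abandoned promptly.
        if self._shutdown:
            return True
        if self._io_throttle > 0:
            time.sleep(self._io_throttle)
        return self._shutdown

    def walk(self, root: Path) -> Iterator[Path]:
        for entry in root.rglob("*"):
            if self._throttle():
                return
            yield entry

At the default 10 ms the walk tops out near 100 entries per second, which is the point: on a spinning disk a background scan should lose to foreground traffic, not compete with it.
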
@@ -663,9 +669,11 @@ class AppConfig: "GC_MULTIPART_MAX_AGE_DAYS": self.gc_multipart_max_age_days, "GC_LOCK_FILE_MAX_AGE_HOURS": self.gc_lock_file_max_age_hours, "GC_DRY_RUN": self.gc_dry_run, + "GC_IO_THROTTLE_MS": self.gc_io_throttle_ms, "INTEGRITY_ENABLED": self.integrity_enabled, "INTEGRITY_INTERVAL_HOURS": self.integrity_interval_hours, "INTEGRITY_BATCH_SIZE": self.integrity_batch_size, "INTEGRITY_AUTO_HEAL": self.integrity_auto_heal, "INTEGRITY_DRY_RUN": self.integrity_dry_run, + "INTEGRITY_IO_THROTTLE_MS": self.integrity_io_throttle_ms, } diff --git a/app/gc.py b/app/gc.py index 1697607..85f2617 100644 --- a/app/gc.py +++ b/app/gc.py @@ -162,6 +162,7 @@ class GarbageCollector: lock_file_max_age_hours: float = 1.0, dry_run: bool = False, max_history: int = 50, + io_throttle_ms: int = 10, ) -> None: self.storage_root = Path(storage_root) self.interval_seconds = interval_hours * 3600.0 @@ -172,6 +173,7 @@ class GarbageCollector: self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() + self._io_throttle = max(0, io_throttle_ms) / 1000.0 self.history_store = GCHistoryStore(storage_root, max_records=max_history) def start(self) -> None: @@ -255,6 +257,13 @@ class GarbageCollector: def _system_path(self) -> Path: return self.storage_root / self.SYSTEM_ROOT + def _throttle(self) -> bool: + if self._shutdown: + return True + if self._io_throttle > 0: + time.sleep(self._io_throttle) + return self._shutdown + def _list_bucket_names(self) -> List[str]: names = [] try: @@ -271,6 +280,8 @@ class GarbageCollector: return try: for entry in tmp_dir.iterdir(): + if self._throttle(): + return if not entry.is_file(): continue age = _file_age_hours(entry) @@ -292,6 +303,8 @@ class GarbageCollector: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: + if self._shutdown: + return for multipart_root in ( self._system_path() / self.SYSTEM_MULTIPART_DIR / bucket_name, self.storage_root / bucket_name / ".multipart", @@ -300,6 +313,8 @@ class GarbageCollector: continue try: for upload_dir in multipart_root.iterdir(): + if self._throttle(): + return if not upload_dir.is_dir(): continue self._maybe_clean_upload(upload_dir, cutoff_hours, result) @@ -329,6 +344,8 @@ class GarbageCollector: try: for bucket_dir in buckets_root.iterdir(): + if self._shutdown: + return if not bucket_dir.is_dir(): continue locks_dir = bucket_dir / "locks" @@ -336,6 +353,8 @@ class GarbageCollector: continue try: for lock_file in locks_dir.iterdir(): + if self._throttle(): + return if not lock_file.is_file() or not lock_file.name.endswith(".lock"): continue age = _file_age_hours(lock_file) @@ -356,6 +375,8 @@ class GarbageCollector: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: + if self._shutdown: + return legacy_meta = self.storage_root / bucket_name / ".meta" if legacy_meta.exists(): self._clean_legacy_metadata(bucket_name, legacy_meta, result) @@ -368,6 +389,8 @@ class GarbageCollector: bucket_path = self.storage_root / bucket_name try: for meta_file in meta_root.rglob("*.meta.json"): + if self._throttle(): + return if not meta_file.is_file(): continue try: @@ -387,6 +410,8 @@ class GarbageCollector: bucket_path = self.storage_root / bucket_name try: for index_file in meta_root.rglob("_index.json"): + if self._throttle(): + return if not index_file.is_file(): continue try: @@ -430,6 +455,8 @@ class GarbageCollector: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: + if self._shutdown: + return bucket_path = 
self.storage_root / bucket_name for versions_root in ( self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR, @@ -439,6 +466,8 @@ class GarbageCollector: continue try: for key_dir in versions_root.iterdir(): + if self._throttle(): + return if not key_dir.is_dir(): continue self._clean_versions_for_key(bucket_path, versions_root, key_dir, result) @@ -489,6 +518,8 @@ class GarbageCollector: self._remove_empty_dirs_recursive(root, root, result) def _remove_empty_dirs_recursive(self, path: Path, stop_at: Path, result: GCResult) -> bool: + if self._shutdown: + return False if not path.is_dir(): return False @@ -499,6 +530,8 @@ class GarbageCollector: all_empty = True for child in children: + if self._throttle(): + return False if child.is_dir(): if not self._remove_empty_dirs_recursive(child, stop_at, result): all_empty = False @@ -528,4 +561,5 @@ class GarbageCollector: "multipart_max_age_days": self.multipart_max_age_days, "lock_file_max_age_hours": self.lock_file_max_age_hours, "dry_run": self.dry_run, + "io_throttle_ms": round(self._io_throttle * 1000), } diff --git a/app/integrity.py b/app/integrity.py index 02b996b..957e150 100644 --- a/app/integrity.py +++ b/app/integrity.py @@ -180,6 +180,7 @@ class IntegrityChecker: auto_heal: bool = False, dry_run: bool = False, max_history: int = 50, + io_throttle_ms: int = 10, ) -> None: self.storage_root = Path(storage_root) self.interval_seconds = interval_hours * 3600.0 @@ -191,6 +192,7 @@ class IntegrityChecker: self._lock = threading.Lock() self._scanning = False self._scan_start_time: Optional[float] = None + self._io_throttle = max(0, io_throttle_ms) / 1000.0 self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history) def start(self) -> None: @@ -247,7 +249,7 @@ class IntegrityChecker: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: - if result.objects_scanned >= self.batch_size: + if self._shutdown or result.objects_scanned >= self.batch_size: break result.buckets_scanned += 1 self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) @@ -309,6 +311,13 @@ class IntegrityChecker: pass return names + def _throttle(self) -> bool: + if self._shutdown: + return True + if self._io_throttle > 0: + time.sleep(self._io_throttle) + return self._shutdown + def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None: if len(result.issues) < MAX_ISSUES: result.issues.append(issue) @@ -324,6 +333,8 @@ class IntegrityChecker: try: for index_file in meta_root.rglob("_index.json"): + if self._throttle(): + return if result.objects_scanned >= self.batch_size: return if not index_file.is_file(): @@ -334,6 +345,8 @@ class IntegrityChecker: continue for key_name, entry in list(index_data.items()): + if self._throttle(): + return if result.objects_scanned >= self.batch_size: return @@ -394,6 +407,8 @@ class IntegrityChecker: try: for entry in bucket_path.rglob("*"): + if self._throttle(): + return if result.objects_scanned >= self.batch_size: return if not entry.is_file(): @@ -469,6 +484,8 @@ class IntegrityChecker: try: for index_file in meta_root.rglob("_index.json"): + if self._throttle(): + return if not index_file.is_file(): continue try: @@ -523,6 +540,8 @@ class IntegrityChecker: try: for key_dir in versions_root.rglob("*"): + if self._throttle(): + return if not key_dir.is_dir(): continue @@ -646,6 +665,8 @@ class IntegrityChecker: try: for meta_file in legacy_meta_root.rglob("*.meta.json"): + if self._throttle(): + return if 
not meta_file.is_file(): continue @@ -756,6 +777,7 @@ class IntegrityChecker: "batch_size": self.batch_size, "auto_heal": self.auto_heal, "dry_run": self.dry_run, + "io_throttle_ms": round(self._io_throttle * 1000), } if self._scanning and self._scan_start_time is not None: status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1) From eff3e378f34c70a413b75bc75f0cf1333fb2c6ab Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 23 Mar 2026 11:55:46 +0800 Subject: [PATCH 03/11] Fix mobile infinite scroll on object list and ghost preview on fast object swap --- static/css/main.css | 2 +- static/js/bucket-detail-main.js | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/static/css/main.css b/static/css/main.css index 5202aab..2a54ee0 100644 --- a/static/css/main.css +++ b/static/css/main.css @@ -2655,7 +2655,7 @@ pre code { } .objects-table-container { - max-height: none; + max-height: 60vh; } .preview-card { diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index 90517fe..2648ec3 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -98,6 +98,7 @@ const previewMetadata = document.getElementById('preview-metadata'); const previewMetadataList = document.getElementById('preview-metadata-list'); const previewPlaceholder = document.getElementById('preview-placeholder'); + const previewPlaceholderDefault = previewPlaceholder ? previewPlaceholder.innerHTML : ''; const previewImage = document.getElementById('preview-image'); const previewVideo = document.getElementById('preview-video'); const previewAudio = document.getElementById('preview-audio'); @@ -1957,6 +1958,10 @@ [previewImage, previewVideo, previewAudio, previewIframe].forEach((el) => { if (!el) return; el.classList.add('d-none'); + if (el.tagName === 'IMG') { + el.removeAttribute('src'); + el.onload = null; + } if (el.tagName === 'VIDEO' || el.tagName === 'AUDIO') { el.pause(); el.removeAttribute('src'); @@ -1969,6 +1974,7 @@ previewText.classList.add('d-none'); previewText.textContent = ''; } + previewPlaceholder.innerHTML = previewPlaceholderDefault; previewPlaceholder.classList.remove('d-none'); }; @@ -2016,9 +2022,18 @@ const previewUrl = row.dataset.previewUrl; const lower = row.dataset.key.toLowerCase(); if (previewUrl && lower.match(/\.(png|jpg|jpeg|gif|webp|svg|ico|bmp)$/)) { + previewPlaceholder.innerHTML = '
<div class="text-muted">Loading preview\u2026</div>
'; + const currentRow = row; + previewImage.onload = () => { + if (activeRow !== currentRow) return; + previewImage.classList.remove('d-none'); + previewPlaceholder.classList.add('d-none'); + }; + previewImage.onerror = () => { + if (activeRow !== currentRow) return; + previewPlaceholder.innerHTML = '
<div class="text-danger">Failed to load preview</div>
'; + }; previewImage.src = previewUrl; - previewImage.classList.remove('d-none'); - previewPlaceholder.classList.add('d-none'); } else if (previewUrl && lower.match(/\.(mp4|webm|ogv|mov|avi|mkv)$/)) { previewVideo.src = previewUrl; previewVideo.classList.remove('d-none'); From f43fad02fb387a4cc44813ed2c42fb2dc9148d48 Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 23 Mar 2026 16:27:28 +0800 Subject: [PATCH 04/11] Replace fetch with XHR for multipart upload progress and add retry logic --- static/js/bucket-detail-upload.js | 87 ++++++++++++++++++++++++------- templates/bucket_detail.html | 2 +- 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/static/js/bucket-detail-upload.js b/static/js/bucket-detail-upload.js index 6b6a655..9daa6d1 100644 --- a/static/js/bucket-detail-upload.js +++ b/static/js/bucket-detail-upload.js @@ -3,6 +3,8 @@ window.BucketDetailUpload = (function() { const MULTIPART_THRESHOLD = 8 * 1024 * 1024; const CHUNK_SIZE = 8 * 1024 * 1024; + const MAX_PART_RETRIES = 3; + const RETRY_BASE_DELAY_MS = 1000; let state = { isUploading: false, @@ -204,6 +206,67 @@ window.BucketDetailUpload = (function() { } } + function uploadPartXHR(url, chunk, csrfToken, baseBytes, fileSize, progressItem, partNumber, totalParts) { + return new Promise((resolve, reject) => { + const xhr = new XMLHttpRequest(); + xhr.open('PUT', url, true); + xhr.setRequestHeader('X-CSRFToken', csrfToken || ''); + + xhr.upload.addEventListener('progress', (e) => { + if (e.lengthComputable) { + updateProgressItem(progressItem, { + status: `Part ${partNumber}/${totalParts}`, + loaded: baseBytes + e.loaded, + total: fileSize + }); + } + }); + + xhr.addEventListener('load', () => { + if (xhr.status >= 200 && xhr.status < 300) { + try { + resolve(JSON.parse(xhr.responseText)); + } catch { + reject(new Error(`Part ${partNumber}: invalid response`)); + } + } else { + try { + const data = JSON.parse(xhr.responseText); + reject(new Error(data.error || `Part ${partNumber} failed (${xhr.status})`)); + } catch { + reject(new Error(`Part ${partNumber} failed (${xhr.status})`)); + } + } + }); + + xhr.addEventListener('error', () => reject(new Error(`Part ${partNumber}: network error`))); + xhr.addEventListener('abort', () => reject(new Error(`Part ${partNumber}: aborted`))); + + xhr.send(chunk); + }); + } + + async function uploadPartWithRetry(url, chunk, csrfToken, baseBytes, fileSize, progressItem, partNumber, totalParts) { + let lastError; + for (let attempt = 0; attempt <= MAX_PART_RETRIES; attempt++) { + try { + return await uploadPartXHR(url, chunk, csrfToken, baseBytes, fileSize, progressItem, partNumber, totalParts); + } catch (err) { + lastError = err; + if (attempt < MAX_PART_RETRIES) { + const delay = RETRY_BASE_DELAY_MS * Math.pow(2, attempt); + updateProgressItem(progressItem, { + status: `Part ${partNumber}/${totalParts} retry ${attempt + 1}/${MAX_PART_RETRIES}...`, + loaded: baseBytes, + total: fileSize + }); + await new Promise(r => setTimeout(r, delay)); + } + } + } + throw lastError; + } + async function uploadMultipart(file, objectKey, metadata, progressItem, urls) { const csrfToken = document.querySelector('input[name="csrf_token"]')?.value; @@ -233,26 +296,14 @@ window.BucketDetailUpload = (function() { const end = Math.min(start + CHUNK_SIZE, file.size); const chunk = file.slice(start, end); - updateProgressItem(progressItem, { - status: `Part ${partNumber}/${totalParts}`, - loaded: uploadedBytes, - total: file.size - }); + const partData = await uploadPartWithRetry( + 
`${partUrl}?partNumber=${partNumber}`, + chunk, csrfToken, uploadedBytes, file.size, + progressItem, partNumber, totalParts + ); - const partResp = await fetch(`${partUrl}?partNumber=${partNumber}`, { - method: 'PUT', - headers: { 'X-CSRFToken': csrfToken || '' }, - body: chunk - }); - - if (!partResp.ok) { - const err = await partResp.json().catch(() => ({})); - throw new Error(err.error || `Part ${partNumber} failed`); - } - - const partData = await partResp.json(); parts.push({ part_number: partNumber, etag: partData.etag }); - uploadedBytes += chunk.size; + uploadedBytes += (end - start); updateProgressItem(progressItem, { loaded: uploadedBytes, diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html index e0be305..d6ad4f3 100644 --- a/templates/bucket_detail.html +++ b/templates/bucket_detail.html @@ -2057,7 +2057,7 @@
-<div class="form-text">Select one or more files from your device. Files ≥ 8 MB automatically switch to multipart uploads.</div>
+<div class="form-text">Select one or more files from your device. Files ≥ 8 MB use multipart uploads with automatic retry.</div>
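
Note on the retry pattern: patch 04 swaps fetch for XMLHttpRequest because only XHR exposes upload progress events, then wraps each part PUT in uploadPartWithRetry with exponential backoff (1 s, 2 s, 4 s, then give up). The same shape transposed to Python, where upload_part stands in for the actual HTTP PUT:

import time
from typing import Callable

MAX_PART_RETRIES = 3        # constants match the patch
RETRY_BASE_DELAY_MS = 1000


def upload_part_with_retry(upload_part: Callable[[], dict]) -> dict:
    """Retries one flaky part upload with exponential backoff."""
    last_error: Exception = RuntimeError("no attempt made")
    for attempt in range(MAX_PART_RETRIES + 1):
        try:
            return upload_part()
        except Exception as err:  # network failure or non-2xx status
            last_error = err
            if attempt < MAX_PART_RETRIES:
                time.sleep(RETRY_BASE_DELAY_MS * (2 ** attempt) / 1000.0)
    raise last_error

Retrying a part is safe because, as in S3, re-PUTting the same partNumber replaces the earlier attempt; only the etag of the attempt that finally succeeds is recorded for complete-multipart.
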
From 0e525713b12f40c3949217a77571f7319c8df651 Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 23 Mar 2026 16:48:25 +0800 Subject: [PATCH 05/11] Fix missing CSRF token on presigned URL request --- static/js/bucket-detail-main.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index 2648ec3..2ade8e8 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -1514,7 +1514,7 @@ }; const response = await fetch(endpoint, { method: 'POST', - headers: { 'Content-Type': 'application/json' }, + headers: { 'Content-Type': 'application/json', 'X-CSRFToken': window.getCsrfToken ? window.getCsrfToken() : '' }, body: JSON.stringify(payload), }); const data = await response.json(); From a7f9b0a22fd08ae635447d53fee780f3e5fbc28a Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 23 Mar 2026 17:14:04 +0800 Subject: [PATCH 06/11] Convert GC to async with polling to prevent proxy timeouts --- app/admin_api.py | 12 ++-- app/gc.py | 99 +++++++++++++++++++----------- app/ui.py | 43 ++++++++++--- templates/system.html | 139 +++++++++++++++++++++++++++++++----------- tests/test_gc.py | 10 ++- 5 files changed, 218 insertions(+), 85 deletions(-) diff --git a/app/admin_api.py b/app/admin_api.py index d09658d..89c87f0 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -907,15 +907,11 @@ def gc_run_now(): if not gc: return _json_error("InvalidRequest", "GC is not enabled", 400) payload = request.get_json(silent=True) or {} - original_dry_run = gc.dry_run - if "dry_run" in payload: - gc.dry_run = bool(payload["dry_run"]) - try: - result = gc.run_now() - finally: - gc.dry_run = original_dry_run + started = gc.run_async(dry_run=payload.get("dry_run")) logger.info("GC manual run by %s", principal.access_key) - return jsonify(result.to_dict()) + if not started: + return _json_error("Conflict", "GC is already in progress", 409) + return jsonify({"status": "started"}) @admin_api_bp.route("/gc/history", methods=["GET"]) diff --git a/app/gc.py b/app/gc.py index 85f2617..16fa3b7 100644 --- a/app/gc.py +++ b/app/gc.py @@ -173,6 +173,8 @@ class GarbageCollector: self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() + self._scanning = False + self._scan_start_time: Optional[float] = None self._io_throttle = max(0, io_throttle_ms) / 1000.0 self.history_store = GCHistoryStore(storage_root, max_records=max_history) @@ -214,45 +216,70 @@ class GarbageCollector: finally: self._schedule_next() - def run_now(self) -> GCResult: - start = time.time() - result = GCResult() + def run_now(self, dry_run: Optional[bool] = None) -> GCResult: + if not self._lock.acquire(blocking=False): + raise RuntimeError("GC is already in progress") - self._clean_temp_files(result) - self._clean_orphaned_multipart(result) - self._clean_stale_locks(result) - self._clean_orphaned_metadata(result) - self._clean_orphaned_versions(result) - self._clean_empty_dirs(result) + effective_dry_run = dry_run if dry_run is not None else self.dry_run - result.execution_time_seconds = time.time() - start + try: + self._scanning = True + self._scan_start_time = time.time() - if result.has_work or result.errors: - logger.info( - "GC completed in %.2fs: temp=%d (%.1f MB), multipart=%d (%.1f MB), " - "locks=%d, meta=%d, versions=%d (%.1f MB), dirs=%d, errors=%d%s", - result.execution_time_seconds, - result.temp_files_deleted, - result.temp_bytes_freed / (1024 * 1024), - result.multipart_uploads_deleted, - 
result.multipart_bytes_freed / (1024 * 1024), - result.lock_files_deleted, - result.orphaned_metadata_deleted, - result.orphaned_versions_deleted, - result.orphaned_version_bytes_freed / (1024 * 1024), - result.empty_dirs_removed, - len(result.errors), - " (dry run)" if self.dry_run else "", + start = self._scan_start_time + result = GCResult() + + original_dry_run = self.dry_run + self.dry_run = effective_dry_run + try: + self._clean_temp_files(result) + self._clean_orphaned_multipart(result) + self._clean_stale_locks(result) + self._clean_orphaned_metadata(result) + self._clean_orphaned_versions(result) + self._clean_empty_dirs(result) + finally: + self.dry_run = original_dry_run + + result.execution_time_seconds = time.time() - start + + if result.has_work or result.errors: + logger.info( + "GC completed in %.2fs: temp=%d (%.1f MB), multipart=%d (%.1f MB), " + "locks=%d, meta=%d, versions=%d (%.1f MB), dirs=%d, errors=%d%s", + result.execution_time_seconds, + result.temp_files_deleted, + result.temp_bytes_freed / (1024 * 1024), + result.multipart_uploads_deleted, + result.multipart_bytes_freed / (1024 * 1024), + result.lock_files_deleted, + result.orphaned_metadata_deleted, + result.orphaned_versions_deleted, + result.orphaned_version_bytes_freed / (1024 * 1024), + result.empty_dirs_removed, + len(result.errors), + " (dry run)" if effective_dry_run else "", + ) + + record = GCExecutionRecord( + timestamp=time.time(), + result=result.to_dict(), + dry_run=effective_dry_run, ) + self.history_store.add(record) - record = GCExecutionRecord( - timestamp=time.time(), - result=result.to_dict(), - dry_run=self.dry_run, - ) - self.history_store.add(record) + return result + finally: + self._scanning = False + self._scan_start_time = None + self._lock.release() - return result + def run_async(self, dry_run: Optional[bool] = None) -> bool: + if self._scanning: + return False + t = threading.Thread(target=self.run_now, args=(dry_run,), daemon=True) + t.start() + return True def _system_path(self) -> Path: return self.storage_root / self.SYSTEM_ROOT @@ -553,9 +580,10 @@ class GarbageCollector: return [r.to_dict() for r in records] def get_status(self) -> dict: - return { + status: Dict[str, Any] = { "enabled": not self._shutdown or self._timer is not None, "running": self._timer is not None and not self._shutdown, + "scanning": self._scanning, "interval_hours": self.interval_seconds / 3600.0, "temp_file_max_age_hours": self.temp_file_max_age_hours, "multipart_max_age_days": self.multipart_max_age_days, @@ -563,3 +591,6 @@ class GarbageCollector: "dry_run": self.dry_run, "io_throttle_ms": round(self._io_throttle * 1000), } + if self._scanning and self._scan_start_time: + status["scan_elapsed_seconds"] = time.time() - self._scan_start_time + return status diff --git a/app/ui.py b/app/ui.py index aff4fa4..4423a98 100644 --- a/app/ui.py +++ b/app/ui.py @@ -4179,14 +4179,43 @@ def system_gc_run(): return jsonify({"error": "GC is not enabled"}), 400 payload = request.get_json(silent=True) or {} - original_dry_run = gc.dry_run - if "dry_run" in payload: - gc.dry_run = bool(payload["dry_run"]) + started = gc.run_async(dry_run=payload.get("dry_run")) + if not started: + return jsonify({"error": "GC is already in progress"}), 409 + return jsonify({"status": "started"}) + + +@ui_bp.get("/system/gc/status") +def system_gc_status(): + principal = _current_principal() try: - result = gc.run_now() - finally: - gc.dry_run = original_dry_run - return jsonify(result.to_dict()) + _iam().authorize(principal, None, 
"iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + gc = current_app.extensions.get("gc") + if not gc: + return jsonify({"error": "GC is not enabled"}), 400 + + return jsonify(gc.get_status()) + + +@ui_bp.get("/system/gc/history") +def system_gc_history(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + gc = current_app.extensions.get("gc") + if not gc: + return jsonify({"executions": []}) + + limit = min(int(request.args.get("limit", 10)), 200) + offset = int(request.args.get("offset", 0)) + records = gc.get_history(limit=limit, offset=offset) + return jsonify({"executions": records}) @ui_bp.post("/system/integrity/run") diff --git a/templates/system.html b/templates/system.html index ed3908d..f306507 100644 --- a/templates/system.html +++ b/templates/system.html @@ -122,6 +122,13 @@
+<div id="gcScanningBanner" class="alert alert-info d-flex align-items-center d-none mb-0 small">
+  <span class="spinner-border spinner-border-sm me-2" role="status" aria-hidden="true"></span>
+  GC in progress<span id="gcScanElapsed"></span>
+</div>
@@ -376,9 +383,92 @@ return (i === 0 ? b : b.toFixed(1)) + ' ' + units[i]; } + var _gcPollTimer = null; + var _gcLastDryRun = false; + + function _gcSetScanning(scanning) { + var banner = document.getElementById('gcScanningBanner'); + var btns = ['gcRunBtn', 'gcDryRunBtn']; + if (scanning) { + banner.classList.remove('d-none'); + btns.forEach(function (id) { + var el = document.getElementById(id); + if (el) el.disabled = true; + }); + } else { + banner.classList.add('d-none'); + document.getElementById('gcScanElapsed').textContent = ''; + btns.forEach(function (id) { + var el = document.getElementById(id); + if (el) el.disabled = false; + }); + } + } + + function _gcShowResult(data, dryRun) { + var container = document.getElementById('gcResult'); + var alert = document.getElementById('gcResultAlert'); + var title = document.getElementById('gcResultTitle'); + var body = document.getElementById('gcResultBody'); + container.classList.remove('d-none'); + + var totalItems = (data.temp_files_deleted || 0) + (data.multipart_uploads_deleted || 0) + + (data.lock_files_deleted || 0) + (data.orphaned_metadata_deleted || 0) + + (data.orphaned_versions_deleted || 0) + (data.empty_dirs_removed || 0); + var totalFreed = (data.temp_bytes_freed || 0) + (data.multipart_bytes_freed || 0) + + (data.orphaned_version_bytes_freed || 0); + + alert.className = totalItems > 0 ? 'alert alert-success mb-0 small' : 'alert alert-info mb-0 small'; + title.textContent = (dryRun ? '[Dry Run] ' : '') + 'Completed in ' + (data.execution_time_seconds || 0).toFixed(2) + 's'; + + var lines = []; + if (data.temp_files_deleted) lines.push('Temp files: ' + data.temp_files_deleted + ' (' + formatBytes(data.temp_bytes_freed) + ')'); + if (data.multipart_uploads_deleted) lines.push('Multipart uploads: ' + data.multipart_uploads_deleted + ' (' + formatBytes(data.multipart_bytes_freed) + ')'); + if (data.lock_files_deleted) lines.push('Lock files: ' + data.lock_files_deleted); + if (data.orphaned_metadata_deleted) lines.push('Orphaned metadata: ' + data.orphaned_metadata_deleted); + if (data.orphaned_versions_deleted) lines.push('Orphaned versions: ' + data.orphaned_versions_deleted + ' (' + formatBytes(data.orphaned_version_bytes_freed) + ')'); + if (data.empty_dirs_removed) lines.push('Empty directories: ' + data.empty_dirs_removed); + if (totalItems === 0) lines.push('Nothing to clean up.'); + if (totalFreed > 0) lines.push('Total freed: ' + formatBytes(totalFreed)); + if (data.errors && data.errors.length > 0) lines.push('Errors: ' + data.errors.join(', ')); + + body.innerHTML = lines.join('
'); + } + + function _gcPoll() { + fetch('{{ url_for("ui.system_gc_status") }}', { + headers: {'X-CSRFToken': csrfToken} + }) + .then(function (r) { return r.json(); }) + .then(function (status) { + if (status.scanning) { + var elapsed = status.scan_elapsed_seconds || 0; + document.getElementById('gcScanElapsed').textContent = ' (' + elapsed.toFixed(0) + 's)'; + _gcPollTimer = setTimeout(_gcPoll, 2000); + } else { + _gcSetScanning(false); + fetch('{{ url_for("ui.system_gc_history") }}?limit=1', { + headers: {'X-CSRFToken': csrfToken} + }) + .then(function (r) { return r.json(); }) + .then(function (hist) { + if (hist.executions && hist.executions.length > 0) { + var latest = hist.executions[0]; + _gcShowResult(latest.result, latest.dry_run); + } + }) + .catch(function () {}); + } + }) + .catch(function () { + _gcPollTimer = setTimeout(_gcPoll, 3000); + }); + } + window.runGC = function (dryRun) { - setLoading(dryRun ? 'gcDryRunBtn' : 'gcRunBtn', true); - setLoading(dryRun ? 'gcRunBtn' : 'gcDryRunBtn', true, true); + _gcLastDryRun = dryRun; + document.getElementById('gcResult').classList.add('d-none'); + _gcSetScanning(true); fetch('{{ url_for("ui.system_gc_run") }}', { method: 'POST', @@ -387,42 +477,22 @@ }) .then(function (r) { return r.json(); }) .then(function (data) { - var container = document.getElementById('gcResult'); - var alert = document.getElementById('gcResultAlert'); - var title = document.getElementById('gcResultTitle'); - var body = document.getElementById('gcResultBody'); - container.classList.remove('d-none'); - if (data.error) { + _gcSetScanning(false); + var container = document.getElementById('gcResult'); + var alert = document.getElementById('gcResultAlert'); + var title = document.getElementById('gcResultTitle'); + var body = document.getElementById('gcResultBody'); + container.classList.remove('d-none'); alert.className = 'alert alert-danger mb-0 small'; title.textContent = 'Error'; body.textContent = data.error; return; } - - var totalItems = (data.temp_files_deleted || 0) + (data.multipart_uploads_deleted || 0) + - (data.lock_files_deleted || 0) + (data.orphaned_metadata_deleted || 0) + - (data.orphaned_versions_deleted || 0) + (data.empty_dirs_removed || 0); - var totalFreed = (data.temp_bytes_freed || 0) + (data.multipart_bytes_freed || 0) + - (data.orphaned_version_bytes_freed || 0); - - alert.className = totalItems > 0 ? 'alert alert-success mb-0 small' : 'alert alert-info mb-0 small'; - title.textContent = (dryRun ? 
'[Dry Run] ' : '') + 'Completed in ' + (data.execution_time_seconds || 0).toFixed(2) + 's'; - - var lines = []; - if (data.temp_files_deleted) lines.push('Temp files: ' + data.temp_files_deleted + ' (' + formatBytes(data.temp_bytes_freed) + ')'); - if (data.multipart_uploads_deleted) lines.push('Multipart uploads: ' + data.multipart_uploads_deleted + ' (' + formatBytes(data.multipart_bytes_freed) + ')'); - if (data.lock_files_deleted) lines.push('Lock files: ' + data.lock_files_deleted); - if (data.orphaned_metadata_deleted) lines.push('Orphaned metadata: ' + data.orphaned_metadata_deleted); - if (data.orphaned_versions_deleted) lines.push('Orphaned versions: ' + data.orphaned_versions_deleted + ' (' + formatBytes(data.orphaned_version_bytes_freed) + ')'); - if (data.empty_dirs_removed) lines.push('Empty directories: ' + data.empty_dirs_removed); - if (totalItems === 0) lines.push('Nothing to clean up.'); - if (totalFreed > 0) lines.push('Total freed: ' + formatBytes(totalFreed)); - if (data.errors && data.errors.length > 0) lines.push('Errors: ' + data.errors.join(', ')); - - body.innerHTML = lines.join('
'); + _gcPollTimer = setTimeout(_gcPoll, 2000); }) .catch(function (err) { + _gcSetScanning(false); var container = document.getElementById('gcResult'); var alert = document.getElementById('gcResultAlert'); var title = document.getElementById('gcResultTitle'); @@ -431,13 +501,14 @@ alert.className = 'alert alert-danger mb-0 small'; title.textContent = 'Error'; body.textContent = err.message; - }) - .finally(function () { - setLoading('gcRunBtn', false); - setLoading('gcDryRunBtn', false); }); }; + {% if gc_status.scanning %} + _gcSetScanning(true); + _gcPollTimer = setTimeout(_gcPoll, 2000); + {% endif %} + var _integrityPollTimer = null; var _integrityLastMode = {dryRun: false, autoHeal: false}; diff --git a/tests/test_gc.py b/tests/test_gc.py index 174b1ce..58b3f81 100644 --- a/tests/test_gc.py +++ b/tests/test_gc.py @@ -317,7 +317,7 @@ class TestAdminAPI: ) assert resp.status_code == 200 data = resp.get_json() - assert "temp_files_deleted" in data + assert data["status"] == "started" def test_gc_dry_run(self, gc_app): client = gc_app.test_client() @@ -329,11 +329,17 @@ class TestAdminAPI: ) assert resp.status_code == 200 data = resp.get_json() - assert "temp_files_deleted" in data + assert data["status"] == "started" def test_gc_history(self, gc_app): + import time client = gc_app.test_client() client.post("/admin/gc/run", headers={"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}) + for _ in range(50): + time.sleep(0.1) + status = client.get("/admin/gc/status", headers={"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}).get_json() + if not status.get("scanning"): + break resp = client.get("/admin/gc/history", headers={"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}) assert resp.status_code == 200 data = resp.get_json() From 326367ae4c6bdcbc8e928700cce484832db55aae Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 23 Mar 2026 17:46:27 +0800 Subject: [PATCH 07/11] Fix integrity scanner batch limit and add cursor-based rotation --- app/integrity.py | 108 +++++++++++++++++++++++++++++-- tests/test_integrity.py | 136 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 236 insertions(+), 8 deletions(-) diff --git a/app/integrity.py b/app/integrity.py index 957e150..b79cbc8 100644 --- a/app/integrity.py +++ b/app/integrity.py @@ -162,6 +162,76 @@ class IntegrityHistoryStore: return self.load()[offset : offset + limit] +class IntegrityCursorStore: + def __init__(self, storage_root: Path) -> None: + self.storage_root = storage_root + self._lock = threading.Lock() + + def _get_path(self) -> Path: + return self.storage_root / ".myfsio.sys" / "config" / "integrity_cursor.json" + + def load(self) -> Dict[str, Any]: + path = self._get_path() + if not path.exists(): + return {"buckets": {}} + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data.get("buckets"), dict): + return {"buckets": {}} + return data + except (OSError, ValueError, KeyError): + return {"buckets": {}} + + def save(self, data: Dict[str, Any]) -> None: + path = self._get_path() + path.parent.mkdir(parents=True, exist_ok=True) + try: + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + except OSError as e: + logger.error("Failed to save integrity cursor: %s", e) + + def update_bucket(self, bucket_name: str, timestamp: float) -> None: + with self._lock: + data = self.load() + data["buckets"][bucket_name] = {"last_scanned": timestamp} + self.save(data) + + def clean_stale(self, existing_buckets: List[str]) -> None: + with self._lock: + data 
= self.load() + existing_set = set(existing_buckets) + stale_keys = [k for k in data["buckets"] if k not in existing_set] + if stale_keys: + for k in stale_keys: + del data["buckets"][k] + self.save(data) + + def get_bucket_order(self, bucket_names: List[str]) -> List[str]: + data = self.load() + buckets_info = data.get("buckets", {}) + + def sort_key(name: str) -> float: + entry = buckets_info.get(name) + if entry is None: + return 0.0 + return entry.get("last_scanned", 0.0) + + return sorted(bucket_names, key=sort_key) + + def get_info(self) -> Dict[str, Any]: + data = self.load() + buckets = data.get("buckets", {}) + return { + "tracked_buckets": len(buckets), + "buckets": { + name: info.get("last_scanned") + for name, info in buckets.items() + }, + } + + MAX_ISSUES = 500 @@ -194,6 +264,7 @@ class IntegrityChecker: self._scan_start_time: Optional[float] = None self._io_throttle = max(0, io_throttle_ms) / 1000.0 self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history) + self.cursor_store = IntegrityCursorStore(self.storage_root) def start(self) -> None: if self._timer is not None: @@ -247,9 +318,11 @@ class IntegrityChecker: result = IntegrityResult() bucket_names = self._list_bucket_names() + self.cursor_store.clean_stale(bucket_names) + ordered_buckets = self.cursor_store.get_bucket_order(bucket_names) - for bucket_name in bucket_names: - if self._shutdown or result.objects_scanned >= self.batch_size: + for bucket_name in ordered_buckets: + if self._batch_exhausted(result): break result.buckets_scanned += 1 self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) @@ -258,6 +331,7 @@ class IntegrityChecker: self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + self.cursor_store.update_bucket(bucket_name, time.time()) result.execution_time_seconds = time.time() - start @@ -318,6 +392,9 @@ class IntegrityChecker: time.sleep(self._io_throttle) return self._shutdown + def _batch_exhausted(self, result: IntegrityResult) -> bool: + return self._shutdown or result.objects_scanned >= self.batch_size + def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None: if len(result.issues) < MAX_ISSUES: result.issues.append(issue) @@ -335,7 +412,7 @@ class IntegrityChecker: for index_file in meta_root.rglob("_index.json"): if self._throttle(): return - if result.objects_scanned >= self.batch_size: + if self._batch_exhausted(result): return if not index_file.is_file(): continue @@ -347,7 +424,7 @@ class IntegrityChecker: for key_name, entry in list(index_data.items()): if self._throttle(): return - if result.objects_scanned >= self.batch_size: + if self._batch_exhausted(result): return rel_dir = index_file.parent.relative_to(meta_root) @@ -409,7 +486,7 @@ class IntegrityChecker: for entry in bucket_path.rglob("*"): if self._throttle(): return - if result.objects_scanned >= self.batch_size: + if self._batch_exhausted(result): return if not entry.is_file(): continue @@ -420,6 +497,7 @@ class IntegrityChecker: if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS: continue + result.objects_scanned += 1 full_key = rel.as_posix() key_name = rel.name parent = rel.parent @@ -486,6 +564,8 @@ class IntegrityChecker: for index_file in meta_root.rglob("_index.json"): if self._throttle(): return + if self._batch_exhausted(result): + 
return if not index_file.is_file(): continue try: @@ -495,6 +575,9 @@ class IntegrityChecker: keys_to_remove = [] for key_name in list(index_data.keys()): + if self._batch_exhausted(result): + break + result.objects_scanned += 1 rel_dir = index_file.parent.relative_to(meta_root) if rel_dir == Path("."): full_key = key_name @@ -542,6 +625,8 @@ class IntegrityChecker: for key_dir in versions_root.rglob("*"): if self._throttle(): return + if self._batch_exhausted(result): + return if not key_dir.is_dir(): continue @@ -549,6 +634,9 @@ class IntegrityChecker: json_files = {f.stem: f for f in key_dir.glob("*.json")} for stem, bin_file in bin_files.items(): + if self._batch_exhausted(result): + return + result.objects_scanned += 1 if stem not in json_files: result.stale_versions += 1 issue = IntegrityIssue( @@ -568,6 +656,9 @@ class IntegrityChecker: self._add_issue(result, issue) for stem, json_file in json_files.items(): + if self._batch_exhausted(result): + return + result.objects_scanned += 1 if stem not in bin_files: result.stale_versions += 1 issue = IntegrityIssue( @@ -608,6 +699,9 @@ class IntegrityChecker: found_mismatch = False for full_key, cached_etag in etag_cache.items(): + if self._batch_exhausted(result): + break + result.objects_scanned += 1 key_path = Path(full_key) key_name = key_path.name parent = key_path.parent @@ -667,9 +761,12 @@ class IntegrityChecker: for meta_file in legacy_meta_root.rglob("*.meta.json"): if self._throttle(): return + if self._batch_exhausted(result): + return if not meta_file.is_file(): continue + result.objects_scanned += 1 try: rel = meta_file.relative_to(legacy_meta_root) except ValueError: @@ -781,4 +878,5 @@ class IntegrityChecker: } if self._scanning and self._scan_start_time is not None: status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1) + status["cursor"] = self.cursor_store.get_info() return status diff --git a/tests/test_integrity.py b/tests/test_integrity.py index 6460b11..ddd1150 100644 --- a/tests/test_integrity.py +++ b/tests/test_integrity.py @@ -9,7 +9,7 @@ import pytest sys.path.insert(0, str(Path(__file__).resolve().parents[1])) -from app.integrity import IntegrityChecker, IntegrityResult +from app.integrity import IntegrityChecker, IntegrityCursorStore, IntegrityResult def _wait_scan_done(client, headers, timeout=10): @@ -118,7 +118,7 @@ class TestCorruptedObjects: result = checker.run_now() assert result.corrupted_objects == 0 - assert result.objects_scanned == 1 + assert result.objects_scanned >= 1 def test_corrupted_nested_key(self, storage_root, checker): _setup_bucket(storage_root, "mybucket", {"sub/dir/file.txt": b"nested content"}) @@ -503,7 +503,7 @@ class TestMultipleBuckets: result = checker.run_now() assert result.buckets_scanned == 2 - assert result.objects_scanned == 2 + assert result.objects_scanned >= 2 assert result.corrupted_objects == 0 @@ -516,3 +516,133 @@ class TestGetStatus: assert "batch_size" in status assert "auto_heal" in status assert "dry_run" in status + + def test_status_includes_cursor(self, storage_root, checker): + _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"}) + checker.run_now() + status = checker.get_status() + assert "cursor" in status + assert status["cursor"]["tracked_buckets"] == 1 + assert "mybucket" in status["cursor"]["buckets"] + + +class TestUnifiedBatchCounter: + def test_orphaned_objects_count_toward_batch(self, storage_root): + _setup_bucket(storage_root, "mybucket", {}) + for i in range(10): + (storage_root / "mybucket" / 
f"orphan{i}.txt").write_bytes(f"data{i}".encode())
+
+        checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
+        result = checker.run_now()
+        assert result.objects_scanned <= 3
+
+    def test_phantom_metadata_counts_toward_batch(self, storage_root):
+        objects = {f"file{i}.txt": f"data{i}".encode() for i in range(10)}
+        _setup_bucket(storage_root, "mybucket", objects)
+        for i in range(10):
+            (storage_root / "mybucket" / f"file{i}.txt").unlink()
+
+        checker = IntegrityChecker(storage_root=storage_root, batch_size=5)
+        result = checker.run_now()
+        assert result.objects_scanned <= 5
+
+    def test_all_check_types_contribute(self, storage_root):
+        _setup_bucket(storage_root, "mybucket", {"valid.txt": b"hello"})
+        (storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan")
+
+        checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
+        result = checker.run_now()
+        assert result.objects_scanned > 2
+
+
+class TestCursorRotation:
+    def test_oldest_bucket_scanned_first(self, storage_root):
+        _setup_bucket(storage_root, "bucket-a", {"a.txt": b"aaa"})
+        _setup_bucket(storage_root, "bucket-b", {"b.txt": b"bbb"})
+        _setup_bucket(storage_root, "bucket-c", {"c.txt": b"ccc"})
+
+        checker = IntegrityChecker(storage_root=storage_root, batch_size=5)
+
+        checker.cursor_store.update_bucket("bucket-a", 1000.0)
+        checker.cursor_store.update_bucket("bucket-b", 3000.0)
+        checker.cursor_store.update_bucket("bucket-c", 2000.0)
+
+        ordered = checker.cursor_store.get_bucket_order(["bucket-a", "bucket-b", "bucket-c"])
+        assert ordered == ["bucket-a", "bucket-c", "bucket-b"]
+
+    def test_never_scanned_buckets_first(self, storage_root):
+        _setup_bucket(storage_root, "bucket-old", {"a.txt": b"aaa"})
+        _setup_bucket(storage_root, "bucket-new", {"b.txt": b"bbb"})
+
+        checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
+
+        checker.cursor_store.update_bucket("bucket-old", time.time())
+
+        ordered = checker.cursor_store.get_bucket_order(["bucket-old", "bucket-new"])
+        assert ordered[0] == "bucket-new"
+
+    def test_rotation_covers_all_buckets(self, storage_root):
+        for name in ["bucket-a", "bucket-b", "bucket-c"]:
+            _setup_bucket(storage_root, name, {f"{name}.txt": name.encode()})
+
+        checker = IntegrityChecker(storage_root=storage_root, batch_size=4)
+
+        # Each batch-limited run advances the cursor; after three runs every
+        # bucket has been visited and recorded in the cursor store.
+        for _ in range(3):
+            result = checker.run_now()
+            assert result.buckets_scanned >= 1
+
+        cursor_info = checker.cursor_store.get_info()
+        assert cursor_info["tracked_buckets"] == 3
+
+    def test_cursor_persistence(self, storage_root):
+        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
+
+        checker1 = IntegrityChecker(storage_root=storage_root, batch_size=1000)
+        checker1.run_now()
+
+        cursor1 = checker1.cursor_store.get_info()
+        assert cursor1["tracked_buckets"] == 1
+        assert "mybucket" in cursor1["buckets"]
+
+        checker2 = IntegrityChecker(storage_root=storage_root, batch_size=1000)
+        cursor2 = checker2.cursor_store.get_info()
+        assert cursor2["tracked_buckets"] == 1
+        assert "mybucket" in cursor2["buckets"]
+
+    def test_stale_cursor_cleanup(self, storage_root):
+        _setup_bucket(storage_root, "bucket-a", {"a.txt": b"aaa"})
+        _setup_bucket(storage_root, "bucket-b", {"b.txt": b"bbb"})
+
+        checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
+        checker.run_now()
+
+        import shutil
+        shutil.rmtree(storage_root / "bucket-b")
+        meta_b = storage_root / ".myfsio.sys" / "buckets" / "bucket-b"
+        if meta_b.exists():
+            shutil.rmtree(meta_b)
+
+        checker.run_now()
+
+        cursor_info = checker.cursor_store.get_info()
+        assert "bucket-b" not in cursor_info["buckets"]
+        assert "bucket-a" in cursor_info["buckets"]
+
+    def test_cursor_updates_after_scan(self, storage_root):
+        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
+
+        checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
+        before = time.time()
+        checker.run_now()
+        after = time.time()
+
+        cursor_info = checker.cursor_store.get_info()
+        ts = cursor_info["buckets"]["mybucket"]
+        assert before <= ts <= after
From 1a5a7aa9e18abb28de4480107b8348c38c5a62b2 Mon Sep 17 00:00:00 2001
From: kqjy
Date: Mon, 23 Mar 2026 18:31:13 +0800
Subject: [PATCH 08/11] Auto-refresh Recent Scans/Executions tables after GC
 and integrity scan completion

---
 templates/system.html | 101 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 101 insertions(+)

diff --git a/templates/system.html b/templates/system.html
index f306507..e351230 100644
--- a/templates/system.html
+++ b/templates/system.html
@@ -155,6 +155,7 @@
         </div>
       </div>
+      <div id="gcHistoryContainer">
       {% if gc_history %}
       <!-- recent GC executions table (markup unchanged) -->
@@ -200,6 +201,7 @@
       No executions recorded yet.
       {% endif %}
+      </div>
       {% else %}
@@ -287,6 +289,7 @@
+      <div id="integrityHistoryContainer">
       {% if integrity_history %}
       <!-- recent integrity scans table (markup unchanged) -->
@@ -340,6 +343,7 @@
       No scans recorded yet.
       {% endif %}
+      </div>
       {% else %}
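For reviewers, a sketch of the handoff this patch adds (JavaScript, to match the
script block below; statusUrl stands in for the url_for(...) status endpoint the
existing pollers already fetch):

    // When a poll reports the scan has finished, the new _gcRefreshHistory() /
    // _integrityRefreshHistory() helpers re-fetch the last 10 records and
    // rebuild the history table in place instead of requiring a page reload.
    function _gcPoll() {
      fetch(statusUrl, { headers: {'X-CSRFToken': csrfToken} })
        .then(function (r) { return r.json(); })
        .then(function (status) {
          if (status.scanning) {
            _gcPollTimer = setTimeout(_gcPoll, 2000);  // still running: keep polling
          } else {
            _gcSetScanning(false);
            _gcRefreshHistory();                       // finished: refresh the table
          }
        })
        .catch(function () {});
    }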
@@ -383,6 +387,101 @@ return (i === 0 ? b : b.toFixed(1)) + ' ' + units[i]; } + function formatTimestamp(ts) { + var d = new Date(ts * 1000); + var pad = function (n) { return n < 10 ? '0' + n : '' + n; }; + return d.getUTCFullYear() + '-' + pad(d.getUTCMonth() + 1) + '-' + pad(d.getUTCDate()) + + ' ' + pad(d.getUTCHours()) + ':' + pad(d.getUTCMinutes()) + ' UTC'; + } + + var _gcHistoryIcon = '' + + '' + + '' + + ''; + + function _gcRefreshHistory() { + fetch('{{ url_for("ui.system_gc_history") }}?limit=10', { + headers: {'X-CSRFToken': csrfToken} + }) + .then(function (r) { return r.json(); }) + .then(function (hist) { + var container = document.getElementById('gcHistoryContainer'); + if (!container) return; + var execs = hist.executions || []; + if (execs.length === 0) { + container.innerHTML = '

<p class="text-muted mb-0">No executions recorded yet.</p>';
+          return;
+        }
+        var html = '<h6>' + _gcHistoryIcon + ' Recent Executions</h6>' +
+          '<table class="table table-sm">' +
+          '<thead><tr><th>Time</th><th>Cleaned</th><th>Freed</th><th>Mode</th></tr></thead>' +
+          '<tbody>';
+        execs.forEach(function (exec) {
+          var r = exec.result || {};
+          var cleaned = (r.temp_files_deleted || 0) + (r.multipart_uploads_deleted || 0) +
+            (r.lock_files_deleted || 0) + (r.orphaned_metadata_deleted || 0) +
+            (r.orphaned_versions_deleted || 0) + (r.empty_dirs_removed || 0);
+          var freed = (r.temp_bytes_freed || 0) + (r.multipart_bytes_freed || 0) +
+            (r.orphaned_version_bytes_freed || 0);
+          var mode = exec.dry_run ? 'Dry run' : 'Live';
+          html += '<tr><td>' + formatTimestamp(exec.timestamp) + '</td>' +
+            '<td>' + cleaned + '</td>' +
+            '<td>' + formatBytes(freed) + '</td>' +
+            '<td>' + mode + '</td></tr>';
+        });
+        html += '</tbody></table>';
+        container.innerHTML = html;
+      })
+      .catch(function () {});
+  }
+
+  function _integrityRefreshHistory() {
+    fetch('{{ url_for("ui.system_integrity_history") }}?limit=10', {
+      headers: {'X-CSRFToken': csrfToken}
+    })
+      .then(function (r) { return r.json(); })
+      .then(function (hist) {
+        var container = document.getElementById('integrityHistoryContainer');
+        if (!container) return;
+        var execs = hist.executions || [];
+        if (execs.length === 0) {
+          container.innerHTML = '<p class="text-muted mb-0">No scans recorded yet.</p>';
+          return;
+        }
+        var html = '<h6>' + _gcHistoryIcon + ' Recent Scans</h6>' +
+          '<table class="table table-sm">' +
+          '<thead><tr><th>Time</th><th>Scanned</th><th>Issues</th><th>Healed</th><th>Mode</th></tr></thead>' +
+          '<tbody>';
+        execs.forEach(function (exec) {
+          var r = exec.result || {};
+          var issues = (r.corrupted_objects || 0) + (r.orphaned_objects || 0) +
+            (r.phantom_metadata || 0) + (r.stale_versions || 0) +
+            (r.etag_cache_inconsistencies || 0) + (r.legacy_metadata_drifts || 0);
+          var issueHtml = issues > 0
+            ? '<span class="badge text-bg-warning">' + issues + '</span>'
+            : '0';
+          var mode = exec.dry_run
+            ? 'Dry'
+            : (exec.auto_heal ? 'Heal' : 'Scan');
+          html += '<tr><td>' + formatTimestamp(exec.timestamp) + '</td>' +
+            '<td>' + (r.objects_scanned || 0) + '</td>' +
+            '<td>' + issueHtml + '</td>' +
+            '<td>' + (r.issues_healed || 0) + '</td>' +
+            '<td>' + mode + '</td></tr>
'; + container.innerHTML = html; + }) + .catch(function () {}); + } + var _gcPollTimer = null; var _gcLastDryRun = false; @@ -447,6 +546,7 @@ _gcPollTimer = setTimeout(_gcPoll, 2000); } else { _gcSetScanning(false); + _gcRefreshHistory(); fetch('{{ url_for("ui.system_gc_history") }}?limit=1', { headers: {'X-CSRFToken': csrfToken} }) @@ -576,6 +676,7 @@ _integrityPollTimer = setTimeout(_integrityPoll, 2000); } else { _integritySetScanning(false); + _integrityRefreshHistory(); fetch('{{ url_for("ui.system_integrity_history") }}?limit=1', { headers: {'X-CSRFToken': csrfToken} }) From f60dbaf9c9dbae71cb743a38998b9084004a062b Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 23 Mar 2026 18:36:13 +0800 Subject: [PATCH 09/11] Respect DISPLAY_TIMEZONE in GC and integrity scanner history tables --- app/ui.py | 5 +++-- templates/system.html | 13 ++++++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/app/ui.py b/app/ui.py index 4423a98..60d3917 100644 --- a/app/ui.py +++ b/app/ui.py @@ -4126,7 +4126,7 @@ def system_dashboard(): r = rec.get("result", {}) total_freed = r.get("temp_bytes_freed", 0) + r.get("multipart_bytes_freed", 0) + r.get("orphaned_version_bytes_freed", 0) rec["bytes_freed_display"] = _format_bytes(total_freed) - rec["timestamp_display"] = datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + rec["timestamp_display"] = _format_datetime_display(datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc)) gc_history_records.append(rec) checker = current_app.extensions.get("integrity") @@ -4135,7 +4135,7 @@ def system_dashboard(): if checker: raw = checker.get_history(limit=10, offset=0) for rec in raw: - rec["timestamp_display"] = datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + rec["timestamp_display"] = _format_datetime_display(datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc)) integrity_history_records.append(rec) features = [ @@ -4163,6 +4163,7 @@ def system_dashboard(): gc_history=gc_history_records, integrity_status=integrity_status, integrity_history=integrity_history_records, + display_timezone=current_app.config.get("DISPLAY_TIMEZONE", "UTC"), ) diff --git a/templates/system.html b/templates/system.html index e351230..5138e3b 100644 --- a/templates/system.html +++ b/templates/system.html @@ -387,11 +387,18 @@ return (i === 0 ? b : b.toFixed(1)) + ' ' + units[i]; } + var _displayTimezone = {{ display_timezone|tojson }}; + function formatTimestamp(ts) { var d = new Date(ts * 1000); - var pad = function (n) { return n < 10 ? '0' + n : '' + n; }; - return d.getUTCFullYear() + '-' + pad(d.getUTCMonth() + 1) + '-' + pad(d.getUTCDate()) + - ' ' + pad(d.getUTCHours()) + ':' + pad(d.getUTCMinutes()) + ' UTC'; + try { + var opts = {year: 'numeric', month: 'short', day: '2-digit', hour: '2-digit', minute: '2-digit', hour12: false, timeZone: _displayTimezone, timeZoneName: 'short'}; + return d.toLocaleString('en-US', opts); + } catch (e) { + var pad = function (n) { return n < 10 ? 
'0' + n : '' + n; }; + return d.getUTCFullYear() + '-' + pad(d.getUTCMonth() + 1) + '-' + pad(d.getUTCDate()) + + ' ' + pad(d.getUTCHours()) + ':' + pad(d.getUTCMinutes()) + ' UTC'; + } } var _gcHistoryIcon = '' + From 8996f1ce060e49f247cc2456dde9e48975331ddf Mon Sep 17 00:00:00 2001 From: kqjy Date: Tue, 24 Mar 2026 12:10:38 +0800 Subject: [PATCH 10/11] Fix folder selection not showing delete button in bucket browser --- app/ui.py | 39 +++++++++++++++++++++++++++------ static/js/bucket-detail-main.js | 29 ++++++++++++++++++++---- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/app/ui.py b/app/ui.py index 60d3917..29ffd05 100644 --- a/app/ui.py +++ b/app/ui.py @@ -1063,6 +1063,27 @@ def bulk_delete_objects(bucket_name: str): return _respond(False, f"A maximum of {MAX_KEYS} objects can be deleted per request", status_code=400) unique_keys = list(dict.fromkeys(cleaned)) + + folder_prefixes = [k for k in unique_keys if k.endswith("/")] + if folder_prefixes: + try: + client = get_session_s3_client() + for prefix in folder_prefixes: + unique_keys.remove(prefix) + paginator = client.get_paginator("list_objects_v2") + for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix): + for obj in page.get("Contents", []): + if obj["Key"] not in unique_keys: + unique_keys.append(obj["Key"]) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return _respond(False, err["error"], status_code=status) + return _respond(False, "S3 API server is unreachable", status_code=502) + + if not unique_keys: + return _respond(False, "No objects found under the selected folders", status_code=400) + try: _authorize_ui(principal, bucket_name, "delete") except IamError as exc: @@ -1093,13 +1114,17 @@ def bulk_delete_objects(bucket_name: str): else: try: client = get_session_s3_client() - objects_to_delete = [{"Key": k} for k in unique_keys] - resp = client.delete_objects( - Bucket=bucket_name, - Delete={"Objects": objects_to_delete, "Quiet": False}, - ) - deleted = [d["Key"] for d in resp.get("Deleted", [])] - errors = [{"key": e["Key"], "error": e.get("Message", e.get("Code", "Unknown error"))} for e in resp.get("Errors", [])] + deleted = [] + errors = [] + for i in range(0, len(unique_keys), 1000): + batch = unique_keys[i:i + 1000] + objects_to_delete = [{"Key": k} for k in batch] + resp = client.delete_objects( + Bucket=bucket_name, + Delete={"Objects": objects_to_delete, "Quiet": False}, + ) + deleted.extend(d["Key"] for d in resp.get("Deleted", [])) + errors.extend({"key": e["Key"], "error": e.get("Message", e.get("Code", "Unknown error"))} for e in resp.get("Errors", [])) for key in deleted: _replication_manager().trigger_replication(bucket_name, key, action="delete") except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index 2ade8e8..9b48d21 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -867,6 +867,11 @@ const checkbox = row.querySelector('[data-folder-select]'); checkbox?.addEventListener('change', (e) => { e.stopPropagation(); + if (checkbox.checked) { + selectedRows.set(folderPath, { key: folderPath, isFolder: true }); + } else { + selectedRows.delete(folderPath); + } const folderObjects = allObjects.filter(obj => obj.key.startsWith(folderPath)); folderObjects.forEach(obj => { if (checkbox.checked) { @@ -1351,8 +1356,11 @@ } if (selectAllCheckbox) 
{ const filesInView = visibleItems.filter(item => item.type === 'file'); - const total = filesInView.length; - const visibleSelectedCount = filesInView.filter(item => selectedRows.has(item.data.key)).length; + const foldersInView = visibleItems.filter(item => item.type === 'folder'); + const total = filesInView.length + foldersInView.length; + const fileSelectedCount = filesInView.filter(item => selectedRows.has(item.data.key)).length; + const folderSelectedCount = foldersInView.filter(item => selectedRows.has(item.path)).length; + const visibleSelectedCount = fileSelectedCount + folderSelectedCount; selectAllCheckbox.disabled = total === 0; selectAllCheckbox.checked = visibleSelectedCount > 0 && visibleSelectedCount === total && total > 0; selectAllCheckbox.indeterminate = visibleSelectedCount > 0 && visibleSelectedCount < total; @@ -1374,8 +1382,12 @@ const keys = Array.from(selectedRows.keys()); bulkDeleteList.innerHTML = ''; if (bulkDeleteCount) { - const label = keys.length === 1 ? 'object' : 'objects'; - bulkDeleteCount.textContent = `${keys.length} ${label} selected`; + const folderCount = keys.filter(k => k.endsWith('/')).length; + const objectCount = keys.length - folderCount; + const parts = []; + if (folderCount) parts.push(`${folderCount} folder${folderCount !== 1 ? 's' : ''}`); + if (objectCount) parts.push(`${objectCount} object${objectCount !== 1 ? 's' : ''}`); + bulkDeleteCount.textContent = `${parts.join(' and ')} selected`; } if (!keys.length) { const empty = document.createElement('li'); @@ -3172,6 +3184,15 @@ } }); + const foldersInView = visibleItems.filter(item => item.type === 'folder'); + foldersInView.forEach(item => { + if (shouldSelect) { + selectedRows.set(item.path, { key: item.path, isFolder: true }); + } else { + selectedRows.delete(item.path); + } + }); + document.querySelectorAll('[data-folder-select]').forEach(cb => { cb.checked = shouldSelect; }); From 0e392e18b492884e25f7d18dc281c8c99d75b3b9 Mon Sep 17 00:00:00 2001 From: kqjy Date: Tue, 24 Mar 2026 15:15:03 +0800 Subject: [PATCH 11/11] Hide ghost details in object panel when preview fails to load --- app/errors.py | 10 ++- static/js/bucket-detail-main.js | 106 ++++++++++++++++++++++++++++---- templates/bucket_detail.html | 3 +- 3 files changed, 105 insertions(+), 14 deletions(-) diff --git a/app/errors.py b/app/errors.py index b2a4079..049187d 100644 --- a/app/errors.py +++ b/app/errors.py @@ -175,13 +175,21 @@ def handle_app_error(error: AppError) -> Response: def handle_rate_limit_exceeded(e: RateLimitExceeded) -> Response: g.s3_error_code = "SlowDown" + if request.path.startswith("/ui") or request.path.startswith("/buckets"): + wants_json = ( + request.is_json or + request.headers.get("X-Requested-With") == "XMLHttpRequest" or + "application/json" in request.accept_mimetypes.values() + ) + if wants_json: + return jsonify({"success": False, "error": {"code": "SlowDown", "message": "Please reduce your request rate."}}), 429 error = Element("Error") SubElement(error, "Code").text = "SlowDown" SubElement(error, "Message").text = "Please reduce your request rate." 
SubElement(error, "Resource").text = request.path SubElement(error, "RequestId").text = getattr(g, "request_id", "") xml_bytes = tostring(error, encoding="utf-8") - return Response(xml_bytes, status=429, mimetype="application/xml") + return Response(xml_bytes, status="429 Too Many Requests", mimetype="application/xml") def register_error_handlers(app): diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index 9b48d21..f35f46f 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -99,6 +99,8 @@ const previewMetadataList = document.getElementById('preview-metadata-list'); const previewPlaceholder = document.getElementById('preview-placeholder'); const previewPlaceholderDefault = previewPlaceholder ? previewPlaceholder.innerHTML : ''; + const previewErrorAlert = document.getElementById('preview-error-alert'); + const previewDetailsMeta = document.getElementById('preview-details-meta'); const previewImage = document.getElementById('preview-image'); const previewVideo = document.getElementById('preview-video'); const previewAudio = document.getElementById('preview-audio'); @@ -1990,6 +1992,34 @@ previewPlaceholder.classList.remove('d-none'); }; + let previewFailed = false; + + const handlePreviewError = () => { + previewFailed = true; + if (downloadButton) { + downloadButton.classList.add('disabled'); + downloadButton.removeAttribute('href'); + } + if (presignButton) presignButton.disabled = true; + if (generatePresignButton) generatePresignButton.disabled = true; + if (previewDetailsMeta) previewDetailsMeta.classList.add('d-none'); + if (previewMetadata) previewMetadata.classList.add('d-none'); + const tagsPanel = document.getElementById('preview-tags'); + if (tagsPanel) tagsPanel.classList.add('d-none'); + const versionPanel = document.getElementById('version-panel'); + if (versionPanel) versionPanel.classList.add('d-none'); + if (previewErrorAlert) { + previewErrorAlert.textContent = 'Unable to load object \u2014 it may have been deleted, or the server returned an error.'; + previewErrorAlert.classList.remove('d-none'); + } + }; + + const clearPreviewError = () => { + previewFailed = false; + if (previewErrorAlert) previewErrorAlert.classList.add('d-none'); + if (previewDetailsMeta) previewDetailsMeta.classList.remove('d-none'); + }; + async function fetchMetadata(metadataUrl) { if (!metadataUrl) return null; try { @@ -2011,6 +2041,7 @@ previewPanel.classList.remove('d-none'); activeRow = row; renderMetadata(null); + clearPreviewError(); previewKey.textContent = row.dataset.key; previewSize.textContent = formatBytes(Number(row.dataset.size)); @@ -2036,25 +2067,69 @@ if (previewUrl && lower.match(/\.(png|jpg|jpeg|gif|webp|svg|ico|bmp)$/)) { previewPlaceholder.innerHTML = '
<div class="text-muted">Loading preview\u2026</div>';
       const currentRow = row;
-      previewImage.onload = () => {
-        if (activeRow !== currentRow) return;
-        previewImage.classList.remove('d-none');
-        previewPlaceholder.classList.add('d-none');
-      };
-      previewImage.onerror = () => {
-        if (activeRow !== currentRow) return;
-        previewPlaceholder.innerHTML = '<div class="text-muted">Failed to load preview</div>';
-      };
-      previewImage.src = previewUrl;
+      fetch(previewUrl)
+        .then((r) => {
+          if (activeRow !== currentRow) return;
+          if (!r.ok) {
+            previewPlaceholder.innerHTML = '<div class="text-muted">Failed to load preview</div>';
+            handlePreviewError();
+            return;
+          }
+          return r.blob();
+        })
+        .then((blob) => {
+          if (!blob || activeRow !== currentRow) return;
+          const url = URL.createObjectURL(blob);
+          previewImage.onload = () => {
+            if (activeRow !== currentRow) { URL.revokeObjectURL(url); return; }
+            previewImage.classList.remove('d-none');
+            previewPlaceholder.classList.add('d-none');
+          };
+          previewImage.onerror = () => {
+            if (activeRow !== currentRow) { URL.revokeObjectURL(url); return; }
+            URL.revokeObjectURL(url);
+            previewPlaceholder.innerHTML = '<div class="text-muted">Failed to load preview</div>';
+          };
+          previewImage.src = url;
+        })
+        .catch(() => {
+          if (activeRow !== currentRow) return;
+          previewPlaceholder.innerHTML = '<div class="text-muted">Failed to load preview</div>';
+          handlePreviewError();
+        });
     } else if (previewUrl && lower.match(/\.(mp4|webm|ogv|mov|avi|mkv)$/)) {
+      const currentRow = row;
+      previewVideo.onerror = () => {
+        if (activeRow !== currentRow) return;
+        previewVideo.classList.add('d-none');
+        previewPlaceholder.classList.remove('d-none');
+        previewPlaceholder.innerHTML = '<div class="text-muted">Failed to load preview</div>';
+        handlePreviewError();
+      };
       previewVideo.src = previewUrl;
       previewVideo.classList.remove('d-none');
       previewPlaceholder.classList.add('d-none');
     } else if (previewUrl && lower.match(/\.(mp3|wav|flac|ogg|aac|m4a|wma)$/)) {
+      const currentRow = row;
+      previewAudio.onerror = () => {
+        if (activeRow !== currentRow) return;
+        previewAudio.classList.add('d-none');
+        previewPlaceholder.classList.remove('d-none');
+        previewPlaceholder.innerHTML = '<div class="text-muted">Failed to load preview</div>';
+        handlePreviewError();
+      };
       previewAudio.src = previewUrl;
       previewAudio.classList.remove('d-none');
       previewPlaceholder.classList.add('d-none');
     } else if (previewUrl && lower.match(/\.(pdf)$/)) {
+      const currentRow = row;
+      previewIframe.onerror = () => {
+        if (activeRow !== currentRow) return;
+        previewIframe.classList.add('d-none');
+        previewPlaceholder.classList.remove('d-none');
+        previewPlaceholder.innerHTML = '<div class="text-muted">Failed to load preview</div>';
+        handlePreviewError();
+      };
       previewIframe.src = previewUrl;
       previewIframe.style.minHeight = '500px';
       previewIframe.classList.remove('d-none');
@@ -2079,14 +2154,17 @@
       })
       .catch(() => {
         if (activeRow !== currentRow) return;
-        previewText.textContent = 'Failed to load preview';
+        previewText.classList.add('d-none');
+        previewPlaceholder.classList.remove('d-none');
+        previewPlaceholder.innerHTML = '<div class="text-muted">Failed to load preview</div>';
+        handlePreviewError();
       });
     }
 
     const metadataUrl = row.dataset.metadataUrl;
     if (metadataUrl) {
       const metadata = await fetchMetadata(metadataUrl);
-      if (activeRow === row) {
+      if (activeRow === row && !previewFailed) {
         renderMetadata(metadata);
       }
     }
@@ -3993,6 +4071,10 @@
     const loadObjectTags = async (row) => {
       if (!row || !previewTagsPanel) return;
+      if (previewFailed) {
+        previewTagsPanel.classList.add('d-none');
+        return;
+      }
       const tagsUrl = row.dataset.tagsUrl;
       if (!tagsUrl) {
         previewTagsPanel.classList.add('d-none');
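For reviewers, the lifecycle of the new failure flag in one sketch (JavaScript;
names as in bucket-detail-main.js above, the fetch line is illustrative only):

    // Selecting a row resets the optimistic UI; any failed load collapses it.
    clearPreviewError();                    // row selected: show details again
    fetch(previewUrl).then(function (r) {
      if (!r.ok) handlePreviewError();      // 4xx/5xx: disable download/presign,
    });                                     // hide metadata/tags, show the alert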
diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html
index d6ad4f3..f913880 100644
--- a/templates/bucket_detail.html
+++ b/templates/bucket_detail.html
@@ -257,7 +257,8 @@
             Share Link
-            <div>
+            <div id="preview-error-alert" class="alert alert-warning d-none"></div>
+            <div id="preview-details-meta">
               Last modified