From 9898167f8dba69b9d28ea4172318625acb9b1e66 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sun, 22 Mar 2026 14:17:43 +0800 Subject: [PATCH] Make integrity scan async with progress indicator in UI --- app/admin_api.py | 6 +- app/integrity.py | 102 ++++++++++++++++---------- app/ui.py | 39 +++++++++- app/version.py | 2 +- templates/system.html | 154 +++++++++++++++++++++++++++++----------- tests/test_integrity.py | 23 +++++- 6 files changed, 237 insertions(+), 89 deletions(-) diff --git a/app/admin_api.py b/app/admin_api.py index ccc0408..d09658d 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -961,12 +961,14 @@ def integrity_run_now(): payload = request.get_json(silent=True) or {} override_dry_run = payload.get("dry_run") override_auto_heal = payload.get("auto_heal") - result = checker.run_now( + started = checker.run_async( auto_heal=override_auto_heal if override_auto_heal is not None else None, dry_run=override_dry_run if override_dry_run is not None else None, ) logger.info("Integrity manual run by %s", principal.access_key) - return jsonify(result.to_dict()) + if not started: + return _json_error("Conflict", "A scan is already in progress", 409) + return jsonify({"status": "started"}) @admin_api_bp.route("/integrity/history", methods=["GET"]) diff --git a/app/integrity.py b/app/integrity.py index 4dc15d2..02b996b 100644 --- a/app/integrity.py +++ b/app/integrity.py @@ -189,6 +189,8 @@ class IntegrityChecker: self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() + self._scanning = False + self._scan_start_time: Optional[float] = None self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history) def start(self) -> None: @@ -229,52 +231,70 @@ class IntegrityChecker: self._schedule_next() def run_now(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> IntegrityResult: - effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal - effective_dry_run = dry_run if dry_run is not None else self.dry_run + if not self._lock.acquire(blocking=False): + raise RuntimeError("Integrity scan is already in progress") - start = time.time() - result = IntegrityResult() + try: + self._scanning = True + self._scan_start_time = time.time() - bucket_names = self._list_bucket_names() + effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal + effective_dry_run = dry_run if dry_run is not None else self.dry_run - for bucket_name in bucket_names: - if result.objects_scanned >= self.batch_size: - break - result.buckets_scanned += 1 - self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + start = self._scan_start_time + result = IntegrityResult() - result.execution_time_seconds = time.time() - start + bucket_names = self._list_bucket_names() - if result.has_issues or result.errors: - logger.info( - "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, " - "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s", - result.execution_time_seconds, - result.corrupted_objects, - result.orphaned_objects, - result.phantom_metadata, - result.stale_versions, - result.etag_cache_inconsistencies, - result.legacy_metadata_drifts, - result.issues_healed, - len(result.errors), - " (dry run)" if effective_dry_run else "", + for bucket_name in bucket_names: + if result.objects_scanned >= self.batch_size: + break + result.buckets_scanned += 1 + self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + + result.execution_time_seconds = time.time() - start + + if result.has_issues or result.errors: + logger.info( + "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, " + "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s", + result.execution_time_seconds, + result.corrupted_objects, + result.orphaned_objects, + result.phantom_metadata, + result.stale_versions, + result.etag_cache_inconsistencies, + result.legacy_metadata_drifts, + result.issues_healed, + len(result.errors), + " (dry run)" if effective_dry_run else "", + ) + + record = IntegrityExecutionRecord( + timestamp=time.time(), + result=result.to_dict(), + dry_run=effective_dry_run, + auto_heal=effective_auto_heal, ) + self.history_store.add(record) - record = IntegrityExecutionRecord( - timestamp=time.time(), - result=result.to_dict(), - dry_run=effective_dry_run, - auto_heal=effective_auto_heal, - ) - self.history_store.add(record) + return result + finally: + self._scanning = False + self._scan_start_time = None + self._lock.release() - return result + def run_async(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> bool: + if self._scanning: + return False + t = threading.Thread(target=self.run_now, args=(auto_heal, dry_run), daemon=True) + t.start() + return True def _system_path(self) -> Path: return self.storage_root / self.SYSTEM_ROOT @@ -728,11 +748,15 @@ class IntegrityChecker: return [r.to_dict() for r in records] def get_status(self) -> dict: - return { + status: Dict[str, Any] = { "enabled": not self._shutdown or self._timer is not None, "running": self._timer is not None and not self._shutdown, + "scanning": self._scanning, "interval_hours": self.interval_seconds / 3600.0, "batch_size": self.batch_size, "auto_heal": self.auto_heal, "dry_run": self.dry_run, } + if self._scanning and self._scan_start_time is not None: + status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1) + return status diff --git a/app/ui.py b/app/ui.py index b1a0947..aff4fa4 100644 --- a/app/ui.py +++ b/app/ui.py @@ -4202,11 +4202,46 @@ def system_integrity_run(): return jsonify({"error": "Integrity checker is not enabled"}), 400 payload = request.get_json(silent=True) or {} - result = checker.run_now( + started = checker.run_async( auto_heal=payload.get("auto_heal"), dry_run=payload.get("dry_run"), ) - return jsonify(result.to_dict()) + if not started: + return jsonify({"error": "A scan is already in progress"}), 409 + return jsonify({"status": "started"}) + + +@ui_bp.get("/system/integrity/status") +def system_integrity_status(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + checker = current_app.extensions.get("integrity") + if not checker: + return jsonify({"error": "Integrity checker is not enabled"}), 400 + + return jsonify(checker.get_status()) + + +@ui_bp.get("/system/integrity/history") +def system_integrity_history(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + checker = current_app.extensions.get("integrity") + if not checker: + return jsonify({"executions": []}) + + limit = min(int(request.args.get("limit", 10)), 200) + offset = int(request.args.get("offset", 0)) + records = checker.get_history(limit=limit, offset=offset) + return jsonify({"executions": records}) @ui_bp.app_errorhandler(404) diff --git a/app/version.py b/app/version.py index 77fc8ae..5c24c4a 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.4.0" +APP_VERSION = "0.4.1" def get_version() -> str: diff --git a/templates/system.html b/templates/system.html index 3758517..ed3908d 100644 --- a/templates/system.html +++ b/templates/system.html @@ -233,21 +233,28 @@
{% if integrity_status.enabled %}
- - -
+
+
+
+ Scan in progress +
+
+
@@ -431,11 +438,95 @@ }); }; - window.runIntegrity = function (dryRun, autoHeal) { - var activeBtn = dryRun ? 'integrityDryRunBtn' : (autoHeal ? 'integrityHealBtn' : 'integrityRunBtn'); - ['integrityRunBtn', 'integrityHealBtn', 'integrityDryRunBtn'].forEach(function (id) { - setLoading(id, true, id !== activeBtn); + var _integrityPollTimer = null; + var _integrityLastMode = {dryRun: false, autoHeal: false}; + + function _integritySetScanning(scanning) { + var banner = document.getElementById('integrityScanningBanner'); + var btns = ['integrityRunBtn', 'integrityHealBtn', 'integrityDryRunBtn']; + if (scanning) { + banner.classList.remove('d-none'); + btns.forEach(function (id) { + var el = document.getElementById(id); + if (el) el.disabled = true; + }); + } else { + banner.classList.add('d-none'); + document.getElementById('integrityScanElapsed').textContent = ''; + btns.forEach(function (id) { + var el = document.getElementById(id); + if (el) el.disabled = false; + }); + } + } + + function _integrityShowResult(data, dryRun, autoHeal) { + var container = document.getElementById('integrityResult'); + var alert = document.getElementById('integrityResultAlert'); + var title = document.getElementById('integrityResultTitle'); + var body = document.getElementById('integrityResultBody'); + container.classList.remove('d-none'); + + var totalIssues = (data.corrupted_objects || 0) + (data.orphaned_objects || 0) + + (data.phantom_metadata || 0) + (data.stale_versions || 0) + + (data.etag_cache_inconsistencies || 0) + (data.legacy_metadata_drifts || 0); + + var prefix = dryRun ? '[Dry Run] ' : (autoHeal ? '[Heal] ' : ''); + alert.className = totalIssues > 0 ? 'alert alert-warning mb-0 small' : 'alert alert-success mb-0 small'; + title.textContent = prefix + 'Completed in ' + (data.execution_time_seconds || 0).toFixed(2) + 's'; + + var lines = []; + lines.push('Scanned: ' + (data.objects_scanned || 0) + ' objects in ' + (data.buckets_scanned || 0) + ' buckets'); + if (totalIssues === 0) { + lines.push('No issues found.'); + } else { + if (data.corrupted_objects) lines.push('Corrupted objects: ' + data.corrupted_objects); + if (data.orphaned_objects) lines.push('Orphaned objects: ' + data.orphaned_objects); + if (data.phantom_metadata) lines.push('Phantom metadata: ' + data.phantom_metadata); + if (data.stale_versions) lines.push('Stale versions: ' + data.stale_versions); + if (data.etag_cache_inconsistencies) lines.push('ETag inconsistencies: ' + data.etag_cache_inconsistencies); + if (data.legacy_metadata_drifts) lines.push('Legacy metadata drifts: ' + data.legacy_metadata_drifts); + if (data.issues_healed) lines.push('Issues healed: ' + data.issues_healed); + } + if (data.errors && data.errors.length > 0) lines.push('Errors: ' + data.errors.join(', ')); + + body.innerHTML = lines.join('
'); + } + + function _integrityPoll() { + fetch('{{ url_for("ui.system_integrity_status") }}', { + headers: {'X-CSRFToken': csrfToken} + }) + .then(function (r) { return r.json(); }) + .then(function (status) { + if (status.scanning) { + var elapsed = status.scan_elapsed_seconds || 0; + document.getElementById('integrityScanElapsed').textContent = ' (' + elapsed.toFixed(0) + 's)'; + _integrityPollTimer = setTimeout(_integrityPoll, 2000); + } else { + _integritySetScanning(false); + fetch('{{ url_for("ui.system_integrity_history") }}?limit=1', { + headers: {'X-CSRFToken': csrfToken} + }) + .then(function (r) { return r.json(); }) + .then(function (hist) { + if (hist.executions && hist.executions.length > 0) { + var latest = hist.executions[0]; + _integrityShowResult(latest.result, latest.dry_run, latest.auto_heal); + } + }) + .catch(function () {}); + } + }) + .catch(function () { + _integrityPollTimer = setTimeout(_integrityPoll, 3000); }); + } + + window.runIntegrity = function (dryRun, autoHeal) { + _integrityLastMode = {dryRun: dryRun, autoHeal: autoHeal}; + document.getElementById('integrityResult').classList.add('d-none'); + _integritySetScanning(true); fetch('{{ url_for("ui.system_integrity_run") }}', { method: 'POST', @@ -444,45 +535,22 @@ }) .then(function (r) { return r.json(); }) .then(function (data) { - var container = document.getElementById('integrityResult'); - var alert = document.getElementById('integrityResultAlert'); - var title = document.getElementById('integrityResultTitle'); - var body = document.getElementById('integrityResultBody'); - container.classList.remove('d-none'); - if (data.error) { + _integritySetScanning(false); + var container = document.getElementById('integrityResult'); + var alert = document.getElementById('integrityResultAlert'); + var title = document.getElementById('integrityResultTitle'); + var body = document.getElementById('integrityResultBody'); + container.classList.remove('d-none'); alert.className = 'alert alert-danger mb-0 small'; title.textContent = 'Error'; body.textContent = data.error; return; } - - var totalIssues = (data.corrupted_objects || 0) + (data.orphaned_objects || 0) + - (data.phantom_metadata || 0) + (data.stale_versions || 0) + - (data.etag_cache_inconsistencies || 0) + (data.legacy_metadata_drifts || 0); - - var prefix = dryRun ? '[Dry Run] ' : (autoHeal ? '[Heal] ' : ''); - alert.className = totalIssues > 0 ? 'alert alert-warning mb-0 small' : 'alert alert-success mb-0 small'; - title.textContent = prefix + 'Completed in ' + (data.execution_time_seconds || 0).toFixed(2) + 's'; - - var lines = []; - lines.push('Scanned: ' + (data.objects_scanned || 0) + ' objects in ' + (data.buckets_scanned || 0) + ' buckets'); - if (totalIssues === 0) { - lines.push('No issues found.'); - } else { - if (data.corrupted_objects) lines.push('Corrupted objects: ' + data.corrupted_objects); - if (data.orphaned_objects) lines.push('Orphaned objects: ' + data.orphaned_objects); - if (data.phantom_metadata) lines.push('Phantom metadata: ' + data.phantom_metadata); - if (data.stale_versions) lines.push('Stale versions: ' + data.stale_versions); - if (data.etag_cache_inconsistencies) lines.push('ETag inconsistencies: ' + data.etag_cache_inconsistencies); - if (data.legacy_metadata_drifts) lines.push('Legacy metadata drifts: ' + data.legacy_metadata_drifts); - if (data.issues_healed) lines.push('Issues healed: ' + data.issues_healed); - } - if (data.errors && data.errors.length > 0) lines.push('Errors: ' + data.errors.join(', ')); - - body.innerHTML = lines.join('
'); + _integrityPollTimer = setTimeout(_integrityPoll, 2000); }) .catch(function (err) { + _integritySetScanning(false); var container = document.getElementById('integrityResult'); var alert = document.getElementById('integrityResultAlert'); var title = document.getElementById('integrityResultTitle'); @@ -491,13 +559,13 @@ alert.className = 'alert alert-danger mb-0 small'; title.textContent = 'Error'; body.textContent = err.message; - }) - .finally(function () { - setLoading('integrityRunBtn', false); - setLoading('integrityHealBtn', false); - setLoading('integrityDryRunBtn', false); }); }; + + {% if integrity_status.scanning %} + _integritySetScanning(true); + _integrityPollTimer = setTimeout(_integrityPoll, 2000); + {% endif %} })(); {% endblock %} diff --git a/tests/test_integrity.py b/tests/test_integrity.py index f4b4299..6460b11 100644 --- a/tests/test_integrity.py +++ b/tests/test_integrity.py @@ -2,6 +2,7 @@ import hashlib import json import os import sys +import time from pathlib import Path import pytest @@ -11,6 +12,17 @@ sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from app.integrity import IntegrityChecker, IntegrityResult +def _wait_scan_done(client, headers, timeout=10): + deadline = time.time() + timeout + while time.time() < deadline: + resp = client.get("/admin/integrity/status", headers=headers) + data = resp.get_json() + if not data.get("scanning"): + return + time.sleep(0.1) + raise TimeoutError("scan did not complete") + + def _md5(data: bytes) -> str: return hashlib.md5(data).hexdigest() @@ -413,8 +425,13 @@ class TestAdminAPI: resp = client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={}) assert resp.status_code == 200 data = resp.get_json() - assert "corrupted_objects" in data - assert "objects_scanned" in data + assert data["status"] == "started" + _wait_scan_done(client, AUTH_HEADERS) + resp = client.get("/admin/integrity/history?limit=1", headers=AUTH_HEADERS) + hist = resp.get_json() + assert len(hist["executions"]) >= 1 + assert "corrupted_objects" in hist["executions"][0]["result"] + assert "objects_scanned" in hist["executions"][0]["result"] def test_run_with_overrides(self, integrity_app): client = integrity_app.test_client() @@ -424,10 +441,12 @@ class TestAdminAPI: json={"dry_run": True, "auto_heal": True}, ) assert resp.status_code == 200 + _wait_scan_done(client, AUTH_HEADERS) def test_history_endpoint(self, integrity_app): client = integrity_app.test_client() client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={}) + _wait_scan_done(client, AUTH_HEADERS) resp = client.get("/admin/integrity/history", headers=AUTH_HEADERS) assert resp.status_code == 200 data = resp.get_json()