diff --git a/app/admin_api.py b/app/admin_api.py
index ccc0408..d09658d 100644
--- a/app/admin_api.py
+++ b/app/admin_api.py
@@ -961,12 +961,14 @@ def integrity_run_now():
payload = request.get_json(silent=True) or {}
override_dry_run = payload.get("dry_run")
override_auto_heal = payload.get("auto_heal")
- result = checker.run_now(
+ started = checker.run_async(
auto_heal=override_auto_heal if override_auto_heal is not None else None,
dry_run=override_dry_run if override_dry_run is not None else None,
)
logger.info("Integrity manual run by %s", principal.access_key)
- return jsonify(result.to_dict())
+ if not started:
+ return _json_error("Conflict", "A scan is already in progress", 409)
+ return jsonify({"status": "started"})
@admin_api_bp.route("/integrity/history", methods=["GET"])
diff --git a/app/integrity.py b/app/integrity.py
index 4dc15d2..02b996b 100644
--- a/app/integrity.py
+++ b/app/integrity.py
@@ -189,6 +189,8 @@ class IntegrityChecker:
self._timer: Optional[threading.Timer] = None
self._shutdown = False
self._lock = threading.Lock()
+ self._scanning = False
+ self._scan_start_time: Optional[float] = None
self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history)
def start(self) -> None:
@@ -229,52 +231,70 @@ class IntegrityChecker:
self._schedule_next()
def run_now(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> IntegrityResult:
- effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal
- effective_dry_run = dry_run if dry_run is not None else self.dry_run
+ if not self._lock.acquire(blocking=False):
+ raise RuntimeError("Integrity scan is already in progress")
- start = time.time()
- result = IntegrityResult()
+ try:
+ self._scanning = True
+ self._scan_start_time = time.time()
- bucket_names = self._list_bucket_names()
+ effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal
+ effective_dry_run = dry_run if dry_run is not None else self.dry_run
- for bucket_name in bucket_names:
- if result.objects_scanned >= self.batch_size:
- break
- result.buckets_scanned += 1
- self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
- self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
- self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
- self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
- self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
- self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
+ start = self._scan_start_time
+ result = IntegrityResult()
- result.execution_time_seconds = time.time() - start
+ bucket_names = self._list_bucket_names()
- if result.has_issues or result.errors:
- logger.info(
- "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, "
- "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s",
- result.execution_time_seconds,
- result.corrupted_objects,
- result.orphaned_objects,
- result.phantom_metadata,
- result.stale_versions,
- result.etag_cache_inconsistencies,
- result.legacy_metadata_drifts,
- result.issues_healed,
- len(result.errors),
- " (dry run)" if effective_dry_run else "",
+ for bucket_name in bucket_names:
+ if result.objects_scanned >= self.batch_size:
+ break
+ result.buckets_scanned += 1
+ self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
+ self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
+ self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
+ self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
+ self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
+ self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
+
+ result.execution_time_seconds = time.time() - start
+
+ if result.has_issues or result.errors:
+ logger.info(
+ "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, "
+ "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s",
+ result.execution_time_seconds,
+ result.corrupted_objects,
+ result.orphaned_objects,
+ result.phantom_metadata,
+ result.stale_versions,
+ result.etag_cache_inconsistencies,
+ result.legacy_metadata_drifts,
+ result.issues_healed,
+ len(result.errors),
+ " (dry run)" if effective_dry_run else "",
+ )
+
+ record = IntegrityExecutionRecord(
+ timestamp=time.time(),
+ result=result.to_dict(),
+ dry_run=effective_dry_run,
+ auto_heal=effective_auto_heal,
)
+ self.history_store.add(record)
- record = IntegrityExecutionRecord(
- timestamp=time.time(),
- result=result.to_dict(),
- dry_run=effective_dry_run,
- auto_heal=effective_auto_heal,
- )
- self.history_store.add(record)
+ return result
+ finally:
+ self._scanning = False
+ self._scan_start_time = None
+ self._lock.release()
- return result
+    def run_async(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> bool:
+        # Quick pre-check only; run_now's non-blocking lock acquire is the
+        # authoritative guard. If two callers race past this check, the
+        # loser's run_now raises RuntimeError, which the worker swallows
+        # so no traceback leaks from the daemon thread.
+        if self._scanning:
+            return False
+
+        def _worker() -> None:
+            try:
+                self.run_now(auto_heal=auto_heal, dry_run=dry_run)
+            except RuntimeError:
+                logger.info("Integrity scan request ignored: already in progress")
+
+        t = threading.Thread(target=_worker, daemon=True)
+        t.start()
+        return True
def _system_path(self) -> Path:
return self.storage_root / self.SYSTEM_ROOT
@@ -728,11 +748,15 @@ class IntegrityChecker:
return [r.to_dict() for r in records]
def get_status(self) -> dict:
- return {
+ status: Dict[str, Any] = {
"enabled": not self._shutdown or self._timer is not None,
"running": self._timer is not None and not self._shutdown,
+ "scanning": self._scanning,
"interval_hours": self.interval_seconds / 3600.0,
"batch_size": self.batch_size,
"auto_heal": self.auto_heal,
"dry_run": self.dry_run,
}
+ if self._scanning and self._scan_start_time is not None:
+ status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1)
+ return status
diff --git a/app/ui.py b/app/ui.py
index b1a0947..aff4fa4 100644
--- a/app/ui.py
+++ b/app/ui.py
@@ -4202,11 +4202,46 @@ def system_integrity_run():
return jsonify({"error": "Integrity checker is not enabled"}), 400
payload = request.get_json(silent=True) or {}
- result = checker.run_now(
+ started = checker.run_async(
auto_heal=payload.get("auto_heal"),
dry_run=payload.get("dry_run"),
)
- return jsonify(result.to_dict())
+ if not started:
+ return jsonify({"error": "A scan is already in progress"}), 409
+ return jsonify({"status": "started"})
+
+
+@ui_bp.get("/system/integrity/status")
+def system_integrity_status():
+ principal = _current_principal()
+ try:
+ _iam().authorize(principal, None, "iam:*")
+ except IamError:
+ return jsonify({"error": "Access denied"}), 403
+
+ checker = current_app.extensions.get("integrity")
+ if not checker:
+ return jsonify({"error": "Integrity checker is not enabled"}), 400
+
+ return jsonify(checker.get_status())
+
+
+@ui_bp.get("/system/integrity/history")
+def system_integrity_history():
+ principal = _current_principal()
+ try:
+ _iam().authorize(principal, None, "iam:*")
+ except IamError:
+ return jsonify({"error": "Access denied"}), 403
+
+ checker = current_app.extensions.get("integrity")
+ if not checker:
+ return jsonify({"executions": []})
+
+ limit = min(int(request.args.get("limit", 10)), 200)
+ offset = int(request.args.get("offset", 0))
+ records = checker.get_history(limit=limit, offset=offset)
+ return jsonify({"executions": records})
@ui_bp.app_errorhandler(404)
diff --git a/app/version.py b/app/version.py
index 77fc8ae..5c24c4a 100644
--- a/app/version.py
+++ b/app/version.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-APP_VERSION = "0.4.0"
+APP_VERSION = "0.4.1"
def get_version() -> str:
diff --git a/templates/system.html b/templates/system.html
index 3758517..ed3908d 100644
--- a/templates/system.html
+++ b/templates/system.html
@@ -233,21 +233,28 @@
{% if integrity_status.enabled %}
-
+
+
@@ -431,11 +438,95 @@
});
};
- window.runIntegrity = function (dryRun, autoHeal) {
- var activeBtn = dryRun ? 'integrityDryRunBtn' : (autoHeal ? 'integrityHealBtn' : 'integrityRunBtn');
- ['integrityRunBtn', 'integrityHealBtn', 'integrityDryRunBtn'].forEach(function (id) {
- setLoading(id, true, id !== activeBtn);
+ var _integrityPollTimer = null;
+ var _integrityLastMode = {dryRun: false, autoHeal: false};
+
+ function _integritySetScanning(scanning) {
+ var banner = document.getElementById('integrityScanningBanner');
+ var btns = ['integrityRunBtn', 'integrityHealBtn', 'integrityDryRunBtn'];
+ if (scanning) {
+ banner.classList.remove('d-none');
+ btns.forEach(function (id) {
+ var el = document.getElementById(id);
+ if (el) el.disabled = true;
+ });
+ } else {
+ banner.classList.add('d-none');
+ document.getElementById('integrityScanElapsed').textContent = '';
+ btns.forEach(function (id) {
+ var el = document.getElementById(id);
+ if (el) el.disabled = false;
+ });
+ }
+ }
+
+ function _integrityShowResult(data, dryRun, autoHeal) {
+ var container = document.getElementById('integrityResult');
+ var alert = document.getElementById('integrityResultAlert');
+ var title = document.getElementById('integrityResultTitle');
+ var body = document.getElementById('integrityResultBody');
+ container.classList.remove('d-none');
+
+ var totalIssues = (data.corrupted_objects || 0) + (data.orphaned_objects || 0) +
+ (data.phantom_metadata || 0) + (data.stale_versions || 0) +
+ (data.etag_cache_inconsistencies || 0) + (data.legacy_metadata_drifts || 0);
+
+ var prefix = dryRun ? '[Dry Run] ' : (autoHeal ? '[Heal] ' : '');
+ alert.className = totalIssues > 0 ? 'alert alert-warning mb-0 small' : 'alert alert-success mb-0 small';
+ title.textContent = prefix + 'Completed in ' + (data.execution_time_seconds || 0).toFixed(2) + 's';
+
+ var lines = [];
+ lines.push('Scanned: ' + (data.objects_scanned || 0) + ' objects in ' + (data.buckets_scanned || 0) + ' buckets');
+ if (totalIssues === 0) {
+ lines.push('No issues found.');
+ } else {
+ if (data.corrupted_objects) lines.push('Corrupted objects: ' + data.corrupted_objects);
+ if (data.orphaned_objects) lines.push('Orphaned objects: ' + data.orphaned_objects);
+ if (data.phantom_metadata) lines.push('Phantom metadata: ' + data.phantom_metadata);
+ if (data.stale_versions) lines.push('Stale versions: ' + data.stale_versions);
+ if (data.etag_cache_inconsistencies) lines.push('ETag inconsistencies: ' + data.etag_cache_inconsistencies);
+ if (data.legacy_metadata_drifts) lines.push('Legacy metadata drifts: ' + data.legacy_metadata_drifts);
+ if (data.issues_healed) lines.push('Issues healed: ' + data.issues_healed);
+ }
+ if (data.errors && data.errors.length > 0) lines.push('Errors: ' + data.errors.join(', '));
+
+      body.innerHTML = lines.join('<br>');
+ }
+
+ function _integrityPoll() {
+ fetch('{{ url_for("ui.system_integrity_status") }}', {
+ headers: {'X-CSRFToken': csrfToken}
+ })
+ .then(function (r) { return r.json(); })
+ .then(function (status) {
+ if (status.scanning) {
+ var elapsed = status.scan_elapsed_seconds || 0;
+ document.getElementById('integrityScanElapsed').textContent = ' (' + elapsed.toFixed(0) + 's)';
+ _integrityPollTimer = setTimeout(_integrityPoll, 2000);
+ } else {
+ _integritySetScanning(false);
+ fetch('{{ url_for("ui.system_integrity_history") }}?limit=1', {
+ headers: {'X-CSRFToken': csrfToken}
+ })
+ .then(function (r) { return r.json(); })
+ .then(function (hist) {
+ if (hist.executions && hist.executions.length > 0) {
+ var latest = hist.executions[0];
+ _integrityShowResult(latest.result, latest.dry_run, latest.auto_heal);
+ }
+ })
+ .catch(function () {});
+ }
+ })
+ .catch(function () {
+ _integrityPollTimer = setTimeout(_integrityPoll, 3000);
});
+ }
+
+ window.runIntegrity = function (dryRun, autoHeal) {
+ _integrityLastMode = {dryRun: dryRun, autoHeal: autoHeal};
+ document.getElementById('integrityResult').classList.add('d-none');
+ _integritySetScanning(true);
fetch('{{ url_for("ui.system_integrity_run") }}', {
method: 'POST',
@@ -444,45 +535,22 @@
})
.then(function (r) { return r.json(); })
.then(function (data) {
- var container = document.getElementById('integrityResult');
- var alert = document.getElementById('integrityResultAlert');
- var title = document.getElementById('integrityResultTitle');
- var body = document.getElementById('integrityResultBody');
- container.classList.remove('d-none');
-
if (data.error) {
+ _integritySetScanning(false);
+ var container = document.getElementById('integrityResult');
+ var alert = document.getElementById('integrityResultAlert');
+ var title = document.getElementById('integrityResultTitle');
+ var body = document.getElementById('integrityResultBody');
+ container.classList.remove('d-none');
alert.className = 'alert alert-danger mb-0 small';
title.textContent = 'Error';
body.textContent = data.error;
return;
}
-
- var totalIssues = (data.corrupted_objects || 0) + (data.orphaned_objects || 0) +
- (data.phantom_metadata || 0) + (data.stale_versions || 0) +
- (data.etag_cache_inconsistencies || 0) + (data.legacy_metadata_drifts || 0);
-
- var prefix = dryRun ? '[Dry Run] ' : (autoHeal ? '[Heal] ' : '');
- alert.className = totalIssues > 0 ? 'alert alert-warning mb-0 small' : 'alert alert-success mb-0 small';
- title.textContent = prefix + 'Completed in ' + (data.execution_time_seconds || 0).toFixed(2) + 's';
-
- var lines = [];
- lines.push('Scanned: ' + (data.objects_scanned || 0) + ' objects in ' + (data.buckets_scanned || 0) + ' buckets');
- if (totalIssues === 0) {
- lines.push('No issues found.');
- } else {
- if (data.corrupted_objects) lines.push('Corrupted objects: ' + data.corrupted_objects);
- if (data.orphaned_objects) lines.push('Orphaned objects: ' + data.orphaned_objects);
- if (data.phantom_metadata) lines.push('Phantom metadata: ' + data.phantom_metadata);
- if (data.stale_versions) lines.push('Stale versions: ' + data.stale_versions);
- if (data.etag_cache_inconsistencies) lines.push('ETag inconsistencies: ' + data.etag_cache_inconsistencies);
- if (data.legacy_metadata_drifts) lines.push('Legacy metadata drifts: ' + data.legacy_metadata_drifts);
- if (data.issues_healed) lines.push('Issues healed: ' + data.issues_healed);
- }
- if (data.errors && data.errors.length > 0) lines.push('Errors: ' + data.errors.join(', '));
-
-      body.innerHTML = lines.join('<br>');
+ _integrityPollTimer = setTimeout(_integrityPoll, 2000);
})
.catch(function (err) {
+ _integritySetScanning(false);
var container = document.getElementById('integrityResult');
var alert = document.getElementById('integrityResultAlert');
var title = document.getElementById('integrityResultTitle');
@@ -491,13 +559,13 @@
alert.className = 'alert alert-danger mb-0 small';
title.textContent = 'Error';
body.textContent = err.message;
- })
- .finally(function () {
- setLoading('integrityRunBtn', false);
- setLoading('integrityHealBtn', false);
- setLoading('integrityDryRunBtn', false);
});
};
+
+ {% if integrity_status.scanning %}
+ _integritySetScanning(true);
+ _integrityPollTimer = setTimeout(_integrityPoll, 2000);
+ {% endif %}
})();
{% endblock %}
diff --git a/tests/test_integrity.py b/tests/test_integrity.py
index f4b4299..6460b11 100644
--- a/tests/test_integrity.py
+++ b/tests/test_integrity.py
@@ -2,6 +2,7 @@ import hashlib
import json
import os
import sys
+import time
from pathlib import Path
import pytest
@@ -11,6 +12,17 @@ sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from app.integrity import IntegrityChecker, IntegrityResult
+def _wait_scan_done(client, headers, timeout=10):
+ deadline = time.time() + timeout
+ while time.time() < deadline:
+ resp = client.get("/admin/integrity/status", headers=headers)
+ data = resp.get_json()
+ if not data.get("scanning"):
+ return
+ time.sleep(0.1)
+ raise TimeoutError("scan did not complete")
+
+
def _md5(data: bytes) -> str:
return hashlib.md5(data).hexdigest()
@@ -413,8 +425,13 @@ class TestAdminAPI:
resp = client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
assert resp.status_code == 200
data = resp.get_json()
- assert "corrupted_objects" in data
- assert "objects_scanned" in data
+ assert data["status"] == "started"
+ _wait_scan_done(client, AUTH_HEADERS)
+ resp = client.get("/admin/integrity/history?limit=1", headers=AUTH_HEADERS)
+ hist = resp.get_json()
+ assert len(hist["executions"]) >= 1
+ assert "corrupted_objects" in hist["executions"][0]["result"]
+ assert "objects_scanned" in hist["executions"][0]["result"]
def test_run_with_overrides(self, integrity_app):
client = integrity_app.test_client()
@@ -424,10 +441,12 @@ class TestAdminAPI:
json={"dry_run": True, "auto_heal": True},
)
assert resp.status_code == 200
+ _wait_scan_done(client, AUTH_HEADERS)
def test_history_endpoint(self, integrity_app):
client = integrity_app.test_client()
client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
+ _wait_scan_done(client, AUTH_HEADERS)
resp = client.get("/admin/integrity/history", headers=AUTH_HEADERS)
assert resp.status_code == 200
data = resp.get_json()