Make integrity scan async with progress indicator in UI
This commit is contained in:
@@ -961,12 +961,14 @@ def integrity_run_now():
|
||||
payload = request.get_json(silent=True) or {}
|
||||
override_dry_run = payload.get("dry_run")
|
||||
override_auto_heal = payload.get("auto_heal")
|
||||
result = checker.run_now(
|
||||
started = checker.run_async(
|
||||
auto_heal=override_auto_heal if override_auto_heal is not None else None,
|
||||
dry_run=override_dry_run if override_dry_run is not None else None,
|
||||
)
|
||||
logger.info("Integrity manual run by %s", principal.access_key)
|
||||
return jsonify(result.to_dict())
|
||||
if not started:
|
||||
return _json_error("Conflict", "A scan is already in progress", 409)
|
||||
return jsonify({"status": "started"})
|
||||
|
||||
|
||||
@admin_api_bp.route("/integrity/history", methods=["GET"])
|
||||
|
||||
102
app/integrity.py
102
app/integrity.py
@@ -189,6 +189,8 @@ class IntegrityChecker:
|
||||
self._timer: Optional[threading.Timer] = None
|
||||
self._shutdown = False
|
||||
self._lock = threading.Lock()
|
||||
self._scanning = False
|
||||
self._scan_start_time: Optional[float] = None
|
||||
self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history)
|
||||
|
||||
def start(self) -> None:
|
||||
@@ -229,52 +231,70 @@ class IntegrityChecker:
|
||||
self._schedule_next()
|
||||
|
||||
def run_now(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> IntegrityResult:
|
||||
effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal
|
||||
effective_dry_run = dry_run if dry_run is not None else self.dry_run
|
||||
if not self._lock.acquire(blocking=False):
|
||||
raise RuntimeError("Integrity scan is already in progress")
|
||||
|
||||
start = time.time()
|
||||
result = IntegrityResult()
|
||||
try:
|
||||
self._scanning = True
|
||||
self._scan_start_time = time.time()
|
||||
|
||||
bucket_names = self._list_bucket_names()
|
||||
effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal
|
||||
effective_dry_run = dry_run if dry_run is not None else self.dry_run
|
||||
|
||||
for bucket_name in bucket_names:
|
||||
if result.objects_scanned >= self.batch_size:
|
||||
break
|
||||
result.buckets_scanned += 1
|
||||
self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
start = self._scan_start_time
|
||||
result = IntegrityResult()
|
||||
|
||||
result.execution_time_seconds = time.time() - start
|
||||
bucket_names = self._list_bucket_names()
|
||||
|
||||
if result.has_issues or result.errors:
|
||||
logger.info(
|
||||
"Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, "
|
||||
"stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s",
|
||||
result.execution_time_seconds,
|
||||
result.corrupted_objects,
|
||||
result.orphaned_objects,
|
||||
result.phantom_metadata,
|
||||
result.stale_versions,
|
||||
result.etag_cache_inconsistencies,
|
||||
result.legacy_metadata_drifts,
|
||||
result.issues_healed,
|
||||
len(result.errors),
|
||||
" (dry run)" if effective_dry_run else "",
|
||||
for bucket_name in bucket_names:
|
||||
if result.objects_scanned >= self.batch_size:
|
||||
break
|
||||
result.buckets_scanned += 1
|
||||
self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
|
||||
result.execution_time_seconds = time.time() - start
|
||||
|
||||
if result.has_issues or result.errors:
|
||||
logger.info(
|
||||
"Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, "
|
||||
"stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s",
|
||||
result.execution_time_seconds,
|
||||
result.corrupted_objects,
|
||||
result.orphaned_objects,
|
||||
result.phantom_metadata,
|
||||
result.stale_versions,
|
||||
result.etag_cache_inconsistencies,
|
||||
result.legacy_metadata_drifts,
|
||||
result.issues_healed,
|
||||
len(result.errors),
|
||||
" (dry run)" if effective_dry_run else "",
|
||||
)
|
||||
|
||||
record = IntegrityExecutionRecord(
|
||||
timestamp=time.time(),
|
||||
result=result.to_dict(),
|
||||
dry_run=effective_dry_run,
|
||||
auto_heal=effective_auto_heal,
|
||||
)
|
||||
self.history_store.add(record)
|
||||
|
||||
record = IntegrityExecutionRecord(
|
||||
timestamp=time.time(),
|
||||
result=result.to_dict(),
|
||||
dry_run=effective_dry_run,
|
||||
auto_heal=effective_auto_heal,
|
||||
)
|
||||
self.history_store.add(record)
|
||||
return result
|
||||
finally:
|
||||
self._scanning = False
|
||||
self._scan_start_time = None
|
||||
self._lock.release()
|
||||
|
||||
return result
|
||||
def run_async(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> bool:
|
||||
if self._scanning:
|
||||
return False
|
||||
t = threading.Thread(target=self.run_now, args=(auto_heal, dry_run), daemon=True)
|
||||
t.start()
|
||||
return True
|
||||
|
||||
def _system_path(self) -> Path:
|
||||
return self.storage_root / self.SYSTEM_ROOT
|
||||
@@ -728,11 +748,15 @@ class IntegrityChecker:
|
||||
return [r.to_dict() for r in records]
|
||||
|
||||
def get_status(self) -> dict:
|
||||
return {
|
||||
status: Dict[str, Any] = {
|
||||
"enabled": not self._shutdown or self._timer is not None,
|
||||
"running": self._timer is not None and not self._shutdown,
|
||||
"scanning": self._scanning,
|
||||
"interval_hours": self.interval_seconds / 3600.0,
|
||||
"batch_size": self.batch_size,
|
||||
"auto_heal": self.auto_heal,
|
||||
"dry_run": self.dry_run,
|
||||
}
|
||||
if self._scanning and self._scan_start_time is not None:
|
||||
status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1)
|
||||
return status
|
||||
|
||||
39
app/ui.py
39
app/ui.py
@@ -4202,11 +4202,46 @@ def system_integrity_run():
|
||||
return jsonify({"error": "Integrity checker is not enabled"}), 400
|
||||
|
||||
payload = request.get_json(silent=True) or {}
|
||||
result = checker.run_now(
|
||||
started = checker.run_async(
|
||||
auto_heal=payload.get("auto_heal"),
|
||||
dry_run=payload.get("dry_run"),
|
||||
)
|
||||
return jsonify(result.to_dict())
|
||||
if not started:
|
||||
return jsonify({"error": "A scan is already in progress"}), 409
|
||||
return jsonify({"status": "started"})
|
||||
|
||||
|
||||
@ui_bp.get("/system/integrity/status")
|
||||
def system_integrity_status():
|
||||
principal = _current_principal()
|
||||
try:
|
||||
_iam().authorize(principal, None, "iam:*")
|
||||
except IamError:
|
||||
return jsonify({"error": "Access denied"}), 403
|
||||
|
||||
checker = current_app.extensions.get("integrity")
|
||||
if not checker:
|
||||
return jsonify({"error": "Integrity checker is not enabled"}), 400
|
||||
|
||||
return jsonify(checker.get_status())
|
||||
|
||||
|
||||
@ui_bp.get("/system/integrity/history")
|
||||
def system_integrity_history():
|
||||
principal = _current_principal()
|
||||
try:
|
||||
_iam().authorize(principal, None, "iam:*")
|
||||
except IamError:
|
||||
return jsonify({"error": "Access denied"}), 403
|
||||
|
||||
checker = current_app.extensions.get("integrity")
|
||||
if not checker:
|
||||
return jsonify({"executions": []})
|
||||
|
||||
limit = min(int(request.args.get("limit", 10)), 200)
|
||||
offset = int(request.args.get("offset", 0))
|
||||
records = checker.get_history(limit=limit, offset=offset)
|
||||
return jsonify({"executions": records})
|
||||
|
||||
|
||||
@ui_bp.app_errorhandler(404)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
APP_VERSION = "0.4.0"
|
||||
APP_VERSION = "0.4.1"
|
||||
|
||||
|
||||
def get_version() -> str:
|
||||
|
||||
Reference in New Issue
Block a user