From a4ae81c77c4a90d6eb521ff655f747d10f19c153 Mon Sep 17 00:00:00 2001 From: kqjy Date: Tue, 10 Mar 2026 22:14:39 +0800 Subject: [PATCH] Add integrity scanner: background detection and healing of corrupted objects, orphaned files, phantom metadata, stale versions, etag cache inconsistencies, and legacy metadata drift --- app/__init__.py | 13 + app/admin_api.py | 52 +++ app/config.py | 22 +- app/integrity.py | 738 ++++++++++++++++++++++++++++++++++++++++ app/version.py | 2 +- docs.md | 63 ++++ templates/docs.html | 139 +++++++- tests/test_integrity.py | 499 +++++++++++++++++++++++++++ 8 files changed, 1509 insertions(+), 19 deletions(-) create mode 100644 app/integrity.py create mode 100644 tests/test_integrity.py diff --git a/app/__init__.py b/app/__init__.py index 161fa90..4b0a321 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -30,6 +30,7 @@ from .extensions import limiter, csrf from .iam import IamService from .kms import KMSManager from .gc import GarbageCollector +from .integrity import IntegrityChecker from .lifecycle import LifecycleManager from .notifications import NotificationService from .object_lock import ObjectLockService @@ -234,6 +235,17 @@ def create_app( ) gc_collector.start() + integrity_checker = None + if app.config.get("INTEGRITY_ENABLED", False): + integrity_checker = IntegrityChecker( + storage_root=storage_root, + interval_hours=app.config.get("INTEGRITY_INTERVAL_HOURS", 24.0), + batch_size=app.config.get("INTEGRITY_BATCH_SIZE", 1000), + auto_heal=app.config.get("INTEGRITY_AUTO_HEAL", False), + dry_run=app.config.get("INTEGRITY_DRY_RUN", False), + ) + integrity_checker.start() + app.extensions["object_storage"] = storage app.extensions["iam"] = iam app.extensions["bucket_policies"] = bucket_policies @@ -246,6 +258,7 @@ def create_app( app.extensions["acl"] = acl_service app.extensions["lifecycle"] = lifecycle_manager app.extensions["gc"] = gc_collector + app.extensions["integrity"] = integrity_checker app.extensions["object_lock"] = 
object_lock_service app.extensions["notifications"] = notification_service app.extensions["access_logging"] = access_logging_service diff --git a/app/admin_api.py b/app/admin_api.py index 04d91c2..1d5d975 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -15,6 +15,7 @@ from flask import Blueprint, Response, current_app, jsonify, request from .connections import ConnectionStore from .extensions import limiter from .gc import GarbageCollector +from .integrity import IntegrityChecker from .iam import IamError, Principal from .replication import ReplicationManager from .site_registry import PeerSite, SiteInfo, SiteRegistry @@ -829,3 +830,54 @@ def gc_history(): offset = int(request.args.get("offset", 0)) records = gc.get_history(limit=limit, offset=offset) return jsonify({"executions": records}) + + +def _integrity() -> Optional[IntegrityChecker]: + return current_app.extensions.get("integrity") + + +@admin_api_bp.route("/integrity/status", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def integrity_status(): + principal, error = _require_admin() + if error: + return error + checker = _integrity() + if not checker: + return jsonify({"enabled": False, "message": "Integrity checker is not enabled. 
Set INTEGRITY_ENABLED=true to enable."}) + return jsonify(checker.get_status()) + + +@admin_api_bp.route("/integrity/run", methods=["POST"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def integrity_run_now(): + principal, error = _require_admin() + if error: + return error + checker = _integrity() + if not checker: + return _json_error("InvalidRequest", "Integrity checker is not enabled", 400) + payload = request.get_json(silent=True) or {} + override_dry_run = payload.get("dry_run") + override_auto_heal = payload.get("auto_heal") + result = checker.run_now( + auto_heal=override_auto_heal if override_auto_heal is not None else None, + dry_run=override_dry_run if override_dry_run is not None else None, + ) + logger.info("Integrity manual run by %s", principal.access_key) + return jsonify(result.to_dict()) + + +@admin_api_bp.route("/integrity/history", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def integrity_history(): + principal, error = _require_admin() + if error: + return error + checker = _integrity() + if not checker: + return jsonify({"executions": []}) + limit = min(int(request.args.get("limit", 50)), 200) + offset = int(request.args.get("offset", 0)) + records = checker.get_history(limit=limit, offset=offset) + return jsonify({"executions": records}) diff --git a/app/config.py b/app/config.py index 7002d48..079fc50 100644 --- a/app/config.py +++ b/app/config.py @@ -156,6 +156,11 @@ class AppConfig: gc_multipart_max_age_days: int gc_lock_file_max_age_hours: float gc_dry_run: bool + integrity_enabled: bool + integrity_interval_hours: float + integrity_batch_size: int + integrity_auto_heal: bool + integrity_dry_run: bool @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -331,6 +336,11 @@ class AppConfig: gc_multipart_max_age_days = int(_get("GC_MULTIPART_MAX_AGE_DAYS", 7)) gc_lock_file_max_age_hours = float(_get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0)) gc_dry_run = str(_get("GC_DRY_RUN", 
"0")).lower() in {"1", "true", "yes", "on"} + integrity_enabled = str(_get("INTEGRITY_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} + integrity_interval_hours = float(_get("INTEGRITY_INTERVAL_HOURS", 24.0)) + integrity_batch_size = int(_get("INTEGRITY_BATCH_SIZE", 1000)) + integrity_auto_heal = str(_get("INTEGRITY_AUTO_HEAL", "0")).lower() in {"1", "true", "yes", "on"} + integrity_dry_run = str(_get("INTEGRITY_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"} return cls(storage_root=storage_root, max_upload_size=max_upload_size, @@ -424,7 +434,12 @@ class AppConfig: gc_temp_file_max_age_hours=gc_temp_file_max_age_hours, gc_multipart_max_age_days=gc_multipart_max_age_days, gc_lock_file_max_age_hours=gc_lock_file_max_age_hours, - gc_dry_run=gc_dry_run) + gc_dry_run=gc_dry_run, + integrity_enabled=integrity_enabled, + integrity_interval_hours=integrity_interval_hours, + integrity_batch_size=integrity_batch_size, + integrity_auto_heal=integrity_auto_heal, + integrity_dry_run=integrity_dry_run) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. 
"""Background integrity scanning and self-healing for the object store.

Detects six classes of inconsistency between on-disk object data and the
metadata kept under ``.myfsio.sys``:

* corrupted objects   -- file content no longer matches the stored ETag
* orphaned objects    -- data file present with no index entry
* phantom metadata    -- index entry present with no data file on disk
* stale versions      -- version ``.bin``/``.json`` pair missing one half
* etag cache drift    -- ``etag_index.json`` disagrees with the index
* legacy drift        -- unmigrated or divergent legacy ``.meta.json`` files

Issues can optionally be auto-healed; dry-run mode reports without touching
disk.  Scans run on a daemon timer (see :class:`IntegrityChecker`).
"""

import hashlib
import json
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    import myfsio_core as _rc  # optional native acceleration for MD5
    _HAS_RUST = True
except ImportError:
    _HAS_RUST = False

logger = logging.getLogger(__name__)


def _compute_etag(path: Path) -> str:
    """Return the hex MD5 digest of *path* (native helper when available)."""
    if _HAS_RUST:
        return _rc.md5_file(str(path))
    checksum = hashlib.md5()
    with path.open("rb") as handle:
        # Stream in 8 KiB chunks so large objects are never fully buffered.
        for chunk in iter(lambda: handle.read(8192), b""):
            checksum.update(chunk)
    return checksum.hexdigest()


@dataclass
class IntegrityIssue:
    """A single detected inconsistency plus its heal outcome, if any."""

    issue_type: str
    bucket: str
    key: str
    detail: str
    healed: bool = False
    heal_action: str = ""

    def to_dict(self) -> dict:
        """JSON-serializable form for API responses and history records."""
        return {
            "issue_type": self.issue_type,
            "bucket": self.bucket,
            "key": self.key,
            "detail": self.detail,
            "healed": self.healed,
            "heal_action": self.heal_action,
        }


@dataclass
class IntegrityResult:
    """Aggregated counters and retained issue details for one scan run."""

    corrupted_objects: int = 0
    orphaned_objects: int = 0
    phantom_metadata: int = 0
    stale_versions: int = 0
    etag_cache_inconsistencies: int = 0
    legacy_metadata_drifts: int = 0
    issues_healed: int = 0
    # Capped at MAX_ISSUES entries; counters above are NOT capped.
    issues: List[IntegrityIssue] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    objects_scanned: int = 0
    buckets_scanned: int = 0
    execution_time_seconds: float = 0.0

    def to_dict(self) -> dict:
        """JSON-serializable form for API responses and history records."""
        return {
            "corrupted_objects": self.corrupted_objects,
            "orphaned_objects": self.orphaned_objects,
            "phantom_metadata": self.phantom_metadata,
            "stale_versions": self.stale_versions,
            "etag_cache_inconsistencies": self.etag_cache_inconsistencies,
            "legacy_metadata_drifts": self.legacy_metadata_drifts,
            "issues_healed": self.issues_healed,
            "issues": [i.to_dict() for i in self.issues],
            "errors": self.errors,
            "objects_scanned": self.objects_scanned,
            "buckets_scanned": self.buckets_scanned,
            "execution_time_seconds": self.execution_time_seconds,
        }

    @property
    def total_issues(self) -> int:
        """Sum of all per-category issue counters."""
        return (
            self.corrupted_objects
            + self.orphaned_objects
            + self.phantom_metadata
            + self.stale_versions
            + self.etag_cache_inconsistencies
            + self.legacy_metadata_drifts
        )

    @property
    def has_issues(self) -> bool:
        """True when any inconsistency was detected in this run."""
        return self.total_issues > 0


@dataclass
class IntegrityExecutionRecord:
    """One persisted scan execution: timestamp, result dict, and flags."""

    timestamp: float
    result: dict
    dry_run: bool
    auto_heal: bool

    def to_dict(self) -> dict:
        return {
            "timestamp": self.timestamp,
            "result": self.result,
            "dry_run": self.dry_run,
            "auto_heal": self.auto_heal,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "IntegrityExecutionRecord":
        """Rebuild a record from its JSON form; flags default to False."""
        return cls(
            timestamp=data["timestamp"],
            result=data["result"],
            dry_run=data.get("dry_run", False),
            auto_heal=data.get("auto_heal", False),
        )


class IntegrityHistoryStore:
    """Persists the most recent scan executions as a single JSON file.

    Records live at ``<root>/.myfsio.sys/config/integrity_history.json``,
    newest first, trimmed to *max_records* on save.
    """

    def __init__(self, storage_root: Path, max_records: int = 50) -> None:
        self.storage_root = storage_root
        self.max_records = max_records
        # Serializes add() (read-modify-write of the history file).
        self._lock = threading.Lock()

    def _get_path(self) -> Path:
        return self.storage_root / ".myfsio.sys" / "config" / "integrity_history.json"

    def load(self) -> List[IntegrityExecutionRecord]:
        """Read all records; a missing or unreadable file yields []."""
        path = self._get_path()
        if not path.exists():
            return []
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            return [IntegrityExecutionRecord.from_dict(d) for d in data.get("executions", [])]
        except (OSError, ValueError, KeyError) as e:
            logger.error("Failed to load integrity history: %s", e)
            return []

    def save(self, records: List[IntegrityExecutionRecord]) -> None:
        """Write *records* (trimmed to max_records); failures are logged only."""
        path = self._get_path()
        path.parent.mkdir(parents=True, exist_ok=True)
        data = {"executions": [r.to_dict() for r in records[: self.max_records]]}
        try:
            with open(path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
        except OSError as e:
            logger.error("Failed to save integrity history: %s", e)

    def add(self, record: IntegrityExecutionRecord) -> None:
        """Prepend *record* atomically with respect to other add() calls."""
        with self._lock:
            records = self.load()
            records.insert(0, record)
            self.save(records)

    def get_history(self, limit: int = 50, offset: int = 0) -> List[IntegrityExecutionRecord]:
        """Return a page of records, newest first."""
        return self.load()[offset : offset + limit]


# Cap on per-run issue *details* retained in IntegrityResult.issues.
# Category counters keep counting past this cap.
MAX_ISSUES = 500


class IntegrityChecker:
    """Scans bucket data against index metadata and optionally heals drift.

    Layout assumptions (as exercised by the checks below):

    * object data lives under ``<root>/<bucket>/<key>``
    * per-bucket metadata lives under
      ``<root>/.myfsio.sys/buckets/<bucket>/meta/.../_index.json``
    * version archives live under
      ``<root>/.myfsio.sys/buckets/<bucket>/versions``

    The background timer is a daemon thread; scans also run synchronously
    via :meth:`run_now`.  Only the corrupted/orphaned checks honour
    ``batch_size``; the metadata-only checks scan each bucket fully.
    """

    SYSTEM_ROOT = ".myfsio.sys"
    SYSTEM_BUCKETS_DIR = "buckets"
    BUCKET_META_DIR = "meta"
    BUCKET_VERSIONS_DIR = "versions"
    # Per-bucket folders that hold internals, never user objects.
    INTERNAL_FOLDERS = {".meta", ".versions", ".multipart"}

    def __init__(
        self,
        storage_root: Path,
        interval_hours: float = 24.0,
        batch_size: int = 1000,
        auto_heal: bool = False,
        dry_run: bool = False,
        max_history: int = 50,
    ) -> None:
        """Configure the checker; call :meth:`start` to begin scanning.

        Args:
            storage_root: Root directory of the object store.
            interval_hours: Delay between background scan cycles.
            batch_size: Max objects hashed per cycle (corrupted/orphaned checks).
            auto_heal: Repair detected issues by default.
            dry_run: Report without modifying disk by default.
            max_history: Number of execution records retained.
        """
        self.storage_root = Path(storage_root)
        self.interval_seconds = interval_hours * 3600.0
        self.batch_size = batch_size
        self.auto_heal = auto_heal
        self.dry_run = dry_run
        self._timer: Optional[threading.Timer] = None
        self._shutdown = False
        self._lock = threading.Lock()
        self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history)

    def start(self) -> None:
        """Begin periodic background scanning (idempotent)."""
        if self._timer is not None:
            return
        self._shutdown = False
        self._schedule_next()
        logger.info(
            "Integrity checker started: interval=%.1fh, batch_size=%d, auto_heal=%s, dry_run=%s",
            self.interval_seconds / 3600.0,
            self.batch_size,
            self.auto_heal,
            self.dry_run,
        )

    def stop(self) -> None:
        """Cancel the pending timer and prevent rescheduling."""
        self._shutdown = True
        if self._timer:
            self._timer.cancel()
            self._timer = None
        logger.info("Integrity checker stopped")

    def _schedule_next(self) -> None:
        if self._shutdown:
            return
        self._timer = threading.Timer(self.interval_seconds, self._run_cycle)
        self._timer.daemon = True  # never block interpreter shutdown
        self._timer.start()

    def _run_cycle(self) -> None:
        """Timer callback: run one scan, then reschedule regardless of outcome."""
        if self._shutdown:
            return
        try:
            self.run_now()
        except Exception as e:
            logger.error("Integrity check cycle failed: %s", e)
        finally:
            self._schedule_next()

    def run_now(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> IntegrityResult:
        """Run one full scan synchronously and persist an execution record.

        Args:
            auto_heal: Override the configured default when not None.
            dry_run: Override the configured default when not None.

        Returns:
            The populated :class:`IntegrityResult` for this run.
        """
        effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal
        effective_dry_run = dry_run if dry_run is not None else self.dry_run

        start = time.time()
        result = IntegrityResult()

        bucket_names = self._list_bucket_names()

        for bucket_name in bucket_names:
            if result.objects_scanned >= self.batch_size:
                break
            result.buckets_scanned += 1
            self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
            self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
            self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
            self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
            self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
            self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)

        result.execution_time_seconds = time.time() - start

        if result.has_issues or result.errors:
            logger.info(
                "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, "
                "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s",
                result.execution_time_seconds,
                result.corrupted_objects,
                result.orphaned_objects,
                result.phantom_metadata,
                result.stale_versions,
                result.etag_cache_inconsistencies,
                result.legacy_metadata_drifts,
                result.issues_healed,
                len(result.errors),
                " (dry run)" if effective_dry_run else "",
            )

        record = IntegrityExecutionRecord(
            timestamp=time.time(),
            result=result.to_dict(),
            dry_run=effective_dry_run,
            auto_heal=effective_auto_heal,
        )
        self.history_store.add(record)

        return result

    def _system_path(self) -> Path:
        return self.storage_root / self.SYSTEM_ROOT

    def _list_bucket_names(self) -> List[str]:
        """Top-level directories under the root, excluding the system tree."""
        names = []
        try:
            for entry in self.storage_root.iterdir():
                if entry.is_dir() and entry.name != self.SYSTEM_ROOT:
                    names.append(entry.name)
        except OSError:
            pass
        return names

    def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None:
        """Retain issue detail up to MAX_ISSUES; counters are unaffected."""
        if len(result.issues) < MAX_ISSUES:
            result.issues.append(issue)

    def _check_corrupted_objects(
        self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
    ) -> None:
        """Re-hash each indexed object and compare against its stored ETag.

        Heal: the disk data is treated as authoritative, so the index entry's
        ``__etag__``/``__size__``/``__last_modified__`` are refreshed in place.
        """
        bucket_path = self.storage_root / bucket_name
        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

        if not meta_root.exists():
            return

        try:
            for index_file in meta_root.rglob("_index.json"):
                if result.objects_scanned >= self.batch_size:
                    return
                if not index_file.is_file():
                    continue
                try:
                    index_data = json.loads(index_file.read_text(encoding="utf-8"))
                except (OSError, json.JSONDecodeError):
                    continue

                for key_name, entry in list(index_data.items()):
                    if result.objects_scanned >= self.batch_size:
                        return

                    rel_dir = index_file.parent.relative_to(meta_root)
                    if rel_dir == Path("."):
                        full_key = key_name
                    else:
                        full_key = rel_dir.as_posix() + "/" + key_name

                    object_path = bucket_path / full_key
                    if not object_path.exists():
                        # Missing files are the phantom-metadata check's job.
                        continue

                    result.objects_scanned += 1

                    meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
                    stored_etag = meta.get("__etag__")
                    if not stored_etag:
                        continue

                    try:
                        actual_etag = _compute_etag(object_path)
                    except OSError:
                        continue

                    if actual_etag != stored_etag:
                        result.corrupted_objects += 1
                        issue = IntegrityIssue(
                            issue_type="corrupted_object",
                            bucket=bucket_name,
                            key=full_key,
                            detail=f"stored_etag={stored_etag} actual_etag={actual_etag}",
                        )

                        if auto_heal and not dry_run:
                            try:
                                stat = object_path.stat()
                                meta["__etag__"] = actual_etag
                                meta["__size__"] = str(stat.st_size)
                                meta["__last_modified__"] = str(stat.st_mtime)
                                # Fix: update only the "metadata" sub-dict so
                                # any sibling keys on the entry are preserved
                                # (the old code replaced the whole entry).
                                entry["metadata"] = meta
                                index_data[key_name] = entry
                                self._atomic_write_index(index_file, index_data)
                                issue.healed = True
                                issue.heal_action = "updated etag in index"
                                result.issues_healed += 1
                            except OSError as e:
                                result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}")

                        self._add_issue(result, issue)
        except OSError as e:
            result.errors.append(f"check corrupted {bucket_name}: {e}")

    def _check_orphaned_objects(
        self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
    ) -> None:
        """Find data files that have no index entry.

        Heal: create an index entry from the file's computed MD5/size/mtime.
        """
        bucket_path = self.storage_root / bucket_name
        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

        try:
            for entry in bucket_path.rglob("*"):
                if result.objects_scanned >= self.batch_size:
                    return
                if not entry.is_file():
                    continue
                try:
                    rel = entry.relative_to(bucket_path)
                except ValueError:
                    continue
                if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS:
                    continue

                full_key = rel.as_posix()
                key_name = rel.name
                parent = rel.parent

                if parent == Path("."):
                    index_path = meta_root / "_index.json"
                else:
                    index_path = meta_root / parent / "_index.json"

                has_entry = False
                if index_path.exists():
                    try:
                        index_data = json.loads(index_path.read_text(encoding="utf-8"))
                        has_entry = key_name in index_data
                    except (OSError, json.JSONDecodeError):
                        pass

                if not has_entry:
                    result.orphaned_objects += 1
                    issue = IntegrityIssue(
                        issue_type="orphaned_object",
                        bucket=bucket_name,
                        key=full_key,
                        detail="file exists without metadata entry",
                    )

                    if auto_heal and not dry_run:
                        try:
                            etag = _compute_etag(entry)
                            stat = entry.stat()
                            meta = {
                                "__etag__": etag,
                                "__size__": str(stat.st_size),
                                "__last_modified__": str(stat.st_mtime),
                            }
                            # Re-read the index so concurrent entries written
                            # since the detection read are not clobbered.
                            index_data = {}
                            if index_path.exists():
                                try:
                                    index_data = json.loads(index_path.read_text(encoding="utf-8"))
                                except (OSError, json.JSONDecodeError):
                                    pass
                            index_data[key_name] = {"metadata": meta}
                            self._atomic_write_index(index_path, index_data)
                            issue.healed = True
                            issue.heal_action = "created metadata entry"
                            result.issues_healed += 1
                        except OSError as e:
                            result.errors.append(f"heal orphaned {bucket_name}/{full_key}: {e}")

                    self._add_issue(result, issue)
        except OSError as e:
            result.errors.append(f"check orphaned {bucket_name}: {e}")

    def _check_phantom_metadata(
        self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
    ) -> None:
        """Find index entries whose data file is gone.

        Heal: drop the stale entries; an emptied index file is deleted.
        NOTE: healed counters are incremented optimistically before the
        batched write; a write failure is surfaced via result.errors.
        """
        bucket_path = self.storage_root / bucket_name
        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

        if not meta_root.exists():
            return

        try:
            for index_file in meta_root.rglob("_index.json"):
                if not index_file.is_file():
                    continue
                try:
                    index_data = json.loads(index_file.read_text(encoding="utf-8"))
                except (OSError, json.JSONDecodeError):
                    continue

                keys_to_remove = []
                for key_name in list(index_data.keys()):
                    rel_dir = index_file.parent.relative_to(meta_root)
                    if rel_dir == Path("."):
                        full_key = key_name
                    else:
                        full_key = rel_dir.as_posix() + "/" + key_name

                    object_path = bucket_path / full_key
                    if not object_path.exists():
                        result.phantom_metadata += 1
                        issue = IntegrityIssue(
                            issue_type="phantom_metadata",
                            bucket=bucket_name,
                            key=full_key,
                            detail="metadata entry without file on disk",
                        )
                        if auto_heal and not dry_run:
                            keys_to_remove.append(key_name)
                            issue.healed = True
                            issue.heal_action = "removed stale index entry"
                            result.issues_healed += 1
                        self._add_issue(result, issue)

                if keys_to_remove and auto_heal and not dry_run:
                    try:
                        for k in keys_to_remove:
                            index_data.pop(k, None)
                        if index_data:
                            self._atomic_write_index(index_file, index_data)
                        else:
                            index_file.unlink(missing_ok=True)
                    except OSError as e:
                        result.errors.append(f"heal phantom {bucket_name}: {e}")
        except OSError as e:
            result.errors.append(f"check phantom {bucket_name}: {e}")

    def _check_stale_versions(
        self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
    ) -> None:
        """Find version ``.bin``/``.json`` pairs missing one half.

        Heal: delete the orphaned half (data without manifest, or vice versa).
        """
        versions_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR

        if not versions_root.exists():
            return

        try:
            for key_dir in versions_root.rglob("*"):
                if not key_dir.is_dir():
                    continue

                # Pair data and manifest files by shared stem (version id).
                bin_files = {f.stem: f for f in key_dir.glob("*.bin")}
                json_files = {f.stem: f for f in key_dir.glob("*.json")}

                for stem, bin_file in bin_files.items():
                    if stem not in json_files:
                        result.stale_versions += 1
                        issue = IntegrityIssue(
                            issue_type="stale_version",
                            bucket=bucket_name,
                            key=f"{key_dir.relative_to(versions_root).as_posix()}/{bin_file.name}",
                            detail="version data without manifest",
                        )
                        if auto_heal and not dry_run:
                            try:
                                bin_file.unlink(missing_ok=True)
                                issue.healed = True
                                issue.heal_action = "removed orphaned version data"
                                result.issues_healed += 1
                            except OSError as e:
                                result.errors.append(f"heal stale version {bin_file}: {e}")
                        self._add_issue(result, issue)

                for stem, json_file in json_files.items():
                    if stem not in bin_files:
                        result.stale_versions += 1
                        issue = IntegrityIssue(
                            issue_type="stale_version",
                            bucket=bucket_name,
                            key=f"{key_dir.relative_to(versions_root).as_posix()}/{json_file.name}",
                            detail="version manifest without data",
                        )
                        if auto_heal and not dry_run:
                            try:
                                json_file.unlink(missing_ok=True)
                                issue.healed = True
                                issue.heal_action = "removed orphaned version manifest"
                                result.issues_healed += 1
                            except OSError as e:
                                result.errors.append(f"heal stale version {json_file}: {e}")
                        self._add_issue(result, issue)
        except OSError as e:
            result.errors.append(f"check stale versions {bucket_name}: {e}")

    def _check_etag_cache(
        self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
    ) -> None:
        """Compare ``etag_index.json`` entries against the index metadata.

        Heal: delete the whole cache file (it is rebuilt on demand), which
        heals every mismatch for this bucket at once.
        """
        etag_index_path = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / "etag_index.json"

        if not etag_index_path.exists():
            return

        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
        if not meta_root.exists():
            return

        try:
            etag_cache = json.loads(etag_index_path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return

        # Track this bucket's mismatches locally.  Fix: the old code counted
        # healed issues by rescanning result.issues, which undercounted
        # whenever the issue list was truncated at MAX_ISSUES (deleting the
        # cache file actually heals every mismatch) and was O(total issues).
        bucket_issues: List[IntegrityIssue] = []
        mismatch_count = 0

        for full_key, cached_etag in etag_cache.items():
            key_path = Path(full_key)
            key_name = key_path.name
            parent = key_path.parent

            if parent == Path("."):
                index_path = meta_root / "_index.json"
            else:
                index_path = meta_root / parent / "_index.json"

            if not index_path.exists():
                continue

            try:
                index_data = json.loads(index_path.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError):
                continue

            entry = index_data.get(key_name)
            if not entry:
                continue

            meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
            stored_etag = meta.get("__etag__")

            if stored_etag and cached_etag != stored_etag:
                result.etag_cache_inconsistencies += 1
                mismatch_count += 1
                issue = IntegrityIssue(
                    issue_type="etag_cache_inconsistency",
                    bucket=bucket_name,
                    key=full_key,
                    detail=f"cached_etag={cached_etag} index_etag={stored_etag}",
                )
                bucket_issues.append(issue)
                self._add_issue(result, issue)

        if mismatch_count and auto_heal and not dry_run:
            try:
                etag_index_path.unlink(missing_ok=True)
                for issue in bucket_issues:
                    issue.healed = True
                    issue.heal_action = "deleted etag_index.json"
                # Every mismatch is healed by removing the cache file, even
                # those whose detail records were dropped past MAX_ISSUES.
                result.issues_healed += mismatch_count
            except OSError as e:
                result.errors.append(f"heal etag cache {bucket_name}: {e}")

    def _check_legacy_metadata(
        self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
    ) -> None:
        """Find legacy per-object ``.meta.json`` files under ``<bucket>/.meta``.

        Heal: unmigrated files are migrated into the index and deleted;
        files that diverge from an existing index entry are deleted (the
        index is authoritative).
        """
        legacy_meta_root = self.storage_root / bucket_name / ".meta"
        if not legacy_meta_root.exists():
            return

        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

        try:
            for meta_file in legacy_meta_root.rglob("*.meta.json"):
                if not meta_file.is_file():
                    continue

                try:
                    rel = meta_file.relative_to(legacy_meta_root)
                except ValueError:
                    continue

                full_key = rel.as_posix().removesuffix(".meta.json")
                key_path = Path(full_key)
                key_name = key_path.name
                parent = key_path.parent

                if parent == Path("."):
                    index_path = meta_root / "_index.json"
                else:
                    index_path = meta_root / parent / "_index.json"

                try:
                    legacy_data = json.loads(meta_file.read_text(encoding="utf-8"))
                except (OSError, json.JSONDecodeError):
                    continue

                index_entry = None
                if index_path.exists():
                    try:
                        index_data = json.loads(index_path.read_text(encoding="utf-8"))
                        index_entry = index_data.get(key_name)
                    except (OSError, json.JSONDecodeError):
                        pass

                if index_entry is None:
                    result.legacy_metadata_drifts += 1
                    issue = IntegrityIssue(
                        issue_type="legacy_metadata_drift",
                        bucket=bucket_name,
                        key=full_key,
                        detail="unmigrated legacy .meta.json",
                    )

                    if auto_heal and not dry_run:
                        try:
                            index_data = {}
                            if index_path.exists():
                                try:
                                    index_data = json.loads(index_path.read_text(encoding="utf-8"))
                                except (OSError, json.JSONDecodeError):
                                    pass
                            index_data[key_name] = {"metadata": legacy_data}
                            self._atomic_write_index(index_path, index_data)
                            meta_file.unlink(missing_ok=True)
                            issue.healed = True
                            issue.heal_action = "migrated to index and deleted legacy file"
                            result.issues_healed += 1
                        except OSError as e:
                            result.errors.append(f"heal legacy {bucket_name}/{full_key}: {e}")

                    self._add_issue(result, issue)
                else:
                    index_meta = index_entry.get("metadata", {}) if isinstance(index_entry, dict) else {}
                    if legacy_data != index_meta:
                        result.legacy_metadata_drifts += 1
                        issue = IntegrityIssue(
                            issue_type="legacy_metadata_drift",
                            bucket=bucket_name,
                            key=full_key,
                            detail="legacy .meta.json differs from index entry",
                        )

                        if auto_heal and not dry_run:
                            try:
                                meta_file.unlink(missing_ok=True)
                                issue.healed = True
                                issue.heal_action = "deleted legacy file (index is authoritative)"
                                result.issues_healed += 1
                            except OSError as e:
                                result.errors.append(f"heal legacy drift {bucket_name}/{full_key}: {e}")

                        self._add_issue(result, issue)
        except OSError as e:
            result.errors.append(f"check legacy meta {bucket_name}: {e}")

    @staticmethod
    def _atomic_write_index(index_path: Path, data: Dict[str, Any]) -> None:
        """Write *data* as JSON via a temp file + os.replace (atomic on POSIX).

        The temp file is cleaned up on any failure before re-raising.
        """
        index_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = index_path.with_suffix(".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f)
            os.replace(str(tmp_path), str(index_path))
        except BaseException:
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                pass
            raise

    def get_history(self, limit: int = 50, offset: int = 0) -> List[dict]:
        """Return a page of past execution records as plain dicts."""
        records = self.history_store.get_history(limit, offset)
        return [r.to_dict() for r in records]

    def get_status(self) -> dict:
        """Current configuration and scheduling state for the admin API."""
        return {
            "enabled": not self._shutdown or self._timer is not None,
            "running": self._timer is not None and not self._shutdown,
            "interval_hours": self.interval_seconds / 3600.0,
            "batch_size": self.batch_size,
            "auto_heal": self.auto_heal,
            "dry_run": self.dry_run,
        }
a/docs.md b/docs.md index a63b72e..cc9d41b 100644 --- a/docs.md +++ b/docs.md @@ -356,6 +356,69 @@ The application automatically trusts these headers to generate correct presigned | `ALLOWED_REDIRECT_HOSTS` | `""` | Comma-separated whitelist of safe redirect targets. Empty allows only same-host redirects. | | `ALLOW_INTERNAL_ENDPOINTS` | `false` | Allow connections to internal/private IPs for webhooks and replication targets. **Keep disabled in production unless needed.** | +## Integrity Scanner + +The integrity scanner detects and optionally auto-repairs data inconsistencies: corrupted objects (ETag mismatch), orphaned files without metadata, phantom metadata without files, stale version archives, ETag cache drift, and unmigrated legacy `.meta.json` files. + +### Enabling Integrity Scanner + +By default, the integrity scanner is disabled. Enable it by setting: + +```bash +INTEGRITY_ENABLED=true python run.py +``` + +Or in your `myfsio.env` file: +``` +INTEGRITY_ENABLED=true +INTEGRITY_INTERVAL_HOURS=24 # Run every 24 hours (default) +INTEGRITY_BATCH_SIZE=1000 # Max objects to scan per cycle +INTEGRITY_AUTO_HEAL=false # Automatically repair detected issues +INTEGRITY_DRY_RUN=false # Set to true to log without healing +``` + +### What Gets Checked + +| Check | Detection | Heal Action | +|-------|-----------|-------------| +| **Corrupted objects** | File MD5 does not match stored `__etag__` | Update `__etag__` in index (disk data is authoritative) | +| **Orphaned objects** | File exists on disk without metadata entry | Create index entry with computed MD5/size/mtime | +| **Phantom metadata** | Index entry exists but file is missing from disk | Remove stale entry from `_index.json` | +| **Stale versions** | `.json` manifest without `.bin` data or vice versa | Remove orphaned version file | +| **ETag cache inconsistency** | `etag_index.json` entry differs from metadata `__etag__` | Delete `etag_index.json` (auto-rebuilt on next list) | +| **Legacy metadata drift** | 
Legacy `.meta.json` differs from index or is unmigrated | Migrate to index and delete legacy file | + +### Admin API + +All integrity endpoints require admin (`iam:*`) permissions. + +| Method | Route | Description | +|--------|-------|-------------| +| `GET` | `/admin/integrity/status` | Get scanner status and configuration | +| `POST` | `/admin/integrity/run` | Trigger a manual scan (body: `{"dry_run": true, "auto_heal": true}`) | +| `GET` | `/admin/integrity/history` | Get scan history (query: `?limit=50&offset=0`) | + +### Dry Run Mode + +Set `INTEGRITY_DRY_RUN=true` to log detected issues without making any changes. You can also trigger a one-time dry run via the admin API: + +```bash +curl -X POST "http://localhost:5000/admin/integrity/run" \ + -H "X-Access-Key: " -H "X-Secret-Key: " \ + -H "Content-Type: application/json" \ + -d '{"dry_run": true, "auto_heal": true}' +``` + +### Configuration Reference + +| Variable | Default | Description | +|----------|---------|-------------| +| `INTEGRITY_ENABLED` | `false` | Enable background integrity scanning | +| `INTEGRITY_INTERVAL_HOURS` | `24` | Hours between scan cycles | +| `INTEGRITY_BATCH_SIZE` | `1000` | Max objects to scan per cycle | +| `INTEGRITY_AUTO_HEAL` | `false` | Automatically repair detected issues | +| `INTEGRITY_DRY_RUN` | `false` | Log issues without healing | + ## 4. Upgrading and Updates ### Version Checking diff --git a/templates/docs.html b/templates/docs.html index e0a193e..145c90d 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -41,6 +41,7 @@
  • Encryption
  • Lifecycle Rules
  • Garbage Collection
  • +
  • Integrity Scanner
  • Metrics History
  • Operation Metrics
  • Troubleshooting
  • @@ -1731,10 +1732,114 @@ curl "{{ api_base }}/admin/gc/history?limit=10" \ -
    +
    15 +

    Integrity Scanner

    +
    +

    Detect and optionally auto-repair data inconsistencies: corrupted objects, orphaned files, phantom metadata, stale versions, ETag cache drift, and unmigrated legacy metadata.

    + +

    Enabling Integrity Scanner

    +

    Disabled by default. Enable via environment variable:

    +
    INTEGRITY_ENABLED=true python run.py
    + +

    Configuration

    +
    + + + + + + + + + + + + + + + +
    VariableDefaultDescription
    INTEGRITY_ENABLEDfalseEnable background integrity scanning
    INTEGRITY_INTERVAL_HOURS24Hours between scan cycles
    INTEGRITY_BATCH_SIZE1000Max objects to scan per cycle
    INTEGRITY_AUTO_HEALfalseAutomatically repair detected issues
    INTEGRITY_DRY_RUNfalseLog issues without healing
    +
    + +

    What Gets Checked

    +
    + + + + + + + + + + + + + + + + +
    CheckDetectionHeal Action
    Corrupted objectsFile MD5 does not match stored ETagUpdate ETag in index (disk is authoritative)
    Orphaned objectsFile exists without metadata entryCreate index entry with computed MD5/size/mtime
    Phantom metadataIndex entry exists but file is missingRemove stale entry from index
    Stale versionsManifest without data or vice versaRemove orphaned version file
    ETag cacheetag_index.json differs from metadataDelete cache file (auto-rebuilt)
    Legacy metadataLegacy .meta.json differs or unmigratedMigrate to index, delete legacy file
    +
    + +

    Admin API

    +
    + + + + + + + + + + + + + +
    MethodRouteDescription
    GET/admin/integrity/statusGet scanner status and configuration
    POST/admin/integrity/runTrigger manual scan
    GET/admin/integrity/historyGet scan history
    +
    + +
    # Trigger a dry run with auto-heal preview
    +curl -X POST "{{ api_base }}/admin/integrity/run" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "Content-Type: application/json" \
    +  -d '{"dry_run": true, "auto_heal": true}'
    +
    +# Trigger actual scan with healing
    +curl -X POST "{{ api_base }}/admin/integrity/run" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "Content-Type: application/json" \
    +  -d '{"auto_heal": true}'
    +
    +# Check status
    +curl "{{ api_base }}/admin/integrity/status" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
    +
    +# View history
    +curl "{{ api_base }}/admin/integrity/history?limit=10" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
    + +
    +
    + + + + +
    + Dry Run: Use INTEGRITY_DRY_RUN=true or pass {"dry_run": true} to the API to preview detected issues without making any changes. Combine with {"auto_heal": true} to see what would be repaired. +
    +
    +
    +
    +
    +
    +
    +
    + 16

    Metrics History

    Track CPU, memory, and disk usage over time with optional metrics history. Disabled by default to minimize overhead.

    @@ -1818,7 +1923,7 @@ curl -X PUT "{{ api_base | replace('/api', '/ui') }}/metrics/settings" \
    - 16 + 17

    Operation Metrics

    Track API request statistics including request counts, latency, error rates, and bandwidth usage. Provides real-time visibility into API operations.

    @@ -1925,7 +2030,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
    - 17 + 18

    Troubleshooting & tips

    @@ -1976,7 +2081,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
    - 18 + 19

    Health Check Endpoint

    The API exposes a health check endpoint for monitoring and load balancer integration.

    @@ -1998,7 +2103,7 @@ curl {{ api_base }}/myfsio/health
    - 19 + 20

    Object Lock & Retention

    Object Lock prevents objects from being deleted or overwritten for a specified retention period.

    @@ -2058,7 +2163,7 @@ curl "{{ api_base }}/<bucket>/<key>?legal-hold" \
    - 20 + 21

    Access Logging

    Enable S3-style access logging to track all requests to your buckets for audit and analysis.

    @@ -2085,7 +2190,7 @@ curl "{{ api_base }}/<bucket>?logging" \
    - 21 + 22

    Notifications & Webhooks

    Configure event notifications to trigger webhooks when objects are created or deleted.

    @@ -2148,7 +2253,7 @@ curl -X PUT "{{ api_base }}/<bucket>?notification" \
    - 22 + 23

    SelectObjectContent (SQL)

    Query CSV, JSON, or Parquet files directly using SQL without downloading the entire object.

    @@ -2193,7 +2298,7 @@ curl -X POST "{{ api_base }}/<bucket>/data.csv?select" \
    - 23 + 24

    Advanced S3 Operations

    Copy, move, and partially download objects using advanced S3 operations.

    @@ -2267,7 +2372,7 @@ curl "{{ api_base }}/<bucket>/<key>" \
    - 24 + 25

    Access Control Lists (ACLs)

    ACLs provide legacy-style permission management for buckets and objects.

    @@ -2321,7 +2426,7 @@ curl -X PUT "{{ api_base }}/<bucket>/<key>" \
    - 25 + 26

    Object & Bucket Tagging

    Add metadata tags to buckets and objects for organization, cost allocation, or lifecycle rule filtering.

    @@ -2380,7 +2485,7 @@ curl -X PUT "{{ api_base }}/<bucket>?tagging" \
    - 26 + 27

    Static Website Hosting

    Host static websites directly from S3 buckets with custom index and error pages, served via custom domain mapping.

    @@ -2473,7 +2578,7 @@ server {
    - 27 + 28

    CORS Configuration

    Configure per-bucket Cross-Origin Resource Sharing rules to control which origins can access your bucket from a browser.

    @@ -2540,7 +2645,7 @@ curl -X DELETE "{{ api_base }}/<bucket>?cors" \
    - 28 + 29

    PostObject (HTML Form Upload)

    Upload objects directly from an HTML form using browser-based POST uploads with policy-based authorization.

    @@ -2582,7 +2687,7 @@ curl -X DELETE "{{ api_base }}/<bucket>?cors" \
    - 29 + 30

    List Objects API v2

    Use the v2 list API for improved pagination with continuation tokens instead of markers.

    @@ -2626,7 +2731,7 @@ curl "{{ api_base }}/<bucket>?list-type=2&start-after=photos/2025/" \
    - 30 + 31

    Upgrading & Updates

    How to safely update MyFSIO to a new version.

    @@ -2659,7 +2764,7 @@ cp -r logs/ logs-backup/
    - 31 + 32

    Full API Reference

    Complete list of all S3-compatible, admin, and KMS endpoints.

    diff --git a/tests/test_integrity.py b/tests/test_integrity.py new file mode 100644 index 0000000..f4b4299 --- /dev/null +++ b/tests/test_integrity.py @@ -0,0 +1,499 @@ +import hashlib +import json +import os +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from app.integrity import IntegrityChecker, IntegrityResult + + +def _md5(data: bytes) -> str: + return hashlib.md5(data).hexdigest() + + +def _setup_bucket(storage_root: Path, bucket_name: str, objects: dict[str, bytes]) -> None: + bucket_path = storage_root / bucket_name + bucket_path.mkdir(parents=True, exist_ok=True) + meta_root = storage_root / ".myfsio.sys" / "buckets" / bucket_name / "meta" + meta_root.mkdir(parents=True, exist_ok=True) + bucket_json = storage_root / ".myfsio.sys" / "buckets" / bucket_name / ".bucket.json" + bucket_json.write_text(json.dumps({"created": "2025-01-01"})) + + for key, data in objects.items(): + obj_path = bucket_path / key + obj_path.parent.mkdir(parents=True, exist_ok=True) + obj_path.write_bytes(data) + + etag = _md5(data) + stat = obj_path.stat() + meta = { + "__etag__": etag, + "__size__": str(stat.st_size), + "__last_modified__": str(stat.st_mtime), + } + + key_path = Path(key) + parent = key_path.parent + key_name = key_path.name + if parent == Path("."): + index_path = meta_root / "_index.json" + else: + index_path = meta_root / parent / "_index.json" + index_path.parent.mkdir(parents=True, exist_ok=True) + + index_data = {} + if index_path.exists(): + index_data = json.loads(index_path.read_text()) + index_data[key_name] = {"metadata": meta} + index_path.write_text(json.dumps(index_data)) + + +def _issues_of_type(result, issue_type): + return [i for i in result.issues if i.issue_type == issue_type] + + +@pytest.fixture +def storage_root(tmp_path): + root = tmp_path / "data" + root.mkdir() + (root / ".myfsio.sys" / "config").mkdir(parents=True, exist_ok=True) + return root + + 
@pytest.fixture
def checker(storage_root):
    """IntegrityChecker with defaults: healing off, dry-run off, large batch."""
    return IntegrityChecker(
        storage_root=storage_root,
        interval_hours=24.0,
        batch_size=1000,
        auto_heal=False,
        dry_run=False,
    )


class TestCorruptedObjects:
    """Data bytes no longer matching the indexed ETag."""

    def test_detect_corrupted(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
        # Overwrite the data file so its MD5 no longer matches the index ETag.
        (storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted data")

        result = checker.run_now()
        assert result.corrupted_objects == 1
        issues = _issues_of_type(result, "corrupted_object")
        assert len(issues) == 1
        assert issues[0].bucket == "mybucket"
        assert issues[0].key == "file.txt"
        assert not issues[0].healed

    def test_heal_corrupted(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
        (storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted data")

        result = checker.run_now(auto_heal=True)
        assert result.corrupted_objects == 1
        assert result.issues_healed == 1
        issues = _issues_of_type(result, "corrupted_object")
        assert issues[0].healed

        # A second scan finds nothing — healing left a self-consistent state.
        result2 = checker.run_now()
        assert result2.corrupted_objects == 0

    def test_valid_objects_pass(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})

        result = checker.run_now()
        assert result.corrupted_objects == 0
        assert result.objects_scanned == 1

    def test_corrupted_nested_key(self, storage_root, checker):
        # Issue keys are reported with forward slashes even for nested paths.
        _setup_bucket(storage_root, "mybucket", {"sub/dir/file.txt": b"nested content"})
        (storage_root / "mybucket" / "sub" / "dir" / "file.txt").write_bytes(b"bad")

        result = checker.run_now()
        assert result.corrupted_objects == 1
        issues = _issues_of_type(result, "corrupted_object")
        assert issues[0].key == "sub/dir/file.txt"


class TestOrphanedObjects:
    """Data files present on disk with no index metadata entry."""

    def test_detect_orphaned(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {})
        (storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan data")

        result = checker.run_now()
        assert result.orphaned_objects == 1
        issues = _issues_of_type(result, "orphaned_object")
        assert len(issues) == 1

    def test_heal_orphaned(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {})
        (storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan data")

        result = checker.run_now(auto_heal=True)
        assert result.orphaned_objects == 1
        assert result.issues_healed == 1
        issues = _issues_of_type(result, "orphaned_object")
        assert issues[0].healed

        # Healing apparently adopts the file into the index (it is scanned
        # afterwards) rather than deleting it — NOTE(review): inferred from
        # the objects_scanned assertion; confirm against IntegrityChecker.
        result2 = checker.run_now()
        assert result2.orphaned_objects == 0
        assert result2.objects_scanned >= 1


class TestPhantomMetadata:
    """Index entries that point at data files which no longer exist."""

    def test_detect_phantom(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
        (storage_root / "mybucket" / "file.txt").unlink()

        result = checker.run_now()
        assert result.phantom_metadata == 1
        issues = _issues_of_type(result, "phantom_metadata")
        assert len(issues) == 1

    def test_heal_phantom(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
        (storage_root / "mybucket" / "file.txt").unlink()

        result = checker.run_now(auto_heal=True)
        assert result.phantom_metadata == 1
        assert result.issues_healed == 1

        result2 = checker.run_now()
        assert result2.phantom_metadata == 0


class TestStaleVersions:
    """Version store entries where the .json manifest / .bin data pair is broken."""

    def test_manifest_without_data(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
        versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
        versions_root.mkdir(parents=True)
        (versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))

        result = checker.run_now()
        assert result.stale_versions == 1
        issues = _issues_of_type(result, "stale_version")
        assert "manifest without data" in issues[0].detail

    def test_data_without_manifest(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
        versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
        versions_root.mkdir(parents=True)
        (versions_root / "v1.bin").write_bytes(b"old data")

        result = checker.run_now()
        assert result.stale_versions == 1
        issues = _issues_of_type(result, "stale_version")
        assert "data without manifest" in issues[0].detail

    def test_heal_stale_versions(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
        versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
        versions_root.mkdir(parents=True)
        # Two independent half-pairs: v1 has only a manifest, v2 only data.
        (versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
        (versions_root / "v2.bin").write_bytes(b"old data")

        result = checker.run_now(auto_heal=True)
        assert result.stale_versions == 2
        assert result.issues_healed == 2
        # Healing deletes the dangling halves outright.
        assert not (versions_root / "v1.json").exists()
        assert not (versions_root / "v2.bin").exists()

    def test_valid_versions_pass(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
        versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
        versions_root.mkdir(parents=True)
        # Matching v1.json + v1.bin pair is a complete, healthy version.
        (versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
        (versions_root / "v1.bin").write_bytes(b"old data")

        result = checker.run_now()
        assert result.stale_versions == 0


class TestEtagCache:
    """Per-bucket etag_index.json disagreeing with actual object ETags."""

    def test_detect_mismatch(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
        etag_path = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "etag_index.json"
        etag_path.write_text(json.dumps({"file.txt": "wrong_etag"}))

        result = checker.run_now()
        assert result.etag_cache_inconsistencies == 1
        issues = _issues_of_type(result, "etag_cache_inconsistency")
        assert len(issues) == 1

    def test_heal_mismatch(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
        etag_path = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "etag_index.json"
        etag_path.write_text(json.dumps({"file.txt": "wrong_etag"}))

        result = checker.run_now(auto_heal=True)
        assert result.etag_cache_inconsistencies == 1
        assert result.issues_healed == 1
        # Healing drops the stale cache file (to be rebuilt lazily) rather
        # than rewriting it in place.
        assert not etag_path.exists()


class TestLegacyMetadata:
    """Old per-object .meta/<key>.meta.json files left over from migration."""

    def test_detect_unmigrated(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})

        legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
        legacy_meta.parent.mkdir(parents=True)
        legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))

        # Remove the new-style index so only the legacy file describes the key.
        meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
        index_path = meta_root / "_index.json"
        index_path.unlink()

        result = checker.run_now()
        assert result.legacy_metadata_drifts == 1
        issues = _issues_of_type(result, "legacy_metadata_drift")
        assert len(issues) == 1
        assert issues[0].detail == "unmigrated legacy .meta.json"

    def test_detect_drift(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})

        # Both legacy and index metadata exist, but with different ETags.
        legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
        legacy_meta.parent.mkdir(parents=True)
        legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))

        result = checker.run_now()
        assert result.legacy_metadata_drifts == 1
        issues = _issues_of_type(result, "legacy_metadata_drift")
        assert "differs from index" in issues[0].detail

    def test_heal_unmigrated(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})

        legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
        legacy_meta.parent.mkdir(parents=True)
        legacy_data = {"__etag__": _md5(b"hello"), "__size__": "5"}
        legacy_meta.write_text(json.dumps(legacy_data))

        meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
        index_path = meta_root / "_index.json"
        index_path.unlink()

        result = checker.run_now(auto_heal=True)
        assert result.legacy_metadata_drifts == 1
        legacy_issues = _issues_of_type(result, "legacy_metadata_drift")
        assert len(legacy_issues) == 1
        assert legacy_issues[0].healed
        assert not legacy_meta.exists()

        # Healing migrates the legacy record into the new-style index.
        index_data = json.loads(index_path.read_text())
        assert "file.txt" in index_data
        assert index_data["file.txt"]["metadata"]["__etag__"] == _md5(b"hello")

    def test_heal_drift(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})

        legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
        legacy_meta.parent.mkdir(parents=True)
        legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))

        result = checker.run_now(auto_heal=True)
        assert result.legacy_metadata_drifts == 1
        legacy_issues = _issues_of_type(result, "legacy_metadata_drift")
        assert legacy_issues[0].healed
        # The index is authoritative when both exist; the legacy file goes.
        assert not legacy_meta.exists()


class TestDryRun:
    """dry_run=True reports issues but must not modify anything on disk."""

    def test_dry_run_no_changes(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
        (storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted")
        (storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan")

        # dry_run overrides auto_heal: detection counts accrue, healing does not.
        result = checker.run_now(auto_heal=True, dry_run=True)
        assert result.corrupted_objects == 1
        assert result.orphaned_objects == 1
        assert result.issues_healed == 0

        # The orphan was NOT adopted into the index.
        meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
        index_data = json.loads((meta_root / "_index.json").read_text())
        assert "orphan.txt" not in index_data


class TestBatchSize:
    """batch_size caps how many objects a single pass will scan."""

    def test_batch_limits_scan(self, storage_root):
        objects = {f"file{i}.txt": f"data{i}".encode() for i in range(10)}
        _setup_bucket(storage_root, "mybucket", objects)

        checker = IntegrityChecker(
            storage_root=storage_root,
            batch_size=3,
        )
        result = checker.run_now()
        assert result.objects_scanned <= 3


class TestHistory:
    """Each run_now() appends an execution record to the in-memory history."""

    def test_history_recorded(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})

        checker.run_now()
        history = checker.get_history()
        assert len(history) == 1
        assert "corrupted_objects" in history[0]["result"]

    def test_history_multiple(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})

        checker.run_now()
        checker.run_now()
        checker.run_now()
        history = checker.get_history()
        assert len(history) == 3

    def test_history_pagination(self, storage_root, checker):
        _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})

        for _ in range(5):
            checker.run_now()

        history = checker.get_history(limit=2, offset=1)
        assert len(history) == 2


# Credentials matching the admin user seeded into the IAM config below.
AUTH_HEADERS = {"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}


class TestAdminAPI:
    """End-to-end coverage of the /admin/integrity/* HTTP endpoints."""

    @pytest.fixture
    def integrity_app(self, tmp_path):
        """Flask app with the integrity checker enabled and one admin user."""
        from app import create_api_app
        storage_root = tmp_path / "data"
        iam_config = tmp_path / "iam.json"
        bucket_policies = tmp_path / "bucket_policies.json"
        iam_payload = {
            "users": [
                {
                    "access_key": "admin",
                    "secret_key": "adminsecret",
                    "display_name": "Admin",
                    "policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", "iam:*"]}],
                }
            ]
        }
        iam_config.write_text(json.dumps(iam_payload))
        flask_app = create_api_app({
            "TESTING": True,
            "SECRET_KEY": "testing",
            "STORAGE_ROOT": storage_root,
            "IAM_CONFIG": iam_config,
            "BUCKET_POLICY_PATH": bucket_policies,
            "API_BASE_URL": "http://testserver",
            "INTEGRITY_ENABLED": True,
            "INTEGRITY_AUTO_HEAL": False,
            "INTEGRITY_DRY_RUN": False,
        })
        yield flask_app
        # Teardown: stop the stats writer and the background checker thread
        # so the temporary directory can be removed cleanly.
        storage = flask_app.extensions.get("object_storage")
        if storage:
            base = getattr(storage, "storage", storage)
            if hasattr(base, "shutdown_stats"):
                base.shutdown_stats()
        ic = flask_app.extensions.get("integrity")
        if ic:
            ic.stop()

    def test_status_endpoint(self, integrity_app):
        client = integrity_app.test_client()
        resp = client.get("/admin/integrity/status", headers=AUTH_HEADERS)
        assert resp.status_code == 200
        data = resp.get_json()
        assert data["enabled"] is True
        assert "interval_hours" in data

    def test_run_endpoint(self, integrity_app):
        client = integrity_app.test_client()
        resp = client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
        assert resp.status_code == 200
        data = resp.get_json()
        assert "corrupted_objects" in data
        assert "objects_scanned" in data

    def test_run_with_overrides(self, integrity_app):
        # Per-request dry_run/auto_heal override the configured defaults.
        client = integrity_app.test_client()
        resp = client.post(
            "/admin/integrity/run",
            headers=AUTH_HEADERS,
            json={"dry_run": True, "auto_heal": True},
        )
        assert resp.status_code == 200

    def test_history_endpoint(self, integrity_app):
        client = integrity_app.test_client()
        client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
        resp = client.get("/admin/integrity/history", headers=AUTH_HEADERS)
        assert resp.status_code == 200
        data = resp.get_json()
        assert "executions" in data
        assert len(data["executions"]) >= 1

    def test_auth_required(self, integrity_app):
        client = integrity_app.test_client()
        resp = client.get("/admin/integrity/status")
        assert resp.status_code in (401, 403)

    def test_disabled_status(self, tmp_path):
        # With INTEGRITY_ENABLED False the status endpoint still responds 200
        # but reports enabled: False (no checker is registered).
        from app import create_api_app
        storage_root = tmp_path / "data2"
        iam_config = tmp_path / "iam2.json"
        bucket_policies = tmp_path / "bp2.json"
        iam_payload = {
            "users": [
                {
                    "access_key": "admin",
                    "secret_key": "adminsecret",
                    "display_name": "Admin",
                    "policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", "iam:*"]}],
                }
            ]
        }
        iam_config.write_text(json.dumps(iam_payload))
        flask_app = create_api_app({
            "TESTING": True,
            "SECRET_KEY": "testing",
            "STORAGE_ROOT": storage_root,
            "IAM_CONFIG": iam_config,
            "BUCKET_POLICY_PATH": bucket_policies,
            "API_BASE_URL": "http://testserver",
            "INTEGRITY_ENABLED": False,
        })
        c = flask_app.test_client()
        resp = c.get("/admin/integrity/status", headers=AUTH_HEADERS)
        assert resp.status_code == 200
        data = resp.get_json()
        assert data["enabled"] is False

        storage = flask_app.extensions.get("object_storage")
        if storage:
            base = getattr(storage, "storage", storage)
            if hasattr(base, "shutdown_stats"):
                base.shutdown_stats()


class TestMultipleBuckets:
    """A single pass walks every bucket under the storage root."""

    def test_scans_multiple_buckets(self, storage_root, checker):
        _setup_bucket(storage_root, "bucket1", {"a.txt": b"aaa"})
        _setup_bucket(storage_root, "bucket2", {"b.txt": b"bbb"})

        result = checker.run_now()
        assert result.buckets_scanned == 2
        assert result.objects_scanned == 2
        assert result.corrupted_objects == 0


class TestGetStatus:
    """get_status() exposes the checker's configuration and run state."""

    def test_status_fields(self, checker):
        status = checker.get_status()
        assert "enabled" in status
        assert "running" in status
        assert "interval_hours" in status
        assert "batch_size" in status
        assert "auto_heal" in status
        assert "dry_run" in status