"""Background integrity scanner for bucket objects and their metadata indexes.

The module provides:

* result / issue / history record dataclasses,
* JSON-backed stores for scan history and per-bucket scan cursors,
* ``IntegrityChecker``, which walks buckets in batches, reports
  inconsistencies and can optionally repair them.
"""
from __future__ import annotations

import hashlib
import json
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

try:
    import myfsio_core as _rc
    if not hasattr(_rc, "md5_file"):
        raise ImportError("myfsio_core is outdated, rebuild with: cd myfsio_core && maturin develop --release")
    _HAS_RUST = True
except ImportError:
    _HAS_RUST = False

logger = logging.getLogger(__name__)


def _compute_etag(path: Path) -> str:
    """Return the hex MD5 digest of the file at *path*.

    Uses ``myfsio_core.md5_file`` when that extension imported successfully,
    otherwise hashes the file in 8 KiB chunks with ``hashlib``.

    Raises:
        OSError: if the file cannot be opened or read (pure-Python path).
    """
    if _HAS_RUST:
        return _rc.md5_file(str(path))
    # MD5 is used as a checksum here, not for security; flagging it keeps the
    # call usable on builds that restrict MD5 for security purposes.
    checksum = hashlib.md5(usedforsecurity=False)
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(8192), b""):
            checksum.update(chunk)
    return checksum.hexdigest()


@dataclass
class IntegrityIssue:
    """A single inconsistency found during a scan."""

    issue_type: str
    bucket: str
    key: str
    detail: str
    healed: bool = False
    heal_action: str = ""

    def to_dict(self) -> dict:
        """Return a JSON-serialisable copy of this issue."""
        return {
            "issue_type": self.issue_type,
            "bucket": self.bucket,
            "key": self.key,
            "detail": self.detail,
            "healed": self.healed,
            "heal_action": self.heal_action,
        }


@dataclass
class IntegrityResult:
    """Counters, issues and errors accumulated over one scan."""

    corrupted_objects: int = 0
    orphaned_objects: int = 0
    phantom_metadata: int = 0
    stale_versions: int = 0
    etag_cache_inconsistencies: int = 0
    legacy_metadata_drifts: int = 0
    issues_healed: int = 0
    issues: List[IntegrityIssue] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    objects_scanned: int = 0
    buckets_scanned: int = 0
    execution_time_seconds: float = 0.0

    def to_dict(self) -> dict:
        """Return a JSON-serialisable copy of this result."""
        return {
            "corrupted_objects": self.corrupted_objects,
            "orphaned_objects": self.orphaned_objects,
            "phantom_metadata": self.phantom_metadata,
            "stale_versions": self.stale_versions,
            "etag_cache_inconsistencies": self.etag_cache_inconsistencies,
            "legacy_metadata_drifts": self.legacy_metadata_drifts,
            "issues_healed": self.issues_healed,
            "issues": [i.to_dict() for i in self.issues],
            "errors": self.errors,
            "objects_scanned": self.objects_scanned,
            "buckets_scanned": self.buckets_scanned,
            "execution_time_seconds": self.execution_time_seconds,
        }

    @property
    def total_issues(self) -> int:
        """Sum of all per-category issue counters."""
        return (
            self.corrupted_objects
            + self.orphaned_objects
            + self.phantom_metadata
            + self.stale_versions
            + self.etag_cache_inconsistencies
            + self.legacy_metadata_drifts
        )

    @property
    def has_issues(self) -> bool:
        """True when at least one issue of any category was counted."""
        return self.total_issues > 0


@dataclass
class IntegrityExecutionRecord:
    """One persisted history entry describing a finished scan."""

    timestamp: float
    result: dict
    dry_run: bool
    auto_heal: bool

    def to_dict(self) -> dict:
        """Return a JSON-serialisable copy of this record."""
        return {
            "timestamp": self.timestamp,
            "result": self.result,
            "dry_run": self.dry_run,
            "auto_heal": self.auto_heal,
        }

    @classmethod
    def from_dict(cls, data: dict) -> IntegrityExecutionRecord:
        """Build a record from a mapping produced by ``to_dict``.

        ``timestamp`` and ``result`` are required; the two flags default to
        ``False`` when absent.
        """
        return cls(
            timestamp=data["timestamp"],
            result=data["result"],
            dry_run=data.get("dry_run", False),
            auto_heal=data.get("auto_heal", False),
        )


class IntegrityHistoryStore:
    """JSON file holding the most recent scan records, newest first."""

    def __init__(self, storage_root: Path, max_records: int = 50) -> None:
        # Coerce so that a plain string root still supports the "/" joins below.
        self.storage_root = Path(storage_root)
        self.max_records = max_records
        self._lock = threading.Lock()

    def _get_path(self) -> Path:
        return self.storage_root / ".myfsio.sys" / "config" / "integrity_history.json"

    def load(self) -> List[IntegrityExecutionRecord]:
        """Return all stored records, or ``[]`` if the file is missing or unusable."""
        path = self._get_path()
        if not path.exists():
            return []
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, dict):
                raise ValueError("integrity history root is not a JSON object")
            return [IntegrityExecutionRecord.from_dict(d) for d in data.get("executions", [])]
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
            # TypeError/AttributeError cover well-formed JSON of the wrong shape
            # (e.g. a record that is a string instead of an object).
            logger.error("Failed to load integrity history: %s", e)
            return []

    def save(self, records: List[IntegrityExecutionRecord]) -> None:
        """Persist at most ``max_records`` records; failures are logged, not raised."""
        path = self._get_path()
        data = {"executions": [r.to_dict() for r in records[: self.max_records]]}
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
        except OSError as e:
            logger.error("Failed to save integrity history: %s", e)

    def add(self, record: IntegrityExecutionRecord) -> None:
        """Insert *record* at the front of the history and persist it."""
        with self._lock:
            records = self.load()
            records.insert(0, record)
            self.save(records)

    def get_history(self, limit: int = 50, offset: int = 0) -> List[IntegrityExecutionRecord]:
        """Return up to *limit* records starting at *offset* (newest first)."""
        return self.load()[offset : offset + limit]


class IntegrityCursorStore:
    """JSON file remembering, per bucket, where the last scan stopped."""

    def __init__(self, storage_root: Path) -> None:
        # Coerce so that a plain string root still supports the "/" joins below.
        self.storage_root = Path(storage_root)
        self._lock = threading.Lock()

    def _get_path(self) -> Path:
        return self.storage_root / ".myfsio.sys" / "config" / "integrity_cursor.json"

    def load(self) -> Dict[str, Any]:
        """Return the cursor document, or ``{"buckets": {}}`` if missing or malformed."""
        path = self._get_path()
        if not path.exists():
            return {"buckets": {}}
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            return {"buckets": {}}
        # Valid JSON of the wrong shape (list, string, ...) is treated as empty.
        if not isinstance(data, dict) or not isinstance(data.get("buckets"), dict):
            return {"buckets": {}}
        return data

    def save(self, data: Dict[str, Any]) -> None:
        """Persist the cursor document; failures are logged, not raised."""
        path = self._get_path()
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
        except OSError as e:
            logger.error("Failed to save integrity cursor: %s", e)

    def update_bucket(
        self,
        bucket_name: str,
        timestamp: float,
        last_key: Optional[str] = None,
        completed: bool = False,
    ) -> None:
        """Record scan progress for *bucket_name*.

        When *completed* is true the stored ``last_key`` is cleared; otherwise
        ``last_key`` is updated only if a new value is given.
        """
        with self._lock:
            data = self.load()
            entry = data["buckets"].get(bucket_name, {})
            entry["last_scanned"] = timestamp
            if completed:
                entry.pop("last_key", None)
                entry["completed"] = True
            else:
                if last_key is not None:
                    entry["last_key"] = last_key
                entry["completed"] = False
            data["buckets"][bucket_name] = entry
            self.save(data)

    def clean_stale(self, existing_buckets: List[str]) -> None:
        """Drop cursor entries for buckets not listed in *existing_buckets*."""
        with self._lock:
            data = self.load()
            existing_set = set(existing_buckets)
            stale_keys = [k for k in data["buckets"] if k not in existing_set]
            if stale_keys:
                for k in stale_keys:
                    del data["buckets"][k]
                self.save(data)

    def get_last_key(self, bucket_name: str) -> Optional[str]:
        """Return the stored resume key for *bucket_name*, if any."""
        data = self.load()
        entry = data.get("buckets", {}).get(bucket_name)
        if entry is None:
            return None
        return entry.get("last_key")

    def get_bucket_order(self, bucket_names: List[str]) -> List[str]:
        """Order buckets for scanning.

        Buckets that were never scanned or still have a resume key come first,
        then fully scanned ones; each group is sorted by oldest ``last_scanned``.
        """
        data = self.load()
        buckets_info = data.get("buckets", {})
        incomplete = []
        complete = []
        for name in bucket_names:
            entry = buckets_info.get(name)
            if entry is None:
                incomplete.append((name, 0.0))
            elif entry.get("last_key") is not None:
                incomplete.append((name, entry.get("last_scanned", 0.0)))
            else:
                complete.append((name, entry.get("last_scanned", 0.0)))
        incomplete.sort(key=lambda x: x[1])
        complete.sort(key=lambda x: x[1])
        return [n for n, _ in incomplete] + [n for n, _ in complete]

    def get_info(self) -> Dict[str, Any]:
        """Return a summary of all tracked bucket cursors."""
        data = self.load()
        buckets = data.get("buckets", {})
        return {
            "tracked_buckets": len(buckets),
            "buckets": {
                name: {
                    "last_scanned": info.get("last_scanned"),
                    "last_key": info.get("last_key"),
                    "completed": info.get("completed", False),
                }
                for name, info in buckets.items()
            },
        }


# Upper bound on the number of IntegrityIssue objects kept in one result;
# counters keep increasing past this limit.
MAX_ISSUES = 500


class IntegrityChecker:
    """Periodic, batched integrity scanner with optional self-healing.

    Each run visits buckets in cursor order and stops once ``batch_size``
    objects have been scanned or a shutdown was requested. With ``auto_heal``
    enabled and ``dry_run`` disabled, detected problems are repaired in place.
    """

    SYSTEM_ROOT = ".myfsio.sys"
    SYSTEM_BUCKETS_DIR = "buckets"
    BUCKET_META_DIR = "meta"
    BUCKET_VERSIONS_DIR = "versions"
    INTERNAL_FOLDERS = {".meta", ".versions", ".multipart"}

    def __init__(
        self,
        storage_root: Path,
        interval_hours: float = 24.0,
        batch_size: int = 1000,
        auto_heal: bool = False,
        dry_run: bool = False,
        max_history: int = 50,
        io_throttle_ms: int = 10,
    ) -> None:
        self.storage_root = Path(storage_root)
        self.interval_seconds = interval_hours * 3600.0
        self.batch_size = batch_size
        self.auto_heal = auto_heal
        self.dry_run = dry_run
        self._timer: Optional[threading.Timer] = None
        self._shutdown = False
        self._lock = threading.Lock()
        self._scanning = False
        self._scan_start_time: Optional[float] = None
        self._io_throttle = max(0, io_throttle_ms) / 1000.0
        # Both stores receive the normalised Path so a string root works too.
        self.history_store = IntegrityHistoryStore(self.storage_root, max_records=max_history)
        self.cursor_store = IntegrityCursorStore(self.storage_root)

    def start(self) -> None:
        """Schedule the first periodic run; a no-op if a timer is already set."""
        if self._timer is not None:
            return
        self._shutdown = False
        self._schedule_next()
        logger.info(
            "Integrity checker started: interval=%.1fh, batch_size=%d, auto_heal=%s, dry_run=%s",
            self.interval_seconds / 3600.0,
            self.batch_size,
            self.auto_heal,
            self.dry_run,
        )

    def stop(self) -> None:
        """Request shutdown and cancel the pending timer, if any."""
        self._shutdown = True
        if self._timer:
            self._timer.cancel()
            self._timer = None
        logger.info("Integrity checker stopped")

    def _schedule_next(self) -> None:
        if self._shutdown:
            return
        self._timer = threading.Timer(self.interval_seconds, self._run_cycle)
        self._timer.daemon = True
        self._timer.start()

    def _run_cycle(self) -> None:
        if self._shutdown:
            return
        try:
            self.run_now()
        except Exception as e:
            # Timer-thread boundary: log and keep the schedule alive.
            logger.error("Integrity check cycle failed: %s", e)
        finally:
            self._schedule_next()

    def run_now(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> IntegrityResult:
        """Run one scan synchronously and return its result.

        Args:
            auto_heal: overrides the instance setting when not ``None``.
            dry_run: overrides the instance setting when not ``None``.

        Raises:
            RuntimeError: if another scan currently holds the scan lock.
        """
        if not self._lock.acquire(blocking=False):
            raise RuntimeError("Integrity scan is already in progress")
        try:
            self._scanning = True
            self._scan_start_time = time.time()
            effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal
            effective_dry_run = dry_run if dry_run is not None else self.dry_run
            start = self._scan_start_time
            result = IntegrityResult()

            bucket_names = self._list_bucket_names()
            self.cursor_store.clean_stale(bucket_names)
            ordered_buckets = self.cursor_store.get_bucket_order(bucket_names)

            for bucket_name in ordered_buckets:
                if self._batch_exhausted(result):
                    break
                result.buckets_scanned += 1
                cursor_key = self.cursor_store.get_last_key(bucket_name)
                key_corrupted = self._check_corrupted_objects(
                    bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key
                )
                key_orphaned = self._check_orphaned_objects(
                    bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key
                )
                key_phantom = self._check_phantom_metadata(
                    bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key
                )
                self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
                self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
                self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)

                returned_keys = [k for k in (key_corrupted, key_orphaned, key_phantom) if k is not None]
                bucket_exhausted = self._batch_exhausted(result)
                if bucket_exhausted and returned_keys:
                    # Resume from the smallest key so no cursor-aware check
                    # skips objects it has not reached yet.
                    self.cursor_store.update_bucket(bucket_name, time.time(), last_key=min(returned_keys))
                else:
                    self.cursor_store.update_bucket(bucket_name, time.time(), completed=True)

            result.execution_time_seconds = time.time() - start

            if result.has_issues or result.errors:
                logger.info(
                    "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, "
                    "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s",
                    result.execution_time_seconds,
                    result.corrupted_objects,
                    result.orphaned_objects,
                    result.phantom_metadata,
                    result.stale_versions,
                    result.etag_cache_inconsistencies,
                    result.legacy_metadata_drifts,
                    result.issues_healed,
                    len(result.errors),
                    " (dry run)" if effective_dry_run else "",
                )

            record = IntegrityExecutionRecord(
                timestamp=time.time(),
                result=result.to_dict(),
                dry_run=effective_dry_run,
                auto_heal=effective_auto_heal,
            )
            self.history_store.add(record)
            return result
        finally:
            self._scanning = False
            self._scan_start_time = None
            self._lock.release()

    def run_async(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> bool:
        """Start ``run_now`` on a daemon thread.

        Returns ``False`` without starting anything when a scan is flagged as
        running, ``True`` once the thread has been started.
        """
        if self._scanning:
            return False
        t = threading.Thread(target=self.run_now, args=(auto_heal, dry_run), daemon=True)
        t.start()
        return True

    def _system_path(self) -> Path:
        return self.storage_root / self.SYSTEM_ROOT

    def _list_bucket_names(self) -> List[str]:
        """Return names of directories under the storage root, excluding the system dir."""
        names = []
        try:
            for entry in self.storage_root.iterdir():
                if entry.is_dir() and entry.name != self.SYSTEM_ROOT:
                    names.append(entry.name)
        except OSError:
            # Best effort: an unreadable root yields whatever was listed so far.
            pass
        return names

    def _throttle(self) -> bool:
        """Sleep for the configured I/O throttle; return True if shutdown was requested."""
        if self._shutdown:
            return True
        if self._io_throttle > 0:
            time.sleep(self._io_throttle)
        return self._shutdown

    def _batch_exhausted(self, result: IntegrityResult) -> bool:
        return self._shutdown or result.objects_scanned >= self.batch_size

    def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None:
        """Append *issue* unless the per-run issue cap has been reached."""
        if len(result.issues) < MAX_ISSUES:
            result.issues.append(issue)

    @staticmethod
    def _index_location(meta_root: Path, full_key: str) -> Tuple[Path, str]:
        """Return ``(index file path, entry name)`` for an object key."""
        key_path = Path(full_key)
        parent = key_path.parent
        if parent == Path("."):
            return meta_root / "_index.json", key_path.name
        return meta_root / parent / "_index.json", key_path.name

    def _collect_index_keys(
        self,
        meta_root: Path,
        cursor_key: Optional[str] = None,
    ) -> Dict[str, Dict[str, Any]]:
        """Gather index entries under *meta_root* whose key sorts after *cursor_key*.

        Returns a mapping ``full_key -> {"entry", "index_file", "key_name"}``.
        Unreadable, undecodable or non-object index files are skipped.
        """
        all_keys: Dict[str, Dict[str, Any]] = {}
        if not meta_root.exists():
            return all_keys
        try:
            for index_file in meta_root.rglob("_index.json"):
                if not index_file.is_file():
                    continue
                rel_dir = index_file.parent.relative_to(meta_root)
                dir_prefix = "" if rel_dir == Path(".") else rel_dir.as_posix()
                if cursor_key is not None and dir_prefix:
                    full_prefix = dir_prefix + "/"
                    # Skip directories that sort entirely before the cursor.
                    if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix:
                        continue
                try:
                    index_data = json.loads(index_file.read_text(encoding="utf-8"))
                except (OSError, json.JSONDecodeError):
                    continue
                if not isinstance(index_data, dict):
                    continue
                for key_name, entry in index_data.items():
                    full_key = (dir_prefix + "/" + key_name) if dir_prefix else key_name
                    if cursor_key is not None and full_key <= cursor_key:
                        continue
                    all_keys[full_key] = {
                        "entry": entry,
                        "index_file": index_file,
                        "key_name": key_name,
                    }
        except OSError:
            pass
        return all_keys

    def _walk_bucket_files_sorted(
        self,
        bucket_path: Path,
        cursor_key: Optional[str] = None,
    ):
        """Yield object keys under *bucket_path* in sorted order, after *cursor_key*.

        Top-level internal folders are skipped and symlinks are not followed.
        """

        def _walk(dir_path: Path, prefix: str):
            try:
                entries = list(os.scandir(dir_path))
            except OSError:
                return

            def _sort_key(e):
                # Directories sort as "name/" so ordering matches full keys.
                if e.is_dir(follow_symlinks=False):
                    return e.name + "/"
                return e.name

            entries.sort(key=_sort_key)
            for entry in entries:
                if entry.is_dir(follow_symlinks=False):
                    if not prefix and entry.name in self.INTERNAL_FOLDERS:
                        continue
                    new_prefix = (prefix + "/" + entry.name) if prefix else entry.name
                    if cursor_key is not None:
                        full_prefix = new_prefix + "/"
                        if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix:
                            continue
                    yield from _walk(Path(entry.path), new_prefix)
                elif entry.is_file(follow_symlinks=False):
                    full_key = (prefix + "/" + entry.name) if prefix else entry.name
                    if cursor_key is not None and full_key <= cursor_key:
                        continue
                    yield full_key

        yield from _walk(bucket_path, "")

    def _check_corrupted_objects(
        self,
        bucket_name: str,
        result: IntegrityResult,
        auto_heal: bool,
        dry_run: bool,
        cursor_key: Optional[str] = None,
    ) -> Optional[str]:
        """Compare each indexed object's stored etag with its recomputed one.

        Healing rewrites the index entry with the recomputed etag, size and
        mtime. Returns the last key scanned, or ``None`` if nothing was scanned.
        """
        if self._batch_exhausted(result):
            return None
        bucket_path = self.storage_root / bucket_name
        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
        if not meta_root.exists():
            return None

        last_key = None
        try:
            all_keys = self._collect_index_keys(meta_root, cursor_key)
            for full_key in sorted(all_keys):
                if self._throttle():
                    return last_key
                if self._batch_exhausted(result):
                    return last_key

                info = all_keys[full_key]
                entry = info["entry"]
                index_file = info["index_file"]
                key_name = info["key_name"]

                object_path = bucket_path / full_key
                if not object_path.exists():
                    # Missing files are the phantom-metadata check's concern.
                    continue

                result.objects_scanned += 1
                last_key = full_key

                meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
                stored_etag = meta.get("__etag__")
                if not stored_etag:
                    continue

                try:
                    actual_etag = _compute_etag(object_path)
                except OSError:
                    continue

                if actual_etag != stored_etag:
                    result.corrupted_objects += 1
                    issue = IntegrityIssue(
                        issue_type="corrupted_object",
                        bucket=bucket_name,
                        key=full_key,
                        detail=f"stored_etag={stored_etag} actual_etag={actual_etag}",
                    )
                    if auto_heal and not dry_run:
                        try:
                            stat = object_path.stat()
                            meta["__etag__"] = actual_etag
                            meta["__size__"] = str(stat.st_size)
                            meta["__last_modified__"] = str(stat.st_mtime)
                            try:
                                index_data = json.loads(index_file.read_text(encoding="utf-8"))
                            except (OSError, json.JSONDecodeError):
                                index_data = {}
                            index_data[key_name] = {"metadata": meta}
                            self._atomic_write_index(index_file, index_data)
                            issue.healed = True
                            issue.heal_action = "updated etag in index"
                            result.issues_healed += 1
                        except OSError as e:
                            result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}")
                    self._add_issue(result, issue)
        except OSError as e:
            result.errors.append(f"check corrupted {bucket_name}: {e}")
        return last_key

    def _check_orphaned_objects(
        self,
        bucket_name: str,
        result: IntegrityResult,
        auto_heal: bool,
        dry_run: bool,
        cursor_key: Optional[str] = None,
    ) -> Optional[str]:
        """Find files on disk that have no entry in the metadata index.

        Healing creates an index entry from the file's etag, size and mtime.
        Returns the last key scanned, or ``None`` if nothing was scanned.
        """
        if self._batch_exhausted(result):
            return None
        bucket_path = self.storage_root / bucket_name
        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

        last_key = None
        try:
            for full_key in self._walk_bucket_files_sorted(bucket_path, cursor_key):
                if self._throttle():
                    return last_key
                if self._batch_exhausted(result):
                    return last_key

                result.objects_scanned += 1
                last_key = full_key
                index_path, key_name = self._index_location(meta_root, full_key)

                has_entry = False
                if index_path.exists():
                    try:
                        index_data = json.loads(index_path.read_text(encoding="utf-8"))
                        has_entry = key_name in index_data
                    except (OSError, json.JSONDecodeError):
                        pass

                if not has_entry:
                    result.orphaned_objects += 1
                    issue = IntegrityIssue(
                        issue_type="orphaned_object",
                        bucket=bucket_name,
                        key=full_key,
                        detail="file exists without metadata entry",
                    )
                    if auto_heal and not dry_run:
                        try:
                            object_path = bucket_path / full_key
                            etag = _compute_etag(object_path)
                            stat = object_path.stat()
                            meta = {
                                "__etag__": etag,
                                "__size__": str(stat.st_size),
                                "__last_modified__": str(stat.st_mtime),
                            }
                            index_data = {}
                            if index_path.exists():
                                try:
                                    index_data = json.loads(index_path.read_text(encoding="utf-8"))
                                except (OSError, json.JSONDecodeError):
                                    pass
                            index_data[key_name] = {"metadata": meta}
                            self._atomic_write_index(index_path, index_data)
                            issue.healed = True
                            issue.heal_action = "created metadata entry"
                            result.issues_healed += 1
                        except OSError as e:
                            result.errors.append(f"heal orphaned {bucket_name}/{full_key}: {e}")
                    self._add_issue(result, issue)
        except OSError as e:
            result.errors.append(f"check orphaned {bucket_name}: {e}")
        return last_key

    def _check_phantom_metadata(
        self,
        bucket_name: str,
        result: IntegrityResult,
        auto_heal: bool,
        dry_run: bool,
        cursor_key: Optional[str] = None,
    ) -> Optional[str]:
        """Find index entries whose object file no longer exists.

        Healing removes the entries, grouped per index file, and deletes an
        index file that becomes empty. An issue is marked healed only after
        its index file was rewritten successfully. Returns the last key
        scanned, or ``None`` if nothing was scanned.
        """
        if self._batch_exhausted(result):
            return None
        bucket_path = self.storage_root / bucket_name
        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
        if not meta_root.exists():
            return None

        last_key = None
        try:
            all_keys = self._collect_index_keys(meta_root, cursor_key)
            # index file -> [(entry name, issue)] awaiting removal
            heal_by_index: Dict[Path, List[Tuple[str, IntegrityIssue]]] = {}

            for full_key in sorted(all_keys):
                if self._batch_exhausted(result):
                    break
                result.objects_scanned += 1
                last_key = full_key
                object_path = bucket_path / full_key
                if not object_path.exists():
                    result.phantom_metadata += 1
                    info = all_keys[full_key]
                    issue = IntegrityIssue(
                        issue_type="phantom_metadata",
                        bucket=bucket_name,
                        key=full_key,
                        detail="metadata entry without file on disk",
                    )
                    if auto_heal and not dry_run:
                        heal_by_index.setdefault(info["index_file"], []).append((info["key_name"], issue))
                    self._add_issue(result, issue)

            for index_file, pending in heal_by_index.items():
                try:
                    index_data = json.loads(index_file.read_text(encoding="utf-8"))
                    for key_name, _ in pending:
                        index_data.pop(key_name, None)
                    if index_data:
                        self._atomic_write_index(index_file, index_data)
                    else:
                        index_file.unlink(missing_ok=True)
                except (OSError, json.JSONDecodeError) as e:
                    # Leave the issues unhealed; the failure is reported instead.
                    result.errors.append(f"heal phantom {bucket_name}: {e}")
                    continue
                for _, issue in pending:
                    issue.healed = True
                    issue.heal_action = "removed stale index entry"
                    result.issues_healed += 1
        except OSError as e:
            result.errors.append(f"check phantom {bucket_name}: {e}")
        return last_key

    def _check_stale_versions(
        self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
    ) -> None:
        """Find version ``.bin`` / ``.json`` files that lack their counterpart.

        Healing deletes the unpaired file.
        """
        if self._batch_exhausted(result):
            return
        versions_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR
        if not versions_root.exists():
            return

        try:
            for key_dir in versions_root.rglob("*"):
                if self._throttle():
                    return
                if self._batch_exhausted(result):
                    return
                if not key_dir.is_dir():
                    continue

                bin_files = {f.stem: f for f in key_dir.glob("*.bin")}
                json_files = {f.stem: f for f in key_dir.glob("*.json")}

                # (files to inspect, required counterparts, issue detail, heal action)
                passes = (
                    (bin_files, json_files, "version data without manifest", "removed orphaned version data"),
                    (json_files, bin_files, "version manifest without data", "removed orphaned version manifest"),
                )
                for present, counterpart, detail, heal_action in passes:
                    for stem, stale_file in present.items():
                        if self._batch_exhausted(result):
                            return
                        result.objects_scanned += 1
                        if stem in counterpart:
                            continue
                        result.stale_versions += 1
                        issue = IntegrityIssue(
                            issue_type="stale_version",
                            bucket=bucket_name,
                            key=f"{key_dir.relative_to(versions_root).as_posix()}/{stale_file.name}",
                            detail=detail,
                        )
                        if auto_heal and not dry_run:
                            try:
                                stale_file.unlink(missing_ok=True)
                                issue.healed = True
                                issue.heal_action = heal_action
                                result.issues_healed += 1
                            except OSError as e:
                                result.errors.append(f"heal stale version {stale_file}: {e}")
                        self._add_issue(result, issue)
        except OSError as e:
            result.errors.append(f"check stale versions {bucket_name}: {e}")

    def _check_etag_cache(
        self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
    ) -> None:
        """Compare ``etag_index.json`` against the etags stored in the metadata index.

        Healing deletes ``etag_index.json`` and marks every mismatch found in
        this pass as healed.
        """
        if self._batch_exhausted(result):
            return
        etag_index_path = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / "etag_index.json"
        if not etag_index_path.exists():
            return
        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
        if not meta_root.exists():
            return

        try:
            etag_cache = json.loads(etag_index_path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return
        if not isinstance(etag_cache, dict):
            return

        # Tracked locally because result.issues is capped at MAX_ISSUES.
        mismatches: List[IntegrityIssue] = []
        for full_key, cached_etag in etag_cache.items():
            if self._batch_exhausted(result):
                break
            result.objects_scanned += 1
            index_path, key_name = self._index_location(meta_root, full_key)
            if not index_path.exists():
                continue
            try:
                index_data = json.loads(index_path.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError):
                continue
            entry = index_data.get(key_name)
            if not entry:
                continue
            meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
            stored_etag = meta.get("__etag__")
            if stored_etag and cached_etag != stored_etag:
                result.etag_cache_inconsistencies += 1
                issue = IntegrityIssue(
                    issue_type="etag_cache_inconsistency",
                    bucket=bucket_name,
                    key=full_key,
                    detail=f"cached_etag={cached_etag} index_etag={stored_etag}",
                )
                mismatches.append(issue)
                self._add_issue(result, issue)

        if mismatches and auto_heal and not dry_run:
            try:
                etag_index_path.unlink(missing_ok=True)
            except OSError as e:
                result.errors.append(f"heal etag cache {bucket_name}: {e}")
            else:
                for issue in mismatches:
                    issue.healed = True
                    issue.heal_action = "deleted etag_index.json"
                    result.issues_healed += 1

    def _check_legacy_metadata(
        self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
    ) -> None:
        """Find legacy ``.meta.json`` files that are unmigrated or differ from the index.

        Healing migrates an unmigrated file into the index and deletes it; when
        an index entry already exists the index wins and the legacy file is
        deleted.
        """
        if self._batch_exhausted(result):
            return
        legacy_meta_root = self.storage_root / bucket_name / ".meta"
        if not legacy_meta_root.exists():
            return
        meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

        try:
            for meta_file in legacy_meta_root.rglob("*.meta.json"):
                if self._throttle():
                    return
                if self._batch_exhausted(result):
                    return
                if not meta_file.is_file():
                    continue

                result.objects_scanned += 1
                try:
                    rel = meta_file.relative_to(legacy_meta_root)
                except ValueError:
                    continue

                full_key = rel.as_posix().removesuffix(".meta.json")
                index_path, key_name = self._index_location(meta_root, full_key)

                try:
                    legacy_data = json.loads(meta_file.read_text(encoding="utf-8"))
                except (OSError, json.JSONDecodeError):
                    continue

                index_entry = None
                if index_path.exists():
                    try:
                        index_data = json.loads(index_path.read_text(encoding="utf-8"))
                        index_entry = index_data.get(key_name)
                    except (OSError, json.JSONDecodeError):
                        pass

                if index_entry is None:
                    result.legacy_metadata_drifts += 1
                    issue = IntegrityIssue(
                        issue_type="legacy_metadata_drift",
                        bucket=bucket_name,
                        key=full_key,
                        detail="unmigrated legacy .meta.json",
                    )
                    if auto_heal and not dry_run:
                        try:
                            index_data = {}
                            if index_path.exists():
                                try:
                                    index_data = json.loads(index_path.read_text(encoding="utf-8"))
                                except (OSError, json.JSONDecodeError):
                                    pass
                            index_data[key_name] = {"metadata": legacy_data}
                            self._atomic_write_index(index_path, index_data)
                            meta_file.unlink(missing_ok=True)
                            issue.healed = True
                            issue.heal_action = "migrated to index and deleted legacy file"
                            result.issues_healed += 1
                        except OSError as e:
                            result.errors.append(f"heal legacy {bucket_name}/{full_key}: {e}")
                    self._add_issue(result, issue)
                else:
                    index_meta = index_entry.get("metadata", {}) if isinstance(index_entry, dict) else {}
                    if legacy_data != index_meta:
                        result.legacy_metadata_drifts += 1
                        issue = IntegrityIssue(
                            issue_type="legacy_metadata_drift",
                            bucket=bucket_name,
                            key=full_key,
                            detail="legacy .meta.json differs from index entry",
                        )
                        if auto_heal and not dry_run:
                            try:
                                meta_file.unlink(missing_ok=True)
                                issue.healed = True
                                issue.heal_action = "deleted legacy file (index is authoritative)"
                                result.issues_healed += 1
                            except OSError as e:
                                result.errors.append(f"heal legacy drift {bucket_name}/{full_key}: {e}")
                        self._add_issue(result, issue)
        except OSError as e:
            result.errors.append(f"check legacy meta {bucket_name}: {e}")

    @staticmethod
    def _atomic_write_index(index_path: Path, data: Dict[str, Any]) -> None:
        """Write *data* as JSON to a sibling ``.tmp`` file, then move it over *index_path*.

        The temporary file is removed if anything fails; the original
        exception is re-raised.
        """
        index_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = index_path.with_suffix(".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f)
            os.replace(str(tmp_path), str(index_path))
        except BaseException:
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                pass
            raise

    def get_history(self, limit: int = 50, offset: int = 0) -> List[dict]:
        """Return stored scan records as dictionaries, newest first."""
        records = self.history_store.get_history(limit, offset)
        return [r.to_dict() for r in records]

    def get_status(self) -> dict:
        """Return the checker's configuration, run state and cursor summary."""
        status: Dict[str, Any] = {
            "enabled": not self._shutdown or self._timer is not None,
            "running": self._timer is not None and not self._shutdown,
            "scanning": self._scanning,
            "interval_hours": self.interval_seconds / 3600.0,
            "batch_size": self.batch_size,
            "auto_heal": self.auto_heal,
            "dry_run": self.dry_run,
            "io_throttle_ms": round(self._io_throttle * 1000),
        }
        if self._scanning and self._scan_start_time is not None:
            status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1)
        status["cursor"] = self.cursor_store.get_info()
        return status