diff --git a/app/integrity.py b/app/integrity.py index 957e150..b79cbc8 100644 --- a/app/integrity.py +++ b/app/integrity.py @@ -162,6 +162,76 @@ class IntegrityHistoryStore: return self.load()[offset : offset + limit] +class IntegrityCursorStore: + def __init__(self, storage_root: Path) -> None: + self.storage_root = storage_root + self._lock = threading.Lock() + + def _get_path(self) -> Path: + return self.storage_root / ".myfsio.sys" / "config" / "integrity_cursor.json" + + def load(self) -> Dict[str, Any]: + path = self._get_path() + if not path.exists(): + return {"buckets": {}} + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data.get("buckets"), dict): + return {"buckets": {}} + return data + except (OSError, ValueError, KeyError): + return {"buckets": {}} + + def save(self, data: Dict[str, Any]) -> None: + path = self._get_path() + path.parent.mkdir(parents=True, exist_ok=True) + try: + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + except OSError as e: + logger.error("Failed to save integrity cursor: %s", e) + + def update_bucket(self, bucket_name: str, timestamp: float) -> None: + with self._lock: + data = self.load() + data["buckets"][bucket_name] = {"last_scanned": timestamp} + self.save(data) + + def clean_stale(self, existing_buckets: List[str]) -> None: + with self._lock: + data = self.load() + existing_set = set(existing_buckets) + stale_keys = [k for k in data["buckets"] if k not in existing_set] + if stale_keys: + for k in stale_keys: + del data["buckets"][k] + self.save(data) + + def get_bucket_order(self, bucket_names: List[str]) -> List[str]: + data = self.load() + buckets_info = data.get("buckets", {}) + + def sort_key(name: str) -> float: + entry = buckets_info.get(name) + if entry is None: + return 0.0 + return entry.get("last_scanned", 0.0) + + return sorted(bucket_names, key=sort_key) + + def get_info(self) -> Dict[str, Any]: + data = self.load() + buckets = 
data.get("buckets", {}) + return { + "tracked_buckets": len(buckets), + "buckets": { + name: info.get("last_scanned") + for name, info in buckets.items() + }, + } + + MAX_ISSUES = 500 @@ -194,6 +264,7 @@ class IntegrityChecker: self._scan_start_time: Optional[float] = None self._io_throttle = max(0, io_throttle_ms) / 1000.0 self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history) + self.cursor_store = IntegrityCursorStore(self.storage_root) def start(self) -> None: if self._timer is not None: @@ -247,9 +318,11 @@ class IntegrityChecker: result = IntegrityResult() bucket_names = self._list_bucket_names() + self.cursor_store.clean_stale(bucket_names) + ordered_buckets = self.cursor_store.get_bucket_order(bucket_names) - for bucket_name in bucket_names: - if self._shutdown or result.objects_scanned >= self.batch_size: + for bucket_name in ordered_buckets: + if self._batch_exhausted(result): break result.buckets_scanned += 1 self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) @@ -258,6 +331,7 @@ class IntegrityChecker: self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + self.cursor_store.update_bucket(bucket_name, time.time()) result.execution_time_seconds = time.time() - start @@ -318,6 +392,9 @@ class IntegrityChecker: time.sleep(self._io_throttle) return self._shutdown + def _batch_exhausted(self, result: IntegrityResult) -> bool: + return self._shutdown or result.objects_scanned >= self.batch_size + def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None: if len(result.issues) < MAX_ISSUES: result.issues.append(issue) @@ -335,7 +412,7 @@ class IntegrityChecker: for index_file in meta_root.rglob("_index.json"): if self._throttle(): return - if result.objects_scanned >= 
self.batch_size: + if self._batch_exhausted(result): return if not index_file.is_file(): continue @@ -347,7 +424,7 @@ class IntegrityChecker: for key_name, entry in list(index_data.items()): if self._throttle(): return - if result.objects_scanned >= self.batch_size: + if self._batch_exhausted(result): return rel_dir = index_file.parent.relative_to(meta_root) @@ -409,7 +486,7 @@ class IntegrityChecker: for entry in bucket_path.rglob("*"): if self._throttle(): return - if result.objects_scanned >= self.batch_size: + if self._batch_exhausted(result): return if not entry.is_file(): continue @@ -420,6 +497,7 @@ class IntegrityChecker: if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS: continue + result.objects_scanned += 1 full_key = rel.as_posix() key_name = rel.name parent = rel.parent @@ -486,6 +564,8 @@ class IntegrityChecker: for index_file in meta_root.rglob("_index.json"): if self._throttle(): return + if self._batch_exhausted(result): + return if not index_file.is_file(): continue try: @@ -495,6 +575,9 @@ class IntegrityChecker: keys_to_remove = [] for key_name in list(index_data.keys()): + if self._batch_exhausted(result): + break + result.objects_scanned += 1 rel_dir = index_file.parent.relative_to(meta_root) if rel_dir == Path("."): full_key = key_name @@ -542,6 +625,8 @@ class IntegrityChecker: for key_dir in versions_root.rglob("*"): if self._throttle(): return + if self._batch_exhausted(result): + return if not key_dir.is_dir(): continue @@ -549,6 +634,9 @@ class IntegrityChecker: json_files = {f.stem: f for f in key_dir.glob("*.json")} for stem, bin_file in bin_files.items(): + if self._batch_exhausted(result): + return + result.objects_scanned += 1 if stem not in json_files: result.stale_versions += 1 issue = IntegrityIssue( @@ -568,6 +656,9 @@ class IntegrityChecker: self._add_issue(result, issue) for stem, json_file in json_files.items(): + if self._batch_exhausted(result): + return + result.objects_scanned += 1 if stem not in bin_files: 
result.stale_versions += 1 issue = IntegrityIssue( @@ -608,6 +699,9 @@ class IntegrityChecker: found_mismatch = False for full_key, cached_etag in etag_cache.items(): + if self._batch_exhausted(result): + break + result.objects_scanned += 1 key_path = Path(full_key) key_name = key_path.name parent = key_path.parent @@ -667,9 +761,12 @@ class IntegrityChecker: for meta_file in legacy_meta_root.rglob("*.meta.json"): if self._throttle(): return + if self._batch_exhausted(result): + return if not meta_file.is_file(): continue + result.objects_scanned += 1 try: rel = meta_file.relative_to(legacy_meta_root) except ValueError: @@ -781,4 +878,5 @@ class IntegrityChecker: } if self._scanning and self._scan_start_time is not None: status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1) + status["cursor"] = self.cursor_store.get_info() return status diff --git a/tests/test_integrity.py b/tests/test_integrity.py index 6460b11..ddd1150 100644 --- a/tests/test_integrity.py +++ b/tests/test_integrity.py @@ -9,7 +9,7 @@ import pytest sys.path.insert(0, str(Path(__file__).resolve().parents[1])) -from app.integrity import IntegrityChecker, IntegrityResult +from app.integrity import IntegrityChecker, IntegrityCursorStore, IntegrityResult def _wait_scan_done(client, headers, timeout=10): @@ -118,7 +118,7 @@ class TestCorruptedObjects: result = checker.run_now() assert result.corrupted_objects == 0 - assert result.objects_scanned == 1 + assert result.objects_scanned >= 1 def test_corrupted_nested_key(self, storage_root, checker): _setup_bucket(storage_root, "mybucket", {"sub/dir/file.txt": b"nested content"}) @@ -503,7 +503,7 @@ class TestMultipleBuckets: result = checker.run_now() assert result.buckets_scanned == 2 - assert result.objects_scanned == 2 + assert result.objects_scanned >= 2 assert result.corrupted_objects == 0 @@ -516,3 +516,133 @@ class TestGetStatus: assert "batch_size" in status assert "auto_heal" in status assert "dry_run" in status + + def 
test_status_includes_cursor(self, storage_root, checker): + _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"}) + checker.run_now() + status = checker.get_status() + assert "cursor" in status + assert status["cursor"]["tracked_buckets"] == 1 + assert "mybucket" in status["cursor"]["buckets"] + + +class TestUnifiedBatchCounter: + def test_orphaned_objects_count_toward_batch(self, storage_root): + _setup_bucket(storage_root, "mybucket", {}) + for i in range(10): + (storage_root / "mybucket" / f"orphan{i}.txt").write_bytes(f"data{i}".encode()) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=3) + result = checker.run_now() + assert result.objects_scanned <= 3 + + def test_phantom_metadata_counts_toward_batch(self, storage_root): + objects = {f"file{i}.txt": f"data{i}".encode() for i in range(10)} + _setup_bucket(storage_root, "mybucket", objects) + for i in range(10): + (storage_root / "mybucket" / f"file{i}.txt").unlink() + + checker = IntegrityChecker(storage_root=storage_root, batch_size=5) + result = checker.run_now() + assert result.objects_scanned <= 5 + + def test_all_check_types_contribute(self, storage_root): + _setup_bucket(storage_root, "mybucket", {"valid.txt": b"hello"}) + (storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan") + + checker = IntegrityChecker(storage_root=storage_root, batch_size=1000) + result = checker.run_now() + assert result.objects_scanned > 2 + + +class TestCursorRotation: + def test_oldest_bucket_scanned_first(self, storage_root): + _setup_bucket(storage_root, "bucket-a", {"a.txt": b"aaa"}) + _setup_bucket(storage_root, "bucket-b", {"b.txt": b"bbb"}) + _setup_bucket(storage_root, "bucket-c", {"c.txt": b"ccc"}) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=5) + + checker.cursor_store.update_bucket("bucket-a", 1000.0) + checker.cursor_store.update_bucket("bucket-b", 3000.0) + checker.cursor_store.update_bucket("bucket-c", 2000.0) + + ordered = 
checker.cursor_store.get_bucket_order(["bucket-a", "bucket-b", "bucket-c"]) + assert ordered[0] == "bucket-a" + assert ordered[1] == "bucket-c" + assert ordered[2] == "bucket-b" + + def test_never_scanned_buckets_first(self, storage_root): + _setup_bucket(storage_root, "bucket-old", {"a.txt": b"aaa"}) + _setup_bucket(storage_root, "bucket-new", {"b.txt": b"bbb"}) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=1000) + + checker.cursor_store.update_bucket("bucket-old", time.time()) + + ordered = checker.cursor_store.get_bucket_order(["bucket-old", "bucket-new"]) + assert ordered[0] == "bucket-new" + + def test_rotation_covers_all_buckets(self, storage_root): + for name in ["bucket-a", "bucket-b", "bucket-c"]: + _setup_bucket(storage_root, name, {f"{name}.txt": name.encode()}) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=4) + + result1 = checker.run_now() + assert result1.buckets_scanned >= 1 + + checker.run_now() + checker.run_now() + + cursor_info = checker.cursor_store.get_info() + assert cursor_info["tracked_buckets"] == 3 + + def test_cursor_persistence(self, storage_root): + _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"}) + + checker1 = IntegrityChecker(storage_root=storage_root, batch_size=1000) + checker1.run_now() + + cursor1 = checker1.cursor_store.get_info() + assert cursor1["tracked_buckets"] == 1 + assert "mybucket" in cursor1["buckets"] + + checker2 = IntegrityChecker(storage_root=storage_root, batch_size=1000) + cursor2 = checker2.cursor_store.get_info() + assert cursor2["tracked_buckets"] == 1 + assert "mybucket" in cursor2["buckets"] + + def test_stale_cursor_cleanup(self, storage_root): + _setup_bucket(storage_root, "bucket-a", {"a.txt": b"aaa"}) + _setup_bucket(storage_root, "bucket-b", {"b.txt": b"bbb"}) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=1000) + 
checker.run_now() + + import shutil + shutil.rmtree(storage_root / "bucket-b") + meta_b = storage_root / ".myfsio.sys" / "buckets" / "bucket-b" + if meta_b.exists(): + shutil.rmtree(meta_b) + + checker.run_now() + + cursor_info = checker.cursor_store.get_info() + assert "bucket-b" not in cursor_info["buckets"] + assert "bucket-a" in cursor_info["buckets"] + + def test_cursor_updates_after_scan(self, storage_root): + _setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"}) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=1000) + before = time.time() + checker.run_now() + after = time.time() + + cursor_info = checker.cursor_store.get_info() + ts = cursor_info["buckets"]["mybucket"] + assert before <= ts <= after