From 2ad373685201bb94e73f9df8bf8394470df32745 Mon Sep 17 00:00:00 2001 From: kqjy Date: Tue, 31 Mar 2026 17:04:28 +0800 Subject: [PATCH] Add intra-bucket cursor tracking to integrity scanner for progressive full coverage; Optimize integrity scanner: early batch exit, lazy sorted walk, cursor-aware index reads --- app/integrity.py | 369 ++++++++++++++++++++++++++-------------- tests/test_integrity.py | 144 +++++++++++++++- 2 files changed, 382 insertions(+), 131 deletions(-) diff --git a/app/integrity.py b/app/integrity.py index b79cbc8..d42f5b2 100644 --- a/app/integrity.py +++ b/app/integrity.py @@ -192,10 +192,26 @@ class IntegrityCursorStore: except OSError as e: logger.error("Failed to save integrity cursor: %s", e) - def update_bucket(self, bucket_name: str, timestamp: float) -> None: + def update_bucket( + self, + bucket_name: str, + timestamp: float, + last_key: Optional[str] = None, + completed: bool = False, + ) -> None: with self._lock: data = self.load() - data["buckets"][bucket_name] = {"last_scanned": timestamp} + entry = data["buckets"].get(bucket_name, {}) + if completed: + entry["last_scanned"] = timestamp + entry.pop("last_key", None) + entry["completed"] = True + else: + entry["last_scanned"] = timestamp + if last_key is not None: + entry["last_key"] = last_key + entry["completed"] = False + data["buckets"][bucket_name] = entry self.save(data) def clean_stale(self, existing_buckets: List[str]) -> None: @@ -208,17 +224,32 @@ class IntegrityCursorStore: del data["buckets"][k] self.save(data) + def get_last_key(self, bucket_name: str) -> Optional[str]: + data = self.load() + entry = data.get("buckets", {}).get(bucket_name) + if entry is None: + return None + return entry.get("last_key") + def get_bucket_order(self, bucket_names: List[str]) -> List[str]: data = self.load() buckets_info = data.get("buckets", {}) - def sort_key(name: str) -> float: + incomplete = [] + complete = [] + for name in bucket_names: entry = buckets_info.get(name) if entry is None: - return 0.0 - return entry.get("last_scanned", 0.0) + incomplete.append((name, 0.0)) + elif entry.get("last_key") is not None: + incomplete.append((name, entry.get("last_scanned", 0.0))) + else: + complete.append((name, entry.get("last_scanned", 0.0))) - return sorted(bucket_names, key=sort_key) + incomplete.sort(key=lambda x: x[1]) + complete.sort(key=lambda x: x[1]) + + return [n for n, _ in incomplete] + [n for n, _ in complete] def get_info(self) -> Dict[str, Any]: data = self.load() @@ -226,7 +257,11 @@ class IntegrityCursorStore: return { "tracked_buckets": len(buckets), "buckets": { - name: info.get("last_scanned") + name: { + "last_scanned": info.get("last_scanned"), + "last_key": info.get("last_key"), + "completed": info.get("completed", False), + } for name, info in buckets.items() }, } @@ -325,13 +360,19 @@ class IntegrityChecker: if self._batch_exhausted(result): break result.buckets_scanned += 1 - self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + cursor_key = self.cursor_store.get_last_key(bucket_name) + key_corrupted = self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key) + key_orphaned = self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key) + key_phantom = self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key) self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) - self.cursor_store.update_bucket(bucket_name, time.time()) + returned_keys = [k for k in (key_corrupted, key_orphaned, key_phantom) if k is not None] + bucket_exhausted = self._batch_exhausted(result) + if bucket_exhausted and returned_keys: + self.cursor_store.update_bucket(bucket_name, time.time(), last_key=min(returned_keys)) + else: + self.cursor_store.update_bucket(bucket_name, time.time(), completed=True) result.execution_time_seconds = time.time() - start @@ -399,108 +440,172 @@ class IntegrityChecker: if len(result.issues) < MAX_ISSUES: result.issues.append(issue) - def _check_corrupted_objects( - self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool - ) -> None: - bucket_path = self.storage_root / bucket_name - meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR - + def _collect_index_keys( + self, meta_root: Path, cursor_key: Optional[str] = None, + ) -> Dict[str, Dict[str, Any]]: + all_keys: Dict[str, Dict[str, Any]] = {} if not meta_root.exists(): - return - + return all_keys try: for index_file in meta_root.rglob("_index.json"): - if self._throttle(): - return - if self._batch_exhausted(result): - return if not index_file.is_file(): continue + rel_dir = index_file.parent.relative_to(meta_root) + dir_prefix = "" if rel_dir == Path(".") else rel_dir.as_posix() + if cursor_key is not None and dir_prefix: + full_prefix = dir_prefix + "/" + if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix: + continue try: index_data = json.loads(index_file.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue - - for key_name, entry in list(index_data.items()): - if self._throttle(): - return - if self._batch_exhausted(result): - return - - rel_dir = index_file.parent.relative_to(meta_root) - if rel_dir == Path("."): - full_key = key_name - else: - full_key = rel_dir.as_posix() + "/" + key_name - - object_path = bucket_path / full_key - if not object_path.exists(): + for key_name, entry in index_data.items(): + full_key = (dir_prefix + "/" + key_name) if dir_prefix else key_name + if cursor_key is not None and full_key <= cursor_key: continue + all_keys[full_key] = { + "entry": entry, + "index_file": index_file, + "key_name": key_name, + } + except OSError: + pass + return all_keys - result.objects_scanned += 1 + def _walk_bucket_files_sorted( + self, bucket_path: Path, cursor_key: Optional[str] = None, + ): + def _walk(dir_path: Path, prefix: str): + try: + entries = list(os.scandir(dir_path)) + except OSError: + return - meta = entry.get("metadata", {}) if isinstance(entry, dict) else {} - stored_etag = meta.get("__etag__") - if not stored_etag: + def _sort_key(e): + if e.is_dir(follow_symlinks=False): + return e.name + "/" + return e.name + + entries.sort(key=_sort_key) + + for entry in entries: + if entry.is_dir(follow_symlinks=False): + if not prefix and entry.name in self.INTERNAL_FOLDERS: continue - - try: - actual_etag = _compute_etag(object_path) - except OSError: + new_prefix = (prefix + "/" + entry.name) if prefix else entry.name + if cursor_key is not None: + full_prefix = new_prefix + "/" + if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix: + continue + yield from _walk(Path(entry.path), new_prefix) + elif entry.is_file(follow_symlinks=False): + full_key = (prefix + "/" + entry.name) if prefix else entry.name + if cursor_key is not None and full_key <= cursor_key: continue + yield full_key - if actual_etag != stored_etag: - result.corrupted_objects += 1 - issue = IntegrityIssue( - issue_type="corrupted_object", - bucket=bucket_name, - key=full_key, - detail=f"stored_etag={stored_etag} actual_etag={actual_etag}", - ) + yield from _walk(bucket_path, "") - if auto_heal and not dry_run: - try: - stat = object_path.stat() - meta["__etag__"] = actual_etag - meta["__size__"] = str(stat.st_size) - meta["__last_modified__"] = str(stat.st_mtime) - index_data[key_name] = {"metadata": meta} - self._atomic_write_index(index_file, index_data) - issue.healed = True - issue.heal_action = "updated etag in index" - result.issues_healed += 1 - except OSError as e: - result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}") - - self._add_issue(result, issue) - except OSError as e: - result.errors.append(f"check corrupted {bucket_name}: {e}") - - def _check_orphaned_objects( - self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool - ) -> None: + def _check_corrupted_objects( + self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool, + cursor_key: Optional[str] = None, + ) -> Optional[str]: + if self._batch_exhausted(result): + return None bucket_path = self.storage_root / bucket_name meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR + if not meta_root.exists(): + return None + + last_key = None try: - for entry in bucket_path.rglob("*"): + all_keys = self._collect_index_keys(meta_root, cursor_key) + sorted_keys = sorted(all_keys.keys()) + + for full_key in sorted_keys: if self._throttle(): - return + return last_key if self._batch_exhausted(result): - return - if not entry.is_file(): - continue - try: - rel = entry.relative_to(bucket_path) - except ValueError: - continue - if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS: + return last_key + + info = all_keys[full_key] + entry = info["entry"] + index_file = info["index_file"] + key_name = info["key_name"] + + object_path = bucket_path / full_key + if not object_path.exists(): continue result.objects_scanned += 1 - full_key = rel.as_posix() - key_name = rel.name - parent = rel.parent + last_key = full_key + + meta = entry.get("metadata", {}) if isinstance(entry, dict) else {} + stored_etag = meta.get("__etag__") + if not stored_etag: + continue + + try: + actual_etag = _compute_etag(object_path) + except OSError: + continue + + if actual_etag != stored_etag: + result.corrupted_objects += 1 + issue = IntegrityIssue( + issue_type="corrupted_object", + bucket=bucket_name, + key=full_key, + detail=f"stored_etag={stored_etag} actual_etag={actual_etag}", + ) + + if auto_heal and not dry_run: + try: + stat = object_path.stat() + meta["__etag__"] = actual_etag + meta["__size__"] = str(stat.st_size) + meta["__last_modified__"] = str(stat.st_mtime) + try: + index_data = json.loads(index_file.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + index_data = {} + index_data[key_name] = {"metadata": meta} + self._atomic_write_index(index_file, index_data) + issue.healed = True + issue.heal_action = "updated etag in index" + result.issues_healed += 1 + except OSError as e: + result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}") + + self._add_issue(result, issue) + except OSError as e: + result.errors.append(f"check corrupted {bucket_name}: {e}") + return last_key + + def _check_orphaned_objects( + self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool, + cursor_key: Optional[str] = None, + ) -> Optional[str]: + if self._batch_exhausted(result): + return None + bucket_path = self.storage_root / bucket_name + meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR + + last_key = None + try: + for full_key in self._walk_bucket_files_sorted(bucket_path, cursor_key): + if self._throttle(): + return last_key + if self._batch_exhausted(result): + return last_key + + result.objects_scanned += 1 + last_key = full_key + key_path = Path(full_key) + key_name = key_path.name + parent = key_path.parent if parent == Path("."): index_path = meta_root / "_index.json" @@ -526,8 +631,9 @@ class IntegrityChecker: if auto_heal and not dry_run: try: - etag = _compute_etag(entry) - stat = entry.stat() + object_path = bucket_path / full_key + etag = _compute_etag(object_path) + stat = object_path.stat() meta = { "__etag__": etag, "__size__": str(stat.st_size), @@ -550,58 +656,56 @@ class IntegrityChecker: self._add_issue(result, issue) except OSError as e: result.errors.append(f"check orphaned {bucket_name}: {e}") + return last_key def _check_phantom_metadata( - self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool - ) -> None: + self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool, + cursor_key: Optional[str] = None, + ) -> Optional[str]: + if self._batch_exhausted(result): + return None bucket_path = self.storage_root / bucket_name meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR if not meta_root.exists(): - return + return None + last_key = None try: - for index_file in meta_root.rglob("_index.json"): - if self._throttle(): - return + all_keys = self._collect_index_keys(meta_root, cursor_key) + sorted_keys = sorted(all_keys.keys()) + + heal_by_index: Dict[Path, List[str]] = {} + + for full_key in sorted_keys: if self._batch_exhausted(result): - return - if not index_file.is_file(): - continue - try: - index_data = json.loads(index_file.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError): - continue + break - keys_to_remove = [] - for key_name in list(index_data.keys()): - if self._batch_exhausted(result): - break - result.objects_scanned += 1 - rel_dir = index_file.parent.relative_to(meta_root) - if rel_dir == Path("."): - full_key = key_name - else: - full_key = rel_dir.as_posix() + "/" + key_name + result.objects_scanned += 1 + last_key = full_key - object_path = bucket_path / full_key - if not object_path.exists(): - result.phantom_metadata += 1 - issue = IntegrityIssue( - issue_type="phantom_metadata", - bucket=bucket_name, - key=full_key, - detail="metadata entry without file on disk", - ) - if auto_heal and not dry_run: - keys_to_remove.append(key_name) - issue.healed = True - issue.heal_action = "removed stale index entry" - result.issues_healed += 1 - self._add_issue(result, issue) + object_path = bucket_path / full_key + if not object_path.exists(): + result.phantom_metadata += 1 + info = all_keys[full_key] + issue = IntegrityIssue( + issue_type="phantom_metadata", + bucket=bucket_name, + key=full_key, + detail="metadata entry without file on disk", + ) + if auto_heal and not dry_run: + index_file = info["index_file"] + heal_by_index.setdefault(index_file, []).append(info["key_name"]) + issue.healed = True + issue.heal_action = "removed stale index entry" + result.issues_healed += 1 + self._add_issue(result, issue) - if keys_to_remove and auto_heal and not dry_run: + if heal_by_index and auto_heal and not dry_run: + for index_file, keys_to_remove in heal_by_index.items(): try: + index_data = json.loads(index_file.read_text(encoding="utf-8")) for k in keys_to_remove: index_data.pop(k, None) if index_data: @@ -612,10 +716,13 @@ class IntegrityChecker: result.errors.append(f"heal phantom {bucket_name}: {e}") except OSError as e: result.errors.append(f"check phantom {bucket_name}: {e}") + return last_key def _check_stale_versions( self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool ) -> None: + if self._batch_exhausted(result): + return versions_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR if not versions_root.exists(): @@ -682,6 +789,8 @@ class IntegrityChecker: def _check_etag_cache( self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool ) -> None: + if self._batch_exhausted(result): + return etag_index_path = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / "etag_index.json" if not etag_index_path.exists(): @@ -751,6 +860,8 @@ class IntegrityChecker: def _check_legacy_metadata( self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool ) -> None: + if self._batch_exhausted(result): + return legacy_meta_root = self.storage_root / bucket_name / ".meta" if not legacy_meta_root.exists(): return diff --git a/tests/test_integrity.py b/tests/test_integrity.py index ddd1150..ed38ad7 100644 --- a/tests/test_integrity.py +++ b/tests/test_integrity.py @@ -644,5 +644,145 @@ class TestCursorRotation: after = time.time() cursor_info = checker.cursor_store.get_info() - ts = cursor_info["buckets"]["mybucket"] - assert before <= ts <= after + entry = cursor_info["buckets"]["mybucket"] + assert before <= entry["last_scanned"] <= after + assert entry["completed"] is True + + +class TestIntraBucketCursor: + def test_resumes_from_cursor_key(self, storage_root): + objects = {f"file_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(10)} + _setup_bucket(storage_root, "mybucket", objects) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=3) + result1 = checker.run_now() + assert result1.objects_scanned == 3 + + cursor_info = checker.cursor_store.get_info() + entry = cursor_info["buckets"]["mybucket"] + assert entry["last_key"] is not None + assert entry["completed"] is False + + result2 = checker.run_now() + assert result2.objects_scanned == 3 + + cursor_after = checker.cursor_store.get_info()["buckets"]["mybucket"] + assert cursor_after["last_key"] > entry["last_key"] + + def test_cursor_resets_after_full_pass(self, storage_root): + objects = {f"file_{i}.txt": f"data{i}".encode() for i in range(3)} + _setup_bucket(storage_root, "mybucket", objects) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=100) + checker.run_now() + + cursor_info = checker.cursor_store.get_info() + entry = cursor_info["buckets"]["mybucket"] + assert entry["last_key"] is None + assert entry["completed"] is True + + def test_full_coverage_across_cycles(self, storage_root): + objects = {f"obj_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(10)} + _setup_bucket(storage_root, "mybucket", objects) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=4) + all_scanned = 0 + for _ in range(10): + result = checker.run_now() + all_scanned += result.objects_scanned + if checker.cursor_store.get_info()["buckets"]["mybucket"]["completed"]: + break + + assert all_scanned >= 10 + + def test_deleted_cursor_key_skips_gracefully(self, storage_root): + objects = {f"file_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(6)} + _setup_bucket(storage_root, "mybucket", objects) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=3) + checker.run_now() + + cursor_info = checker.cursor_store.get_info() + cursor_key = cursor_info["buckets"]["mybucket"]["last_key"] + assert cursor_key is not None + + obj_path = storage_root / "mybucket" / cursor_key + meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta" + key_path = Path(cursor_key) + index_path = meta_root / key_path.parent / "_index.json" if key_path.parent != Path(".") else meta_root / "_index.json" + if key_path.parent == Path("."): + index_path = meta_root / "_index.json" + else: + index_path = meta_root / key_path.parent / "_index.json" + if obj_path.exists(): + obj_path.unlink() + if index_path.exists(): + index_data = json.loads(index_path.read_text()) + index_data.pop(key_path.name, None) + index_path.write_text(json.dumps(index_data)) + + result2 = checker.run_now() + assert result2.objects_scanned > 0 + + def test_incomplete_buckets_prioritized(self, storage_root): + _setup_bucket(storage_root, "bucket-a", {f"a{i}.txt": b"a" for i in range(5)}) + _setup_bucket(storage_root, "bucket-b", {f"b{i}.txt": b"b" for i in range(5)}) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=3) + checker.run_now() + + cursor_info = checker.cursor_store.get_info() + incomplete = [ + name for name, info in cursor_info["buckets"].items() + if info.get("last_key") is not None + ] + assert len(incomplete) >= 1 + + result2 = checker.run_now() + assert result2.objects_scanned > 0 + + def test_cursor_skips_nested_directories(self, storage_root): + objects = { + "aaa/file1.txt": b"a1", + "aaa/file2.txt": b"a2", + "bbb/file1.txt": b"b1", + "bbb/file2.txt": b"b2", + "ccc/file1.txt": b"c1", + "ccc/file2.txt": b"c2", + } + _setup_bucket(storage_root, "mybucket", objects) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=4) + result1 = checker.run_now() + assert result1.objects_scanned == 4 + + cursor_info = checker.cursor_store.get_info() + cursor_key = cursor_info["buckets"]["mybucket"]["last_key"] + assert cursor_key is not None + assert cursor_key.startswith("aaa/") or cursor_key.startswith("bbb/") + + result2 = checker.run_now() + assert result2.objects_scanned >= 2 + + all_scanned = result1.objects_scanned + result2.objects_scanned + for _ in range(10): + if checker.cursor_store.get_info()["buckets"]["mybucket"]["completed"]: + break + r = checker.run_now() + all_scanned += r.objects_scanned + + assert all_scanned >= 6 + + def test_sorted_walk_order(self, storage_root): + objects = { + "bar.txt": b"bar", + "bar/inner.txt": b"inner", + "abc.txt": b"abc", + "zzz/deep.txt": b"deep", + } + _setup_bucket(storage_root, "mybucket", objects) + + checker = IntegrityChecker(storage_root=storage_root, batch_size=100) + result = checker.run_now() + assert result.objects_scanned >= 4 + assert result.total_issues == 0