Add intra-bucket cursor tracking to integrity scanner for progressive full coverage; Optimize integrity scanner: early batch exit, lazy sorted walk, cursor-aware index reads

This commit is contained in:
2026-03-31 17:04:28 +08:00
parent f05b2668c0
commit 2ad3736852
2 changed files with 382 additions and 131 deletions

View File

@@ -192,10 +192,26 @@ class IntegrityCursorStore:
except OSError as e:
logger.error("Failed to save integrity cursor: %s", e)
def update_bucket(self, bucket_name: str, timestamp: float) -> None:
def update_bucket(
self,
bucket_name: str,
timestamp: float,
last_key: Optional[str] = None,
completed: bool = False,
) -> None:
with self._lock:
data = self.load()
data["buckets"][bucket_name] = {"last_scanned": timestamp}
entry = data["buckets"].get(bucket_name, {})
if completed:
entry["last_scanned"] = timestamp
entry.pop("last_key", None)
entry["completed"] = True
else:
entry["last_scanned"] = timestamp
if last_key is not None:
entry["last_key"] = last_key
entry["completed"] = False
data["buckets"][bucket_name] = entry
self.save(data)
def clean_stale(self, existing_buckets: List[str]) -> None:
@@ -208,17 +224,32 @@ class IntegrityCursorStore:
del data["buckets"][k]
self.save(data)
def get_last_key(self, bucket_name: str) -> Optional[str]:
data = self.load()
entry = data.get("buckets", {}).get(bucket_name)
if entry is None:
return None
return entry.get("last_key")
def get_bucket_order(self, bucket_names: List[str]) -> List[str]:
data = self.load()
buckets_info = data.get("buckets", {})
def sort_key(name: str) -> float:
incomplete = []
complete = []
for name in bucket_names:
entry = buckets_info.get(name)
if entry is None:
return 0.0
return entry.get("last_scanned", 0.0)
incomplete.append((name, 0.0))
elif entry.get("last_key") is not None:
incomplete.append((name, entry.get("last_scanned", 0.0)))
else:
complete.append((name, entry.get("last_scanned", 0.0)))
return sorted(bucket_names, key=sort_key)
incomplete.sort(key=lambda x: x[1])
complete.sort(key=lambda x: x[1])
return [n for n, _ in incomplete] + [n for n, _ in complete]
def get_info(self) -> Dict[str, Any]:
data = self.load()
@@ -226,7 +257,11 @@ class IntegrityCursorStore:
return {
"tracked_buckets": len(buckets),
"buckets": {
name: info.get("last_scanned")
name: {
"last_scanned": info.get("last_scanned"),
"last_key": info.get("last_key"),
"completed": info.get("completed", False),
}
for name, info in buckets.items()
},
}
@@ -325,13 +360,19 @@ class IntegrityChecker:
if self._batch_exhausted(result):
break
result.buckets_scanned += 1
self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
cursor_key = self.cursor_store.get_last_key(bucket_name)
key_corrupted = self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key)
key_orphaned = self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key)
key_phantom = self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run, cursor_key)
self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
self.cursor_store.update_bucket(bucket_name, time.time())
returned_keys = [k for k in (key_corrupted, key_orphaned, key_phantom) if k is not None]
bucket_exhausted = self._batch_exhausted(result)
if bucket_exhausted and returned_keys:
self.cursor_store.update_bucket(bucket_name, time.time(), last_key=min(returned_keys))
else:
self.cursor_store.update_bucket(bucket_name, time.time(), completed=True)
result.execution_time_seconds = time.time() - start
@@ -399,108 +440,172 @@ class IntegrityChecker:
if len(result.issues) < MAX_ISSUES:
result.issues.append(issue)
def _check_corrupted_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
def _collect_index_keys(
self, meta_root: Path, cursor_key: Optional[str] = None,
) -> Dict[str, Dict[str, Any]]:
all_keys: Dict[str, Dict[str, Any]] = {}
if not meta_root.exists():
return
return all_keys
try:
for index_file in meta_root.rglob("_index.json"):
if self._throttle():
return
if self._batch_exhausted(result):
return
if not index_file.is_file():
continue
rel_dir = index_file.parent.relative_to(meta_root)
dir_prefix = "" if rel_dir == Path(".") else rel_dir.as_posix()
if cursor_key is not None and dir_prefix:
full_prefix = dir_prefix + "/"
if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix:
continue
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
for key_name, entry in list(index_data.items()):
if self._throttle():
return
if self._batch_exhausted(result):
return
rel_dir = index_file.parent.relative_to(meta_root)
if rel_dir == Path("."):
full_key = key_name
else:
full_key = rel_dir.as_posix() + "/" + key_name
object_path = bucket_path / full_key
if not object_path.exists():
for key_name, entry in index_data.items():
full_key = (dir_prefix + "/" + key_name) if dir_prefix else key_name
if cursor_key is not None and full_key <= cursor_key:
continue
all_keys[full_key] = {
"entry": entry,
"index_file": index_file,
"key_name": key_name,
}
except OSError:
pass
return all_keys
result.objects_scanned += 1
def _walk_bucket_files_sorted(
self, bucket_path: Path, cursor_key: Optional[str] = None,
):
def _walk(dir_path: Path, prefix: str):
try:
entries = list(os.scandir(dir_path))
except OSError:
return
meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
stored_etag = meta.get("__etag__")
if not stored_etag:
def _sort_key(e):
if e.is_dir(follow_symlinks=False):
return e.name + "/"
return e.name
entries.sort(key=_sort_key)
for entry in entries:
if entry.is_dir(follow_symlinks=False):
if not prefix and entry.name in self.INTERNAL_FOLDERS:
continue
try:
actual_etag = _compute_etag(object_path)
except OSError:
new_prefix = (prefix + "/" + entry.name) if prefix else entry.name
if cursor_key is not None:
full_prefix = new_prefix + "/"
if not cursor_key.startswith(full_prefix) and cursor_key > full_prefix:
continue
yield from _walk(Path(entry.path), new_prefix)
elif entry.is_file(follow_symlinks=False):
full_key = (prefix + "/" + entry.name) if prefix else entry.name
if cursor_key is not None and full_key <= cursor_key:
continue
yield full_key
if actual_etag != stored_etag:
result.corrupted_objects += 1
issue = IntegrityIssue(
issue_type="corrupted_object",
bucket=bucket_name,
key=full_key,
detail=f"stored_etag={stored_etag} actual_etag={actual_etag}",
)
yield from _walk(bucket_path, "")
if auto_heal and not dry_run:
try:
stat = object_path.stat()
meta["__etag__"] = actual_etag
meta["__size__"] = str(stat.st_size)
meta["__last_modified__"] = str(stat.st_mtime)
index_data[key_name] = {"metadata": meta}
self._atomic_write_index(index_file, index_data)
issue.healed = True
issue.heal_action = "updated etag in index"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}")
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check corrupted {bucket_name}: {e}")
def _check_orphaned_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
def _check_corrupted_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool,
cursor_key: Optional[str] = None,
) -> Optional[str]:
if self._batch_exhausted(result):
return None
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
if not meta_root.exists():
return None
last_key = None
try:
for entry in bucket_path.rglob("*"):
all_keys = self._collect_index_keys(meta_root, cursor_key)
sorted_keys = sorted(all_keys.keys())
for full_key in sorted_keys:
if self._throttle():
return
return last_key
if self._batch_exhausted(result):
return
if not entry.is_file():
continue
try:
rel = entry.relative_to(bucket_path)
except ValueError:
continue
if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS:
return last_key
info = all_keys[full_key]
entry = info["entry"]
index_file = info["index_file"]
key_name = info["key_name"]
object_path = bucket_path / full_key
if not object_path.exists():
continue
result.objects_scanned += 1
full_key = rel.as_posix()
key_name = rel.name
parent = rel.parent
last_key = full_key
meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
stored_etag = meta.get("__etag__")
if not stored_etag:
continue
try:
actual_etag = _compute_etag(object_path)
except OSError:
continue
if actual_etag != stored_etag:
result.corrupted_objects += 1
issue = IntegrityIssue(
issue_type="corrupted_object",
bucket=bucket_name,
key=full_key,
detail=f"stored_etag={stored_etag} actual_etag={actual_etag}",
)
if auto_heal and not dry_run:
try:
stat = object_path.stat()
meta["__etag__"] = actual_etag
meta["__size__"] = str(stat.st_size)
meta["__last_modified__"] = str(stat.st_mtime)
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
index_data = {}
index_data[key_name] = {"metadata": meta}
self._atomic_write_index(index_file, index_data)
issue.healed = True
issue.heal_action = "updated etag in index"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}")
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check corrupted {bucket_name}: {e}")
return last_key
def _check_orphaned_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool,
cursor_key: Optional[str] = None,
) -> Optional[str]:
if self._batch_exhausted(result):
return None
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
last_key = None
try:
for full_key in self._walk_bucket_files_sorted(bucket_path, cursor_key):
if self._throttle():
return last_key
if self._batch_exhausted(result):
return last_key
result.objects_scanned += 1
last_key = full_key
key_path = Path(full_key)
key_name = key_path.name
parent = key_path.parent
if parent == Path("."):
index_path = meta_root / "_index.json"
@@ -526,8 +631,9 @@ class IntegrityChecker:
if auto_heal and not dry_run:
try:
etag = _compute_etag(entry)
stat = entry.stat()
object_path = bucket_path / full_key
etag = _compute_etag(object_path)
stat = object_path.stat()
meta = {
"__etag__": etag,
"__size__": str(stat.st_size),
@@ -550,58 +656,56 @@ class IntegrityChecker:
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check orphaned {bucket_name}: {e}")
return last_key
def _check_phantom_metadata(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool,
cursor_key: Optional[str] = None,
) -> Optional[str]:
if self._batch_exhausted(result):
return None
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
if not meta_root.exists():
return
return None
last_key = None
try:
for index_file in meta_root.rglob("_index.json"):
if self._throttle():
return
all_keys = self._collect_index_keys(meta_root, cursor_key)
sorted_keys = sorted(all_keys.keys())
heal_by_index: Dict[Path, List[str]] = {}
for full_key in sorted_keys:
if self._batch_exhausted(result):
return
if not index_file.is_file():
continue
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
break
keys_to_remove = []
for key_name in list(index_data.keys()):
if self._batch_exhausted(result):
break
result.objects_scanned += 1
rel_dir = index_file.parent.relative_to(meta_root)
if rel_dir == Path("."):
full_key = key_name
else:
full_key = rel_dir.as_posix() + "/" + key_name
result.objects_scanned += 1
last_key = full_key
object_path = bucket_path / full_key
if not object_path.exists():
result.phantom_metadata += 1
issue = IntegrityIssue(
issue_type="phantom_metadata",
bucket=bucket_name,
key=full_key,
detail="metadata entry without file on disk",
)
if auto_heal and not dry_run:
keys_to_remove.append(key_name)
issue.healed = True
issue.heal_action = "removed stale index entry"
result.issues_healed += 1
self._add_issue(result, issue)
object_path = bucket_path / full_key
if not object_path.exists():
result.phantom_metadata += 1
info = all_keys[full_key]
issue = IntegrityIssue(
issue_type="phantom_metadata",
bucket=bucket_name,
key=full_key,
detail="metadata entry without file on disk",
)
if auto_heal and not dry_run:
index_file = info["index_file"]
heal_by_index.setdefault(index_file, []).append(info["key_name"])
issue.healed = True
issue.heal_action = "removed stale index entry"
result.issues_healed += 1
self._add_issue(result, issue)
if keys_to_remove and auto_heal and not dry_run:
if heal_by_index and auto_heal and not dry_run:
for index_file, keys_to_remove in heal_by_index.items():
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
for k in keys_to_remove:
index_data.pop(k, None)
if index_data:
@@ -612,10 +716,13 @@ class IntegrityChecker:
result.errors.append(f"heal phantom {bucket_name}: {e}")
except OSError as e:
result.errors.append(f"check phantom {bucket_name}: {e}")
return last_key
def _check_stale_versions(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
if self._batch_exhausted(result):
return
versions_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR
if not versions_root.exists():
@@ -682,6 +789,8 @@ class IntegrityChecker:
def _check_etag_cache(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
if self._batch_exhausted(result):
return
etag_index_path = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / "etag_index.json"
if not etag_index_path.exists():
@@ -751,6 +860,8 @@ class IntegrityChecker:
def _check_legacy_metadata(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
if self._batch_exhausted(result):
return
legacy_meta_root = self.storage_root / bucket_name / ".meta"
if not legacy_meta_root.exists():
return

View File

@@ -644,5 +644,145 @@ class TestCursorRotation:
after = time.time()
cursor_info = checker.cursor_store.get_info()
ts = cursor_info["buckets"]["mybucket"]
assert before <= ts <= after
entry = cursor_info["buckets"]["mybucket"]
assert before <= entry["last_scanned"] <= after
assert entry["completed"] is True
class TestIntraBucketCursor:
def test_resumes_from_cursor_key(self, storage_root):
objects = {f"file_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(10)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
result1 = checker.run_now()
assert result1.objects_scanned == 3
cursor_info = checker.cursor_store.get_info()
entry = cursor_info["buckets"]["mybucket"]
assert entry["last_key"] is not None
assert entry["completed"] is False
result2 = checker.run_now()
assert result2.objects_scanned == 3
cursor_after = checker.cursor_store.get_info()["buckets"]["mybucket"]
assert cursor_after["last_key"] > entry["last_key"]
def test_cursor_resets_after_full_pass(self, storage_root):
objects = {f"file_{i}.txt": f"data{i}".encode() for i in range(3)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=100)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
entry = cursor_info["buckets"]["mybucket"]
assert entry["last_key"] is None
assert entry["completed"] is True
def test_full_coverage_across_cycles(self, storage_root):
objects = {f"obj_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(10)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=4)
all_scanned = 0
for _ in range(10):
result = checker.run_now()
all_scanned += result.objects_scanned
if checker.cursor_store.get_info()["buckets"]["mybucket"]["completed"]:
break
assert all_scanned >= 10
def test_deleted_cursor_key_skips_gracefully(self, storage_root):
objects = {f"file_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(6)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
cursor_key = cursor_info["buckets"]["mybucket"]["last_key"]
assert cursor_key is not None
obj_path = storage_root / "mybucket" / cursor_key
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
key_path = Path(cursor_key)
index_path = meta_root / key_path.parent / "_index.json" if key_path.parent != Path(".") else meta_root / "_index.json"
if key_path.parent == Path("."):
index_path = meta_root / "_index.json"
else:
index_path = meta_root / key_path.parent / "_index.json"
if obj_path.exists():
obj_path.unlink()
if index_path.exists():
index_data = json.loads(index_path.read_text())
index_data.pop(key_path.name, None)
index_path.write_text(json.dumps(index_data))
result2 = checker.run_now()
assert result2.objects_scanned > 0
def test_incomplete_buckets_prioritized(self, storage_root):
_setup_bucket(storage_root, "bucket-a", {f"a{i}.txt": b"a" for i in range(5)})
_setup_bucket(storage_root, "bucket-b", {f"b{i}.txt": b"b" for i in range(5)})
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
incomplete = [
name for name, info in cursor_info["buckets"].items()
if info.get("last_key") is not None
]
assert len(incomplete) >= 1
result2 = checker.run_now()
assert result2.objects_scanned > 0
def test_cursor_skips_nested_directories(self, storage_root):
objects = {
"aaa/file1.txt": b"a1",
"aaa/file2.txt": b"a2",
"bbb/file1.txt": b"b1",
"bbb/file2.txt": b"b2",
"ccc/file1.txt": b"c1",
"ccc/file2.txt": b"c2",
}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=4)
result1 = checker.run_now()
assert result1.objects_scanned == 4
cursor_info = checker.cursor_store.get_info()
cursor_key = cursor_info["buckets"]["mybucket"]["last_key"]
assert cursor_key is not None
assert cursor_key.startswith("aaa/") or cursor_key.startswith("bbb/")
result2 = checker.run_now()
assert result2.objects_scanned >= 2
all_scanned = result1.objects_scanned + result2.objects_scanned
for _ in range(10):
if checker.cursor_store.get_info()["buckets"]["mybucket"]["completed"]:
break
r = checker.run_now()
all_scanned += r.objects_scanned
assert all_scanned >= 6
def test_sorted_walk_order(self, storage_root):
objects = {
"bar.txt": b"bar",
"bar/inner.txt": b"inner",
"abc.txt": b"abc",
"zzz/deep.txt": b"deep",
}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=100)
result = checker.run_now()
assert result.objects_scanned >= 4
assert result.total_issues == 0