Fix integrity scanner batch limit and add cursor-based rotation
This commit is contained in:
108
app/integrity.py
108
app/integrity.py
@@ -162,6 +162,76 @@ class IntegrityHistoryStore:
|
||||
return self.load()[offset : offset + limit]
|
||||
|
||||
|
||||
class IntegrityCursorStore:
|
||||
def __init__(self, storage_root: Path) -> None:
|
||||
self.storage_root = storage_root
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _get_path(self) -> Path:
|
||||
return self.storage_root / ".myfsio.sys" / "config" / "integrity_cursor.json"
|
||||
|
||||
def load(self) -> Dict[str, Any]:
|
||||
path = self._get_path()
|
||||
if not path.exists():
|
||||
return {"buckets": {}}
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
if not isinstance(data.get("buckets"), dict):
|
||||
return {"buckets": {}}
|
||||
return data
|
||||
except (OSError, ValueError, KeyError):
|
||||
return {"buckets": {}}
|
||||
|
||||
def save(self, data: Dict[str, Any]) -> None:
|
||||
path = self._get_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
except OSError as e:
|
||||
logger.error("Failed to save integrity cursor: %s", e)
|
||||
|
||||
def update_bucket(self, bucket_name: str, timestamp: float) -> None:
|
||||
with self._lock:
|
||||
data = self.load()
|
||||
data["buckets"][bucket_name] = {"last_scanned": timestamp}
|
||||
self.save(data)
|
||||
|
||||
def clean_stale(self, existing_buckets: List[str]) -> None:
|
||||
with self._lock:
|
||||
data = self.load()
|
||||
existing_set = set(existing_buckets)
|
||||
stale_keys = [k for k in data["buckets"] if k not in existing_set]
|
||||
if stale_keys:
|
||||
for k in stale_keys:
|
||||
del data["buckets"][k]
|
||||
self.save(data)
|
||||
|
||||
def get_bucket_order(self, bucket_names: List[str]) -> List[str]:
|
||||
data = self.load()
|
||||
buckets_info = data.get("buckets", {})
|
||||
|
||||
def sort_key(name: str) -> float:
|
||||
entry = buckets_info.get(name)
|
||||
if entry is None:
|
||||
return 0.0
|
||||
return entry.get("last_scanned", 0.0)
|
||||
|
||||
return sorted(bucket_names, key=sort_key)
|
||||
|
||||
def get_info(self) -> Dict[str, Any]:
|
||||
data = self.load()
|
||||
buckets = data.get("buckets", {})
|
||||
return {
|
||||
"tracked_buckets": len(buckets),
|
||||
"buckets": {
|
||||
name: info.get("last_scanned")
|
||||
for name, info in buckets.items()
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
MAX_ISSUES = 500
|
||||
|
||||
|
||||
@@ -194,6 +264,7 @@ class IntegrityChecker:
|
||||
self._scan_start_time: Optional[float] = None
|
||||
self._io_throttle = max(0, io_throttle_ms) / 1000.0
|
||||
self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history)
|
||||
self.cursor_store = IntegrityCursorStore(self.storage_root)
|
||||
|
||||
def start(self) -> None:
|
||||
if self._timer is not None:
|
||||
@@ -247,9 +318,11 @@ class IntegrityChecker:
|
||||
result = IntegrityResult()
|
||||
|
||||
bucket_names = self._list_bucket_names()
|
||||
self.cursor_store.clean_stale(bucket_names)
|
||||
ordered_buckets = self.cursor_store.get_bucket_order(bucket_names)
|
||||
|
||||
for bucket_name in bucket_names:
|
||||
if self._shutdown or result.objects_scanned >= self.batch_size:
|
||||
for bucket_name in ordered_buckets:
|
||||
if self._batch_exhausted(result):
|
||||
break
|
||||
result.buckets_scanned += 1
|
||||
self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
@@ -258,6 +331,7 @@ class IntegrityChecker:
|
||||
self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
|
||||
self.cursor_store.update_bucket(bucket_name, time.time())
|
||||
|
||||
result.execution_time_seconds = time.time() - start
|
||||
|
||||
@@ -318,6 +392,9 @@ class IntegrityChecker:
|
||||
time.sleep(self._io_throttle)
|
||||
return self._shutdown
|
||||
|
||||
def _batch_exhausted(self, result: IntegrityResult) -> bool:
|
||||
return self._shutdown or result.objects_scanned >= self.batch_size
|
||||
|
||||
def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None:
|
||||
if len(result.issues) < MAX_ISSUES:
|
||||
result.issues.append(issue)
|
||||
@@ -335,7 +412,7 @@ class IntegrityChecker:
|
||||
for index_file in meta_root.rglob("_index.json"):
|
||||
if self._throttle():
|
||||
return
|
||||
if result.objects_scanned >= self.batch_size:
|
||||
if self._batch_exhausted(result):
|
||||
return
|
||||
if not index_file.is_file():
|
||||
continue
|
||||
@@ -347,7 +424,7 @@ class IntegrityChecker:
|
||||
for key_name, entry in list(index_data.items()):
|
||||
if self._throttle():
|
||||
return
|
||||
if result.objects_scanned >= self.batch_size:
|
||||
if self._batch_exhausted(result):
|
||||
return
|
||||
|
||||
rel_dir = index_file.parent.relative_to(meta_root)
|
||||
@@ -409,7 +486,7 @@ class IntegrityChecker:
|
||||
for entry in bucket_path.rglob("*"):
|
||||
if self._throttle():
|
||||
return
|
||||
if result.objects_scanned >= self.batch_size:
|
||||
if self._batch_exhausted(result):
|
||||
return
|
||||
if not entry.is_file():
|
||||
continue
|
||||
@@ -420,6 +497,7 @@ class IntegrityChecker:
|
||||
if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS:
|
||||
continue
|
||||
|
||||
result.objects_scanned += 1
|
||||
full_key = rel.as_posix()
|
||||
key_name = rel.name
|
||||
parent = rel.parent
|
||||
@@ -486,6 +564,8 @@ class IntegrityChecker:
|
||||
for index_file in meta_root.rglob("_index.json"):
|
||||
if self._throttle():
|
||||
return
|
||||
if self._batch_exhausted(result):
|
||||
return
|
||||
if not index_file.is_file():
|
||||
continue
|
||||
try:
|
||||
@@ -495,6 +575,9 @@ class IntegrityChecker:
|
||||
|
||||
keys_to_remove = []
|
||||
for key_name in list(index_data.keys()):
|
||||
if self._batch_exhausted(result):
|
||||
break
|
||||
result.objects_scanned += 1
|
||||
rel_dir = index_file.parent.relative_to(meta_root)
|
||||
if rel_dir == Path("."):
|
||||
full_key = key_name
|
||||
@@ -542,6 +625,8 @@ class IntegrityChecker:
|
||||
for key_dir in versions_root.rglob("*"):
|
||||
if self._throttle():
|
||||
return
|
||||
if self._batch_exhausted(result):
|
||||
return
|
||||
if not key_dir.is_dir():
|
||||
continue
|
||||
|
||||
@@ -549,6 +634,9 @@ class IntegrityChecker:
|
||||
json_files = {f.stem: f for f in key_dir.glob("*.json")}
|
||||
|
||||
for stem, bin_file in bin_files.items():
|
||||
if self._batch_exhausted(result):
|
||||
return
|
||||
result.objects_scanned += 1
|
||||
if stem not in json_files:
|
||||
result.stale_versions += 1
|
||||
issue = IntegrityIssue(
|
||||
@@ -568,6 +656,9 @@ class IntegrityChecker:
|
||||
self._add_issue(result, issue)
|
||||
|
||||
for stem, json_file in json_files.items():
|
||||
if self._batch_exhausted(result):
|
||||
return
|
||||
result.objects_scanned += 1
|
||||
if stem not in bin_files:
|
||||
result.stale_versions += 1
|
||||
issue = IntegrityIssue(
|
||||
@@ -608,6 +699,9 @@ class IntegrityChecker:
|
||||
found_mismatch = False
|
||||
|
||||
for full_key, cached_etag in etag_cache.items():
|
||||
if self._batch_exhausted(result):
|
||||
break
|
||||
result.objects_scanned += 1
|
||||
key_path = Path(full_key)
|
||||
key_name = key_path.name
|
||||
parent = key_path.parent
|
||||
@@ -667,9 +761,12 @@ class IntegrityChecker:
|
||||
for meta_file in legacy_meta_root.rglob("*.meta.json"):
|
||||
if self._throttle():
|
||||
return
|
||||
if self._batch_exhausted(result):
|
||||
return
|
||||
if not meta_file.is_file():
|
||||
continue
|
||||
|
||||
result.objects_scanned += 1
|
||||
try:
|
||||
rel = meta_file.relative_to(legacy_meta_root)
|
||||
except ValueError:
|
||||
@@ -781,4 +878,5 @@ class IntegrityChecker:
|
||||
}
|
||||
if self._scanning and self._scan_start_time is not None:
|
||||
status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1)
|
||||
status["cursor"] = self.cursor_store.get_info()
|
||||
return status
|
||||
|
||||
@@ -9,7 +9,7 @@ import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
|
||||
|
||||
from app.integrity import IntegrityChecker, IntegrityResult
|
||||
from app.integrity import IntegrityChecker, IntegrityCursorStore, IntegrityResult
|
||||
|
||||
|
||||
def _wait_scan_done(client, headers, timeout=10):
|
||||
@@ -118,7 +118,7 @@ class TestCorruptedObjects:
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.corrupted_objects == 0
|
||||
assert result.objects_scanned == 1
|
||||
assert result.objects_scanned >= 1
|
||||
|
||||
def test_corrupted_nested_key(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"sub/dir/file.txt": b"nested content"})
|
||||
@@ -503,7 +503,7 @@ class TestMultipleBuckets:
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.buckets_scanned == 2
|
||||
assert result.objects_scanned == 2
|
||||
assert result.objects_scanned >= 2
|
||||
assert result.corrupted_objects == 0
|
||||
|
||||
|
||||
@@ -516,3 +516,133 @@ class TestGetStatus:
|
||||
assert "batch_size" in status
|
||||
assert "auto_heal" in status
|
||||
assert "dry_run" in status
|
||||
|
||||
def test_status_includes_cursor(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
checker.run_now()
|
||||
status = checker.get_status()
|
||||
assert "cursor" in status
|
||||
assert status["cursor"]["tracked_buckets"] == 1
|
||||
assert "mybucket" in status["cursor"]["buckets"]
|
||||
|
||||
|
||||
class TestUnifiedBatchCounter:
|
||||
def test_orphaned_objects_count_toward_batch(self, storage_root):
|
||||
_setup_bucket(storage_root, "mybucket", {})
|
||||
for i in range(10):
|
||||
(storage_root / "mybucket" / f"orphan{i}.txt").write_bytes(f"data{i}".encode())
|
||||
|
||||
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
|
||||
result = checker.run_now()
|
||||
assert result.objects_scanned <= 3
|
||||
|
||||
def test_phantom_metadata_counts_toward_batch(self, storage_root):
|
||||
objects = {f"file{i}.txt": f"data{i}".encode() for i in range(10)}
|
||||
_setup_bucket(storage_root, "mybucket", objects)
|
||||
for i in range(10):
|
||||
(storage_root / "mybucket" / f"file{i}.txt").unlink()
|
||||
|
||||
checker = IntegrityChecker(storage_root=storage_root, batch_size=5)
|
||||
result = checker.run_now()
|
||||
assert result.objects_scanned <= 5
|
||||
|
||||
def test_all_check_types_contribute(self, storage_root):
|
||||
_setup_bucket(storage_root, "mybucket", {"valid.txt": b"hello"})
|
||||
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan")
|
||||
|
||||
checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
|
||||
result = checker.run_now()
|
||||
assert result.objects_scanned > 2
|
||||
|
||||
|
||||
class TestCursorRotation:
|
||||
def test_oldest_bucket_scanned_first(self, storage_root):
|
||||
_setup_bucket(storage_root, "bucket-a", {"a.txt": b"aaa"})
|
||||
_setup_bucket(storage_root, "bucket-b", {"b.txt": b"bbb"})
|
||||
_setup_bucket(storage_root, "bucket-c", {"c.txt": b"ccc"})
|
||||
|
||||
checker = IntegrityChecker(storage_root=storage_root, batch_size=5)
|
||||
|
||||
checker.cursor_store.update_bucket("bucket-a", 1000.0)
|
||||
checker.cursor_store.update_bucket("bucket-b", 3000.0)
|
||||
checker.cursor_store.update_bucket("bucket-c", 2000.0)
|
||||
|
||||
ordered = checker.cursor_store.get_bucket_order(["bucket-a", "bucket-b", "bucket-c"])
|
||||
assert ordered[0] == "bucket-a"
|
||||
assert ordered[1] == "bucket-c"
|
||||
assert ordered[2] == "bucket-b"
|
||||
|
||||
def test_never_scanned_buckets_first(self, storage_root):
|
||||
_setup_bucket(storage_root, "bucket-old", {"a.txt": b"aaa"})
|
||||
_setup_bucket(storage_root, "bucket-new", {"b.txt": b"bbb"})
|
||||
|
||||
checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
|
||||
|
||||
checker.cursor_store.update_bucket("bucket-old", time.time())
|
||||
|
||||
ordered = checker.cursor_store.get_bucket_order(["bucket-old", "bucket-new"])
|
||||
assert ordered[0] == "bucket-new"
|
||||
|
||||
def test_rotation_covers_all_buckets(self, storage_root):
|
||||
for name in ["bucket-a", "bucket-b", "bucket-c"]:
|
||||
_setup_bucket(storage_root, name, {f"{name}.txt": name.encode()})
|
||||
|
||||
checker = IntegrityChecker(storage_root=storage_root, batch_size=4)
|
||||
|
||||
result1 = checker.run_now()
|
||||
scanned_buckets_1 = set()
|
||||
for issue_bucket in [storage_root]:
|
||||
pass
|
||||
assert result1.buckets_scanned >= 1
|
||||
|
||||
result2 = checker.run_now()
|
||||
result3 = checker.run_now()
|
||||
|
||||
cursor_info = checker.cursor_store.get_info()
|
||||
assert cursor_info["tracked_buckets"] == 3
|
||||
|
||||
def test_cursor_persistence(self, storage_root):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
|
||||
checker1 = IntegrityChecker(storage_root=storage_root, batch_size=1000)
|
||||
checker1.run_now()
|
||||
|
||||
cursor1 = checker1.cursor_store.get_info()
|
||||
assert cursor1["tracked_buckets"] == 1
|
||||
assert "mybucket" in cursor1["buckets"]
|
||||
|
||||
checker2 = IntegrityChecker(storage_root=storage_root, batch_size=1000)
|
||||
cursor2 = checker2.cursor_store.get_info()
|
||||
assert cursor2["tracked_buckets"] == 1
|
||||
assert "mybucket" in cursor2["buckets"]
|
||||
|
||||
def test_stale_cursor_cleanup(self, storage_root):
|
||||
_setup_bucket(storage_root, "bucket-a", {"a.txt": b"aaa"})
|
||||
_setup_bucket(storage_root, "bucket-b", {"b.txt": b"bbb"})
|
||||
|
||||
checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
|
||||
checker.run_now()
|
||||
|
||||
import shutil
|
||||
shutil.rmtree(storage_root / "bucket-b")
|
||||
meta_b = storage_root / ".myfsio.sys" / "buckets" / "bucket-b"
|
||||
if meta_b.exists():
|
||||
shutil.rmtree(meta_b)
|
||||
|
||||
checker.run_now()
|
||||
|
||||
cursor_info = checker.cursor_store.get_info()
|
||||
assert "bucket-b" not in cursor_info["buckets"]
|
||||
assert "bucket-a" in cursor_info["buckets"]
|
||||
|
||||
def test_cursor_updates_after_scan(self, storage_root):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
|
||||
checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
|
||||
before = time.time()
|
||||
checker.run_now()
|
||||
after = time.time()
|
||||
|
||||
cursor_info = checker.cursor_store.get_info()
|
||||
ts = cursor_info["buckets"]["mybucket"]
|
||||
assert before <= ts <= after
|
||||
|
||||
Reference in New Issue
Block a user