Files
MyFSIO/tests/test_integrity.py

789 lines
30 KiB
Python

import hashlib
import json
import os
import sys
import time
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from app.integrity import IntegrityChecker, IntegrityCursorStore, IntegrityResult
def _wait_scan_done(client, headers, timeout=10):
deadline = time.time() + timeout
while time.time() < deadline:
resp = client.get("/admin/integrity/status", headers=headers)
data = resp.get_json()
if not data.get("scanning"):
return
time.sleep(0.1)
raise TimeoutError("scan did not complete")
def _md5(data: bytes) -> str:
return hashlib.md5(data).hexdigest()
def _setup_bucket(storage_root: Path, bucket_name: str, objects: dict[str, bytes]) -> None:
bucket_path = storage_root / bucket_name
bucket_path.mkdir(parents=True, exist_ok=True)
meta_root = storage_root / ".myfsio.sys" / "buckets" / bucket_name / "meta"
meta_root.mkdir(parents=True, exist_ok=True)
bucket_json = storage_root / ".myfsio.sys" / "buckets" / bucket_name / ".bucket.json"
bucket_json.write_text(json.dumps({"created": "2025-01-01"}))
for key, data in objects.items():
obj_path = bucket_path / key
obj_path.parent.mkdir(parents=True, exist_ok=True)
obj_path.write_bytes(data)
etag = _md5(data)
stat = obj_path.stat()
meta = {
"__etag__": etag,
"__size__": str(stat.st_size),
"__last_modified__": str(stat.st_mtime),
}
key_path = Path(key)
parent = key_path.parent
key_name = key_path.name
if parent == Path("."):
index_path = meta_root / "_index.json"
else:
index_path = meta_root / parent / "_index.json"
index_path.parent.mkdir(parents=True, exist_ok=True)
index_data = {}
if index_path.exists():
index_data = json.loads(index_path.read_text())
index_data[key_name] = {"metadata": meta}
index_path.write_text(json.dumps(index_data))
def _issues_of_type(result, issue_type):
return [i for i in result.issues if i.issue_type == issue_type]
@pytest.fixture
def storage_root(tmp_path):
root = tmp_path / "data"
root.mkdir()
(root / ".myfsio.sys" / "config").mkdir(parents=True, exist_ok=True)
return root
@pytest.fixture
def checker(storage_root):
return IntegrityChecker(
storage_root=storage_root,
interval_hours=24.0,
batch_size=1000,
auto_heal=False,
dry_run=False,
)
class TestCorruptedObjects:
def test_detect_corrupted(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
(storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted data")
result = checker.run_now()
assert result.corrupted_objects == 1
issues = _issues_of_type(result, "corrupted_object")
assert len(issues) == 1
assert issues[0].bucket == "mybucket"
assert issues[0].key == "file.txt"
assert not issues[0].healed
def test_heal_corrupted(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
(storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted data")
result = checker.run_now(auto_heal=True)
assert result.corrupted_objects == 1
assert result.issues_healed == 1
issues = _issues_of_type(result, "corrupted_object")
assert issues[0].healed
result2 = checker.run_now()
assert result2.corrupted_objects == 0
def test_valid_objects_pass(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
result = checker.run_now()
assert result.corrupted_objects == 0
assert result.objects_scanned >= 1
def test_corrupted_nested_key(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"sub/dir/file.txt": b"nested content"})
(storage_root / "mybucket" / "sub" / "dir" / "file.txt").write_bytes(b"bad")
result = checker.run_now()
assert result.corrupted_objects == 1
issues = _issues_of_type(result, "corrupted_object")
assert issues[0].key == "sub/dir/file.txt"
class TestOrphanedObjects:
def test_detect_orphaned(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {})
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan data")
result = checker.run_now()
assert result.orphaned_objects == 1
issues = _issues_of_type(result, "orphaned_object")
assert len(issues) == 1
def test_heal_orphaned(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {})
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan data")
result = checker.run_now(auto_heal=True)
assert result.orphaned_objects == 1
assert result.issues_healed == 1
issues = _issues_of_type(result, "orphaned_object")
assert issues[0].healed
result2 = checker.run_now()
assert result2.orphaned_objects == 0
assert result2.objects_scanned >= 1
class TestPhantomMetadata:
def test_detect_phantom(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
(storage_root / "mybucket" / "file.txt").unlink()
result = checker.run_now()
assert result.phantom_metadata == 1
issues = _issues_of_type(result, "phantom_metadata")
assert len(issues) == 1
def test_heal_phantom(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
(storage_root / "mybucket" / "file.txt").unlink()
result = checker.run_now(auto_heal=True)
assert result.phantom_metadata == 1
assert result.issues_healed == 1
result2 = checker.run_now()
assert result2.phantom_metadata == 0
class TestStaleVersions:
def test_manifest_without_data(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
versions_root.mkdir(parents=True)
(versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
result = checker.run_now()
assert result.stale_versions == 1
issues = _issues_of_type(result, "stale_version")
assert "manifest without data" in issues[0].detail
def test_data_without_manifest(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
versions_root.mkdir(parents=True)
(versions_root / "v1.bin").write_bytes(b"old data")
result = checker.run_now()
assert result.stale_versions == 1
issues = _issues_of_type(result, "stale_version")
assert "data without manifest" in issues[0].detail
def test_heal_stale_versions(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
versions_root.mkdir(parents=True)
(versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
(versions_root / "v2.bin").write_bytes(b"old data")
result = checker.run_now(auto_heal=True)
assert result.stale_versions == 2
assert result.issues_healed == 2
assert not (versions_root / "v1.json").exists()
assert not (versions_root / "v2.bin").exists()
def test_valid_versions_pass(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
versions_root.mkdir(parents=True)
(versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
(versions_root / "v1.bin").write_bytes(b"old data")
result = checker.run_now()
assert result.stale_versions == 0
class TestEtagCache:
def test_detect_mismatch(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
etag_path = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "etag_index.json"
etag_path.write_text(json.dumps({"file.txt": "wrong_etag"}))
result = checker.run_now()
assert result.etag_cache_inconsistencies == 1
issues = _issues_of_type(result, "etag_cache_inconsistency")
assert len(issues) == 1
def test_heal_mismatch(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
etag_path = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "etag_index.json"
etag_path.write_text(json.dumps({"file.txt": "wrong_etag"}))
result = checker.run_now(auto_heal=True)
assert result.etag_cache_inconsistencies == 1
assert result.issues_healed == 1
assert not etag_path.exists()
class TestLegacyMetadata:
def test_detect_unmigrated(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
legacy_meta.parent.mkdir(parents=True)
legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
index_path = meta_root / "_index.json"
index_path.unlink()
result = checker.run_now()
assert result.legacy_metadata_drifts == 1
issues = _issues_of_type(result, "legacy_metadata_drift")
assert len(issues) == 1
assert issues[0].detail == "unmigrated legacy .meta.json"
def test_detect_drift(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
legacy_meta.parent.mkdir(parents=True)
legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))
result = checker.run_now()
assert result.legacy_metadata_drifts == 1
issues = _issues_of_type(result, "legacy_metadata_drift")
assert "differs from index" in issues[0].detail
def test_heal_unmigrated(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
legacy_meta.parent.mkdir(parents=True)
legacy_data = {"__etag__": _md5(b"hello"), "__size__": "5"}
legacy_meta.write_text(json.dumps(legacy_data))
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
index_path = meta_root / "_index.json"
index_path.unlink()
result = checker.run_now(auto_heal=True)
assert result.legacy_metadata_drifts == 1
legacy_issues = _issues_of_type(result, "legacy_metadata_drift")
assert len(legacy_issues) == 1
assert legacy_issues[0].healed
assert not legacy_meta.exists()
index_data = json.loads(index_path.read_text())
assert "file.txt" in index_data
assert index_data["file.txt"]["metadata"]["__etag__"] == _md5(b"hello")
def test_heal_drift(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
legacy_meta.parent.mkdir(parents=True)
legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))
result = checker.run_now(auto_heal=True)
assert result.legacy_metadata_drifts == 1
legacy_issues = _issues_of_type(result, "legacy_metadata_drift")
assert legacy_issues[0].healed
assert not legacy_meta.exists()
class TestDryRun:
def test_dry_run_no_changes(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
(storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted")
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan")
result = checker.run_now(auto_heal=True, dry_run=True)
assert result.corrupted_objects == 1
assert result.orphaned_objects == 1
assert result.issues_healed == 0
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
index_data = json.loads((meta_root / "_index.json").read_text())
assert "orphan.txt" not in index_data
class TestBatchSize:
def test_batch_limits_scan(self, storage_root):
objects = {f"file{i}.txt": f"data{i}".encode() for i in range(10)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(
storage_root=storage_root,
batch_size=3,
)
result = checker.run_now()
assert result.objects_scanned <= 3
class TestHistory:
def test_history_recorded(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
checker.run_now()
history = checker.get_history()
assert len(history) == 1
assert "corrupted_objects" in history[0]["result"]
def test_history_multiple(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
checker.run_now()
checker.run_now()
checker.run_now()
history = checker.get_history()
assert len(history) == 3
def test_history_pagination(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
for _ in range(5):
checker.run_now()
history = checker.get_history(limit=2, offset=1)
assert len(history) == 2
AUTH_HEADERS = {"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}
class TestAdminAPI:
@pytest.fixture
def integrity_app(self, tmp_path):
from app import create_api_app
storage_root = tmp_path / "data"
iam_config = tmp_path / "iam.json"
bucket_policies = tmp_path / "bucket_policies.json"
iam_payload = {
"users": [
{
"access_key": "admin",
"secret_key": "adminsecret",
"display_name": "Admin",
"policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", "iam:*"]}],
}
]
}
iam_config.write_text(json.dumps(iam_payload))
flask_app = create_api_app({
"TESTING": True,
"SECRET_KEY": "testing",
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver",
"INTEGRITY_ENABLED": True,
"INTEGRITY_AUTO_HEAL": False,
"INTEGRITY_DRY_RUN": False,
})
yield flask_app
storage = flask_app.extensions.get("object_storage")
if storage:
base = getattr(storage, "storage", storage)
if hasattr(base, "shutdown_stats"):
base.shutdown_stats()
ic = flask_app.extensions.get("integrity")
if ic:
ic.stop()
def test_status_endpoint(self, integrity_app):
client = integrity_app.test_client()
resp = client.get("/admin/integrity/status", headers=AUTH_HEADERS)
assert resp.status_code == 200
data = resp.get_json()
assert data["enabled"] is True
assert "interval_hours" in data
def test_run_endpoint(self, integrity_app):
client = integrity_app.test_client()
resp = client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
assert resp.status_code == 200
data = resp.get_json()
assert data["status"] == "started"
_wait_scan_done(client, AUTH_HEADERS)
resp = client.get("/admin/integrity/history?limit=1", headers=AUTH_HEADERS)
hist = resp.get_json()
assert len(hist["executions"]) >= 1
assert "corrupted_objects" in hist["executions"][0]["result"]
assert "objects_scanned" in hist["executions"][0]["result"]
def test_run_with_overrides(self, integrity_app):
client = integrity_app.test_client()
resp = client.post(
"/admin/integrity/run",
headers=AUTH_HEADERS,
json={"dry_run": True, "auto_heal": True},
)
assert resp.status_code == 200
_wait_scan_done(client, AUTH_HEADERS)
def test_history_endpoint(self, integrity_app):
client = integrity_app.test_client()
client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
_wait_scan_done(client, AUTH_HEADERS)
resp = client.get("/admin/integrity/history", headers=AUTH_HEADERS)
assert resp.status_code == 200
data = resp.get_json()
assert "executions" in data
assert len(data["executions"]) >= 1
def test_auth_required(self, integrity_app):
client = integrity_app.test_client()
resp = client.get("/admin/integrity/status")
assert resp.status_code in (401, 403)
def test_disabled_status(self, tmp_path):
from app import create_api_app
storage_root = tmp_path / "data2"
iam_config = tmp_path / "iam2.json"
bucket_policies = tmp_path / "bp2.json"
iam_payload = {
"users": [
{
"access_key": "admin",
"secret_key": "adminsecret",
"display_name": "Admin",
"policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", "iam:*"]}],
}
]
}
iam_config.write_text(json.dumps(iam_payload))
flask_app = create_api_app({
"TESTING": True,
"SECRET_KEY": "testing",
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver",
"INTEGRITY_ENABLED": False,
})
c = flask_app.test_client()
resp = c.get("/admin/integrity/status", headers=AUTH_HEADERS)
assert resp.status_code == 200
data = resp.get_json()
assert data["enabled"] is False
storage = flask_app.extensions.get("object_storage")
if storage:
base = getattr(storage, "storage", storage)
if hasattr(base, "shutdown_stats"):
base.shutdown_stats()
class TestMultipleBuckets:
def test_scans_multiple_buckets(self, storage_root, checker):
_setup_bucket(storage_root, "bucket1", {"a.txt": b"aaa"})
_setup_bucket(storage_root, "bucket2", {"b.txt": b"bbb"})
result = checker.run_now()
assert result.buckets_scanned == 2
assert result.objects_scanned >= 2
assert result.corrupted_objects == 0
class TestGetStatus:
def test_status_fields(self, checker):
status = checker.get_status()
assert "enabled" in status
assert "running" in status
assert "interval_hours" in status
assert "batch_size" in status
assert "auto_heal" in status
assert "dry_run" in status
def test_status_includes_cursor(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
checker.run_now()
status = checker.get_status()
assert "cursor" in status
assert status["cursor"]["tracked_buckets"] == 1
assert "mybucket" in status["cursor"]["buckets"]
class TestUnifiedBatchCounter:
def test_orphaned_objects_count_toward_batch(self, storage_root):
_setup_bucket(storage_root, "mybucket", {})
for i in range(10):
(storage_root / "mybucket" / f"orphan{i}.txt").write_bytes(f"data{i}".encode())
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
result = checker.run_now()
assert result.objects_scanned <= 3
def test_phantom_metadata_counts_toward_batch(self, storage_root):
objects = {f"file{i}.txt": f"data{i}".encode() for i in range(10)}
_setup_bucket(storage_root, "mybucket", objects)
for i in range(10):
(storage_root / "mybucket" / f"file{i}.txt").unlink()
checker = IntegrityChecker(storage_root=storage_root, batch_size=5)
result = checker.run_now()
assert result.objects_scanned <= 5
def test_all_check_types_contribute(self, storage_root):
_setup_bucket(storage_root, "mybucket", {"valid.txt": b"hello"})
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan")
checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
result = checker.run_now()
assert result.objects_scanned > 2
class TestCursorRotation:
def test_oldest_bucket_scanned_first(self, storage_root):
_setup_bucket(storage_root, "bucket-a", {"a.txt": b"aaa"})
_setup_bucket(storage_root, "bucket-b", {"b.txt": b"bbb"})
_setup_bucket(storage_root, "bucket-c", {"c.txt": b"ccc"})
checker = IntegrityChecker(storage_root=storage_root, batch_size=5)
checker.cursor_store.update_bucket("bucket-a", 1000.0)
checker.cursor_store.update_bucket("bucket-b", 3000.0)
checker.cursor_store.update_bucket("bucket-c", 2000.0)
ordered = checker.cursor_store.get_bucket_order(["bucket-a", "bucket-b", "bucket-c"])
assert ordered[0] == "bucket-a"
assert ordered[1] == "bucket-c"
assert ordered[2] == "bucket-b"
def test_never_scanned_buckets_first(self, storage_root):
_setup_bucket(storage_root, "bucket-old", {"a.txt": b"aaa"})
_setup_bucket(storage_root, "bucket-new", {"b.txt": b"bbb"})
checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
checker.cursor_store.update_bucket("bucket-old", time.time())
ordered = checker.cursor_store.get_bucket_order(["bucket-old", "bucket-new"])
assert ordered[0] == "bucket-new"
def test_rotation_covers_all_buckets(self, storage_root):
for name in ["bucket-a", "bucket-b", "bucket-c"]:
_setup_bucket(storage_root, name, {f"{name}.txt": name.encode()})
checker = IntegrityChecker(storage_root=storage_root, batch_size=4)
result1 = checker.run_now()
scanned_buckets_1 = set()
for issue_bucket in [storage_root]:
pass
assert result1.buckets_scanned >= 1
result2 = checker.run_now()
result3 = checker.run_now()
cursor_info = checker.cursor_store.get_info()
assert cursor_info["tracked_buckets"] == 3
def test_cursor_persistence(self, storage_root):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
checker1 = IntegrityChecker(storage_root=storage_root, batch_size=1000)
checker1.run_now()
cursor1 = checker1.cursor_store.get_info()
assert cursor1["tracked_buckets"] == 1
assert "mybucket" in cursor1["buckets"]
checker2 = IntegrityChecker(storage_root=storage_root, batch_size=1000)
cursor2 = checker2.cursor_store.get_info()
assert cursor2["tracked_buckets"] == 1
assert "mybucket" in cursor2["buckets"]
def test_stale_cursor_cleanup(self, storage_root):
_setup_bucket(storage_root, "bucket-a", {"a.txt": b"aaa"})
_setup_bucket(storage_root, "bucket-b", {"b.txt": b"bbb"})
checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
checker.run_now()
import shutil
shutil.rmtree(storage_root / "bucket-b")
meta_b = storage_root / ".myfsio.sys" / "buckets" / "bucket-b"
if meta_b.exists():
shutil.rmtree(meta_b)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
assert "bucket-b" not in cursor_info["buckets"]
assert "bucket-a" in cursor_info["buckets"]
def test_cursor_updates_after_scan(self, storage_root):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
checker = IntegrityChecker(storage_root=storage_root, batch_size=1000)
before = time.time()
checker.run_now()
after = time.time()
cursor_info = checker.cursor_store.get_info()
entry = cursor_info["buckets"]["mybucket"]
assert before <= entry["last_scanned"] <= after
assert entry["completed"] is True
class TestIntraBucketCursor:
def test_resumes_from_cursor_key(self, storage_root):
objects = {f"file_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(10)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
result1 = checker.run_now()
assert result1.objects_scanned == 3
cursor_info = checker.cursor_store.get_info()
entry = cursor_info["buckets"]["mybucket"]
assert entry["last_key"] is not None
assert entry["completed"] is False
result2 = checker.run_now()
assert result2.objects_scanned == 3
cursor_after = checker.cursor_store.get_info()["buckets"]["mybucket"]
assert cursor_after["last_key"] > entry["last_key"]
def test_cursor_resets_after_full_pass(self, storage_root):
objects = {f"file_{i}.txt": f"data{i}".encode() for i in range(3)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=100)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
entry = cursor_info["buckets"]["mybucket"]
assert entry["last_key"] is None
assert entry["completed"] is True
def test_full_coverage_across_cycles(self, storage_root):
objects = {f"obj_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(10)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=4)
all_scanned = 0
for _ in range(10):
result = checker.run_now()
all_scanned += result.objects_scanned
if checker.cursor_store.get_info()["buckets"]["mybucket"]["completed"]:
break
assert all_scanned >= 10
def test_deleted_cursor_key_skips_gracefully(self, storage_root):
objects = {f"file_{chr(ord('a') + i)}.txt": f"data{i}".encode() for i in range(6)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
cursor_key = cursor_info["buckets"]["mybucket"]["last_key"]
assert cursor_key is not None
obj_path = storage_root / "mybucket" / cursor_key
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
key_path = Path(cursor_key)
index_path = meta_root / key_path.parent / "_index.json" if key_path.parent != Path(".") else meta_root / "_index.json"
if key_path.parent == Path("."):
index_path = meta_root / "_index.json"
else:
index_path = meta_root / key_path.parent / "_index.json"
if obj_path.exists():
obj_path.unlink()
if index_path.exists():
index_data = json.loads(index_path.read_text())
index_data.pop(key_path.name, None)
index_path.write_text(json.dumps(index_data))
result2 = checker.run_now()
assert result2.objects_scanned > 0
def test_incomplete_buckets_prioritized(self, storage_root):
_setup_bucket(storage_root, "bucket-a", {f"a{i}.txt": b"a" for i in range(5)})
_setup_bucket(storage_root, "bucket-b", {f"b{i}.txt": b"b" for i in range(5)})
checker = IntegrityChecker(storage_root=storage_root, batch_size=3)
checker.run_now()
cursor_info = checker.cursor_store.get_info()
incomplete = [
name for name, info in cursor_info["buckets"].items()
if info.get("last_key") is not None
]
assert len(incomplete) >= 1
result2 = checker.run_now()
assert result2.objects_scanned > 0
def test_cursor_skips_nested_directories(self, storage_root):
objects = {
"aaa/file1.txt": b"a1",
"aaa/file2.txt": b"a2",
"bbb/file1.txt": b"b1",
"bbb/file2.txt": b"b2",
"ccc/file1.txt": b"c1",
"ccc/file2.txt": b"c2",
}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=4)
result1 = checker.run_now()
assert result1.objects_scanned == 4
cursor_info = checker.cursor_store.get_info()
cursor_key = cursor_info["buckets"]["mybucket"]["last_key"]
assert cursor_key is not None
assert cursor_key.startswith("aaa/") or cursor_key.startswith("bbb/")
result2 = checker.run_now()
assert result2.objects_scanned >= 2
all_scanned = result1.objects_scanned + result2.objects_scanned
for _ in range(10):
if checker.cursor_store.get_info()["buckets"]["mybucket"]["completed"]:
break
r = checker.run_now()
all_scanned += r.objects_scanned
assert all_scanned >= 6
def test_sorted_walk_order(self, storage_root):
objects = {
"bar.txt": b"bar",
"bar/inner.txt": b"inner",
"abc.txt": b"abc",
"zzz/deep.txt": b"deep",
}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(storage_root=storage_root, batch_size=100)
result = checker.run_now()
assert result.objects_scanned >= 4
assert result.total_issues == 0