Add integrity scanner: background detection and healing of corrupted objects, orphaned files, phantom metadata, stale versions, etag cache inconsistencies, and legacy metadata drift

This commit is contained in:
2026-03-10 22:14:39 +08:00
parent 9da7104887
commit a4ae81c77c
8 changed files with 1509 additions and 19 deletions

View File

@@ -30,6 +30,7 @@ from .extensions import limiter, csrf
from .iam import IamService
from .kms import KMSManager
from .gc import GarbageCollector
from .integrity import IntegrityChecker
from .lifecycle import LifecycleManager
from .notifications import NotificationService
from .object_lock import ObjectLockService
@@ -234,6 +235,17 @@ def create_app(
)
gc_collector.start()
integrity_checker = None
if app.config.get("INTEGRITY_ENABLED", False):
integrity_checker = IntegrityChecker(
storage_root=storage_root,
interval_hours=app.config.get("INTEGRITY_INTERVAL_HOURS", 24.0),
batch_size=app.config.get("INTEGRITY_BATCH_SIZE", 1000),
auto_heal=app.config.get("INTEGRITY_AUTO_HEAL", False),
dry_run=app.config.get("INTEGRITY_DRY_RUN", False),
)
integrity_checker.start()
app.extensions["object_storage"] = storage
app.extensions["iam"] = iam
app.extensions["bucket_policies"] = bucket_policies
@@ -246,6 +258,7 @@ def create_app(
app.extensions["acl"] = acl_service
app.extensions["lifecycle"] = lifecycle_manager
app.extensions["gc"] = gc_collector
app.extensions["integrity"] = integrity_checker
app.extensions["object_lock"] = object_lock_service
app.extensions["notifications"] = notification_service
app.extensions["access_logging"] = access_logging_service

View File

@@ -15,6 +15,7 @@ from flask import Blueprint, Response, current_app, jsonify, request
from .connections import ConnectionStore
from .extensions import limiter
from .gc import GarbageCollector
from .integrity import IntegrityChecker
from .iam import IamError, Principal
from .replication import ReplicationManager
from .site_registry import PeerSite, SiteInfo, SiteRegistry
@@ -829,3 +830,54 @@ def gc_history():
offset = int(request.args.get("offset", 0))
records = gc.get_history(limit=limit, offset=offset)
return jsonify({"executions": records})
def _integrity() -> Optional[IntegrityChecker]:
return current_app.extensions.get("integrity")
@admin_api_bp.route("/integrity/status", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def integrity_status():
principal, error = _require_admin()
if error:
return error
checker = _integrity()
if not checker:
return jsonify({"enabled": False, "message": "Integrity checker is not enabled. Set INTEGRITY_ENABLED=true to enable."})
return jsonify(checker.get_status())
@admin_api_bp.route("/integrity/run", methods=["POST"])
@limiter.limit(lambda: _get_admin_rate_limit())
def integrity_run_now():
principal, error = _require_admin()
if error:
return error
checker = _integrity()
if not checker:
return _json_error("InvalidRequest", "Integrity checker is not enabled", 400)
payload = request.get_json(silent=True) or {}
override_dry_run = payload.get("dry_run")
override_auto_heal = payload.get("auto_heal")
result = checker.run_now(
auto_heal=override_auto_heal if override_auto_heal is not None else None,
dry_run=override_dry_run if override_dry_run is not None else None,
)
logger.info("Integrity manual run by %s", principal.access_key)
return jsonify(result.to_dict())
@admin_api_bp.route("/integrity/history", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def integrity_history():
principal, error = _require_admin()
if error:
return error
checker = _integrity()
if not checker:
return jsonify({"executions": []})
limit = min(int(request.args.get("limit", 50)), 200)
offset = int(request.args.get("offset", 0))
records = checker.get_history(limit=limit, offset=offset)
return jsonify({"executions": records})

View File

@@ -156,6 +156,11 @@ class AppConfig:
gc_multipart_max_age_days: int
gc_lock_file_max_age_hours: float
gc_dry_run: bool
integrity_enabled: bool
integrity_interval_hours: float
integrity_batch_size: int
integrity_auto_heal: bool
integrity_dry_run: bool
@classmethod
def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
@@ -331,6 +336,11 @@ class AppConfig:
gc_multipart_max_age_days = int(_get("GC_MULTIPART_MAX_AGE_DAYS", 7))
gc_lock_file_max_age_hours = float(_get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0))
gc_dry_run = str(_get("GC_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"}
integrity_enabled = str(_get("INTEGRITY_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
integrity_interval_hours = float(_get("INTEGRITY_INTERVAL_HOURS", 24.0))
integrity_batch_size = int(_get("INTEGRITY_BATCH_SIZE", 1000))
integrity_auto_heal = str(_get("INTEGRITY_AUTO_HEAL", "0")).lower() in {"1", "true", "yes", "on"}
integrity_dry_run = str(_get("INTEGRITY_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"}
return cls(storage_root=storage_root,
max_upload_size=max_upload_size,
@@ -424,7 +434,12 @@ class AppConfig:
gc_temp_file_max_age_hours=gc_temp_file_max_age_hours,
gc_multipart_max_age_days=gc_multipart_max_age_days,
gc_lock_file_max_age_hours=gc_lock_file_max_age_hours,
gc_dry_run=gc_dry_run)
gc_dry_run=gc_dry_run,
integrity_enabled=integrity_enabled,
integrity_interval_hours=integrity_interval_hours,
integrity_batch_size=integrity_batch_size,
integrity_auto_heal=integrity_auto_heal,
integrity_dry_run=integrity_dry_run)
def validate_and_report(self) -> list[str]:
"""Validate configuration and return a list of warnings/issues.
@@ -641,4 +656,9 @@ class AppConfig:
"GC_MULTIPART_MAX_AGE_DAYS": self.gc_multipart_max_age_days,
"GC_LOCK_FILE_MAX_AGE_HOURS": self.gc_lock_file_max_age_hours,
"GC_DRY_RUN": self.gc_dry_run,
"INTEGRITY_ENABLED": self.integrity_enabled,
"INTEGRITY_INTERVAL_HOURS": self.integrity_interval_hours,
"INTEGRITY_BATCH_SIZE": self.integrity_batch_size,
"INTEGRITY_AUTO_HEAL": self.integrity_auto_heal,
"INTEGRITY_DRY_RUN": self.integrity_dry_run,
}

738
app/integrity.py Normal file
View File

@@ -0,0 +1,738 @@
from __future__ import annotations
import hashlib
import json
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
import myfsio_core as _rc
_HAS_RUST = True
except ImportError:
_HAS_RUST = False
logger = logging.getLogger(__name__)
def _compute_etag(path: Path) -> str:
if _HAS_RUST:
return _rc.md5_file(str(path))
checksum = hashlib.md5()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(8192), b""):
checksum.update(chunk)
return checksum.hexdigest()
@dataclass
class IntegrityIssue:
issue_type: str
bucket: str
key: str
detail: str
healed: bool = False
heal_action: str = ""
def to_dict(self) -> dict:
return {
"issue_type": self.issue_type,
"bucket": self.bucket,
"key": self.key,
"detail": self.detail,
"healed": self.healed,
"heal_action": self.heal_action,
}
@dataclass
class IntegrityResult:
corrupted_objects: int = 0
orphaned_objects: int = 0
phantom_metadata: int = 0
stale_versions: int = 0
etag_cache_inconsistencies: int = 0
legacy_metadata_drifts: int = 0
issues_healed: int = 0
issues: List[IntegrityIssue] = field(default_factory=list)
errors: List[str] = field(default_factory=list)
objects_scanned: int = 0
buckets_scanned: int = 0
execution_time_seconds: float = 0.0
def to_dict(self) -> dict:
return {
"corrupted_objects": self.corrupted_objects,
"orphaned_objects": self.orphaned_objects,
"phantom_metadata": self.phantom_metadata,
"stale_versions": self.stale_versions,
"etag_cache_inconsistencies": self.etag_cache_inconsistencies,
"legacy_metadata_drifts": self.legacy_metadata_drifts,
"issues_healed": self.issues_healed,
"issues": [i.to_dict() for i in self.issues],
"errors": self.errors,
"objects_scanned": self.objects_scanned,
"buckets_scanned": self.buckets_scanned,
"execution_time_seconds": self.execution_time_seconds,
}
@property
def total_issues(self) -> int:
return (
self.corrupted_objects
+ self.orphaned_objects
+ self.phantom_metadata
+ self.stale_versions
+ self.etag_cache_inconsistencies
+ self.legacy_metadata_drifts
)
@property
def has_issues(self) -> bool:
return self.total_issues > 0
@dataclass
class IntegrityExecutionRecord:
timestamp: float
result: dict
dry_run: bool
auto_heal: bool
def to_dict(self) -> dict:
return {
"timestamp": self.timestamp,
"result": self.result,
"dry_run": self.dry_run,
"auto_heal": self.auto_heal,
}
@classmethod
def from_dict(cls, data: dict) -> IntegrityExecutionRecord:
return cls(
timestamp=data["timestamp"],
result=data["result"],
dry_run=data.get("dry_run", False),
auto_heal=data.get("auto_heal", False),
)
class IntegrityHistoryStore:
def __init__(self, storage_root: Path, max_records: int = 50) -> None:
self.storage_root = storage_root
self.max_records = max_records
self._lock = threading.Lock()
def _get_path(self) -> Path:
return self.storage_root / ".myfsio.sys" / "config" / "integrity_history.json"
def load(self) -> List[IntegrityExecutionRecord]:
path = self._get_path()
if not path.exists():
return []
try:
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return [IntegrityExecutionRecord.from_dict(d) for d in data.get("executions", [])]
except (OSError, ValueError, KeyError) as e:
logger.error("Failed to load integrity history: %s", e)
return []
def save(self, records: List[IntegrityExecutionRecord]) -> None:
path = self._get_path()
path.parent.mkdir(parents=True, exist_ok=True)
data = {"executions": [r.to_dict() for r in records[: self.max_records]]}
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
except OSError as e:
logger.error("Failed to save integrity history: %s", e)
def add(self, record: IntegrityExecutionRecord) -> None:
with self._lock:
records = self.load()
records.insert(0, record)
self.save(records)
def get_history(self, limit: int = 50, offset: int = 0) -> List[IntegrityExecutionRecord]:
return self.load()[offset : offset + limit]
MAX_ISSUES = 500
class IntegrityChecker:
SYSTEM_ROOT = ".myfsio.sys"
SYSTEM_BUCKETS_DIR = "buckets"
BUCKET_META_DIR = "meta"
BUCKET_VERSIONS_DIR = "versions"
INTERNAL_FOLDERS = {".meta", ".versions", ".multipart"}
def __init__(
self,
storage_root: Path,
interval_hours: float = 24.0,
batch_size: int = 1000,
auto_heal: bool = False,
dry_run: bool = False,
max_history: int = 50,
) -> None:
self.storage_root = Path(storage_root)
self.interval_seconds = interval_hours * 3600.0
self.batch_size = batch_size
self.auto_heal = auto_heal
self.dry_run = dry_run
self._timer: Optional[threading.Timer] = None
self._shutdown = False
self._lock = threading.Lock()
self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history)
def start(self) -> None:
if self._timer is not None:
return
self._shutdown = False
self._schedule_next()
logger.info(
"Integrity checker started: interval=%.1fh, batch_size=%d, auto_heal=%s, dry_run=%s",
self.interval_seconds / 3600.0,
self.batch_size,
self.auto_heal,
self.dry_run,
)
def stop(self) -> None:
self._shutdown = True
if self._timer:
self._timer.cancel()
self._timer = None
logger.info("Integrity checker stopped")
def _schedule_next(self) -> None:
if self._shutdown:
return
self._timer = threading.Timer(self.interval_seconds, self._run_cycle)
self._timer.daemon = True
self._timer.start()
def _run_cycle(self) -> None:
if self._shutdown:
return
try:
self.run_now()
except Exception as e:
logger.error("Integrity check cycle failed: %s", e)
finally:
self._schedule_next()
def run_now(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> IntegrityResult:
effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal
effective_dry_run = dry_run if dry_run is not None else self.dry_run
start = time.time()
result = IntegrityResult()
bucket_names = self._list_bucket_names()
for bucket_name in bucket_names:
if result.objects_scanned >= self.batch_size:
break
result.buckets_scanned += 1
self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
result.execution_time_seconds = time.time() - start
if result.has_issues or result.errors:
logger.info(
"Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, "
"stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s",
result.execution_time_seconds,
result.corrupted_objects,
result.orphaned_objects,
result.phantom_metadata,
result.stale_versions,
result.etag_cache_inconsistencies,
result.legacy_metadata_drifts,
result.issues_healed,
len(result.errors),
" (dry run)" if effective_dry_run else "",
)
record = IntegrityExecutionRecord(
timestamp=time.time(),
result=result.to_dict(),
dry_run=effective_dry_run,
auto_heal=effective_auto_heal,
)
self.history_store.add(record)
return result
def _system_path(self) -> Path:
return self.storage_root / self.SYSTEM_ROOT
def _list_bucket_names(self) -> List[str]:
names = []
try:
for entry in self.storage_root.iterdir():
if entry.is_dir() and entry.name != self.SYSTEM_ROOT:
names.append(entry.name)
except OSError:
pass
return names
def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None:
if len(result.issues) < MAX_ISSUES:
result.issues.append(issue)
def _check_corrupted_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
if not meta_root.exists():
return
try:
for index_file in meta_root.rglob("_index.json"):
if result.objects_scanned >= self.batch_size:
return
if not index_file.is_file():
continue
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
for key_name, entry in list(index_data.items()):
if result.objects_scanned >= self.batch_size:
return
rel_dir = index_file.parent.relative_to(meta_root)
if rel_dir == Path("."):
full_key = key_name
else:
full_key = rel_dir.as_posix() + "/" + key_name
object_path = bucket_path / full_key
if not object_path.exists():
continue
result.objects_scanned += 1
meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
stored_etag = meta.get("__etag__")
if not stored_etag:
continue
try:
actual_etag = _compute_etag(object_path)
except OSError:
continue
if actual_etag != stored_etag:
result.corrupted_objects += 1
issue = IntegrityIssue(
issue_type="corrupted_object",
bucket=bucket_name,
key=full_key,
detail=f"stored_etag={stored_etag} actual_etag={actual_etag}",
)
if auto_heal and not dry_run:
try:
stat = object_path.stat()
meta["__etag__"] = actual_etag
meta["__size__"] = str(stat.st_size)
meta["__last_modified__"] = str(stat.st_mtime)
index_data[key_name] = {"metadata": meta}
self._atomic_write_index(index_file, index_data)
issue.healed = True
issue.heal_action = "updated etag in index"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}")
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check corrupted {bucket_name}: {e}")
def _check_orphaned_objects(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
try:
for entry in bucket_path.rglob("*"):
if result.objects_scanned >= self.batch_size:
return
if not entry.is_file():
continue
try:
rel = entry.relative_to(bucket_path)
except ValueError:
continue
if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS:
continue
full_key = rel.as_posix()
key_name = rel.name
parent = rel.parent
if parent == Path("."):
index_path = meta_root / "_index.json"
else:
index_path = meta_root / parent / "_index.json"
has_entry = False
if index_path.exists():
try:
index_data = json.loads(index_path.read_text(encoding="utf-8"))
has_entry = key_name in index_data
except (OSError, json.JSONDecodeError):
pass
if not has_entry:
result.orphaned_objects += 1
issue = IntegrityIssue(
issue_type="orphaned_object",
bucket=bucket_name,
key=full_key,
detail="file exists without metadata entry",
)
if auto_heal and not dry_run:
try:
etag = _compute_etag(entry)
stat = entry.stat()
meta = {
"__etag__": etag,
"__size__": str(stat.st_size),
"__last_modified__": str(stat.st_mtime),
}
index_data = {}
if index_path.exists():
try:
index_data = json.loads(index_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
pass
index_data[key_name] = {"metadata": meta}
self._atomic_write_index(index_path, index_data)
issue.healed = True
issue.heal_action = "created metadata entry"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal orphaned {bucket_name}/{full_key}: {e}")
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check orphaned {bucket_name}: {e}")
def _check_phantom_metadata(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
bucket_path = self.storage_root / bucket_name
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
if not meta_root.exists():
return
try:
for index_file in meta_root.rglob("_index.json"):
if not index_file.is_file():
continue
try:
index_data = json.loads(index_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
keys_to_remove = []
for key_name in list(index_data.keys()):
rel_dir = index_file.parent.relative_to(meta_root)
if rel_dir == Path("."):
full_key = key_name
else:
full_key = rel_dir.as_posix() + "/" + key_name
object_path = bucket_path / full_key
if not object_path.exists():
result.phantom_metadata += 1
issue = IntegrityIssue(
issue_type="phantom_metadata",
bucket=bucket_name,
key=full_key,
detail="metadata entry without file on disk",
)
if auto_heal and not dry_run:
keys_to_remove.append(key_name)
issue.healed = True
issue.heal_action = "removed stale index entry"
result.issues_healed += 1
self._add_issue(result, issue)
if keys_to_remove and auto_heal and not dry_run:
try:
for k in keys_to_remove:
index_data.pop(k, None)
if index_data:
self._atomic_write_index(index_file, index_data)
else:
index_file.unlink(missing_ok=True)
except OSError as e:
result.errors.append(f"heal phantom {bucket_name}: {e}")
except OSError as e:
result.errors.append(f"check phantom {bucket_name}: {e}")
def _check_stale_versions(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
versions_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR
if not versions_root.exists():
return
try:
for key_dir in versions_root.rglob("*"):
if not key_dir.is_dir():
continue
bin_files = {f.stem: f for f in key_dir.glob("*.bin")}
json_files = {f.stem: f for f in key_dir.glob("*.json")}
for stem, bin_file in bin_files.items():
if stem not in json_files:
result.stale_versions += 1
issue = IntegrityIssue(
issue_type="stale_version",
bucket=bucket_name,
key=f"{key_dir.relative_to(versions_root).as_posix()}/{bin_file.name}",
detail="version data without manifest",
)
if auto_heal and not dry_run:
try:
bin_file.unlink(missing_ok=True)
issue.healed = True
issue.heal_action = "removed orphaned version data"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal stale version {bin_file}: {e}")
self._add_issue(result, issue)
for stem, json_file in json_files.items():
if stem not in bin_files:
result.stale_versions += 1
issue = IntegrityIssue(
issue_type="stale_version",
bucket=bucket_name,
key=f"{key_dir.relative_to(versions_root).as_posix()}/{json_file.name}",
detail="version manifest without data",
)
if auto_heal and not dry_run:
try:
json_file.unlink(missing_ok=True)
issue.healed = True
issue.heal_action = "removed orphaned version manifest"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal stale version {json_file}: {e}")
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check stale versions {bucket_name}: {e}")
def _check_etag_cache(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
etag_index_path = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / "etag_index.json"
if not etag_index_path.exists():
return
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
if not meta_root.exists():
return
try:
etag_cache = json.loads(etag_index_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return
found_mismatch = False
for full_key, cached_etag in etag_cache.items():
key_path = Path(full_key)
key_name = key_path.name
parent = key_path.parent
if parent == Path("."):
index_path = meta_root / "_index.json"
else:
index_path = meta_root / parent / "_index.json"
if not index_path.exists():
continue
try:
index_data = json.loads(index_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
entry = index_data.get(key_name)
if not entry:
continue
meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
stored_etag = meta.get("__etag__")
if stored_etag and cached_etag != stored_etag:
result.etag_cache_inconsistencies += 1
found_mismatch = True
issue = IntegrityIssue(
issue_type="etag_cache_inconsistency",
bucket=bucket_name,
key=full_key,
detail=f"cached_etag={cached_etag} index_etag={stored_etag}",
)
self._add_issue(result, issue)
if found_mismatch and auto_heal and not dry_run:
try:
etag_index_path.unlink(missing_ok=True)
for issue in result.issues:
if issue.issue_type == "etag_cache_inconsistency" and issue.bucket == bucket_name and not issue.healed:
issue.healed = True
issue.heal_action = "deleted etag_index.json"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal etag cache {bucket_name}: {e}")
def _check_legacy_metadata(
self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
legacy_meta_root = self.storage_root / bucket_name / ".meta"
if not legacy_meta_root.exists():
return
meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
try:
for meta_file in legacy_meta_root.rglob("*.meta.json"):
if not meta_file.is_file():
continue
try:
rel = meta_file.relative_to(legacy_meta_root)
except ValueError:
continue
full_key = rel.as_posix().removesuffix(".meta.json")
key_path = Path(full_key)
key_name = key_path.name
parent = key_path.parent
if parent == Path("."):
index_path = meta_root / "_index.json"
else:
index_path = meta_root / parent / "_index.json"
try:
legacy_data = json.loads(meta_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
index_entry = None
if index_path.exists():
try:
index_data = json.loads(index_path.read_text(encoding="utf-8"))
index_entry = index_data.get(key_name)
except (OSError, json.JSONDecodeError):
pass
if index_entry is None:
result.legacy_metadata_drifts += 1
issue = IntegrityIssue(
issue_type="legacy_metadata_drift",
bucket=bucket_name,
key=full_key,
detail="unmigrated legacy .meta.json",
)
if auto_heal and not dry_run:
try:
index_data = {}
if index_path.exists():
try:
index_data = json.loads(index_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
pass
index_data[key_name] = {"metadata": legacy_data}
self._atomic_write_index(index_path, index_data)
meta_file.unlink(missing_ok=True)
issue.healed = True
issue.heal_action = "migrated to index and deleted legacy file"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal legacy {bucket_name}/{full_key}: {e}")
self._add_issue(result, issue)
else:
index_meta = index_entry.get("metadata", {}) if isinstance(index_entry, dict) else {}
if legacy_data != index_meta:
result.legacy_metadata_drifts += 1
issue = IntegrityIssue(
issue_type="legacy_metadata_drift",
bucket=bucket_name,
key=full_key,
detail="legacy .meta.json differs from index entry",
)
if auto_heal and not dry_run:
try:
meta_file.unlink(missing_ok=True)
issue.healed = True
issue.heal_action = "deleted legacy file (index is authoritative)"
result.issues_healed += 1
except OSError as e:
result.errors.append(f"heal legacy drift {bucket_name}/{full_key}: {e}")
self._add_issue(result, issue)
except OSError as e:
result.errors.append(f"check legacy meta {bucket_name}: {e}")
@staticmethod
def _atomic_write_index(index_path: Path, data: Dict[str, Any]) -> None:
index_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = index_path.with_suffix(".tmp")
try:
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(data, f)
os.replace(str(tmp_path), str(index_path))
except BaseException:
try:
tmp_path.unlink(missing_ok=True)
except OSError:
pass
raise
def get_history(self, limit: int = 50, offset: int = 0) -> List[dict]:
records = self.history_store.get_history(limit, offset)
return [r.to_dict() for r in records]
def get_status(self) -> dict:
return {
"enabled": not self._shutdown or self._timer is not None,
"running": self._timer is not None and not self._shutdown,
"interval_hours": self.interval_seconds / 3600.0,
"batch_size": self.batch_size,
"auto_heal": self.auto_heal,
"dry_run": self.dry_run,
}

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
APP_VERSION = "0.3.8"
APP_VERSION = "0.3.9"
def get_version() -> str:

63
docs.md
View File

@@ -356,6 +356,69 @@ The application automatically trusts these headers to generate correct presigned
| `ALLOWED_REDIRECT_HOSTS` | `""` | Comma-separated whitelist of safe redirect targets. Empty allows only same-host redirects. |
| `ALLOW_INTERNAL_ENDPOINTS` | `false` | Allow connections to internal/private IPs for webhooks and replication targets. **Keep disabled in production unless needed.** |
## Integrity Scanner
The integrity scanner detects and optionally auto-repairs data inconsistencies: corrupted objects (ETag mismatch), orphaned files without metadata, phantom metadata without files, stale version archives, ETag cache drift, and unmigrated legacy `.meta.json` files.
### Enabling Integrity Scanner
By default, the integrity scanner is disabled. Enable it by setting:
```bash
INTEGRITY_ENABLED=true python run.py
```
Or in your `myfsio.env` file:
```
INTEGRITY_ENABLED=true
INTEGRITY_INTERVAL_HOURS=24 # Run every 24 hours (default)
INTEGRITY_BATCH_SIZE=1000 # Max objects to scan per cycle
INTEGRITY_AUTO_HEAL=false # Automatically repair detected issues
INTEGRITY_DRY_RUN=false # Set to true to log without healing
```
### What Gets Checked
| Check | Detection | Heal Action |
|-------|-----------|-------------|
| **Corrupted objects** | File MD5 does not match stored `__etag__` | Update `__etag__` in index (disk data is authoritative) |
| **Orphaned objects** | File exists on disk without metadata entry | Create index entry with computed MD5/size/mtime |
| **Phantom metadata** | Index entry exists but file is missing from disk | Remove stale entry from `_index.json` |
| **Stale versions** | `.json` manifest without `.bin` data or vice versa | Remove orphaned version file |
| **ETag cache inconsistency** | `etag_index.json` entry differs from metadata `__etag__` | Delete `etag_index.json` (auto-rebuilt on next list) |
| **Legacy metadata drift** | Legacy `.meta.json` differs from index or is unmigrated | Migrate to index and delete legacy file |
### Admin API
All integrity endpoints require admin (`iam:*`) permissions.
| Method | Route | Description |
|--------|-------|-------------|
| `GET` | `/admin/integrity/status` | Get scanner status and configuration |
| `POST` | `/admin/integrity/run` | Trigger a manual scan (body: `{"dry_run": true, "auto_heal": true}`) |
| `GET` | `/admin/integrity/history` | Get scan history (query: `?limit=50&offset=0`) |
### Dry Run Mode
Set `INTEGRITY_DRY_RUN=true` to log detected issues without making any changes. You can also trigger a one-time dry run via the admin API:
```bash
curl -X POST "http://localhost:5000/admin/integrity/run" \
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
-H "Content-Type: application/json" \
-d '{"dry_run": true, "auto_heal": true}'
```
### Configuration Reference
| Variable | Default | Description |
|----------|---------|-------------|
| `INTEGRITY_ENABLED` | `false` | Enable background integrity scanning |
| `INTEGRITY_INTERVAL_HOURS` | `24` | Hours between scan cycles |
| `INTEGRITY_BATCH_SIZE` | `1000` | Max objects to scan per cycle |
| `INTEGRITY_AUTO_HEAL` | `false` | Automatically repair detected issues |
| `INTEGRITY_DRY_RUN` | `false` | Log issues without healing |
## 4. Upgrading and Updates
### Version Checking

View File

@@ -41,6 +41,7 @@
<li><a href="#encryption">Encryption</a></li>
<li><a href="#lifecycle">Lifecycle Rules</a></li>
<li><a href="#garbage-collection">Garbage Collection</a></li>
<li><a href="#integrity">Integrity Scanner</a></li>
<li><a href="#metrics">Metrics History</a></li>
<li><a href="#operation-metrics">Operation Metrics</a></li>
<li><a href="#troubleshooting">Troubleshooting</a></li>
@@ -1731,10 +1732,114 @@ curl "{{ api_base }}/admin/gc/history?limit=10" \
</div>
</div>
</article>
<article id="metrics" class="card shadow-sm docs-section">
<article id="integrity" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">15</span>
<h2 class="h4 mb-0">Integrity Scanner</h2>
</div>
<p class="text-muted">Detect and optionally auto-repair data inconsistencies: corrupted objects, orphaned files, phantom metadata, stale versions, ETag cache drift, and unmigrated legacy metadata.</p>
<h3 class="h6 text-uppercase text-muted mt-4">Enabling Integrity Scanner</h3>
<p class="small text-muted">Disabled by default. Enable via environment variable:</p>
<pre class="mb-3"><code class="language-bash">INTEGRITY_ENABLED=true python run.py</code></pre>
<h3 class="h6 text-uppercase text-muted mt-4">Configuration</h3>
<div class="table-responsive mb-3">
<table class="table table-sm table-bordered small">
<thead class="table-light">
<tr>
<th>Variable</th>
<th>Default</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr><td><code>INTEGRITY_ENABLED</code></td><td><code>false</code></td><td>Enable background integrity scanning</td></tr>
<tr><td><code>INTEGRITY_INTERVAL_HOURS</code></td><td><code>24</code></td><td>Hours between scan cycles</td></tr>
<tr><td><code>INTEGRITY_BATCH_SIZE</code></td><td><code>1000</code></td><td>Max objects to scan per cycle</td></tr>
<tr><td><code>INTEGRITY_AUTO_HEAL</code></td><td><code>false</code></td><td>Automatically repair detected issues</td></tr>
<tr><td><code>INTEGRITY_DRY_RUN</code></td><td><code>false</code></td><td>Log issues without healing</td></tr>
</tbody>
</table>
</div>
<h3 class="h6 text-uppercase text-muted mt-4">What Gets Checked</h3>
<div class="table-responsive mb-3">
<table class="table table-sm table-bordered small">
<thead class="table-light">
<tr>
<th>Check</th>
<th>Detection</th>
<th>Heal Action</th>
</tr>
</thead>
<tbody>
<tr><td><strong>Corrupted objects</strong></td><td>File MD5 does not match stored ETag</td><td>Update ETag in index (disk is authoritative)</td></tr>
<tr><td><strong>Orphaned objects</strong></td><td>File exists without metadata entry</td><td>Create index entry with computed MD5/size/mtime</td></tr>
<tr><td><strong>Phantom metadata</strong></td><td>Index entry exists but file is missing</td><td>Remove stale entry from index</td></tr>
<tr><td><strong>Stale versions</strong></td><td>Manifest without data or vice versa</td><td>Remove orphaned version file</td></tr>
<tr><td><strong>ETag cache</strong></td><td><code>etag_index.json</code> differs from metadata</td><td>Delete cache file (auto-rebuilt)</td></tr>
<tr><td><strong>Legacy metadata</strong></td><td>Legacy <code>.meta.json</code> differs or unmigrated</td><td>Migrate to index, delete legacy file</td></tr>
</tbody>
</table>
</div>
<h3 class="h6 text-uppercase text-muted mt-4">Admin API</h3>
<div class="table-responsive mb-3">
<table class="table table-sm table-bordered small">
<thead class="table-light">
<tr>
<th>Method</th>
<th>Route</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr><td><code>GET</code></td><td><code>/admin/integrity/status</code></td><td>Get scanner status and configuration</td></tr>
<tr><td><code>POST</code></td><td><code>/admin/integrity/run</code></td><td>Trigger manual scan</td></tr>
<tr><td><code>GET</code></td><td><code>/admin/integrity/history</code></td><td>Get scan history</td></tr>
</tbody>
</table>
</div>
<pre class="mb-3"><code class="language-bash"># Trigger a dry run with auto-heal preview
curl -X POST "{{ api_base }}/admin/integrity/run" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;" \
-H "Content-Type: application/json" \
-d '{"dry_run": true, "auto_heal": true}'
# Trigger actual scan with healing
curl -X POST "{{ api_base }}/admin/integrity/run" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;" \
-H "Content-Type: application/json" \
-d '{"auto_heal": true}'
# Check status
curl "{{ api_base }}/admin/integrity/status" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;"
# View history
curl "{{ api_base }}/admin/integrity/history?limit=10" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;"</code></pre>
<div class="alert alert-light border mb-0">
<div class="d-flex gap-2">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="bi bi-info-circle text-muted mt-1 flex-shrink-0" viewBox="0 0 16 16">
<path d="M8 15A7 7 0 1 1 8 1a7 7 0 0 1 0 14zm0 1A8 8 0 1 0 8 0a8 8 0 0 0 0 16z"/>
<path d="m8.93 6.588-2.29.287-.082.38.45.083c.294.07.352.176.288.469l-.738 3.468c-.194.897.105 1.319.808 1.319.545 0 1.178-.252 1.465-.598l.088-.416c-.2.176-.492.246-.686.246-.275 0-.375-.193-.304-.533L8.93 6.588zM9 4.5a1 1 0 1 1-2 0 1 1 0 0 1 2 0z"/>
</svg>
<div>
<strong>Dry Run:</strong> Use <code>INTEGRITY_DRY_RUN=true</code> or pass <code>{"dry_run": true}</code> to the API to preview detected issues without making any changes. Combine with <code>{"auto_heal": true}</code> to see what would be repaired.
</div>
</div>
</div>
</div>
</article>
<article id="metrics" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">16</span>
<h2 class="h4 mb-0">Metrics History</h2>
</div>
<p class="text-muted">Track CPU, memory, and disk usage over time with optional metrics history. Disabled by default to minimize overhead.</p>
@@ -1818,7 +1923,7 @@ curl -X PUT "{{ api_base | replace('/api', '/ui') }}/metrics/settings" \
<article id="operation-metrics" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">16</span>
<span class="docs-section-kicker">17</span>
<h2 class="h4 mb-0">Operation Metrics</h2>
</div>
<p class="text-muted">Track API request statistics including request counts, latency, error rates, and bandwidth usage. Provides real-time visibility into API operations.</p>
@@ -1925,7 +2030,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
<article id="troubleshooting" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">17</span>
<span class="docs-section-kicker">18</span>
<h2 class="h4 mb-0">Troubleshooting &amp; tips</h2>
</div>
<div class="table-responsive">
@@ -1976,7 +2081,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
<article id="health-check" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">18</span>
<span class="docs-section-kicker">19</span>
<h2 class="h4 mb-0">Health Check Endpoint</h2>
</div>
<p class="text-muted">The API exposes a health check endpoint for monitoring and load balancer integration.</p>
@@ -1998,7 +2103,7 @@ curl {{ api_base }}/myfsio/health
<article id="object-lock" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">19</span>
<span class="docs-section-kicker">20</span>
<h2 class="h4 mb-0">Object Lock &amp; Retention</h2>
</div>
<p class="text-muted">Object Lock prevents objects from being deleted or overwritten for a specified retention period.</p>
@@ -2058,7 +2163,7 @@ curl "{{ api_base }}/&lt;bucket&gt;/&lt;key&gt;?legal-hold" \
<article id="access-logging" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">20</span>
<span class="docs-section-kicker">21</span>
<h2 class="h4 mb-0">Access Logging</h2>
</div>
<p class="text-muted">Enable S3-style access logging to track all requests to your buckets for audit and analysis.</p>
@@ -2085,7 +2190,7 @@ curl "{{ api_base }}/&lt;bucket&gt;?logging" \
<article id="notifications" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">21</span>
<span class="docs-section-kicker">22</span>
<h2 class="h4 mb-0">Notifications &amp; Webhooks</h2>
</div>
<p class="text-muted">Configure event notifications to trigger webhooks when objects are created or deleted.</p>
@@ -2148,7 +2253,7 @@ curl -X PUT "{{ api_base }}/&lt;bucket&gt;?notification" \
<article id="select-content" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">22</span>
<span class="docs-section-kicker">23</span>
<h2 class="h4 mb-0">SelectObjectContent (SQL)</h2>
</div>
<p class="text-muted">Query CSV, JSON, or Parquet files directly using SQL without downloading the entire object.</p>
@@ -2193,7 +2298,7 @@ curl -X POST "{{ api_base }}/&lt;bucket&gt;/data.csv?select" \
<article id="advanced-ops" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">23</span>
<span class="docs-section-kicker">24</span>
<h2 class="h4 mb-0">Advanced S3 Operations</h2>
</div>
<p class="text-muted">Copy, move, and partially download objects using advanced S3 operations.</p>
@@ -2267,7 +2372,7 @@ curl "{{ api_base }}/&lt;bucket&gt;/&lt;key&gt;" \
<article id="acls" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">24</span>
<span class="docs-section-kicker">25</span>
<h2 class="h4 mb-0">Access Control Lists (ACLs)</h2>
</div>
<p class="text-muted">ACLs provide legacy-style permission management for buckets and objects.</p>
@@ -2321,7 +2426,7 @@ curl -X PUT "{{ api_base }}/&lt;bucket&gt;/&lt;key&gt;" \
<article id="tagging" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">25</span>
<span class="docs-section-kicker">26</span>
<h2 class="h4 mb-0">Object &amp; Bucket Tagging</h2>
</div>
<p class="text-muted">Add metadata tags to buckets and objects for organization, cost allocation, or lifecycle rule filtering.</p>
@@ -2380,7 +2485,7 @@ curl -X PUT "{{ api_base }}/&lt;bucket&gt;?tagging" \
<article id="website-hosting" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">26</span>
<span class="docs-section-kicker">27</span>
<h2 class="h4 mb-0">Static Website Hosting</h2>
</div>
<p class="text-muted">Host static websites directly from S3 buckets with custom index and error pages, served via custom domain mapping.</p>
@@ -2473,7 +2578,7 @@ server {
<article id="cors-config" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">27</span>
<span class="docs-section-kicker">28</span>
<h2 class="h4 mb-0">CORS Configuration</h2>
</div>
<p class="text-muted">Configure per-bucket Cross-Origin Resource Sharing rules to control which origins can access your bucket from a browser.</p>
@@ -2540,7 +2645,7 @@ curl -X DELETE "{{ api_base }}/&lt;bucket&gt;?cors" \
<article id="post-object" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">28</span>
<span class="docs-section-kicker">29</span>
<h2 class="h4 mb-0">PostObject (HTML Form Upload)</h2>
</div>
<p class="text-muted">Upload objects directly from an HTML form using browser-based POST uploads with policy-based authorization.</p>
@@ -2582,7 +2687,7 @@ curl -X DELETE "{{ api_base }}/&lt;bucket&gt;?cors" \
<article id="list-objects-v2" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">29</span>
<span class="docs-section-kicker">30</span>
<h2 class="h4 mb-0">List Objects API v2</h2>
</div>
<p class="text-muted">Use the v2 list API for improved pagination with continuation tokens instead of markers.</p>
@@ -2626,7 +2731,7 @@ curl "{{ api_base }}/&lt;bucket&gt;?list-type=2&amp;start-after=photos/2025/" \
<article id="upgrading" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">30</span>
<span class="docs-section-kicker">31</span>
<h2 class="h4 mb-0">Upgrading &amp; Updates</h2>
</div>
<p class="text-muted">How to safely update MyFSIO to a new version.</p>
@@ -2659,7 +2764,7 @@ cp -r logs/ logs-backup/</code></pre>
<article id="api-matrix" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">31</span>
<span class="docs-section-kicker">32</span>
<h2 class="h4 mb-0">Full API Reference</h2>
</div>
<p class="text-muted">Complete list of all S3-compatible, admin, and KMS endpoints.</p>

499
tests/test_integrity.py Normal file
View File

@@ -0,0 +1,499 @@
import hashlib
import json
import os
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from app.integrity import IntegrityChecker, IntegrityResult
def _md5(data: bytes) -> str:
return hashlib.md5(data).hexdigest()
def _setup_bucket(storage_root: Path, bucket_name: str, objects: dict[str, bytes]) -> None:
bucket_path = storage_root / bucket_name
bucket_path.mkdir(parents=True, exist_ok=True)
meta_root = storage_root / ".myfsio.sys" / "buckets" / bucket_name / "meta"
meta_root.mkdir(parents=True, exist_ok=True)
bucket_json = storage_root / ".myfsio.sys" / "buckets" / bucket_name / ".bucket.json"
bucket_json.write_text(json.dumps({"created": "2025-01-01"}))
for key, data in objects.items():
obj_path = bucket_path / key
obj_path.parent.mkdir(parents=True, exist_ok=True)
obj_path.write_bytes(data)
etag = _md5(data)
stat = obj_path.stat()
meta = {
"__etag__": etag,
"__size__": str(stat.st_size),
"__last_modified__": str(stat.st_mtime),
}
key_path = Path(key)
parent = key_path.parent
key_name = key_path.name
if parent == Path("."):
index_path = meta_root / "_index.json"
else:
index_path = meta_root / parent / "_index.json"
index_path.parent.mkdir(parents=True, exist_ok=True)
index_data = {}
if index_path.exists():
index_data = json.loads(index_path.read_text())
index_data[key_name] = {"metadata": meta}
index_path.write_text(json.dumps(index_data))
def _issues_of_type(result, issue_type):
return [i for i in result.issues if i.issue_type == issue_type]
@pytest.fixture
def storage_root(tmp_path):
root = tmp_path / "data"
root.mkdir()
(root / ".myfsio.sys" / "config").mkdir(parents=True, exist_ok=True)
return root
@pytest.fixture
def checker(storage_root):
return IntegrityChecker(
storage_root=storage_root,
interval_hours=24.0,
batch_size=1000,
auto_heal=False,
dry_run=False,
)
class TestCorruptedObjects:
def test_detect_corrupted(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
(storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted data")
result = checker.run_now()
assert result.corrupted_objects == 1
issues = _issues_of_type(result, "corrupted_object")
assert len(issues) == 1
assert issues[0].bucket == "mybucket"
assert issues[0].key == "file.txt"
assert not issues[0].healed
def test_heal_corrupted(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
(storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted data")
result = checker.run_now(auto_heal=True)
assert result.corrupted_objects == 1
assert result.issues_healed == 1
issues = _issues_of_type(result, "corrupted_object")
assert issues[0].healed
result2 = checker.run_now()
assert result2.corrupted_objects == 0
def test_valid_objects_pass(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
result = checker.run_now()
assert result.corrupted_objects == 0
assert result.objects_scanned == 1
def test_corrupted_nested_key(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"sub/dir/file.txt": b"nested content"})
(storage_root / "mybucket" / "sub" / "dir" / "file.txt").write_bytes(b"bad")
result = checker.run_now()
assert result.corrupted_objects == 1
issues = _issues_of_type(result, "corrupted_object")
assert issues[0].key == "sub/dir/file.txt"
class TestOrphanedObjects:
def test_detect_orphaned(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {})
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan data")
result = checker.run_now()
assert result.orphaned_objects == 1
issues = _issues_of_type(result, "orphaned_object")
assert len(issues) == 1
def test_heal_orphaned(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {})
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan data")
result = checker.run_now(auto_heal=True)
assert result.orphaned_objects == 1
assert result.issues_healed == 1
issues = _issues_of_type(result, "orphaned_object")
assert issues[0].healed
result2 = checker.run_now()
assert result2.orphaned_objects == 0
assert result2.objects_scanned >= 1
class TestPhantomMetadata:
def test_detect_phantom(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
(storage_root / "mybucket" / "file.txt").unlink()
result = checker.run_now()
assert result.phantom_metadata == 1
issues = _issues_of_type(result, "phantom_metadata")
assert len(issues) == 1
def test_heal_phantom(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
(storage_root / "mybucket" / "file.txt").unlink()
result = checker.run_now(auto_heal=True)
assert result.phantom_metadata == 1
assert result.issues_healed == 1
result2 = checker.run_now()
assert result2.phantom_metadata == 0
class TestStaleVersions:
def test_manifest_without_data(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
versions_root.mkdir(parents=True)
(versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
result = checker.run_now()
assert result.stale_versions == 1
issues = _issues_of_type(result, "stale_version")
assert "manifest without data" in issues[0].detail
def test_data_without_manifest(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
versions_root.mkdir(parents=True)
(versions_root / "v1.bin").write_bytes(b"old data")
result = checker.run_now()
assert result.stale_versions == 1
issues = _issues_of_type(result, "stale_version")
assert "data without manifest" in issues[0].detail
def test_heal_stale_versions(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
versions_root.mkdir(parents=True)
(versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
(versions_root / "v2.bin").write_bytes(b"old data")
result = checker.run_now(auto_heal=True)
assert result.stale_versions == 2
assert result.issues_healed == 2
assert not (versions_root / "v1.json").exists()
assert not (versions_root / "v2.bin").exists()
def test_valid_versions_pass(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
versions_root.mkdir(parents=True)
(versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
(versions_root / "v1.bin").write_bytes(b"old data")
result = checker.run_now()
assert result.stale_versions == 0
class TestEtagCache:
def test_detect_mismatch(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
etag_path = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "etag_index.json"
etag_path.write_text(json.dumps({"file.txt": "wrong_etag"}))
result = checker.run_now()
assert result.etag_cache_inconsistencies == 1
issues = _issues_of_type(result, "etag_cache_inconsistency")
assert len(issues) == 1
def test_heal_mismatch(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
etag_path = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "etag_index.json"
etag_path.write_text(json.dumps({"file.txt": "wrong_etag"}))
result = checker.run_now(auto_heal=True)
assert result.etag_cache_inconsistencies == 1
assert result.issues_healed == 1
assert not etag_path.exists()
class TestLegacyMetadata:
def test_detect_unmigrated(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
legacy_meta.parent.mkdir(parents=True)
legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
index_path = meta_root / "_index.json"
index_path.unlink()
result = checker.run_now()
assert result.legacy_metadata_drifts == 1
issues = _issues_of_type(result, "legacy_metadata_drift")
assert len(issues) == 1
assert issues[0].detail == "unmigrated legacy .meta.json"
def test_detect_drift(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
legacy_meta.parent.mkdir(parents=True)
legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))
result = checker.run_now()
assert result.legacy_metadata_drifts == 1
issues = _issues_of_type(result, "legacy_metadata_drift")
assert "differs from index" in issues[0].detail
def test_heal_unmigrated(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
legacy_meta.parent.mkdir(parents=True)
legacy_data = {"__etag__": _md5(b"hello"), "__size__": "5"}
legacy_meta.write_text(json.dumps(legacy_data))
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
index_path = meta_root / "_index.json"
index_path.unlink()
result = checker.run_now(auto_heal=True)
assert result.legacy_metadata_drifts == 1
legacy_issues = _issues_of_type(result, "legacy_metadata_drift")
assert len(legacy_issues) == 1
assert legacy_issues[0].healed
assert not legacy_meta.exists()
index_data = json.loads(index_path.read_text())
assert "file.txt" in index_data
assert index_data["file.txt"]["metadata"]["__etag__"] == _md5(b"hello")
def test_heal_drift(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
legacy_meta.parent.mkdir(parents=True)
legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))
result = checker.run_now(auto_heal=True)
assert result.legacy_metadata_drifts == 1
legacy_issues = _issues_of_type(result, "legacy_metadata_drift")
assert legacy_issues[0].healed
assert not legacy_meta.exists()
class TestDryRun:
def test_dry_run_no_changes(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
(storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted")
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan")
result = checker.run_now(auto_heal=True, dry_run=True)
assert result.corrupted_objects == 1
assert result.orphaned_objects == 1
assert result.issues_healed == 0
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
index_data = json.loads((meta_root / "_index.json").read_text())
assert "orphan.txt" not in index_data
class TestBatchSize:
def test_batch_limits_scan(self, storage_root):
objects = {f"file{i}.txt": f"data{i}".encode() for i in range(10)}
_setup_bucket(storage_root, "mybucket", objects)
checker = IntegrityChecker(
storage_root=storage_root,
batch_size=3,
)
result = checker.run_now()
assert result.objects_scanned <= 3
class TestHistory:
def test_history_recorded(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
checker.run_now()
history = checker.get_history()
assert len(history) == 1
assert "corrupted_objects" in history[0]["result"]
def test_history_multiple(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
checker.run_now()
checker.run_now()
checker.run_now()
history = checker.get_history()
assert len(history) == 3
def test_history_pagination(self, storage_root, checker):
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
for _ in range(5):
checker.run_now()
history = checker.get_history(limit=2, offset=1)
assert len(history) == 2
AUTH_HEADERS = {"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}
class TestAdminAPI:
@pytest.fixture
def integrity_app(self, tmp_path):
from app import create_api_app
storage_root = tmp_path / "data"
iam_config = tmp_path / "iam.json"
bucket_policies = tmp_path / "bucket_policies.json"
iam_payload = {
"users": [
{
"access_key": "admin",
"secret_key": "adminsecret",
"display_name": "Admin",
"policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", "iam:*"]}],
}
]
}
iam_config.write_text(json.dumps(iam_payload))
flask_app = create_api_app({
"TESTING": True,
"SECRET_KEY": "testing",
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver",
"INTEGRITY_ENABLED": True,
"INTEGRITY_AUTO_HEAL": False,
"INTEGRITY_DRY_RUN": False,
})
yield flask_app
storage = flask_app.extensions.get("object_storage")
if storage:
base = getattr(storage, "storage", storage)
if hasattr(base, "shutdown_stats"):
base.shutdown_stats()
ic = flask_app.extensions.get("integrity")
if ic:
ic.stop()
def test_status_endpoint(self, integrity_app):
client = integrity_app.test_client()
resp = client.get("/admin/integrity/status", headers=AUTH_HEADERS)
assert resp.status_code == 200
data = resp.get_json()
assert data["enabled"] is True
assert "interval_hours" in data
def test_run_endpoint(self, integrity_app):
client = integrity_app.test_client()
resp = client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
assert resp.status_code == 200
data = resp.get_json()
assert "corrupted_objects" in data
assert "objects_scanned" in data
def test_run_with_overrides(self, integrity_app):
client = integrity_app.test_client()
resp = client.post(
"/admin/integrity/run",
headers=AUTH_HEADERS,
json={"dry_run": True, "auto_heal": True},
)
assert resp.status_code == 200
def test_history_endpoint(self, integrity_app):
client = integrity_app.test_client()
client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
resp = client.get("/admin/integrity/history", headers=AUTH_HEADERS)
assert resp.status_code == 200
data = resp.get_json()
assert "executions" in data
assert len(data["executions"]) >= 1
def test_auth_required(self, integrity_app):
client = integrity_app.test_client()
resp = client.get("/admin/integrity/status")
assert resp.status_code in (401, 403)
def test_disabled_status(self, tmp_path):
from app import create_api_app
storage_root = tmp_path / "data2"
iam_config = tmp_path / "iam2.json"
bucket_policies = tmp_path / "bp2.json"
iam_payload = {
"users": [
{
"access_key": "admin",
"secret_key": "adminsecret",
"display_name": "Admin",
"policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", "iam:*"]}],
}
]
}
iam_config.write_text(json.dumps(iam_payload))
flask_app = create_api_app({
"TESTING": True,
"SECRET_KEY": "testing",
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver",
"INTEGRITY_ENABLED": False,
})
c = flask_app.test_client()
resp = c.get("/admin/integrity/status", headers=AUTH_HEADERS)
assert resp.status_code == 200
data = resp.get_json()
assert data["enabled"] is False
storage = flask_app.extensions.get("object_storage")
if storage:
base = getattr(storage, "storage", storage)
if hasattr(base, "shutdown_stats"):
base.shutdown_stats()
class TestMultipleBuckets:
def test_scans_multiple_buckets(self, storage_root, checker):
_setup_bucket(storage_root, "bucket1", {"a.txt": b"aaa"})
_setup_bucket(storage_root, "bucket2", {"b.txt": b"bbb"})
result = checker.run_now()
assert result.buckets_scanned == 2
assert result.objects_scanned == 2
assert result.corrupted_objects == 0
class TestGetStatus:
def test_status_fields(self, checker):
status = checker.get_status()
assert "enabled" in status
assert "running" in status
assert "interval_hours" in status
assert "batch_size" in status
assert "auto_heal" in status
assert "dry_run" in status