Add I/O throttling to GC and integrity scanner to prevent HDD starvation

This commit is contained in:
2026-03-23 11:36:38 +08:00
parent 9898167f8d
commit 5e32cef792
4 changed files with 68 additions and 2 deletions

View File

@@ -293,6 +293,7 @@ def create_app(
multipart_max_age_days=app.config.get("GC_MULTIPART_MAX_AGE_DAYS", 7),
lock_file_max_age_hours=app.config.get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0),
dry_run=app.config.get("GC_DRY_RUN", False),
io_throttle_ms=app.config.get("GC_IO_THROTTLE_MS", 10),
)
gc_collector.start()
@@ -304,6 +305,7 @@ def create_app(
batch_size=app.config.get("INTEGRITY_BATCH_SIZE", 1000),
auto_heal=app.config.get("INTEGRITY_AUTO_HEAL", False),
dry_run=app.config.get("INTEGRITY_DRY_RUN", False),
io_throttle_ms=app.config.get("INTEGRITY_IO_THROTTLE_MS", 10),
)
integrity_checker.start()

View File

@@ -157,11 +157,13 @@ class AppConfig:
gc_multipart_max_age_days: int
gc_lock_file_max_age_hours: float
gc_dry_run: bool
gc_io_throttle_ms: int
integrity_enabled: bool
integrity_interval_hours: float
integrity_batch_size: int
integrity_auto_heal: bool
integrity_dry_run: bool
integrity_io_throttle_ms: int
@classmethod
def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
@@ -338,11 +340,13 @@ class AppConfig:
gc_multipart_max_age_days = int(_get("GC_MULTIPART_MAX_AGE_DAYS", 7))
gc_lock_file_max_age_hours = float(_get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0))
gc_dry_run = str(_get("GC_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"}
gc_io_throttle_ms = int(_get("GC_IO_THROTTLE_MS", 10))
integrity_enabled = str(_get("INTEGRITY_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
integrity_interval_hours = float(_get("INTEGRITY_INTERVAL_HOURS", 24.0))
integrity_batch_size = int(_get("INTEGRITY_BATCH_SIZE", 1000))
integrity_auto_heal = str(_get("INTEGRITY_AUTO_HEAL", "0")).lower() in {"1", "true", "yes", "on"}
integrity_dry_run = str(_get("INTEGRITY_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"}
integrity_io_throttle_ms = int(_get("INTEGRITY_IO_THROTTLE_MS", 10))
return cls(storage_root=storage_root,
max_upload_size=max_upload_size,
@@ -438,11 +442,13 @@ class AppConfig:
gc_multipart_max_age_days=gc_multipart_max_age_days,
gc_lock_file_max_age_hours=gc_lock_file_max_age_hours,
gc_dry_run=gc_dry_run,
gc_io_throttle_ms=gc_io_throttle_ms,
integrity_enabled=integrity_enabled,
integrity_interval_hours=integrity_interval_hours,
integrity_batch_size=integrity_batch_size,
integrity_auto_heal=integrity_auto_heal,
integrity_dry_run=integrity_dry_run)
integrity_dry_run=integrity_dry_run,
integrity_io_throttle_ms=integrity_io_throttle_ms)
def validate_and_report(self) -> list[str]:
"""Validate configuration and return a list of warnings/issues.
@@ -663,9 +669,11 @@ class AppConfig:
"GC_MULTIPART_MAX_AGE_DAYS": self.gc_multipart_max_age_days,
"GC_LOCK_FILE_MAX_AGE_HOURS": self.gc_lock_file_max_age_hours,
"GC_DRY_RUN": self.gc_dry_run,
"GC_IO_THROTTLE_MS": self.gc_io_throttle_ms,
"INTEGRITY_ENABLED": self.integrity_enabled,
"INTEGRITY_INTERVAL_HOURS": self.integrity_interval_hours,
"INTEGRITY_BATCH_SIZE": self.integrity_batch_size,
"INTEGRITY_AUTO_HEAL": self.integrity_auto_heal,
"INTEGRITY_DRY_RUN": self.integrity_dry_run,
"INTEGRITY_IO_THROTTLE_MS": self.integrity_io_throttle_ms,
}

View File

@@ -162,6 +162,7 @@ class GarbageCollector:
lock_file_max_age_hours: float = 1.0,
dry_run: bool = False,
max_history: int = 50,
io_throttle_ms: int = 10,
) -> None:
self.storage_root = Path(storage_root)
self.interval_seconds = interval_hours * 3600.0
@@ -172,6 +173,7 @@ class GarbageCollector:
self._timer: Optional[threading.Timer] = None
self._shutdown = False
self._lock = threading.Lock()
self._io_throttle = max(0, io_throttle_ms) / 1000.0
self.history_store = GCHistoryStore(storage_root, max_records=max_history)
def start(self) -> None:
@@ -255,6 +257,13 @@ class GarbageCollector:
def _system_path(self) -> Path:
return self.storage_root / self.SYSTEM_ROOT
def _throttle(self) -> bool:
if self._shutdown:
return True
if self._io_throttle > 0:
time.sleep(self._io_throttle)
return self._shutdown
def _list_bucket_names(self) -> List[str]:
names = []
try:
@@ -271,6 +280,8 @@ class GarbageCollector:
return
try:
for entry in tmp_dir.iterdir():
if self._throttle():
return
if not entry.is_file():
continue
age = _file_age_hours(entry)
@@ -292,6 +303,8 @@ class GarbageCollector:
bucket_names = self._list_bucket_names()
for bucket_name in bucket_names:
if self._shutdown:
return
for multipart_root in (
self._system_path() / self.SYSTEM_MULTIPART_DIR / bucket_name,
self.storage_root / bucket_name / ".multipart",
@@ -300,6 +313,8 @@ class GarbageCollector:
continue
try:
for upload_dir in multipart_root.iterdir():
if self._throttle():
return
if not upload_dir.is_dir():
continue
self._maybe_clean_upload(upload_dir, cutoff_hours, result)
@@ -329,6 +344,8 @@ class GarbageCollector:
try:
for bucket_dir in buckets_root.iterdir():
if self._shutdown:
return
if not bucket_dir.is_dir():
continue
locks_dir = bucket_dir / "locks"
@@ -336,6 +353,8 @@ class GarbageCollector:
continue
try:
for lock_file in locks_dir.iterdir():
if self._throttle():
return
if not lock_file.is_file() or not lock_file.name.endswith(".lock"):
continue
age = _file_age_hours(lock_file)
@@ -356,6 +375,8 @@ class GarbageCollector:
bucket_names = self._list_bucket_names()
for bucket_name in bucket_names:
if self._shutdown:
return
legacy_meta = self.storage_root / bucket_name / ".meta"
if legacy_meta.exists():
self._clean_legacy_metadata(bucket_name, legacy_meta, result)
@@ -368,6 +389,8 @@ class GarbageCollector:
bucket_path = self.storage_root / bucket_name
try:
for meta_file in meta_root.rglob("*.meta.json"):
if self._throttle():
return
if not meta_file.is_file():
continue
try:
@@ -387,6 +410,8 @@ class GarbageCollector:
bucket_path = self.storage_root / bucket_name
try:
for index_file in meta_root.rglob("_index.json"):
if self._throttle():
return
if not index_file.is_file():
continue
try:
@@ -430,6 +455,8 @@ class GarbageCollector:
bucket_names = self._list_bucket_names()
for bucket_name in bucket_names:
if self._shutdown:
return
bucket_path = self.storage_root / bucket_name
for versions_root in (
self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR,
@@ -439,6 +466,8 @@ class GarbageCollector:
continue
try:
for key_dir in versions_root.iterdir():
if self._throttle():
return
if not key_dir.is_dir():
continue
self._clean_versions_for_key(bucket_path, versions_root, key_dir, result)
@@ -489,6 +518,8 @@ class GarbageCollector:
self._remove_empty_dirs_recursive(root, root, result)
def _remove_empty_dirs_recursive(self, path: Path, stop_at: Path, result: GCResult) -> bool:
if self._shutdown:
return False
if not path.is_dir():
return False
@@ -499,6 +530,8 @@ class GarbageCollector:
all_empty = True
for child in children:
if self._throttle():
return False
if child.is_dir():
if not self._remove_empty_dirs_recursive(child, stop_at, result):
all_empty = False
@@ -528,4 +561,5 @@ class GarbageCollector:
"multipart_max_age_days": self.multipart_max_age_days,
"lock_file_max_age_hours": self.lock_file_max_age_hours,
"dry_run": self.dry_run,
"io_throttle_ms": round(self._io_throttle * 1000),
}

View File

@@ -180,6 +180,7 @@ class IntegrityChecker:
auto_heal: bool = False,
dry_run: bool = False,
max_history: int = 50,
io_throttle_ms: int = 10,
) -> None:
self.storage_root = Path(storage_root)
self.interval_seconds = interval_hours * 3600.0
@@ -191,6 +192,7 @@ class IntegrityChecker:
self._lock = threading.Lock()
self._scanning = False
self._scan_start_time: Optional[float] = None
self._io_throttle = max(0, io_throttle_ms) / 1000.0
self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history)
def start(self) -> None:
@@ -247,7 +249,7 @@ class IntegrityChecker:
bucket_names = self._list_bucket_names()
for bucket_name in bucket_names:
if result.objects_scanned >= self.batch_size:
if self._shutdown or result.objects_scanned >= self.batch_size:
break
result.buckets_scanned += 1
self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
@@ -309,6 +311,13 @@ class IntegrityChecker:
pass
return names
def _throttle(self) -> bool:
if self._shutdown:
return True
if self._io_throttle > 0:
time.sleep(self._io_throttle)
return self._shutdown
def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None:
if len(result.issues) < MAX_ISSUES:
result.issues.append(issue)
@@ -324,6 +333,8 @@ class IntegrityChecker:
try:
for index_file in meta_root.rglob("_index.json"):
if self._throttle():
return
if result.objects_scanned >= self.batch_size:
return
if not index_file.is_file():
@@ -334,6 +345,8 @@ class IntegrityChecker:
continue
for key_name, entry in list(index_data.items()):
if self._throttle():
return
if result.objects_scanned >= self.batch_size:
return
@@ -394,6 +407,8 @@ class IntegrityChecker:
try:
for entry in bucket_path.rglob("*"):
if self._throttle():
return
if result.objects_scanned >= self.batch_size:
return
if not entry.is_file():
@@ -469,6 +484,8 @@ class IntegrityChecker:
try:
for index_file in meta_root.rglob("_index.json"):
if self._throttle():
return
if not index_file.is_file():
continue
try:
@@ -523,6 +540,8 @@ class IntegrityChecker:
try:
for key_dir in versions_root.rglob("*"):
if self._throttle():
return
if not key_dir.is_dir():
continue
@@ -646,6 +665,8 @@ class IntegrityChecker:
try:
for meta_file in legacy_meta_root.rglob("*.meta.json"):
if self._throttle():
return
if not meta_file.is_file():
continue
@@ -756,6 +777,7 @@ class IntegrityChecker:
"batch_size": self.batch_size,
"auto_heal": self.auto_heal,
"dry_run": self.dry_run,
"io_throttle_ms": round(self._io_throttle * 1000),
}
if self._scanning and self._scan_start_time is not None:
status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1)