From 5e32cef7927394d9a851ed06d9d034a3d1b6c02c Mon Sep 17 00:00:00 2001 From: kqjy Date: Mon, 23 Mar 2026 11:36:38 +0800 Subject: [PATCH] Add I/O throttling to GC and integrity scanner to prevent HDD starvation --- app/__init__.py | 2 ++ app/config.py | 10 +++++++++- app/gc.py | 34 ++++++++++++++++++++++++++++++++++ app/integrity.py | 24 +++++++++++++++++++++++- 4 files changed, 68 insertions(+), 2 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 015e16d..9961fff 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -293,6 +293,7 @@ def create_app( multipart_max_age_days=app.config.get("GC_MULTIPART_MAX_AGE_DAYS", 7), lock_file_max_age_hours=app.config.get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0), dry_run=app.config.get("GC_DRY_RUN", False), + io_throttle_ms=app.config.get("GC_IO_THROTTLE_MS", 10), ) gc_collector.start() @@ -304,6 +305,7 @@ def create_app( batch_size=app.config.get("INTEGRITY_BATCH_SIZE", 1000), auto_heal=app.config.get("INTEGRITY_AUTO_HEAL", False), dry_run=app.config.get("INTEGRITY_DRY_RUN", False), + io_throttle_ms=app.config.get("INTEGRITY_IO_THROTTLE_MS", 10), ) integrity_checker.start() diff --git a/app/config.py b/app/config.py index d196397..04cae7a 100644 --- a/app/config.py +++ b/app/config.py @@ -157,11 +157,13 @@ class AppConfig: gc_multipart_max_age_days: int gc_lock_file_max_age_hours: float gc_dry_run: bool + gc_io_throttle_ms: int integrity_enabled: bool integrity_interval_hours: float integrity_batch_size: int integrity_auto_heal: bool integrity_dry_run: bool + integrity_io_throttle_ms: int @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -338,11 +340,13 @@ class AppConfig: gc_multipart_max_age_days = int(_get("GC_MULTIPART_MAX_AGE_DAYS", 7)) gc_lock_file_max_age_hours = float(_get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0)) gc_dry_run = str(_get("GC_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"} + gc_io_throttle_ms = int(_get("GC_IO_THROTTLE_MS", 10)) integrity_enabled = str(_get("INTEGRITY_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} integrity_interval_hours = float(_get("INTEGRITY_INTERVAL_HOURS", 24.0)) integrity_batch_size = int(_get("INTEGRITY_BATCH_SIZE", 1000)) integrity_auto_heal = str(_get("INTEGRITY_AUTO_HEAL", "0")).lower() in {"1", "true", "yes", "on"} integrity_dry_run = str(_get("INTEGRITY_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"} + integrity_io_throttle_ms = int(_get("INTEGRITY_IO_THROTTLE_MS", 10)) return cls(storage_root=storage_root, max_upload_size=max_upload_size, @@ -438,11 +442,13 @@ class AppConfig: gc_multipart_max_age_days=gc_multipart_max_age_days, gc_lock_file_max_age_hours=gc_lock_file_max_age_hours, gc_dry_run=gc_dry_run, + gc_io_throttle_ms=gc_io_throttle_ms, integrity_enabled=integrity_enabled, integrity_interval_hours=integrity_interval_hours, integrity_batch_size=integrity_batch_size, integrity_auto_heal=integrity_auto_heal, - integrity_dry_run=integrity_dry_run) + integrity_dry_run=integrity_dry_run, + integrity_io_throttle_ms=integrity_io_throttle_ms) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. @@ -663,9 +669,11 @@ class AppConfig: "GC_MULTIPART_MAX_AGE_DAYS": self.gc_multipart_max_age_days, "GC_LOCK_FILE_MAX_AGE_HOURS": self.gc_lock_file_max_age_hours, "GC_DRY_RUN": self.gc_dry_run, + "GC_IO_THROTTLE_MS": self.gc_io_throttle_ms, "INTEGRITY_ENABLED": self.integrity_enabled, "INTEGRITY_INTERVAL_HOURS": self.integrity_interval_hours, "INTEGRITY_BATCH_SIZE": self.integrity_batch_size, "INTEGRITY_AUTO_HEAL": self.integrity_auto_heal, "INTEGRITY_DRY_RUN": self.integrity_dry_run, + "INTEGRITY_IO_THROTTLE_MS": self.integrity_io_throttle_ms, } diff --git a/app/gc.py b/app/gc.py index 1697607..85f2617 100644 --- a/app/gc.py +++ b/app/gc.py @@ -162,6 +162,7 @@ class GarbageCollector: lock_file_max_age_hours: float = 1.0, dry_run: bool = False, max_history: int = 50, + io_throttle_ms: int = 10, ) -> None: self.storage_root = Path(storage_root) self.interval_seconds = interval_hours * 3600.0 @@ -172,6 +173,7 @@ class GarbageCollector: self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() + self._io_throttle = max(0, io_throttle_ms) / 1000.0 self.history_store = GCHistoryStore(storage_root, max_records=max_history) def start(self) -> None: @@ -255,6 +257,13 @@ class GarbageCollector: def _system_path(self) -> Path: return self.storage_root / self.SYSTEM_ROOT + def _throttle(self) -> bool: + if self._shutdown: + return True + if self._io_throttle > 0: + time.sleep(self._io_throttle) + return self._shutdown + def _list_bucket_names(self) -> List[str]: names = [] try: @@ -271,6 +280,8 @@ class GarbageCollector: return try: for entry in tmp_dir.iterdir(): + if self._throttle(): + return if not entry.is_file(): continue age = _file_age_hours(entry) @@ -292,6 +303,8 @@ class GarbageCollector: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: + if self._shutdown: + return for multipart_root in ( self._system_path() / self.SYSTEM_MULTIPART_DIR / bucket_name, self.storage_root / bucket_name / ".multipart", @@ -300,6 +313,8 @@ class GarbageCollector: continue try: for upload_dir in multipart_root.iterdir(): + if self._throttle(): + return if not upload_dir.is_dir(): continue self._maybe_clean_upload(upload_dir, cutoff_hours, result) @@ -329,6 +344,8 @@ class GarbageCollector: try: for bucket_dir in buckets_root.iterdir(): + if self._shutdown: + return if not bucket_dir.is_dir(): continue locks_dir = bucket_dir / "locks" @@ -336,6 +353,8 @@ class GarbageCollector: continue try: for lock_file in locks_dir.iterdir(): + if self._throttle(): + return if not lock_file.is_file() or not lock_file.name.endswith(".lock"): continue age = _file_age_hours(lock_file) @@ -356,6 +375,8 @@ class GarbageCollector: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: + if self._shutdown: + return legacy_meta = self.storage_root / bucket_name / ".meta" if legacy_meta.exists(): self._clean_legacy_metadata(bucket_name, legacy_meta, result) @@ -368,6 +389,8 @@ class GarbageCollector: bucket_path = self.storage_root / bucket_name try: for meta_file in meta_root.rglob("*.meta.json"): + if self._throttle(): + return if not meta_file.is_file(): continue try: @@ -387,6 +410,8 @@ class GarbageCollector: bucket_path = self.storage_root / bucket_name try: for index_file in meta_root.rglob("_index.json"): + if self._throttle(): + return if not index_file.is_file(): continue try: @@ -430,6 +455,8 @@ class GarbageCollector: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: + if self._shutdown: + return bucket_path = self.storage_root / bucket_name for versions_root in ( self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR, @@ -439,6 +466,8 @@ class GarbageCollector: continue try: for key_dir in versions_root.iterdir(): + if self._throttle(): + return if not key_dir.is_dir(): continue self._clean_versions_for_key(bucket_path, versions_root, key_dir, result) @@ -489,6 +518,8 @@ class GarbageCollector: self._remove_empty_dirs_recursive(root, root, result) def _remove_empty_dirs_recursive(self, path: Path, stop_at: Path, result: GCResult) -> bool: + if self._shutdown: + return False if not path.is_dir(): return False @@ -499,6 +530,8 @@ class GarbageCollector: all_empty = True for child in children: + if self._throttle(): + return False if child.is_dir(): if not self._remove_empty_dirs_recursive(child, stop_at, result): all_empty = False @@ -528,4 +561,5 @@ class GarbageCollector: "multipart_max_age_days": self.multipart_max_age_days, "lock_file_max_age_hours": self.lock_file_max_age_hours, "dry_run": self.dry_run, + "io_throttle_ms": round(self._io_throttle * 1000), } diff --git a/app/integrity.py b/app/integrity.py index 02b996b..957e150 100644 --- a/app/integrity.py +++ b/app/integrity.py @@ -180,6 +180,7 @@ class IntegrityChecker: auto_heal: bool = False, dry_run: bool = False, max_history: int = 50, + io_throttle_ms: int = 10, ) -> None: self.storage_root = Path(storage_root) self.interval_seconds = interval_hours * 3600.0 @@ -191,6 +192,7 @@ class IntegrityChecker: self._lock = threading.Lock() self._scanning = False self._scan_start_time: Optional[float] = None + self._io_throttle = max(0, io_throttle_ms) / 1000.0 self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history) def start(self) -> None: @@ -247,7 +249,7 @@ class IntegrityChecker: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: - if result.objects_scanned >= self.batch_size: + if self._shutdown or result.objects_scanned >= self.batch_size: break result.buckets_scanned += 1 self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) @@ -309,6 +311,13 @@ class IntegrityChecker: pass return names + def _throttle(self) -> bool: + if self._shutdown: + return True + if self._io_throttle > 0: + time.sleep(self._io_throttle) + return self._shutdown + def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None: if len(result.issues) < MAX_ISSUES: result.issues.append(issue) @@ -324,6 +333,8 @@ class IntegrityChecker: try: for index_file in meta_root.rglob("_index.json"): + if self._throttle(): + return if result.objects_scanned >= self.batch_size: return if not index_file.is_file(): @@ -334,6 +345,8 @@ class IntegrityChecker: continue for key_name, entry in list(index_data.items()): + if self._throttle(): + return if result.objects_scanned >= self.batch_size: return @@ -394,6 +407,8 @@ class IntegrityChecker: try: for entry in bucket_path.rglob("*"): + if self._throttle(): + return if result.objects_scanned >= self.batch_size: return if not entry.is_file(): @@ -469,6 +484,8 @@ class IntegrityChecker: try: for index_file in meta_root.rglob("_index.json"): + if self._throttle(): + return if not index_file.is_file(): continue try: @@ -523,6 +540,8 @@ class IntegrityChecker: try: for key_dir in versions_root.rglob("*"): + if self._throttle(): + return if not key_dir.is_dir(): continue @@ -646,6 +665,8 @@ class IntegrityChecker: try: for meta_file in legacy_meta_root.rglob("*.meta.json"): + if self._throttle(): + return if not meta_file.is_file(): continue @@ -756,6 +777,7 @@ class IntegrityChecker: "batch_size": self.batch_size, "auto_heal": self.auto_heal, "dry_run": self.dry_run, + "io_throttle_ms": round(self._io_throttle * 1000), } if self._scanning and self._scan_start_time is not None: status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1)