diff --git a/app/__init__.py b/app/__init__.py index 015e16d..9961fff 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -293,6 +293,7 @@ def create_app( multipart_max_age_days=app.config.get("GC_MULTIPART_MAX_AGE_DAYS", 7), lock_file_max_age_hours=app.config.get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0), dry_run=app.config.get("GC_DRY_RUN", False), + io_throttle_ms=app.config.get("GC_IO_THROTTLE_MS", 10), ) gc_collector.start() @@ -304,6 +305,7 @@ def create_app( batch_size=app.config.get("INTEGRITY_BATCH_SIZE", 1000), auto_heal=app.config.get("INTEGRITY_AUTO_HEAL", False), dry_run=app.config.get("INTEGRITY_DRY_RUN", False), + io_throttle_ms=app.config.get("INTEGRITY_IO_THROTTLE_MS", 10), ) integrity_checker.start() diff --git a/app/admin_api.py b/app/admin_api.py index ccc0408..89c87f0 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -907,15 +907,11 @@ def gc_run_now(): if not gc: return _json_error("InvalidRequest", "GC is not enabled", 400) payload = request.get_json(silent=True) or {} - original_dry_run = gc.dry_run - if "dry_run" in payload: - gc.dry_run = bool(payload["dry_run"]) - try: - result = gc.run_now() - finally: - gc.dry_run = original_dry_run + started = gc.run_async(dry_run=payload.get("dry_run")) logger.info("GC manual run by %s", principal.access_key) - return jsonify(result.to_dict()) + if not started: + return _json_error("Conflict", "GC is already in progress", 409) + return jsonify({"status": "started"}) @admin_api_bp.route("/gc/history", methods=["GET"]) @@ -961,12 +957,14 @@ def integrity_run_now(): payload = request.get_json(silent=True) or {} override_dry_run = payload.get("dry_run") override_auto_heal = payload.get("auto_heal") - result = checker.run_now( + started = checker.run_async( auto_heal=override_auto_heal if override_auto_heal is not None else None, dry_run=override_dry_run if override_dry_run is not None else None, ) logger.info("Integrity manual run by %s", principal.access_key) - return jsonify(result.to_dict()) + if not 
started: + return _json_error("Conflict", "A scan is already in progress", 409) + return jsonify({"status": "started"}) @admin_api_bp.route("/integrity/history", methods=["GET"]) diff --git a/app/config.py b/app/config.py index d196397..04cae7a 100644 --- a/app/config.py +++ b/app/config.py @@ -157,11 +157,13 @@ class AppConfig: gc_multipart_max_age_days: int gc_lock_file_max_age_hours: float gc_dry_run: bool + gc_io_throttle_ms: int integrity_enabled: bool integrity_interval_hours: float integrity_batch_size: int integrity_auto_heal: bool integrity_dry_run: bool + integrity_io_throttle_ms: int @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -338,11 +340,13 @@ class AppConfig: gc_multipart_max_age_days = int(_get("GC_MULTIPART_MAX_AGE_DAYS", 7)) gc_lock_file_max_age_hours = float(_get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0)) gc_dry_run = str(_get("GC_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"} + gc_io_throttle_ms = int(_get("GC_IO_THROTTLE_MS", 10)) integrity_enabled = str(_get("INTEGRITY_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} integrity_interval_hours = float(_get("INTEGRITY_INTERVAL_HOURS", 24.0)) integrity_batch_size = int(_get("INTEGRITY_BATCH_SIZE", 1000)) integrity_auto_heal = str(_get("INTEGRITY_AUTO_HEAL", "0")).lower() in {"1", "true", "yes", "on"} integrity_dry_run = str(_get("INTEGRITY_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"} + integrity_io_throttle_ms = int(_get("INTEGRITY_IO_THROTTLE_MS", 10)) return cls(storage_root=storage_root, max_upload_size=max_upload_size, @@ -438,11 +442,13 @@ class AppConfig: gc_multipart_max_age_days=gc_multipart_max_age_days, gc_lock_file_max_age_hours=gc_lock_file_max_age_hours, gc_dry_run=gc_dry_run, + gc_io_throttle_ms=gc_io_throttle_ms, integrity_enabled=integrity_enabled, integrity_interval_hours=integrity_interval_hours, integrity_batch_size=integrity_batch_size, integrity_auto_heal=integrity_auto_heal, - integrity_dry_run=integrity_dry_run) 
+ integrity_dry_run=integrity_dry_run, + integrity_io_throttle_ms=integrity_io_throttle_ms) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. @@ -663,9 +669,11 @@ class AppConfig: "GC_MULTIPART_MAX_AGE_DAYS": self.gc_multipart_max_age_days, "GC_LOCK_FILE_MAX_AGE_HOURS": self.gc_lock_file_max_age_hours, "GC_DRY_RUN": self.gc_dry_run, + "GC_IO_THROTTLE_MS": self.gc_io_throttle_ms, "INTEGRITY_ENABLED": self.integrity_enabled, "INTEGRITY_INTERVAL_HOURS": self.integrity_interval_hours, "INTEGRITY_BATCH_SIZE": self.integrity_batch_size, "INTEGRITY_AUTO_HEAL": self.integrity_auto_heal, "INTEGRITY_DRY_RUN": self.integrity_dry_run, + "INTEGRITY_IO_THROTTLE_MS": self.integrity_io_throttle_ms, } diff --git a/app/errors.py b/app/errors.py index b2a4079..049187d 100644 --- a/app/errors.py +++ b/app/errors.py @@ -175,13 +175,21 @@ def handle_app_error(error: AppError) -> Response: def handle_rate_limit_exceeded(e: RateLimitExceeded) -> Response: g.s3_error_code = "SlowDown" + if request.path.startswith("/ui") or request.path.startswith("/buckets"): + wants_json = ( + request.is_json or + request.headers.get("X-Requested-With") == "XMLHttpRequest" or + any(value == "application/json" for value, _quality in request.accept_mimetypes) + ) + if wants_json: + return jsonify({"success": False, "error": {"code": "SlowDown", "message": "Please reduce your request rate."}}), 429 error = Element("Error") SubElement(error, "Code").text = "SlowDown" SubElement(error, "Message").text = "Please reduce your request rate."
SubElement(error, "Resource").text = request.path SubElement(error, "RequestId").text = getattr(g, "request_id", "") xml_bytes = tostring(error, encoding="utf-8") - return Response(xml_bytes, status=429, mimetype="application/xml") + return Response(xml_bytes, status="429 Too Many Requests", mimetype="application/xml") def register_error_handlers(app): diff --git a/app/gc.py b/app/gc.py index 1697607..16fa3b7 100644 --- a/app/gc.py +++ b/app/gc.py @@ -162,6 +162,7 @@ class GarbageCollector: lock_file_max_age_hours: float = 1.0, dry_run: bool = False, max_history: int = 50, + io_throttle_ms: int = 10, ) -> None: self.storage_root = Path(storage_root) self.interval_seconds = interval_hours * 3600.0 @@ -172,6 +173,9 @@ class GarbageCollector: self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() + self._scanning = False + self._scan_start_time: Optional[float] = None + self._io_throttle = max(0, io_throttle_ms) / 1000.0 self.history_store = GCHistoryStore(storage_root, max_records=max_history) def start(self) -> None: @@ -212,49 +216,81 @@ class GarbageCollector: finally: self._schedule_next() - def run_now(self) -> GCResult: - start = time.time() - result = GCResult() + def run_now(self, dry_run: Optional[bool] = None) -> GCResult: + if not self._lock.acquire(blocking=False): + raise RuntimeError("GC is already in progress") - self._clean_temp_files(result) - self._clean_orphaned_multipart(result) - self._clean_stale_locks(result) - self._clean_orphaned_metadata(result) - self._clean_orphaned_versions(result) - self._clean_empty_dirs(result) + effective_dry_run = dry_run if dry_run is not None else self.dry_run - result.execution_time_seconds = time.time() - start + try: + self._scanning = True + self._scan_start_time = time.time() - if result.has_work or result.errors: - logger.info( - "GC completed in %.2fs: temp=%d (%.1f MB), multipart=%d (%.1f MB), " - "locks=%d, meta=%d, versions=%d (%.1f MB), dirs=%d, errors=%d%s", - 
result.execution_time_seconds, - result.temp_files_deleted, - result.temp_bytes_freed / (1024 * 1024), - result.multipart_uploads_deleted, - result.multipart_bytes_freed / (1024 * 1024), - result.lock_files_deleted, - result.orphaned_metadata_deleted, - result.orphaned_versions_deleted, - result.orphaned_version_bytes_freed / (1024 * 1024), - result.empty_dirs_removed, - len(result.errors), - " (dry run)" if self.dry_run else "", + start = self._scan_start_time + result = GCResult() + + original_dry_run = self.dry_run + self.dry_run = effective_dry_run + try: + self._clean_temp_files(result) + self._clean_orphaned_multipart(result) + self._clean_stale_locks(result) + self._clean_orphaned_metadata(result) + self._clean_orphaned_versions(result) + self._clean_empty_dirs(result) + finally: + self.dry_run = original_dry_run + + result.execution_time_seconds = time.time() - start + + if result.has_work or result.errors: + logger.info( + "GC completed in %.2fs: temp=%d (%.1f MB), multipart=%d (%.1f MB), " + "locks=%d, meta=%d, versions=%d (%.1f MB), dirs=%d, errors=%d%s", + result.execution_time_seconds, + result.temp_files_deleted, + result.temp_bytes_freed / (1024 * 1024), + result.multipart_uploads_deleted, + result.multipart_bytes_freed / (1024 * 1024), + result.lock_files_deleted, + result.orphaned_metadata_deleted, + result.orphaned_versions_deleted, + result.orphaned_version_bytes_freed / (1024 * 1024), + result.empty_dirs_removed, + len(result.errors), + " (dry run)" if effective_dry_run else "", + ) + + record = GCExecutionRecord( + timestamp=time.time(), + result=result.to_dict(), + dry_run=effective_dry_run, ) + self.history_store.add(record) - record = GCExecutionRecord( - timestamp=time.time(), - result=result.to_dict(), - dry_run=self.dry_run, - ) - self.history_store.add(record) + return result + finally: + self._scanning = False + self._scan_start_time = None + self._lock.release() - return result + def run_async(self, dry_run: Optional[bool] = None) -> 
bool: + if self._scanning: + return False + t = threading.Thread(target=self.run_now, args=(dry_run,), daemon=True) + t.start() + return True def _system_path(self) -> Path: return self.storage_root / self.SYSTEM_ROOT + def _throttle(self) -> bool: + if self._shutdown: + return True + if self._io_throttle > 0: + time.sleep(self._io_throttle) + return self._shutdown + def _list_bucket_names(self) -> List[str]: names = [] try: @@ -271,6 +307,8 @@ class GarbageCollector: return try: for entry in tmp_dir.iterdir(): + if self._throttle(): + return if not entry.is_file(): continue age = _file_age_hours(entry) @@ -292,6 +330,8 @@ class GarbageCollector: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: + if self._shutdown: + return for multipart_root in ( self._system_path() / self.SYSTEM_MULTIPART_DIR / bucket_name, self.storage_root / bucket_name / ".multipart", @@ -300,6 +340,8 @@ class GarbageCollector: continue try: for upload_dir in multipart_root.iterdir(): + if self._throttle(): + return if not upload_dir.is_dir(): continue self._maybe_clean_upload(upload_dir, cutoff_hours, result) @@ -329,6 +371,8 @@ class GarbageCollector: try: for bucket_dir in buckets_root.iterdir(): + if self._shutdown: + return if not bucket_dir.is_dir(): continue locks_dir = bucket_dir / "locks" @@ -336,6 +380,8 @@ class GarbageCollector: continue try: for lock_file in locks_dir.iterdir(): + if self._throttle(): + return if not lock_file.is_file() or not lock_file.name.endswith(".lock"): continue age = _file_age_hours(lock_file) @@ -356,6 +402,8 @@ class GarbageCollector: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: + if self._shutdown: + return legacy_meta = self.storage_root / bucket_name / ".meta" if legacy_meta.exists(): self._clean_legacy_metadata(bucket_name, legacy_meta, result) @@ -368,6 +416,8 @@ class GarbageCollector: bucket_path = self.storage_root / bucket_name try: for meta_file in meta_root.rglob("*.meta.json"): + if 
self._throttle(): + return if not meta_file.is_file(): continue try: @@ -387,6 +437,8 @@ class GarbageCollector: bucket_path = self.storage_root / bucket_name try: for index_file in meta_root.rglob("_index.json"): + if self._throttle(): + return if not index_file.is_file(): continue try: @@ -430,6 +482,8 @@ class GarbageCollector: bucket_names = self._list_bucket_names() for bucket_name in bucket_names: + if self._shutdown: + return bucket_path = self.storage_root / bucket_name for versions_root in ( self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR, @@ -439,6 +493,8 @@ class GarbageCollector: continue try: for key_dir in versions_root.iterdir(): + if self._throttle(): + return if not key_dir.is_dir(): continue self._clean_versions_for_key(bucket_path, versions_root, key_dir, result) @@ -489,6 +545,8 @@ class GarbageCollector: self._remove_empty_dirs_recursive(root, root, result) def _remove_empty_dirs_recursive(self, path: Path, stop_at: Path, result: GCResult) -> bool: + if self._shutdown: + return False if not path.is_dir(): return False @@ -499,6 +557,8 @@ class GarbageCollector: all_empty = True for child in children: + if self._throttle(): + return False if child.is_dir(): if not self._remove_empty_dirs_recursive(child, stop_at, result): all_empty = False @@ -520,12 +580,17 @@ class GarbageCollector: return [r.to_dict() for r in records] def get_status(self) -> dict: - return { + status: Dict[str, Any] = { "enabled": not self._shutdown or self._timer is not None, "running": self._timer is not None and not self._shutdown, + "scanning": self._scanning, "interval_hours": self.interval_seconds / 3600.0, "temp_file_max_age_hours": self.temp_file_max_age_hours, "multipart_max_age_days": self.multipart_max_age_days, "lock_file_max_age_hours": self.lock_file_max_age_hours, "dry_run": self.dry_run, + "io_throttle_ms": round(self._io_throttle * 1000), } + if self._scanning and self._scan_start_time: + status["scan_elapsed_seconds"] 
= time.time() - self._scan_start_time + return status diff --git a/app/integrity.py b/app/integrity.py index 4dc15d2..b79cbc8 100644 --- a/app/integrity.py +++ b/app/integrity.py @@ -162,6 +162,76 @@ class IntegrityHistoryStore: return self.load()[offset : offset + limit] +class IntegrityCursorStore: + def __init__(self, storage_root: Path) -> None: + self.storage_root = storage_root + self._lock = threading.Lock() + + def _get_path(self) -> Path: + return self.storage_root / ".myfsio.sys" / "config" / "integrity_cursor.json" + + def load(self) -> Dict[str, Any]: + path = self._get_path() + if not path.exists(): + return {"buckets": {}} + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data.get("buckets"), dict): + return {"buckets": {}} + return data + except (OSError, ValueError, KeyError): + return {"buckets": {}} + + def save(self, data: Dict[str, Any]) -> None: + path = self._get_path() + path.parent.mkdir(parents=True, exist_ok=True) + try: + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + except OSError as e: + logger.error("Failed to save integrity cursor: %s", e) + + def update_bucket(self, bucket_name: str, timestamp: float) -> None: + with self._lock: + data = self.load() + data["buckets"][bucket_name] = {"last_scanned": timestamp} + self.save(data) + + def clean_stale(self, existing_buckets: List[str]) -> None: + with self._lock: + data = self.load() + existing_set = set(existing_buckets) + stale_keys = [k for k in data["buckets"] if k not in existing_set] + if stale_keys: + for k in stale_keys: + del data["buckets"][k] + self.save(data) + + def get_bucket_order(self, bucket_names: List[str]) -> List[str]: + data = self.load() + buckets_info = data.get("buckets", {}) + + def sort_key(name: str) -> float: + entry = buckets_info.get(name) + if entry is None: + return 0.0 + return entry.get("last_scanned", 0.0) + + return sorted(bucket_names, key=sort_key) + + def get_info(self) -> 
Dict[str, Any]: + data = self.load() + buckets = data.get("buckets", {}) + return { + "tracked_buckets": len(buckets), + "buckets": { + name: info.get("last_scanned") + for name, info in buckets.items() + }, + } + + MAX_ISSUES = 500 @@ -180,6 +250,7 @@ class IntegrityChecker: auto_heal: bool = False, dry_run: bool = False, max_history: int = 50, + io_throttle_ms: int = 10, ) -> None: self.storage_root = Path(storage_root) self.interval_seconds = interval_hours * 3600.0 @@ -189,7 +260,11 @@ class IntegrityChecker: self._timer: Optional[threading.Timer] = None self._shutdown = False self._lock = threading.Lock() + self._scanning = False + self._scan_start_time: Optional[float] = None + self._io_throttle = max(0, io_throttle_ms) / 1000.0 self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history) + self.cursor_store = IntegrityCursorStore(self.storage_root) def start(self) -> None: if self._timer is not None: @@ -229,52 +304,73 @@ class IntegrityChecker: self._schedule_next() def run_now(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> IntegrityResult: - effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal - effective_dry_run = dry_run if dry_run is not None else self.dry_run + if not self._lock.acquire(blocking=False): + raise RuntimeError("Integrity scan is already in progress") - start = time.time() - result = IntegrityResult() + try: + self._scanning = True + self._scan_start_time = time.time() - bucket_names = self._list_bucket_names() + effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal + effective_dry_run = dry_run if dry_run is not None else self.dry_run - for bucket_name in bucket_names: - if result.objects_scanned >= self.batch_size: - break - result.buckets_scanned += 1 - self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run) - 
self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) - self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + start = self._scan_start_time + result = IntegrityResult() - result.execution_time_seconds = time.time() - start + bucket_names = self._list_bucket_names() + self.cursor_store.clean_stale(bucket_names) + ordered_buckets = self.cursor_store.get_bucket_order(bucket_names) - if result.has_issues or result.errors: - logger.info( - "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, " - "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s", - result.execution_time_seconds, - result.corrupted_objects, - result.orphaned_objects, - result.phantom_metadata, - result.stale_versions, - result.etag_cache_inconsistencies, - result.legacy_metadata_drifts, - result.issues_healed, - len(result.errors), - " (dry run)" if effective_dry_run else "", + for bucket_name in ordered_buckets: + if self._batch_exhausted(result): + break + result.buckets_scanned += 1 + self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run) + self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run) + self.cursor_store.update_bucket(bucket_name, time.time()) + + result.execution_time_seconds = time.time() - start + + if result.has_issues or result.errors: + logger.info( + "Integrity check 
completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, " + "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s", + result.execution_time_seconds, + result.corrupted_objects, + result.orphaned_objects, + result.phantom_metadata, + result.stale_versions, + result.etag_cache_inconsistencies, + result.legacy_metadata_drifts, + result.issues_healed, + len(result.errors), + " (dry run)" if effective_dry_run else "", + ) + + record = IntegrityExecutionRecord( + timestamp=time.time(), + result=result.to_dict(), + dry_run=effective_dry_run, + auto_heal=effective_auto_heal, ) + self.history_store.add(record) - record = IntegrityExecutionRecord( - timestamp=time.time(), - result=result.to_dict(), - dry_run=effective_dry_run, - auto_heal=effective_auto_heal, - ) - self.history_store.add(record) + return result + finally: + self._scanning = False + self._scan_start_time = None + self._lock.release() - return result + def run_async(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> bool: + if self._scanning: + return False + t = threading.Thread(target=self.run_now, args=(auto_heal, dry_run), daemon=True) + t.start() + return True def _system_path(self) -> Path: return self.storage_root / self.SYSTEM_ROOT @@ -289,6 +385,16 @@ class IntegrityChecker: pass return names + def _throttle(self) -> bool: + if self._shutdown: + return True + if self._io_throttle > 0: + time.sleep(self._io_throttle) + return self._shutdown + + def _batch_exhausted(self, result: IntegrityResult) -> bool: + return self._shutdown or result.objects_scanned >= self.batch_size + def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None: if len(result.issues) < MAX_ISSUES: result.issues.append(issue) @@ -304,7 +410,9 @@ class IntegrityChecker: try: for index_file in meta_root.rglob("_index.json"): - if result.objects_scanned >= self.batch_size: + if self._throttle(): + return + if self._batch_exhausted(result): return if not 
index_file.is_file(): continue @@ -314,7 +422,9 @@ class IntegrityChecker: continue for key_name, entry in list(index_data.items()): - if result.objects_scanned >= self.batch_size: + if self._throttle(): + return + if self._batch_exhausted(result): return rel_dir = index_file.parent.relative_to(meta_root) @@ -374,7 +484,9 @@ class IntegrityChecker: try: for entry in bucket_path.rglob("*"): - if result.objects_scanned >= self.batch_size: + if self._throttle(): + return + if self._batch_exhausted(result): return if not entry.is_file(): continue @@ -385,6 +497,7 @@ class IntegrityChecker: if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS: continue + result.objects_scanned += 1 full_key = rel.as_posix() key_name = rel.name parent = rel.parent @@ -449,6 +562,10 @@ class IntegrityChecker: try: for index_file in meta_root.rglob("_index.json"): + if self._throttle(): + return + if self._batch_exhausted(result): + return if not index_file.is_file(): continue try: @@ -458,6 +575,9 @@ class IntegrityChecker: keys_to_remove = [] for key_name in list(index_data.keys()): + if self._batch_exhausted(result): + break + result.objects_scanned += 1 rel_dir = index_file.parent.relative_to(meta_root) if rel_dir == Path("."): full_key = key_name @@ -503,6 +623,10 @@ class IntegrityChecker: try: for key_dir in versions_root.rglob("*"): + if self._throttle(): + return + if self._batch_exhausted(result): + return if not key_dir.is_dir(): continue @@ -510,6 +634,9 @@ class IntegrityChecker: json_files = {f.stem: f for f in key_dir.glob("*.json")} for stem, bin_file in bin_files.items(): + if self._batch_exhausted(result): + return + result.objects_scanned += 1 if stem not in json_files: result.stale_versions += 1 issue = IntegrityIssue( @@ -529,6 +656,9 @@ class IntegrityChecker: self._add_issue(result, issue) for stem, json_file in json_files.items(): + if self._batch_exhausted(result): + return + result.objects_scanned += 1 if stem not in bin_files: result.stale_versions += 1 issue = 
IntegrityIssue( @@ -569,6 +699,9 @@ class IntegrityChecker: found_mismatch = False for full_key, cached_etag in etag_cache.items(): + if self._batch_exhausted(result): + break + result.objects_scanned += 1 key_path = Path(full_key) key_name = key_path.name parent = key_path.parent @@ -626,9 +759,14 @@ class IntegrityChecker: try: for meta_file in legacy_meta_root.rglob("*.meta.json"): + if self._throttle(): + return + if self._batch_exhausted(result): + return if not meta_file.is_file(): continue + result.objects_scanned += 1 try: rel = meta_file.relative_to(legacy_meta_root) except ValueError: @@ -728,11 +866,17 @@ class IntegrityChecker: return [r.to_dict() for r in records] def get_status(self) -> dict: - return { + status: Dict[str, Any] = { "enabled": not self._shutdown or self._timer is not None, "running": self._timer is not None and not self._shutdown, + "scanning": self._scanning, "interval_hours": self.interval_seconds / 3600.0, "batch_size": self.batch_size, "auto_heal": self.auto_heal, "dry_run": self.dry_run, + "io_throttle_ms": round(self._io_throttle * 1000), } + if self._scanning and self._scan_start_time is not None: + status["scan_elapsed_seconds"] = round(time.time() - self._scan_start_time, 1) + status["cursor"] = self.cursor_store.get_info() + return status diff --git a/app/ui.py b/app/ui.py index b1a0947..29ffd05 100644 --- a/app/ui.py +++ b/app/ui.py @@ -1063,6 +1063,27 @@ def bulk_delete_objects(bucket_name: str): return _respond(False, f"A maximum of {MAX_KEYS} objects can be deleted per request", status_code=400) unique_keys = list(dict.fromkeys(cleaned)) + + folder_prefixes = [k for k in unique_keys if k.endswith("/")] + if folder_prefixes: + try: + client = get_session_s3_client() + for prefix in folder_prefixes: + unique_keys.remove(prefix) + paginator = client.get_paginator("list_objects_v2") + for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix): + for obj in page.get("Contents", []): + if obj["Key"] not in unique_keys: + 
unique_keys.append(obj["Key"]) + except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: + if isinstance(exc, ClientError): + err, status = handle_client_error(exc) + return _respond(False, err["error"], status_code=status) + return _respond(False, "S3 API server is unreachable", status_code=502) + + if not unique_keys: + return _respond(False, "No objects found under the selected folders", status_code=400) + try: _authorize_ui(principal, bucket_name, "delete") except IamError as exc: @@ -1093,13 +1114,17 @@ def bulk_delete_objects(bucket_name: str): else: try: client = get_session_s3_client() - objects_to_delete = [{"Key": k} for k in unique_keys] - resp = client.delete_objects( - Bucket=bucket_name, - Delete={"Objects": objects_to_delete, "Quiet": False}, - ) - deleted = [d["Key"] for d in resp.get("Deleted", [])] - errors = [{"key": e["Key"], "error": e.get("Message", e.get("Code", "Unknown error"))} for e in resp.get("Errors", [])] + deleted = [] + errors = [] + for i in range(0, len(unique_keys), 1000): + batch = unique_keys[i:i + 1000] + objects_to_delete = [{"Key": k} for k in batch] + resp = client.delete_objects( + Bucket=bucket_name, + Delete={"Objects": objects_to_delete, "Quiet": False}, + ) + deleted.extend(d["Key"] for d in resp.get("Deleted", [])) + errors.extend({"key": e["Key"], "error": e.get("Message", e.get("Code", "Unknown error"))} for e in resp.get("Errors", [])) for key in deleted: _replication_manager().trigger_replication(bucket_name, key, action="delete") except (ClientError, EndpointConnectionError, ConnectionClosedError) as exc: @@ -4126,7 +4151,7 @@ def system_dashboard(): r = rec.get("result", {}) total_freed = r.get("temp_bytes_freed", 0) + r.get("multipart_bytes_freed", 0) + r.get("orphaned_version_bytes_freed", 0) rec["bytes_freed_display"] = _format_bytes(total_freed) - rec["timestamp_display"] = datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + 
rec["timestamp_display"] = _format_datetime_display(datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc)) gc_history_records.append(rec) checker = current_app.extensions.get("integrity") @@ -4135,7 +4160,7 @@ def system_dashboard(): if checker: raw = checker.get_history(limit=10, offset=0) for rec in raw: - rec["timestamp_display"] = datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + rec["timestamp_display"] = _format_datetime_display(datetime.fromtimestamp(rec["timestamp"], tz=dt_timezone.utc)) integrity_history_records.append(rec) features = [ @@ -4163,6 +4188,7 @@ def system_dashboard(): gc_history=gc_history_records, integrity_status=integrity_status, integrity_history=integrity_history_records, + display_timezone=current_app.config.get("DISPLAY_TIMEZONE", "UTC"), ) @@ -4179,14 +4205,43 @@ def system_gc_run(): return jsonify({"error": "GC is not enabled"}), 400 payload = request.get_json(silent=True) or {} - original_dry_run = gc.dry_run - if "dry_run" in payload: - gc.dry_run = bool(payload["dry_run"]) + started = gc.run_async(dry_run=payload.get("dry_run")) + if not started: + return jsonify({"error": "GC is already in progress"}), 409 + return jsonify({"status": "started"}) + + +@ui_bp.get("/system/gc/status") +def system_gc_status(): + principal = _current_principal() try: - result = gc.run_now() - finally: - gc.dry_run = original_dry_run - return jsonify(result.to_dict()) + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + gc = current_app.extensions.get("gc") + if not gc: + return jsonify({"error": "GC is not enabled"}), 400 + + return jsonify(gc.get_status()) + + +@ui_bp.get("/system/gc/history") +def system_gc_history(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + gc = current_app.extensions.get("gc") + if not gc: + return 
jsonify({"executions": []}) + + limit = min(int(request.args.get("limit", 10)), 200) + offset = int(request.args.get("offset", 0)) + records = gc.get_history(limit=limit, offset=offset) + return jsonify({"executions": records}) @ui_bp.post("/system/integrity/run") @@ -4202,11 +4257,46 @@ def system_integrity_run(): return jsonify({"error": "Integrity checker is not enabled"}), 400 payload = request.get_json(silent=True) or {} - result = checker.run_now( + started = checker.run_async( auto_heal=payload.get("auto_heal"), dry_run=payload.get("dry_run"), ) - return jsonify(result.to_dict()) + if not started: + return jsonify({"error": "A scan is already in progress"}), 409 + return jsonify({"status": "started"}) + + +@ui_bp.get("/system/integrity/status") +def system_integrity_status(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + checker = current_app.extensions.get("integrity") + if not checker: + return jsonify({"error": "Integrity checker is not enabled"}), 400 + + return jsonify(checker.get_status()) + + +@ui_bp.get("/system/integrity/history") +def system_integrity_history(): + principal = _current_principal() + try: + _iam().authorize(principal, None, "iam:*") + except IamError: + return jsonify({"error": "Access denied"}), 403 + + checker = current_app.extensions.get("integrity") + if not checker: + return jsonify({"executions": []}) + + limit = min(int(request.args.get("limit", 10)), 200) + offset = int(request.args.get("offset", 0)) + records = checker.get_history(limit=limit, offset=offset) + return jsonify({"executions": records}) @ui_bp.app_errorhandler(404) diff --git a/app/version.py b/app/version.py index 77fc8ae..5c24c4a 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.4.0" +APP_VERSION = "0.4.1" def get_version() -> str: diff --git a/static/css/main.css 
b/static/css/main.css index 5202aab..2a54ee0 100644 --- a/static/css/main.css +++ b/static/css/main.css @@ -2655,7 +2655,7 @@ pre code { } .objects-table-container { - max-height: none; + max-height: 60vh; } .preview-card { diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js index 90517fe..f35f46f 100644 --- a/static/js/bucket-detail-main.js +++ b/static/js/bucket-detail-main.js @@ -98,6 +98,9 @@ const previewMetadata = document.getElementById('preview-metadata'); const previewMetadataList = document.getElementById('preview-metadata-list'); const previewPlaceholder = document.getElementById('preview-placeholder'); + const previewPlaceholderDefault = previewPlaceholder ? previewPlaceholder.innerHTML : ''; + const previewErrorAlert = document.getElementById('preview-error-alert'); + const previewDetailsMeta = document.getElementById('preview-details-meta'); const previewImage = document.getElementById('preview-image'); const previewVideo = document.getElementById('preview-video'); const previewAudio = document.getElementById('preview-audio'); @@ -866,6 +869,11 @@ const checkbox = row.querySelector('[data-folder-select]'); checkbox?.addEventListener('change', (e) => { e.stopPropagation(); + if (checkbox.checked) { + selectedRows.set(folderPath, { key: folderPath, isFolder: true }); + } else { + selectedRows.delete(folderPath); + } const folderObjects = allObjects.filter(obj => obj.key.startsWith(folderPath)); folderObjects.forEach(obj => { if (checkbox.checked) { @@ -1350,8 +1358,11 @@ } if (selectAllCheckbox) { const filesInView = visibleItems.filter(item => item.type === 'file'); - const total = filesInView.length; - const visibleSelectedCount = filesInView.filter(item => selectedRows.has(item.data.key)).length; + const foldersInView = visibleItems.filter(item => item.type === 'folder'); + const total = filesInView.length + foldersInView.length; + const fileSelectedCount = filesInView.filter(item => selectedRows.has(item.data.key)).length; 
+ const folderSelectedCount = foldersInView.filter(item => selectedRows.has(item.path)).length; + const visibleSelectedCount = fileSelectedCount + folderSelectedCount; selectAllCheckbox.disabled = total === 0; selectAllCheckbox.checked = visibleSelectedCount > 0 && visibleSelectedCount === total && total > 0; selectAllCheckbox.indeterminate = visibleSelectedCount > 0 && visibleSelectedCount < total; @@ -1373,8 +1384,12 @@ const keys = Array.from(selectedRows.keys()); bulkDeleteList.innerHTML = ''; if (bulkDeleteCount) { - const label = keys.length === 1 ? 'object' : 'objects'; - bulkDeleteCount.textContent = `${keys.length} ${label} selected`; + const folderCount = keys.filter(k => k.endsWith('/')).length; + const objectCount = keys.length - folderCount; + const parts = []; + if (folderCount) parts.push(`${folderCount} folder${folderCount !== 1 ? 's' : ''}`); + if (objectCount) parts.push(`${objectCount} object${objectCount !== 1 ? 's' : ''}`); + bulkDeleteCount.textContent = `${parts.join(' and ')} selected`; } if (!keys.length) { const empty = document.createElement('li'); @@ -1513,7 +1528,7 @@ }; const response = await fetch(endpoint, { method: 'POST', - headers: { 'Content-Type': 'application/json' }, + headers: { 'Content-Type': 'application/json', 'X-CSRFToken': window.getCsrfToken ? 
window.getCsrfToken() : '' }, body: JSON.stringify(payload), }); const data = await response.json(); @@ -1957,6 +1972,10 @@ [previewImage, previewVideo, previewAudio, previewIframe].forEach((el) => { if (!el) return; el.classList.add('d-none'); + if (el.tagName === 'IMG') { + el.removeAttribute('src'); + el.onload = null; + } if (el.tagName === 'VIDEO' || el.tagName === 'AUDIO') { el.pause(); el.removeAttribute('src'); @@ -1969,9 +1988,38 @@ previewText.classList.add('d-none'); previewText.textContent = ''; } + previewPlaceholder.innerHTML = previewPlaceholderDefault; previewPlaceholder.classList.remove('d-none'); }; + let previewFailed = false; + + const handlePreviewError = () => { + previewFailed = true; + if (downloadButton) { + downloadButton.classList.add('disabled'); + downloadButton.removeAttribute('href'); + } + if (presignButton) presignButton.disabled = true; + if (generatePresignButton) generatePresignButton.disabled = true; + if (previewDetailsMeta) previewDetailsMeta.classList.add('d-none'); + if (previewMetadata) previewMetadata.classList.add('d-none'); + const tagsPanel = document.getElementById('preview-tags'); + if (tagsPanel) tagsPanel.classList.add('d-none'); + const versionPanel = document.getElementById('version-panel'); + if (versionPanel) versionPanel.classList.add('d-none'); + if (previewErrorAlert) { + previewErrorAlert.textContent = 'Unable to load object \u2014 it may have been deleted, or the server returned an error.'; + previewErrorAlert.classList.remove('d-none'); + } + }; + + const clearPreviewError = () => { + previewFailed = false; + if (previewErrorAlert) previewErrorAlert.classList.add('d-none'); + if (previewDetailsMeta) previewDetailsMeta.classList.remove('d-none'); + }; + async function fetchMetadata(metadataUrl) { if (!metadataUrl) return null; try { @@ -1993,6 +2041,7 @@ previewPanel.classList.remove('d-none'); activeRow = row; renderMetadata(null); + clearPreviewError(); previewKey.textContent = row.dataset.key; 
previewSize.textContent = formatBytes(Number(row.dataset.size)); @@ -2016,18 +2065,71 @@ const previewUrl = row.dataset.previewUrl; const lower = row.dataset.key.toLowerCase(); if (previewUrl && lower.match(/\.(png|jpg|jpeg|gif|webp|svg|ico|bmp)$/)) { - previewImage.src = previewUrl; - previewImage.classList.remove('d-none'); - previewPlaceholder.classList.add('d-none'); + previewPlaceholder.innerHTML = '
No executions recorded yet.
| Time | Cleaned | ' + + 'Freed | Mode |
|---|---|---|---|
| ' + formatTimestamp(exec.timestamp) + ' | ' + + '' + cleaned + ' | ' + + '' + formatBytes(freed) + ' | ' + + '' + mode + ' |
No scans recorded yet.
| Time | Scanned | ' + + 'Issues | Healed | ' + + 'Mode |
|---|---|---|---|---|
| ' + formatTimestamp(exec.timestamp) + ' | ' + + '' + (r.objects_scanned || 0) + ' | ' + + '' + issueHtml + ' | ' + + '' + (r.issues_healed || 0) + ' | ' + + '' + mode + ' |