diff --git a/app/__init__.py b/app/__init__.py index 89dbdbd..161fa90 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -29,6 +29,7 @@ from .encryption import EncryptionManager from .extensions import limiter, csrf from .iam import IamService from .kms import KMSManager +from .gc import GarbageCollector from .lifecycle import LifecycleManager from .notifications import NotificationService from .object_lock import ObjectLockService @@ -221,6 +222,18 @@ def create_app( ) lifecycle_manager.start() + gc_collector = None + if app.config.get("GC_ENABLED", False): + gc_collector = GarbageCollector( + storage_root=storage_root, + interval_hours=app.config.get("GC_INTERVAL_HOURS", 6.0), + temp_file_max_age_hours=app.config.get("GC_TEMP_FILE_MAX_AGE_HOURS", 24.0), + multipart_max_age_days=app.config.get("GC_MULTIPART_MAX_AGE_DAYS", 7), + lock_file_max_age_hours=app.config.get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0), + dry_run=app.config.get("GC_DRY_RUN", False), + ) + gc_collector.start() + app.extensions["object_storage"] = storage app.extensions["iam"] = iam app.extensions["bucket_policies"] = bucket_policies @@ -232,6 +245,7 @@ def create_app( app.extensions["kms"] = kms_manager app.extensions["acl"] = acl_service app.extensions["lifecycle"] = lifecycle_manager + app.extensions["gc"] = gc_collector app.extensions["object_lock"] = object_lock_service app.extensions["notifications"] = notification_service app.extensions["access_logging"] = access_logging_service diff --git a/app/admin_api.py b/app/admin_api.py index f650a3f..04d91c2 100644 --- a/app/admin_api.py +++ b/app/admin_api.py @@ -14,6 +14,7 @@ from flask import Blueprint, Response, current_app, jsonify, request from .connections import ConnectionStore from .extensions import limiter +from .gc import GarbageCollector from .iam import IamError, Principal from .replication import ReplicationManager from .site_registry import PeerSite, SiteInfo, SiteRegistry @@ -776,3 +777,55 @@ def delete_website_domain(domain: str): return _json_error("NotFound", f"No mapping found for domain '{domain}'", 404) logger.info("Website domain mapping deleted: %s", domain) return Response(status=204) + + +def _gc() -> Optional[GarbageCollector]: + return current_app.extensions.get("gc") + + +@admin_api_bp.route("/gc/status", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def gc_status(): + principal, error = _require_admin() + if error: + return error + gc = _gc() + if not gc: + return jsonify({"enabled": False, "message": "GC is not enabled. 
Set GC_ENABLED=true to enable."}) + return jsonify(gc.get_status()) + + +@admin_api_bp.route("/gc/run", methods=["POST"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def gc_run_now(): + principal, error = _require_admin() + if error: + return error + gc = _gc() + if not gc: + return _json_error("InvalidRequest", "GC is not enabled", 400) + payload = request.get_json(silent=True) or {} + original_dry_run = gc.dry_run + if "dry_run" in payload: + gc.dry_run = bool(payload["dry_run"]) + try: + result = gc.run_now() + finally: + gc.dry_run = original_dry_run + logger.info("GC manual run by %s", principal.access_key) + return jsonify(result.to_dict()) + + +@admin_api_bp.route("/gc/history", methods=["GET"]) +@limiter.limit(lambda: _get_admin_rate_limit()) +def gc_history(): + principal, error = _require_admin() + if error: + return error + gc = _gc() + if not gc: + return jsonify({"executions": []}) + limit = min(int(request.args.get("limit", 50)), 200) + offset = int(request.args.get("offset", 0)) + records = gc.get_history(limit=limit, offset=offset) + return jsonify({"executions": records}) diff --git a/app/config.py b/app/config.py index 71da39a..7002d48 100644 --- a/app/config.py +++ b/app/config.py @@ -150,6 +150,12 @@ class AppConfig: allowed_redirect_hosts: list[str] allow_internal_endpoints: bool website_hosting_enabled: bool + gc_enabled: bool + gc_interval_hours: float + gc_temp_file_max_age_hours: float + gc_multipart_max_age_days: int + gc_lock_file_max_age_hours: float + gc_dry_run: bool @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -319,6 +325,12 @@ class AppConfig: allowed_redirect_hosts = [h.strip() for h in str(allowed_redirect_hosts_raw).split(",") if h.strip()] allow_internal_endpoints = str(_get("ALLOW_INTERNAL_ENDPOINTS", "0")).lower() in {"1", "true", "yes", "on"} website_hosting_enabled = str(_get("WEBSITE_HOSTING_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} + gc_enabled = str(_get("GC_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} + gc_interval_hours = float(_get("GC_INTERVAL_HOURS", 6.0)) + gc_temp_file_max_age_hours = float(_get("GC_TEMP_FILE_MAX_AGE_HOURS", 24.0)) + gc_multipart_max_age_days = int(_get("GC_MULTIPART_MAX_AGE_DAYS", 7)) + gc_lock_file_max_age_hours = float(_get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0)) + gc_dry_run = str(_get("GC_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"} return cls(storage_root=storage_root, max_upload_size=max_upload_size, @@ -406,7 +418,13 @@ class AppConfig: num_trusted_proxies=num_trusted_proxies, allowed_redirect_hosts=allowed_redirect_hosts, allow_internal_endpoints=allow_internal_endpoints, - website_hosting_enabled=website_hosting_enabled) + website_hosting_enabled=website_hosting_enabled, + gc_enabled=gc_enabled, + gc_interval_hours=gc_interval_hours, + gc_temp_file_max_age_hours=gc_temp_file_max_age_hours, + gc_multipart_max_age_days=gc_multipart_max_age_days, + gc_lock_file_max_age_hours=gc_lock_file_max_age_hours, + gc_dry_run=gc_dry_run) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. 
@@ -617,4 +635,10 @@ class AppConfig: "ALLOWED_REDIRECT_HOSTS": self.allowed_redirect_hosts, "ALLOW_INTERNAL_ENDPOINTS": self.allow_internal_endpoints, "WEBSITE_HOSTING_ENABLED": self.website_hosting_enabled, + "GC_ENABLED": self.gc_enabled, + "GC_INTERVAL_HOURS": self.gc_interval_hours, + "GC_TEMP_FILE_MAX_AGE_HOURS": self.gc_temp_file_max_age_hours, + "GC_MULTIPART_MAX_AGE_DAYS": self.gc_multipart_max_age_days, + "GC_LOCK_FILE_MAX_AGE_HOURS": self.gc_lock_file_max_age_hours, + "GC_DRY_RUN": self.gc_dry_run, } diff --git a/app/gc.py b/app/gc.py new file mode 100644 index 0000000..1697607 --- /dev/null +++ b/app/gc.py @@ -0,0 +1,531 @@ +from __future__ import annotations + +import json +import logging +import os +import shutil +import threading +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class GCResult: + temp_files_deleted: int = 0 + temp_bytes_freed: int = 0 + multipart_uploads_deleted: int = 0 + multipart_bytes_freed: int = 0 + lock_files_deleted: int = 0 + orphaned_metadata_deleted: int = 0 + orphaned_versions_deleted: int = 0 + orphaned_version_bytes_freed: int = 0 + empty_dirs_removed: int = 0 + errors: List[str] = field(default_factory=list) + execution_time_seconds: float = 0.0 + + def to_dict(self) -> dict: + return { + "temp_files_deleted": self.temp_files_deleted, + "temp_bytes_freed": self.temp_bytes_freed, + "multipart_uploads_deleted": self.multipart_uploads_deleted, + "multipart_bytes_freed": self.multipart_bytes_freed, + "lock_files_deleted": self.lock_files_deleted, + "orphaned_metadata_deleted": self.orphaned_metadata_deleted, + "orphaned_versions_deleted": self.orphaned_versions_deleted, + "orphaned_version_bytes_freed": self.orphaned_version_bytes_freed, + "empty_dirs_removed": self.empty_dirs_removed, + "errors": self.errors, + "execution_time_seconds": self.execution_time_seconds, + } + + @property + def total_bytes_freed(self) -> int: + return self.temp_bytes_freed + self.multipart_bytes_freed + self.orphaned_version_bytes_freed + + @property + def has_work(self) -> bool: + return ( + self.temp_files_deleted > 0 + or self.multipart_uploads_deleted > 0 + or self.lock_files_deleted > 0 + or self.orphaned_metadata_deleted > 0 + or self.orphaned_versions_deleted > 0 + or self.empty_dirs_removed > 0 + ) + + +@dataclass +class GCExecutionRecord: + timestamp: float + result: dict + dry_run: bool + + def to_dict(self) -> dict: + return { + "timestamp": self.timestamp, + "result": self.result, + "dry_run": self.dry_run, + } + + @classmethod + def from_dict(cls, data: dict) -> GCExecutionRecord: + return cls( + timestamp=data["timestamp"], + result=data["result"], + dry_run=data.get("dry_run", False), + ) + + +class GCHistoryStore: + def __init__(self, storage_root: Path, max_records: int = 50) -> None: + self.storage_root = storage_root + self.max_records = max_records + self._lock = threading.Lock() + + def _get_path(self) -> Path: + return self.storage_root / ".myfsio.sys" / "config" / "gc_history.json" + + def load(self) -> List[GCExecutionRecord]: + path = self._get_path() + if not path.exists(): + return [] + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + return [GCExecutionRecord.from_dict(d) for d in data.get("executions", [])] + except (OSError, ValueError, KeyError) as e: + logger.error("Failed to load GC history: %s", e) + return [] + + def save(self, records: 
List[GCExecutionRecord]) -> None: + path = self._get_path() + path.parent.mkdir(parents=True, exist_ok=True) + data = {"executions": [r.to_dict() for r in records[: self.max_records]]} + try: + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + except OSError as e: + logger.error("Failed to save GC history: %s", e) + + def add(self, record: GCExecutionRecord) -> None: + with self._lock: + records = self.load() + records.insert(0, record) + self.save(records) + + def get_history(self, limit: int = 50, offset: int = 0) -> List[GCExecutionRecord]: + return self.load()[offset : offset + limit] + + +def _dir_size(path: Path) -> int: + total = 0 + try: + for f in path.rglob("*"): + if f.is_file(): + try: + total += f.stat().st_size + except OSError: + pass + except OSError: + pass + return total + + +def _file_age_hours(path: Path) -> float: + try: + mtime = path.stat().st_mtime + return (time.time() - mtime) / 3600.0 + except OSError: + return 0.0 + + +class GarbageCollector: + SYSTEM_ROOT = ".myfsio.sys" + SYSTEM_TMP_DIR = "tmp" + SYSTEM_MULTIPART_DIR = "multipart" + SYSTEM_BUCKETS_DIR = "buckets" + BUCKET_META_DIR = "meta" + BUCKET_VERSIONS_DIR = "versions" + INTERNAL_FOLDERS = {".meta", ".versions", ".multipart"} + + def __init__( + self, + storage_root: Path, + interval_hours: float = 6.0, + temp_file_max_age_hours: float = 24.0, + multipart_max_age_days: int = 7, + lock_file_max_age_hours: float = 1.0, + dry_run: bool = False, + max_history: int = 50, + ) -> None: + self.storage_root = Path(storage_root) + self.interval_seconds = interval_hours * 3600.0 + self.temp_file_max_age_hours = temp_file_max_age_hours + self.multipart_max_age_days = multipart_max_age_days + self.lock_file_max_age_hours = lock_file_max_age_hours + self.dry_run = dry_run + self._timer: Optional[threading.Timer] = None + self._shutdown = False + self._lock = threading.Lock() + self.history_store = GCHistoryStore(storage_root, max_records=max_history) + + def start(self) -> None: + if self._timer is not None: + return + self._shutdown = False + self._schedule_next() + logger.info( + "GC started: interval=%.1fh, temp_max_age=%.1fh, multipart_max_age=%dd, lock_max_age=%.1fh, dry_run=%s", + self.interval_seconds / 3600.0, + self.temp_file_max_age_hours, + self.multipart_max_age_days, + self.lock_file_max_age_hours, + self.dry_run, + ) + + def stop(self) -> None: + self._shutdown = True + if self._timer: + self._timer.cancel() + self._timer = None + logger.info("GC stopped") + + def _schedule_next(self) -> None: + if self._shutdown: + return + self._timer = threading.Timer(self.interval_seconds, self._run_cycle) + self._timer.daemon = True + self._timer.start() + + def _run_cycle(self) -> None: + if self._shutdown: + return + try: + self.run_now() + except Exception as e: + logger.error("GC cycle failed: %s", e) + finally: + self._schedule_next() + + def run_now(self) -> GCResult: + start = time.time() + result = GCResult() + + self._clean_temp_files(result) + self._clean_orphaned_multipart(result) + self._clean_stale_locks(result) + self._clean_orphaned_metadata(result) + self._clean_orphaned_versions(result) + self._clean_empty_dirs(result) + + result.execution_time_seconds = time.time() - start + + if result.has_work or result.errors: + logger.info( + "GC completed in %.2fs: temp=%d (%.1f MB), multipart=%d (%.1f MB), " + "locks=%d, meta=%d, versions=%d (%.1f MB), dirs=%d, errors=%d%s", + result.execution_time_seconds, + result.temp_files_deleted, + result.temp_bytes_freed / (1024 * 1024), + 
result.multipart_uploads_deleted, + result.multipart_bytes_freed / (1024 * 1024), + result.lock_files_deleted, + result.orphaned_metadata_deleted, + result.orphaned_versions_deleted, + result.orphaned_version_bytes_freed / (1024 * 1024), + result.empty_dirs_removed, + len(result.errors), + " (dry run)" if self.dry_run else "", + ) + + record = GCExecutionRecord( + timestamp=time.time(), + result=result.to_dict(), + dry_run=self.dry_run, + ) + self.history_store.add(record) + + return result + + def _system_path(self) -> Path: + return self.storage_root / self.SYSTEM_ROOT + + def _list_bucket_names(self) -> List[str]: + names = [] + try: + for entry in self.storage_root.iterdir(): + if entry.is_dir() and entry.name != self.SYSTEM_ROOT: + names.append(entry.name) + except OSError: + pass + return names + + def _clean_temp_files(self, result: GCResult) -> None: + tmp_dir = self._system_path() / self.SYSTEM_TMP_DIR + if not tmp_dir.exists(): + return + try: + for entry in tmp_dir.iterdir(): + if not entry.is_file(): + continue + age = _file_age_hours(entry) + if age < self.temp_file_max_age_hours: + continue + try: + size = entry.stat().st_size + if not self.dry_run: + entry.unlink() + result.temp_files_deleted += 1 + result.temp_bytes_freed += size + except OSError as e: + result.errors.append(f"temp file {entry.name}: {e}") + except OSError as e: + result.errors.append(f"scan tmp dir: {e}") + + def _clean_orphaned_multipart(self, result: GCResult) -> None: + cutoff_hours = self.multipart_max_age_days * 24.0 + bucket_names = self._list_bucket_names() + + for bucket_name in bucket_names: + for multipart_root in ( + self._system_path() / self.SYSTEM_MULTIPART_DIR / bucket_name, + self.storage_root / bucket_name / ".multipart", + ): + if not multipart_root.exists(): + continue + try: + for upload_dir in multipart_root.iterdir(): + if not upload_dir.is_dir(): + continue + self._maybe_clean_upload(upload_dir, cutoff_hours, result) + except OSError as e: + result.errors.append(f"scan multipart {bucket_name}: {e}") + + def _maybe_clean_upload(self, upload_dir: Path, cutoff_hours: float, result: GCResult) -> None: + manifest_path = upload_dir / "manifest.json" + age = _file_age_hours(manifest_path) if manifest_path.exists() else _file_age_hours(upload_dir) + + if age < cutoff_hours: + return + + dir_bytes = _dir_size(upload_dir) + try: + if not self.dry_run: + shutil.rmtree(upload_dir, ignore_errors=True) + result.multipart_uploads_deleted += 1 + result.multipart_bytes_freed += dir_bytes + except OSError as e: + result.errors.append(f"multipart {upload_dir.name}: {e}") + + def _clean_stale_locks(self, result: GCResult) -> None: + buckets_root = self._system_path() / self.SYSTEM_BUCKETS_DIR + if not buckets_root.exists(): + return + + try: + for bucket_dir in buckets_root.iterdir(): + if not bucket_dir.is_dir(): + continue + locks_dir = bucket_dir / "locks" + if not locks_dir.exists(): + continue + try: + for lock_file in locks_dir.iterdir(): + if not lock_file.is_file() or not lock_file.name.endswith(".lock"): + continue + age = _file_age_hours(lock_file) + if age < self.lock_file_max_age_hours: + continue + try: + if not self.dry_run: + lock_file.unlink(missing_ok=True) + result.lock_files_deleted += 1 + except OSError as e: + result.errors.append(f"lock {lock_file.name}: {e}") + except OSError as e: + result.errors.append(f"scan locks {bucket_dir.name}: {e}") + except OSError as e: + result.errors.append(f"scan buckets for locks: {e}") + + def _clean_orphaned_metadata(self, result: GCResult) -> 
None: + bucket_names = self._list_bucket_names() + + for bucket_name in bucket_names: + legacy_meta = self.storage_root / bucket_name / ".meta" + if legacy_meta.exists(): + self._clean_legacy_metadata(bucket_name, legacy_meta, result) + + new_meta = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR + if new_meta.exists(): + self._clean_index_metadata(bucket_name, new_meta, result) + + def _clean_legacy_metadata(self, bucket_name: str, meta_root: Path, result: GCResult) -> None: + bucket_path = self.storage_root / bucket_name + try: + for meta_file in meta_root.rglob("*.meta.json"): + if not meta_file.is_file(): + continue + try: + rel = meta_file.relative_to(meta_root) + object_key = rel.as_posix().removesuffix(".meta.json") + object_path = bucket_path / object_key + if not object_path.exists(): + if not self.dry_run: + meta_file.unlink(missing_ok=True) + result.orphaned_metadata_deleted += 1 + except (OSError, ValueError) as e: + result.errors.append(f"legacy meta {bucket_name}/{meta_file.name}: {e}") + except OSError as e: + result.errors.append(f"scan legacy meta {bucket_name}: {e}") + + def _clean_index_metadata(self, bucket_name: str, meta_root: Path, result: GCResult) -> None: + bucket_path = self.storage_root / bucket_name + try: + for index_file in meta_root.rglob("_index.json"): + if not index_file.is_file(): + continue + try: + with open(index_file, "r", encoding="utf-8") as f: + index_data = json.load(f) + except (OSError, json.JSONDecodeError): + continue + + keys_to_remove = [] + for key in index_data: + rel_dir = index_file.parent.relative_to(meta_root) + if rel_dir == Path("."): + full_key = key + else: + full_key = rel_dir.as_posix() + "/" + key + object_path = bucket_path / full_key + if not object_path.exists(): + keys_to_remove.append(key) + + if keys_to_remove: + if not self.dry_run: + for k in keys_to_remove: + index_data.pop(k, None) + if index_data: + try: + with open(index_file, "w", encoding="utf-8") as f: + json.dump(index_data, f) + except OSError as e: + result.errors.append(f"write index {bucket_name}: {e}") + continue + else: + try: + index_file.unlink(missing_ok=True) + except OSError: + pass + result.orphaned_metadata_deleted += len(keys_to_remove) + except OSError as e: + result.errors.append(f"scan index meta {bucket_name}: {e}") + + def _clean_orphaned_versions(self, result: GCResult) -> None: + bucket_names = self._list_bucket_names() + + for bucket_name in bucket_names: + bucket_path = self.storage_root / bucket_name + for versions_root in ( + self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR, + self.storage_root / bucket_name / ".versions", + ): + if not versions_root.exists(): + continue + try: + for key_dir in versions_root.iterdir(): + if not key_dir.is_dir(): + continue + self._clean_versions_for_key(bucket_path, versions_root, key_dir, result) + except OSError as e: + result.errors.append(f"scan versions {bucket_name}: {e}") + + def _clean_versions_for_key( + self, bucket_path: Path, versions_root: Path, key_dir: Path, result: GCResult + ) -> None: + try: + rel = key_dir.relative_to(versions_root) + except ValueError: + return + + object_path = bucket_path / rel + if object_path.exists(): + return + + version_files = list(key_dir.glob("*.bin")) + list(key_dir.glob("*.json")) + if not version_files: + return + + for vf in version_files: + try: + size = vf.stat().st_size if vf.suffix == ".bin" else 0 + if not self.dry_run: + vf.unlink(missing_ok=True) + if vf.suffix == ".bin": + 
result.orphaned_version_bytes_freed += size + result.orphaned_versions_deleted += 1 + except OSError as e: + result.errors.append(f"version file {vf.name}: {e}") + + def _clean_empty_dirs(self, result: GCResult) -> None: + targets = [ + self._system_path() / self.SYSTEM_TMP_DIR, + self._system_path() / self.SYSTEM_MULTIPART_DIR, + self._system_path() / self.SYSTEM_BUCKETS_DIR, + ] + for bucket_name in self._list_bucket_names(): + targets.append(self.storage_root / bucket_name / ".meta") + targets.append(self.storage_root / bucket_name / ".versions") + targets.append(self.storage_root / bucket_name / ".multipart") + + for root in targets: + if not root.exists(): + continue + self._remove_empty_dirs_recursive(root, root, result) + + def _remove_empty_dirs_recursive(self, path: Path, stop_at: Path, result: GCResult) -> bool: + if not path.is_dir(): + return False + + try: + children = list(path.iterdir()) + except OSError: + return False + + all_empty = True + for child in children: + if child.is_dir(): + if not self._remove_empty_dirs_recursive(child, stop_at, result): + all_empty = False + else: + all_empty = False + + if all_empty and path != stop_at: + try: + if not self.dry_run: + path.rmdir() + result.empty_dirs_removed += 1 + return True + except OSError: + return False + return all_empty + + def get_history(self, limit: int = 50, offset: int = 0) -> List[dict]: + records = self.history_store.get_history(limit, offset) + return [r.to_dict() for r in records] + + def get_status(self) -> dict: + return { + "enabled": not self._shutdown or self._timer is not None, + "running": self._timer is not None and not self._shutdown, + "interval_hours": self.interval_seconds / 3600.0, + "temp_file_max_age_hours": self.temp_file_max_age_hours, + "multipart_max_age_days": self.multipart_max_age_days, + "lock_file_max_age_hours": self.lock_file_max_age_hours, + "dry_run": self.dry_run, + } diff --git a/app/s3_api.py b/app/s3_api.py index 749e1e1..af54bed 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -1019,6 +1019,58 @@ def _method_not_allowed(allowed: list[str]) -> Response: return response +def _check_conditional_headers(etag: str, last_modified: float | None) -> Response | None: + from email.utils import parsedate_to_datetime + + if_match = request.headers.get("If-Match") + if if_match: + if if_match.strip() != "*": + match_etags = [e.strip().strip('"') for e in if_match.split(",")] + if etag not in match_etags: + return Response(status=412) + + if_unmodified = request.headers.get("If-Unmodified-Since") + if not if_match and if_unmodified and last_modified is not None: + try: + dt = parsedate_to_datetime(if_unmodified) + obj_dt = datetime.fromtimestamp(last_modified, timezone.utc) + if obj_dt > dt: + return Response(status=412) + except (TypeError, ValueError): + pass + + if_none_match = request.headers.get("If-None-Match") + if if_none_match: + if if_none_match.strip() == "*": + resp = Response(status=304) + resp.headers["ETag"] = f'"{etag}"' + if last_modified is not None: + resp.headers["Last-Modified"] = http_date(last_modified) + return resp + none_match_etags = [e.strip().strip('"') for e in if_none_match.split(",")] + if etag in none_match_etags: + resp = Response(status=304) + resp.headers["ETag"] = f'"{etag}"' + if last_modified is not None: + resp.headers["Last-Modified"] = http_date(last_modified) + return resp + + if_modified = request.headers.get("If-Modified-Since") + if not if_none_match and if_modified and last_modified is not None: + try: + dt = 
parsedate_to_datetime(if_modified) + obj_dt = datetime.fromtimestamp(last_modified, timezone.utc) + if obj_dt <= dt: + resp = Response(status=304) + resp.headers["ETag"] = f'"{etag}"' + resp.headers["Last-Modified"] = http_date(last_modified) + return resp + except (TypeError, ValueError): + pass + + return None + + def _apply_object_headers( response: Response, *, @@ -2897,7 +2949,24 @@ def object_handler(bucket_name: str, object_key: str): mimetype = metadata.get("__content_type__") or mimetypes.guess_type(object_key)[0] or "application/octet-stream" is_encrypted = "x-amz-server-side-encryption" in metadata - + + cond_etag = metadata.get("__etag__") + if not cond_etag and not is_encrypted: + try: + cond_etag = storage._compute_etag(path) + except OSError: + cond_etag = None + if cond_etag: + cond_mtime = float(metadata["__last_modified__"]) if "__last_modified__" in metadata else None + if cond_mtime is None: + try: + cond_mtime = path.stat().st_mtime + except OSError: + pass + cond_resp = _check_conditional_headers(cond_etag, cond_mtime) + if cond_resp: + return cond_resp + if request.method == "GET": range_header = request.headers.get("Range") @@ -3367,6 +3436,16 @@ def head_object(bucket_name: str, object_key: str) -> Response: metadata = _storage().get_object_metadata(bucket_name, object_key) etag = metadata.get("__etag__") or _storage()._compute_etag(path) + head_mtime = float(metadata["__last_modified__"]) if "__last_modified__" in metadata else None + if head_mtime is None: + try: + head_mtime = path.stat().st_mtime + except OSError: + pass + cond_resp = _check_conditional_headers(etag, head_mtime) + if cond_resp: + return cond_resp + cached_size = metadata.get("__size__") cached_mtime = metadata.get("__last_modified__") if cached_size is not None and cached_mtime is not None: diff --git a/app/version.py b/app/version.py index e049a33..b4679f1 100644 --- a/app/version.py +++ b/app/version.py @@ -1,6 +1,6 @@ from __future__ import annotations -APP_VERSION = "0.3.7" +APP_VERSION = "0.3.8" def get_version() -> str: diff --git a/docs.md b/docs.md index 3e6908e..a63b72e 100644 --- a/docs.md +++ b/docs.md @@ -252,6 +252,60 @@ Once enabled, configure lifecycle rules via: ``` +## Garbage Collection + +The garbage collector (GC) automatically cleans up orphaned data that accumulates over time: stale temporary files from failed uploads, abandoned multipart uploads, stale lock files, orphaned metadata entries, orphaned version files, and empty directories. + +### Enabling GC + +By default, GC is disabled. 
Enable it by setting:
+
+```bash
+GC_ENABLED=true python run.py
+```
+
+Or in your `myfsio.env` file:
+```
+GC_ENABLED=true
+GC_INTERVAL_HOURS=6              # Run every 6 hours (default)
+GC_TEMP_FILE_MAX_AGE_HOURS=24    # Delete temp files older than 24h
+GC_MULTIPART_MAX_AGE_DAYS=7      # Delete orphaned multipart uploads older than 7 days
+GC_LOCK_FILE_MAX_AGE_HOURS=1     # Delete stale lock files older than 1h
+GC_DRY_RUN=false                 # Set to true to log without deleting
+```
+
+### What Gets Cleaned
+
+| Type | Location | Condition |
+|------|----------|-----------|
+| **Temp files** | `.myfsio.sys/tmp/` | Older than `GC_TEMP_FILE_MAX_AGE_HOURS` |
+| **Orphaned multipart uploads** | `.myfsio.sys/multipart/<bucket>/` and `<bucket>/.multipart/` | Older than `GC_MULTIPART_MAX_AGE_DAYS` |
+| **Stale lock files** | `.myfsio.sys/buckets/<bucket>/locks/` | Older than `GC_LOCK_FILE_MAX_AGE_HOURS` |
+| **Orphaned metadata** | `.myfsio.sys/buckets/<bucket>/meta/` and `<bucket>/.meta/` | Object file no longer exists |
+| **Orphaned versions** | `.myfsio.sys/buckets/<bucket>/versions/` and `<bucket>/.versions/` | Main object no longer exists |
+| **Empty directories** | Various internal directories | Directory is empty after cleanup |
+
+### Admin API
+
+All GC endpoints require admin (`iam:*`) permissions.
+
+| Method | Route | Description |
+|--------|-------|-------------|
+| `GET` | `/admin/gc/status` | Get GC status and configuration |
+| `POST` | `/admin/gc/run` | Trigger a manual GC run (body: `{"dry_run": true}` for preview) |
+| `GET` | `/admin/gc/history` | Get GC execution history (query: `?limit=50&offset=0`) |
+
+### Dry Run Mode
+
+Set `GC_DRY_RUN=true` to log what would be deleted without actually removing anything. You can also trigger a one-time dry run via the admin API:
+
+```bash
+curl -X POST "http://localhost:5000/admin/gc/run" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+  -H "Content-Type: application/json" \
+  -d '{"dry_run": true}'
+```
+
 ### Performance Tuning
 
 | Variable | Default | Notes |
diff --git a/static/js/bucket-detail-main.js b/static/js/bucket-detail-main.js
index 05a7b34..0a97e08 100644
--- a/static/js/bucket-detail-main.js
+++ b/static/js/bucket-detail-main.js
@@ -702,7 +702,7 @@
         flushPendingStreamObjects();
         hasMoreObjects = false;
         totalObjectCount = loadedObjectCount;
-        if (!currentPrefix) bucketTotalObjects = totalObjectCount;
+        if (!currentPrefix && !useDelimiterMode) bucketTotalObjects = totalObjectCount;
         updateObjectCountBadge();
 
         if (objectsLoadingRow && objectsLoadingRow.parentNode) {
@@ -767,7 +767,7 @@
         }
 
         totalObjectCount = data.total_count || 0;
-        if (!append && !currentPrefix) bucketTotalObjects = totalObjectCount;
+        if (!append && !currentPrefix && !useDelimiterMode) bucketTotalObjects = totalObjectCount;
         nextContinuationToken = data.next_continuation_token;
 
         if (!append && objectsLoadingRow) {
diff --git a/templates/buckets.html b/templates/buckets.html
index ad24574..5817e24 100644
--- a/templates/buckets.html
+++ b/templates/buckets.html
@@ -51,7 +51,7 @@
{{ bucket.meta.name }}
- Created {{ bucket.meta.created_at | format_datetime }} + Created {{ bucket.meta.creation_date | format_datetime }}
{{ bucket.access_label }} diff --git a/templates/docs.html b/templates/docs.html index adf4bb3..e0a193e 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -40,6 +40,7 @@
  • Bucket Quotas
  • Encryption
  • Lifecycle Rules
  • +
  • Garbage Collection
  • Metrics History
  • Operation Metrics
  • Troubleshooting
  • @@ -1627,10 +1628,113 @@ curl "{{ api_base }}/<bucket>?lifecycle" \ -
    +
    14 +

    Garbage Collection

    +
    +

    Automatically clean up orphaned data that accumulates over time: stale temp files, abandoned multipart uploads, stale lock files, orphaned metadata, orphaned versions, and empty directories.

    + +

    Enabling GC

    +

    Disabled by default. Enable via environment variable:

    +
    GC_ENABLED=true python run.py
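    # Optional first pass (assumption: standard shell env-var prefixes): log only, delete nothing
    GC_DRY_RUN=true GC_ENABLED=true python run.py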
    + +

    Configuration

    +
    + + + + + + + + + + + + + + + + +
    VariableDefaultDescription
    GC_ENABLEDfalseEnable garbage collection
    GC_INTERVAL_HOURS6Hours between GC cycles
    GC_TEMP_FILE_MAX_AGE_HOURS24Delete temp files older than this
    GC_MULTIPART_MAX_AGE_DAYS7Delete orphaned multipart uploads older than this
    GC_LOCK_FILE_MAX_AGE_HOURS1Delete stale lock files older than this
    GC_DRY_RUNfalseLog what would be deleted without removing
    +
    + +

    What Gets Cleaned

    +
    + + + + + + + + + + + + + + + + +
    TypeLocationCondition
    Temp files.myfsio.sys/tmp/Older than configured max age
    Orphaned multipart.myfsio.sys/multipart/Older than configured max age
    Stale lock files.myfsio.sys/buckets/<bucket>/locks/Older than configured max age
    Orphaned metadata.myfsio.sys/buckets/<bucket>/meta/Object file no longer exists
    Orphaned versions.myfsio.sys/buckets/<bucket>/versions/Main object no longer exists
    Empty directoriesVarious internal dirsDirectory is empty after cleanup
    +
    + +

    Admin API

    +
    + + + + + + + + + + + + + +
    MethodRouteDescription
    GET/admin/gc/statusGet GC status and configuration
    POST/admin/gc/runTrigger manual GC run
    GET/admin/gc/historyGet execution history
    +
    + +
    # Trigger a dry run (preview what would be cleaned)
    +curl -X POST "{{ api_base }}/admin/gc/run" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
    +  -H "Content-Type: application/json" \
    +  -d '{"dry_run": true}'
    +
    +# Trigger actual GC
    +curl -X POST "{{ api_base }}/admin/gc/run" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
    +
    +# Check status
    +curl "{{ api_base }}/admin/gc/status" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
    +
    +# View history
    +curl "{{ api_base }}/admin/gc/history?limit=10" \
    +  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
    + +
    +
    + + + + +
    + Dry Run: Use GC_DRY_RUN=true or pass {"dry_run": true} to the API to preview what would be deleted without actually removing anything. Check the logs or API response for details. +
    +
    +
    +
    +
    +
    +
    +
    + 15

    Metrics History

    Track CPU, memory, and disk usage over time with optional metrics history. Disabled by default to minimize overhead.

    @@ -1714,7 +1818,7 @@ curl -X PUT "{{ api_base | replace('/api', '/ui') }}/metrics/settings" \
    - 15 + 16

    Operation Metrics

    Track API request statistics including request counts, latency, error rates, and bandwidth usage. Provides real-time visibility into API operations.

    @@ -1821,7 +1925,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
    - 16 + 17

    Troubleshooting & tips

    @@ -1872,7 +1976,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
    - 17 + 18

    Health Check Endpoint

    The API exposes a health check endpoint for monitoring and load balancer integration.

    @@ -1894,7 +1998,7 @@ curl {{ api_base }}/myfsio/health
    - 18 + 19

    Object Lock & Retention

    Object Lock prevents objects from being deleted or overwritten for a specified retention period.

    @@ -1954,7 +2058,7 @@ curl "{{ api_base }}/<bucket>/<key>?legal-hold" \
    - 19 + 20

    Access Logging

    Enable S3-style access logging to track all requests to your buckets for audit and analysis.

    @@ -1981,7 +2085,7 @@ curl "{{ api_base }}/<bucket>?logging" \
    - 20 + 21

    Notifications & Webhooks

    Configure event notifications to trigger webhooks when objects are created or deleted.

    @@ -2044,7 +2148,7 @@ curl -X PUT "{{ api_base }}/<bucket>?notification" \
    - 21 + 22

    SelectObjectContent (SQL)

    Query CSV, JSON, or Parquet files directly using SQL without downloading the entire object.

    @@ -2089,7 +2193,7 @@ curl -X POST "{{ api_base }}/<bucket>/data.csv?select" \
    - 22 + 23

    Advanced S3 Operations

    Copy, move, and partially download objects using advanced S3 operations.

    @@ -2163,7 +2267,7 @@ curl "{{ api_base }}/<bucket>/<key>" \
    - 23 + 24

    Access Control Lists (ACLs)

    ACLs provide legacy-style permission management for buckets and objects.

    @@ -2217,7 +2321,7 @@ curl -X PUT "{{ api_base }}/<bucket>/<key>" \
    - 24 + 25

    Object & Bucket Tagging

    Add metadata tags to buckets and objects for organization, cost allocation, or lifecycle rule filtering.

    @@ -2276,7 +2380,7 @@ curl -X PUT "{{ api_base }}/<bucket>?tagging" \
    - 25 + 26

    Static Website Hosting

    Host static websites directly from S3 buckets with custom index and error pages, served via custom domain mapping.

    @@ -2369,7 +2473,7 @@ server {
    - 26 + 27

    CORS Configuration

    Configure per-bucket Cross-Origin Resource Sharing rules to control which origins can access your bucket from a browser.

    @@ -2436,7 +2540,7 @@ curl -X DELETE "{{ api_base }}/<bucket>?cors" \
    - 27 + 28

    PostObject (HTML Form Upload)

    Upload objects directly from an HTML form using browser-based POST uploads with policy-based authorization.

    @@ -2478,7 +2582,7 @@ curl -X DELETE "{{ api_base }}/<bucket>?cors" \
    - 28 + 29

    List Objects API v2

    Use the v2 list API for improved pagination with continuation tokens instead of markers.

    @@ -2522,7 +2626,7 @@ curl "{{ api_base }}/<bucket>?list-type=2&start-after=photos/2025/" \
    - 29 + 30

    Upgrading & Updates

    How to safely update MyFSIO to a new version.

    @@ -2555,7 +2659,7 @@ cp -r logs/ logs-backup/
    - 30 + 31

    Full API Reference

    Complete list of all S3-compatible, admin, and KMS endpoints.

    @@ -2653,6 +2757,7 @@ POST /kms/generate-random # Generate random bytes
  • Bucket Quotas
  • Encryption
  • Lifecycle Rules
  • +
  • Garbage Collection
  • Metrics History
  • Operation Metrics
  • Troubleshooting
  • diff --git a/tests/test_conditional_headers.py b/tests/test_conditional_headers.py new file mode 100644 index 0000000..dfbedcf --- /dev/null +++ b/tests/test_conditional_headers.py @@ -0,0 +1,156 @@ +import hashlib +import time + +import pytest + + +@pytest.fixture() +def bucket(client, signer): + headers = signer("PUT", "/cond-test") + client.put("/cond-test", headers=headers) + return "cond-test" + + +@pytest.fixture() +def uploaded(client, signer, bucket): + body = b"hello conditional" + etag = hashlib.md5(body).hexdigest() + headers = signer("PUT", f"/{bucket}/obj.txt", body=body) + resp = client.put(f"/{bucket}/obj.txt", headers=headers, data=body) + last_modified = resp.headers.get("Last-Modified") + return {"etag": etag, "last_modified": last_modified} + + +class TestIfMatch: + def test_get_matching_etag(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-Match": f'"{uploaded["etag"]}"'}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + def test_get_non_matching_etag(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-Match": '"wrongetag"'}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 412 + + def test_head_matching_etag(self, client, signer, bucket, uploaded): + headers = signer("HEAD", f"/{bucket}/obj.txt", headers={"If-Match": f'"{uploaded["etag"]}"'}) + resp = client.head(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + def test_head_non_matching_etag(self, client, signer, bucket, uploaded): + headers = signer("HEAD", f"/{bucket}/obj.txt", headers={"If-Match": '"wrongetag"'}) + resp = client.head(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 412 + + def test_wildcard_match(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-Match": "*"}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + def test_multiple_etags_one_matches(self, client, signer, bucket, uploaded): + etag_list = f'"bad1", "{uploaded["etag"]}", "bad2"' + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-Match": etag_list}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + def test_multiple_etags_none_match(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-Match": '"bad1", "bad2"'}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 412 + + +class TestIfNoneMatch: + def test_get_matching_etag_returns_304(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-None-Match": f'"{uploaded["etag"]}"'}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 304 + assert uploaded["etag"] in resp.headers.get("ETag", "") + + def test_get_non_matching_etag_returns_200(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-None-Match": '"wrongetag"'}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + def test_head_matching_etag_returns_304(self, client, signer, bucket, uploaded): + headers = signer("HEAD", f"/{bucket}/obj.txt", headers={"If-None-Match": f'"{uploaded["etag"]}"'}) + resp = client.head(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 304 + + def 
test_head_non_matching_etag_returns_200(self, client, signer, bucket, uploaded): + headers = signer("HEAD", f"/{bucket}/obj.txt", headers={"If-None-Match": '"wrongetag"'}) + resp = client.head(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + def test_wildcard_returns_304(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-None-Match": "*"}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 304 + + +class TestIfModifiedSince: + def test_not_modified_returns_304(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-Modified-Since": "Sun, 01 Jan 2034 00:00:00 GMT"}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 304 + assert "ETag" in resp.headers + + def test_modified_returns_200(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-Modified-Since": "Sun, 01 Jan 2000 00:00:00 GMT"}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + def test_head_not_modified(self, client, signer, bucket, uploaded): + headers = signer("HEAD", f"/{bucket}/obj.txt", headers={"If-Modified-Since": "Sun, 01 Jan 2034 00:00:00 GMT"}) + resp = client.head(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 304 + + def test_if_none_match_takes_precedence(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={ + "If-None-Match": '"wrongetag"', + "If-Modified-Since": "Sun, 01 Jan 2034 00:00:00 GMT", + }) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + +class TestIfUnmodifiedSince: + def test_unmodified_returns_200(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-Unmodified-Since": "Sun, 01 Jan 2034 00:00:00 GMT"}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + def test_modified_returns_412(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={"If-Unmodified-Since": "Sun, 01 Jan 2000 00:00:00 GMT"}) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 412 + + def test_head_modified_returns_412(self, client, signer, bucket, uploaded): + headers = signer("HEAD", f"/{bucket}/obj.txt", headers={"If-Unmodified-Since": "Sun, 01 Jan 2000 00:00:00 GMT"}) + resp = client.head(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 412 + + def test_if_match_takes_precedence(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={ + "If-Match": f'"{uploaded["etag"]}"', + "If-Unmodified-Since": "Sun, 01 Jan 2000 00:00:00 GMT", + }) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 200 + + +class TestConditionalWithRange: + def test_if_match_with_range(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={ + "If-Match": f'"{uploaded["etag"]}"', + "Range": "bytes=0-4", + }) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert resp.status_code == 206 + + def test_if_match_fails_with_range(self, client, signer, bucket, uploaded): + headers = signer("GET", f"/{bucket}/obj.txt", headers={ + "If-Match": '"wrongetag"', + "Range": "bytes=0-4", + }) + resp = client.get(f"/{bucket}/obj.txt", headers=headers) + assert 
resp.status_code == 412 diff --git a/tests/test_gc.py b/tests/test_gc.py new file mode 100644 index 0000000..174b1ce --- /dev/null +++ b/tests/test_gc.py @@ -0,0 +1,350 @@ +import json +import os +import time +from pathlib import Path + +import pytest + +from app.gc import GarbageCollector, GCResult + + +@pytest.fixture +def storage_root(tmp_path): + root = tmp_path / "data" + root.mkdir() + sys_root = root / ".myfsio.sys" + sys_root.mkdir() + (sys_root / "config").mkdir(parents=True) + (sys_root / "tmp").mkdir() + (sys_root / "multipart").mkdir() + (sys_root / "buckets").mkdir() + return root + + +@pytest.fixture +def gc(storage_root): + return GarbageCollector( + storage_root=storage_root, + interval_hours=1.0, + temp_file_max_age_hours=1.0, + multipart_max_age_days=1, + lock_file_max_age_hours=0.5, + dry_run=False, + ) + + +def _make_old(path, hours=48): + old_time = time.time() - hours * 3600 + os.utime(path, (old_time, old_time)) + + +class TestTempFileCleanup: + def test_old_temp_files_deleted(self, storage_root, gc): + tmp_dir = storage_root / ".myfsio.sys" / "tmp" + old_file = tmp_dir / "abc123.tmp" + old_file.write_bytes(b"x" * 1000) + _make_old(old_file, hours=48) + + result = gc.run_now() + assert result.temp_files_deleted == 1 + assert result.temp_bytes_freed == 1000 + assert not old_file.exists() + + def test_recent_temp_files_kept(self, storage_root, gc): + tmp_dir = storage_root / ".myfsio.sys" / "tmp" + new_file = tmp_dir / "recent.tmp" + new_file.write_bytes(b"data") + + result = gc.run_now() + assert result.temp_files_deleted == 0 + assert new_file.exists() + + def test_dry_run_keeps_files(self, storage_root, gc): + gc.dry_run = True + tmp_dir = storage_root / ".myfsio.sys" / "tmp" + old_file = tmp_dir / "stale.tmp" + old_file.write_bytes(b"x" * 500) + _make_old(old_file, hours=48) + + result = gc.run_now() + assert result.temp_files_deleted == 1 + assert result.temp_bytes_freed == 500 + assert old_file.exists() + + +class TestMultipartCleanup: + def test_old_orphaned_multipart_deleted(self, storage_root, gc): + bucket = storage_root / "test-bucket" + bucket.mkdir() + mp_root = storage_root / ".myfsio.sys" / "multipart" / "test-bucket" + mp_root.mkdir(parents=True) + upload_dir = mp_root / "upload-123" + upload_dir.mkdir() + manifest = upload_dir / "manifest.json" + manifest.write_text(json.dumps({"upload_id": "upload-123", "object_key": "foo.txt"})) + part = upload_dir / "part-00001.part" + part.write_bytes(b"x" * 2000) + _make_old(manifest, hours=200) + _make_old(part, hours=200) + _make_old(upload_dir, hours=200) + + result = gc.run_now() + assert result.multipart_uploads_deleted == 1 + assert result.multipart_bytes_freed > 0 + assert not upload_dir.exists() + + def test_recent_multipart_kept(self, storage_root, gc): + bucket = storage_root / "test-bucket" + bucket.mkdir() + mp_root = storage_root / ".myfsio.sys" / "multipart" / "test-bucket" + mp_root.mkdir(parents=True) + upload_dir = mp_root / "upload-new" + upload_dir.mkdir() + manifest = upload_dir / "manifest.json" + manifest.write_text(json.dumps({"upload_id": "upload-new", "object_key": "bar.txt"})) + + result = gc.run_now() + assert result.multipart_uploads_deleted == 0 + assert upload_dir.exists() + + def test_legacy_multipart_cleaned(self, storage_root, gc): + bucket = storage_root / "test-bucket" + bucket.mkdir() + legacy_mp = bucket / ".multipart" / "upload-old" + legacy_mp.mkdir(parents=True) + part = legacy_mp / "part-00001.part" + part.write_bytes(b"y" * 500) + _make_old(part, hours=200) + 
_make_old(legacy_mp, hours=200) + + result = gc.run_now() + assert result.multipart_uploads_deleted == 1 + + +class TestLockFileCleanup: + def test_stale_lock_files_deleted(self, storage_root, gc): + locks_dir = storage_root / ".myfsio.sys" / "buckets" / "test-bucket" / "locks" + locks_dir.mkdir(parents=True) + lock = locks_dir / "some_key.lock" + lock.write_text("") + _make_old(lock, hours=2) + + result = gc.run_now() + assert result.lock_files_deleted == 1 + assert not lock.exists() + + def test_recent_lock_kept(self, storage_root, gc): + locks_dir = storage_root / ".myfsio.sys" / "buckets" / "test-bucket" / "locks" + locks_dir.mkdir(parents=True) + lock = locks_dir / "active.lock" + lock.write_text("") + + result = gc.run_now() + assert result.lock_files_deleted == 0 + assert lock.exists() + + +class TestOrphanedMetadataCleanup: + def test_legacy_orphaned_metadata_deleted(self, storage_root, gc): + bucket = storage_root / "test-bucket" + bucket.mkdir() + meta_dir = bucket / ".meta" + meta_dir.mkdir() + orphan = meta_dir / "deleted_file.txt.meta.json" + orphan.write_text(json.dumps({"etag": "abc"})) + + result = gc.run_now() + assert result.orphaned_metadata_deleted == 1 + assert not orphan.exists() + + def test_valid_metadata_kept(self, storage_root, gc): + bucket = storage_root / "test-bucket" + bucket.mkdir() + obj = bucket / "exists.txt" + obj.write_text("hello") + meta_dir = bucket / ".meta" + meta_dir.mkdir() + meta = meta_dir / "exists.txt.meta.json" + meta.write_text(json.dumps({"etag": "abc"})) + + result = gc.run_now() + assert result.orphaned_metadata_deleted == 0 + assert meta.exists() + + def test_index_orphaned_entries_cleaned(self, storage_root, gc): + bucket = storage_root / "test-bucket" + bucket.mkdir() + obj = bucket / "keep.txt" + obj.write_text("hello") + + meta_dir = storage_root / ".myfsio.sys" / "buckets" / "test-bucket" / "meta" + meta_dir.mkdir(parents=True) + index = meta_dir / "_index.json" + index.write_text(json.dumps({"keep.txt": {"etag": "a"}, "gone.txt": {"etag": "b"}})) + + result = gc.run_now() + assert result.orphaned_metadata_deleted == 1 + + updated = json.loads(index.read_text()) + assert "keep.txt" in updated + assert "gone.txt" not in updated + + +class TestOrphanedVersionsCleanup: + def test_orphaned_versions_deleted(self, storage_root, gc): + bucket = storage_root / "test-bucket" + bucket.mkdir() + versions_dir = storage_root / ".myfsio.sys" / "buckets" / "test-bucket" / "versions" / "deleted_obj.txt" + versions_dir.mkdir(parents=True) + v_bin = versions_dir / "v1.bin" + v_json = versions_dir / "v1.json" + v_bin.write_bytes(b"old data" * 100) + v_json.write_text(json.dumps({"version_id": "v1", "size": 800})) + + result = gc.run_now() + assert result.orphaned_versions_deleted == 2 + assert result.orphaned_version_bytes_freed == 800 + + def test_active_versions_kept(self, storage_root, gc): + bucket = storage_root / "test-bucket" + bucket.mkdir() + obj = bucket / "active.txt" + obj.write_text("current") + versions_dir = storage_root / ".myfsio.sys" / "buckets" / "test-bucket" / "versions" / "active.txt" + versions_dir.mkdir(parents=True) + v_bin = versions_dir / "v1.bin" + v_bin.write_bytes(b"old version") + + result = gc.run_now() + assert result.orphaned_versions_deleted == 0 + assert v_bin.exists() + + +class TestEmptyDirCleanup: + def test_empty_dirs_removed(self, storage_root, gc): + empty = storage_root / ".myfsio.sys" / "buckets" / "test-bucket" / "locks" / "sub" + empty.mkdir(parents=True) + + result = gc.run_now() + assert 
result.empty_dirs_removed > 0 + assert not empty.exists() + + +class TestHistory: + def test_history_recorded(self, storage_root, gc): + gc.run_now() + history = gc.get_history() + assert len(history) == 1 + assert "result" in history[0] + assert "timestamp" in history[0] + + def test_multiple_runs(self, storage_root, gc): + gc.run_now() + gc.run_now() + gc.run_now() + history = gc.get_history() + assert len(history) == 3 + assert history[0]["timestamp"] >= history[1]["timestamp"] + + +class TestStatus: + def test_get_status(self, storage_root, gc): + status = gc.get_status() + assert status["interval_hours"] == 1.0 + assert status["dry_run"] is False + assert status["temp_file_max_age_hours"] == 1.0 + assert status["multipart_max_age_days"] == 1 + assert status["lock_file_max_age_hours"] == 0.5 + + +class TestGCResult: + def test_total_bytes_freed(self): + r = GCResult(temp_bytes_freed=100, multipart_bytes_freed=200, orphaned_version_bytes_freed=300) + assert r.total_bytes_freed == 600 + + def test_has_work(self): + assert not GCResult().has_work + assert GCResult(temp_files_deleted=1).has_work + assert GCResult(lock_files_deleted=1).has_work + assert GCResult(empty_dirs_removed=1).has_work + + +class TestAdminAPI: + @pytest.fixture + def gc_app(self, tmp_path): + from app import create_api_app + storage_root = tmp_path / "data" + iam_config = tmp_path / "iam.json" + bucket_policies = tmp_path / "bucket_policies.json" + iam_payload = { + "users": [ + { + "access_key": "admin", + "secret_key": "adminsecret", + "display_name": "Admin", + "policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", "iam:*"]}], + } + ] + } + iam_config.write_text(json.dumps(iam_payload)) + flask_app = create_api_app({ + "TESTING": True, + "SECRET_KEY": "testing", + "STORAGE_ROOT": storage_root, + "IAM_CONFIG": iam_config, + "BUCKET_POLICY_PATH": bucket_policies, + "GC_ENABLED": True, + "GC_INTERVAL_HOURS": 1.0, + }) + yield flask_app + gc = flask_app.extensions.get("gc") + if gc: + gc.stop() + + def test_gc_status(self, gc_app): + client = gc_app.test_client() + resp = client.get("/admin/gc/status", headers={"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}) + assert resp.status_code == 200 + data = resp.get_json() + assert data["enabled"] is True + + def test_gc_run(self, gc_app): + client = gc_app.test_client() + resp = client.post( + "/admin/gc/run", + headers={"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}, + content_type="application/json", + ) + assert resp.status_code == 200 + data = resp.get_json() + assert "temp_files_deleted" in data + + def test_gc_dry_run(self, gc_app): + client = gc_app.test_client() + resp = client.post( + "/admin/gc/run", + headers={"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}, + data=json.dumps({"dry_run": True}), + content_type="application/json", + ) + assert resp.status_code == 200 + data = resp.get_json() + assert "temp_files_deleted" in data + + def test_gc_history(self, gc_app): + client = gc_app.test_client() + client.post("/admin/gc/run", headers={"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}) + resp = client.get("/admin/gc/history", headers={"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}) + assert resp.status_code == 200 + data = resp.get_json() + assert len(data["executions"]) >= 1 + + def test_gc_requires_admin(self, gc_app): + iam = gc_app.extensions["iam"] + user = iam.create_user(display_name="Regular") + client = gc_app.test_client() + resp = client.get( + "/admin/gc/status", + 
headers={"X-Access-Key": user["access_key"], "X-Secret-Key": user["secret_key"]}, + ) + assert resp.status_code == 403