Add garbage collection: background cleanup of orphaned temp files, multipart uploads, lock files, metadata, versions, and empty directories
This commit is contained in:
531
app/gc.py
Normal file
531
app/gc.py
Normal file
@@ -0,0 +1,531 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GCResult:
|
||||
temp_files_deleted: int = 0
|
||||
temp_bytes_freed: int = 0
|
||||
multipart_uploads_deleted: int = 0
|
||||
multipart_bytes_freed: int = 0
|
||||
lock_files_deleted: int = 0
|
||||
orphaned_metadata_deleted: int = 0
|
||||
orphaned_versions_deleted: int = 0
|
||||
orphaned_version_bytes_freed: int = 0
|
||||
empty_dirs_removed: int = 0
|
||||
errors: List[str] = field(default_factory=list)
|
||||
execution_time_seconds: float = 0.0
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"temp_files_deleted": self.temp_files_deleted,
|
||||
"temp_bytes_freed": self.temp_bytes_freed,
|
||||
"multipart_uploads_deleted": self.multipart_uploads_deleted,
|
||||
"multipart_bytes_freed": self.multipart_bytes_freed,
|
||||
"lock_files_deleted": self.lock_files_deleted,
|
||||
"orphaned_metadata_deleted": self.orphaned_metadata_deleted,
|
||||
"orphaned_versions_deleted": self.orphaned_versions_deleted,
|
||||
"orphaned_version_bytes_freed": self.orphaned_version_bytes_freed,
|
||||
"empty_dirs_removed": self.empty_dirs_removed,
|
||||
"errors": self.errors,
|
||||
"execution_time_seconds": self.execution_time_seconds,
|
||||
}
|
||||
|
||||
@property
|
||||
def total_bytes_freed(self) -> int:
|
||||
return self.temp_bytes_freed + self.multipart_bytes_freed + self.orphaned_version_bytes_freed
|
||||
|
||||
@property
|
||||
def has_work(self) -> bool:
|
||||
return (
|
||||
self.temp_files_deleted > 0
|
||||
or self.multipart_uploads_deleted > 0
|
||||
or self.lock_files_deleted > 0
|
||||
or self.orphaned_metadata_deleted > 0
|
||||
or self.orphaned_versions_deleted > 0
|
||||
or self.empty_dirs_removed > 0
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GCExecutionRecord:
|
||||
timestamp: float
|
||||
result: dict
|
||||
dry_run: bool
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"timestamp": self.timestamp,
|
||||
"result": self.result,
|
||||
"dry_run": self.dry_run,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> GCExecutionRecord:
|
||||
return cls(
|
||||
timestamp=data["timestamp"],
|
||||
result=data["result"],
|
||||
dry_run=data.get("dry_run", False),
|
||||
)
|
||||
|
||||
|
||||
class GCHistoryStore:
|
||||
def __init__(self, storage_root: Path, max_records: int = 50) -> None:
|
||||
self.storage_root = storage_root
|
||||
self.max_records = max_records
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _get_path(self) -> Path:
|
||||
return self.storage_root / ".myfsio.sys" / "config" / "gc_history.json"
|
||||
|
||||
def load(self) -> List[GCExecutionRecord]:
|
||||
path = self._get_path()
|
||||
if not path.exists():
|
||||
return []
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
return [GCExecutionRecord.from_dict(d) for d in data.get("executions", [])]
|
||||
except (OSError, ValueError, KeyError) as e:
|
||||
logger.error("Failed to load GC history: %s", e)
|
||||
return []
|
||||
|
||||
def save(self, records: List[GCExecutionRecord]) -> None:
|
||||
path = self._get_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
data = {"executions": [r.to_dict() for r in records[: self.max_records]]}
|
||||
try:
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
except OSError as e:
|
||||
logger.error("Failed to save GC history: %s", e)
|
||||
|
||||
def add(self, record: GCExecutionRecord) -> None:
|
||||
with self._lock:
|
||||
records = self.load()
|
||||
records.insert(0, record)
|
||||
self.save(records)
|
||||
|
||||
def get_history(self, limit: int = 50, offset: int = 0) -> List[GCExecutionRecord]:
|
||||
return self.load()[offset : offset + limit]
|
||||
|
||||
|
||||
def _dir_size(path: Path) -> int:
|
||||
total = 0
|
||||
try:
|
||||
for f in path.rglob("*"):
|
||||
if f.is_file():
|
||||
try:
|
||||
total += f.stat().st_size
|
||||
except OSError:
|
||||
pass
|
||||
except OSError:
|
||||
pass
|
||||
return total
|
||||
|
||||
|
||||
def _file_age_hours(path: Path) -> float:
|
||||
try:
|
||||
mtime = path.stat().st_mtime
|
||||
return (time.time() - mtime) / 3600.0
|
||||
except OSError:
|
||||
return 0.0
|
||||
|
||||
|
||||
class GarbageCollector:
|
||||
SYSTEM_ROOT = ".myfsio.sys"
|
||||
SYSTEM_TMP_DIR = "tmp"
|
||||
SYSTEM_MULTIPART_DIR = "multipart"
|
||||
SYSTEM_BUCKETS_DIR = "buckets"
|
||||
BUCKET_META_DIR = "meta"
|
||||
BUCKET_VERSIONS_DIR = "versions"
|
||||
INTERNAL_FOLDERS = {".meta", ".versions", ".multipart"}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
storage_root: Path,
|
||||
interval_hours: float = 6.0,
|
||||
temp_file_max_age_hours: float = 24.0,
|
||||
multipart_max_age_days: int = 7,
|
||||
lock_file_max_age_hours: float = 1.0,
|
||||
dry_run: bool = False,
|
||||
max_history: int = 50,
|
||||
) -> None:
|
||||
self.storage_root = Path(storage_root)
|
||||
self.interval_seconds = interval_hours * 3600.0
|
||||
self.temp_file_max_age_hours = temp_file_max_age_hours
|
||||
self.multipart_max_age_days = multipart_max_age_days
|
||||
self.lock_file_max_age_hours = lock_file_max_age_hours
|
||||
self.dry_run = dry_run
|
||||
self._timer: Optional[threading.Timer] = None
|
||||
self._shutdown = False
|
||||
self._lock = threading.Lock()
|
||||
self.history_store = GCHistoryStore(storage_root, max_records=max_history)
|
||||
|
||||
def start(self) -> None:
|
||||
if self._timer is not None:
|
||||
return
|
||||
self._shutdown = False
|
||||
self._schedule_next()
|
||||
logger.info(
|
||||
"GC started: interval=%.1fh, temp_max_age=%.1fh, multipart_max_age=%dd, lock_max_age=%.1fh, dry_run=%s",
|
||||
self.interval_seconds / 3600.0,
|
||||
self.temp_file_max_age_hours,
|
||||
self.multipart_max_age_days,
|
||||
self.lock_file_max_age_hours,
|
||||
self.dry_run,
|
||||
)
|
||||
|
||||
def stop(self) -> None:
|
||||
self._shutdown = True
|
||||
if self._timer:
|
||||
self._timer.cancel()
|
||||
self._timer = None
|
||||
logger.info("GC stopped")
|
||||
|
||||
def _schedule_next(self) -> None:
|
||||
if self._shutdown:
|
||||
return
|
||||
self._timer = threading.Timer(self.interval_seconds, self._run_cycle)
|
||||
self._timer.daemon = True
|
||||
self._timer.start()
|
||||
|
||||
def _run_cycle(self) -> None:
|
||||
if self._shutdown:
|
||||
return
|
||||
try:
|
||||
self.run_now()
|
||||
except Exception as e:
|
||||
logger.error("GC cycle failed: %s", e)
|
||||
finally:
|
||||
self._schedule_next()
|
||||
|
||||
def run_now(self) -> GCResult:
|
||||
start = time.time()
|
||||
result = GCResult()
|
||||
|
||||
self._clean_temp_files(result)
|
||||
self._clean_orphaned_multipart(result)
|
||||
self._clean_stale_locks(result)
|
||||
self._clean_orphaned_metadata(result)
|
||||
self._clean_orphaned_versions(result)
|
||||
self._clean_empty_dirs(result)
|
||||
|
||||
result.execution_time_seconds = time.time() - start
|
||||
|
||||
if result.has_work or result.errors:
|
||||
logger.info(
|
||||
"GC completed in %.2fs: temp=%d (%.1f MB), multipart=%d (%.1f MB), "
|
||||
"locks=%d, meta=%d, versions=%d (%.1f MB), dirs=%d, errors=%d%s",
|
||||
result.execution_time_seconds,
|
||||
result.temp_files_deleted,
|
||||
result.temp_bytes_freed / (1024 * 1024),
|
||||
result.multipart_uploads_deleted,
|
||||
result.multipart_bytes_freed / (1024 * 1024),
|
||||
result.lock_files_deleted,
|
||||
result.orphaned_metadata_deleted,
|
||||
result.orphaned_versions_deleted,
|
||||
result.orphaned_version_bytes_freed / (1024 * 1024),
|
||||
result.empty_dirs_removed,
|
||||
len(result.errors),
|
||||
" (dry run)" if self.dry_run else "",
|
||||
)
|
||||
|
||||
record = GCExecutionRecord(
|
||||
timestamp=time.time(),
|
||||
result=result.to_dict(),
|
||||
dry_run=self.dry_run,
|
||||
)
|
||||
self.history_store.add(record)
|
||||
|
||||
return result
|
||||
|
||||
def _system_path(self) -> Path:
|
||||
return self.storage_root / self.SYSTEM_ROOT
|
||||
|
||||
def _list_bucket_names(self) -> List[str]:
|
||||
names = []
|
||||
try:
|
||||
for entry in self.storage_root.iterdir():
|
||||
if entry.is_dir() and entry.name != self.SYSTEM_ROOT:
|
||||
names.append(entry.name)
|
||||
except OSError:
|
||||
pass
|
||||
return names
|
||||
|
||||
def _clean_temp_files(self, result: GCResult) -> None:
|
||||
tmp_dir = self._system_path() / self.SYSTEM_TMP_DIR
|
||||
if not tmp_dir.exists():
|
||||
return
|
||||
try:
|
||||
for entry in tmp_dir.iterdir():
|
||||
if not entry.is_file():
|
||||
continue
|
||||
age = _file_age_hours(entry)
|
||||
if age < self.temp_file_max_age_hours:
|
||||
continue
|
||||
try:
|
||||
size = entry.stat().st_size
|
||||
if not self.dry_run:
|
||||
entry.unlink()
|
||||
result.temp_files_deleted += 1
|
||||
result.temp_bytes_freed += size
|
||||
except OSError as e:
|
||||
result.errors.append(f"temp file {entry.name}: {e}")
|
||||
except OSError as e:
|
||||
result.errors.append(f"scan tmp dir: {e}")
|
||||
|
||||
def _clean_orphaned_multipart(self, result: GCResult) -> None:
|
||||
cutoff_hours = self.multipart_max_age_days * 24.0
|
||||
bucket_names = self._list_bucket_names()
|
||||
|
||||
for bucket_name in bucket_names:
|
||||
for multipart_root in (
|
||||
self._system_path() / self.SYSTEM_MULTIPART_DIR / bucket_name,
|
||||
self.storage_root / bucket_name / ".multipart",
|
||||
):
|
||||
if not multipart_root.exists():
|
||||
continue
|
||||
try:
|
||||
for upload_dir in multipart_root.iterdir():
|
||||
if not upload_dir.is_dir():
|
||||
continue
|
||||
self._maybe_clean_upload(upload_dir, cutoff_hours, result)
|
||||
except OSError as e:
|
||||
result.errors.append(f"scan multipart {bucket_name}: {e}")
|
||||
|
||||
def _maybe_clean_upload(self, upload_dir: Path, cutoff_hours: float, result: GCResult) -> None:
|
||||
manifest_path = upload_dir / "manifest.json"
|
||||
age = _file_age_hours(manifest_path) if manifest_path.exists() else _file_age_hours(upload_dir)
|
||||
|
||||
if age < cutoff_hours:
|
||||
return
|
||||
|
||||
dir_bytes = _dir_size(upload_dir)
|
||||
try:
|
||||
if not self.dry_run:
|
||||
shutil.rmtree(upload_dir, ignore_errors=True)
|
||||
result.multipart_uploads_deleted += 1
|
||||
result.multipart_bytes_freed += dir_bytes
|
||||
except OSError as e:
|
||||
result.errors.append(f"multipart {upload_dir.name}: {e}")
|
||||
|
||||
def _clean_stale_locks(self, result: GCResult) -> None:
|
||||
buckets_root = self._system_path() / self.SYSTEM_BUCKETS_DIR
|
||||
if not buckets_root.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
for bucket_dir in buckets_root.iterdir():
|
||||
if not bucket_dir.is_dir():
|
||||
continue
|
||||
locks_dir = bucket_dir / "locks"
|
||||
if not locks_dir.exists():
|
||||
continue
|
||||
try:
|
||||
for lock_file in locks_dir.iterdir():
|
||||
if not lock_file.is_file() or not lock_file.name.endswith(".lock"):
|
||||
continue
|
||||
age = _file_age_hours(lock_file)
|
||||
if age < self.lock_file_max_age_hours:
|
||||
continue
|
||||
try:
|
||||
if not self.dry_run:
|
||||
lock_file.unlink(missing_ok=True)
|
||||
result.lock_files_deleted += 1
|
||||
except OSError as e:
|
||||
result.errors.append(f"lock {lock_file.name}: {e}")
|
||||
except OSError as e:
|
||||
result.errors.append(f"scan locks {bucket_dir.name}: {e}")
|
||||
except OSError as e:
|
||||
result.errors.append(f"scan buckets for locks: {e}")
|
||||
|
||||
def _clean_orphaned_metadata(self, result: GCResult) -> None:
|
||||
bucket_names = self._list_bucket_names()
|
||||
|
||||
for bucket_name in bucket_names:
|
||||
legacy_meta = self.storage_root / bucket_name / ".meta"
|
||||
if legacy_meta.exists():
|
||||
self._clean_legacy_metadata(bucket_name, legacy_meta, result)
|
||||
|
||||
new_meta = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
|
||||
if new_meta.exists():
|
||||
self._clean_index_metadata(bucket_name, new_meta, result)
|
||||
|
||||
def _clean_legacy_metadata(self, bucket_name: str, meta_root: Path, result: GCResult) -> None:
|
||||
bucket_path = self.storage_root / bucket_name
|
||||
try:
|
||||
for meta_file in meta_root.rglob("*.meta.json"):
|
||||
if not meta_file.is_file():
|
||||
continue
|
||||
try:
|
||||
rel = meta_file.relative_to(meta_root)
|
||||
object_key = rel.as_posix().removesuffix(".meta.json")
|
||||
object_path = bucket_path / object_key
|
||||
if not object_path.exists():
|
||||
if not self.dry_run:
|
||||
meta_file.unlink(missing_ok=True)
|
||||
result.orphaned_metadata_deleted += 1
|
||||
except (OSError, ValueError) as e:
|
||||
result.errors.append(f"legacy meta {bucket_name}/{meta_file.name}: {e}")
|
||||
except OSError as e:
|
||||
result.errors.append(f"scan legacy meta {bucket_name}: {e}")
|
||||
|
||||
def _clean_index_metadata(self, bucket_name: str, meta_root: Path, result: GCResult) -> None:
|
||||
bucket_path = self.storage_root / bucket_name
|
||||
try:
|
||||
for index_file in meta_root.rglob("_index.json"):
|
||||
if not index_file.is_file():
|
||||
continue
|
||||
try:
|
||||
with open(index_file, "r", encoding="utf-8") as f:
|
||||
index_data = json.load(f)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
continue
|
||||
|
||||
keys_to_remove = []
|
||||
for key in index_data:
|
||||
rel_dir = index_file.parent.relative_to(meta_root)
|
||||
if rel_dir == Path("."):
|
||||
full_key = key
|
||||
else:
|
||||
full_key = rel_dir.as_posix() + "/" + key
|
||||
object_path = bucket_path / full_key
|
||||
if not object_path.exists():
|
||||
keys_to_remove.append(key)
|
||||
|
||||
if keys_to_remove:
|
||||
if not self.dry_run:
|
||||
for k in keys_to_remove:
|
||||
index_data.pop(k, None)
|
||||
if index_data:
|
||||
try:
|
||||
with open(index_file, "w", encoding="utf-8") as f:
|
||||
json.dump(index_data, f)
|
||||
except OSError as e:
|
||||
result.errors.append(f"write index {bucket_name}: {e}")
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
index_file.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
result.orphaned_metadata_deleted += len(keys_to_remove)
|
||||
except OSError as e:
|
||||
result.errors.append(f"scan index meta {bucket_name}: {e}")
|
||||
|
||||
def _clean_orphaned_versions(self, result: GCResult) -> None:
|
||||
bucket_names = self._list_bucket_names()
|
||||
|
||||
for bucket_name in bucket_names:
|
||||
bucket_path = self.storage_root / bucket_name
|
||||
for versions_root in (
|
||||
self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR,
|
||||
self.storage_root / bucket_name / ".versions",
|
||||
):
|
||||
if not versions_root.exists():
|
||||
continue
|
||||
try:
|
||||
for key_dir in versions_root.iterdir():
|
||||
if not key_dir.is_dir():
|
||||
continue
|
||||
self._clean_versions_for_key(bucket_path, versions_root, key_dir, result)
|
||||
except OSError as e:
|
||||
result.errors.append(f"scan versions {bucket_name}: {e}")
|
||||
|
||||
def _clean_versions_for_key(
|
||||
self, bucket_path: Path, versions_root: Path, key_dir: Path, result: GCResult
|
||||
) -> None:
|
||||
try:
|
||||
rel = key_dir.relative_to(versions_root)
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
object_path = bucket_path / rel
|
||||
if object_path.exists():
|
||||
return
|
||||
|
||||
version_files = list(key_dir.glob("*.bin")) + list(key_dir.glob("*.json"))
|
||||
if not version_files:
|
||||
return
|
||||
|
||||
for vf in version_files:
|
||||
try:
|
||||
size = vf.stat().st_size if vf.suffix == ".bin" else 0
|
||||
if not self.dry_run:
|
||||
vf.unlink(missing_ok=True)
|
||||
if vf.suffix == ".bin":
|
||||
result.orphaned_version_bytes_freed += size
|
||||
result.orphaned_versions_deleted += 1
|
||||
except OSError as e:
|
||||
result.errors.append(f"version file {vf.name}: {e}")
|
||||
|
||||
def _clean_empty_dirs(self, result: GCResult) -> None:
|
||||
targets = [
|
||||
self._system_path() / self.SYSTEM_TMP_DIR,
|
||||
self._system_path() / self.SYSTEM_MULTIPART_DIR,
|
||||
self._system_path() / self.SYSTEM_BUCKETS_DIR,
|
||||
]
|
||||
for bucket_name in self._list_bucket_names():
|
||||
targets.append(self.storage_root / bucket_name / ".meta")
|
||||
targets.append(self.storage_root / bucket_name / ".versions")
|
||||
targets.append(self.storage_root / bucket_name / ".multipart")
|
||||
|
||||
for root in targets:
|
||||
if not root.exists():
|
||||
continue
|
||||
self._remove_empty_dirs_recursive(root, root, result)
|
||||
|
||||
def _remove_empty_dirs_recursive(self, path: Path, stop_at: Path, result: GCResult) -> bool:
|
||||
if not path.is_dir():
|
||||
return False
|
||||
|
||||
try:
|
||||
children = list(path.iterdir())
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
all_empty = True
|
||||
for child in children:
|
||||
if child.is_dir():
|
||||
if not self._remove_empty_dirs_recursive(child, stop_at, result):
|
||||
all_empty = False
|
||||
else:
|
||||
all_empty = False
|
||||
|
||||
if all_empty and path != stop_at:
|
||||
try:
|
||||
if not self.dry_run:
|
||||
path.rmdir()
|
||||
result.empty_dirs_removed += 1
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
return all_empty
|
||||
|
||||
def get_history(self, limit: int = 50, offset: int = 0) -> List[dict]:
|
||||
records = self.history_store.get_history(limit, offset)
|
||||
return [r.to_dict() for r in records]
|
||||
|
||||
def get_status(self) -> dict:
|
||||
return {
|
||||
"enabled": not self._shutdown or self._timer is not None,
|
||||
"running": self._timer is not None and not self._shutdown,
|
||||
"interval_hours": self.interval_seconds / 3600.0,
|
||||
"temp_file_max_age_hours": self.temp_file_max_age_hours,
|
||||
"multipart_max_age_days": self.multipart_max_age_days,
|
||||
"lock_file_max_age_hours": self.lock_file_max_age_hours,
|
||||
"dry_run": self.dry_run,
|
||||
}
|
||||
Reference in New Issue
Block a user