MyFSIO v0.3.9 Release
Reviewed-on: #32
This commit was merged in pull request #32.
This commit is contained in:
@@ -30,6 +30,7 @@ from .extensions import limiter, csrf
|
||||
from .iam import IamService
|
||||
from .kms import KMSManager
|
||||
from .gc import GarbageCollector
|
||||
from .integrity import IntegrityChecker
|
||||
from .lifecycle import LifecycleManager
|
||||
from .notifications import NotificationService
|
||||
from .object_lock import ObjectLockService
|
||||
@@ -234,6 +235,17 @@ def create_app(
|
||||
)
|
||||
gc_collector.start()
|
||||
|
||||
integrity_checker = None
|
||||
if app.config.get("INTEGRITY_ENABLED", False):
|
||||
integrity_checker = IntegrityChecker(
|
||||
storage_root=storage_root,
|
||||
interval_hours=app.config.get("INTEGRITY_INTERVAL_HOURS", 24.0),
|
||||
batch_size=app.config.get("INTEGRITY_BATCH_SIZE", 1000),
|
||||
auto_heal=app.config.get("INTEGRITY_AUTO_HEAL", False),
|
||||
dry_run=app.config.get("INTEGRITY_DRY_RUN", False),
|
||||
)
|
||||
integrity_checker.start()
|
||||
|
||||
app.extensions["object_storage"] = storage
|
||||
app.extensions["iam"] = iam
|
||||
app.extensions["bucket_policies"] = bucket_policies
|
||||
@@ -246,6 +258,7 @@ def create_app(
|
||||
app.extensions["acl"] = acl_service
|
||||
app.extensions["lifecycle"] = lifecycle_manager
|
||||
app.extensions["gc"] = gc_collector
|
||||
app.extensions["integrity"] = integrity_checker
|
||||
app.extensions["object_lock"] = object_lock_service
|
||||
app.extensions["notifications"] = notification_service
|
||||
app.extensions["access_logging"] = access_logging_service
|
||||
@@ -549,30 +562,57 @@ def _configure_logging(app: Flask) -> None:
|
||||
is_encrypted = "x-amz-server-side-encryption" in metadata
|
||||
except (StorageError, OSError):
|
||||
pass
|
||||
if request.method == "HEAD":
|
||||
response = Response(status=200)
|
||||
if is_encrypted and hasattr(storage, "get_object_data"):
|
||||
try:
|
||||
data, _ = storage.get_object_data(bucket, object_key)
|
||||
response.headers["Content-Length"] = len(data)
|
||||
except (StorageError, OSError):
|
||||
return _website_error_response(500, "Internal Server Error")
|
||||
else:
|
||||
try:
|
||||
stat = obj_path.stat()
|
||||
response.headers["Content-Length"] = stat.st_size
|
||||
except OSError:
|
||||
return _website_error_response(500, "Internal Server Error")
|
||||
response.headers["Content-Type"] = content_type
|
||||
return response
|
||||
if is_encrypted and hasattr(storage, "get_object_data"):
|
||||
try:
|
||||
data, _ = storage.get_object_data(bucket, object_key)
|
||||
response = Response(data, mimetype=content_type)
|
||||
response.headers["Content-Length"] = len(data)
|
||||
return response
|
||||
file_size = len(data)
|
||||
except (StorageError, OSError):
|
||||
return _website_error_response(500, "Internal Server Error")
|
||||
else:
|
||||
data = None
|
||||
try:
|
||||
stat = obj_path.stat()
|
||||
file_size = stat.st_size
|
||||
except OSError:
|
||||
return _website_error_response(500, "Internal Server Error")
|
||||
if request.method == "HEAD":
|
||||
response = Response(status=200)
|
||||
response.headers["Content-Length"] = file_size
|
||||
response.headers["Content-Type"] = content_type
|
||||
response.headers["Accept-Ranges"] = "bytes"
|
||||
return response
|
||||
from .s3_api import _parse_range_header
|
||||
range_header = request.headers.get("Range")
|
||||
if range_header:
|
||||
ranges = _parse_range_header(range_header, file_size)
|
||||
if ranges is None:
|
||||
return Response(status=416, headers={"Content-Range": f"bytes */{file_size}"})
|
||||
start, end = ranges[0]
|
||||
length = end - start + 1
|
||||
if data is not None:
|
||||
partial_data = data[start:end + 1]
|
||||
response = Response(partial_data, status=206, mimetype=content_type)
|
||||
else:
|
||||
def _stream_range(file_path, start_pos, length_to_read):
|
||||
with file_path.open("rb") as f:
|
||||
f.seek(start_pos)
|
||||
remaining = length_to_read
|
||||
while remaining > 0:
|
||||
chunk = f.read(min(262144, remaining))
|
||||
if not chunk:
|
||||
break
|
||||
remaining -= len(chunk)
|
||||
yield chunk
|
||||
response = Response(_stream_range(obj_path, start, length), status=206, mimetype=content_type, direct_passthrough=True)
|
||||
response.headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
|
||||
response.headers["Content-Length"] = length
|
||||
response.headers["Accept-Ranges"] = "bytes"
|
||||
return response
|
||||
if data is not None:
|
||||
response = Response(data, mimetype=content_type)
|
||||
response.headers["Content-Length"] = file_size
|
||||
response.headers["Accept-Ranges"] = "bytes"
|
||||
return response
|
||||
def _stream(file_path):
|
||||
with file_path.open("rb") as f:
|
||||
while True:
|
||||
@@ -580,13 +620,10 @@ def _configure_logging(app: Flask) -> None:
|
||||
if not chunk:
|
||||
break
|
||||
yield chunk
|
||||
try:
|
||||
stat = obj_path.stat()
|
||||
response = Response(_stream(obj_path), mimetype=content_type, direct_passthrough=True)
|
||||
response.headers["Content-Length"] = stat.st_size
|
||||
return response
|
||||
except OSError:
|
||||
return _website_error_response(500, "Internal Server Error")
|
||||
response = Response(_stream(obj_path), mimetype=content_type, direct_passthrough=True)
|
||||
response.headers["Content-Length"] = file_size
|
||||
response.headers["Accept-Ranges"] = "bytes"
|
||||
return response
|
||||
|
||||
def _serve_website_error(storage, bucket, error_doc_key, status_code):
|
||||
if not error_doc_key:
|
||||
|
||||
@@ -15,6 +15,7 @@ from flask import Blueprint, Response, current_app, jsonify, request
|
||||
from .connections import ConnectionStore
|
||||
from .extensions import limiter
|
||||
from .gc import GarbageCollector
|
||||
from .integrity import IntegrityChecker
|
||||
from .iam import IamError, Principal
|
||||
from .replication import ReplicationManager
|
||||
from .site_registry import PeerSite, SiteInfo, SiteRegistry
|
||||
@@ -829,3 +830,54 @@ def gc_history():
|
||||
offset = int(request.args.get("offset", 0))
|
||||
records = gc.get_history(limit=limit, offset=offset)
|
||||
return jsonify({"executions": records})
|
||||
|
||||
|
||||
def _integrity() -> Optional[IntegrityChecker]:
    """Return the app's IntegrityChecker extension, or None when not enabled."""
    return current_app.extensions.get("integrity")
|
||||
|
||||
|
||||
@admin_api_bp.route("/integrity/status", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def integrity_status():
    """Report the integrity checker's current status (admin only)."""
    principal, error = _require_admin()
    if error:
        return error
    checker = _integrity()
    if checker:
        return jsonify(checker.get_status())
    # Checker not configured: explain how to enable it rather than erroring.
    return jsonify({"enabled": False, "message": "Integrity checker is not enabled. Set INTEGRITY_ENABLED=true to enable."})
|
||||
|
||||
|
||||
@admin_api_bp.route("/integrity/run", methods=["POST"])
@limiter.limit(lambda: _get_admin_rate_limit())
def integrity_run_now():
    """Trigger an immediate integrity scan (admin only).

    Optional JSON body keys ``dry_run`` and ``auto_heal`` override the
    checker's configured flags for this run only; omitted keys (i.e. ``None``)
    leave the configured defaults in place.

    Returns the serialized IntegrityResult of the run.
    """
    principal, error = _require_admin()
    if error:
        return error
    checker = _integrity()
    if not checker:
        return _json_error("InvalidRequest", "Integrity checker is not enabled", 400)
    payload = request.get_json(silent=True) or {}
    # run_now() already treats None as "use the configured default", so the
    # values can be passed straight through — the previous
    # "x if x is not None else None" conditionals were no-ops.
    result = checker.run_now(
        auto_heal=payload.get("auto_heal"),
        dry_run=payload.get("dry_run"),
    )
    logger.info("Integrity manual run by %s", principal.access_key)
    return jsonify(result.to_dict())
|
||||
|
||||
|
||||
@admin_api_bp.route("/integrity/history", methods=["GET"])
@limiter.limit(lambda: _get_admin_rate_limit())
def integrity_history():
    """Return recent integrity run records, newest first (admin only)."""
    principal, error = _require_admin()
    if error:
        return error
    checker = _integrity()
    if not checker:
        # No checker configured: an empty history, not an error.
        return jsonify({"executions": []})
    # Cap page size at 200 regardless of what the client asks for.
    limit = min(int(request.args.get("limit", 50)), 200)
    offset = int(request.args.get("offset", 0))
    return jsonify({"executions": checker.get_history(limit=limit, offset=offset)})
|
||||
|
||||
@@ -156,6 +156,11 @@ class AppConfig:
|
||||
gc_multipart_max_age_days: int
|
||||
gc_lock_file_max_age_hours: float
|
||||
gc_dry_run: bool
|
||||
integrity_enabled: bool
|
||||
integrity_interval_hours: float
|
||||
integrity_batch_size: int
|
||||
integrity_auto_heal: bool
|
||||
integrity_dry_run: bool
|
||||
|
||||
@classmethod
|
||||
def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
|
||||
@@ -331,6 +336,11 @@ class AppConfig:
|
||||
gc_multipart_max_age_days = int(_get("GC_MULTIPART_MAX_AGE_DAYS", 7))
|
||||
gc_lock_file_max_age_hours = float(_get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0))
|
||||
gc_dry_run = str(_get("GC_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"}
|
||||
integrity_enabled = str(_get("INTEGRITY_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
|
||||
integrity_interval_hours = float(_get("INTEGRITY_INTERVAL_HOURS", 24.0))
|
||||
integrity_batch_size = int(_get("INTEGRITY_BATCH_SIZE", 1000))
|
||||
integrity_auto_heal = str(_get("INTEGRITY_AUTO_HEAL", "0")).lower() in {"1", "true", "yes", "on"}
|
||||
integrity_dry_run = str(_get("INTEGRITY_DRY_RUN", "0")).lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
return cls(storage_root=storage_root,
|
||||
max_upload_size=max_upload_size,
|
||||
@@ -424,7 +434,12 @@ class AppConfig:
|
||||
gc_temp_file_max_age_hours=gc_temp_file_max_age_hours,
|
||||
gc_multipart_max_age_days=gc_multipart_max_age_days,
|
||||
gc_lock_file_max_age_hours=gc_lock_file_max_age_hours,
|
||||
gc_dry_run=gc_dry_run)
|
||||
gc_dry_run=gc_dry_run,
|
||||
integrity_enabled=integrity_enabled,
|
||||
integrity_interval_hours=integrity_interval_hours,
|
||||
integrity_batch_size=integrity_batch_size,
|
||||
integrity_auto_heal=integrity_auto_heal,
|
||||
integrity_dry_run=integrity_dry_run)
|
||||
|
||||
def validate_and_report(self) -> list[str]:
|
||||
"""Validate configuration and return a list of warnings/issues.
|
||||
@@ -641,4 +656,9 @@ class AppConfig:
|
||||
"GC_MULTIPART_MAX_AGE_DAYS": self.gc_multipart_max_age_days,
|
||||
"GC_LOCK_FILE_MAX_AGE_HOURS": self.gc_lock_file_max_age_hours,
|
||||
"GC_DRY_RUN": self.gc_dry_run,
|
||||
"INTEGRITY_ENABLED": self.integrity_enabled,
|
||||
"INTEGRITY_INTERVAL_HOURS": self.integrity_interval_hours,
|
||||
"INTEGRITY_BATCH_SIZE": self.integrity_batch_size,
|
||||
"INTEGRITY_AUTO_HEAL": self.integrity_auto_heal,
|
||||
"INTEGRITY_DRY_RUN": self.integrity_dry_run,
|
||||
}
|
||||
|
||||
738
app/integrity.py
Normal file
738
app/integrity.py
Normal file
@@ -0,0 +1,738 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
try:
|
||||
import myfsio_core as _rc
|
||||
_HAS_RUST = True
|
||||
except ImportError:
|
||||
_HAS_RUST = False
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _compute_etag(path: Path) -> str:
    """Return the MD5 hex digest of *path*, via the Rust core when available."""
    if _HAS_RUST:
        return _rc.md5_file(str(path))
    # Pure-Python fallback: stream the file in 8 KiB chunks to bound memory.
    digest = hashlib.md5()
    with path.open("rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
||||
|
||||
|
||||
@dataclass
class IntegrityIssue:
    """A single problem discovered during an integrity scan.

    Attributes mirror the JSON produced by ``to_dict``; ``healed`` and
    ``heal_action`` are filled in only when auto-heal actually repaired it.
    """

    issue_type: str
    bucket: str
    key: str
    detail: str
    healed: bool = False
    heal_action: str = ""

    def to_dict(self) -> dict:
        """Serialize the issue to a plain JSON-friendly dict."""
        field_order = ("issue_type", "bucket", "key", "detail", "healed", "heal_action")
        return {name: getattr(self, name) for name in field_order}
|
||||
|
||||
|
||||
@dataclass
class IntegrityResult:
    """Aggregated outcome of one integrity scan.

    Per-category counters plus (capped) issue details, error strings, and
    scan statistics.
    """

    corrupted_objects: int = 0
    orphaned_objects: int = 0
    phantom_metadata: int = 0
    stale_versions: int = 0
    etag_cache_inconsistencies: int = 0
    legacy_metadata_drifts: int = 0
    issues_healed: int = 0
    issues: List[IntegrityIssue] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    objects_scanned: int = 0
    buckets_scanned: int = 0
    execution_time_seconds: float = 0.0

    # Counters that contribute to total_issues (issues_healed is excluded).
    _ISSUE_COUNTERS = (
        "corrupted_objects",
        "orphaned_objects",
        "phantom_metadata",
        "stale_versions",
        "etag_cache_inconsistencies",
        "legacy_metadata_drifts",
    )

    def to_dict(self) -> dict:
        """Serialize counters, issue details and errors to a JSON-friendly dict."""
        payload = {name: getattr(self, name) for name in self._ISSUE_COUNTERS}
        payload["issues_healed"] = self.issues_healed
        payload["issues"] = [issue.to_dict() for issue in self.issues]
        payload["errors"] = self.errors
        payload["objects_scanned"] = self.objects_scanned
        payload["buckets_scanned"] = self.buckets_scanned
        payload["execution_time_seconds"] = self.execution_time_seconds
        return payload

    @property
    def total_issues(self) -> int:
        """Sum of all per-category issue counters."""
        return sum(getattr(self, name) for name in self._ISSUE_COUNTERS)

    @property
    def has_issues(self) -> bool:
        """True when any issue counter is non-zero."""
        return self.total_issues > 0
|
||||
|
||||
|
||||
@dataclass
class IntegrityExecutionRecord:
    """One persisted history entry for a completed integrity run."""

    timestamp: float
    result: dict
    dry_run: bool
    auto_heal: bool

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict."""
        names = ("timestamp", "result", "dry_run", "auto_heal")
        return {name: getattr(self, name) for name in names}

    @classmethod
    def from_dict(cls, data: dict) -> "IntegrityExecutionRecord":
        """Rebuild a record from its dict form; missing flags default to False."""
        return cls(
            data["timestamp"],
            data["result"],
            data.get("dry_run", False),
            data.get("auto_heal", False),
        )
|
||||
|
||||
|
||||
class IntegrityHistoryStore:
    """JSON-file-backed store of recent integrity run records (newest first)."""

    def __init__(self, storage_root: Path, max_records: int = 50) -> None:
        self.storage_root = storage_root
        self.max_records = max_records
        # Serializes the read-modify-write cycle in add().
        self._lock = threading.Lock()

    def _get_path(self) -> Path:
        """Location of the history file inside the system config tree."""
        return self.storage_root / ".myfsio.sys" / "config" / "integrity_history.json"

    def load(self) -> List[IntegrityExecutionRecord]:
        """Read all persisted records; any read/parse error yields an empty list."""
        history_path = self._get_path()
        if not history_path.exists():
            return []
        try:
            with open(history_path, "r", encoding="utf-8") as handle:
                raw = json.load(handle)
            return [IntegrityExecutionRecord.from_dict(item) for item in raw.get("executions", [])]
        except (OSError, ValueError, KeyError) as exc:
            logger.error("Failed to load integrity history: %s", exc)
            return []

    def save(self, records: List[IntegrityExecutionRecord]) -> None:
        """Persist at most max_records entries, creating parent dirs as needed."""
        history_path = self._get_path()
        history_path.parent.mkdir(parents=True, exist_ok=True)
        payload = {"executions": [record.to_dict() for record in records[: self.max_records]]}
        try:
            with open(history_path, "w", encoding="utf-8") as handle:
                json.dump(payload, handle, indent=2)
        except OSError as exc:
            logger.error("Failed to save integrity history: %s", exc)

    def add(self, record: IntegrityExecutionRecord) -> None:
        """Prepend a record (newest first) and persist the result."""
        with self._lock:
            existing = self.load()
            existing.insert(0, record)
            self.save(existing)

    def get_history(self, limit: int = 50, offset: int = 0) -> List[IntegrityExecutionRecord]:
        """Return a page of records, newest first."""
        return self.load()[offset : offset + limit]
|
||||
|
||||
|
||||
# Hard cap on per-issue detail records kept in one IntegrityResult, so a badly
# corrupted store cannot bloat memory or the persisted history file.
MAX_ISSUES = 500
|
||||
|
||||
|
||||
class IntegrityChecker:
    """Periodic background scanner that detects (and can optionally heal)
    inconsistencies between bucket data files and their on-disk metadata."""

    # Layout constants for the system metadata tree under the storage root.
    SYSTEM_ROOT = ".myfsio.sys"
    SYSTEM_BUCKETS_DIR = "buckets"
    BUCKET_META_DIR = "meta"
    BUCKET_VERSIONS_DIR = "versions"
    # Legacy in-bucket bookkeeping folders that must never be scanned as objects.
    INTERNAL_FOLDERS = {".meta", ".versions", ".multipart"}
|
||||
|
||||
def __init__(
    self,
    storage_root: Path,
    interval_hours: float = 24.0,
    batch_size: int = 1000,
    auto_heal: bool = False,
    dry_run: bool = False,
    max_history: int = 50,
) -> None:
    """Configure the checker; call start() to begin periodic scans.

    Args:
        storage_root: Root directory containing the buckets.
        interval_hours: Delay between automatic scan cycles.
        batch_size: Max objects examined per scan cycle.
        auto_heal: Attempt repairs by default.
        dry_run: Report without modifying anything by default.
        max_history: Number of run records kept by the history store.
    """
    self.storage_root = Path(storage_root)
    # The timer API works in seconds; the public knob is hours.
    self.interval_seconds = interval_hours * 3600.0
    self.batch_size = batch_size
    self.auto_heal = auto_heal
    self.dry_run = dry_run
    self.history_store = IntegrityHistoryStore(storage_root, max_records=max_history)
    # Scheduling state.
    self._timer: Optional[threading.Timer] = None
    self._shutdown = False
    self._lock = threading.Lock()
|
||||
|
||||
def start(self) -> None:
    """Arm the periodic scan timer; calling while already running is a no-op."""
    if self._timer is not None:
        # Already scheduled; nothing to do.
        return
    self._shutdown = False
    self._schedule_next()
    interval_hours = self.interval_seconds / 3600.0
    logger.info(
        "Integrity checker started: interval=%.1fh, batch_size=%d, auto_heal=%s, dry_run=%s",
        interval_hours,
        self.batch_size,
        self.auto_heal,
        self.dry_run,
    )
|
||||
|
||||
def stop(self) -> None:
    """Cancel any pending timer and prevent future rescheduling."""
    self._shutdown = True
    pending = self._timer
    if pending:
        pending.cancel()
        self._timer = None
    logger.info("Integrity checker stopped")
|
||||
|
||||
def _schedule_next(self) -> None:
    """Arm a daemon timer for the next cycle, unless shutting down."""
    if self._shutdown:
        return
    timer = threading.Timer(self.interval_seconds, self._run_cycle)
    # Daemon so a pending timer never blocks interpreter exit.
    timer.daemon = True
    self._timer = timer
    timer.start()
|
||||
|
||||
def _run_cycle(self) -> None:
    """Timer callback: run one scan, then always re-arm the timer."""
    if self._shutdown:
        return
    try:
        self.run_now()
    except Exception as exc:  # broad on purpose: a background thread must not die
        logger.error("Integrity check cycle failed: %s", exc)
    finally:
        self._schedule_next()
|
||||
|
||||
def run_now(self, auto_heal: Optional[bool] = None, dry_run: Optional[bool] = None) -> IntegrityResult:
    """Run one full integrity scan synchronously and record it in history.

    Args:
        auto_heal: Per-run override for the configured auto-heal flag;
            None means use the instance default.
        dry_run: Per-run override for the configured dry-run flag;
            None means use the instance default.

    Returns:
        The populated IntegrityResult for this scan.
    """
    # Per-run overrides fall back to instance configuration when None.
    effective_auto_heal = auto_heal if auto_heal is not None else self.auto_heal
    effective_dry_run = dry_run if dry_run is not None else self.dry_run

    start = time.time()
    result = IntegrityResult()

    bucket_names = self._list_bucket_names()

    for bucket_name in bucket_names:
        # batch_size caps the total number of objects examined per run;
        # remaining buckets are picked up on a later cycle.
        if result.objects_scanned >= self.batch_size:
            break
        result.buckets_scanned += 1
        self._check_corrupted_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
        self._check_orphaned_objects(bucket_name, result, effective_auto_heal, effective_dry_run)
        self._check_phantom_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)
        self._check_stale_versions(bucket_name, result, effective_auto_heal, effective_dry_run)
        self._check_etag_cache(bucket_name, result, effective_auto_heal, effective_dry_run)
        self._check_legacy_metadata(bucket_name, result, effective_auto_heal, effective_dry_run)

    result.execution_time_seconds = time.time() - start

    # Only log when something noteworthy was found.
    if result.has_issues or result.errors:
        logger.info(
            "Integrity check completed in %.2fs: corrupted=%d, orphaned=%d, phantom=%d, "
            "stale_versions=%d, etag_cache=%d, legacy_drift=%d, healed=%d, errors=%d%s",
            result.execution_time_seconds,
            result.corrupted_objects,
            result.orphaned_objects,
            result.phantom_metadata,
            result.stale_versions,
            result.etag_cache_inconsistencies,
            result.legacy_metadata_drifts,
            result.issues_healed,
            len(result.errors),
            " (dry run)" if effective_dry_run else "",
        )

    # Every run is persisted to history, even clean ones.
    record = IntegrityExecutionRecord(
        timestamp=time.time(),
        result=result.to_dict(),
        dry_run=effective_dry_run,
        auto_heal=effective_auto_heal,
    )
    self.history_store.add(record)

    return result
|
||||
|
||||
def _system_path(self) -> Path:
    """Return the root of the on-disk system metadata tree."""
    return self.storage_root / self.SYSTEM_ROOT
|
||||
|
||||
def _list_bucket_names(self) -> List[str]:
    """Top-level directories in the storage root, excluding the system tree.

    An OSError mid-scan returns whatever was collected so far (best effort).
    """
    found: List[str] = []
    try:
        for child in self.storage_root.iterdir():
            if not child.is_dir():
                continue
            if child.name == self.SYSTEM_ROOT:
                continue
            found.append(child.name)
    except OSError:
        pass
    return found
|
||||
|
||||
def _add_issue(self, result: IntegrityResult, issue: IntegrityIssue) -> None:
    """Record issue detail unless the MAX_ISSUES cap is already reached."""
    if len(result.issues) >= MAX_ISSUES:
        # Counters still track the overflow; only the detail list is capped.
        return
    result.issues.append(issue)
|
||||
|
||||
def _check_corrupted_objects(
    self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
    """Detect objects whose on-disk content no longer matches the stored etag.

    Healing (auto_heal and not dry_run) rewrites the index entry with the
    freshly computed etag/size/mtime rather than touching the data file.
    """
    bucket_path = self.storage_root / bucket_name
    meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

    if not meta_root.exists():
        return

    try:
        for index_file in meta_root.rglob("_index.json"):
            # Respect the per-run object budget.
            if result.objects_scanned >= self.batch_size:
                return
            if not index_file.is_file():
                continue
            try:
                index_data = json.loads(index_file.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError):
                # Unreadable index files are handled by other checks.
                continue

            # list() so healing can mutate index_data during iteration.
            for key_name, entry in list(index_data.items()):
                if result.objects_scanned >= self.batch_size:
                    return

                # Rebuild the full object key from the index file's location.
                rel_dir = index_file.parent.relative_to(meta_root)
                if rel_dir == Path("."):
                    full_key = key_name
                else:
                    full_key = rel_dir.as_posix() + "/" + key_name

                object_path = bucket_path / full_key
                if not object_path.exists():
                    # Missing file is the phantom-metadata check's job.
                    continue

                result.objects_scanned += 1

                meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
                stored_etag = meta.get("__etag__")
                if not stored_etag:
                    # Nothing to compare against.
                    continue

                try:
                    actual_etag = _compute_etag(object_path)
                except OSError:
                    continue

                if actual_etag != stored_etag:
                    result.corrupted_objects += 1
                    issue = IntegrityIssue(
                        issue_type="corrupted_object",
                        bucket=bucket_name,
                        key=full_key,
                        detail=f"stored_etag={stored_etag} actual_etag={actual_etag}",
                    )

                    if auto_heal and not dry_run:
                        try:
                            # Trust the file on disk: refresh the index entry.
                            stat = object_path.stat()
                            meta["__etag__"] = actual_etag
                            meta["__size__"] = str(stat.st_size)
                            meta["__last_modified__"] = str(stat.st_mtime)
                            index_data[key_name] = {"metadata": meta}
                            self._atomic_write_index(index_file, index_data)
                            issue.healed = True
                            issue.heal_action = "updated etag in index"
                            result.issues_healed += 1
                        except OSError as e:
                            result.errors.append(f"heal corrupted {bucket_name}/{full_key}: {e}")

                    self._add_issue(result, issue)
    except OSError as e:
        result.errors.append(f"check corrupted {bucket_name}: {e}")
|
||||
|
||||
def _check_orphaned_objects(
    self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
    """Detect data files that have no entry in the bucket's metadata index.

    Healing creates a fresh metadata entry from the file's computed etag and
    stat info, merging into the existing _index.json when one is present.
    """
    bucket_path = self.storage_root / bucket_name
    meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

    try:
        for entry in bucket_path.rglob("*"):
            # Respect the per-run object budget.
            if result.objects_scanned >= self.batch_size:
                return
            if not entry.is_file():
                continue
            try:
                rel = entry.relative_to(bucket_path)
            except ValueError:
                continue
            # Skip legacy in-bucket bookkeeping folders.
            if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS:
                continue

            full_key = rel.as_posix()
            key_name = rel.name
            parent = rel.parent

            # Each directory level has its own _index.json.
            if parent == Path("."):
                index_path = meta_root / "_index.json"
            else:
                index_path = meta_root / parent / "_index.json"

            has_entry = False
            if index_path.exists():
                try:
                    index_data = json.loads(index_path.read_text(encoding="utf-8"))
                    has_entry = key_name in index_data
                except (OSError, json.JSONDecodeError):
                    pass

            if not has_entry:
                result.orphaned_objects += 1
                issue = IntegrityIssue(
                    issue_type="orphaned_object",
                    bucket=bucket_name,
                    key=full_key,
                    detail="file exists without metadata entry",
                )

                if auto_heal and not dry_run:
                    try:
                        # Synthesize metadata from the file itself.
                        etag = _compute_etag(entry)
                        stat = entry.stat()
                        meta = {
                            "__etag__": etag,
                            "__size__": str(stat.st_size),
                            "__last_modified__": str(stat.st_mtime),
                        }
                        # Re-read the index to merge, not overwrite, siblings.
                        index_data = {}
                        if index_path.exists():
                            try:
                                index_data = json.loads(index_path.read_text(encoding="utf-8"))
                            except (OSError, json.JSONDecodeError):
                                pass
                        index_data[key_name] = {"metadata": meta}
                        self._atomic_write_index(index_path, index_data)
                        issue.healed = True
                        issue.heal_action = "created metadata entry"
                        result.issues_healed += 1
                    except OSError as e:
                        result.errors.append(f"heal orphaned {bucket_name}/{full_key}: {e}")

                self._add_issue(result, issue)
    except OSError as e:
        result.errors.append(f"check orphaned {bucket_name}: {e}")
|
||||
|
||||
def _check_phantom_metadata(
    self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
    """Detect index entries whose backing data file no longer exists.

    Healing removes the stale entries; an index file left empty afterwards
    is deleted outright.
    """
    bucket_path = self.storage_root / bucket_name
    meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

    if not meta_root.exists():
        return

    try:
        for index_file in meta_root.rglob("_index.json"):
            if not index_file.is_file():
                continue
            try:
                index_data = json.loads(index_file.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError):
                continue

            # Deletions are collected and applied once per index file.
            keys_to_remove = []
            for key_name in list(index_data.keys()):
                # Rebuild the full object key from the index file's location.
                rel_dir = index_file.parent.relative_to(meta_root)
                if rel_dir == Path("."):
                    full_key = key_name
                else:
                    full_key = rel_dir.as_posix() + "/" + key_name

                object_path = bucket_path / full_key
                if not object_path.exists():
                    result.phantom_metadata += 1
                    issue = IntegrityIssue(
                        issue_type="phantom_metadata",
                        bucket=bucket_name,
                        key=full_key,
                        detail="metadata entry without file on disk",
                    )
                    if auto_heal and not dry_run:
                        keys_to_remove.append(key_name)
                        issue.healed = True
                        issue.heal_action = "removed stale index entry"
                        result.issues_healed += 1
                    self._add_issue(result, issue)

            if keys_to_remove and auto_heal and not dry_run:
                try:
                    for k in keys_to_remove:
                        index_data.pop(k, None)
                    if index_data:
                        self._atomic_write_index(index_file, index_data)
                    else:
                        # No entries left: drop the index file entirely.
                        index_file.unlink(missing_ok=True)
                except OSError as e:
                    result.errors.append(f"heal phantom {bucket_name}: {e}")
    except OSError as e:
        result.errors.append(f"check phantom {bucket_name}: {e}")
|
||||
|
||||
def _check_stale_versions(
    self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
    """Detect unpaired version files (.bin data without .json manifest or
    vice versa).

    Healing deletes the unpaired file.
    """
    versions_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_VERSIONS_DIR

    if not versions_root.exists():
        return

    try:
        for key_dir in versions_root.rglob("*"):
            if not key_dir.is_dir():
                continue

            # A version is stored as <id>.bin (data) + <id>.json (manifest);
            # the shared stem pairs them.
            bin_files = {f.stem: f for f in key_dir.glob("*.bin")}
            json_files = {f.stem: f for f in key_dir.glob("*.json")}

            # Data without a manifest.
            for stem, bin_file in bin_files.items():
                if stem not in json_files:
                    result.stale_versions += 1
                    issue = IntegrityIssue(
                        issue_type="stale_version",
                        bucket=bucket_name,
                        key=f"{key_dir.relative_to(versions_root).as_posix()}/{bin_file.name}",
                        detail="version data without manifest",
                    )
                    if auto_heal and not dry_run:
                        try:
                            bin_file.unlink(missing_ok=True)
                            issue.healed = True
                            issue.heal_action = "removed orphaned version data"
                            result.issues_healed += 1
                        except OSError as e:
                            result.errors.append(f"heal stale version {bin_file}: {e}")
                    self._add_issue(result, issue)

            # Manifest without data.
            for stem, json_file in json_files.items():
                if stem not in bin_files:
                    result.stale_versions += 1
                    issue = IntegrityIssue(
                        issue_type="stale_version",
                        bucket=bucket_name,
                        key=f"{key_dir.relative_to(versions_root).as_posix()}/{json_file.name}",
                        detail="version manifest without data",
                    )
                    if auto_heal and not dry_run:
                        try:
                            json_file.unlink(missing_ok=True)
                            issue.healed = True
                            issue.heal_action = "removed orphaned version manifest"
                            result.issues_healed += 1
                        except OSError as e:
                            result.errors.append(f"heal stale version {json_file}: {e}")
                    self._add_issue(result, issue)
    except OSError as e:
        result.errors.append(f"check stale versions {bucket_name}: {e}")
|
||||
|
||||
def _check_etag_cache(
    self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
    """Compare the bucket's etag_index.json cache against the metadata index.

    Healing deletes the whole cache file (presumably rebuilt lazily by the
    storage layer — the rebuild path is not visible here; confirm).
    """
    etag_index_path = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / "etag_index.json"

    if not etag_index_path.exists():
        return

    meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR
    if not meta_root.exists():
        return

    try:
        etag_cache = json.loads(etag_index_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        # Unreadable cache: nothing to compare.
        return

    found_mismatch = False

    for full_key, cached_etag in etag_cache.items():
        key_path = Path(full_key)
        key_name = key_path.name
        parent = key_path.parent

        # Each directory level has its own _index.json.
        if parent == Path("."):
            index_path = meta_root / "_index.json"
        else:
            index_path = meta_root / parent / "_index.json"

        if not index_path.exists():
            continue

        try:
            index_data = json.loads(index_path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            continue

        entry = index_data.get(key_name)
        if not entry:
            continue

        meta = entry.get("metadata", {}) if isinstance(entry, dict) else {}
        stored_etag = meta.get("__etag__")

        if stored_etag and cached_etag != stored_etag:
            result.etag_cache_inconsistencies += 1
            found_mismatch = True
            issue = IntegrityIssue(
                issue_type="etag_cache_inconsistency",
                bucket=bucket_name,
                key=full_key,
                detail=f"cached_etag={cached_etag} index_etag={stored_etag}",
            )
            self._add_issue(result, issue)

    if found_mismatch and auto_heal and not dry_run:
        try:
            # One delete heals every cache mismatch in this bucket.
            etag_index_path.unlink(missing_ok=True)
            for issue in result.issues:
                if issue.issue_type == "etag_cache_inconsistency" and issue.bucket == bucket_name and not issue.healed:
                    issue.healed = True
                    issue.heal_action = "deleted etag_index.json"
                    result.issues_healed += 1
        except OSError as e:
            result.errors.append(f"heal etag cache {bucket_name}: {e}")
|
||||
|
||||
def _check_legacy_metadata(
    self, bucket_name: str, result: IntegrityResult, auto_heal: bool, dry_run: bool
) -> None:
    """Detect (and optionally heal) pre-migration ``.meta.json`` metadata files.

    Scans ``<storage_root>/<bucket>/.meta`` for legacy per-object
    ``*.meta.json`` files and compares each against the consolidated
    ``_index.json`` under the system metadata root.  Two drift cases are
    reported as ``legacy_metadata_drift`` issues:

    * the object has no index entry at all (unmigrated legacy file) —
      healing migrates the legacy metadata into the index and deletes the
      legacy file;
    * an index entry exists but its ``metadata`` differs from the legacy
      file — healing deletes the legacy file (the index is authoritative).

    Healing only happens when ``auto_heal`` is set and ``dry_run`` is not.
    All filesystem/JSON errors are collected into ``result.errors`` or
    cause the individual file to be skipped; this method never raises.
    """
    legacy_meta_root = self.storage_root / bucket_name / ".meta"
    if not legacy_meta_root.exists():
        return

    meta_root = self._system_path() / self.SYSTEM_BUCKETS_DIR / bucket_name / self.BUCKET_META_DIR

    try:
        for meta_file in legacy_meta_root.rglob("*.meta.json"):
            if not meta_file.is_file():
                continue

            try:
                rel = meta_file.relative_to(legacy_meta_root)
            except ValueError:
                # File moved/symlinked outside the legacy root mid-scan; skip.
                continue

            # Legacy layout mirrors the object key: "<key>.meta.json".
            full_key = rel.as_posix().removesuffix(".meta.json")
            key_path = Path(full_key)
            key_name = key_path.name
            parent = key_path.parent

            # Top-level keys live in the bucket-root _index.json; nested keys
            # have a per-"directory" _index.json alongside them.
            if parent == Path("."):
                index_path = meta_root / "_index.json"
            else:
                index_path = meta_root / parent / "_index.json"

            try:
                legacy_data = json.loads(meta_file.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError):
                # Unreadable/corrupt legacy file: nothing to compare, skip.
                continue

            index_entry = None
            if index_path.exists():
                try:
                    index_data = json.loads(index_path.read_text(encoding="utf-8"))
                    index_entry = index_data.get(key_name)
                except (OSError, json.JSONDecodeError):
                    pass

            if index_entry is None:
                # Case 1: legacy file was never migrated into the index.
                result.legacy_metadata_drifts += 1
                issue = IntegrityIssue(
                    issue_type="legacy_metadata_drift",
                    bucket=bucket_name,
                    key=full_key,
                    detail="unmigrated legacy .meta.json",
                )

                if auto_heal and not dry_run:
                    try:
                        # Re-read the index defensively: the earlier read may
                        # have failed (index_entry is also None on read error),
                        # and we must not clobber entries for other keys.
                        index_data = {}
                        if index_path.exists():
                            try:
                                index_data = json.loads(index_path.read_text(encoding="utf-8"))
                            except (OSError, json.JSONDecodeError):
                                pass
                        index_data[key_name] = {"metadata": legacy_data}
                        self._atomic_write_index(index_path, index_data)
                        meta_file.unlink(missing_ok=True)
                        issue.healed = True
                        issue.heal_action = "migrated to index and deleted legacy file"
                        result.issues_healed += 1
                    except OSError as e:
                        result.errors.append(f"heal legacy {bucket_name}/{full_key}: {e}")

                self._add_issue(result, issue)
            else:
                # Case 2: both exist — flag only if their contents diverge.
                index_meta = index_entry.get("metadata", {}) if isinstance(index_entry, dict) else {}
                if legacy_data != index_meta:
                    result.legacy_metadata_drifts += 1
                    issue = IntegrityIssue(
                        issue_type="legacy_metadata_drift",
                        bucket=bucket_name,
                        key=full_key,
                        detail="legacy .meta.json differs from index entry",
                    )

                    if auto_heal and not dry_run:
                        try:
                            # Index wins; just drop the stale legacy copy.
                            meta_file.unlink(missing_ok=True)
                            issue.healed = True
                            issue.heal_action = "deleted legacy file (index is authoritative)"
                            result.issues_healed += 1
                        except OSError as e:
                            result.errors.append(f"heal legacy drift {bucket_name}/{full_key}: {e}")

                    self._add_issue(result, issue)
    except OSError as e:
        # rglob itself can fail (e.g. permissions); record and bail out.
        result.errors.append(f"check legacy meta {bucket_name}: {e}")
|
||||
|
||||
@staticmethod
|
||||
def _atomic_write_index(index_path: Path, data: Dict[str, Any]) -> None:
|
||||
index_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp_path = index_path.with_suffix(".tmp")
|
||||
try:
|
||||
with open(tmp_path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f)
|
||||
os.replace(str(tmp_path), str(index_path))
|
||||
except BaseException:
|
||||
try:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
|
||||
def get_history(self, limit: int = 50, offset: int = 0) -> List[dict]:
    """Return past scan results as plain dicts, newest-first per the store.

    Args:
        limit: Maximum number of history records to return.
        offset: Number of records to skip from the start.
    """
    return [record.to_dict() for record in self.history_store.get_history(limit, offset)]
|
||||
|
||||
def get_status(self) -> dict:
    """Snapshot of the scanner's run state and effective configuration.

    ``enabled`` reflects whether the checker has not been shut down (or a
    timer is still pending); ``running`` requires an active timer and no
    shutdown.  Interval is reported in hours for readability.
    """
    timer_active = self._timer is not None
    status: dict = {}
    status["enabled"] = (not self._shutdown) or timer_active
    status["running"] = timer_active and not self._shutdown
    status["interval_hours"] = self.interval_seconds / 3600.0
    status["batch_size"] = self.batch_size
    status["auto_heal"] = self.auto_heal
    status["dry_run"] = self.dry_run
    return status
|
||||
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
APP_VERSION = "0.3.8"
|
||||
APP_VERSION = "0.3.9"
|
||||
|
||||
|
||||
def get_version() -> str:
|
||||
|
||||
63
docs.md
63
docs.md
@@ -356,6 +356,69 @@ The application automatically trusts these headers to generate correct presigned
|
||||
| `ALLOWED_REDIRECT_HOSTS` | `""` | Comma-separated whitelist of safe redirect targets. Empty allows only same-host redirects. |
|
||||
| `ALLOW_INTERNAL_ENDPOINTS` | `false` | Allow connections to internal/private IPs for webhooks and replication targets. **Keep disabled in production unless needed.** |
|
||||
|
||||
## Integrity Scanner
|
||||
|
||||
The integrity scanner detects and optionally auto-repairs data inconsistencies: corrupted objects (ETag mismatch), orphaned files without metadata, phantom metadata without files, stale version archives, ETag cache drift, and unmigrated legacy `.meta.json` files.
|
||||
|
||||
### Enabling Integrity Scanner
|
||||
|
||||
By default, the integrity scanner is disabled. Enable it by setting:
|
||||
|
||||
```bash
|
||||
INTEGRITY_ENABLED=true python run.py
|
||||
```
|
||||
|
||||
Or in your `myfsio.env` file:
|
||||
```
|
||||
INTEGRITY_ENABLED=true
|
||||
INTEGRITY_INTERVAL_HOURS=24 # Run every 24 hours (default)
|
||||
INTEGRITY_BATCH_SIZE=1000 # Max objects to scan per cycle
|
||||
INTEGRITY_AUTO_HEAL=false # Automatically repair detected issues
|
||||
INTEGRITY_DRY_RUN=false # Set to true to log without healing
|
||||
```
|
||||
|
||||
### What Gets Checked
|
||||
|
||||
| Check | Detection | Heal Action |
|
||||
|-------|-----------|-------------|
|
||||
| **Corrupted objects** | File MD5 does not match stored `__etag__` | Update `__etag__` in index (disk data is authoritative) |
|
||||
| **Orphaned objects** | File exists on disk without metadata entry | Create index entry with computed MD5/size/mtime |
|
||||
| **Phantom metadata** | Index entry exists but file is missing from disk | Remove stale entry from `_index.json` |
|
||||
| **Stale versions** | `.json` manifest without `.bin` data or vice versa | Remove orphaned version file |
|
||||
| **ETag cache inconsistency** | `etag_index.json` entry differs from metadata `__etag__` | Delete `etag_index.json` (auto-rebuilt on next list) |
|
||||
| **Legacy metadata drift** | Legacy `.meta.json` differs from index or is unmigrated | Migrate to index and delete legacy file |
|
||||
|
||||
### Admin API
|
||||
|
||||
All integrity endpoints require admin (`iam:*`) permissions.
|
||||
|
||||
| Method | Route | Description |
|
||||
|--------|-------|-------------|
|
||||
| `GET` | `/admin/integrity/status` | Get scanner status and configuration |
|
||||
| `POST` | `/admin/integrity/run` | Trigger a manual scan (body: `{"dry_run": true, "auto_heal": true}`) |
|
||||
| `GET` | `/admin/integrity/history` | Get scan history (query: `?limit=50&offset=0`) |
|
||||
|
||||
### Dry Run Mode
|
||||
|
||||
Set `INTEGRITY_DRY_RUN=true` to log detected issues without making any changes. You can also trigger a one-time dry run via the admin API:
|
||||
|
||||
```bash
|
||||
curl -X POST "http://localhost:5000/admin/integrity/run" \
|
||||
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"dry_run": true, "auto_heal": true}'
|
||||
```
|
||||
|
||||
### Configuration Reference
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `INTEGRITY_ENABLED` | `false` | Enable background integrity scanning |
|
||||
| `INTEGRITY_INTERVAL_HOURS` | `24` | Hours between scan cycles |
|
||||
| `INTEGRITY_BATCH_SIZE` | `1000` | Max objects to scan per cycle |
|
||||
| `INTEGRITY_AUTO_HEAL` | `false` | Automatically repair detected issues |
|
||||
| `INTEGRITY_DRY_RUN` | `false` | Log issues without healing |
|
||||
|
||||
## 4. Upgrading and Updates
|
||||
|
||||
### Version Checking
|
||||
|
||||
@@ -15,6 +15,12 @@
|
||||
--myfsio-hover-bg: rgba(59, 130, 246, 0.12);
|
||||
--myfsio-accent: #3b82f6;
|
||||
--myfsio-accent-hover: #2563eb;
|
||||
--myfsio-tag-key-bg: #e0e7ff;
|
||||
--myfsio-tag-key-text: #3730a3;
|
||||
--myfsio-tag-value-bg: #f0f1fa;
|
||||
--myfsio-tag-value-text: #4338ca;
|
||||
--myfsio-tag-border: #c7d2fe;
|
||||
--myfsio-tag-delete-hover: #ef4444;
|
||||
}
|
||||
|
||||
[data-theme='dark'] {
|
||||
@@ -34,6 +40,12 @@
|
||||
--myfsio-hover-bg: rgba(59, 130, 246, 0.2);
|
||||
--myfsio-accent: #60a5fa;
|
||||
--myfsio-accent-hover: #3b82f6;
|
||||
--myfsio-tag-key-bg: #312e81;
|
||||
--myfsio-tag-key-text: #c7d2fe;
|
||||
--myfsio-tag-value-bg: #1e1b4b;
|
||||
--myfsio-tag-value-text: #a5b4fc;
|
||||
--myfsio-tag-border: #4338ca;
|
||||
--myfsio-tag-delete-hover: #f87171;
|
||||
}
|
||||
|
||||
[data-theme='dark'] body,
|
||||
@@ -3002,6 +3014,89 @@ body:has(.login-card) .main-wrapper {
|
||||
padding: 0.375rem 1rem;
|
||||
}
|
||||
|
||||
.tag-pill {
|
||||
display: inline-flex;
|
||||
border-radius: 9999px;
|
||||
border: 1px solid var(--myfsio-tag-border);
|
||||
overflow: hidden;
|
||||
font-size: 0.75rem;
|
||||
line-height: 1;
|
||||
}
|
||||
|
||||
.tag-pill-key {
|
||||
padding: 0.3rem 0.5rem;
|
||||
background: var(--myfsio-tag-key-bg);
|
||||
color: var(--myfsio-tag-key-text);
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
.tag-pill-value {
|
||||
padding: 0.3rem 0.5rem;
|
||||
background: var(--myfsio-tag-value-bg);
|
||||
color: var(--myfsio-tag-value-text);
|
||||
font-weight: 400;
|
||||
}
|
||||
|
||||
.tag-editor-card {
|
||||
background: var(--myfsio-preview-bg);
|
||||
border-radius: 0.5rem;
|
||||
padding: 0.75rem;
|
||||
}
|
||||
|
||||
.tag-editor-header,
|
||||
.tag-editor-row {
|
||||
display: grid;
|
||||
grid-template-columns: 1fr 1fr 28px;
|
||||
gap: 0.5rem;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
.tag-editor-header {
|
||||
padding-bottom: 0.375rem;
|
||||
border-bottom: 1px solid var(--myfsio-card-border);
|
||||
margin-bottom: 0.5rem;
|
||||
}
|
||||
|
||||
.tag-editor-header span {
|
||||
font-size: 0.7rem;
|
||||
font-weight: 600;
|
||||
text-transform: uppercase;
|
||||
color: var(--myfsio-muted);
|
||||
letter-spacing: 0.05em;
|
||||
}
|
||||
|
||||
.tag-editor-row {
|
||||
margin-bottom: 0.375rem;
|
||||
}
|
||||
|
||||
.tag-editor-delete {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
width: 28px;
|
||||
height: 28px;
|
||||
border: none;
|
||||
background: transparent;
|
||||
color: var(--myfsio-muted);
|
||||
border-radius: 0.375rem;
|
||||
cursor: pointer;
|
||||
transition: color 0.15s, background 0.15s;
|
||||
}
|
||||
|
||||
.tag-editor-delete:hover {
|
||||
color: var(--myfsio-tag-delete-hover);
|
||||
background: rgba(239, 68, 68, 0.1);
|
||||
}
|
||||
|
||||
.tag-editor-actions {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: 0.5rem;
|
||||
margin-top: 0.75rem;
|
||||
padding-top: 0.5rem;
|
||||
border-top: 1px solid var(--myfsio-card-border);
|
||||
}
|
||||
|
||||
@media (prefers-reduced-motion: reduce) {
|
||||
*,
|
||||
*::before,
|
||||
|
||||
@@ -3948,6 +3948,7 @@
|
||||
const cancelTagsButton = document.getElementById('cancelTagsButton');
|
||||
let currentObjectTags = [];
|
||||
let isEditingTags = false;
|
||||
let savedObjectTags = [];
|
||||
|
||||
const loadObjectTags = async (row) => {
|
||||
if (!row || !previewTagsPanel) return;
|
||||
@@ -3976,17 +3977,26 @@
|
||||
previewTagsEmpty.classList.remove('d-none');
|
||||
} else {
|
||||
previewTagsEmpty.classList.add('d-none');
|
||||
previewTagsList.innerHTML = currentObjectTags.map(t => `<span class="badge bg-info-subtle text-info">${escapeHtml(t.Key)}=${escapeHtml(t.Value)}</span>`).join('');
|
||||
previewTagsList.innerHTML = currentObjectTags.map(t => `<span class="tag-pill"><span class="tag-pill-key">${escapeHtml(t.Key)}</span><span class="tag-pill-value">${escapeHtml(t.Value)}</span></span>`).join('');
|
||||
}
|
||||
};
|
||||
|
||||
const syncTagInputs = () => {
|
||||
previewTagsInputs?.querySelectorAll('.tag-editor-row').forEach((row, idx) => {
|
||||
if (idx < currentObjectTags.length) {
|
||||
currentObjectTags[idx].Key = row.querySelector(`[data-tag-key="${idx}"]`)?.value || '';
|
||||
currentObjectTags[idx].Value = row.querySelector(`[data-tag-value="${idx}"]`)?.value || '';
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
const renderTagEditor = () => {
|
||||
if (!previewTagsInputs) return;
|
||||
previewTagsInputs.innerHTML = currentObjectTags.map((t, idx) => `
|
||||
<div class="input-group input-group-sm mb-1">
|
||||
<input type="text" class="form-control" placeholder="Key" value="${escapeHtml(t.Key)}" data-tag-key="${idx}">
|
||||
<input type="text" class="form-control" placeholder="Value" value="${escapeHtml(t.Value)}" data-tag-value="${idx}">
|
||||
<button class="btn btn-outline-danger" type="button" onclick="removeTagRow(${idx})">
|
||||
<div class="tag-editor-row">
|
||||
<input type="text" class="form-control form-control-sm" placeholder="e.g. Environment" value="${escapeHtml(t.Key)}" data-tag-key="${idx}">
|
||||
<input type="text" class="form-control form-control-sm" placeholder="e.g. Production" value="${escapeHtml(t.Value)}" data-tag-value="${idx}">
|
||||
<button class="tag-editor-delete" type="button" onclick="removeTagRow(${idx})">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" width="12" height="12" fill="currentColor" viewBox="0 0 16 16"><path d="M4.646 4.646a.5.5 0 0 1 .708 0L8 7.293l2.646-2.647a.5.5 0 0 1 .708.708L8.707 8l2.647 2.646a.5.5 0 0 1-.708.708L8 8.707l-2.646 2.647a.5.5 0 0 1-.708-.708L7.293 8 4.646 5.354a.5.5 0 0 1 0-.708z"/></svg>
|
||||
</button>
|
||||
</div>
|
||||
@@ -3994,20 +4004,29 @@
|
||||
};
|
||||
|
||||
window.removeTagRow = (idx) => {
|
||||
syncTagInputs();
|
||||
currentObjectTags.splice(idx, 1);
|
||||
renderTagEditor();
|
||||
};
|
||||
|
||||
editTagsButton?.addEventListener('click', () => {
|
||||
savedObjectTags = currentObjectTags.map(t => ({ Key: t.Key, Value: t.Value }));
|
||||
isEditingTags = true;
|
||||
previewTagsList.classList.add('d-none');
|
||||
previewTagsEmpty.classList.add('d-none');
|
||||
previewTagsEditor?.classList.remove('d-none');
|
||||
const card = previewTagsEditor?.querySelector('.tag-editor-card');
|
||||
if (card) {
|
||||
card.style.opacity = '0';
|
||||
card.style.transition = 'opacity 0.2s ease';
|
||||
requestAnimationFrame(() => { card.style.opacity = '1'; });
|
||||
}
|
||||
renderTagEditor();
|
||||
});
|
||||
|
||||
cancelTagsButton?.addEventListener('click', () => {
|
||||
isEditingTags = false;
|
||||
currentObjectTags = savedObjectTags.map(t => ({ Key: t.Key, Value: t.Value }));
|
||||
previewTagsEditor?.classList.add('d-none');
|
||||
previewTagsList.classList.remove('d-none');
|
||||
renderObjectTags();
|
||||
@@ -4018,6 +4037,7 @@
|
||||
showMessage({ title: 'Limit reached', body: 'Maximum 10 tags allowed per object.', variant: 'warning' });
|
||||
return;
|
||||
}
|
||||
syncTagInputs();
|
||||
currentObjectTags.push({ Key: '', Value: '' });
|
||||
renderTagEditor();
|
||||
});
|
||||
@@ -4026,7 +4046,7 @@
|
||||
if (!activeRow) return;
|
||||
const tagsUrl = activeRow.dataset.tagsUrl;
|
||||
if (!tagsUrl) return;
|
||||
const inputs = previewTagsInputs?.querySelectorAll('.input-group');
|
||||
const inputs = previewTagsInputs?.querySelectorAll('.tag-editor-row');
|
||||
const newTags = [];
|
||||
inputs?.forEach((group, idx) => {
|
||||
const key = group.querySelector(`[data-tag-key="${idx}"]`)?.value?.trim() || '';
|
||||
|
||||
@@ -292,19 +292,28 @@
|
||||
Edit
|
||||
</button>
|
||||
</div>
|
||||
<div id="preview-tags-list" class="d-flex flex-wrap gap-1"></div>
|
||||
<div id="preview-tags-list" class="d-flex flex-wrap gap-2"></div>
|
||||
<div id="preview-tags-empty" class="text-muted small p-2 bg-body-tertiary rounded">No tags</div>
|
||||
<div id="preview-tags-editor" class="d-none mt-2">
|
||||
<div id="preview-tags-inputs" class="mb-2"></div>
|
||||
<div class="d-flex gap-2">
|
||||
<button class="btn btn-sm btn-outline-secondary flex-grow-1" type="button" id="addTagRow">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" width="12" height="12" fill="currentColor" class="me-1" viewBox="0 0 16 16">
|
||||
<path d="M8 4a.5.5 0 0 1 .5.5v3h3a.5.5 0 0 1 0 1h-3v3a.5.5 0 0 1-1 0v-3h-3a.5.5 0 0 1 0-1h3v-3A.5.5 0 0 1 8 4z"/>
|
||||
</svg>
|
||||
Add Tag
|
||||
</button>
|
||||
<button class="btn btn-sm btn-primary" type="button" id="saveTagsButton">Save</button>
|
||||
<button class="btn btn-sm btn-outline-secondary" type="button" id="cancelTagsButton">Cancel</button>
|
||||
<div class="tag-editor-card">
|
||||
<div class="tag-editor-header">
|
||||
<span>Key</span>
|
||||
<span>Value</span>
|
||||
<span></span>
|
||||
</div>
|
||||
<div id="preview-tags-inputs"></div>
|
||||
<div class="tag-editor-actions">
|
||||
<button class="btn btn-sm btn-outline-secondary" type="button" id="addTagRow">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" width="12" height="12" fill="currentColor" class="me-1" viewBox="0 0 16 16">
|
||||
<path d="M8 4a.5.5 0 0 1 .5.5v3h3a.5.5 0 0 1 0 1h-3v3a.5.5 0 0 1-1 0v-3h-3a.5.5 0 0 1 0-1h3v-3A.5.5 0 0 1 8 4z"/>
|
||||
</svg>
|
||||
Add Tag
|
||||
</button>
|
||||
<div class="ms-auto d-flex gap-2">
|
||||
<button class="btn btn-sm btn-outline-secondary" type="button" id="cancelTagsButton">Cancel</button>
|
||||
<button class="btn btn-sm btn-primary" type="button" id="saveTagsButton">Save</button>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="form-text mt-1">Maximum 10 tags. Keys and values up to 256 characters.</div>
|
||||
</div>
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
<li><a href="#encryption">Encryption</a></li>
|
||||
<li><a href="#lifecycle">Lifecycle Rules</a></li>
|
||||
<li><a href="#garbage-collection">Garbage Collection</a></li>
|
||||
<li><a href="#integrity">Integrity Scanner</a></li>
|
||||
<li><a href="#metrics">Metrics History</a></li>
|
||||
<li><a href="#operation-metrics">Operation Metrics</a></li>
|
||||
<li><a href="#troubleshooting">Troubleshooting</a></li>
|
||||
@@ -1731,10 +1732,114 @@ curl "{{ api_base }}/admin/gc/history?limit=10" \
|
||||
</div>
|
||||
</div>
|
||||
</article>
|
||||
<article id="metrics" class="card shadow-sm docs-section">
|
||||
<article id="integrity" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">15</span>
|
||||
<h2 class="h4 mb-0">Integrity Scanner</h2>
|
||||
</div>
|
||||
<p class="text-muted">Detect and optionally auto-repair data inconsistencies: corrupted objects, orphaned files, phantom metadata, stale versions, ETag cache drift, and unmigrated legacy metadata.</p>
|
||||
|
||||
<h3 class="h6 text-uppercase text-muted mt-4">Enabling Integrity Scanner</h3>
|
||||
<p class="small text-muted">Disabled by default. Enable via environment variable:</p>
|
||||
<pre class="mb-3"><code class="language-bash">INTEGRITY_ENABLED=true python run.py</code></pre>
|
||||
|
||||
<h3 class="h6 text-uppercase text-muted mt-4">Configuration</h3>
|
||||
<div class="table-responsive mb-3">
|
||||
<table class="table table-sm table-bordered small">
|
||||
<thead class="table-light">
|
||||
<tr>
|
||||
<th>Variable</th>
|
||||
<th>Default</th>
|
||||
<th>Description</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr><td><code>INTEGRITY_ENABLED</code></td><td><code>false</code></td><td>Enable background integrity scanning</td></tr>
|
||||
<tr><td><code>INTEGRITY_INTERVAL_HOURS</code></td><td><code>24</code></td><td>Hours between scan cycles</td></tr>
|
||||
<tr><td><code>INTEGRITY_BATCH_SIZE</code></td><td><code>1000</code></td><td>Max objects to scan per cycle</td></tr>
|
||||
<tr><td><code>INTEGRITY_AUTO_HEAL</code></td><td><code>false</code></td><td>Automatically repair detected issues</td></tr>
|
||||
<tr><td><code>INTEGRITY_DRY_RUN</code></td><td><code>false</code></td><td>Log issues without healing</td></tr>
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
|
||||
<h3 class="h6 text-uppercase text-muted mt-4">What Gets Checked</h3>
|
||||
<div class="table-responsive mb-3">
|
||||
<table class="table table-sm table-bordered small">
|
||||
<thead class="table-light">
|
||||
<tr>
|
||||
<th>Check</th>
|
||||
<th>Detection</th>
|
||||
<th>Heal Action</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr><td><strong>Corrupted objects</strong></td><td>File MD5 does not match stored ETag</td><td>Update ETag in index (disk is authoritative)</td></tr>
|
||||
<tr><td><strong>Orphaned objects</strong></td><td>File exists without metadata entry</td><td>Create index entry with computed MD5/size/mtime</td></tr>
|
||||
<tr><td><strong>Phantom metadata</strong></td><td>Index entry exists but file is missing</td><td>Remove stale entry from index</td></tr>
|
||||
<tr><td><strong>Stale versions</strong></td><td>Manifest without data or vice versa</td><td>Remove orphaned version file</td></tr>
|
||||
<tr><td><strong>ETag cache</strong></td><td><code>etag_index.json</code> differs from metadata</td><td>Delete cache file (auto-rebuilt)</td></tr>
|
||||
<tr><td><strong>Legacy metadata</strong></td><td>Legacy <code>.meta.json</code> differs or unmigrated</td><td>Migrate to index, delete legacy file</td></tr>
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
|
||||
<h3 class="h6 text-uppercase text-muted mt-4">Admin API</h3>
|
||||
<div class="table-responsive mb-3">
|
||||
<table class="table table-sm table-bordered small">
|
||||
<thead class="table-light">
|
||||
<tr>
|
||||
<th>Method</th>
|
||||
<th>Route</th>
|
||||
<th>Description</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr><td><code>GET</code></td><td><code>/admin/integrity/status</code></td><td>Get scanner status and configuration</td></tr>
|
||||
<tr><td><code>POST</code></td><td><code>/admin/integrity/run</code></td><td>Trigger manual scan</td></tr>
|
||||
<tr><td><code>GET</code></td><td><code>/admin/integrity/history</code></td><td>Get scan history</td></tr>
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
|
||||
<pre class="mb-3"><code class="language-bash"># Trigger a dry run with auto-heal preview
|
||||
curl -X POST "{{ api_base }}/admin/integrity/run" \
|
||||
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"dry_run": true, "auto_heal": true}'
|
||||
|
||||
# Trigger actual scan with healing
|
||||
curl -X POST "{{ api_base }}/admin/integrity/run" \
|
||||
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"auto_heal": true}'
|
||||
|
||||
# Check status
|
||||
curl "{{ api_base }}/admin/integrity/status" \
|
||||
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
|
||||
|
||||
# View history
|
||||
curl "{{ api_base }}/admin/integrity/history?limit=10" \
|
||||
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"</code></pre>
|
||||
|
||||
<div class="alert alert-light border mb-0">
|
||||
<div class="d-flex gap-2">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="bi bi-info-circle text-muted mt-1 flex-shrink-0" viewBox="0 0 16 16">
|
||||
<path d="M8 15A7 7 0 1 1 8 1a7 7 0 0 1 0 14zm0 1A8 8 0 1 0 8 0a8 8 0 0 0 0 16z"/>
|
||||
<path d="m8.93 6.588-2.29.287-.082.38.45.083c.294.07.352.176.288.469l-.738 3.468c-.194.897.105 1.319.808 1.319.545 0 1.178-.252 1.465-.598l.088-.416c-.2.176-.492.246-.686.246-.275 0-.375-.193-.304-.533L8.93 6.588zM9 4.5a1 1 0 1 1-2 0 1 1 0 0 1 2 0z"/>
|
||||
</svg>
|
||||
<div>
|
||||
<strong>Dry Run:</strong> Use <code>INTEGRITY_DRY_RUN=true</code> or pass <code>{"dry_run": true}</code> to the API to preview detected issues without making any changes. Combine with <code>{"auto_heal": true}</code> to see what would be repaired.
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</article>
|
||||
<article id="metrics" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">16</span>
|
||||
<h2 class="h4 mb-0">Metrics History</h2>
|
||||
</div>
|
||||
<p class="text-muted">Track CPU, memory, and disk usage over time with optional metrics history. Disabled by default to minimize overhead.</p>
|
||||
@@ -1818,7 +1923,7 @@ curl -X PUT "{{ api_base | replace('/api', '/ui') }}/metrics/settings" \
|
||||
<article id="operation-metrics" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">16</span>
|
||||
<span class="docs-section-kicker">17</span>
|
||||
<h2 class="h4 mb-0">Operation Metrics</h2>
|
||||
</div>
|
||||
<p class="text-muted">Track API request statistics including request counts, latency, error rates, and bandwidth usage. Provides real-time visibility into API operations.</p>
|
||||
@@ -1925,7 +2030,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
|
||||
<article id="troubleshooting" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">17</span>
|
||||
<span class="docs-section-kicker">18</span>
|
||||
<h2 class="h4 mb-0">Troubleshooting & tips</h2>
|
||||
</div>
|
||||
<div class="table-responsive">
|
||||
@@ -1976,7 +2081,7 @@ curl "{{ api_base | replace('/api', '/ui') }}/metrics/operations/history?hours=6
|
||||
<article id="health-check" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">18</span>
|
||||
<span class="docs-section-kicker">19</span>
|
||||
<h2 class="h4 mb-0">Health Check Endpoint</h2>
|
||||
</div>
|
||||
<p class="text-muted">The API exposes a health check endpoint for monitoring and load balancer integration.</p>
|
||||
@@ -1998,7 +2103,7 @@ curl {{ api_base }}/myfsio/health
|
||||
<article id="object-lock" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">19</span>
|
||||
<span class="docs-section-kicker">20</span>
|
||||
<h2 class="h4 mb-0">Object Lock & Retention</h2>
|
||||
</div>
|
||||
<p class="text-muted">Object Lock prevents objects from being deleted or overwritten for a specified retention period.</p>
|
||||
@@ -2058,7 +2163,7 @@ curl "{{ api_base }}/<bucket>/<key>?legal-hold" \
|
||||
<article id="access-logging" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">20</span>
|
||||
<span class="docs-section-kicker">21</span>
|
||||
<h2 class="h4 mb-0">Access Logging</h2>
|
||||
</div>
|
||||
<p class="text-muted">Enable S3-style access logging to track all requests to your buckets for audit and analysis.</p>
|
||||
@@ -2085,7 +2190,7 @@ curl "{{ api_base }}/<bucket>?logging" \
|
||||
<article id="notifications" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">21</span>
|
||||
<span class="docs-section-kicker">22</span>
|
||||
<h2 class="h4 mb-0">Notifications & Webhooks</h2>
|
||||
</div>
|
||||
<p class="text-muted">Configure event notifications to trigger webhooks when objects are created or deleted.</p>
|
||||
@@ -2148,7 +2253,7 @@ curl -X PUT "{{ api_base }}/<bucket>?notification" \
|
||||
<article id="select-content" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">22</span>
|
||||
<span class="docs-section-kicker">23</span>
|
||||
<h2 class="h4 mb-0">SelectObjectContent (SQL)</h2>
|
||||
</div>
|
||||
<p class="text-muted">Query CSV, JSON, or Parquet files directly using SQL without downloading the entire object.</p>
|
||||
@@ -2193,7 +2298,7 @@ curl -X POST "{{ api_base }}/<bucket>/data.csv?select" \
|
||||
<article id="advanced-ops" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">23</span>
|
||||
<span class="docs-section-kicker">24</span>
|
||||
<h2 class="h4 mb-0">Advanced S3 Operations</h2>
|
||||
</div>
|
||||
<p class="text-muted">Copy, move, and partially download objects using advanced S3 operations.</p>
|
||||
@@ -2267,7 +2372,7 @@ curl "{{ api_base }}/<bucket>/<key>" \
|
||||
<article id="acls" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">24</span>
|
||||
<span class="docs-section-kicker">25</span>
|
||||
<h2 class="h4 mb-0">Access Control Lists (ACLs)</h2>
|
||||
</div>
|
||||
<p class="text-muted">ACLs provide legacy-style permission management for buckets and objects.</p>
|
||||
@@ -2321,7 +2426,7 @@ curl -X PUT "{{ api_base }}/<bucket>/<key>" \
|
||||
<article id="tagging" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">25</span>
|
||||
<span class="docs-section-kicker">26</span>
|
||||
<h2 class="h4 mb-0">Object & Bucket Tagging</h2>
|
||||
</div>
|
||||
<p class="text-muted">Add metadata tags to buckets and objects for organization, cost allocation, or lifecycle rule filtering.</p>
|
||||
@@ -2380,7 +2485,7 @@ curl -X PUT "{{ api_base }}/<bucket>?tagging" \
|
||||
<article id="website-hosting" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">26</span>
|
||||
<span class="docs-section-kicker">27</span>
|
||||
<h2 class="h4 mb-0">Static Website Hosting</h2>
|
||||
</div>
|
||||
<p class="text-muted">Host static websites directly from S3 buckets with custom index and error pages, served via custom domain mapping.</p>
|
||||
@@ -2473,7 +2578,7 @@ server {
|
||||
<article id="cors-config" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">27</span>
|
||||
<span class="docs-section-kicker">28</span>
|
||||
<h2 class="h4 mb-0">CORS Configuration</h2>
|
||||
</div>
|
||||
<p class="text-muted">Configure per-bucket Cross-Origin Resource Sharing rules to control which origins can access your bucket from a browser.</p>
|
||||
@@ -2540,7 +2645,7 @@ curl -X DELETE "{{ api_base }}/<bucket>?cors" \
|
||||
<article id="post-object" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">28</span>
|
||||
<span class="docs-section-kicker">29</span>
|
||||
<h2 class="h4 mb-0">PostObject (HTML Form Upload)</h2>
|
||||
</div>
|
||||
<p class="text-muted">Upload objects directly from an HTML form using browser-based POST uploads with policy-based authorization.</p>
|
||||
@@ -2582,7 +2687,7 @@ curl -X DELETE "{{ api_base }}/<bucket>?cors" \
|
||||
<article id="list-objects-v2" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">29</span>
|
||||
<span class="docs-section-kicker">30</span>
|
||||
<h2 class="h4 mb-0">List Objects API v2</h2>
|
||||
</div>
|
||||
<p class="text-muted">Use the v2 list API for improved pagination with continuation tokens instead of markers.</p>
|
||||
@@ -2626,7 +2731,7 @@ curl "{{ api_base }}/<bucket>?list-type=2&start-after=photos/2025/" \
|
||||
<article id="upgrading" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">30</span>
|
||||
<span class="docs-section-kicker">31</span>
|
||||
<h2 class="h4 mb-0">Upgrading & Updates</h2>
|
||||
</div>
|
||||
<p class="text-muted">How to safely update MyFSIO to a new version.</p>
|
||||
@@ -2659,7 +2764,7 @@ cp -r logs/ logs-backup/</code></pre>
|
||||
<article id="api-matrix" class="card shadow-sm docs-section">
|
||||
<div class="card-body">
|
||||
<div class="d-flex align-items-center gap-2 mb-3">
|
||||
<span class="docs-section-kicker">31</span>
|
||||
<span class="docs-section-kicker">32</span>
|
||||
<h2 class="h4 mb-0">Full API Reference</h2>
|
||||
</div>
|
||||
<p class="text-muted">Complete list of all S3-compatible, admin, and KMS endpoints.</p>
|
||||
|
||||
499
tests/test_integrity.py
Normal file
499
tests/test_integrity.py
Normal file
@@ -0,0 +1,499 @@
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
|
||||
|
||||
from app.integrity import IntegrityChecker, IntegrityResult
|
||||
|
||||
|
||||
def _md5(data: bytes) -> str:
|
||||
return hashlib.md5(data).hexdigest()
|
||||
|
||||
|
||||
def _setup_bucket(storage_root: Path, bucket_name: str, objects: dict[str, bytes]) -> None:
|
||||
bucket_path = storage_root / bucket_name
|
||||
bucket_path.mkdir(parents=True, exist_ok=True)
|
||||
meta_root = storage_root / ".myfsio.sys" / "buckets" / bucket_name / "meta"
|
||||
meta_root.mkdir(parents=True, exist_ok=True)
|
||||
bucket_json = storage_root / ".myfsio.sys" / "buckets" / bucket_name / ".bucket.json"
|
||||
bucket_json.write_text(json.dumps({"created": "2025-01-01"}))
|
||||
|
||||
for key, data in objects.items():
|
||||
obj_path = bucket_path / key
|
||||
obj_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
obj_path.write_bytes(data)
|
||||
|
||||
etag = _md5(data)
|
||||
stat = obj_path.stat()
|
||||
meta = {
|
||||
"__etag__": etag,
|
||||
"__size__": str(stat.st_size),
|
||||
"__last_modified__": str(stat.st_mtime),
|
||||
}
|
||||
|
||||
key_path = Path(key)
|
||||
parent = key_path.parent
|
||||
key_name = key_path.name
|
||||
if parent == Path("."):
|
||||
index_path = meta_root / "_index.json"
|
||||
else:
|
||||
index_path = meta_root / parent / "_index.json"
|
||||
index_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
index_data = {}
|
||||
if index_path.exists():
|
||||
index_data = json.loads(index_path.read_text())
|
||||
index_data[key_name] = {"metadata": meta}
|
||||
index_path.write_text(json.dumps(index_data))
|
||||
|
||||
|
||||
def _issues_of_type(result, issue_type):
|
||||
return [i for i in result.issues if i.issue_type == issue_type]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def storage_root(tmp_path):
|
||||
root = tmp_path / "data"
|
||||
root.mkdir()
|
||||
(root / ".myfsio.sys" / "config").mkdir(parents=True, exist_ok=True)
|
||||
return root
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def checker(storage_root):
|
||||
return IntegrityChecker(
|
||||
storage_root=storage_root,
|
||||
interval_hours=24.0,
|
||||
batch_size=1000,
|
||||
auto_heal=False,
|
||||
dry_run=False,
|
||||
)
|
||||
|
||||
|
||||
class TestCorruptedObjects:
|
||||
def test_detect_corrupted(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
|
||||
(storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted data")
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.corrupted_objects == 1
|
||||
issues = _issues_of_type(result, "corrupted_object")
|
||||
assert len(issues) == 1
|
||||
assert issues[0].bucket == "mybucket"
|
||||
assert issues[0].key == "file.txt"
|
||||
assert not issues[0].healed
|
||||
|
||||
def test_heal_corrupted(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
|
||||
(storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted data")
|
||||
|
||||
result = checker.run_now(auto_heal=True)
|
||||
assert result.corrupted_objects == 1
|
||||
assert result.issues_healed == 1
|
||||
issues = _issues_of_type(result, "corrupted_object")
|
||||
assert issues[0].healed
|
||||
|
||||
result2 = checker.run_now()
|
||||
assert result2.corrupted_objects == 0
|
||||
|
||||
def test_valid_objects_pass(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello world"})
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.corrupted_objects == 0
|
||||
assert result.objects_scanned == 1
|
||||
|
||||
def test_corrupted_nested_key(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"sub/dir/file.txt": b"nested content"})
|
||||
(storage_root / "mybucket" / "sub" / "dir" / "file.txt").write_bytes(b"bad")
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.corrupted_objects == 1
|
||||
issues = _issues_of_type(result, "corrupted_object")
|
||||
assert issues[0].key == "sub/dir/file.txt"
|
||||
|
||||
|
||||
class TestOrphanedObjects:
|
||||
def test_detect_orphaned(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {})
|
||||
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan data")
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.orphaned_objects == 1
|
||||
issues = _issues_of_type(result, "orphaned_object")
|
||||
assert len(issues) == 1
|
||||
|
||||
def test_heal_orphaned(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {})
|
||||
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan data")
|
||||
|
||||
result = checker.run_now(auto_heal=True)
|
||||
assert result.orphaned_objects == 1
|
||||
assert result.issues_healed == 1
|
||||
issues = _issues_of_type(result, "orphaned_object")
|
||||
assert issues[0].healed
|
||||
|
||||
result2 = checker.run_now()
|
||||
assert result2.orphaned_objects == 0
|
||||
assert result2.objects_scanned >= 1
|
||||
|
||||
|
||||
class TestPhantomMetadata:
|
||||
def test_detect_phantom(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
(storage_root / "mybucket" / "file.txt").unlink()
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.phantom_metadata == 1
|
||||
issues = _issues_of_type(result, "phantom_metadata")
|
||||
assert len(issues) == 1
|
||||
|
||||
def test_heal_phantom(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
(storage_root / "mybucket" / "file.txt").unlink()
|
||||
|
||||
result = checker.run_now(auto_heal=True)
|
||||
assert result.phantom_metadata == 1
|
||||
assert result.issues_healed == 1
|
||||
|
||||
result2 = checker.run_now()
|
||||
assert result2.phantom_metadata == 0
|
||||
|
||||
|
||||
class TestStaleVersions:
|
||||
def test_manifest_without_data(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
|
||||
versions_root.mkdir(parents=True)
|
||||
(versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.stale_versions == 1
|
||||
issues = _issues_of_type(result, "stale_version")
|
||||
assert "manifest without data" in issues[0].detail
|
||||
|
||||
def test_data_without_manifest(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
|
||||
versions_root.mkdir(parents=True)
|
||||
(versions_root / "v1.bin").write_bytes(b"old data")
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.stale_versions == 1
|
||||
issues = _issues_of_type(result, "stale_version")
|
||||
assert "data without manifest" in issues[0].detail
|
||||
|
||||
def test_heal_stale_versions(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
|
||||
versions_root.mkdir(parents=True)
|
||||
(versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
|
||||
(versions_root / "v2.bin").write_bytes(b"old data")
|
||||
|
||||
result = checker.run_now(auto_heal=True)
|
||||
assert result.stale_versions == 2
|
||||
assert result.issues_healed == 2
|
||||
assert not (versions_root / "v1.json").exists()
|
||||
assert not (versions_root / "v2.bin").exists()
|
||||
|
||||
def test_valid_versions_pass(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
versions_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "versions" / "file.txt"
|
||||
versions_root.mkdir(parents=True)
|
||||
(versions_root / "v1.json").write_text(json.dumps({"etag": "abc"}))
|
||||
(versions_root / "v1.bin").write_bytes(b"old data")
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.stale_versions == 0
|
||||
|
||||
|
||||
class TestEtagCache:
|
||||
def test_detect_mismatch(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
etag_path = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "etag_index.json"
|
||||
etag_path.write_text(json.dumps({"file.txt": "wrong_etag"}))
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.etag_cache_inconsistencies == 1
|
||||
issues = _issues_of_type(result, "etag_cache_inconsistency")
|
||||
assert len(issues) == 1
|
||||
|
||||
def test_heal_mismatch(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
etag_path = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "etag_index.json"
|
||||
etag_path.write_text(json.dumps({"file.txt": "wrong_etag"}))
|
||||
|
||||
result = checker.run_now(auto_heal=True)
|
||||
assert result.etag_cache_inconsistencies == 1
|
||||
assert result.issues_healed == 1
|
||||
assert not etag_path.exists()
|
||||
|
||||
|
||||
class TestLegacyMetadata:
|
||||
def test_detect_unmigrated(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
|
||||
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
|
||||
legacy_meta.parent.mkdir(parents=True)
|
||||
legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))
|
||||
|
||||
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
|
||||
index_path = meta_root / "_index.json"
|
||||
index_path.unlink()
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.legacy_metadata_drifts == 1
|
||||
issues = _issues_of_type(result, "legacy_metadata_drift")
|
||||
assert len(issues) == 1
|
||||
assert issues[0].detail == "unmigrated legacy .meta.json"
|
||||
|
||||
def test_detect_drift(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
|
||||
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
|
||||
legacy_meta.parent.mkdir(parents=True)
|
||||
legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.legacy_metadata_drifts == 1
|
||||
issues = _issues_of_type(result, "legacy_metadata_drift")
|
||||
assert "differs from index" in issues[0].detail
|
||||
|
||||
def test_heal_unmigrated(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
|
||||
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
|
||||
legacy_meta.parent.mkdir(parents=True)
|
||||
legacy_data = {"__etag__": _md5(b"hello"), "__size__": "5"}
|
||||
legacy_meta.write_text(json.dumps(legacy_data))
|
||||
|
||||
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
|
||||
index_path = meta_root / "_index.json"
|
||||
index_path.unlink()
|
||||
|
||||
result = checker.run_now(auto_heal=True)
|
||||
assert result.legacy_metadata_drifts == 1
|
||||
legacy_issues = _issues_of_type(result, "legacy_metadata_drift")
|
||||
assert len(legacy_issues) == 1
|
||||
assert legacy_issues[0].healed
|
||||
assert not legacy_meta.exists()
|
||||
|
||||
index_data = json.loads(index_path.read_text())
|
||||
assert "file.txt" in index_data
|
||||
assert index_data["file.txt"]["metadata"]["__etag__"] == _md5(b"hello")
|
||||
|
||||
def test_heal_drift(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
|
||||
legacy_meta = storage_root / "mybucket" / ".meta" / "file.txt.meta.json"
|
||||
legacy_meta.parent.mkdir(parents=True)
|
||||
legacy_meta.write_text(json.dumps({"__etag__": "different_value"}))
|
||||
|
||||
result = checker.run_now(auto_heal=True)
|
||||
assert result.legacy_metadata_drifts == 1
|
||||
legacy_issues = _issues_of_type(result, "legacy_metadata_drift")
|
||||
assert legacy_issues[0].healed
|
||||
assert not legacy_meta.exists()
|
||||
|
||||
|
||||
class TestDryRun:
|
||||
def test_dry_run_no_changes(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
(storage_root / "mybucket" / "file.txt").write_bytes(b"corrupted")
|
||||
(storage_root / "mybucket" / "orphan.txt").write_bytes(b"orphan")
|
||||
|
||||
result = checker.run_now(auto_heal=True, dry_run=True)
|
||||
assert result.corrupted_objects == 1
|
||||
assert result.orphaned_objects == 1
|
||||
assert result.issues_healed == 0
|
||||
|
||||
meta_root = storage_root / ".myfsio.sys" / "buckets" / "mybucket" / "meta"
|
||||
index_data = json.loads((meta_root / "_index.json").read_text())
|
||||
assert "orphan.txt" not in index_data
|
||||
|
||||
|
||||
class TestBatchSize:
|
||||
def test_batch_limits_scan(self, storage_root):
|
||||
objects = {f"file{i}.txt": f"data{i}".encode() for i in range(10)}
|
||||
_setup_bucket(storage_root, "mybucket", objects)
|
||||
|
||||
checker = IntegrityChecker(
|
||||
storage_root=storage_root,
|
||||
batch_size=3,
|
||||
)
|
||||
result = checker.run_now()
|
||||
assert result.objects_scanned <= 3
|
||||
|
||||
|
||||
class TestHistory:
|
||||
def test_history_recorded(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
|
||||
checker.run_now()
|
||||
history = checker.get_history()
|
||||
assert len(history) == 1
|
||||
assert "corrupted_objects" in history[0]["result"]
|
||||
|
||||
def test_history_multiple(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
|
||||
checker.run_now()
|
||||
checker.run_now()
|
||||
checker.run_now()
|
||||
history = checker.get_history()
|
||||
assert len(history) == 3
|
||||
|
||||
def test_history_pagination(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "mybucket", {"file.txt": b"hello"})
|
||||
|
||||
for _ in range(5):
|
||||
checker.run_now()
|
||||
|
||||
history = checker.get_history(limit=2, offset=1)
|
||||
assert len(history) == 2
|
||||
|
||||
|
||||
AUTH_HEADERS = {"X-Access-Key": "admin", "X-Secret-Key": "adminsecret"}
|
||||
|
||||
|
||||
class TestAdminAPI:
|
||||
@pytest.fixture
|
||||
def integrity_app(self, tmp_path):
|
||||
from app import create_api_app
|
||||
storage_root = tmp_path / "data"
|
||||
iam_config = tmp_path / "iam.json"
|
||||
bucket_policies = tmp_path / "bucket_policies.json"
|
||||
iam_payload = {
|
||||
"users": [
|
||||
{
|
||||
"access_key": "admin",
|
||||
"secret_key": "adminsecret",
|
||||
"display_name": "Admin",
|
||||
"policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", "iam:*"]}],
|
||||
}
|
||||
]
|
||||
}
|
||||
iam_config.write_text(json.dumps(iam_payload))
|
||||
flask_app = create_api_app({
|
||||
"TESTING": True,
|
||||
"SECRET_KEY": "testing",
|
||||
"STORAGE_ROOT": storage_root,
|
||||
"IAM_CONFIG": iam_config,
|
||||
"BUCKET_POLICY_PATH": bucket_policies,
|
||||
"API_BASE_URL": "http://testserver",
|
||||
"INTEGRITY_ENABLED": True,
|
||||
"INTEGRITY_AUTO_HEAL": False,
|
||||
"INTEGRITY_DRY_RUN": False,
|
||||
})
|
||||
yield flask_app
|
||||
storage = flask_app.extensions.get("object_storage")
|
||||
if storage:
|
||||
base = getattr(storage, "storage", storage)
|
||||
if hasattr(base, "shutdown_stats"):
|
||||
base.shutdown_stats()
|
||||
ic = flask_app.extensions.get("integrity")
|
||||
if ic:
|
||||
ic.stop()
|
||||
|
||||
def test_status_endpoint(self, integrity_app):
|
||||
client = integrity_app.test_client()
|
||||
resp = client.get("/admin/integrity/status", headers=AUTH_HEADERS)
|
||||
assert resp.status_code == 200
|
||||
data = resp.get_json()
|
||||
assert data["enabled"] is True
|
||||
assert "interval_hours" in data
|
||||
|
||||
def test_run_endpoint(self, integrity_app):
|
||||
client = integrity_app.test_client()
|
||||
resp = client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
|
||||
assert resp.status_code == 200
|
||||
data = resp.get_json()
|
||||
assert "corrupted_objects" in data
|
||||
assert "objects_scanned" in data
|
||||
|
||||
def test_run_with_overrides(self, integrity_app):
|
||||
client = integrity_app.test_client()
|
||||
resp = client.post(
|
||||
"/admin/integrity/run",
|
||||
headers=AUTH_HEADERS,
|
||||
json={"dry_run": True, "auto_heal": True},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_history_endpoint(self, integrity_app):
|
||||
client = integrity_app.test_client()
|
||||
client.post("/admin/integrity/run", headers=AUTH_HEADERS, json={})
|
||||
resp = client.get("/admin/integrity/history", headers=AUTH_HEADERS)
|
||||
assert resp.status_code == 200
|
||||
data = resp.get_json()
|
||||
assert "executions" in data
|
||||
assert len(data["executions"]) >= 1
|
||||
|
||||
def test_auth_required(self, integrity_app):
|
||||
client = integrity_app.test_client()
|
||||
resp = client.get("/admin/integrity/status")
|
||||
assert resp.status_code in (401, 403)
|
||||
|
||||
def test_disabled_status(self, tmp_path):
|
||||
from app import create_api_app
|
||||
storage_root = tmp_path / "data2"
|
||||
iam_config = tmp_path / "iam2.json"
|
||||
bucket_policies = tmp_path / "bp2.json"
|
||||
iam_payload = {
|
||||
"users": [
|
||||
{
|
||||
"access_key": "admin",
|
||||
"secret_key": "adminsecret",
|
||||
"display_name": "Admin",
|
||||
"policies": [{"bucket": "*", "actions": ["list", "read", "write", "delete", "policy", "iam:*"]}],
|
||||
}
|
||||
]
|
||||
}
|
||||
iam_config.write_text(json.dumps(iam_payload))
|
||||
flask_app = create_api_app({
|
||||
"TESTING": True,
|
||||
"SECRET_KEY": "testing",
|
||||
"STORAGE_ROOT": storage_root,
|
||||
"IAM_CONFIG": iam_config,
|
||||
"BUCKET_POLICY_PATH": bucket_policies,
|
||||
"API_BASE_URL": "http://testserver",
|
||||
"INTEGRITY_ENABLED": False,
|
||||
})
|
||||
c = flask_app.test_client()
|
||||
resp = c.get("/admin/integrity/status", headers=AUTH_HEADERS)
|
||||
assert resp.status_code == 200
|
||||
data = resp.get_json()
|
||||
assert data["enabled"] is False
|
||||
|
||||
storage = flask_app.extensions.get("object_storage")
|
||||
if storage:
|
||||
base = getattr(storage, "storage", storage)
|
||||
if hasattr(base, "shutdown_stats"):
|
||||
base.shutdown_stats()
|
||||
|
||||
|
||||
class TestMultipleBuckets:
|
||||
def test_scans_multiple_buckets(self, storage_root, checker):
|
||||
_setup_bucket(storage_root, "bucket1", {"a.txt": b"aaa"})
|
||||
_setup_bucket(storage_root, "bucket2", {"b.txt": b"bbb"})
|
||||
|
||||
result = checker.run_now()
|
||||
assert result.buckets_scanned == 2
|
||||
assert result.objects_scanned == 2
|
||||
assert result.corrupted_objects == 0
|
||||
|
||||
|
||||
class TestGetStatus:
|
||||
def test_status_fields(self, checker):
|
||||
status = checker.get_status()
|
||||
assert "enabled" in status
|
||||
assert "running" in status
|
||||
assert "interval_hours" in status
|
||||
assert "batch_size" in status
|
||||
assert "auto_heal" in status
|
||||
assert "dry_run" in status
|
||||
Reference in New Issue
Block a user