From 3a590e6639f0a0db19c276a7d8ccd72c013f1bc1 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 25 Apr 2026 17:14:38 +0800 Subject: [PATCH] Make auto_heal real: peer-fetch corrupted_object with verified swap, poison-fallback on no peer --- crates/myfsio-common/src/error.rs | 4 + crates/myfsio-server/src/config.rs | 25 + crates/myfsio-server/src/handlers/ui_api.rs | 7 +- crates/myfsio-server/src/services/gc.rs | 80 +++ .../myfsio-server/src/services/integrity.rs | 662 +++++++++++++++++- crates/myfsio-server/src/services/mod.rs | 1 + .../myfsio-server/src/services/peer_fetch.rs | 256 +++++++ .../myfsio-server/src/services/replication.rs | 13 +- .../myfsio-server/src/services/site_sync.rs | 93 +-- crates/myfsio-server/src/state.rs | 46 +- crates/myfsio-server/templates/docs.html | 301 ++++---- crates/myfsio-storage/src/error.rs | 15 + crates/myfsio-storage/src/fs_backend.rs | 290 ++++++++ docs.md | 151 +++- 14 files changed, 1670 insertions(+), 274 deletions(-) create mode 100644 crates/myfsio-server/src/services/peer_fetch.rs diff --git a/crates/myfsio-common/src/error.rs b/crates/myfsio-common/src/error.rs index 5a39eec..ceabe0d 100644 --- a/crates/myfsio-common/src/error.rs +++ b/crates/myfsio-common/src/error.rs @@ -29,6 +29,7 @@ pub enum S3ErrorCode { NoSuchUpload, NoSuchVersion, NoSuchTagSet, + ObjectCorrupted, PreconditionFailed, NotModified, QuotaExceeded, @@ -68,6 +69,7 @@ impl S3ErrorCode { Self::NoSuchUpload => 404, Self::NoSuchVersion => 404, Self::NoSuchTagSet => 404, + Self::ObjectCorrupted => 500, Self::PreconditionFailed => 412, Self::NotModified => 304, Self::QuotaExceeded => 403, @@ -107,6 +109,7 @@ impl S3ErrorCode { Self::NoSuchUpload => "NoSuchUpload", Self::NoSuchVersion => "NoSuchVersion", Self::NoSuchTagSet => "NoSuchTagSet", + Self::ObjectCorrupted => "ObjectCorrupted", Self::PreconditionFailed => "PreconditionFailed", Self::NotModified => "NotModified", Self::QuotaExceeded => "QuotaExceeded", @@ -148,6 +151,7 @@ impl S3ErrorCode { 
Self::NoSuchUpload => "The specified multipart upload does not exist", Self::NoSuchVersion => "The specified version does not exist", Self::NoSuchTagSet => "The TagSet does not exist", + Self::ObjectCorrupted => "The stored object is corrupted and cannot be served", Self::PreconditionFailed => "At least one of the preconditions you specified did not hold", Self::NotModified => "Not Modified", Self::QuotaExceeded => "The bucket quota has been exceeded", diff --git a/crates/myfsio-server/src/config.rs b/crates/myfsio-server/src/config.rs index b1ba47e..ea83d9a 100644 --- a/crates/myfsio-server/src/config.rs +++ b/crates/myfsio-server/src/config.rs @@ -39,6 +39,12 @@ pub struct ServerConfig { pub gc_lock_file_max_age_hours: f64, pub gc_dry_run: bool, pub integrity_enabled: bool, + pub integrity_interval_hours: f64, + pub integrity_batch_size: usize, + pub integrity_auto_heal: bool, + pub integrity_dry_run: bool, + pub integrity_heal_concurrency: usize, + pub integrity_quarantine_retention_days: u64, pub metrics_enabled: bool, pub metrics_history_enabled: bool, pub metrics_interval_minutes: u64, @@ -168,6 +174,13 @@ impl ServerConfig { let gc_dry_run = parse_bool_env("GC_DRY_RUN", false); let integrity_enabled = parse_bool_env("INTEGRITY_ENABLED", false); + let integrity_interval_hours = parse_f64_env("INTEGRITY_INTERVAL_HOURS", 24.0); + let integrity_batch_size = parse_usize_env("INTEGRITY_BATCH_SIZE", 10_000); + let integrity_auto_heal = parse_bool_env("INTEGRITY_AUTO_HEAL", false); + let integrity_dry_run = parse_bool_env("INTEGRITY_DRY_RUN", false); + let integrity_heal_concurrency = parse_usize_env("INTEGRITY_HEAL_CONCURRENCY", 4); + let integrity_quarantine_retention_days = + parse_u64_env("INTEGRITY_QUARANTINE_RETENTION_DAYS", 7); let metrics_enabled = parse_bool_env("OPERATION_METRICS_ENABLED", false); @@ -276,6 +289,12 @@ impl ServerConfig { gc_lock_file_max_age_hours, gc_dry_run, integrity_enabled, + integrity_interval_hours, + integrity_batch_size, + 
integrity_auto_heal, + integrity_dry_run, + integrity_heal_concurrency, + integrity_quarantine_retention_days, metrics_enabled, metrics_history_enabled, metrics_interval_minutes, @@ -357,6 +376,12 @@ impl Default for ServerConfig { gc_lock_file_max_age_hours: 1.0, gc_dry_run: false, integrity_enabled: false, + integrity_interval_hours: 24.0, + integrity_batch_size: 10_000, + integrity_auto_heal: false, + integrity_dry_run: false, + integrity_heal_concurrency: 4, + integrity_quarantine_retention_days: 7, metrics_enabled: false, metrics_history_enabled: false, metrics_interval_minutes: 5, diff --git a/crates/myfsio-server/src/handlers/ui_api.rs b/crates/myfsio-server/src/handlers/ui_api.rs index 8c681b3..3e3b494 100644 --- a/crates/myfsio-server/src/handlers/ui_api.rs +++ b/crates/myfsio-server/src/handlers/ui_api.rs @@ -129,9 +129,10 @@ fn storage_status(err: &StorageError) -> StatusCode { | StorageError::QuotaExceeded(_) => StatusCode::BAD_REQUEST, StorageError::BucketAlreadyExists(_) => StatusCode::CONFLICT, StorageError::BucketNotEmpty(_) => StatusCode::CONFLICT, - StorageError::Io(_) | StorageError::Json(_) | StorageError::Internal(_) => { - StatusCode::INTERNAL_SERVER_ERROR - } + StorageError::Io(_) + | StorageError::Json(_) + | StorageError::Internal(_) + | StorageError::ObjectCorrupted { .. 
} => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/crates/myfsio-server/src/services/gc.rs b/crates/myfsio-server/src/services/gc.rs index fb345e9..a72fabc 100644 --- a/crates/myfsio-server/src/services/gc.rs +++ b/crates/myfsio-server/src/services/gc.rs @@ -9,6 +9,7 @@ pub struct GcConfig { pub temp_file_max_age_hours: f64, pub multipart_max_age_days: u64, pub lock_file_max_age_hours: f64, + pub quarantine_max_age_days: u64, pub dry_run: bool, } @@ -19,6 +20,7 @@ impl Default for GcConfig { temp_file_max_age_hours: 24.0, multipart_max_age_days: 7, lock_file_max_age_hours: 1.0, + quarantine_max_age_days: 7, dry_run: false, } } @@ -106,6 +108,7 @@ impl GcService { "temp_file_max_age_hours": self.config.temp_file_max_age_hours, "multipart_max_age_days": self.config.multipart_max_age_days, "lock_file_max_age_hours": self.config.lock_file_max_age_hours, + "quarantine_max_age_days": self.config.quarantine_max_age_days, "dry_run": self.config.dry_run, }) } @@ -164,6 +167,8 @@ impl GcService { let mut multipart_uploads_deleted = 0u64; let mut lock_files_deleted = 0u64; let mut empty_dirs_removed = 0u64; + let mut quarantine_entries_deleted = 0u64; + let mut quarantine_bytes_freed = 0u64; let mut errors: Vec = Vec::new(); let now = std::time::SystemTime::now(); @@ -173,6 +178,8 @@ impl GcService { std::time::Duration::from_secs(self.config.multipart_max_age_days * 86400); let lock_max_age = std::time::Duration::from_secs_f64(self.config.lock_file_max_age_hours * 3600.0); + let quarantine_max_age = + std::time::Duration::from_secs(self.config.quarantine_max_age_days * 86400); let tmp_dir = self.storage_root.join(".myfsio.sys").join("tmp"); if tmp_dir.exists() { @@ -256,6 +263,58 @@ impl GcService { } } + let quarantine_dir = self.storage_root.join(".myfsio.sys").join("quarantine"); + if quarantine_dir.exists() { + if let Ok(bucket_dirs) = std::fs::read_dir(&quarantine_dir) { + for bucket_entry in bucket_dirs.flatten() { + if !bucket_entry.path().is_dir() { + continue; 
+ } + if let Ok(ts_dirs) = std::fs::read_dir(bucket_entry.path()) { + for ts_entry in ts_dirs.flatten() { + let ts_path = ts_entry.path(); + if !ts_path.is_dir() { + continue; + } + let modified = ts_entry + .metadata() + .ok() + .and_then(|m| m.modified().ok()); + let Some(modified) = modified else { + continue; + }; + let Ok(age) = now.duration_since(modified) else { + continue; + }; + if age <= quarantine_max_age { + continue; + } + let bytes = dir_total_bytes(&ts_path); + if !dry_run { + if let Err(e) = std::fs::remove_dir_all(&ts_path) { + errors.push(format!( + "Failed to remove quarantine {}: {}", + ts_path.display(), + e + )); + continue; + } + } + quarantine_entries_deleted += 1; + quarantine_bytes_freed += bytes; + } + } + if !dry_run { + if let Ok(mut remaining) = std::fs::read_dir(bucket_entry.path()) { + if remaining.next().is_none() { + let _ = std::fs::remove_dir(bucket_entry.path()); + } + } + } + } + } + } + if !dry_run { for dir in [&tmp_dir, &multipart_dir] { if dir.exists() { @@ -281,6 +340,8 @@ impl GcService { "multipart_uploads_deleted": multipart_uploads_deleted, "lock_files_deleted": lock_files_deleted, "empty_dirs_removed": empty_dirs_removed, + "quarantine_entries_deleted": quarantine_entries_deleted, + "quarantine_bytes_freed": quarantine_bytes_freed, "errors": errors, }) } @@ -313,3 +374,22 @@ impl GcService { }) } } + +fn dir_total_bytes(path: &std::path::Path) -> u64 { + let mut total: u64 = 0; + let mut stack: Vec = vec![path.to_path_buf()]; + while let Some(dir) = stack.pop() { + let Ok(entries) = std::fs::read_dir(&dir) else { + continue; + }; + for entry in entries.flatten() { + let Ok(ft) = entry.file_type() else { continue }; + if ft.is_dir() { + stack.push(entry.path()); + } else if ft.is_file() { + total = total.saturating_add(entry.metadata().map(|m| m.len()).unwrap_or(0)); + } + } + } + total +} diff --git a/crates/myfsio-server/src/services/integrity.rs b/crates/myfsio-server/src/services/integrity.rs index 0d7ef03..d81985e 
100644 --- a/crates/myfsio-server/src/services/integrity.rs +++ b/crates/myfsio-server/src/services/integrity.rs @@ -1,22 +1,31 @@ use myfsio_common::constants::{ BUCKET_META_DIR, BUCKET_VERSIONS_DIR, INDEX_FILE, SYSTEM_BUCKETS_DIR, SYSTEM_ROOT, }; -use myfsio_storage::fs_backend::FsStorageBackend; +use myfsio_storage::fs_backend::{ + is_multipart_etag, metadata_is_corrupted, FsStorageBackend, META_KEY_CORRUPTED, + META_KEY_CORRUPTED_AT, META_KEY_CORRUPTION_DETAIL, META_KEY_QUARANTINE_PATH, +}; +use myfsio_storage::traits::StorageEngine; use serde_json::{json, Map, Value}; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, Semaphore}; + +use crate::services::peer_fetch::{HealOutcome, PeerFetcher}; const MAX_ISSUES: usize = 500; const INTERNAL_FOLDERS: &[&str] = &[".meta", ".versions", ".multipart"]; +const QUARANTINE_DIR: &str = "quarantine"; pub struct IntegrityConfig { pub interval_hours: f64, pub batch_size: usize, pub auto_heal: bool, pub dry_run: bool, + pub heal_concurrency: usize, + pub quarantine_retention_days: u64, } impl Default for IntegrityConfig { @@ -26,21 +35,50 @@ impl Default for IntegrityConfig { batch_size: 10_000, auto_heal: false, dry_run: false, + heal_concurrency: 4, + quarantine_retention_days: 7, } } } pub struct IntegrityService { - #[allow(dead_code)] storage: Arc, storage_root: PathBuf, config: IntegrityConfig, + peer_fetcher: Option>, running: Arc>, started_at: Arc>>, history: Arc>>, history_path: PathBuf, } +#[derive(Default, Clone)] +struct HealStats { + found: u64, + healed: u64, + poisoned: u64, + peer_mismatch: u64, + peer_unavailable: u64, + verify_failed: u64, + failed: u64, + skipped: u64, +} + +impl HealStats { + fn to_value(&self) -> Value { + json!({ + "found": self.found, + "healed": self.healed, + "poisoned": self.poisoned, + "peer_mismatch": 
self.peer_mismatch, + "peer_unavailable": self.peer_unavailable, + "verify_failed": self.verify_failed, + "failed": self.failed, + "skipped": self.skipped, + }) + } +} + #[derive(Default)] struct ScanState { objects_scanned: u64, @@ -69,22 +107,6 @@ impl ScanState { })); } } - - fn into_json(self, elapsed: f64) -> Value { - json!({ - "objects_scanned": self.objects_scanned, - "buckets_scanned": self.buckets_scanned, - "corrupted_objects": self.corrupted_objects, - "orphaned_objects": self.orphaned_objects, - "phantom_metadata": self.phantom_metadata, - "stale_versions": self.stale_versions, - "etag_cache_inconsistencies": self.etag_cache_inconsistencies, - "issues_healed": 0, - "issues": self.issues, - "errors": self.errors, - "execution_time_seconds": elapsed, - }) - } } impl IntegrityService { @@ -92,6 +114,7 @@ impl IntegrityService { storage: Arc, storage_root: &Path, config: IntegrityConfig, + peer_fetcher: Option>, ) -> Self { let history_path = storage_root .join(SYSTEM_ROOT) @@ -112,6 +135,7 @@ impl IntegrityService { storage, storage_root: storage_root.to_path_buf(), config, + peer_fetcher, running: Arc::new(RwLock::new(false)), started_at: Arc::new(RwLock::new(None)), history: Arc::new(RwLock::new(history)), @@ -136,6 +160,8 @@ impl IntegrityService { "batch_size": self.config.batch_size, "auto_heal": self.config.auto_heal, "dry_run": self.config.dry_run, + "heal_concurrency": self.config.heal_concurrency, + "peer_heal_available": self.peer_fetcher.is_some(), }) } @@ -159,7 +185,7 @@ impl IntegrityService { let start = Instant::now(); let storage_root = self.storage_root.clone(); let batch_size = self.config.batch_size; - let result = + let scan_state = tokio::task::spawn_blocking(move || scan_all_buckets(&storage_root, batch_size)) .await .unwrap_or_else(|e| { @@ -167,12 +193,19 @@ impl IntegrityService { st.errors.push(format!("scan task failed: {}", e)); st }); + + let heal_stats = if auto_heal && !dry_run { + self.run_heal_phase(&scan_state).await + } 
else { + BTreeMap::new() + }; + let elapsed = start.elapsed().as_secs_f64(); *self.running.write().await = false; *self.started_at.write().await = None; - let result_json = result.into_json(elapsed); + let result_json = build_result_json(scan_state, heal_stats, elapsed); let record = json!({ "timestamp": chrono::Utc::now().timestamp_millis() as f64 / 1000.0, @@ -194,6 +227,77 @@ impl IntegrityService { Ok(result_json) } + async fn run_heal_phase(&self, scan: &ScanState) -> BTreeMap { + let mut stats: BTreeMap = BTreeMap::new(); + let issues: Vec = scan.issues.clone(); + let semaphore = Arc::new(Semaphore::new(self.config.heal_concurrency.max(1))); + let mut tasks: Vec> = Vec::new(); + + for issue in issues { + let issue_type = issue + .get("issue_type") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let bucket = issue + .get("bucket") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let key = issue + .get("key") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let detail = issue + .get("detail") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + stats.entry(issue_type.clone()).or_default().found += 1; + + let permit = match semaphore.clone().acquire_owned().await { + Ok(p) => p, + Err(_) => continue, + }; + let storage = self.storage.clone(); + let storage_root = self.storage_root.clone(); + let peer_fetcher = self.peer_fetcher.clone(); + + tasks.push(tokio::spawn(async move { + let _permit = permit; + heal_issue( + &storage, + &storage_root, + peer_fetcher.as_deref(), + &issue_type, + &bucket, + &key, + &detail, + ) + .await + })); + } + + for task in tasks { + if let Ok(report) = task.await { + let entry = stats.entry(report.issue_type).or_default(); + match report.status { + HealStatus::Healed => entry.healed += 1, + HealStatus::Poisoned => entry.poisoned += 1, + HealStatus::PeerMismatch => entry.peer_mismatch += 1, + HealStatus::PeerUnavailable => entry.peer_unavailable += 1, + HealStatus::VerifyFailed 
=> entry.verify_failed += 1, + HealStatus::Failed => entry.failed += 1, + HealStatus::Skipped => entry.skipped += 1, + } + } + } + + stats + } + async fn save_history(&self) { let history = self.history.read().await; let data = json!({ "executions": *history }); @@ -208,13 +312,15 @@ impl IntegrityService { pub fn start_background(self: Arc) -> tokio::task::JoinHandle<()> { let interval = std::time::Duration::from_secs_f64(self.config.interval_hours * 3600.0); + let auto_heal = self.config.auto_heal; + let dry_run = self.config.dry_run; tokio::spawn(async move { let mut timer = tokio::time::interval(interval); timer.tick().await; loop { timer.tick().await; tracing::info!("Integrity check starting"); - match self.run_now(false, false).await { + match self.run_now(dry_run, auto_heal).await { Ok(result) => tracing::info!("Integrity check complete: {:?}", result), Err(e) => tracing::warn!("Integrity check failed: {}", e), } @@ -223,6 +329,376 @@ impl IntegrityService { } } +#[derive(Debug)] +enum HealStatus { + Healed, + Poisoned, + PeerMismatch, + PeerUnavailable, + VerifyFailed, + Failed, + Skipped, +} + +struct HealReport { + issue_type: String, + status: HealStatus, +} + +async fn heal_issue( + storage: &FsStorageBackend, + storage_root: &Path, + peer_fetcher: Option<&PeerFetcher>, + issue_type: &str, + bucket: &str, + key: &str, + detail: &str, +) -> HealReport { + let status = match issue_type { + "corrupted_object" => { + heal_corrupted(storage, storage_root, peer_fetcher, bucket, key, detail).await + } + "stale_version" => heal_stale_version(storage_root, bucket, key).await, + "etag_cache_inconsistency" => heal_etag_cache(storage_root, bucket, key, detail).await, + "phantom_metadata" => heal_phantom_metadata(storage, bucket, key).await, + _ => HealStatus::Skipped, + }; + HealReport { + issue_type: issue_type.to_string(), + status, + } +} + +async fn heal_corrupted( + storage: &FsStorageBackend, + storage_root: &Path, + peer_fetcher: Option<&PeerFetcher>, + 
bucket: &str, + key: &str, + detail: &str, +) -> HealStatus { + let stored_etag = parse_stored_etag(detail); + let actual_etag = parse_actual_etag(detail); + + let live_path = storage_root.join(bucket).join(key); + let quarantine_rel = quarantine_relative_path(bucket, key); + let quarantine_full = storage_root.join(&quarantine_rel); + + if let Some(parent) = quarantine_full.parent() { + if let Err(e) = std::fs::create_dir_all(parent) { + tracing::error!("Heal {}/{}: mkdir quarantine failed: {}", bucket, key, e); + return HealStatus::Failed; + } + } + + if live_path.exists() { + if let Err(e) = std::fs::rename(&live_path, &quarantine_full) { + tracing::error!( + "Heal {}/{}: quarantine rename failed: {}", + bucket, + key, + e + ); + return HealStatus::Failed; + } + } + + let quarantine_rel_str = quarantine_rel.to_string_lossy().replace('\\', "/"); + + if !stored_etag.is_empty() { + if let Some(fetcher) = peer_fetcher { + let nonce = uuid::Uuid::new_v4().simple().to_string(); + let temp_path = live_path.with_file_name(format!( + "{}.healing.{}", + live_path + .file_name() + .map(|n| n.to_string_lossy().into_owned()) + .unwrap_or_else(|| "healing".to_string()), + nonce + )); + match fetcher + .fetch_for_heal(bucket, key, &stored_etag, &temp_path) + .await + { + HealOutcome::Healed { peer_etag, bytes } => { + if let Err(e) = atomic_swap(&temp_path, &live_path) { + tracing::error!( + "Heal {}/{}: atomic swap failed: {} (restoring from quarantine)", + bucket, + key, + e + ); + let _ = std::fs::rename(&quarantine_full, &live_path); + let _ = std::fs::remove_file(&temp_path); + return HealStatus::Failed; + } + let _ = clear_poison_metadata(storage, bucket, key).await; + tracing::info!( + "Healed {}/{} from peer (etag={}, bytes={})", + bucket, + key, + peer_etag, + bytes + ); + return HealStatus::Healed; + } + HealOutcome::PeerMismatch { stored, peer } => { + let msg = format!("peer etag {} != stored {}", peer, stored); + let _ = + poison_metadata(storage, bucket, key, 
&msg, &quarantine_rel_str).await; + tracing::warn!("Heal {}/{}: peer mismatch ({}), poisoned", bucket, key, msg); + return HealStatus::PeerMismatch; + } + HealOutcome::PeerUnavailable { error } => { + tracing::warn!( + "Heal {}/{}: peer unavailable ({}), poisoning", + bucket, + key, + error + ); + let msg = format!( + "etag mismatch (stored={}, actual={}) — peer unavailable: {}", + stored_etag, actual_etag, error + ); + let _ = + poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; + return HealStatus::PeerUnavailable; + } + HealOutcome::VerifyFailed { expected, actual } => { + let msg = format!("peer download verify failed: expected={} actual={}", expected, actual); + let _ = + poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; + tracing::warn!("Heal {}/{}: {}", bucket, key, msg); + return HealStatus::VerifyFailed; + } + HealOutcome::NotConfigured => { + let msg = format!( + "etag mismatch (stored={}, actual={}); no peer configured", + stored_etag, actual_etag + ); + let _ = + poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; + return HealStatus::Poisoned; + } + } + } + } + + let msg = format!( + "etag mismatch (stored={}, actual={}); no peer fetcher", + stored_etag, actual_etag + ); + let _ = poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; + HealStatus::Poisoned +} + +async fn heal_stale_version(storage_root: &Path, bucket: &str, key: &str) -> HealStatus { + let versions_root = storage_root + .join(SYSTEM_ROOT) + .join(SYSTEM_BUCKETS_DIR) + .join(bucket) + .join(BUCKET_VERSIONS_DIR); + let src = versions_root.join(key); + if !src.exists() { + return HealStatus::Skipped; + } + let ts = chrono::Utc::now().format("%Y%m%dT%H%M%S").to_string(); + let dst = storage_root + .join(SYSTEM_ROOT) + .join(QUARANTINE_DIR) + .join(bucket) + .join(&ts) + .join("versions") + .join(key); + if let Some(parent) = dst.parent() { + if let Err(e) = std::fs::create_dir_all(parent) { + 
tracing::error!("Stale-version quarantine mkdir failed {}/{}: {}", bucket, key, e); + return HealStatus::Failed; + } + } + if let Err(e) = std::fs::rename(&src, &dst) { + tracing::error!("Stale-version quarantine rename failed {}/{}: {}", bucket, key, e); + return HealStatus::Failed; + } + tracing::info!("Quarantined stale version {}/{}", bucket, key); + HealStatus::Healed +} + +async fn heal_etag_cache( + storage_root: &Path, + bucket: &str, + key: &str, + _detail: &str, +) -> HealStatus { + let etag_index_path = storage_root + .join(SYSTEM_ROOT) + .join(SYSTEM_BUCKETS_DIR) + .join(bucket) + .join("etag_index.json"); + if !etag_index_path.exists() { + return HealStatus::Skipped; + } + + let meta_root = storage_root + .join(SYSTEM_ROOT) + .join(SYSTEM_BUCKETS_DIR) + .join(bucket) + .join(BUCKET_META_DIR); + let entries = collect_index_entries(&meta_root); + let canonical = entries.get(key).and_then(|info| stored_etag(&info.entry)); + + let mut cache: HashMap = match std::fs::read_to_string(&etag_index_path) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + { + Some(Value::Object(m)) => m.into_iter().collect(), + _ => return HealStatus::Failed, + }; + + match canonical { + Some(etag) => { + cache.insert(key.to_string(), Value::String(etag)); + } + None => { + cache.remove(key); + } + } + + let json_obj: serde_json::Map = cache.into_iter().collect(); + match std::fs::write( + &etag_index_path, + serde_json::to_string_pretty(&Value::Object(json_obj)).unwrap_or_default(), + ) { + Ok(_) => HealStatus::Healed, + Err(e) => { + tracing::error!("etag-cache rewrite failed {}/{}: {}", bucket, key, e); + HealStatus::Failed + } + } +} + +async fn heal_phantom_metadata( + storage: &FsStorageBackend, + bucket: &str, + key: &str, +) -> HealStatus { + match storage.delete_object_metadata_entry(bucket, key).await { + Ok(_) => { + tracing::info!("Dropped phantom metadata for {}/{}", bucket, key); + HealStatus::Healed + } + Err(e) => { + tracing::error!("Failed to drop phantom 
metadata {}/{}: {}", bucket, key, e); + HealStatus::Failed + } + } +} + +async fn poison_metadata( + storage: &FsStorageBackend, + bucket: &str, + key: &str, + detail: &str, + quarantine_rel: &str, +) -> Result<(), String> { + let mut meta = storage + .get_object_metadata(bucket, key) + .await + .unwrap_or_default(); + meta.insert(META_KEY_CORRUPTED.to_string(), "true".to_string()); + meta.insert( + META_KEY_CORRUPTED_AT.to_string(), + chrono::Utc::now().to_rfc3339(), + ); + meta.insert(META_KEY_CORRUPTION_DETAIL.to_string(), detail.to_string()); + meta.insert( + META_KEY_QUARANTINE_PATH.to_string(), + quarantine_rel.to_string(), + ); + storage + .put_object_metadata(bucket, key, &meta) + .await + .map_err(|e| e.to_string()) +} + +async fn clear_poison_metadata( + storage: &FsStorageBackend, + bucket: &str, + key: &str, +) -> Result<(), String> { + let mut meta = storage + .get_object_metadata(bucket, key) + .await + .unwrap_or_default(); + meta.remove(META_KEY_CORRUPTED); + meta.remove(META_KEY_CORRUPTED_AT); + meta.remove(META_KEY_CORRUPTION_DETAIL); + meta.remove(META_KEY_QUARANTINE_PATH); + storage + .put_object_metadata(bucket, key, &meta) + .await + .map_err(|e| e.to_string()) +} + +fn quarantine_relative_path(bucket: &str, key: &str) -> PathBuf { + let ts = chrono::Utc::now().format("%Y%m%dT%H%M%S").to_string(); + PathBuf::from(SYSTEM_ROOT) + .join(QUARANTINE_DIR) + .join(bucket) + .join(ts) + .join(key) +} + +fn atomic_swap(src: &Path, dst: &Path) -> std::io::Result<()> { + if let Some(parent) = dst.parent() { + std::fs::create_dir_all(parent)?; + } + std::fs::rename(src, dst) +} + +fn parse_stored_etag(detail: &str) -> String { + detail + .split_whitespace() + .find_map(|s| s.strip_prefix("stored_etag=")) + .unwrap_or("") + .to_string() +} + +fn parse_actual_etag(detail: &str) -> String { + detail + .split_whitespace() + .find_map(|s| s.strip_prefix("actual_etag=")) + .unwrap_or("") + .to_string() +} + +fn build_result_json( + state: ScanState, + 
heal_stats: BTreeMap, + elapsed: f64, +) -> Value { + let issues_healed: u64 = heal_stats.values().map(|s| s.healed).sum(); + let heal_stats_json: serde_json::Map = heal_stats + .iter() + .map(|(k, v)| (k.clone(), v.to_value())) + .collect(); + + json!({ + "objects_scanned": state.objects_scanned, + "buckets_scanned": state.buckets_scanned, + "corrupted_objects": state.corrupted_objects, + "orphaned_objects": state.orphaned_objects, + "phantom_metadata": state.phantom_metadata, + "stale_versions": state.stale_versions, + "etag_cache_inconsistencies": state.etag_cache_inconsistencies, + "issues_healed": issues_healed, + "heal_stats": Value::Object(heal_stats_json), + "issues": state.issues, + "errors": state.errors, + "execution_time_seconds": elapsed, + }) +} + fn scan_all_buckets(storage_root: &Path, batch_size: usize) -> ScanState { let mut state = ScanState::default(); let buckets = match list_bucket_names(storage_root) { @@ -359,6 +835,18 @@ fn stored_etag(entry: &Value) -> Option { .map(|s| s.to_string()) } +fn entry_metadata_map(entry: &Value) -> HashMap { + entry + .get("metadata") + .and_then(|m| m.as_object()) + .map(|m| { + m.iter() + .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string()))) + .collect() + }) + .unwrap_or_default() +} + fn check_corrupted( state: &mut ScanState, bucket: &str, @@ -378,12 +866,20 @@ fn check_corrupted( if !object_path.exists() { continue; } + let meta_map = entry_metadata_map(&info.entry); + if metadata_is_corrupted(&meta_map) { + continue; + } state.objects_scanned += 1; let Some(stored) = stored_etag(&info.entry) else { continue; }; + if is_multipart_etag(&stored) { + continue; + } + match myfsio_crypto::hashing::md5_file(&object_path) { Ok(actual) => { if actual != stored { @@ -417,6 +913,10 @@ fn check_phantom( if state.batch_exhausted(batch_size) { return; } + let info = &entries[full_key]; + if metadata_is_corrupted(&entry_metadata_map(&info.entry)) { + continue; + } state.objects_scanned += 1; let 
object_path = bucket_path.join(full_key); if !object_path.exists() { @@ -729,4 +1229,120 @@ mod tests { let state = scan_all_buckets(tmp.path(), 100); assert_eq!(state.buckets_scanned, 0); } + + #[test] + fn poisoned_entries_are_skipped_during_corruption_scan() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let bucket = "testbucket"; + let bucket_path = root.join(bucket); + let meta_root = root + .join(SYSTEM_ROOT) + .join(SYSTEM_BUCKETS_DIR) + .join(bucket) + .join(BUCKET_META_DIR); + fs::create_dir_all(&bucket_path).unwrap(); + fs::create_dir_all(&meta_root).unwrap(); + + let bytes = b"some bytes that wont match"; + fs::write(bucket_path.join("rotted.txt"), bytes).unwrap(); + + let mut map = Map::new(); + map.insert( + "rotted.txt".to_string(), + json!({ + "metadata": { + "__etag__": "00000000000000000000000000000000", + "__corrupted__": "true", + "__corruption_detail__": "etag mismatch (already poisoned)", + } + }), + ); + fs::write( + meta_root.join(INDEX_FILE), + serde_json::to_string(&Value::Object(map)).unwrap(), + ) + .unwrap(); + + let state = scan_all_buckets(root, 10_000); + assert_eq!(state.corrupted_objects, 0, "poisoned entries must not re-flag"); + } + + #[test] + fn parse_etag_helpers() { + let detail = "stored_etag=abc123 actual_etag=def456"; + assert_eq!(parse_stored_etag(detail), "abc123"); + assert_eq!(parse_actual_etag(detail), "def456"); + } + + #[test] + fn poisoned_entry_with_missing_file_is_not_phantom() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let bucket = "testbucket"; + let bucket_path = root.join(bucket); + let meta_root = root + .join(SYSTEM_ROOT) + .join(SYSTEM_BUCKETS_DIR) + .join(bucket) + .join(BUCKET_META_DIR); + fs::create_dir_all(&bucket_path).unwrap(); + fs::create_dir_all(&meta_root).unwrap(); + + let mut map = Map::new(); + map.insert( + "quarantined.txt".to_string(), + json!({ + "metadata": { + "__etag__": "deadbeefdeadbeefdeadbeefdeadbeef", + "__corrupted__": "true", + 
"__corruption_detail__": "etag mismatch (no peer)", + "__quarantine_path__": ".myfsio.sys/quarantine/testbucket/2026/quarantined.txt", + } + }), + ); + fs::write( + meta_root.join(INDEX_FILE), + serde_json::to_string(&Value::Object(map)).unwrap(), + ) + .unwrap(); + + let state = scan_all_buckets(root, 10_000); + assert_eq!( + state.phantom_metadata, 0, + "poisoned entries with quarantined files must not be reported as phantom metadata" + ); + assert_eq!(state.corrupted_objects, 0); + } + + #[test] + fn healthy_multipart_object_is_not_flagged_corrupted() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let bucket = "testbucket"; + let bucket_path = root.join(bucket); + let meta_root = root + .join(SYSTEM_ROOT) + .join(SYSTEM_BUCKETS_DIR) + .join(bucket) + .join(BUCKET_META_DIR); + fs::create_dir_all(&bucket_path).unwrap(); + + fs::write(bucket_path.join("multi.bin"), b"healthy multipart body").unwrap(); + + write_index( + &meta_root, + &[( + "multi.bin", + "deadbeefdeadbeefdeadbeefdeadbeef-3", + )], + ); + + let state = scan_all_buckets(root, 10_000); + assert_eq!( + state.corrupted_objects, 0, + "multipart-style ETags must not be checked against whole-body MD5" + ); + assert!(state.errors.is_empty(), "unexpected errors: {:?}", state.errors); + } } diff --git a/crates/myfsio-server/src/services/mod.rs b/crates/myfsio-server/src/services/mod.rs index b8ddb47..f21b6fc 100644 --- a/crates/myfsio-server/src/services/mod.rs +++ b/crates/myfsio-server/src/services/mod.rs @@ -6,6 +6,7 @@ pub mod lifecycle; pub mod metrics; pub mod notifications; pub mod object_lock; +pub mod peer_fetch; pub mod replication; pub mod s3_client; pub mod site_registry; diff --git a/crates/myfsio-server/src/services/peer_fetch.rs b/crates/myfsio-server/src/services/peer_fetch.rs new file mode 100644 index 0000000..687360e --- /dev/null +++ b/crates/myfsio-server/src/services/peer_fetch.rs @@ -0,0 +1,256 @@ +use std::collections::HashMap; +use std::path::Path; +use 
std::pin::Pin; +use std::sync::Arc; + +use aws_sdk_s3::Client; +use md5::{Digest, Md5}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; + +use myfsio_storage::fs_backend::{is_multipart_etag, FsStorageBackend}; +use myfsio_storage::traits::StorageEngine; + +use crate::services::replication::ReplicationManager; +use crate::services::s3_client::{build_client, ClientOptions}; +use crate::stores::connections::ConnectionStore; + +pub struct PeerFetcher { + storage: Arc, + connections: Arc, + replication: Arc, + client_options: ClientOptions, +} + +#[derive(Debug)] +pub enum HealOutcome { + Healed { peer_etag: String, bytes: u64 }, + PeerMismatch { stored: String, peer: String }, + PeerUnavailable { error: String }, + NotConfigured, + VerifyFailed { expected: String, actual: String }, +} + +impl PeerFetcher { + pub fn new( + storage: Arc, + connections: Arc, + replication: Arc, + client_options: ClientOptions, + ) -> Self { + Self { + storage, + connections, + replication, + client_options, + } + } + + fn build_client_for_bucket(&self, bucket: &str) -> Option<(Client, String)> { + let rule = self.replication.get_rule(bucket)?; + if !rule.enabled { + return None; + } + let conn = self.connections.get(&rule.target_connection_id)?; + let client = build_client(&conn, &self.client_options); + Some((client, rule.target_bucket)) + } + + pub async fn fetch_into_storage( + &self, + client: &Client, + remote_bucket: &str, + local_bucket: &str, + key: &str, + ) -> bool { + let resp = match client + .get_object() + .bucket(remote_bucket) + .key(key) + .send() + .await + { + Ok(r) => r, + Err(err) => { + tracing::error!("Pull GetObject failed {}/{}: {:?}", local_bucket, key, err); + return false; + } + }; + + let head = match client + .head_object() + .bucket(remote_bucket) + .key(key) + .send() + .await + { + Ok(r) => r, + Err(err) => { + tracing::error!("Pull HeadObject failed {}/{}: {:?}", local_bucket, key, err); + return false; + } + }; + + let metadata: Option> = head 
+ .metadata() + .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect()); + + let stream = resp.body.into_async_read(); + let boxed: Pin> = Box::pin(stream); + + match self + .storage + .put_object(local_bucket, key, boxed, metadata) + .await + { + Ok(_) => { + tracing::debug!("Pulled object {}/{} from remote", local_bucket, key); + true + } + Err(err) => { + tracing::error!( + "Store pulled object failed {}/{}: {}", + local_bucket, + key, + err + ); + false + } + } + } + + pub async fn fetch_for_heal( + &self, + local_bucket: &str, + key: &str, + expected_etag: &str, + dest_path: &Path, + ) -> HealOutcome { + let (client, target_bucket) = match self.build_client_for_bucket(local_bucket) { + Some(v) => v, + None => return HealOutcome::NotConfigured, + }; + + let head = match client + .head_object() + .bucket(&target_bucket) + .key(key) + .send() + .await + { + Ok(r) => r, + Err(err) => { + return HealOutcome::PeerUnavailable { + error: format!("HeadObject: {:?}", err), + }; + } + }; + + let peer_etag = head.e_tag().unwrap_or("").trim_matches('"').to_string(); + if peer_etag.is_empty() { + return HealOutcome::PeerUnavailable { + error: "remote returned empty ETag".into(), + }; + } + if peer_etag != expected_etag { + return HealOutcome::PeerMismatch { + stored: expected_etag.to_string(), + peer: peer_etag, + }; + } + + let resp = match client + .get_object() + .bucket(&target_bucket) + .key(key) + .send() + .await + { + Ok(r) => r, + Err(err) => { + return HealOutcome::PeerUnavailable { + error: format!("GetObject: {:?}", err), + }; + } + }; + + if let Some(parent) = dest_path.parent() { + if let Err(e) = tokio::fs::create_dir_all(parent).await { + return HealOutcome::PeerUnavailable { + error: format!("mkdir parent: {}", e), + }; + } + } + + let mut file = match tokio::fs::File::create(dest_path).await { + Ok(f) => f, + Err(e) => { + return HealOutcome::PeerUnavailable { + error: format!("create temp: {}", e), + }; + } + }; + let mut reader = 
resp.body.into_async_read(); + let mut hasher = Md5::new(); + let mut buf = vec![0u8; 64 * 1024]; + let mut total: u64 = 0; + loop { + let n = match reader.read(&mut buf).await { + Ok(n) => n, + Err(e) => { + drop(file); + let _ = tokio::fs::remove_file(dest_path).await; + return HealOutcome::PeerUnavailable { + error: format!("read body: {}", e), + }; + } + }; + if n == 0 { + break; + } + hasher.update(&buf[..n]); + if let Err(e) = file.write_all(&buf[..n]).await { + drop(file); + let _ = tokio::fs::remove_file(dest_path).await; + return HealOutcome::PeerUnavailable { + error: format!("write temp: {}", e), + }; + } + total += n as u64; + } + if let Err(e) = file.flush().await { + return HealOutcome::PeerUnavailable { + error: format!("flush temp: {}", e), + }; + } + drop(file); + + let actual = format!("{:x}", hasher.finalize()); + if !is_multipart_etag(expected_etag) && actual != expected_etag { + let _ = tokio::fs::remove_file(dest_path).await; + return HealOutcome::VerifyFailed { + expected: expected_etag.to_string(), + actual, + }; + } + + HealOutcome::Healed { + peer_etag, + bytes: total, + } + } +} + +#[cfg(test)] +mod tests { + use myfsio_storage::fs_backend::is_multipart_etag; + + #[test] + fn detects_multipart_etags() { + assert!(is_multipart_etag("d41d8cd98f00b204e9800998ecf8427e-3")); + assert!(is_multipart_etag("00000000000000000000000000000000-1")); + assert!(!is_multipart_etag("d41d8cd98f00b204e9800998ecf8427e")); + assert!(!is_multipart_etag("d41d8cd98f00b204e9800998ecf8427e-")); + assert!(!is_multipart_etag("not-hex-at-all-1")); + assert!(!is_multipart_etag("d41d8cd98f00b204e9800998ecf8427e-abc")); + } +} diff --git a/crates/myfsio-server/src/services/replication.rs b/crates/myfsio-server/src/services/replication.rs index 922174f..7efa047 100644 --- a/crates/myfsio-server/src/services/replication.rs +++ b/crates/myfsio-server/src/services/replication.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; use 
myfsio_common::types::ListParams; -use myfsio_storage::fs_backend::FsStorageBackend; +use myfsio_storage::fs_backend::{metadata_is_corrupted, FsStorageBackend}; use myfsio_storage::traits::StorageEngine; use crate::services::s3_client::{build_client, check_endpoint_health, ClientOptions}; @@ -483,6 +483,17 @@ impl ReplicationManager { return; } + if let Ok(src_meta) = self.storage.get_object_metadata(bucket, object_key).await { + if metadata_is_corrupted(&src_meta) { + tracing::warn!( + "Replication skipped for {}/{}: source object is poisoned (corrupted)", + bucket, + object_key + ); + return; + } + } + let src_path = match self.storage.get_object_path(bucket, object_key).await { Ok(p) => p, Err(_) => { diff --git a/crates/myfsio-server/src/services/site_sync.rs b/crates/myfsio-server/src/services/site_sync.rs index a1e6a67..c70595a 100644 --- a/crates/myfsio-server/src/services/site_sync.rs +++ b/crates/myfsio-server/src/services/site_sync.rs @@ -1,19 +1,18 @@ use std::collections::HashMap; use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use aws_sdk_s3::Client; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncRead; use tokio::sync::Notify; use myfsio_common::types::{ListParams, ObjectMeta}; use myfsio_storage::fs_backend::FsStorageBackend; use myfsio_storage::traits::StorageEngine; +use crate::services::peer_fetch::PeerFetcher; use crate::services::replication::{ReplicationManager, ReplicationRule, MODE_BIDIRECTIONAL}; use crate::services::s3_client::{build_client, ClientOptions}; use crate::stores::connections::ConnectionStore; @@ -53,6 +52,7 @@ pub struct SiteSyncWorker { storage: Arc, connections: Arc, replication: Arc, + peer_fetcher: Arc, storage_root: PathBuf, interval: Duration, batch_size: usize, @@ -75,24 +75,40 @@ impl SiteSyncWorker { max_retries: u32, clock_skew_tolerance: f64, ) -> Self { - Self { - storage, - connections, - replication, - 
storage_root, - interval: Duration::from_secs(interval_seconds), - batch_size, - clock_skew_tolerance, - client_options: ClientOptions { + let client_options = ClientOptions { + connect_timeout, + read_timeout, + max_attempts: max_retries, + }; + let peer_fetcher = Arc::new(PeerFetcher::new( + storage.clone(), + connections.clone(), + replication.clone(), + ClientOptions { connect_timeout, read_timeout, max_attempts: max_retries, }, + )); + Self { + storage, + connections, + replication, + peer_fetcher, + storage_root, + interval: Duration::from_secs(interval_seconds), + batch_size, + clock_skew_tolerance, + client_options, bucket_stats: Mutex::new(HashMap::new()), shutdown: Arc::new(Notify::new()), } } + pub fn peer_fetcher(&self) -> Arc { + self.peer_fetcher.clone() + } + pub fn shutdown(&self) { self.shutdown.notify_waiters(); } @@ -383,60 +399,9 @@ impl SiteSyncWorker { local_bucket: &str, key: &str, ) -> bool { - let resp = match client - .get_object() - .bucket(remote_bucket) - .key(key) - .send() + self.peer_fetcher + .fetch_into_storage(client, remote_bucket, local_bucket, key) .await - { - Ok(r) => r, - Err(err) => { - tracing::error!("Pull GetObject failed {}/{}: {:?}", local_bucket, key, err); - return false; - } - }; - - let head = match client - .head_object() - .bucket(remote_bucket) - .key(key) - .send() - .await - { - Ok(r) => r, - Err(err) => { - tracing::error!("Pull HeadObject failed {}/{}: {:?}", local_bucket, key, err); - return false; - } - }; - - let metadata: Option> = head - .metadata() - .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect()); - - let stream = resp.body.into_async_read(); - let boxed: Pin> = Box::pin(stream); - - match self - .storage - .put_object(local_bucket, key, boxed, metadata) - .await - { - Ok(_) => { - tracing::debug!("Pulled object {}/{} from remote", local_bucket, key); - true - } - Err(err) => { - tracing::error!( - "Store pulled object failed {}/{}: {}", - local_bucket, - key, - err - ); - false - } 
- } } async fn apply_remote_deletion(&self, bucket: &str, key: &str) -> bool { diff --git a/crates/myfsio-server/src/state.rs b/crates/myfsio-server/src/state.rs index 4706be8..37aa618 100644 --- a/crates/myfsio-server/src/state.rs +++ b/crates/myfsio-server/src/state.rs @@ -6,7 +6,9 @@ use crate::services::access_logging::AccessLoggingService; use crate::services::gc::GcService; use crate::services::integrity::IntegrityService; use crate::services::metrics::MetricsService; +use crate::services::peer_fetch::PeerFetcher; use crate::services::replication::ReplicationManager; +use crate::services::s3_client::ClientOptions; use crate::services::site_registry::SiteRegistry; use crate::services::site_sync::SiteSyncWorker; use crate::services::system_metrics::SystemMetricsService; @@ -66,6 +68,7 @@ impl AppState { temp_file_max_age_hours: config.gc_temp_file_max_age_hours, multipart_max_age_days: config.gc_multipart_max_age_days, lock_file_max_age_hours: config.gc_lock_file_max_age_hours, + quarantine_max_age_days: config.integrity_quarantine_retention_days, dry_run: config.gc_dry_run, }, ))) @@ -73,16 +76,6 @@ impl AppState { None }; - let integrity = if config.integrity_enabled { - Some(Arc::new(IntegrityService::new( - storage.clone(), - &config.storage_root, - crate::services::integrity::IntegrityConfig::default(), - ))) - } else { - None - }; - let metrics = if config.metrics_enabled { Some(Arc::new(MetricsService::new( &config.storage_root, @@ -161,6 +154,39 @@ impl AppState { None }; + let integrity_peer_fetcher: Option> = if let Some(ref ss) = site_sync { + Some(ss.peer_fetcher()) + } else { + Some(Arc::new(PeerFetcher::new( + storage.clone(), + connections.clone(), + replication.clone(), + ClientOptions { + connect_timeout: Duration::from_secs(config.site_sync_connect_timeout_secs), + read_timeout: Duration::from_secs(config.site_sync_read_timeout_secs), + max_attempts: config.site_sync_max_retries, + }, + ))) + }; + + let integrity = if config.integrity_enabled 
{ + Some(Arc::new(IntegrityService::new( + storage.clone(), + &config.storage_root, + crate::services::integrity::IntegrityConfig { + interval_hours: config.integrity_interval_hours, + batch_size: config.integrity_batch_size, + auto_heal: config.integrity_auto_heal, + dry_run: config.integrity_dry_run, + heal_concurrency: config.integrity_heal_concurrency, + quarantine_retention_days: config.integrity_quarantine_retention_days, + }, + integrity_peer_fetcher, + ))) + } else { + None + }; + let templates = init_templates(&config.templates_dir); let access_logging = Arc::new(AccessLoggingService::new(&config.storage_root)); let session_ttl = Duration::from_secs(config.session_lifetime_days.saturating_mul(86_400)); diff --git a/crates/myfsio-server/templates/docs.html b/crates/myfsio-server/templates/docs.html index 300d1cb..d63fe18 100644 --- a/crates/myfsio-server/templates/docs.html +++ b/crates/myfsio-server/templates/docs.html @@ -118,29 +118,69 @@ cargo build --release -p myfsio-server Directory for buckets and objects. - MAX_UPLOAD_SIZE - 1 GB - Max request body size in bytes. + IAM_CONFIG + <STORAGE_ROOT>/.myfsio.sys/config/iam.json + IAM users / access keys file path. SECRET_KEY - (Auto-generated) + (loaded from .myfsio.sys/config/.secret when present) Session signing and IAM-at-rest encryption key. Set explicitly in production. HOST 127.0.0.1 - Bind interface. + Bind interface for both API and UI listeners. PORT 5000 - Listen port (UI uses 5100). + S3 API listen port. - DISPLAY_TIMEZONE - UTC - Timezone for UI timestamps (e.g., US/Eastern, Asia/Tokyo). + UI_PORT + 5100 + Web UI listen port. + + + UI_ENABLED + true + Set to false to run API-only (no UI listener). + + + AWS_REGION + us-east-1 + Region used in SigV4 scope. + + + LOG_LEVEL + INFO + Log verbosity (also honored via RUST_LOG). + + + SESSION_LIFETIME_DAYS + 1 + UI session lifetime in days. + + + REQUEST_BODY_TIMEOUT_SECONDS + 60 + Per-request body read timeout for the S3 API. 
+ + + MULTIPART_MIN_PART_SIZE + 5242880 + Minimum part size enforced for multipart uploads (5 MiB). + + + BULK_DELETE_MAX_KEYS + 1000 + Maximum keys accepted by the UI bulk-delete endpoint. + + + STREAM_CHUNK_SIZE + 1048576 + Default streaming chunk size for routes that opt into configured chunking (1 MiB). CORS Settings @@ -166,22 +206,32 @@ cargo build --release -p myfsio-server Response headers visible to browsers (e.g., ETag). - Security Settings - - - AUTH_MAX_ATTEMPTS - 5 - Failed login attempts before lockout. - - - AUTH_LOCKOUT_MINUTES - 15 - Lockout duration after max failed attempts. + Rate Limiting RATE_LIMIT_DEFAULT - 200 per minute - Default rate limit for S3 and KMS API endpoints. + 5000 per minute + Default rate limit for S3 and KMS API endpoints. Accepts N per <second/minute/hour/day> or N/<seconds>. + + + RATE_LIMIT_LIST_BUCKETS + inherits RATE_LIMIT_DEFAULT + Rate limit for GET / (ListBuckets). + + + RATE_LIMIT_BUCKET_OPS + inherits RATE_LIMIT_DEFAULT + Rate limit for bucket-scoped operations (/bucket). + + + RATE_LIMIT_OBJECT_OPS + inherits RATE_LIMIT_DEFAULT + Rate limit for object-scoped operations (/bucket/key). + + + RATE_LIMIT_HEAD_OPS + inherits RATE_LIMIT_DEFAULT + Rate limit applied when the request method is HEAD. RATE_LIMIT_ADMIN @@ -204,30 +254,7 @@ cargo build --release -p myfsio-server Custom secret key for the admin user on first run or credential reset. Random if unset. - Server Settings - - - SERVER_THREADS - 0 (auto) - Granian blocking threads (1-64). 0 = auto (CPU cores × 2). - - - SERVER_CONNECTION_LIMIT - 0 (auto) - Max concurrent connections (10-1000). 0 = auto (RAM-based). - - - SERVER_BACKLOG - 0 (auto) - TCP listen backlog (64-4096). 0 = auto (conn_limit × 2). - - - SERVER_CHANNEL_TIMEOUT - 120 - Idle connection timeout in seconds (10-300). - - - Encryption Settings + Feature Toggles ENCRYPTION_ENABLED @@ -237,20 +264,32 @@ cargo build --release -p myfsio-server KMS_ENABLED false - Enable KMS key management for encryption. 
- - - Logging Settings + Enable built-in KMS key management. - LOG_LEVEL - INFO - Log verbosity: DEBUG, INFO, WARNING, ERROR. + GC_ENABLED + false + Start the garbage collector worker. - LOG_TO_FILE - true - Enable file logging. + INTEGRITY_ENABLED + false + Start the integrity scanner worker. + + + LIFECYCLE_ENABLED + false + Start the lifecycle worker. + + + WEBSITE_HOSTING_ENABLED + false + Enable static website hosting and domain mappings. + + + SITE_SYNC_ENABLED + false + Start the bi-directional site sync worker. Metrics History Settings @@ -400,16 +439,6 @@ cargo build --release -p myfsio-server 50 Max lifecycle history records per bucket. - - OBJECT_CACHE_TTL - 60 - Seconds to cache object metadata. - - - BULK_DOWNLOAD_MAX_BYTES - 1 GB - Max total size for bulk ZIP downloads. - ENCRYPTION_CHUNK_SIZE_BYTES 65536 @@ -429,7 +458,7 @@ cargo build --release -p myfsio-server
- Production Checklist: Set SECRET_KEY (also enables IAM config encryption at rest), restrict CORS_ORIGINS, configure API_BASE_URL, enable HTTPS via reverse proxy, use --prod flag, and set credential expiry on non-admin users. + Production Checklist: Set SECRET_KEY (also enables IAM config encryption at rest), restrict CORS_ORIGINS, configure API_BASE_URL, enable HTTPS via a reverse proxy, run myfsio-server --check-config before starting, and set credential expiry on non-admin users.
@@ -527,7 +556,7 @@ sudo journalctl -u myfsio -f # View logs

Uploads

    -
  • Drag and drop folders or files into the upload modal. Objects above 16 MB switch to multipart automatically.
  • +
  • Drag and drop folders or files into the upload modal. Objects above 8 MB switch to multipart automatically.
  • Progress rows highlight retries, throughput, and completion even if you close the modal.
@@ -536,7 +565,7 @@ sudo journalctl -u myfsio -f # View logs
  • Navigate folder hierarchies using breadcrumbs. Objects with / in keys display as folders.
  • Infinite scroll loads more objects automatically. Choose batch size (50–250) from the footer dropdown.
  • -
  • Bulk select objects for multi-delete or multi-download (ZIP archive, up to 1 GiB). Filter by name using the search box.
  • +
  • Bulk select objects for multi-delete or multi-download (ZIP archive, up to 256 MB total). Filter by name using the search box.
  • If loading fails, click Retry to attempt again—no page refresh needed.
@@ -853,46 +882,15 @@ s3.complete_multipart_upload( -
-
- +
+
+ -
+
Headless Target Setup -

If your target server has no UI, create a setup_target.py script to bootstrap credentials:

-
# setup_target.py
-from pathlib import Path
-from app.iam import IamService
-from app.storage import ObjectStorage
-
-# Initialize services (paths match default config)
-data_dir = Path("data")
-iam = IamService(data_dir / ".myfsio.sys" / "config" / "iam.json")
-storage = ObjectStorage(data_dir)
-
-# 1. Create the bucket
-bucket_name = "backup-bucket"
-try:
-    storage.create_bucket(bucket_name)
-    print(f"Bucket '{bucket_name}' created.")
-except Exception as e:
-    print(f"Bucket creation skipped: {e}")
-
-# 2. Create the user
-try:
-    creds = iam.create_user(
-        display_name="Replication User",
-        policies=[{"bucket": bucket_name, "actions": ["write", "read", "list"]}]
-    )
-    print("\n--- CREDENTIALS GENERATED ---")
-    print(f"Access Key: {creds['access_key']}")
-    print(f"Secret Key: {creds['secret_key']}")
-    print("-----------------------------")
-except Exception as e:
-    print(f"User creation failed: {e}")
-

Save and run: python setup_target.py

+

If your target server has no UI, start it with ADMIN_ACCESS_KEY and ADMIN_SECRET_KEY set so the first-run bootstrap installs deterministic credentials, then create the destination bucket via the AWS CLI: aws --endpoint-url <target> s3api create-bucket --bucket backup-bucket. Use the admin API at /admin/iam/users (or --reset-cred) to manage credentials remotely.

@@ -1052,15 +1050,15 @@ SITE_SYNC_BATCH_SIZE=100 # Max objects per sync cycle Retry Logic - boto3 automatically handles 429 (rate limit) errors using exponential backoff with max_attempts=2 + The replication worker retries failed transfers up to REPLICATION_MAX_RETRIES times (default 2). - Concurrency - Uses a ThreadPoolExecutor with 4 parallel workers for replication tasks + Failure Budget + After REPLICATION_MAX_FAILURES_PER_BUCKET recorded failures (default 50), further records for that bucket are dropped until a retry succeeds. Timeouts - Connect: 5s, Read: 30s. Large files use streaming transfers + Connect: REPLICATION_CONNECT_TIMEOUT_SECONDS (default 5s), Read: REPLICATION_READ_TIMEOUT_SECONDS (default 30s). Objects larger than REPLICATION_STREAMING_THRESHOLD_BYTES (default 10 MB) use streaming transfers. @@ -1470,17 +1468,23 @@ curl -X POST {{ api_base }}/kms/keys \ curl {{ api_base }}/kms/keys \ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" -# Rotate a key (creates new key material) -curl -X POST {{ api_base }}/kms/keys/{key-id}/rotate \ - -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" - -# Disable/Enable a key +# Disable a key curl -X POST {{ api_base }}/kms/keys/{key-id}/disable \ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" -# Schedule key deletion (30-day waiting period) -curl -X DELETE "{{ api_base }}/kms/keys/{key-id}?waiting_period_days=30" \ - -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" +# Re-enable a key +curl -X POST {{ api_base }}/kms/keys/{key-id}/enable \ + -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" + +# Delete a key +curl -X DELETE "{{ api_base }}/kms/keys/{key-id}" \ + -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" + +# Generate a data key (sized between KMS_GENERATE_DATA_KEY_MIN_BYTES and _MAX_BYTES) +curl -X POST {{ api_base }}/kms/generate-data-key \ + -H "Content-Type: application/json" \ + -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \ + -d '{"KeyId": "<key-id>", "NumberOfBytes": 32}'

How It Works

@@ -1744,14 +1748,11 @@ curl "{{ api_base }}/admin/gc/history?limit=10" \ - INTEGRITY_ENABLEDfalseEnable background integrity scanning - INTEGRITY_INTERVAL_HOURS24Hours between scan cycles - INTEGRITY_BATCH_SIZE1000Max objects to scan per cycle - INTEGRITY_AUTO_HEALfalseAutomatically repair detected issues - INTEGRITY_DRY_RUNfalseLog issues without healing + INTEGRITY_ENABLEDfalseEnable the background integrity scanner

+

Scanner settings are configurable via environment variables: INTEGRITY_INTERVAL_HOURS (default 24), INTEGRITY_BATCH_SIZE (default 10 000), INTEGRITY_AUTO_HEAL (default false), INTEGRITY_DRY_RUN (default false), INTEGRITY_HEAL_CONCURRENCY (default 4), and INTEGRITY_QUARANTINE_RETENTION_DAYS (default 7). The admin API below can additionally trigger a one-off scan with auto_heal or dry_run overrides.

What Gets Checked

@@ -1819,7 +1820,7 @@ curl "{{ api_base }}/admin/integrity/history?limit=10" \
- Dry Run: Use INTEGRITY_DRY_RUN=true or pass {"dry_run": true} to the API to preview detected issues without making any changes. Combine with {"auto_heal": true} to see what would be repaired. + Dry Run: Set INTEGRITY_DRY_RUN=true or pass {"dry_run": true} to /admin/integrity/run to preview detected issues without making any changes. Combine with {"auto_heal": true} to see what would be repaired.
@@ -2048,9 +2049,9 @@ curl "{{ api_base | replace(from="/api", to="/ui") }}/metrics/operations/history Update IAM inline policies or remove conflicting deny statements. - Large uploads fail instantly - MAX_UPLOAD_SIZE exceeded - Raise the env var or split the object. + Large uploads time out mid-stream + REQUEST_BODY_TIMEOUT_SECONDS exceeded + Raise the timeout, use multipart uploads, or upload from a faster network. Requests hit the wrong host @@ -2059,8 +2060,8 @@ curl "{{ api_base | replace(from="/api", to="/ui") }}/metrics/operations/history Large folder uploads hitting rate limits (429) - RATE_LIMIT_DEFAULT exceeded (200/min) - Increase RATE_LIMIT_DEFAULT in env config or upload in smaller batches. Distributed rate-limit storage is not supported yet. + RATE_LIMIT_DEFAULT exceeded (5000/min by default) + Increase RATE_LIMIT_DEFAULT (or the per-route override), or upload in smaller batches. Distributed rate-limit storage is not supported yet. @@ -2079,7 +2080,7 @@ curl "{{ api_base | replace(from="/api", to="/ui") }}/metrics/operations/history curl {{ api_base }}/myfsio/health # Response -{"status": "ok", "version": "0.1.7"} +{"status": "ok", "version": "0.5.0"}

Use this endpoint for:

    @@ -2248,7 +2249,7 @@ curl -X PUT "{{ api_base }}/<bucket>?notification" \

    Query CSV, JSON, or Parquet files directly using SQL without downloading the entire object.

    - Prerequisite: Requires DuckDB to be installed (pip install duckdb) + Note: DuckDB is bundled into the Rust server binary — no separate install is required.
    # Query a CSV file
    @@ -2816,20 +2817,36 @@ GET|PUT /admin/site                     # Local site config
     GET     /admin/sites                    # List peers
     POST    /admin/sites                    # Register peer
     GET|PUT|DELETE /admin/sites/<id>        # Manage peer
    -GET     /admin/sites/<id>/health        # Peer health
    +GET|POST /admin/sites/<id>/health       # Peer health
    +GET     /admin/sites/<id>/bidirectional-status  # Bidi sync state
     GET     /admin/topology                 # Cluster topology
    -GET|POST|PUT|DELETE /admin/website-domains  # Domain mappings
    +GET|POST /admin/website-domains              # List / Create domain mapping
    +GET|PUT|DELETE /admin/website-domains/<domain>  # Manage domain mapping
    +GET     /admin/iam/users                # List IAM users
    +GET     /admin/iam/users/<id>           # Get user
    +GET     /admin/iam/users/<id>/policies  # Get user policies
    +POST    /admin/iam/users/<id>/access-keys     # Create access key
    +DELETE  /admin/iam/users/<id>/access-keys/<ak> # Delete access key
    +POST    /admin/iam/users/<id>/disable    # Disable user
    +POST    /admin/iam/users/<id>/enable     # Enable user
    +GET|POST /admin/gc/status, /admin/gc/run, /admin/gc/history
    +GET|POST /admin/integrity/status, /admin/integrity/run, /admin/integrity/history
     
    -# KMS API
    -GET|POST /kms/keys                      # List / Create keys
    -GET|DELETE /kms/keys/<id>               # Get / Delete key
    -POST   /kms/keys/<id>/enable            # Enable key
    -POST   /kms/keys/<id>/disable           # Disable key
    -POST   /kms/keys/<id>/rotate            # Rotate key
    -POST   /kms/encrypt                     # Encrypt data
    -POST   /kms/decrypt                     # Decrypt data
    -POST   /kms/generate-data-key           # Generate data key
    -POST   /kms/generate-random             # Generate random bytes
    +# KMS API (only mounted when KMS_ENABLED=true) +GET|POST /kms/keys # List / Create keys +GET|DELETE /kms/keys/<id> # Get / Delete key +POST /kms/keys/<id>/enable # Enable key +POST /kms/keys/<id>/disable # Disable key +POST /kms/encrypt # Encrypt data +POST /kms/decrypt # Decrypt data +POST /kms/re-encrypt # Re-encrypt under a different key +POST /kms/generate-data-key # Generate data key +POST /kms/generate-data-key-without-plaintext # Generate wrapped DEK only +POST /kms/generate-random # Generate random bytes +POST /kms/client/generate-key # Client-side key helper +POST /kms/client/encrypt # Client-side encrypt helper +POST /kms/client/decrypt # Client-side decrypt helper +POST /kms/materials/<id> # Fetch wrapped key materials
diff --git a/crates/myfsio-storage/src/error.rs b/crates/myfsio-storage/src/error.rs index d1789fc..bc2c211 100644 --- a/crates/myfsio-storage/src/error.rs +++ b/crates/myfsio-storage/src/error.rs @@ -23,6 +23,12 @@ pub enum StorageError { key: String, version_id: String, }, + #[error("Object corrupted: {bucket}/{key} ({detail})")] + ObjectCorrupted { + bucket: String, + key: String, + detail: String, + }, #[error("Invalid bucket name: {0}")] InvalidBucketName(String), #[error("Invalid object key: {0}")] @@ -72,6 +78,15 @@ impl From for S3Error { version_id, } => S3Error::from_code(S3ErrorCode::MethodNotAllowed) .with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)), + StorageError::ObjectCorrupted { + bucket, + key, + detail, + } => S3Error::new( + S3ErrorCode::ObjectCorrupted, + format!("Object corrupted: {}", detail), + ) + .with_resource(format!("/{}/{}", bucket, key)), StorageError::InvalidBucketName(msg) => { S3Error::new(S3ErrorCode::InvalidBucketName, msg) } diff --git a/crates/myfsio-storage/src/fs_backend.rs b/crates/myfsio-storage/src/fs_backend.rs index ae9bb47..6186b82 100644 --- a/crates/myfsio-storage/src/fs_backend.rs +++ b/crates/myfsio-storage/src/fs_backend.rs @@ -17,6 +17,37 @@ use uuid::Uuid; const EMPTY_SEGMENT_SENTINEL: &str = ".__myfsio_empty__"; +pub const META_KEY_CORRUPTED: &str = "__corrupted__"; +pub const META_KEY_CORRUPTED_AT: &str = "__corrupted_at__"; +pub const META_KEY_CORRUPTION_DETAIL: &str = "__corruption_detail__"; +pub const META_KEY_QUARANTINE_PATH: &str = "__quarantine_path__"; + +pub fn metadata_is_corrupted(meta: &HashMap) -> bool { + meta.get(META_KEY_CORRUPTED) + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false) +} + +pub fn metadata_corruption_detail(meta: &HashMap) -> String { + meta.get(META_KEY_CORRUPTION_DETAIL) + .cloned() + .unwrap_or_else(|| "data integrity check failed".to_string()) +} + +pub fn is_multipart_etag(etag: &str) -> bool { + let Some(dash_idx) = etag.rfind('-') else { + 
return false; + }; + if dash_idx != 32 { + return false; + } + let (head, tail) = etag.split_at(dash_idx); + let tail = &tail[1..]; + !tail.is_empty() + && tail.chars().all(|c| c.is_ascii_digit()) + && head.chars().all(|c| c.is_ascii_hexdigit()) +} + fn fs_encode_key(key: &str) -> String { if key.is_empty() { return String::new(); @@ -741,6 +772,18 @@ impl FsStorageBackend { Ok(()) } + pub async fn delete_object_metadata_entry( + &self, + bucket: &str, + key: &str, + ) -> StorageResult<()> { + run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).write(); + self.delete_metadata_sync(bucket, key) + .map_err(StorageError::Io) + }) + } + fn compute_etag_sync(path: &Path) -> std::io::Result { myfsio_crypto::hashing::md5_file(path) } @@ -2139,6 +2182,14 @@ impl crate::traits::StorageEngine for FsStorageBackend { self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { + let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } if self.read_bucket_config_sync(bucket).versioning_status().is_active() { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { @@ -2171,6 +2222,13 @@ impl crate::traits::StorageEngine for FsStorageBackend { .unwrap_or_else(Utc::now); let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); obj.etag = stored_meta.get("__etag__").cloned(); obj.content_type = stored_meta.get("__content_type__").cloned(); @@ -2204,6 +2262,14 @@ impl crate::traits::StorageEngine for 
FsStorageBackend { self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { + let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } if self.read_bucket_config_sync(bucket).versioning_status().is_active() { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { @@ -2241,6 +2307,13 @@ impl crate::traits::StorageEngine for FsStorageBackend { .unwrap_or_else(Utc::now); let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); obj.etag = stored_meta.get("__etag__").cloned(); obj.content_type = stored_meta.get("__content_type__").cloned(); @@ -2279,6 +2352,14 @@ impl crate::traits::StorageEngine for FsStorageBackend { self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { + let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } if self.read_bucket_config_sync(bucket).versioning_status().is_active() { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { @@ -2308,6 +2389,13 @@ impl crate::traits::StorageEngine for FsStorageBackend { .unwrap_or_else(Utc::now); let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + 
bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } let mut obj = ObjectMeta::new(key.to_string(), meta_fs.len(), lm); obj.etag = stored_meta.get("__etag__").cloned(); obj.content_type = stored_meta.get("__content_type__").cloned(); @@ -2364,6 +2452,14 @@ impl crate::traits::StorageEngine for FsStorageBackend { self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { + let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } if self.read_bucket_config_sync(bucket).versioning_status().is_active() { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { @@ -2402,6 +2498,13 @@ impl crate::traits::StorageEngine for FsStorageBackend { .unwrap_or_else(Utc::now); let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } let mut obj = ObjectMeta::new(key.to_string(), meta_fs.len(), lm); obj.etag = stored_meta.get("__etag__").cloned(); obj.content_type = stored_meta.get("__content_type__").cloned(); @@ -2453,6 +2556,14 @@ impl crate::traits::StorageEngine for FsStorageBackend { self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { + let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } if self.read_bucket_config_sync(bucket).versioning_enabled { if let Some((dm_version_id, _)) 
= self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { @@ -2476,6 +2587,14 @@ impl crate::traits::StorageEngine for FsStorageBackend { self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { + let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } if self.read_bucket_config_sync(bucket).versioning_status().is_active() { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { @@ -2504,6 +2623,13 @@ impl crate::traits::StorageEngine for FsStorageBackend { .unwrap_or_else(Utc::now); let stored_meta = self.read_metadata_sync(bucket, key); + if metadata_is_corrupted(&stored_meta) { + return Err(StorageError::ObjectCorrupted { + bucket: bucket.to_string(), + key: key.to_string(), + detail: metadata_corruption_detail(&stored_meta), + }); + } let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); obj.etag = stored_meta.get("__etag__").cloned(); obj.content_type = stored_meta.get("__content_type__").cloned(); @@ -2834,6 +2960,13 @@ impl crate::traits::StorageEngine for FsStorageBackend { writer.flush().map_err(StorageError::Io)?; let src_metadata = self.read_metadata_sync(src_bucket, src_key); + if metadata_is_corrupted(&src_metadata) { + return Err(StorageError::ObjectCorrupted { + bucket: src_bucket.to_string(), + key: src_key.to_string(), + detail: metadata_corruption_detail(&src_metadata), + }); + } Ok((format!("{:x}", hasher.finalize()), total, src_metadata)) }); @@ -3750,6 +3883,163 @@ mod tests { assert!(stored.contains_key("__etag__")); } + #[tokio::test] + async fn test_poisoned_object_returns_object_corrupted_on_read() { + let (_dir, backend) = create_test_backend(); + backend.create_bucket("test-bucket").await.unwrap(); + + 
let data: AsyncReadStream = Box::pin(std::io::Cursor::new(b"poisoned bytes".to_vec())); + backend + .put_object("test-bucket", "rotted.txt", data, None) + .await + .unwrap(); + + let mut meta = backend + .get_object_metadata("test-bucket", "rotted.txt") + .await + .unwrap(); + meta.insert(META_KEY_CORRUPTED.to_string(), "true".to_string()); + meta.insert( + META_KEY_CORRUPTION_DETAIL.to_string(), + "etag mismatch: stored=abc actual=def".to_string(), + ); + backend + .put_object_metadata("test-bucket", "rotted.txt", &meta) + .await + .unwrap(); + + let res = backend.get_object("test-bucket", "rotted.txt").await; + match res { + Err(StorageError::ObjectCorrupted { .. }) => {} + Err(other) => panic!("expected ObjectCorrupted, got {:?}", other), + Ok(_) => panic!("expected ObjectCorrupted, got Ok"), + } + + let res = backend.head_object("test-bucket", "rotted.txt").await; + match res { + Err(StorageError::ObjectCorrupted { .. }) => {} + Err(other) => panic!("expected ObjectCorrupted, got {:?}", other), + Ok(_) => panic!("expected ObjectCorrupted, got Ok"), + } + } + + #[tokio::test] + async fn test_poisoned_object_with_missing_file_still_returns_corrupted() { + let (_dir, backend) = create_test_backend(); + backend.create_bucket("test-bucket").await.unwrap(); + + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(b"will be quarantined".to_vec())); + backend + .put_object("test-bucket", "rotted.txt", data, None) + .await + .unwrap(); + + let mut meta = backend + .get_object_metadata("test-bucket", "rotted.txt") + .await + .unwrap(); + meta.insert(META_KEY_CORRUPTED.to_string(), "true".to_string()); + meta.insert( + META_KEY_CORRUPTION_DETAIL.to_string(), + "etag mismatch (no peer)".to_string(), + ); + backend + .put_object_metadata("test-bucket", "rotted.txt", &meta) + .await + .unwrap(); + + let live_path = backend + .get_object_path("test-bucket", "rotted.txt") + .await + .expect("path lookup should succeed before quarantine"); + 
std::fs::remove_file(&live_path).expect("simulate quarantine: remove live file"); + + let res = backend.get_object("test-bucket", "rotted.txt").await; + match res { + Err(StorageError::ObjectCorrupted { .. }) => {} + Err(other) => panic!("expected ObjectCorrupted after quarantine, got {:?}", other), + Ok(_) => panic!("expected ObjectCorrupted, got Ok"), + } + + let res = backend.head_object("test-bucket", "rotted.txt").await; + match res { + Err(StorageError::ObjectCorrupted { .. }) => {} + Err(other) => panic!("expected ObjectCorrupted after quarantine, got {:?}", other), + Ok(_) => panic!("expected ObjectCorrupted, got Ok"), + } + + let res = backend.get_object_path("test-bucket", "rotted.txt").await; + match res { + Err(StorageError::ObjectCorrupted { .. }) => {} + Err(other) => panic!("expected ObjectCorrupted, got {:?}", other), + Ok(_) => panic!("expected ObjectCorrupted, got Ok"), + } + } + + #[tokio::test] + async fn test_delete_object_metadata_entry_removes_index_entry() { + let (_dir, backend) = create_test_backend(); + backend.create_bucket("test-bucket").await.unwrap(); + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(b"x".to_vec())); + backend + .put_object("test-bucket", "ghost.txt", data, None) + .await + .unwrap(); + let path = backend + .get_object_path("test-bucket", "ghost.txt") + .await + .unwrap(); + std::fs::remove_file(&path).unwrap(); + + backend + .delete_object_metadata_entry("test-bucket", "ghost.txt") + .await + .unwrap(); + + let stored = backend + .get_object_metadata("test-bucket", "ghost.txt") + .await + .unwrap(); + assert!( + stored.is_empty(), + "metadata entry must be gone, got: {:?}", + stored + ); + } + + #[tokio::test] + async fn test_put_clears_poison_flag() { + let (_dir, backend) = create_test_backend(); + backend.create_bucket("test-bucket").await.unwrap(); + + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(b"first".to_vec())); + backend + .put_object("test-bucket", "file.txt", data, None) + .await + 
.unwrap(); + + let mut meta = backend + .get_object_metadata("test-bucket", "file.txt") + .await + .unwrap(); + meta.insert(META_KEY_CORRUPTED.to_string(), "true".to_string()); + backend + .put_object_metadata("test-bucket", "file.txt", &meta) + .await + .unwrap(); + + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(b"replacement".to_vec())); + backend + .put_object("test-bucket", "file.txt", data, None) + .await + .unwrap(); + + match backend.get_object("test-bucket", "file.txt").await { + Ok(_) => {} + Err(e) => panic!("get must succeed after PUT clears poison, got {:?}", e), + } + } + #[tokio::test] async fn test_list_objects() { let (_dir, backend) = create_test_backend(); diff --git a/docs.md b/docs.md index 5a7dd52..1bac34f 100644 --- a/docs.md +++ b/docs.md @@ -122,6 +122,40 @@ These values are taken from `crates/myfsio-server/src/config.rs`. | `SECRET_KEY` | unset, then fallback to `.myfsio.sys/config/.secret` if present | Session signing and IAM config encryption key | | `ADMIN_ACCESS_KEY` | unset | Optional deterministic first-run/reset access key | | `ADMIN_SECRET_KEY` | unset | Optional deterministic first-run/reset secret key | +| `SESSION_LIFETIME_DAYS` | `1` | UI session lifetime in days | +| `LOG_LEVEL` | `INFO` | Log verbosity (also honored as `RUST_LOG`) | +| `REQUEST_BODY_TIMEOUT_SECONDS` | `60` | Per-request body read timeout | +| `MULTIPART_MIN_PART_SIZE` | `5242880` | Minimum part size enforced where applicable (5 MiB) | +| `BULK_DELETE_MAX_KEYS` | `1000` | Maximum keys per UI bulk-delete request | +| `STREAM_CHUNK_SIZE` | `1048576` | Default streaming chunk size for opt-in routes | +| `OBJECT_KEY_MAX_LENGTH_BYTES` | `1024` | Maximum object key length | +| `OBJECT_CACHE_MAX_SIZE` | `100` | Object metadata cache capacity | +| `BUCKET_CONFIG_CACHE_TTL_SECONDS` | `30` | Bucket config cache TTL | +| `OBJECT_TAG_LIMIT` | `50` | Maximum tags per object | + +### Rate limiting + +| Variable | Default | Description | +| --- | --- | --- | +| 
`RATE_LIMIT_DEFAULT` | `5000 per minute` | Default S3 / KMS rate limit. Accepts `N per <unit>` or `N/<unit>` | +| `RATE_LIMIT_LIST_BUCKETS` | inherits `RATE_LIMIT_DEFAULT` | Override for `GET /` | +| `RATE_LIMIT_BUCKET_OPS` | inherits `RATE_LIMIT_DEFAULT` | Override for `/{bucket}` | +| `RATE_LIMIT_OBJECT_OPS` | inherits `RATE_LIMIT_DEFAULT` | Override for `/{bucket}/{key}` | +| `RATE_LIMIT_HEAD_OPS` | inherits `RATE_LIMIT_DEFAULT` | Override for HEAD requests | +| `RATE_LIMIT_ADMIN` | `60 per minute` | Override for `/admin/*` | +| `RATE_LIMIT_STORAGE_URI` | `memory://` | Backend for rate-limit state. Only `memory://` is supported today | + +### CORS and proxying + +| Variable | Default | Description | +| --- | --- | --- | +| `CORS_ORIGINS` | `*` | Server-level allowed origins (comma-separated) | +| `CORS_METHODS` | `GET,PUT,POST,DELETE,OPTIONS,HEAD` | Server-level allowed methods | +| `CORS_ALLOW_HEADERS` | `*` | Allowed request headers | +| `CORS_EXPOSE_HEADERS` | `*` | Headers exposed to the browser | +| `NUM_TRUSTED_PROXIES` | `0` | Trusted reverse-proxy count. Forwarded-IP headers are ignored when `0` | +| `ALLOWED_REDIRECT_HOSTS` | empty | Comma-separated whitelist of safe UI login redirect hosts | +| `ALLOW_INTERNAL_ENDPOINTS` | `false` | Gate for internal diagnostic routes | ### Feature toggles @@ -131,6 +165,12 @@ These values are taken from `crates/myfsio-server/src/config.rs`. | `KMS_ENABLED` | `false` | Enable built-in KMS support | | `GC_ENABLED` | `false` | Start the garbage collector worker | | `INTEGRITY_ENABLED` | `false` | Start the integrity worker | +| `INTEGRITY_AUTO_HEAL` | `false` | When the periodic scan finishes, attempt to heal each issue (peer-fetch corrupted bytes, drop phantom metadata, etc.) 
| +| `INTEGRITY_DRY_RUN` | `false` | Report what the periodic scan would heal without touching anything | +| `INTEGRITY_INTERVAL_HOURS` | `24` | Period between background integrity scans | +| `INTEGRITY_BATCH_SIZE` | `10000` | Max objects scanned per cycle | +| `INTEGRITY_HEAL_CONCURRENCY` | `4` | Max concurrent heal tasks per cycle | +| `INTEGRITY_QUARANTINE_RETENTION_DAYS` | `7` | How long to retain quarantined files (cleaned up by GC) | | `LIFECYCLE_ENABLED` | `false` | Start the lifecycle worker | | `METRICS_HISTORY_ENABLED` | `false` | Persist system metrics snapshots | | `OPERATION_METRICS_ENABLED` | `false` | Persist API operation metrics | @@ -162,15 +202,35 @@ These values are taken from `crates/myfsio-server/src/config.rs`. | `SITE_SYNC_MAX_RETRIES` | `2` | Site sync retry count | | `SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS` | `1.0` | Allowed skew between peers | +### Garbage collection + +| Variable | Default | Description | +| --- | --- | --- | +| `GC_INTERVAL_HOURS` | `6` | Hours between GC cycles | +| `GC_TEMP_FILE_MAX_AGE_HOURS` | `24` | Delete temp files older than this | +| `GC_MULTIPART_MAX_AGE_DAYS` | `7` | Delete orphaned multipart uploads older than this | +| `GC_LOCK_FILE_MAX_AGE_HOURS` | `1` | Delete stale lock files older than this | +| `GC_DRY_RUN` | `false` | Log deletions without removing files | + +### Encryption tuning + +| Variable | Default | Description | +| --- | --- | --- | +| `ENCRYPTION_CHUNK_SIZE_BYTES` | `65536` | Plaintext chunk size for streaming AES-256-GCM (64 KiB) | +| `KMS_GENERATE_DATA_KEY_MIN_BYTES` | `1` | Minimum size for `generate-data-key` | +| `KMS_GENERATE_DATA_KEY_MAX_BYTES` | `1024` | Maximum size for `generate-data-key` | +| `LIFECYCLE_MAX_HISTORY_PER_BUCKET` | `50` | Max lifecycle history records kept per bucket | + ### Site identity values used by the UI These are read directly by UI pages: -| Variable | Description | -| --- | --- | -| `SITE_ID` | Local site identifier shown in the UI | -| `SITE_ENDPOINT` | 
Public endpoint for this site | -| `SITE_REGION` | Display region for the local site | +| Variable | Default | Description | +| --- | --- | --- | +| `SITE_ID` | unset | Local site identifier shown in the UI | +| `SITE_ENDPOINT` | unset | Public endpoint for this site | +| `SITE_REGION` | matches `AWS_REGION` | Display region for the local site | +| `SITE_PRIORITY` | `100` | Routing priority (lower = preferred) | ## 7. Data Layout @@ -178,29 +238,39 @@ With the default `STORAGE_ROOT=./data`, the Rust server writes: ```text data/ - <bucket>/ + <bucket>/ # raw object data .myfsio.sys/ config/ - iam.json - bucket_policies.json - connections.json - gc_history.json - integrity_history.json - metrics_history.json - operation_metrics.json + .secret # persisted SECRET_KEY (if generated) + iam.json # IAM users / access keys / policies + bucket_policies.json # legacy bucket policies (fallback only) + connections.json # remote endpoint credentials + replication_rules.json # replication rules + site_registry.json # local site + peer registry + website_domains.json # domain → bucket mapping (if enabled) + gc_history.json # GC execution history (if enabled) + integrity_history.json # integrity scan history (if enabled) + metrics_history.json # system metrics history (if enabled) + operation_metrics.json # API operation metrics (if enabled) buckets/<bucket>/ - meta/ - versions/ - multipart/ + .bucket.json # bucket config (versioning, cors, lifecycle, etc.) 
+ meta/ # per-object metadata + versions/ # archived versions (if versioning enabled) + lifecycle_history.json # lifecycle action log (if any rule has fired) + replication_failures.json # bounded failure log + site_sync_state.json # bidi sync watermark + multipart/ # in-progress multipart uploads keys/ + kms_master.key # 32-byte master key (base64) + kms_keys.json # KMS keys, encrypted under master key ``` -Important files: +Notable files: -- `data/.myfsio.sys/config/iam.json`: IAM users, access keys, and inline policies -- `data/.myfsio.sys/config/bucket_policies.json`: bucket policies -- `data/.myfsio.sys/config/connections.json`: replication connection settings -- `data/.myfsio.sys/config/.secret`: persisted secret key when one has been generated for the install +- `iam.json` is Fernet-encrypted at rest when `SECRET_KEY` is set. +- `bucket_policies.json` is read only as a fallback for policies that pre-date per-bucket `.bucket.json`. +- `kms_master.key` is plaintext on disk — protect `keys/` with filesystem permissions. +- `*_history.json` files only appear when their owning service has been enabled at least once. ## 8. Background Services @@ -230,14 +300,17 @@ Enable with: GC_ENABLED=true cargo run -p myfsio-server -- ``` -Current Rust defaults from `GcConfig::default()`: +Defaults (override with the env vars in section 6): -- Run every 6 hours -- Temp files older than 24 hours are eligible for cleanup -- Multipart uploads older than 7 days are eligible for cleanup -- Lock files older than 1 hour are eligible for cleanup +- `GC_INTERVAL_HOURS=6` +- `GC_TEMP_FILE_MAX_AGE_HOURS=24` +- `GC_MULTIPART_MAX_AGE_DAYS=7` +- `GC_LOCK_FILE_MAX_AGE_HOURS=1` +- `GC_DRY_RUN=false` -Those GC timings are currently hardcoded defaults, not environment-driven configuration. 
+Each GC cycle also sweeps `data/.myfsio.sys/quarantine/<bucket>/<key>/` directories whose `<key>` mtime is older than `INTEGRITY_QUARANTINE_RETENTION_DAYS`, freeing the bytes recorded in `quarantine_bytes_freed` / `quarantine_entries_deleted` in the result JSON. + +History is persisted at `data/.myfsio.sys/config/gc_history.json` and can be triggered manually via `POST /admin/gc/run` (use `{"dry_run": true}` to preview). ### Integrity scanning @@ -247,11 +320,27 @@ Enable with: INTEGRITY_ENABLED=true cargo run -p myfsio-server -- ``` -Current Rust defaults from `IntegrityConfig::default()`: +Tune with: -- Run every 24 hours -- Batch size 1000 -- Auto-heal disabled +```bash +INTEGRITY_INTERVAL_HOURS=24 +INTEGRITY_BATCH_SIZE=10000 +INTEGRITY_AUTO_HEAL=false +INTEGRITY_DRY_RUN=false +INTEGRITY_HEAL_CONCURRENCY=4 +INTEGRITY_QUARANTINE_RETENTION_DAYS=7 +``` + +When `INTEGRITY_AUTO_HEAL=true` (and `INTEGRITY_DRY_RUN=false`), each scan ends with a heal phase that processes the issues it just recorded. For `corrupted_object` the bad bytes are renamed into `data/.myfsio.sys/quarantine/<bucket>/<key>/` and the heal logic tries, in order: + +1. **Pull from peer.** If a replication rule for the bucket points at a healthy remote whose `HEAD` returns the same ETag the local index has, the body is streamed to a temp file, MD5-verified against the stored ETag, and atomically swapped into the live path. The poison flags are cleared on success. +2. **Poison the entry.** If there is no replication target, the peer disagrees on the ETag, the peer is unreachable, or the downloaded body fails verification, the index entry is mutated to add `__corrupted__: "true"`, `__corrupted_at__`, `__corruption_detail__`, and `__quarantine_path__`. The data file stays in quarantine for `INTEGRITY_QUARANTINE_RETENTION_DAYS`. 
+ +Subsequent reads (`GET`, `HEAD`, `CopyObject` source) on a poisoned key return `500 ObjectCorrupted` instead of serving rotted bytes; replication push skips poisoned keys; subsequent integrity scans skip poisoned keys instead of re-flagging them. Overwriting the key with a fresh `PUT` clears the poison. + +`stale_version`, `etag_cache_inconsistency`, and `phantom_metadata` issues are healed locally (move-to-quarantine, rebuild cache, drop entry); `orphaned_object` is reported only. + +Override per-invocation by passing `auto_heal` / `dry_run` to `POST /admin/integrity/run`. The response and history records now include a `heal_stats` map keyed by issue type with `{found, healed, poisoned, peer_mismatch, peer_unavailable, verify_failed, failed, skipped}`. History is at `data/.myfsio.sys/config/integrity_history.json`. ### Metrics history