From 777d862a029d1c613b120055418587684ed3d499 Mon Sep 17 00:00:00 2001 From: kqjy Date: Sat, 25 Apr 2026 19:29:54 +0800 Subject: [PATCH] Fix integrity auto-heal data-loss bug, return 422 ObjectCorrupted, lock heal swap, verify multipart peer body --- crates/myfsio-common/src/error.rs | 2 +- crates/myfsio-common/src/types.rs | 5 +- crates/myfsio-server/src/config.rs | 9 +- crates/myfsio-server/src/handlers/config.rs | 11 +- crates/myfsio-server/src/handlers/mod.rs | 281 ++++++++-------- crates/myfsio-server/src/handlers/select.rs | 11 +- crates/myfsio-server/src/handlers/ui_api.rs | 8 +- crates/myfsio-server/src/handlers/ui_pages.rs | 6 +- crates/myfsio-server/src/lib.rs | 4 +- crates/myfsio-server/src/middleware/auth.rs | 11 +- .../src/middleware/bucket_cors.rs | 9 +- .../myfsio-server/src/middleware/ratelimit.rs | 4 +- crates/myfsio-server/src/services/gc.rs | 5 +- .../myfsio-server/src/services/integrity.rs | 153 +++++++-- .../myfsio-server/src/services/peer_fetch.rs | 131 +++++++- crates/myfsio-server/tests/integration.rs | 29 +- crates/myfsio-storage/src/fs_backend.rs | 317 ++++++++++-------- crates/myfsio-storage/src/validation.rs | 1 - docs.md | 2 +- 19 files changed, 634 insertions(+), 365 deletions(-) diff --git a/crates/myfsio-common/src/error.rs b/crates/myfsio-common/src/error.rs index ceabe0d..d3b426b 100644 --- a/crates/myfsio-common/src/error.rs +++ b/crates/myfsio-common/src/error.rs @@ -69,7 +69,7 @@ impl S3ErrorCode { Self::NoSuchUpload => 404, Self::NoSuchVersion => 404, Self::NoSuchTagSet => 404, - Self::ObjectCorrupted => 500, + Self::ObjectCorrupted => 422, Self::PreconditionFailed => 412, Self::NotModified => 304, Self::QuotaExceeded => 403, diff --git a/crates/myfsio-common/src/types.rs b/crates/myfsio-common/src/types.rs index 6977d75..72a33ba 100644 --- a/crates/myfsio-common/src/types.rs +++ b/crates/myfsio-common/src/types.rs @@ -152,7 +152,10 @@ impl VersioningStatus { } pub fn is_active(self) -> bool { - matches!(self, VersioningStatus::Enabled | VersioningStatus::Suspended) + matches!( + self, + VersioningStatus::Enabled | VersioningStatus::Suspended + ) } } diff --git a/crates/myfsio-server/src/config.rs b/crates/myfsio-server/src/config.rs index ea83d9a..34181f0 100644 --- a/crates/myfsio-server/src/config.rs +++ b/crates/myfsio-server/src/config.rs @@ -248,12 +248,9 @@ impl ServerConfig { parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(5000, 60)); let ratelimit_list_buckets = parse_rate_limit_env("RATE_LIMIT_LIST_BUCKETS", ratelimit_default); - let ratelimit_bucket_ops = - parse_rate_limit_env("RATE_LIMIT_BUCKET_OPS", ratelimit_default); - let ratelimit_object_ops = - parse_rate_limit_env("RATE_LIMIT_OBJECT_OPS", ratelimit_default); - let ratelimit_head_ops = - parse_rate_limit_env("RATE_LIMIT_HEAD_OPS", ratelimit_default); + let ratelimit_bucket_ops = parse_rate_limit_env("RATE_LIMIT_BUCKET_OPS", ratelimit_default); + let ratelimit_object_ops = parse_rate_limit_env("RATE_LIMIT_OBJECT_OPS", ratelimit_default); + let ratelimit_head_ops = parse_rate_limit_env("RATE_LIMIT_HEAD_OPS", ratelimit_default); let ratelimit_admin = parse_rate_limit_env("RATE_LIMIT_ADMIN", RateLimitSetting::new(60, 60)); let ratelimit_storage_uri = diff --git a/crates/myfsio-server/src/handlers/config.rs b/crates/myfsio-server/src/handlers/config.rs index 6938d02..9917dfb 100644 --- a/crates/myfsio-server/src/handlers/config.rs +++ b/crates/myfsio-server/src/handlers/config.rs @@ -1059,7 +1059,16 @@ pub async fn delete_logging(state: &AppState, bucket: &str) -> Response { fn s3_error_response(code: S3ErrorCode, message: &str, status: StatusCode) -> Response { let err = S3Error::new(code, message.to_string()); - (status, [("content-type", "application/xml")], err.to_xml()).into_response() + let code_str = code.as_str(); + ( + status, + [ + ("content-type", "application/xml"), + ("x-amz-error-code", code_str), + ], + err.to_xml(), + ) + .into_response() } pub async fn list_object_versions( diff --git a/crates/myfsio-server/src/handlers/mod.rs b/crates/myfsio-server/src/handlers/mod.rs index 43dce2c..ea6b076 100644 --- a/crates/myfsio-server/src/handlers/mod.rs +++ b/crates/myfsio-server/src/handlers/mod.rs @@ -66,11 +66,20 @@ fn s3_error_response(err: S3Error) -> Response { } else { err.resource.clone() }; + let code_str = err.code.as_str(); let body = err .with_resource(resource) .with_request_id(uuid::Uuid::new_v4().simple().to_string()) .to_xml(); - (status, [("content-type", "application/xml")], body).into_response() + ( + status, + [ + ("content-type", "application/xml"), + ("x-amz-error-code", code_str), + ], + body, + ) + .into_response() } fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response { @@ -91,14 +100,17 @@ fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response { let s3_err = S3Error::from_code(S3ErrorCode::NoSuchKey) .with_resource(format!("/{}/{}", bucket, key)) .with_request_id(uuid::Uuid::new_v4().simple().to_string()); - let status = StatusCode::from_u16(s3_err.http_status()) - .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + let status = + StatusCode::from_u16(s3_err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); let mut resp_headers = HeaderMap::new(); resp_headers.insert("x-amz-delete-marker", "true".parse().unwrap()); if let Ok(vid) = version_id.parse() { resp_headers.insert("x-amz-version-id", vid); } resp_headers.insert("content-type", "application/xml".parse().unwrap()); + if let Ok(code_hdr) = s3_err.code.as_str().parse() { + resp_headers.insert("x-amz-error-code", code_hdr); + } return (status, resp_headers, s3_err.to_xml()).into_response(); } s3_error_response(S3Error::from(err)) @@ -118,8 +130,8 @@ fn io_error_to_s3_response(err: &std::io::Error) -> Option { || lower.contains("is a directory") || lower.contains("file exists") || lower.contains("directory not empty"); - let hit_name_too_long = matches!(err.kind(), ErrorKind::InvalidFilename) - || lower.contains("file name too long"); + let hit_name_too_long = + matches!(err.kind(), ErrorKind::InvalidFilename) || lower.contains("file name too long"); if !hit_collision && !hit_name_too_long { return None; } @@ -1118,9 +1130,7 @@ fn has_upload_checksum(headers: &HeaderMap) -> bool { } fn persist_additional_checksums(headers: &HeaderMap, metadata: &mut HashMap) { - for algo in [ - "sha256", "sha1", "crc32", "crc32c", "crc64nvme", - ] { + for algo in ["sha256", "sha1", "crc32", "crc32c", "crc64nvme"] { let header_name = format!("x-amz-checksum-{}", algo); if let Some(value) = headers.get(&header_name).and_then(|v| v.to_str().ok()) { let trimmed = value.trim(); @@ -1141,9 +1151,7 @@ fn persist_additional_checksums(headers: &HeaderMap, metadata: &mut HashMap) { - for algo in [ - "sha256", "sha1", "crc32", "crc32c", "crc64nvme", - ] { + for algo in ["sha256", "sha1", "crc32", "crc32c", "crc64nvme"] { if let Some(value) = metadata.get(&format!("__checksum_{}__", algo)) { if let Ok(parsed) = value.parse() { resp_headers.insert( @@ -1644,64 +1652,61 @@ pub async fn get_object( return resp; } - let enc_info = myfsio_crypto::encryption::EncryptionMetadata::from_metadata( - &snap_meta.internal_metadata, - ); + let enc_info = + myfsio_crypto::encryption::EncryptionMetadata::from_metadata(&snap_meta.internal_metadata); - let (file, file_size, enc_header): (tokio::fs::File, u64, Option<&str>) = match ( - enc_info.as_ref(), - state.encryption.as_ref(), - ) { - (Some(enc_info), Some(enc_svc)) => { - let dec_tmp = tmp_dir.join(format!("dec-{}", uuid::Uuid::new_v4())); - let customer_key = extract_sse_c_key(&headers); - let decrypt_res = enc_svc - .decrypt_object(&snap_link, &dec_tmp, enc_info, customer_key.as_deref()) - .await; - // Hardlink served its purpose; the decrypted plaintext is in - // dec_tmp now. - let _ = tokio::fs::remove_file(&snap_link).await; - if let Err(e) = decrypt_res { - let _ = tokio::fs::remove_file(&dec_tmp).await; + let (file, file_size, enc_header): (tokio::fs::File, u64, Option<&str>) = + match (enc_info.as_ref(), state.encryption.as_ref()) { + (Some(enc_info), Some(enc_svc)) => { + let dec_tmp = tmp_dir.join(format!("dec-{}", uuid::Uuid::new_v4())); + let customer_key = extract_sse_c_key(&headers); + let decrypt_res = enc_svc + .decrypt_object(&snap_link, &dec_tmp, enc_info, customer_key.as_deref()) + .await; + // Hardlink served its purpose; the decrypted plaintext is in + // dec_tmp now. + let _ = tokio::fs::remove_file(&snap_link).await; + if let Err(e) = decrypt_res { + let _ = tokio::fs::remove_file(&dec_tmp).await; + return s3_error_response(S3Error::new( + myfsio_common::error::S3ErrorCode::InternalError, + format!("Decryption failed: {}", e), + )); + } + let file = match open_self_deleting(dec_tmp.clone()).await { + Ok(f) => f, + Err(e) => { + let _ = tokio::fs::remove_file(&dec_tmp).await; + return storage_err_response(myfsio_storage::error::StorageError::Io(e)); + } + }; + let file_size = file.metadata().await.map(|m| m.len()).unwrap_or(0); + (file, file_size, Some(enc_info.algorithm.as_str())) + } + (Some(_), None) => { + // Snapshot is encrypted but the server has no encryption + // service configured to decrypt it. Serving ciphertext as + // plaintext would be actively wrong; refuse explicitly. + let _ = tokio::fs::remove_file(&snap_link).await; return s3_error_response(S3Error::new( myfsio_common::error::S3ErrorCode::InternalError, - format!("Decryption failed: {}", e), + "Object is encrypted but encryption service is disabled".to_string(), )); } - let file = match open_self_deleting(dec_tmp.clone()).await { - Ok(f) => f, - Err(e) => { - let _ = tokio::fs::remove_file(&dec_tmp).await; - return storage_err_response(myfsio_storage::error::StorageError::Io(e)); - } - }; - let file_size = file.metadata().await.map(|m| m.len()).unwrap_or(0); - (file, file_size, Some(enc_info.algorithm.as_str())) - } - (Some(_), None) => { - // Snapshot is encrypted but the server has no encryption - // service configured to decrypt it. Serving ciphertext as - // plaintext would be actively wrong; refuse explicitly. - let _ = tokio::fs::remove_file(&snap_link).await; - return s3_error_response(S3Error::new( - myfsio_common::error::S3ErrorCode::InternalError, - "Object is encrypted but encryption service is disabled".to_string(), - )); - } - (None, _) => { - // Raw path: stream directly from the hardlink, which becomes - // self-deleting on open (kernel keeps the inode alive via our - // fd). - let file = match open_self_deleting(snap_link.clone()).await { - Ok(f) => f, - Err(e) => { - let _ = tokio::fs::remove_file(&snap_link).await; - return storage_err_response(myfsio_storage::error::StorageError::Io(e)); - } - }; - (file, snap_meta.size, None) - } - }; + (None, _) => { + // Raw path: stream directly from the hardlink, which becomes + // self-deleting on open (kernel keeps the inode alive via our + // fd). + let file = match open_self_deleting(snap_link.clone()).await { + Ok(f) => f, + Err(e) => { + let _ = tokio::fs::remove_file(&snap_link).await; + return storage_err_response(myfsio_storage::error::StorageError::Io(e)); + } + }; + (file, snap_meta.size, None) + } + }; let stream = ReaderStream::with_capacity(file, stream_cap); let body = Body::from_stream(stream); @@ -2470,86 +2475,72 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R use futures::stream::{self, StreamExt}; - let results: Vec<(String, Option, Result)> = - stream::iter(parsed.objects.iter().cloned()) - .map(|obj| { - let state = state.clone(); - let bucket = bucket.to_string(); - async move { - let key = obj.key.clone(); - let requested_vid = obj.version_id.clone(); - let lock_check: Result<(), (String, String)> = match obj.version_id.as_deref() { - Some(version_id) if version_id != "null" => match state - .storage - .get_object_version_metadata(&bucket, &obj.key, version_id) - .await - { - Ok(metadata) => object_lock::can_delete_object(&metadata, false) - .map_err(|m| { - (S3ErrorCode::AccessDenied.as_str().to_string(), m) - }), - Err(err) => { - let s3err = S3Error::from(err); - Err((s3err.code.as_str().to_string(), s3err.message)) - } - }, - _ => match state.storage.head_object(&bucket, &obj.key).await { - Ok(_) => { - match state - .storage - .get_object_metadata(&bucket, &obj.key) - .await - { - Ok(metadata) => object_lock::can_delete_object(&metadata, false) - .map_err(|m| { - ( - S3ErrorCode::AccessDenied.as_str().to_string(), - m, - ) - }), - Err(err) => { - let s3err = S3Error::from(err); - Err((s3err.code.as_str().to_string(), s3err.message)) - } - } - } - Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => { - Ok(()) - } - Err(myfsio_storage::error::StorageError::DeleteMarker { .. }) => { - Ok(()) - } - Err(err) => { - let s3err = S3Error::from(err); - Err((s3err.code.as_str().to_string(), s3err.message)) - } - }, - }; - - let result = match lock_check { - Err(e) => Err(e), - Ok(()) => { - let outcome = match obj.version_id.as_deref() { - Some(version_id) if version_id != "null" => { - state - .storage - .delete_object_version(&bucket, &obj.key, version_id) - .await - } - _ => state.storage.delete_object(&bucket, &obj.key).await, - }; - outcome.map_err(|e| { - let s3err = S3Error::from(e); - (s3err.code.as_str().to_string(), s3err.message) - }) + let results: Vec<( + String, + Option, + Result, + )> = stream::iter(parsed.objects.iter().cloned()) + .map(|obj| { + let state = state.clone(); + let bucket = bucket.to_string(); + async move { + let key = obj.key.clone(); + let requested_vid = obj.version_id.clone(); + let lock_check: Result<(), (String, String)> = match obj.version_id.as_deref() { + Some(version_id) if version_id != "null" => match state + .storage + .get_object_version_metadata(&bucket, &obj.key, version_id) + .await + { + Ok(metadata) => object_lock::can_delete_object(&metadata, false) + .map_err(|m| (S3ErrorCode::AccessDenied.as_str().to_string(), m)), + Err(err) => { + let s3err = S3Error::from(err); + Err((s3err.code.as_str().to_string(), s3err.message)) } - }; - (key, requested_vid, result) - } - }) - .buffer_unordered(32) - .collect() - .await; + }, + _ => match state.storage.head_object(&bucket, &obj.key).await { + Ok(_) => match state.storage.get_object_metadata(&bucket, &obj.key).await { + Ok(metadata) => object_lock::can_delete_object(&metadata, false) + .map_err(|m| (S3ErrorCode::AccessDenied.as_str().to_string(), m)), + Err(err) => { + let s3err = S3Error::from(err); + Err((s3err.code.as_str().to_string(), s3err.message)) + } + }, + Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()), + Err(myfsio_storage::error::StorageError::DeleteMarker { .. }) => Ok(()), + Err(err) => { + let s3err = S3Error::from(err); + Err((s3err.code.as_str().to_string(), s3err.message)) + } + }, + }; + + let result = match lock_check { + Err(e) => Err(e), + Ok(()) => { + let outcome = match obj.version_id.as_deref() { + Some(version_id) if version_id != "null" => { + state + .storage + .delete_object_version(&bucket, &obj.key, version_id) + .await + } + _ => state.storage.delete_object(&bucket, &obj.key).await, + }; + outcome.map_err(|e| { + let s3err = S3Error::from(e); + (s3err.code.as_str().to_string(), s3err.message) + }) + } + }; + (key, requested_vid, result) + } + }) + .buffer_unordered(32) + .collect() + .await; let mut deleted: Vec = Vec::new(); let mut errors: Vec<(String, String, String)> = Vec::new(); @@ -2628,8 +2619,8 @@ async fn range_get_handler( match (enc_info.as_ref(), state.encryption.as_ref()) { (Some(enc_info), Some(enc_svc)) => { let customer_key = extract_sse_c_key(headers); - let has_fast_path = enc_info.chunk_size.is_some() - && enc_info.plaintext_size.is_some(); + let has_fast_path = + enc_info.chunk_size.is_some() && enc_info.plaintext_size.is_some(); if has_fast_path { let plaintext_size = enc_info.plaintext_size.unwrap(); diff --git a/crates/myfsio-server/src/handlers/select.rs b/crates/myfsio-server/src/handlers/select.rs index cf77f80..59925c5 100644 --- a/crates/myfsio-server/src/handlers/select.rs +++ b/crates/myfsio-server/src/handlers/select.rs @@ -511,11 +511,20 @@ fn s3_error_response(err: S3Error) -> Response { } else { err.resource.clone() }; + let code_str = err.code.as_str(); let body = err .with_resource(resource) .with_request_id(uuid::Uuid::new_v4().simple().to_string()) .to_xml(); - (status, [("content-type", "application/xml")], body).into_response() + ( + status, + [ + ("content-type", "application/xml"), + ("x-amz-error-code", code_str), + ], + body, + ) + .into_response() } fn build_stats_xml(bytes_scanned: usize, bytes_returned: usize) -> String { diff --git a/crates/myfsio-server/src/handlers/ui_api.rs b/crates/myfsio-server/src/handlers/ui_api.rs index 3e3b494..de70ecd 100644 --- a/crates/myfsio-server/src/handlers/ui_api.rs +++ b/crates/myfsio-server/src/handlers/ui_api.rs @@ -129,10 +129,10 @@ fn storage_status(err: &StorageError) -> StatusCode { | StorageError::QuotaExceeded(_) => StatusCode::BAD_REQUEST, StorageError::BucketAlreadyExists(_) => StatusCode::CONFLICT, StorageError::BucketNotEmpty(_) => StatusCode::CONFLICT, - StorageError::Io(_) - | StorageError::Json(_) - | StorageError::Internal(_) - | StorageError::ObjectCorrupted { .. } => StatusCode::INTERNAL_SERVER_ERROR, + StorageError::ObjectCorrupted { .. } => StatusCode::UNPROCESSABLE_ENTITY, + StorageError::Io(_) | StorageError::Json(_) | StorageError::Internal(_) => { + StatusCode::INTERNAL_SERVER_ERROR + } } } diff --git a/crates/myfsio-server/src/handlers/ui_pages.rs b/crates/myfsio-server/src/handlers/ui_pages.rs index 2214609..eb4c5fe 100644 --- a/crates/myfsio-server/src/handlers/ui_pages.rs +++ b/crates/myfsio-server/src/handlers/ui_pages.rs @@ -432,8 +432,10 @@ pub async fn bucket_detail( .get_versioning_status(&bucket_name) .await .unwrap_or(myfsio_common::types::VersioningStatus::Disabled); - let versioning_enabled = - matches!(versioning_status_enum, myfsio_common::types::VersioningStatus::Enabled); + let versioning_enabled = matches!( + versioning_status_enum, + myfsio_common::types::VersioningStatus::Enabled + ); let versioning_suspended = matches!( versioning_status_enum, myfsio_common::types::VersioningStatus::Suspended diff --git a/crates/myfsio-server/src/lib.rs b/crates/myfsio-server/src/lib.rs index 9c74f94..ada074e 100644 --- a/crates/myfsio-server/src/lib.rs +++ b/crates/myfsio-server/src/lib.rs @@ -324,7 +324,9 @@ pub fn create_ui_router(state: state::AppState) -> Router { axum::http::header::CACHE_CONTROL, axum::http::HeaderValue::from_static("no-cache"), )) - .service(tower_http::services::ServeDir::new(&state.config.static_dir)); + .service(tower_http::services::ServeDir::new( + &state.config.static_dir, + )); protected .merge(public) diff --git a/crates/myfsio-server/src/middleware/auth.rs b/crates/myfsio-server/src/middleware/auth.rs index 494c4f5..c36b563 100644 --- a/crates/myfsio-server/src/middleware/auth.rs +++ b/crates/myfsio-server/src/middleware/auth.rs @@ -1449,9 +1449,18 @@ fn error_response(err: S3Error, resource: &str) -> Response { let status = StatusCode::from_u16(err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); let request_id = uuid::Uuid::new_v4().simple().to_string(); + let code_str = err.code.as_str(); let body = err .with_resource(resource.to_string()) .with_request_id(request_id) .to_xml(); - (status, [("content-type", "application/xml")], body).into_response() + ( + status, + [ + ("content-type", "application/xml"), + ("x-amz-error-code", code_str), + ], + body, + ) + .into_response() } diff --git a/crates/myfsio-server/src/middleware/bucket_cors.rs b/crates/myfsio-server/src/middleware/bucket_cors.rs index 0da5545..d75435e 100644 --- a/crates/myfsio-server/src/middleware/bucket_cors.rs +++ b/crates/myfsio-server/src/middleware/bucket_cors.rs @@ -76,10 +76,7 @@ fn find_matching_rule<'a>( request_headers: &[&str], ) -> Option<&'a CorsRule> { rules.iter().find(|rule| { - let origin_match = rule - .allowed_origins - .iter() - .any(|p| match_origin(p, origin)); + let origin_match = rule.allowed_origins.iter().any(|p| match_origin(p, origin)); if !origin_match { return false; } @@ -104,9 +101,7 @@ fn find_matching_rule_for_actual<'a>( method: &str, ) -> Option<&'a CorsRule> { rules.iter().find(|rule| { - rule.allowed_origins - .iter() - .any(|p| match_origin(p, origin)) + rule.allowed_origins.iter().any(|p| match_origin(p, origin)) && rule .allowed_methods .iter() diff --git a/crates/myfsio-server/src/middleware/ratelimit.rs b/crates/myfsio-server/src/middleware/ratelimit.rs index e1c624d..036b096 100644 --- a/crates/myfsio-server/src/middleware/ratelimit.rs +++ b/crates/myfsio-server/src/middleware/ratelimit.rs @@ -182,9 +182,7 @@ fn too_many_requests(retry_after: u64, resource: &str) -> Response { ) .into_response(); if let Ok(value) = request_id.parse() { - response - .headers_mut() - .insert("x-amz-request-id", value); + response.headers_mut().insert("x-amz-request-id", value); } response } diff --git a/crates/myfsio-server/src/services/gc.rs b/crates/myfsio-server/src/services/gc.rs index a72fabc..3138ef2 100644 --- a/crates/myfsio-server/src/services/gc.rs +++ b/crates/myfsio-server/src/services/gc.rs @@ -276,10 +276,7 @@ impl GcService { if !ts_path.is_dir() { continue; } - let modified = ts_entry - .metadata() - .ok() - .and_then(|m| m.modified().ok()); + let modified = ts_entry.metadata().ok().and_then(|m| m.modified().ok()); let Some(modified) = modified else { continue; }; diff --git a/crates/myfsio-server/src/services/integrity.rs b/crates/myfsio-server/src/services/integrity.rs index d81985e..d1eb4e4 100644 --- a/crates/myfsio-server/src/services/integrity.rs +++ b/crates/myfsio-server/src/services/integrity.rs @@ -391,15 +391,13 @@ async fn heal_corrupted( } } - if live_path.exists() { - if let Err(e) = std::fs::rename(&live_path, &quarantine_full) { - tracing::error!( - "Heal {}/{}: quarantine rename failed: {}", - bucket, - key, - e - ); - return HealStatus::Failed; + { + let _guard = storage.lock_object_write(bucket, key); + if live_path.exists() { + if let Err(e) = std::fs::rename(&live_path, &quarantine_full) { + tracing::error!("Heal {}/{}: quarantine rename failed: {}", bucket, key, e); + return HealStatus::Failed; + } } } @@ -421,14 +419,30 @@ async fn heal_corrupted( .await { HealOutcome::Healed { peer_etag, bytes } => { - if let Err(e) = atomic_swap(&temp_path, &live_path) { + let swap_result = { + let _guard = storage.lock_object_write(bucket, key); + if live_path.exists() { + let _ = std::fs::remove_file(&temp_path); + tracing::info!( + "Heal {}/{}: concurrent PUT raced; preserving fresh write", + bucket, + key + ); + return HealStatus::Skipped; + } + atomic_swap(&temp_path, &live_path) + }; + if let Err(e) = swap_result { tracing::error!( "Heal {}/{}: atomic swap failed: {} (restoring from quarantine)", bucket, key, e ); - let _ = std::fs::rename(&quarantine_full, &live_path); + let _guard = storage.lock_object_write(bucket, key); + if !live_path.exists() { + let _ = std::fs::rename(&quarantine_full, &live_path); + } let _ = std::fs::remove_file(&temp_path); return HealStatus::Failed; } @@ -444,8 +458,7 @@ async fn heal_corrupted( } HealOutcome::PeerMismatch { stored, peer } => { let msg = format!("peer etag {} != stored {}", peer, stored); - let _ = - poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; + let _ = poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; tracing::warn!("Heal {}/{}: peer mismatch ({}), poisoned", bucket, key, msg); return HealStatus::PeerMismatch; } @@ -460,14 +473,15 @@ async fn heal_corrupted( "etag mismatch (stored={}, actual={}) — peer unavailable: {}", stored_etag, actual_etag, error ); - let _ = - poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; + let _ = poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; return HealStatus::PeerUnavailable; } HealOutcome::VerifyFailed { expected, actual } => { - let msg = format!("peer download verify failed: expected={} actual={}", expected, actual); - let _ = - poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; + let msg = format!( + "peer download verify failed: expected={} actual={}", + expected, actual + ); + let _ = poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; tracing::warn!("Heal {}/{}: {}", bucket, key, msg); return HealStatus::VerifyFailed; } @@ -476,8 +490,7 @@ async fn heal_corrupted( "etag mismatch (stored={}, actual={}); no peer configured", stored_etag, actual_etag ); - let _ = - poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; + let _ = poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await; return HealStatus::Poisoned; } } @@ -512,12 +525,22 @@ async fn heal_stale_version(storage_root: &Path, bucket: &str, key: &str) -> Hea .join(key); if let Some(parent) = dst.parent() { if let Err(e) = std::fs::create_dir_all(parent) { - tracing::error!("Stale-version quarantine mkdir failed {}/{}: {}", bucket, key, e); + tracing::error!( + "Stale-version quarantine mkdir failed {}/{}: {}", + bucket, + key, + e + ); return HealStatus::Failed; } } if let Err(e) = std::fs::rename(&src, &dst) { - tracing::error!("Stale-version quarantine rename failed {}/{}: {}", bucket, key, e); + tracing::error!( + "Stale-version quarantine rename failed {}/{}: {}", + bucket, + key, + e + ); return HealStatus::Failed; } tracing::info!("Quarantined stale version {}/{}", bucket, key); @@ -577,11 +600,7 @@ async fn heal_etag_cache( } } -async fn heal_phantom_metadata( - storage: &FsStorageBackend, - bucket: &str, - key: &str, -) -> HealStatus { +async fn heal_phantom_metadata(storage: &FsStorageBackend, bucket: &str, key: &str) -> HealStatus { match storage.delete_object_metadata_entry(bucket, key).await { Ok(_) => { tracing::info!("Dropped phantom metadata for {}/{}", bucket, key); @@ -1062,6 +1081,9 @@ fn check_stale_versions( } state.objects_scanned += 1; if !bin_stems.contains_key(stem) { + if manifest_is_delete_marker(path) { + continue; + } state.stale_versions += 1; let key = path .strip_prefix(&versions_root) @@ -1080,6 +1102,19 @@ fn check_stale_versions( } } +fn manifest_is_delete_marker(path: &Path) -> bool { + let Ok(content) = std::fs::read_to_string(path) else { + return false; + }; + let Ok(value) = serde_json::from_str::(&content) else { + return false; + }; + value + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) +} + fn check_etag_cache( state: &mut ScanState, storage_root: &Path, @@ -1265,7 +1300,58 @@ mod tests { .unwrap(); let state = scan_all_buckets(root, 10_000); - assert_eq!(state.corrupted_objects, 0, "poisoned entries must not re-flag"); + assert_eq!( + state.corrupted_objects, 0, + "poisoned entries must not re-flag" + ); + } + + #[test] + fn delete_marker_manifests_are_not_flagged_stale() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let bucket = "vbucket"; + fs::create_dir_all(root.join(bucket)).unwrap(); + + let versions_dir = root + .join(SYSTEM_ROOT) + .join(SYSTEM_BUCKETS_DIR) + .join(bucket) + .join(BUCKET_VERSIONS_DIR) + .join("v.txt"); + fs::create_dir_all(&versions_dir).unwrap(); + + let dm = json!({ + "version_id": "dm-vid-1", + "key": "v.txt", + "size": 0, + "etag": "", + "is_delete_marker": true, + }); + fs::write( + versions_dir.join("dm-vid-1.json"), + serde_json::to_string(&dm).unwrap(), + ) + .unwrap(); + + let truly_stale = json!({ + "version_id": "broken-vid-2", + "key": "v.txt", + "size": 12, + "etag": "abc", + "is_delete_marker": false, + }); + fs::write( + versions_dir.join("broken-vid-2.json"), + serde_json::to_string(&truly_stale).unwrap(), + ) + .unwrap(); + + let state = scan_all_buckets(root, 10_000); + assert_eq!( + state.stale_versions, 1, + "delete-marker manifest must not be flagged; only the data-bearing orphan should count" + ); } #[test] @@ -1332,10 +1418,7 @@ mod tests { write_index( &meta_root, - &[( - "multi.bin", - "deadbeefdeadbeefdeadbeefdeadbeef-3", - )], + &[("multi.bin", "deadbeefdeadbeefdeadbeefdeadbeef-3")], ); let state = scan_all_buckets(root, 10_000); @@ -1343,6 +1426,10 @@ mod tests { state.corrupted_objects, 0, "multipart-style ETags must not be checked against whole-body MD5" ); - assert!(state.errors.is_empty(), "unexpected errors: {:?}", state.errors); + assert!( + state.errors.is_empty(), + "unexpected errors: {:?}", + state.errors + ); } } diff --git a/crates/myfsio-server/src/services/peer_fetch.rs b/crates/myfsio-server/src/services/peer_fetch.rs index 687360e..e1c06e7 100644 --- a/crates/myfsio-server/src/services/peer_fetch.rs +++ b/crates/myfsio-server/src/services/peer_fetch.rs @@ -158,6 +158,12 @@ impl PeerFetcher { }; } + if is_multipart_etag(expected_etag) { + return self + .fetch_multipart_for_heal(&client, &target_bucket, key, expected_etag, dest_path) + .await; + } + let resp = match client .get_object() .bucket(&target_bucket) @@ -225,7 +231,7 @@ impl PeerFetcher { drop(file); let actual = format!("{:x}", hasher.finalize()); - if !is_multipart_etag(expected_etag) && actual != expected_etag { + if actual != expected_etag { let _ = tokio::fs::remove_file(dest_path).await; return HealOutcome::VerifyFailed { expected: expected_etag.to_string(), @@ -238,6 +244,129 @@ impl PeerFetcher { bytes: total, } } + + async fn fetch_multipart_for_heal( + &self, + client: &Client, + target_bucket: &str, + key: &str, + expected_etag: &str, + dest_path: &Path, + ) -> HealOutcome { + let part_count = match expected_etag + .split_once('-') + .and_then(|(_, n)| n.parse::().ok()) + { + Some(n) if n >= 1 => n, + _ => { + return HealOutcome::VerifyFailed { + expected: expected_etag.to_string(), + actual: format!("unparseable multipart suffix in {}", expected_etag), + }; + } + }; + + if let Some(parent) = dest_path.parent() { + if let Err(e) = tokio::fs::create_dir_all(parent).await { + return HealOutcome::PeerUnavailable { + error: format!("mkdir parent: {}", e), + }; + } + } + + let mut file = match tokio::fs::File::create(dest_path).await { + Ok(f) => f, + Err(e) => { + return HealOutcome::PeerUnavailable { + error: format!("create temp: {}", e), + }; + } + }; + + let mut composite = Md5::new(); + let mut total: u64 = 0; + let mut buf = vec![0u8; 64 * 1024]; + + for part_no in 1..=part_count { + let part_no_i32 = part_no as i32; + let resp = match client + .get_object() + .bucket(target_bucket) + .key(key) + .part_number(part_no_i32) + .send() + .await + { + Ok(r) => r, + Err(err) => { + drop(file); + let _ = tokio::fs::remove_file(dest_path).await; + return HealOutcome::PeerUnavailable { + error: format!("GetObject part {}: {:?}", part_no, err), + }; + } + }; + + let mut reader = resp.body.into_async_read(); + let mut part_hasher = Md5::new(); + let mut part_bytes: u64 = 0; + loop { + let n = match reader.read(&mut buf).await { + Ok(n) => n, + Err(e) => { + drop(file); + let _ = tokio::fs::remove_file(dest_path).await; + return HealOutcome::PeerUnavailable { + error: format!("read part {}: {}", part_no, e), + }; + } + }; + if n == 0 { + break; + } + part_hasher.update(&buf[..n]); + if let Err(e) = file.write_all(&buf[..n]).await { + drop(file); + let _ = tokio::fs::remove_file(dest_path).await; + return HealOutcome::PeerUnavailable { + error: format!("write part {}: {}", part_no, e), + }; + } + part_bytes += n as u64; + } + if part_bytes == 0 { + drop(file); + let _ = tokio::fs::remove_file(dest_path).await; + return HealOutcome::VerifyFailed { + expected: expected_etag.to_string(), + actual: format!("part {} returned zero bytes", part_no), + }; + } + composite.update(part_hasher.finalize().as_slice()); + total += part_bytes; + } + + if let Err(e) = file.flush().await { + return HealOutcome::PeerUnavailable { + error: format!("flush temp: {}", e), + }; + } + drop(file); + + let composite_etag = format!("{:x}-{}", composite.finalize(), part_count); + if composite_etag != expected_etag { + let _ = tokio::fs::remove_file(dest_path).await; + return HealOutcome::VerifyFailed { + expected: expected_etag.to_string(), + actual: composite_etag, + }; + } + + HealOutcome::Healed { + peer_etag: expected_etag.to_string(), + bytes: total, + } + } } #[cfg(test)] diff --git a/crates/myfsio-server/tests/integration.rs b/crates/myfsio-server/tests/integration.rs index 965dc7e..e747756 100644 --- a/crates/myfsio-server/tests/integration.rs +++ b/crates/myfsio-server/tests/integration.rs @@ -2647,7 +2647,11 @@ async fn test_consecutive_slashes_in_key_round_trip() { let (app, _tmp) = test_app(); app.clone() - .oneshot(signed_request(Method::PUT, "/slashes-bucket", Body::empty())) + .oneshot(signed_request( + Method::PUT, + "/slashes-bucket", + Body::empty(), + )) .await .unwrap(); @@ -2750,7 +2754,11 @@ async fn test_delete_live_version_restores_previous_to_live_slot() { let (app, _tmp) = test_app(); app.clone() - .oneshot(signed_request(Method::PUT, "/restore-bucket", Body::empty())) + .oneshot(signed_request( + Method::PUT, + "/restore-bucket", + Body::empty(), + )) .await .unwrap(); app.clone() @@ -2952,11 +2960,7 @@ async fn test_versioned_get_on_delete_marker_returns_method_not_allowed() { .unwrap(); app.clone() - .oneshot(signed_request( - Method::PUT, - "/dm-bucket/k", - Body::from("x"), - )) + .oneshot(signed_request(Method::PUT, "/dm-bucket/k", Body::from("x"))) .await .unwrap(); @@ -4948,7 +4952,9 @@ async fn test_kms_encrypt_decrypt() { } fn deterministic_payload(len: usize) -> Vec { - (0..len).map(|i| ((i * 2654435761usize) >> 16) as u8).collect() + (0..len) + .map(|i| ((i * 2654435761usize) >> 16) as u8) + .collect() } async fn put_sse_s3( @@ -5000,7 +5006,12 @@ async fn range_get( } async fn body_bytes(resp: axum::http::Response) -> Vec { - resp.into_body().collect().await.unwrap().to_bytes().to_vec() + resp.into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec() } #[tokio::test] diff --git a/crates/myfsio-storage/src/fs_backend.rs b/crates/myfsio-storage/src/fs_backend.rs index 6186b82..569e66a 100644 --- a/crates/myfsio-storage/src/fs_backend.rs +++ b/crates/myfsio-storage/src/fs_backend.rs @@ -55,7 +55,11 @@ fn fs_encode_key(key: &str) -> String { let trailing = key.ends_with('/'); let body = if trailing { &key[..key.len() - 1] } else { key }; if body.is_empty() { - return if trailing { "/".to_string() } else { String::new() }; + return if trailing { + "/".to_string() + } else { + String::new() + }; } let encoded: Vec = body .split('/') @@ -463,6 +467,14 @@ impl FsStorageBackend { &self.object_lock_stripes[idx] } + pub fn lock_object_write( + &self, + bucket: &str, + key: &str, + ) -> parking_lot::RwLockWriteGuard<'_, ()> { + self.get_object_lock(bucket, key).write() + } + fn prune_meta_read_cache(&self) { if self.object_cache_max_size == 0 { self.meta_read_cache.clear(); @@ -772,11 +784,7 @@ impl FsStorageBackend { Ok(()) } - pub async fn delete_object_metadata_entry( - &self, - bucket: &str, - key: &str, - ) -> StorageResult<()> { + pub async fn delete_object_metadata_entry(&self, bucket: &str, key: &str) -> StorageResult<()> { run_blocking(|| { let _guard = self.get_object_lock(bucket, key).write(); self.delete_metadata_sync(bucket, key) @@ -1123,11 +1131,7 @@ impl FsStorageBackend { Ok(Some(version_id)) } - fn write_delete_marker_sync( - &self, - bucket_name: &str, - key: &str, - ) -> std::io::Result { + fn write_delete_marker_sync(&self, bucket_name: &str, key: &str) -> std::io::Result { let version_dir = self.version_dir(bucket_name, key); std::fs::create_dir_all(&version_dir)?; let now = Utc::now(); @@ -1197,7 +1201,9 @@ impl FsStorageBackend { self.validate_key(key)?; Self::validate_version_id(bucket_name, key, version_id)?; - if let Some(record_and_path) = self.try_live_version_record_sync(bucket_name, key, version_id) { + if let Some(record_and_path) = + self.try_live_version_record_sync(bucket_name, key, version_id) + { return Ok(record_and_path); } @@ -1523,9 +1529,7 @@ impl FsStorageBackend { let (etag, version_id) = if is_dir_marker { (None, None) } else { - idx.get(name_str.as_ref()) - .cloned() - .unwrap_or((None, None)) + idx.get(name_str.as_ref()).cloned().unwrap_or((None, None)) }; let key = fs_decode_key(&fs_rel); @@ -2190,7 +2194,11 @@ impl crate::traits::StorageEngine for FsStorageBackend { detail: metadata_corruption_detail(&stored_meta), }); } - if self.read_bucket_config_sync(bucket).versioning_status().is_active() { + if self + .read_bucket_config_sync(bucket) + .versioning_status() + .is_active() + { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { bucket: bucket.to_string(), @@ -2270,7 +2278,11 @@ impl crate::traits::StorageEngine for FsStorageBackend { detail: metadata_corruption_detail(&stored_meta), }); } - if self.read_bucket_config_sync(bucket).versioning_status().is_active() { + if self + .read_bucket_config_sync(bucket) + .versioning_status() + .is_active() + { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { bucket: bucket.to_string(), @@ -2292,7 +2304,8 @@ impl crate::traits::StorageEngine for FsStorageBackend { return Err(StorageError::InvalidRange); } if start > 0 { - file.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?; + file.seek(SeekFrom::Start(start)) + .map_err(StorageError::Io)?; } let mtime = meta @@ -2360,7 +2373,11 @@ impl crate::traits::StorageEngine for FsStorageBackend { detail: metadata_corruption_detail(&stored_meta), }); } - if self.read_bucket_config_sync(bucket).versioning_status().is_active() { + if self + .read_bucket_config_sync(bucket) + .versioning_status() + .is_active() + { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { bucket: bucket.to_string(), @@ -2460,7 +2477,11 @@ impl crate::traits::StorageEngine for FsStorageBackend { detail: metadata_corruption_detail(&stored_meta), }); } - if self.read_bucket_config_sync(bucket).versioning_status().is_active() { + if self + .read_bucket_config_sync(bucket) + .versioning_status() + .is_active() + { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { bucket: bucket.to_string(), @@ -2595,7 +2616,11 @@ impl crate::traits::StorageEngine for FsStorageBackend { detail: metadata_corruption_detail(&stored_meta), }); } - if self.read_bucket_config_sync(bucket).versioning_status().is_active() { + if self + .read_bucket_config_sync(bucket) + .versioning_status() + .is_active() + { if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { return Err(StorageError::DeleteMarker { bucket: bucket.to_string(), @@ -2701,7 +2726,8 @@ impl crate::traits::StorageEngine for FsStorageBackend { return Err(StorageError::InvalidRange); } if start > 0 { - file.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?; + file.seek(SeekFrom::Start(start)) + .map_err(StorageError::Io)?; } let obj = self.object_meta_from_version_record(key, &record, &data_path)?; Ok((obj, file)) @@ -2930,45 +2956,47 @@ impl crate::traits::StorageEngine for FsStorageBackend { // guard is released at the end of this block before we take the dst // write guard, so even when src == dst (same stripe) there's no // upgrade deadlock. - let copy_res = run_blocking(|| -> StorageResult<(String, u64, HashMap)> { - let _src_guard = self.get_object_lock(src_bucket, src_key).read(); - let src_path = self.object_path(src_bucket, src_key)?; - if !src_path.is_file() { - return Err(StorageError::ObjectNotFound { - bucket: src_bucket.to_string(), - key: src_key.to_string(), - }); - } - - use std::io::{BufReader, BufWriter, Read, Write}; - let src_file = std::fs::File::open(&src_path).map_err(StorageError::Io)?; - let mut reader = BufReader::with_capacity(chunk_size, src_file); - let tmp_file = std::fs::File::create(&tmp_path).map_err(StorageError::Io)?; - let mut writer = BufWriter::with_capacity(chunk_size * 4, tmp_file); - let mut hasher = Md5::new(); - let mut buf = vec![0u8; chunk_size]; - let mut total: u64 = 0; - loop { - let n = reader.read(&mut buf).map_err(StorageError::Io)?; - if n == 0 { - break; + let copy_res = run_blocking( + || -> StorageResult<(String, u64, HashMap)> { + let _src_guard = self.get_object_lock(src_bucket, src_key).read(); + let src_path = self.object_path(src_bucket, src_key)?; + if !src_path.is_file() { + return Err(StorageError::ObjectNotFound { + bucket: src_bucket.to_string(), + key: src_key.to_string(), + }); } - hasher.update(&buf[..n]); - writer.write_all(&buf[..n]).map_err(StorageError::Io)?; - total += n as u64; - } - writer.flush().map_err(StorageError::Io)?; - let src_metadata = self.read_metadata_sync(src_bucket, src_key); - if metadata_is_corrupted(&src_metadata) { - return Err(StorageError::ObjectCorrupted { - bucket: src_bucket.to_string(), - key: src_key.to_string(), - detail: metadata_corruption_detail(&src_metadata), - }); - } - Ok((format!("{:x}", hasher.finalize()), total, src_metadata)) - }); + use std::io::{BufReader, BufWriter, Read, Write}; + let src_file = std::fs::File::open(&src_path).map_err(StorageError::Io)?; + let mut reader = BufReader::with_capacity(chunk_size, src_file); + let tmp_file = std::fs::File::create(&tmp_path).map_err(StorageError::Io)?; + let mut writer = BufWriter::with_capacity(chunk_size * 4, tmp_file); + let mut hasher = Md5::new(); + let mut buf = vec![0u8; chunk_size]; + let mut total: u64 = 0; + loop { + let n = reader.read(&mut buf).map_err(StorageError::Io)?; + if n == 0 { + break; + } + hasher.update(&buf[..n]); + writer.write_all(&buf[..n]).map_err(StorageError::Io)?; + total += n as u64; + } + writer.flush().map_err(StorageError::Io)?; + + let src_metadata = self.read_metadata_sync(src_bucket, src_key); + if metadata_is_corrupted(&src_metadata) { + return Err(StorageError::ObjectCorrupted { + bucket: src_bucket.to_string(), + key: src_key.to_string(), + detail: metadata_corruption_detail(&src_metadata), + }); + } + Ok((format!("{:x}", hasher.finalize()), total, src_metadata)) + }, + ); let (etag, new_size, src_metadata) = match copy_res { Ok(v) => v, @@ -3179,79 +3207,77 @@ impl crate::traits::StorageEngine for FsStorageBackend { // between our metadata read and our file open, we'd otherwise record // the old size/last_modified in the manifest but copy bytes from the // new version. - let copy_res = run_blocking( - || -> StorageResult<(String, u64, DateTime)> { - let _guard = self.get_object_lock(src_bucket, src_key).read(); + let copy_res = run_blocking(|| -> StorageResult<(String, u64, DateTime)> { + let _guard = self.get_object_lock(src_bucket, src_key).read(); - let src_path = self.object_path(src_bucket, src_key)?; - if !src_path.is_file() { - return Err(StorageError::ObjectNotFound { - bucket: src_bucket.to_string(), - key: src_key.to_string(), - }); - } + let src_path = self.object_path(src_bucket, src_key)?; + if !src_path.is_file() { + return Err(StorageError::ObjectNotFound { + bucket: src_bucket.to_string(), + key: src_key.to_string(), + }); + } - use std::io::{BufWriter, Read, Seek, SeekFrom, Write}; - // Open first so subsequent metadata/seek/read are all - // anchored to the same inode, even if a later rename swaps - // the path after we release the guard. - let mut src = std::fs::File::open(&src_path).map_err(StorageError::Io)?; - let src_meta = src.metadata().map_err(StorageError::Io)?; - let src_size = src_meta.len(); - let src_mtime = src_meta - .modified() - .ok() - .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) - .map(|d| d.as_secs_f64()) - .unwrap_or(0.0); - let last_modified = Utc - .timestamp_opt( - src_mtime as i64, - ((src_mtime % 1.0) * 1_000_000_000.0) as u32, - ) - .single() - .unwrap_or_else(Utc::now); + use std::io::{BufWriter, Read, Seek, SeekFrom, Write}; + // Open first so subsequent metadata/seek/read are all + // anchored to the same inode, even if a later rename swaps + // the path after we release the guard. + let mut src = std::fs::File::open(&src_path).map_err(StorageError::Io)?; + let src_meta = src.metadata().map_err(StorageError::Io)?; + let src_size = src_meta.len(); + let src_mtime = src_meta + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + let last_modified = Utc + .timestamp_opt( + src_mtime as i64, + ((src_mtime % 1.0) * 1_000_000_000.0) as u32, + ) + .single() + .unwrap_or_else(Utc::now); - let (start, end) = match range { - Some((s, e)) => { - if s >= src_size || e >= src_size || s > e { - return Err(StorageError::InvalidRange); - } - (s, e) + let (start, end) = match range { + Some((s, e)) => { + if s >= src_size || e >= src_size || s > e { + return Err(StorageError::InvalidRange); } - None => { - if src_size == 0 { - (0u64, 0u64) - } else { - (0u64, src_size - 1) - } + (s, e) + } + None => { + if src_size == 0 { + (0u64, 0u64) + } else { + (0u64, src_size - 1) } - }; - let length = if src_size == 0 { 0 } else { end - start + 1 }; + } + }; + let length = if src_size == 0 { 0 } else { end - start + 1 }; - if start > 0 { - src.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?; + if start > 0 { + src.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?; + } + let mut src = std::io::BufReader::with_capacity(chunk_size, src); + let dst = std::fs::File::create(&tmp_file).map_err(StorageError::Io)?; + let mut dst = BufWriter::with_capacity(chunk_size * 4, dst); + let mut hasher = Md5::new(); + let mut remaining = length; + let mut buf = vec![0u8; chunk_size]; + while remaining > 0 { + let to_read = std::cmp::min(remaining as usize, buf.len()); + let n = src.read(&mut buf[..to_read]).map_err(StorageError::Io)?; + if n == 0 { + break; } - let mut src = std::io::BufReader::with_capacity(chunk_size, src); - let dst = std::fs::File::create(&tmp_file).map_err(StorageError::Io)?; - let mut dst = BufWriter::with_capacity(chunk_size * 4, dst); - let mut hasher = Md5::new(); - let mut remaining = length; - let mut buf = vec![0u8; chunk_size]; - while remaining > 0 { - let to_read = std::cmp::min(remaining as usize, buf.len()); - let n = src.read(&mut buf[..to_read]).map_err(StorageError::Io)?; - if n == 0 { - break; - } - hasher.update(&buf[..n]); - dst.write_all(&buf[..n]).map_err(StorageError::Io)?; - remaining -= n as u64; - } - dst.flush().map_err(StorageError::Io)?; - Ok((format!("{:x}", hasher.finalize()), length, last_modified)) - }, - ); + hasher.update(&buf[..n]); + dst.write_all(&buf[..n]).map_err(StorageError::Io)?; + remaining -= n as u64; + } + dst.flush().map_err(StorageError::Io)?; + Ok((format!("{:x}", hasher.finalize()), length, last_modified)) + }); let (etag, length, last_modified) = match copy_res { Ok(v) => v, @@ -3336,8 +3362,8 @@ impl crate::traits::StorageEngine for FsStorageBackend { let mut buf = vec![0u8; chunk_size]; for part_info in &part_infos { - let part_file = upload_dir_owned - .join(format!("part-{:05}.part", part_info.part_number)); + let part_file = + upload_dir_owned.join(format!("part-{:05}.part", part_info.part_number)); if !part_file.exists() { return Err(StorageError::InvalidObjectKey(format!( "Part {} not found", @@ -4260,9 +4286,11 @@ mod tests { std::fs::create_dir_all(&tmp_dir).unwrap(); // Seed with known content. - let data: AsyncReadStream = - Box::pin(std::io::Cursor::new(vec![b'a'; 4096])); - backend.put_object("link-bkt", "hot", data, None).await.unwrap(); + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(vec![b'a'; 4096])); + backend + .put_object("link-bkt", "hot", data, None) + .await + .unwrap(); let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false)); let mut handles = Vec::new(); @@ -4343,8 +4371,7 @@ mod tests { let backend = StdArc::new(backend); backend.create_bucket("snap-bkt").await.unwrap(); - let data: AsyncReadStream = - Box::pin(std::io::Cursor::new(vec![b'a'; 1024])); + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(vec![b'a'; 1024])); backend .put_object("snap-bkt", "sz", data, None) .await @@ -4424,7 +4451,10 @@ mod tests { const SIZE: u64 = 256 * 1024; let seed = vec![b'a'; SIZE as usize]; let data: AsyncReadStream = Box::pin(std::io::Cursor::new(seed)); - backend.put_object("range-bkt", "hot", data, None).await.unwrap(); + backend + .put_object("range-bkt", "hot", data, None) + .await + .unwrap(); let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false)); let mut handles = Vec::new(); @@ -4455,8 +4485,9 @@ mod tests { while !stop.load(Ordering::Relaxed) { let start = 1000u64; let len = 4000u64; - if let Ok((meta, mut stream)) = - b.get_object_range("range-bkt", "hot", start, Some(len)).await + if let Ok((meta, mut stream)) = b + .get_object_range("range-bkt", "hot", start, Some(len)) + .await { let mut buf = Vec::with_capacity(len as usize); if stream.read_to_end(&mut buf).await.is_ok() && !buf.is_empty() { @@ -4466,10 +4497,8 @@ mod tests { // that byte at full object size. let fill = buf[0]; let all_match = buf.iter().all(|b| *b == fill); - let expected_etag = format!( - "{:x}", - Md5::digest(&vec![fill; SIZE as usize]) - ); + let expected_etag = + format!("{:x}", Md5::digest(&vec![fill; SIZE as usize])); let etag_ok = meta.etag.as_deref() == Some(expected_etag.as_str()); reads.fetch_add(1, Ordering::Relaxed); if !(all_match && etag_ok) { @@ -4556,9 +4585,7 @@ mod tests { Err(_) => continue, }; let res = b - .upload_part_copy( - "mp-bkt", &upload_id, 1, "mp-bkt", "src", None, - ) + .upload_part_copy("mp-bkt", &upload_id, 1, "mp-bkt", "src", None) .await; if let Ok((etag, _lm)) = res { // The part etag is the MD5 of the copied bytes; it @@ -4583,7 +4610,11 @@ mod tests { let o = ops.load(Ordering::Relaxed); let x = bad.load(Ordering::Relaxed); - assert!(o >= 4, "expected at least a few upload_part_copy ops, got {}", o); + assert!( + o >= 4, + "expected at least a few upload_part_copy ops, got {}", + o + ); assert_eq!( x, 0, "observed {} upload_part_copy results with etag unrelated to source content (out of {})", diff --git a/crates/myfsio-storage/src/validation.rs b/crates/myfsio-storage/src/validation.rs index 80c4fb3..ade1b1e 100644 --- a/crates/myfsio-storage/src/validation.rs +++ b/crates/myfsio-storage/src/validation.rs @@ -47,7 +47,6 @@ pub fn validate_object_key( normalized.split('/').collect() }; - for part in &parts { if part.is_empty() { continue; diff --git a/docs.md b/docs.md index 1bac34f..c1d36bb 100644 --- a/docs.md +++ b/docs.md @@ -336,7 +336,7 @@ When `INTEGRITY_AUTO_HEAL=true` (and `INTEGRITY_DRY_RUN=false`), each scan ends 1. **Pull from peer.** If a replication rule for the bucket points at a healthy remote whose `HEAD` returns the same ETag the local index has, the body is streamed to a temp file, MD5-verified against the stored ETag, and atomically swapped into the live path. The poison flags are cleared on success. 2. **Poison the entry.** If there is no replication target, the peer disagrees on the ETag, the peer is unreachable, or the downloaded body fails verification, the index entry is mutated to add `__corrupted__: "true"`, `__corrupted_at__`, `__corruption_detail__`, and `__quarantine_path__`. The data file stays in quarantine for `INTEGRITY_QUARANTINE_RETENTION_DAYS`. -Subsequent reads (`GET`, `HEAD`, `CopyObject` source) on a poisoned key return `500 ObjectCorrupted` instead of serving rotted bytes; replication push skips poisoned keys; subsequent integrity scans skip poisoned keys instead of re-flagging them. Overwriting the key with a fresh `PUT` clears the poison. +Subsequent reads (`GET`, `HEAD`, `CopyObject` source) on a poisoned key return `422 ObjectCorrupted` instead of serving rotted bytes; the response includes an `x-amz-error-code: ObjectCorrupted` header so HEAD callers (which receive no body) can still detect the condition. Replication push skips poisoned keys; subsequent integrity scans skip poisoned keys instead of re-flagging them. Overwriting the key with a fresh `PUT` clears the poison. `stale_version`, `etag_cache_inconsistency`, and `phantom_metadata` issues are healed locally (move-to-quarantine, rebuild cache, drop entry); `orphaned_object` is reported only.