From 5aba9ac9e980ff4612a64fb4ac2441d371c4cd95 Mon Sep 17 00:00:00 2001 From: kqjy Date: Fri, 24 Apr 2026 18:45:22 +0800 Subject: [PATCH] Add snapshot/range storage primitives, gate GET preconditions on served snapshot, support partial-decrypt Range GET for SSE-encrypted objects --- Cargo.lock | 13 +- Cargo.toml | 4 +- crates/myfsio-common/src/types.rs | 3 + crates/myfsio-crypto/src/aes_gcm.rs | 292 ++++ crates/myfsio-crypto/src/encryption.rs | 102 +- crates/myfsio-server/src/handlers/mod.rs | 546 +++++--- crates/myfsio-server/src/state.rs | 1 + crates/myfsio-server/tests/integration.rs | 194 +++ crates/myfsio-storage/Cargo.toml | 1 + crates/myfsio-storage/src/fs_backend.rs | 1560 +++++++++++++++++---- crates/myfsio-storage/src/traits.rs | 45 + 11 files changed, 2219 insertions(+), 542 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8ec8b6e..4ca53d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2639,7 +2639,7 @@ dependencies = [ [[package]] name = "myfsio-auth" -version = "0.4.4" +version = "0.4.5" dependencies = [ "aes", "base64", @@ -2664,7 +2664,7 @@ dependencies = [ [[package]] name = "myfsio-common" -version = "0.4.4" +version = "0.4.5" dependencies = [ "chrono", "serde", @@ -2675,7 +2675,7 @@ dependencies = [ [[package]] name = "myfsio-crypto" -version = "0.4.4" +version = "0.4.5" dependencies = [ "aes-gcm", "base64", @@ -2696,7 +2696,7 @@ dependencies = [ [[package]] name = "myfsio-server" -version = "0.4.4" +version = "0.4.5" dependencies = [ "aes-gcm", "async-trait", @@ -2753,7 +2753,7 @@ dependencies = [ [[package]] name = "myfsio-storage" -version = "0.4.4" +version = "0.4.5" dependencies = [ "chrono", "dashmap", @@ -2769,6 +2769,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-util", "tracing", "unicode-normalization", "uuid", @@ -2776,7 +2777,7 @@ dependencies = [ [[package]] name = "myfsio-xml" -version = "0.4.4" +version = "0.4.5" dependencies = [ "chrono", "myfsio-common", diff --git a/Cargo.toml b/Cargo.toml index f7bb2e4..f0a7591 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ members = [ ] [workspace.package] -version = "0.4.4" +version = "0.4.5" edition = "2021" [workspace.dependencies] @@ -42,7 +42,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } thiserror = "2" chrono = { version = "0.4", features = ["serde"] } base64 = "0.22" -tokio-util = { version = "0.7", features = ["io"] } +tokio-util = { version = "0.7", features = ["io", "io-util"] } tokio-stream = "0.1" futures = "0.3" dashmap = "6" diff --git a/crates/myfsio-common/src/types.rs b/crates/myfsio-common/src/types.rs index 57d15ad..6977d75 100644 --- a/crates/myfsio-common/src/types.rs +++ b/crates/myfsio-common/src/types.rs @@ -16,6 +16,8 @@ pub struct ObjectMeta { pub version_id: Option, #[serde(default)] pub is_delete_marker: bool, + #[serde(default, skip_serializing)] + pub internal_metadata: HashMap, } impl ObjectMeta { @@ -30,6 +32,7 @@ impl ObjectMeta { metadata: HashMap::new(), version_id: None, is_delete_marker: false, + internal_metadata: HashMap::new(), } } } diff --git a/crates/myfsio-crypto/src/aes_gcm.rs b/crates/myfsio-crypto/src/aes_gcm.rs index b349de3..e7ebd98 100644 --- a/crates/myfsio-crypto/src/aes_gcm.rs +++ b/crates/myfsio-crypto/src/aes_gcm.rs @@ -145,6 +145,113 @@ pub fn decrypt_stream_chunked( Ok(chunk_count) } +const GCM_TAG_LEN: usize = 16; + +pub fn decrypt_stream_chunked_range( + input_path: &Path, + output_path: &Path, + key: &[u8], + base_nonce: &[u8], + chunk_plain_size: usize, + plaintext_size: u64, + plain_start: u64, + plain_end_inclusive: u64, +) -> Result { + if key.len() != 32 { + return Err(CryptoError::InvalidKeySize(key.len())); + } + if base_nonce.len() != 12 { + return Err(CryptoError::InvalidNonceSize(base_nonce.len())); + } + if chunk_plain_size == 0 { + return Err(CryptoError::EncryptionFailed( + "chunk_plain_size must be > 0".into(), + )); + } + if plaintext_size == 0 { + let _ = File::create(output_path)?; + return Ok(0); + } + if plain_start > plain_end_inclusive || plain_end_inclusive >= plaintext_size { + return Err(CryptoError::EncryptionFailed(format!( + "range [{}, {}] invalid for plaintext size {}", + plain_start, plain_end_inclusive, plaintext_size + ))); + } + + let key_arr: [u8; 32] = key.try_into().unwrap(); + let nonce_arr: [u8; 12] = base_nonce.try_into().unwrap(); + let cipher = Aes256Gcm::new(&key_arr.into()); + + let n = chunk_plain_size as u64; + let first_chunk = (plain_start / n) as u32; + let last_chunk = (plain_end_inclusive / n) as u32; + let total_chunks = plaintext_size.div_ceil(n) as u32; + let final_chunk_plain = plaintext_size - (total_chunks as u64 - 1) * n; + + let mut infile = File::open(input_path)?; + + let mut header = [0u8; HEADER_SIZE]; + infile.read_exact(&mut header)?; + let stored_chunk_count = u32::from_be_bytes(header); + if stored_chunk_count != total_chunks { + return Err(CryptoError::EncryptionFailed(format!( + "chunk count mismatch: header says {}, plaintext_size implies {}", + stored_chunk_count, total_chunks + ))); + } + + let mut outfile = File::create(output_path)?; + + let stride = n + GCM_TAG_LEN as u64 + HEADER_SIZE as u64; + let first_offset = HEADER_SIZE as u64 + first_chunk as u64 * stride; + infile.seek(SeekFrom::Start(first_offset))?; + + let mut size_buf = [0u8; HEADER_SIZE]; + let mut bytes_written: u64 = 0; + + for chunk_index in first_chunk..=last_chunk { + infile.read_exact(&mut size_buf)?; + let ct_len = u32::from_be_bytes(size_buf) as usize; + + let expected_plain = if chunk_index + 1 == total_chunks { + final_chunk_plain as usize + } else { + chunk_plain_size + }; + let expected_ct = expected_plain + GCM_TAG_LEN; + if ct_len != expected_ct { + return Err(CryptoError::EncryptionFailed(format!( + "chunk {} stored length {} != expected {} (corrupt file or chunk_size mismatch)", + chunk_index, ct_len, expected_ct + ))); + } + + let mut encrypted = vec![0u8; ct_len]; + infile.read_exact(&mut encrypted)?; + + let nonce_bytes = derive_chunk_nonce(&nonce_arr, chunk_index)?; + let nonce = Nonce::from_slice(&nonce_bytes); + let decrypted = cipher + .decrypt(nonce, encrypted.as_ref()) + .map_err(|_| CryptoError::DecryptionFailed(chunk_index))?; + + let chunk_plain_start = chunk_index as u64 * n; + let chunk_plain_end_exclusive = chunk_plain_start + decrypted.len() as u64; + + let slice_start = plain_start.saturating_sub(chunk_plain_start) as usize; + let slice_end = (plain_end_inclusive + 1).min(chunk_plain_end_exclusive); + let slice_end_local = (slice_end - chunk_plain_start) as usize; + + if slice_end_local > slice_start { + outfile.write_all(&decrypted[slice_start..slice_end_local])?; + bytes_written += (slice_end_local - slice_start) as u64; + } + } + + Ok(bytes_written) +} + pub async fn encrypt_stream_chunked_async( input_path: &Path, output_path: &Path, @@ -230,6 +337,191 @@ mod tests { assert!(matches!(result, Err(CryptoError::InvalidKeySize(16)))); } + fn write_file(path: &Path, data: &[u8]) { + std::fs::File::create(path).unwrap().write_all(data).unwrap(); + } + + fn make_encrypted_file( + dir: &Path, + data: &[u8], + key: &[u8; 32], + nonce: &[u8; 12], + chunk: usize, + ) -> std::path::PathBuf { + let input = dir.join("input.bin"); + let encrypted = dir.join("encrypted.bin"); + write_file(&input, data); + encrypt_stream_chunked(&input, &encrypted, key, nonce, Some(chunk)).unwrap(); + encrypted + } + + #[test] + fn test_range_within_single_chunk() { + let dir = tempfile::tempdir().unwrap(); + let data: Vec = (0u8..=255).cycle().take(4096).collect(); + let key = [0x33u8; 32]; + let nonce = [0x07u8; 12]; + let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 1024); + let out = dir.path().join("range.bin"); + + let n = decrypt_stream_chunked_range( + &encrypted, + &out, + &key, + &nonce, + 1024, + data.len() as u64, + 200, + 399, + ) + .unwrap(); + assert_eq!(n, 200); + let got = std::fs::read(&out).unwrap(); + assert_eq!(got, &data[200..400]); + } + + #[test] + fn test_range_spanning_multiple_chunks() { + let dir = tempfile::tempdir().unwrap(); + let data: Vec = (0..5000u32).map(|i| (i % 251) as u8).collect(); + let key = [0x44u8; 32]; + let nonce = [0x02u8; 12]; + let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 512); + let out = dir.path().join("range.bin"); + + let n = decrypt_stream_chunked_range( + &encrypted, + &out, + &key, + &nonce, + 512, + data.len() as u64, + 100, + 2999, + ) + .unwrap(); + assert_eq!(n, 2900); + let got = std::fs::read(&out).unwrap(); + assert_eq!(got, &data[100..3000]); + } + + #[test] + fn test_range_covers_final_partial_chunk() { + let dir = tempfile::tempdir().unwrap(); + let data: Vec = (0..1300u32).map(|i| (i % 71) as u8).collect(); + let key = [0x55u8; 32]; + let nonce = [0x0au8; 12]; + let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 512); + let out = dir.path().join("range.bin"); + + let n = decrypt_stream_chunked_range( + &encrypted, + &out, + &key, + &nonce, + 512, + data.len() as u64, + 900, + 1299, + ) + .unwrap(); + assert_eq!(n, 400); + let got = std::fs::read(&out).unwrap(); + assert_eq!(got, &data[900..1300]); + } + + #[test] + fn test_range_full_object() { + let dir = tempfile::tempdir().unwrap(); + let data: Vec = (0..2048u32).map(|i| (i % 13) as u8).collect(); + let key = [0x11u8; 32]; + let nonce = [0x33u8; 12]; + let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 512); + let out = dir.path().join("range.bin"); + + let n = decrypt_stream_chunked_range( + &encrypted, + &out, + &key, + &nonce, + 512, + data.len() as u64, + 0, + data.len() as u64 - 1, + ) + .unwrap(); + assert_eq!(n, data.len() as u64); + let got = std::fs::read(&out).unwrap(); + assert_eq!(got, data); + } + + #[test] + fn test_range_wrong_key_fails() { + let dir = tempfile::tempdir().unwrap(); + let data = b"range-auth-check".repeat(100); + let key = [0x66u8; 32]; + let nonce = [0x09u8; 12]; + let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 256); + let out = dir.path().join("range.bin"); + + let wrong = [0x67u8; 32]; + let r = decrypt_stream_chunked_range( + &encrypted, + &out, + &wrong, + &nonce, + 256, + data.len() as u64, + 0, + data.len() as u64 - 1, + ); + assert!(matches!(r, Err(CryptoError::DecryptionFailed(_)))); + } + + #[test] + fn test_range_out_of_bounds_rejected() { + let dir = tempfile::tempdir().unwrap(); + let data = vec![0u8; 100]; + let key = [0x22u8; 32]; + let nonce = [0x44u8; 12]; + let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 64); + let out = dir.path().join("range.bin"); + + let r = decrypt_stream_chunked_range( + &encrypted, + &out, + &key, + &nonce, + 64, + data.len() as u64, + 50, + 200, + ); + assert!(r.is_err()); + } + + #[test] + fn test_range_mismatched_chunk_size_detected() { + let dir = tempfile::tempdir().unwrap(); + let data: Vec = (0..2048u32).map(|i| i as u8).collect(); + let key = [0x77u8; 32]; + let nonce = [0x88u8; 12]; + let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 512); + let out = dir.path().join("range.bin"); + + let r = decrypt_stream_chunked_range( + &encrypted, + &out, + &key, + &nonce, + 1024, + data.len() as u64, + 0, + 1023, + ); + assert!(r.is_err()); + } + #[test] fn test_wrong_key_fails_decrypt() { let dir = tempfile::tempdir().unwrap(); diff --git a/crates/myfsio-crypto/src/encryption.rs b/crates/myfsio-crypto/src/encryption.rs index 40ca870..c8cf135 100644 --- a/crates/myfsio-crypto/src/encryption.rs +++ b/crates/myfsio-crypto/src/encryption.rs @@ -4,7 +4,9 @@ use rand::RngCore; use std::collections::HashMap; use std::path::Path; -use crate::aes_gcm::{decrypt_stream_chunked, encrypt_stream_chunked, CryptoError}; +use crate::aes_gcm::{ + decrypt_stream_chunked, decrypt_stream_chunked_range, encrypt_stream_chunked, CryptoError, +}; use crate::kms::KmsService; #[derive(Debug, Clone, PartialEq)] @@ -37,6 +39,8 @@ pub struct EncryptionMetadata { pub nonce: String, pub encrypted_data_key: Option, pub kms_key_id: Option, + pub chunk_size: Option, + pub plaintext_size: Option, } impl EncryptionMetadata { @@ -53,6 +57,15 @@ impl EncryptionMetadata { if let Some(ref kid) = self.kms_key_id { map.insert("x-amz-encryption-key-id".to_string(), kid.clone()); } + if let Some(cs) = self.chunk_size { + map.insert("x-amz-encryption-chunk-size".to_string(), cs.to_string()); + } + if let Some(ps) = self.plaintext_size { + map.insert( + "x-amz-encryption-plaintext-size".to_string(), + ps.to_string(), + ); + } map } @@ -64,6 +77,12 @@ impl EncryptionMetadata { nonce: nonce.clone(), encrypted_data_key: meta.get("x-amz-encrypted-data-key").cloned(), kms_key_id: meta.get("x-amz-encryption-key-id").cloned(), + chunk_size: meta + .get("x-amz-encryption-chunk-size") + .and_then(|s| s.parse().ok()), + plaintext_size: meta + .get("x-amz-encryption-plaintext-size") + .and_then(|s| s.parse().ok()), }) } @@ -76,6 +95,8 @@ impl EncryptionMetadata { meta.remove("x-amz-encryption-nonce"); meta.remove("x-amz-encrypted-data-key"); meta.remove("x-amz-encryption-key-id"); + meta.remove("x-amz-encryption-chunk-size"); + meta.remove("x-amz-encryption-plaintext-size"); } } @@ -212,6 +233,11 @@ impl EncryptionService { data_key }; + let plaintext_size = tokio::fs::metadata(input_path) + .await + .map_err(CryptoError::Io)? + .len(); + let ip = input_path.to_owned(); let op = output_path.to_owned(); let ak = actual_key; @@ -228,22 +254,23 @@ impl EncryptionService { nonce: B64.encode(nonce), encrypted_data_key, kms_key_id, + chunk_size: Some(chunk_size), + plaintext_size: Some(plaintext_size), }) } - pub async fn decrypt_object( + async fn resolve_data_key( &self, - input_path: &Path, - output_path: &Path, enc_meta: &EncryptionMetadata, customer_key: Option<&[u8]>, - ) -> Result<(), CryptoError> { + ) -> Result<([u8; 32], [u8; 12]), CryptoError> { let nonce_bytes = B64 .decode(&enc_meta.nonce) .map_err(|e| CryptoError::EncryptionFailed(format!("Bad nonce encoding: {}", e)))?; if nonce_bytes.len() != 12 { return Err(CryptoError::InvalidNonceSize(nonce_bytes.len())); } + let nonce: [u8; 12] = nonce_bytes.try_into().unwrap(); let data_key: [u8; 32] = if let Some(ck) = customer_key { if ck.len() != 32 { @@ -281,15 +308,62 @@ impl EncryptionService { self.unwrap_data_key(wrapped)? }; + Ok((data_key, nonce)) + } + + pub async fn decrypt_object( + &self, + input_path: &Path, + output_path: &Path, + enc_meta: &EncryptionMetadata, + customer_key: Option<&[u8]>, + ) -> Result<(), CryptoError> { + let (data_key, nonce) = self.resolve_data_key(enc_meta, customer_key).await?; + let ip = input_path.to_owned(); let op = output_path.to_owned(); - let nb: [u8; 12] = nonce_bytes.try_into().unwrap(); - tokio::task::spawn_blocking(move || decrypt_stream_chunked(&ip, &op, &data_key, &nb)) + tokio::task::spawn_blocking(move || decrypt_stream_chunked(&ip, &op, &data_key, &nonce)) .await .map_err(|e| CryptoError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))??; Ok(()) } + + pub async fn decrypt_object_range( + &self, + input_path: &Path, + output_path: &Path, + enc_meta: &EncryptionMetadata, + customer_key: Option<&[u8]>, + plain_start: u64, + plain_end_inclusive: u64, + ) -> Result { + let chunk_size = enc_meta.chunk_size.ok_or_else(|| { + CryptoError::EncryptionFailed("chunk_size missing from encryption metadata".into()) + })?; + let plaintext_size = enc_meta.plaintext_size.ok_or_else(|| { + CryptoError::EncryptionFailed("plaintext_size missing from encryption metadata".into()) + })?; + + let (data_key, nonce) = self.resolve_data_key(enc_meta, customer_key).await?; + + let ip = input_path.to_owned(); + let op = output_path.to_owned(); + tokio::task::spawn_blocking(move || { + decrypt_stream_chunked_range( + &ip, + &op, + &data_key, + &nonce, + chunk_size, + plaintext_size, + plain_start, + plain_end_inclusive, + ) + }) + .await + .map_err(|e| CryptoError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))? + } } #[cfg(test)] @@ -383,12 +457,26 @@ mod tests { nonce: "dGVzdG5vbmNlMTI=".to_string(), encrypted_data_key: Some("c29tZWtleQ==".to_string()), kms_key_id: None, + chunk_size: Some(65_536), + plaintext_size: Some(1_234_567), }; let map = meta.to_metadata_map(); let restored = EncryptionMetadata::from_metadata(&map).unwrap(); assert_eq!(restored.algorithm, "AES256"); assert_eq!(restored.nonce, meta.nonce); assert_eq!(restored.encrypted_data_key, meta.encrypted_data_key); + assert_eq!(restored.chunk_size, Some(65_536)); + assert_eq!(restored.plaintext_size, Some(1_234_567)); + } + + #[test] + fn test_encryption_metadata_legacy_missing_sizes() { + let mut map = HashMap::new(); + map.insert("x-amz-server-side-encryption".to_string(), "AES256".into()); + map.insert("x-amz-encryption-nonce".to_string(), "aGVsbG8=".into()); + let restored = EncryptionMetadata::from_metadata(&map).unwrap(); + assert_eq!(restored.chunk_size, None); + assert_eq!(restored.plaintext_size, None); } #[test] diff --git a/crates/myfsio-server/src/handlers/mod.rs b/crates/myfsio-server/src/handlers/mod.rs index 7940743..76d92ed 100644 --- a/crates/myfsio-server/src/handlers/mod.rs +++ b/crates/myfsio-server/src/handlers/mod.rs @@ -31,6 +31,33 @@ use crate::services::notifications; use crate::services::object_lock; use crate::state::AppState; +async fn open_self_deleting(path: std::path::PathBuf) -> std::io::Result { + #[cfg(unix)] + { + let file = tokio::fs::File::open(&path).await?; + let _ = tokio::fs::remove_file(&path).await; + Ok(file) + } + #[cfg(windows)] + { + use std::os::windows::fs::OpenOptionsExt; + const FILE_FLAG_DELETE_ON_CLOSE: u32 = 0x0400_0000; + const FILE_SHARE_READ: u32 = 0x0000_0001; + const FILE_SHARE_WRITE: u32 = 0x0000_0002; + const FILE_SHARE_DELETE: u32 = 0x0000_0004; + let file = tokio::task::spawn_blocking(move || { + std::fs::OpenOptions::new() + .read(true) + .custom_flags(FILE_FLAG_DELETE_ON_CLOSE) + .share_mode(FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE) + .open(&path) + }) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))??; + Ok(tokio::fs::File::from_std(file)) + } +} + fn s3_error_response(err: S3Error) -> Response { let status = StatusCode::from_u16(err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); @@ -1568,23 +1595,6 @@ pub async fn get_object( .version_id .as_deref() .filter(|value| !is_null_version(Some(*value))); - let head_meta = match version_id { - Some(version_id) => match state - .storage - .head_object_version(&bucket, &key, version_id) - .await - { - Ok(m) => m, - Err(e) => return storage_err_response(e), - }, - None => match state.storage.head_object(&bucket, &key).await { - Ok(m) => m, - Err(e) => return storage_err_response(e), - }, - }; - if let Some(resp) = evaluate_get_preconditions(&headers, &head_meta) { - return resp; - } let range_header = headers .get("range") @@ -1592,163 +1602,144 @@ pub async fn get_object( .map(|s| s.to_string()); if let Some(ref range_str) = range_header { - return range_get_handler(&state, &bucket, &key, range_str, &query).await; + return range_get_handler(&state, &bucket, &key, range_str, &query, &headers).await; } - let all_meta = match version_id { - Some(version_id) => state - .storage - .get_object_version_metadata(&bucket, &key, version_id) - .await - .unwrap_or_default(), - None => state - .storage - .get_object_metadata(&bucket, &key) - .await - .unwrap_or_default(), - }; - let enc_meta = myfsio_crypto::encryption::EncryptionMetadata::from_metadata(&all_meta); + let stream_cap = state.config.stream_chunk_size.max(64 * 1024); - if let (Some(ref enc_info), Some(ref enc_svc)) = (&enc_meta, &state.encryption) { - let obj_path = match version_id { - Some(version_id) => match state - .storage - .get_object_version_path(&bucket, &key, version_id) - .await - { - Ok(p) => p, - Err(e) => return storage_err_response(e), - }, - None => match state.storage.get_object_path(&bucket, &key).await { - Ok(p) => p, - Err(e) => return storage_err_response(e), - }, - }; - let tmp_dir = state.config.storage_root.join(".myfsio.sys").join("tmp"); - let _ = tokio::fs::create_dir_all(&tmp_dir).await; - let dec_tmp = tmp_dir.join(format!("dec-{}", uuid::Uuid::new_v4())); - - let customer_key = extract_sse_c_key(&headers); - let ck_ref = customer_key.as_deref(); - - if let Err(e) = enc_svc - .decrypt_object(&obj_path, &dec_tmp, enc_info, ck_ref) - .await - { - let _ = tokio::fs::remove_file(&dec_tmp).await; - return s3_error_response(S3Error::new( - myfsio_common::error::S3ErrorCode::InternalError, - format!("Decryption failed: {}", e), - )); - } - - let file = match tokio::fs::File::open(&dec_tmp).await { - Ok(f) => f, - Err(e) => { - let _ = tokio::fs::remove_file(&dec_tmp).await; - return storage_err_response(myfsio_storage::error::StorageError::Io(e)); - } - }; - let file_size = file.metadata().await.map(|m| m.len()).unwrap_or(0); - let stream = ReaderStream::with_capacity(file, 256 * 1024); - let body = Body::from_stream(stream); - - let meta = head_meta.clone(); - - let tmp_path = dec_tmp.clone(); - tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - let _ = tokio::fs::remove_file(&tmp_path).await; - }); - - let mut resp_headers = HeaderMap::new(); - resp_headers.insert("content-length", file_size.to_string().parse().unwrap()); - if let Some(ref etag) = meta.etag { - resp_headers.insert("etag", format!("\"{}\"", etag).parse().unwrap()); - } - insert_content_type(&mut resp_headers, &key, meta.content_type.as_deref()); - resp_headers.insert( - "last-modified", - meta.last_modified - .format("%a, %d %b %Y %H:%M:%S GMT") - .to_string() - .parse() - .unwrap(), - ); - resp_headers.insert("accept-ranges", "bytes".parse().unwrap()); - resp_headers.insert( - "x-amz-server-side-encryption", - enc_info.algorithm.parse().unwrap(), - ); - apply_stored_response_headers(&mut resp_headers, &all_meta); - apply_stored_checksum_headers(&mut resp_headers, &all_meta); - if let Some(ref requested_version) = query.version_id { - if let Ok(value) = requested_version.parse() { - resp_headers.insert("x-amz-version-id", value); - } - } else if let Some(vid) = all_meta.get("__version_id__") { - if let Ok(value) = vid.parse() { - resp_headers.insert("x-amz-version-id", value); - } - } - - apply_user_metadata(&mut resp_headers, &meta.metadata); - - apply_response_overrides(&mut resp_headers, &query); - - return (StatusCode::OK, resp_headers, body).into_response(); - } - - let object_result = match version_id { - Some(version_id) => { + // Take a single snapshot of the live object BEFORE deciding whether it's + // encrypted. If we sniffed encryption from head_meta first, a PUT could + // flip the object's encryption state between head and snapshot — leaving + // us either serving ciphertext through the raw path or failing because + // the snapshot no longer has encryption metadata. All decisions must + // come from this snapshot. + let tmp_dir = state.config.storage_root.join(".myfsio.sys").join("tmp"); + let _ = tokio::fs::create_dir_all(&tmp_dir).await; + let snap_link = tmp_dir.join(format!("src-{}", uuid::Uuid::new_v4())); + let snap_res = match version_id { + Some(v) => { state .storage - .get_object_version(&bucket, &key, version_id) + .snapshot_object_version_to_link(&bucket, &key, v, &snap_link) .await } - None => state.storage.get_object(&bucket, &key).await, + None => { + state + .storage + .snapshot_object_to_link(&bucket, &key, &snap_link) + .await + } + }; + let snap_meta = match snap_res { + Ok(m) => m, + Err(e) => return storage_err_response(e), }; - match object_result { - Ok((meta, reader)) => { - let stream = ReaderStream::with_capacity(reader, 256 * 1024); - let body = Body::from_stream(stream); - - let mut headers = HeaderMap::new(); - headers.insert("content-length", meta.size.to_string().parse().unwrap()); - if let Some(ref etag) = meta.etag { - headers.insert("etag", format!("\"{}\"", etag).parse().unwrap()); - } - insert_content_type(&mut headers, &key, meta.content_type.as_deref()); - headers.insert( - "last-modified", - meta.last_modified - .format("%a, %d %b %Y %H:%M:%S GMT") - .to_string() - .parse() - .unwrap(), - ); - headers.insert("accept-ranges", "bytes".parse().unwrap()); - apply_stored_response_headers(&mut headers, &all_meta); - apply_stored_checksum_headers(&mut headers, &all_meta); - if let Some(ref requested_version) = query.version_id { - if let Ok(value) = requested_version.parse() { - headers.insert("x-amz-version-id", value); - } - } else if let Some(ref vid) = meta.version_id { - if let Ok(value) = vid.parse() { - headers.insert("x-amz-version-id", value); - } - } - - apply_user_metadata(&mut headers, &meta.metadata); - - apply_response_overrides(&mut headers, &query); - - (StatusCode::OK, headers, body).into_response() - } - Err(e) => storage_err_response(e), + // Evaluate preconditions against the served snapshot's metadata. A HEAD + // taken earlier could disagree with the snapshot if a concurrent PUT + // landed in between, causing us to serve a body that doesn't satisfy + // the caller's If-Match / If-None-Match / time conditions. + if let Some(resp) = evaluate_get_preconditions(&headers, &snap_meta) { + let _ = tokio::fs::remove_file(&snap_link).await; + return resp; } + + let enc_info = myfsio_crypto::encryption::EncryptionMetadata::from_metadata( + &snap_meta.internal_metadata, + ); + + let (file, file_size, enc_header): (tokio::fs::File, u64, Option<&str>) = match ( + enc_info.as_ref(), + state.encryption.as_ref(), + ) { + (Some(enc_info), Some(enc_svc)) => { + let dec_tmp = tmp_dir.join(format!("dec-{}", uuid::Uuid::new_v4())); + let customer_key = extract_sse_c_key(&headers); + let decrypt_res = enc_svc + .decrypt_object(&snap_link, &dec_tmp, enc_info, customer_key.as_deref()) + .await; + // Hardlink served its purpose; the decrypted plaintext is in + // dec_tmp now. + let _ = tokio::fs::remove_file(&snap_link).await; + if let Err(e) = decrypt_res { + let _ = tokio::fs::remove_file(&dec_tmp).await; + return s3_error_response(S3Error::new( + myfsio_common::error::S3ErrorCode::InternalError, + format!("Decryption failed: {}", e), + )); + } + let file = match open_self_deleting(dec_tmp.clone()).await { + Ok(f) => f, + Err(e) => { + let _ = tokio::fs::remove_file(&dec_tmp).await; + return storage_err_response(myfsio_storage::error::StorageError::Io(e)); + } + }; + let file_size = file.metadata().await.map(|m| m.len()).unwrap_or(0); + (file, file_size, Some(enc_info.algorithm.as_str())) + } + (Some(_), None) => { + // Snapshot is encrypted but the server has no encryption + // service configured to decrypt it. Serving ciphertext as + // plaintext would be actively wrong; refuse explicitly. + let _ = tokio::fs::remove_file(&snap_link).await; + return s3_error_response(S3Error::new( + myfsio_common::error::S3ErrorCode::InternalError, + "Object is encrypted but encryption service is disabled".to_string(), + )); + } + (None, _) => { + // Raw path: stream directly from the hardlink, which becomes + // self-deleting on open (kernel keeps the inode alive via our + // fd). + let file = match open_self_deleting(snap_link.clone()).await { + Ok(f) => f, + Err(e) => { + let _ = tokio::fs::remove_file(&snap_link).await; + return storage_err_response(myfsio_storage::error::StorageError::Io(e)); + } + }; + (file, snap_meta.size, None) + } + }; + + let stream = ReaderStream::with_capacity(file, stream_cap); + let body = Body::from_stream(stream); + + let meta = &snap_meta; + let mut resp_headers = HeaderMap::new(); + resp_headers.insert("content-length", file_size.to_string().parse().unwrap()); + if let Some(ref etag) = meta.etag { + resp_headers.insert("etag", format!("\"{}\"", etag).parse().unwrap()); + } + insert_content_type(&mut resp_headers, &key, meta.content_type.as_deref()); + resp_headers.insert( + "last-modified", + meta.last_modified + .format("%a, %d %b %Y %H:%M:%S GMT") + .to_string() + .parse() + .unwrap(), + ); + resp_headers.insert("accept-ranges", "bytes".parse().unwrap()); + if let Some(alg) = enc_header { + resp_headers.insert("x-amz-server-side-encryption", alg.parse().unwrap()); + } + apply_stored_response_headers(&mut resp_headers, &meta.internal_metadata); + apply_stored_checksum_headers(&mut resp_headers, &meta.internal_metadata); + if let Some(ref requested_version) = query.version_id { + if let Ok(value) = requested_version.parse() { + resp_headers.insert("x-amz-version-id", value); + } + } else if let Some(ref vid) = meta.version_id { + if let Ok(value) = vid.parse() { + resp_headers.insert("x-amz-version-id", value); + } + } + apply_user_metadata(&mut resp_headers, &meta.metadata); + apply_response_overrides(&mut resp_headers, &query); + + (StatusCode::OK, resp_headers, body).into_response() } pub async fn post_object( @@ -1874,18 +1865,6 @@ pub async fn head_object( if let Some(resp) = evaluate_get_preconditions(&headers, &meta) { return resp; } - let all_meta = match version_id { - Some(version_id) => state - .storage - .get_object_version_metadata(&bucket, &key, version_id) - .await - .unwrap_or_default(), - None => state - .storage - .get_object_metadata(&bucket, &key) - .await - .unwrap_or_default(), - }; let mut headers = HeaderMap::new(); headers.insert("content-length", meta.size.to_string().parse().unwrap()); if let Some(ref etag) = meta.etag { @@ -1901,8 +1880,8 @@ pub async fn head_object( .unwrap(), ); headers.insert("accept-ranges", "bytes".parse().unwrap()); - apply_stored_response_headers(&mut headers, &all_meta); - apply_stored_checksum_headers(&mut headers, &all_meta); + apply_stored_response_headers(&mut headers, &meta.internal_metadata); + apply_stored_checksum_headers(&mut headers, &meta.internal_metadata); if let Some(ref requested_version) = query.version_id { if let Ok(value) = requested_version.parse() { headers.insert("x-amz-version-id", value); @@ -2607,71 +2586,191 @@ async fn range_get_handler( key: &str, range_str: &str, query: &ObjectQuery, + headers: &HeaderMap, ) -> Response { let version_id = query .version_id .as_deref() .filter(|value| !is_null_version(Some(*value))); - let meta = match version_id { - Some(version_id) => match state - .storage - .head_object_version(bucket, key, version_id) - .await - { - Ok(m) => m, - Err(e) => return storage_err_response(e), - }, - None => match state.storage.head_object(bucket, key).await { - Ok(m) => m, - Err(e) => return storage_err_response(e), - }, + + let tmp_dir = state.config.storage_root.join(".myfsio.sys").join("tmp"); + let _ = tokio::fs::create_dir_all(&tmp_dir).await; + let snap_link = tmp_dir.join(format!("rsrc-{}", uuid::Uuid::new_v4())); + + let snap_meta = match version_id { + Some(v) => { + state + .storage + .snapshot_object_version_to_link(bucket, key, v, &snap_link) + .await + } + None => { + state + .storage + .snapshot_object_to_link(bucket, key, &snap_link) + .await + } + }; + let meta = match snap_meta { + Ok(m) => m, + Err(e) => return storage_err_response(e), }; - let total_size = meta.size; - let (start, end) = match parse_range(range_str, total_size) { + if let Some(resp) = evaluate_get_preconditions(headers, &meta) { + let _ = tokio::fs::remove_file(&snap_link).await; + return resp; + } + + let enc_info = + myfsio_crypto::encryption::EncryptionMetadata::from_metadata(&meta.internal_metadata); + + let (body_path, plaintext_size, enc_header): (std::path::PathBuf, u64, Option<&str>) = + match (enc_info.as_ref(), state.encryption.as_ref()) { + (Some(enc_info), Some(enc_svc)) => { + let customer_key = extract_sse_c_key(headers); + let has_fast_path = enc_info.chunk_size.is_some() + && enc_info.plaintext_size.is_some(); + + if has_fast_path { + let plaintext_size = enc_info.plaintext_size.unwrap(); + let (start, end) = match parse_range(range_str, plaintext_size) { + Some(r) => r, + None => { + let _ = tokio::fs::remove_file(&snap_link).await; + return s3_error_response(S3Error::new( + myfsio_common::error::S3ErrorCode::InvalidRange, + format!("Range not satisfiable for size {}", plaintext_size), + )); + } + }; + + let dec_tmp = tmp_dir.join(format!("rdec-{}", uuid::Uuid::new_v4())); + let res = enc_svc + .decrypt_object_range( + &snap_link, + &dec_tmp, + enc_info, + customer_key.as_deref(), + start, + end, + ) + .await; + let _ = tokio::fs::remove_file(&snap_link).await; + if let Err(e) = res { + let _ = tokio::fs::remove_file(&dec_tmp).await; + return s3_error_response(S3Error::new( + myfsio_common::error::S3ErrorCode::InternalError, + format!("Decryption failed: {}", e), + )); + } + + return stream_partial_content( + state, + &dec_tmp, + start, + end, + plaintext_size, + &meta, + key, + query, + Some(enc_info.algorithm.as_str()), + /* already_trimmed */ true, + ) + .await; + } + + let dec_tmp = tmp_dir.join(format!("rdec-{}", uuid::Uuid::new_v4())); + let res = enc_svc + .decrypt_object(&snap_link, &dec_tmp, enc_info, customer_key.as_deref()) + .await; + let _ = tokio::fs::remove_file(&snap_link).await; + if let Err(e) = res { + let _ = tokio::fs::remove_file(&dec_tmp).await; + return s3_error_response(S3Error::new( + myfsio_common::error::S3ErrorCode::InternalError, + format!("Decryption failed: {}", e), + )); + } + let plaintext_size = tokio::fs::metadata(&dec_tmp) + .await + .map(|m| m.len()) + .unwrap_or(0); + (dec_tmp, plaintext_size, Some(enc_info.algorithm.as_str())) + } + (Some(_), None) => { + let _ = tokio::fs::remove_file(&snap_link).await; + return s3_error_response(S3Error::new( + myfsio_common::error::S3ErrorCode::InternalError, + "Object is encrypted but encryption service is disabled".to_string(), + )); + } + (None, _) => (snap_link.clone(), meta.size, None), + }; + + let (start, end) = match parse_range(range_str, plaintext_size) { Some(r) => r, None => { + let _ = tokio::fs::remove_file(&body_path).await; return s3_error_response(S3Error::new( myfsio_common::error::S3ErrorCode::InvalidRange, - format!("Range not satisfiable for size {}", total_size), + format!("Range not satisfiable for size {}", plaintext_size), )); } }; - let path = match version_id { - Some(version_id) => match state - .storage - .get_object_version_path(bucket, key, version_id) - .await - { - Ok(p) => p, - Err(e) => return storage_err_response(e), - }, - None => match state.storage.get_object_path(bucket, key).await { - Ok(p) => p, - Err(e) => return storage_err_response(e), - }, - }; - - let mut file = match tokio::fs::File::open(&path).await { - Ok(f) => f, - Err(e) => return storage_err_response(myfsio_storage::error::StorageError::Io(e)), - }; - - if let Err(e) = file.seek(std::io::SeekFrom::Start(start)).await { - return storage_err_response(myfsio_storage::error::StorageError::Io(e)); - } + stream_partial_content( + state, + &body_path, + start, + end, + plaintext_size, + &meta, + key, + query, + enc_header, + /* already_trimmed */ false, + ) + .await +} +async fn stream_partial_content( + state: &AppState, + body_path: &std::path::Path, + start: u64, + end: u64, + plaintext_size: u64, + meta: &myfsio_common::types::ObjectMeta, + key: &str, + query: &ObjectQuery, + enc_header: Option<&str>, + already_trimmed: bool, +) -> Response { let length = end - start + 1; + + let mut file = match open_self_deleting(body_path.to_path_buf()).await { + Ok(f) => f, + Err(e) => { + let _ = tokio::fs::remove_file(body_path).await; + return storage_err_response(myfsio_storage::error::StorageError::Io(e)); + } + }; + + if !already_trimmed { + if let Err(e) = file.seek(std::io::SeekFrom::Start(start)).await { + return storage_err_response(myfsio_storage::error::StorageError::Io(e)); + } + } let limited = file.take(length); - let stream = ReaderStream::with_capacity(limited, 256 * 1024); + + let stream_cap = state.config.stream_chunk_size.max(64 * 1024); + let stream = ReaderStream::with_capacity(limited, stream_cap); let body = Body::from_stream(stream); let mut headers = HeaderMap::new(); headers.insert("content-length", length.to_string().parse().unwrap()); headers.insert( "content-range", - format!("bytes {}-{}/{}", start, end, total_size) + format!("bytes {}-{}/{}", start, end, plaintext_size) .parse() .unwrap(), ); @@ -2680,10 +2779,19 @@ async fn range_get_handler( } insert_content_type(&mut headers, key, meta.content_type.as_deref()); headers.insert("accept-ranges", "bytes".parse().unwrap()); + if let Some(alg) = enc_header { + headers.insert("x-amz-server-side-encryption", alg.parse().unwrap()); + } + apply_stored_response_headers(&mut headers, &meta.internal_metadata); + apply_stored_checksum_headers(&mut headers, &meta.internal_metadata); if let Some(ref requested_version) = query.version_id { if let Ok(value) = requested_version.parse() { headers.insert("x-amz-version-id", value); } + } else if let Some(ref vid) = meta.version_id { + if let Ok(value) = vid.parse() { + headers.insert("x-amz-version-id", value); + } } apply_response_overrides(&mut headers, query); diff --git a/crates/myfsio-server/src/state.rs b/crates/myfsio-server/src/state.rs index 7e577b3..4706be8 100644 --- a/crates/myfsio-server/src/state.rs +++ b/crates/myfsio-server/src/state.rs @@ -50,6 +50,7 @@ impl AppState { bucket_config_cache_ttl: Duration::from_secs_f64( config.bucket_config_cache_ttl_seconds, ), + stream_chunk_size: config.stream_chunk_size, }, )); let iam = Arc::new(IamService::new_with_secret( diff --git a/crates/myfsio-server/tests/integration.rs b/crates/myfsio-server/tests/integration.rs index c7ecc23..616cb04 100644 --- a/crates/myfsio-server/tests/integration.rs +++ b/crates/myfsio-server/tests/integration.rs @@ -4853,3 +4853,197 @@ async fn test_kms_encrypt_decrypt() { let result = B64.decode(pt_b64).unwrap(); assert_eq!(result, plaintext); } + +fn deterministic_payload(len: usize) -> Vec { + (0..len).map(|i| ((i * 2654435761usize) >> 16) as u8).collect() +} + +async fn put_sse_s3( + app: &axum::routing::RouterIntoService, + bucket: &str, + key: &str, + body: Vec, +) { + let req = Request::builder() + .method(Method::PUT) + .uri(format!("/{}", bucket)) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::empty()) + .unwrap(); + let _ = tower::ServiceExt::oneshot(app.clone(), req).await.unwrap(); + + let req = Request::builder() + .method(Method::PUT) + .uri(format!("/{}/{}", bucket, key)) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("x-amz-server-side-encryption", "AES256") + .header("content-type", "application/octet-stream") + .body(Body::from(body)) + .unwrap(); + let resp = tower::ServiceExt::oneshot(app.clone(), req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); +} + +async fn range_get( + app: &axum::routing::RouterIntoService, + uri: &str, + range: &str, + extra_headers: &[(&str, &str)], +) -> axum::http::Response { + let mut builder = Request::builder() + .method(Method::GET) + .uri(uri) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("range", range); + for (k, v) in extra_headers { + builder = builder.header(*k, *v); + } + tower::ServiceExt::oneshot(app.clone(), builder.body(Body::empty()).unwrap()) + .await + .unwrap() +} + +async fn body_bytes(resp: axum::http::Response) -> Vec { + resp.into_body().collect().await.unwrap().to_bytes().to_vec() +} + +#[tokio::test] +async fn test_sse_s3_range_get_multi_chunk() { + let (app, _tmp) = test_app_encrypted().await; + let app = app.into_service(); + let payload = deterministic_payload(200_000); + put_sse_s3(&app, "rng-mc", "obj.bin", payload.clone()).await; + + let resp = range_get(&app, "/rng-mc/obj.bin", "bytes=60000-140000", &[]).await; + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!(resp.headers().get("content-length").unwrap(), "80001"); + assert_eq!( + resp.headers().get("content-range").unwrap(), + "bytes 60000-140000/200000" + ); + assert_eq!( + resp.headers().get("x-amz-server-side-encryption").unwrap(), + "AES256" + ); + assert_eq!(body_bytes(resp).await, payload[60000..=140000]); +} + +#[tokio::test] +async fn test_sse_s3_range_get_within_single_chunk() { + let (app, _tmp) = test_app_encrypted().await; + let app = app.into_service(); + let payload = deterministic_payload(200_000); + put_sse_s3(&app, "rng-sc", "obj.bin", payload.clone()).await; + + let resp = range_get(&app, "/rng-sc/obj.bin", "bytes=100-4999", &[]).await; + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!(resp.headers().get("content-length").unwrap(), "4900"); + assert_eq!(body_bytes(resp).await, payload[100..=4999]); +} + +#[tokio::test] +async fn test_sse_s3_range_get_suffix() { + let (app, _tmp) = test_app_encrypted().await; + let app = app.into_service(); + let payload = deterministic_payload(200_000); + put_sse_s3(&app, "rng-sx", "obj.bin", payload.clone()).await; + + let resp = range_get(&app, "/rng-sx/obj.bin", "bytes=-1024", &[]).await; + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!(resp.headers().get("content-length").unwrap(), "1024"); + assert_eq!( + resp.headers().get("content-range").unwrap(), + "bytes 198976-199999/200000" + ); + assert_eq!(body_bytes(resp).await, payload[198_976..]); +} + +#[tokio::test] +async fn test_sse_s3_range_get_final_partial_chunk() { + let (app, _tmp) = test_app_encrypted().await; + let app = app.into_service(); + let size = 65_536 + 12_345; + let payload = deterministic_payload(size); + put_sse_s3(&app, "rng-fp", "obj.bin", payload.clone()).await; + + let last_start = 70_000; + let last_end = size as u64 - 1; + let range = format!("bytes={}-{}", last_start, last_end); + let resp = range_get(&app, "/rng-fp/obj.bin", &range, &[]).await; + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + let expected_len = (last_end - last_start + 1).to_string(); + assert_eq!( + resp.headers().get("content-length").unwrap(), + &expected_len.as_str() + ); + assert_eq!( + body_bytes(resp).await, + payload[last_start as usize..=last_end as usize] + ); +} + +#[tokio::test] +async fn test_sse_s3_range_get_open_ended() { + let (app, _tmp) = test_app_encrypted().await; + let app = app.into_service(); + let payload = deterministic_payload(100_000); + put_sse_s3(&app, "rng-oe", "obj.bin", payload.clone()).await; + + let resp = range_get(&app, "/rng-oe/obj.bin", "bytes=90000-", &[]).await; + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!(resp.headers().get("content-length").unwrap(), "10000"); + assert_eq!( + resp.headers().get("content-range").unwrap(), + "bytes 90000-99999/100000" + ); + assert_eq!(body_bytes(resp).await, payload[90_000..]); +} + +#[tokio::test] +async fn test_sse_s3_range_unsatisfiable_for_plaintext_size() { + let (app, _tmp) = test_app_encrypted().await; + let app = app.into_service(); + let payload = deterministic_payload(10_000); + put_sse_s3(&app, "rng-un", "obj.bin", payload).await; + + let resp = range_get(&app, "/rng-un/obj.bin", "bytes=20000-30000", &[]).await; + assert!( + resp.status() == StatusCode::RANGE_NOT_SATISFIABLE + || resp.status() == StatusCode::BAD_REQUEST, + "unexpected status: {}", + resp.status() + ); +} + +#[tokio::test] +async fn test_plaintext_range_still_works() { + let (app, _tmp) = test_app_encrypted().await; + let app = app.into_service(); + let req = Request::builder() + .method(Method::PUT) + .uri("/plain-rng") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::empty()) + .unwrap(); + let _ = tower::ServiceExt::oneshot(app.clone(), req).await.unwrap(); + + let payload = deterministic_payload(8_000); + let req = Request::builder() + .method(Method::PUT) + .uri("/plain-rng/obj.bin") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from(payload.clone())) + .unwrap(); + let resp = tower::ServiceExt::oneshot(app.clone(), req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let resp = range_get(&app, "/plain-rng/obj.bin", "bytes=100-199", &[]).await; + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert!(resp.headers().get("x-amz-server-side-encryption").is_none()); + assert_eq!(body_bytes(resp).await, payload[100..=199]); +} diff --git a/crates/myfsio-storage/Cargo.toml b/crates/myfsio-storage/Cargo.toml index 44a06a9..f992de3 100644 --- a/crates/myfsio-storage/Cargo.toml +++ b/crates/myfsio-storage/Cargo.toml @@ -9,6 +9,7 @@ myfsio-crypto = { path = "../myfsio-crypto" } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } dashmap = { workspace = true } parking_lot = { workspace = true } uuid = { workspace = true } diff --git a/crates/myfsio-storage/src/fs_backend.rs b/crates/myfsio-storage/src/fs_backend.rs index aa5ecb6..ae9bb47 100644 --- a/crates/myfsio-storage/src/fs_backend.rs +++ b/crates/myfsio-storage/src/fs_backend.rs @@ -7,13 +7,12 @@ use myfsio_common::types::*; use chrono::{DateTime, TimeZone, Utc}; use dashmap::DashMap; use md5::{Digest, Md5}; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use serde_json::Value; use std::collections::HashMap; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; use std::time::Instant; -use tokio::io::AsyncReadExt; use uuid::Uuid; const EMPTY_SEGMENT_SENTINEL: &str = ".__myfsio_empty__"; @@ -145,14 +144,18 @@ struct ShallowCacheEntry { dirs: Vec, } +const OBJECT_LOCK_STRIPES: usize = 2048; + pub struct FsStorageBackend { root: PathBuf, object_key_max_length_bytes: usize, object_cache_max_size: usize, + stream_chunk_size: usize, bucket_config_cache: DashMap, bucket_config_cache_ttl: std::time::Duration, meta_read_cache: DashMap<(String, String), Option>>, meta_index_locks: DashMap>>, + object_lock_stripes: Box<[RwLock<()>]>, stats_cache: DashMap, stats_cache_ttl: std::time::Duration, list_cache: DashMap>, Instant)>, @@ -167,6 +170,7 @@ pub struct FsStorageBackendConfig { pub object_key_max_length_bytes: usize, pub object_cache_max_size: usize, pub bucket_config_cache_ttl: std::time::Duration, + pub stream_chunk_size: usize, } impl Default for FsStorageBackendConfig { @@ -175,6 +179,7 @@ impl Default for FsStorageBackendConfig { object_key_max_length_bytes: DEFAULT_OBJECT_KEY_MAX_BYTES, object_cache_max_size: 100, bucket_config_cache_ttl: std::time::Duration::from_secs(30), + stream_chunk_size: STREAM_CHUNK_SIZE, } } } @@ -185,14 +190,25 @@ impl FsStorageBackend { } pub fn new_with_config(root: PathBuf, config: FsStorageBackendConfig) -> Self { + let stream_chunk_size = if config.stream_chunk_size == 0 { + STREAM_CHUNK_SIZE + } else { + config.stream_chunk_size + }; + let object_lock_stripes = (0..OBJECT_LOCK_STRIPES) + .map(|_| RwLock::new(())) + .collect::>() + .into_boxed_slice(); let backend = Self { root, object_key_max_length_bytes: config.object_key_max_length_bytes, object_cache_max_size: config.object_cache_max_size, + stream_chunk_size, bucket_config_cache: DashMap::new(), bucket_config_cache_ttl: config.bucket_config_cache_ttl, meta_read_cache: DashMap::new(), meta_index_locks: DashMap::new(), + object_lock_stripes, stats_cache: DashMap::new(), stats_cache_ttl: std::time::Duration::from_secs(60), list_cache: DashMap::new(), @@ -407,6 +423,15 @@ impl FsStorageBackend { .clone() } + fn get_object_lock(&self, bucket: &str, key: &str) -> &RwLock<()> { + use std::hash::{Hash, Hasher}; + let mut h = std::collections::hash_map::DefaultHasher::new(); + bucket.hash(&mut h); + key.hash(&mut h); + let idx = (h.finish() as usize) % self.object_lock_stripes.len(); + &self.object_lock_stripes[idx] + } + fn prune_meta_read_cache(&self) { if self.object_cache_max_size == 0 { self.meta_read_cache.clear(); @@ -1259,9 +1284,11 @@ impl FsStorageBackend { .cloned() .or_else(|| Some("STANDARD".to_string())); obj.metadata = metadata - .into_iter() + .iter() .filter(|(k, _)| !k.starts_with("__")) + .map(|(k, v)| (k.clone(), v.clone())) .collect(); + obj.internal_metadata = metadata; obj.version_id = version_id; obj.is_delete_marker = is_delete_marker; Ok(obj) @@ -1929,6 +1956,7 @@ impl FsStorageBackend { obj.etag = Some(etag); obj.metadata = metadata.unwrap_or_default(); obj.version_id = new_version_id; + obj.internal_metadata = internal_meta; Ok(obj) } } @@ -2032,7 +2060,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { &self, bucket: &str, key: &str, - mut stream: AsyncReadStream, + stream: AsyncReadStream, metadata: Option>, ) -> StorageResult { self.validate_key(key)?; @@ -2043,33 +2071,62 @@ impl crate::traits::StorageEngine for FsStorageBackend { .map_err(StorageError::Io)?; let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4())); - let file = tokio::fs::File::create(&tmp_path) - .await - .map_err(StorageError::Io)?; - let mut writer = tokio::io::BufWriter::with_capacity(256 * 1024, file); - let mut hasher = Md5::new(); - let mut total_size: u64 = 0; - let mut buf = vec![0u8; 256 * 1024]; - loop { - let n = stream.read(&mut buf).await.map_err(StorageError::Io)?; - if n == 0 { - break; - } - hasher.update(&buf[..n]); - tokio::io::AsyncWriteExt::write_all(&mut writer, &buf[..n]) - .await - .map_err(StorageError::Io)?; - total_size += n as u64; - } - tokio::io::AsyncWriteExt::flush(&mut writer) - .await - .map_err(StorageError::Io)?; - drop(writer); + let chunk_size = self.stream_chunk_size; + let drain_tmp = tmp_path.clone(); - let etag = format!("{:x}", hasher.finalize()); - run_blocking(|| { - self.finalize_put_sync(bucket, key, &tmp_path, etag, total_size, metadata) + // Drain request body + MD5 on a blocking thread: one runtime crossing + // for the whole transfer, not one per 256 KiB chunk. + let drain_result = tokio::task::spawn_blocking(move || -> StorageResult<(String, u64)> { + use std::io::{BufWriter, Read, Write}; + let mut reader = tokio_util::io::SyncIoBridge::new(stream); + let file = std::fs::File::create(&drain_tmp).map_err(StorageError::Io)?; + let mut writer = BufWriter::with_capacity(chunk_size * 4, file); + let mut hasher = Md5::new(); + let mut total: u64 = 0; + let mut buf = vec![0u8; chunk_size]; + loop { + let n = reader.read(&mut buf).map_err(StorageError::Io)?; + if n == 0 { + break; + } + hasher.update(&buf[..n]); + writer.write_all(&buf[..n]).map_err(StorageError::Io)?; + total += n as u64; + } + writer.flush().map_err(StorageError::Io)?; + Ok((format!("{:x}", hasher.finalize()), total)) }) + .await; + + let (etag, total_size) = match drain_result { + Ok(Ok(v)) => v, + Ok(Err(e)) => { + let _ = tokio::fs::remove_file(&tmp_path).await; + return Err(e); + } + Err(join_err) => { + let _ = tokio::fs::remove_file(&tmp_path).await; + return Err(StorageError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + join_err, + ))); + } + }; + + // Commit body+metadata atomically under the per-key write lock. The + // lock is acquired *inside* run_blocking so the wait happens under + // block_in_place — if a long-running GET holds the read side, the + // runtime can migrate other async tasks off this worker instead of + // parking it. + let result = run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).write(); + self.finalize_put_sync(bucket, key, &tmp_path, etag, total_size, metadata) + }); + + if result.is_err() { + let _ = tokio::fs::remove_file(&tmp_path).await; + } + result } async fn get_object( @@ -2077,7 +2134,8 @@ impl crate::traits::StorageEngine for FsStorageBackend { bucket: &str, key: &str, ) -> StorageResult<(ObjectMeta, AsyncReadStream)> { - let (obj, path) = run_blocking(|| -> StorageResult<(ObjectMeta, PathBuf)> { + let (obj, file) = run_blocking(|| -> StorageResult<(ObjectMeta, std::fs::File)> { + let _guard = self.get_object_lock(bucket, key).read(); self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { @@ -2096,7 +2154,11 @@ impl crate::traits::StorageEngine for FsStorageBackend { }); } - let meta = std::fs::metadata(&path).map_err(StorageError::Io)?; + // Open the fd first so that the snapshot we hand back is anchored + // to this exact inode, even if a concurrent PUT renames over the + // path after we release the read lock. + let file = std::fs::File::open(&path).map_err(StorageError::Io)?; + let meta = file.metadata().map_err(StorageError::Io)?; let mtime = meta .modified() .ok() @@ -2118,19 +2180,275 @@ impl crate::traits::StorageEngine for FsStorageBackend { .or_else(|| Some("STANDARD".to_string())); obj.version_id = stored_meta.get("__version_id__").cloned(); obj.metadata = stored_meta - .into_iter() + .iter() .filter(|(k, _)| !k.starts_with("__")) + .map(|(k, v)| (k.clone(), v.clone())) .collect(); - Ok((obj, path)) + obj.internal_metadata = stored_meta; + Ok((obj, file)) })?; - let file = tokio::fs::File::open(&path) - .await - .map_err(StorageError::Io)?; - let stream: AsyncReadStream = Box::pin(file); + let stream: AsyncReadStream = Box::pin(tokio::fs::File::from_std(file)); Ok((obj, stream)) } + async fn get_object_range( + &self, + bucket: &str, + key: &str, + start: u64, + len: Option, + ) -> StorageResult<(ObjectMeta, AsyncReadStream)> { + let (obj, file) = run_blocking(|| -> StorageResult<(ObjectMeta, std::fs::File)> { + let _guard = self.get_object_lock(bucket, key).read(); + self.require_bucket(bucket)?; + let path = self.object_path(bucket, key)?; + if !path.is_file() { + if self.read_bucket_config_sync(bucket).versioning_status().is_active() { + if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { + return Err(StorageError::DeleteMarker { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: dm_version_id, + }); + } + } + return Err(StorageError::ObjectNotFound { + bucket: bucket.to_string(), + key: key.to_string(), + }); + } + + use std::io::{Seek, SeekFrom}; + let mut file = std::fs::File::open(&path).map_err(StorageError::Io)?; + let meta = file.metadata().map_err(StorageError::Io)?; + if start > meta.len() { + return Err(StorageError::InvalidRange); + } + if start > 0 { + file.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?; + } + + let mtime = meta + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + let lm = Utc + .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) + .single() + .unwrap_or_else(Utc::now); + + let stored_meta = self.read_metadata_sync(bucket, key); + let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); + obj.etag = stored_meta.get("__etag__").cloned(); + obj.content_type = stored_meta.get("__content_type__").cloned(); + obj.storage_class = stored_meta + .get("__storage_class__") + .cloned() + .or_else(|| Some("STANDARD".to_string())); + obj.version_id = stored_meta.get("__version_id__").cloned(); + obj.metadata = stored_meta + .iter() + .filter(|(k, _)| !k.starts_with("__")) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + obj.internal_metadata = stored_meta; + Ok((obj, file)) + })?; + + let tokio_file = tokio::fs::File::from_std(file); + let stream: AsyncReadStream = match len { + Some(n) => { + use tokio::io::AsyncReadExt; + Box::pin(tokio_file.take(n)) + } + None => Box::pin(tokio_file), + }; + Ok((obj, stream)) + } + + async fn get_object_snapshot( + &self, + bucket: &str, + key: &str, + ) -> StorageResult<(ObjectMeta, tokio::fs::File)> { + let (obj, file) = run_blocking(|| -> StorageResult<(ObjectMeta, std::fs::File)> { + let _guard = self.get_object_lock(bucket, key).read(); + self.require_bucket(bucket)?; + let path = self.object_path(bucket, key)?; + if !path.is_file() { + if self.read_bucket_config_sync(bucket).versioning_status().is_active() { + if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { + return Err(StorageError::DeleteMarker { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: dm_version_id, + }); + } + } + return Err(StorageError::ObjectNotFound { + bucket: bucket.to_string(), + key: key.to_string(), + }); + } + + let file = std::fs::File::open(&path).map_err(StorageError::Io)?; + let meta_fs = file.metadata().map_err(StorageError::Io)?; + let mtime = meta_fs + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + let lm = Utc + .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) + .single() + .unwrap_or_else(Utc::now); + + let stored_meta = self.read_metadata_sync(bucket, key); + let mut obj = ObjectMeta::new(key.to_string(), meta_fs.len(), lm); + obj.etag = stored_meta.get("__etag__").cloned(); + obj.content_type = stored_meta.get("__content_type__").cloned(); + obj.storage_class = stored_meta + .get("__storage_class__") + .cloned() + .or_else(|| Some("STANDARD".to_string())); + obj.version_id = stored_meta.get("__version_id__").cloned(); + obj.metadata = stored_meta + .iter() + .filter(|(k, _)| !k.starts_with("__")) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + obj.internal_metadata = stored_meta; + Ok((obj, file)) + })?; + Ok((obj, tokio::fs::File::from_std(file))) + } + + async fn get_object_version_snapshot( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult<(ObjectMeta, tokio::fs::File)> { + let (obj, file) = run_blocking(|| -> StorageResult<(ObjectMeta, std::fs::File)> { + let _guard = self.get_object_lock(bucket, key).read(); + let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Err(StorageError::MethodNotAllowed( + "The specified method is not allowed against a delete marker".to_string(), + )); + } + let file = std::fs::File::open(&data_path).map_err(StorageError::Io)?; + let obj = self.object_meta_from_version_record(key, &record, &data_path)?; + Ok((obj, file)) + })?; + Ok((obj, tokio::fs::File::from_std(file))) + } + + async fn snapshot_object_to_link( + &self, + bucket: &str, + key: &str, + link_path: &std::path::Path, + ) -> StorageResult { + let link_owned = link_path.to_owned(); + run_blocking(|| -> StorageResult { + let _guard = self.get_object_lock(bucket, key).read(); + self.require_bucket(bucket)?; + let path = self.object_path(bucket, key)?; + if !path.is_file() { + if self.read_bucket_config_sync(bucket).versioning_status().is_active() { + if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { + return Err(StorageError::DeleteMarker { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: dm_version_id, + }); + } + } + return Err(StorageError::ObjectNotFound { + bucket: bucket.to_string(), + key: key.to_string(), + }); + } + + if let Some(parent) = link_owned.parent() { + std::fs::create_dir_all(parent).map_err(StorageError::Io)?; + } + // The hardlink shares the inode of the file *at this moment*. + // Later renames replace the live path with a different inode, + // but our hardlink path keeps resolving to the original one + // until it is explicitly unlinked. + let _ = std::fs::remove_file(&link_owned); + std::fs::hard_link(&path, &link_owned).map_err(StorageError::Io)?; + + let meta_fs = std::fs::metadata(&link_owned).map_err(StorageError::Io)?; + let mtime = meta_fs + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + let lm = Utc + .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) + .single() + .unwrap_or_else(Utc::now); + + let stored_meta = self.read_metadata_sync(bucket, key); + let mut obj = ObjectMeta::new(key.to_string(), meta_fs.len(), lm); + obj.etag = stored_meta.get("__etag__").cloned(); + obj.content_type = stored_meta.get("__content_type__").cloned(); + obj.storage_class = stored_meta + .get("__storage_class__") + .cloned() + .or_else(|| Some("STANDARD".to_string())); + obj.version_id = stored_meta.get("__version_id__").cloned(); + obj.metadata = stored_meta + .iter() + .filter(|(k, _)| !k.starts_with("__")) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + obj.internal_metadata = stored_meta; + Ok(obj) + }) + } + + async fn snapshot_object_version_to_link( + &self, + bucket: &str, + key: &str, + version_id: &str, + link_path: &std::path::Path, + ) -> StorageResult { + let link_owned = link_path.to_owned(); + run_blocking(|| -> StorageResult { + let _guard = self.get_object_lock(bucket, key).read(); + let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Err(StorageError::MethodNotAllowed( + "The specified method is not allowed against a delete marker".to_string(), + )); + } + if let Some(parent) = link_owned.parent() { + std::fs::create_dir_all(parent).map_err(StorageError::Io)?; + } + let _ = std::fs::remove_file(&link_owned); + std::fs::hard_link(&data_path, &link_owned).map_err(StorageError::Io)?; + self.object_meta_from_version_record(key, &record, &data_path) + }) + } + async fn get_object_path(&self, bucket: &str, key: &str) -> StorageResult { self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; @@ -2154,6 +2472,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { async fn head_object(&self, bucket: &str, key: &str) -> StorageResult { run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).read(); self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { @@ -2194,9 +2513,11 @@ impl crate::traits::StorageEngine for FsStorageBackend { .or_else(|| Some("STANDARD".to_string())); obj.version_id = stored_meta.get("__version_id__").cloned(); obj.metadata = stored_meta - .into_iter() + .iter() .filter(|(k, _)| !k.starts_with("__")) + .map(|(k, v)| (k.clone(), v.clone())) .collect(); + obj.internal_metadata = stored_meta; Ok(obj) }) } @@ -2207,21 +2528,66 @@ impl crate::traits::StorageEngine for FsStorageBackend { key: &str, version_id: &str, ) -> StorageResult<(ObjectMeta, AsyncReadStream)> { - let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; - if record - .get("is_delete_marker") - .and_then(Value::as_bool) - .unwrap_or(false) - { - return Err(StorageError::MethodNotAllowed( - "The specified method is not allowed against a delete marker".to_string(), - )); - } - let obj = self.object_meta_from_version_record(key, &record, &data_path)?; - let file = tokio::fs::File::open(&data_path) - .await - .map_err(StorageError::Io)?; - let stream: AsyncReadStream = Box::pin(file); + let (obj, file) = run_blocking(|| -> StorageResult<(ObjectMeta, std::fs::File)> { + let _guard = self.get_object_lock(bucket, key).read(); + let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Err(StorageError::MethodNotAllowed( + "The specified method is not allowed against a delete marker".to_string(), + )); + } + let file = std::fs::File::open(&data_path).map_err(StorageError::Io)?; + let obj = self.object_meta_from_version_record(key, &record, &data_path)?; + Ok((obj, file)) + })?; + let stream: AsyncReadStream = Box::pin(tokio::fs::File::from_std(file)); + Ok((obj, stream)) + } + + async fn get_object_version_range( + &self, + bucket: &str, + key: &str, + version_id: &str, + start: u64, + len: Option, + ) -> StorageResult<(ObjectMeta, AsyncReadStream)> { + let (obj, file) = run_blocking(|| -> StorageResult<(ObjectMeta, std::fs::File)> { + let _guard = self.get_object_lock(bucket, key).read(); + let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Err(StorageError::MethodNotAllowed( + "The specified method is not allowed against a delete marker".to_string(), + )); + } + use std::io::{Seek, SeekFrom}; + let mut file = std::fs::File::open(&data_path).map_err(StorageError::Io)?; + let size = file.metadata().map(|m| m.len()).unwrap_or(0); + if start > size { + return Err(StorageError::InvalidRange); + } + if start > 0 { + file.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?; + } + let obj = self.object_meta_from_version_record(key, &record, &data_path)?; + Ok((obj, file)) + })?; + let tokio_file = tokio::fs::File::from_std(file); + let stream: AsyncReadStream = match len { + Some(n) => { + use tokio::io::AsyncReadExt; + Box::pin(tokio_file.take(n)) + } + None => Box::pin(tokio_file), + }; Ok((obj, stream)) } @@ -2231,17 +2597,20 @@ impl crate::traits::StorageEngine for FsStorageBackend { key: &str, version_id: &str, ) -> StorageResult { - let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; - if record - .get("is_delete_marker") - .and_then(Value::as_bool) - .unwrap_or(false) - { - return Err(StorageError::MethodNotAllowed( - "The specified method is not allowed against a delete marker".to_string(), - )); - } - Ok(data_path) + run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).read(); + let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Err(StorageError::MethodNotAllowed( + "The specified method is not allowed against a delete marker".to_string(), + )); + } + Ok(data_path) + }) } async fn head_object_version( @@ -2250,17 +2619,20 @@ impl crate::traits::StorageEngine for FsStorageBackend { key: &str, version_id: &str, ) -> StorageResult { - let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; - if record - .get("is_delete_marker") - .and_then(Value::as_bool) - .unwrap_or(false) - { - return Err(StorageError::MethodNotAllowed( - "The specified method is not allowed against a delete marker".to_string(), - )); - } - self.object_meta_from_version_record(key, &record, &data_path) + run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).read(); + let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Err(StorageError::MethodNotAllowed( + "The specified method is not allowed against a delete marker".to_string(), + )); + } + self.object_meta_from_version_record(key, &record, &data_path) + }) } async fn get_object_version_metadata( @@ -2269,12 +2641,16 @@ impl crate::traits::StorageEngine for FsStorageBackend { key: &str, version_id: &str, ) -> StorageResult> { - let (record, _data_path) = self.read_version_record_sync(bucket, key, version_id)?; - Ok(Self::version_metadata_from_record(&record)) + run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).read(); + let (record, _data_path) = self.read_version_record_sync(bucket, key, version_id)?; + Ok(Self::version_metadata_from_record(&record)) + }) } async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult { run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).write(); let bucket_path = self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; let versioning_status = self.read_bucket_config_sync(bucket).versioning_status(); @@ -2338,6 +2714,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { version_id: &str, ) -> StorageResult { run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).write(); let bucket_path = self.require_bucket(bucket)?; self.validate_key(key)?; Self::validate_version_id(bucket, key, version_id)?; @@ -2416,62 +2793,74 @@ impl crate::traits::StorageEngine for FsStorageBackend { dst_bucket: &str, dst_key: &str, ) -> StorageResult { - let src_path = self.object_path(src_bucket, src_key)?; - if !src_path.is_file() { - return Err(StorageError::ObjectNotFound { - bucket: src_bucket.to_string(), - key: src_key.to_string(), - }); - } - self.validate_key(dst_key)?; + let chunk_size = self.stream_chunk_size; let tmp_dir = self.tmp_dir(); std::fs::create_dir_all(&tmp_dir).map_err(StorageError::Io)?; let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4())); - let (etag, new_size) = { + // Copy bytes + hash under the src read lock so the source body and + // metadata we capture are consistent with one another. The src read + // guard is released at the end of this block before we take the dst + // write guard, so even when src == dst (same stripe) there's no + // upgrade deadlock. + let copy_res = run_blocking(|| -> StorageResult<(String, u64, HashMap)> { + let _src_guard = self.get_object_lock(src_bucket, src_key).read(); + let src_path = self.object_path(src_bucket, src_key)?; + if !src_path.is_file() { + return Err(StorageError::ObjectNotFound { + bucket: src_bucket.to_string(), + key: src_key.to_string(), + }); + } + use std::io::{BufReader, BufWriter, Read, Write}; - let src_file = std::fs::File::open(&src_path).map_err(|e| { - let _ = std::fs::remove_file(&tmp_path); - StorageError::Io(e) - })?; - let mut reader = BufReader::with_capacity(256 * 1024, src_file); + let src_file = std::fs::File::open(&src_path).map_err(StorageError::Io)?; + let mut reader = BufReader::with_capacity(chunk_size, src_file); let tmp_file = std::fs::File::create(&tmp_path).map_err(StorageError::Io)?; - let mut writer = BufWriter::with_capacity(256 * 1024, tmp_file); + let mut writer = BufWriter::with_capacity(chunk_size * 4, tmp_file); let mut hasher = Md5::new(); - let mut buf = [0u8; 256 * 1024]; + let mut buf = vec![0u8; chunk_size]; let mut total: u64 = 0; loop { - let n = reader.read(&mut buf).map_err(|e| { - let _ = std::fs::remove_file(&tmp_path); - StorageError::Io(e) - })?; + let n = reader.read(&mut buf).map_err(StorageError::Io)?; if n == 0 { break; } hasher.update(&buf[..n]); - writer.write_all(&buf[..n]).map_err(|e| { - let _ = std::fs::remove_file(&tmp_path); - StorageError::Io(e) - })?; + writer.write_all(&buf[..n]).map_err(StorageError::Io)?; total += n as u64; } - writer.flush().map_err(|e| { + writer.flush().map_err(StorageError::Io)?; + + let src_metadata = self.read_metadata_sync(src_bucket, src_key); + Ok((format!("{:x}", hasher.finalize()), total, src_metadata)) + }); + + let (etag, new_size, src_metadata) = match copy_res { + Ok(v) => v, + Err(e) => { let _ = std::fs::remove_file(&tmp_path); - StorageError::Io(e) - })?; - (format!("{:x}", hasher.finalize()), total) + return Err(e); + } }; - let src_metadata = self.read_metadata_sync(src_bucket, src_key); - self.finalize_put_sync( - dst_bucket, - dst_key, - &tmp_path, - etag, - new_size, - Some(src_metadata), - ) + let finalize = run_blocking(|| { + let _dst_guard = self.get_object_lock(dst_bucket, dst_key).write(); + self.finalize_put_sync( + dst_bucket, + dst_key, + &tmp_path, + etag, + new_size, + Some(src_metadata), + ) + }); + + if finalize.is_err() { + let _ = std::fs::remove_file(&tmp_path); + } + finalize } async fn get_object_metadata( @@ -2479,7 +2868,10 @@ impl crate::traits::StorageEngine for FsStorageBackend { bucket: &str, key: &str, ) -> StorageResult> { - Ok(run_blocking(|| self.read_metadata_sync(bucket, key))) + Ok(run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).read(); + self.read_metadata_sync(bucket, key) + })) } async fn put_object_metadata( @@ -2489,6 +2881,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { metadata: &HashMap, ) -> StorageResult<()> { run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).write(); let mut entry = self.read_index_entry_sync(bucket, key).unwrap_or_default(); let meta_map: serde_json::Map = metadata .iter() @@ -2550,7 +2943,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { bucket: &str, upload_id: &str, part_number: u32, - mut stream: AsyncReadStream, + stream: AsyncReadStream, ) -> StorageResult { let upload_dir = self.multipart_bucket_root(bucket).join(upload_id); let manifest_path = upload_dir.join(MANIFEST_FILE); @@ -2561,29 +2954,44 @@ impl crate::traits::StorageEngine for FsStorageBackend { let part_file = upload_dir.join(format!("part-{:05}.part", part_number)); let tmp_file = upload_dir.join(format!("part-{:05}.part.tmp", part_number)); - let mut file = tokio::fs::File::create(&tmp_file) - .await - .map_err(StorageError::Io)?; - let mut hasher = Md5::new(); - let mut part_size: u64 = 0; - let mut buf = [0u8; 65536]; - loop { - let n = stream.read(&mut buf).await.map_err(StorageError::Io)?; - if n == 0 { - break; + let chunk_size = self.stream_chunk_size; + let tmp_file_owned = tmp_file.clone(); + let drain_res = tokio::task::spawn_blocking(move || -> StorageResult<(String, u64)> { + use std::io::{BufWriter, Read, Write}; + let mut reader = tokio_util::io::SyncIoBridge::new(stream); + let file = std::fs::File::create(&tmp_file_owned).map_err(StorageError::Io)?; + let mut writer = BufWriter::with_capacity(chunk_size * 4, file); + let mut hasher = Md5::new(); + let mut part_size: u64 = 0; + let mut buf = vec![0u8; chunk_size]; + loop { + let n = reader.read(&mut buf).map_err(StorageError::Io)?; + if n == 0 { + break; + } + hasher.update(&buf[..n]); + writer.write_all(&buf[..n]).map_err(StorageError::Io)?; + part_size += n as u64; } - hasher.update(&buf[..n]); - tokio::io::AsyncWriteExt::write_all(&mut file, &buf[..n]) - .await - .map_err(StorageError::Io)?; - part_size += n as u64; - } - tokio::io::AsyncWriteExt::flush(&mut file) - .await - .map_err(StorageError::Io)?; - drop(file); + writer.flush().map_err(StorageError::Io)?; + Ok((format!("{:x}", hasher.finalize()), part_size)) + }) + .await; - let etag = format!("{:x}", hasher.finalize()); + let (etag, part_size) = match drain_res { + Ok(Ok(v)) => v, + Ok(Err(e)) => { + let _ = tokio::fs::remove_file(&tmp_file).await; + return Err(e); + } + Err(join) => { + let _ = tokio::fs::remove_file(&tmp_file).await; + return Err(StorageError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + join, + ))); + } + }; tokio::fs::rename(&tmp_file, &part_file) .await @@ -2628,87 +3036,97 @@ impl crate::traits::StorageEngine for FsStorageBackend { return Err(StorageError::UploadNotFound(upload_id.to_string())); } - let src_path = self.object_path(src_bucket, src_key)?; - if !src_path.is_file() { - return Err(StorageError::ObjectNotFound { - bucket: src_bucket.to_string(), - key: src_key.to_string(), - }); - } - - let src_meta = std::fs::metadata(&src_path).map_err(StorageError::Io)?; - let src_size = src_meta.len(); - let src_mtime = src_meta - .modified() - .ok() - .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) - .map(|d| d.as_secs_f64()) - .unwrap_or(0.0); - let last_modified = Utc - .timestamp_opt( - src_mtime as i64, - ((src_mtime % 1.0) * 1_000_000_000.0) as u32, - ) - .single() - .unwrap_or_else(Utc::now); - - let (start, end) = match range { - Some((s, e)) => { - if s >= src_size || e >= src_size || s > e { - return Err(StorageError::InvalidRange); - } - (s, e) - } - None => { - if src_size == 0 { - (0u64, 0u64) - } else { - (0u64, src_size - 1) - } - } - }; - let length = if src_size == 0 { 0 } else { end - start + 1 }; - let part_file = upload_dir.join(format!("part-{:05}.part", part_number)); let tmp_file = upload_dir.join(format!("part-{:05}.part.tmp", part_number)); + let chunk_size = self.stream_chunk_size; - let mut src = tokio::fs::File::open(&src_path) - .await - .map_err(StorageError::Io)?; - if start > 0 { - tokio::io::AsyncSeekExt::seek(&mut src, std::io::SeekFrom::Start(start)) - .await - .map_err(StorageError::Io)?; - } + // Everything that must be consistent with the copied bytes — path + // check, size/mtime, range validation, open+seek+read — happens under + // one held read guard. If a concurrent PUT renames the source + // between our metadata read and our file open, we'd otherwise record + // the old size/last_modified in the manifest but copy bytes from the + // new version. + let copy_res = run_blocking( + || -> StorageResult<(String, u64, DateTime)> { + let _guard = self.get_object_lock(src_bucket, src_key).read(); - let dst_file = tokio::fs::File::create(&tmp_file) - .await - .map_err(StorageError::Io)?; - let mut dst = tokio::io::BufWriter::with_capacity(256 * 1024, dst_file); - let mut hasher = Md5::new(); - let mut remaining = length; - let mut buf = vec![0u8; 256 * 1024]; - while remaining > 0 { - let to_read = std::cmp::min(remaining as usize, buf.len()); - let n = src - .read(&mut buf[..to_read]) - .await - .map_err(StorageError::Io)?; - if n == 0 { - break; + let src_path = self.object_path(src_bucket, src_key)?; + if !src_path.is_file() { + return Err(StorageError::ObjectNotFound { + bucket: src_bucket.to_string(), + key: src_key.to_string(), + }); + } + + use std::io::{BufWriter, Read, Seek, SeekFrom, Write}; + // Open first so subsequent metadata/seek/read are all + // anchored to the same inode, even if a later rename swaps + // the path after we release the guard. + let mut src = std::fs::File::open(&src_path).map_err(StorageError::Io)?; + let src_meta = src.metadata().map_err(StorageError::Io)?; + let src_size = src_meta.len(); + let src_mtime = src_meta + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + let last_modified = Utc + .timestamp_opt( + src_mtime as i64, + ((src_mtime % 1.0) * 1_000_000_000.0) as u32, + ) + .single() + .unwrap_or_else(Utc::now); + + let (start, end) = match range { + Some((s, e)) => { + if s >= src_size || e >= src_size || s > e { + return Err(StorageError::InvalidRange); + } + (s, e) + } + None => { + if src_size == 0 { + (0u64, 0u64) + } else { + (0u64, src_size - 1) + } + } + }; + let length = if src_size == 0 { 0 } else { end - start + 1 }; + + if start > 0 { + src.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?; + } + let mut src = std::io::BufReader::with_capacity(chunk_size, src); + let dst = std::fs::File::create(&tmp_file).map_err(StorageError::Io)?; + let mut dst = BufWriter::with_capacity(chunk_size * 4, dst); + let mut hasher = Md5::new(); + let mut remaining = length; + let mut buf = vec![0u8; chunk_size]; + while remaining > 0 { + let to_read = std::cmp::min(remaining as usize, buf.len()); + let n = src.read(&mut buf[..to_read]).map_err(StorageError::Io)?; + if n == 0 { + break; + } + hasher.update(&buf[..n]); + dst.write_all(&buf[..n]).map_err(StorageError::Io)?; + remaining -= n as u64; + } + dst.flush().map_err(StorageError::Io)?; + Ok((format!("{:x}", hasher.finalize()), length, last_modified)) + }, + ); + + let (etag, length, last_modified) = match copy_res { + Ok(v) => v, + Err(e) => { + let _ = tokio::fs::remove_file(&tmp_file).await; + return Err(e); } - hasher.update(&buf[..n]); - tokio::io::AsyncWriteExt::write_all(&mut dst, &buf[..n]) - .await - .map_err(StorageError::Io)?; - remaining -= n as u64; - } - tokio::io::AsyncWriteExt::flush(&mut dst) - .await - .map_err(StorageError::Io)?; - drop(dst); - - let etag = format!("{:x}", hasher.finalize()); + }; tokio::fs::rename(&tmp_file, &part_file) .await @@ -2766,69 +3184,96 @@ impl crate::traits::StorageEngine for FsStorageBackend { .unwrap_or_default(); let tmp_dir = self.tmp_dir(); - tokio::fs::create_dir_all(&tmp_dir) - .await - .map_err(StorageError::Io)?; + std::fs::create_dir_all(&tmp_dir).map_err(StorageError::Io)?; let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4())); - let out_raw = tokio::fs::File::create(&tmp_path) - .await - .map_err(StorageError::Io)?; - let mut out_file = tokio::io::BufWriter::with_capacity(256 * 1024, out_raw); - let mut md5_digest_concat = Vec::new(); - let mut total_size: u64 = 0; - let part_count = parts.len(); - let mut buf = vec![0u8; 256 * 1024]; + let chunk_size = self.stream_chunk_size; + let part_infos: Vec = parts.to_vec(); + let upload_dir_owned = upload_dir.clone(); + let tmp_path_owned = tmp_path.clone(); - for part_info in parts { - let part_file = upload_dir.join(format!("part-{:05}.part", part_info.part_number)); - if !part_file.exists() { - let _ = tokio::fs::remove_file(&tmp_path).await; - return Err(StorageError::InvalidObjectKey(format!( - "Part {} not found", - part_info.part_number + // Assemble parts on a blocking thread using std::fs, large buffers, + // and a single writer flush — no per-chunk runtime crossings. + let assemble_res = tokio::task::spawn_blocking(move || -> StorageResult<(String, u64)> { + use std::io::{BufReader, BufWriter, Read, Write}; + let out_raw = std::fs::File::create(&tmp_path_owned).map_err(StorageError::Io)?; + let mut out_file = BufWriter::with_capacity(chunk_size * 4, out_raw); + let mut md5_digest_concat = Vec::with_capacity(part_infos.len() * 16); + let mut total_size: u64 = 0; + let mut buf = vec![0u8; chunk_size]; + + for part_info in &part_infos { + let part_file = upload_dir_owned + .join(format!("part-{:05}.part", part_info.part_number)); + if !part_file.exists() { + return Err(StorageError::InvalidObjectKey(format!( + "Part {} not found", + part_info.part_number + ))); + } + let reader = std::fs::File::open(&part_file).map_err(StorageError::Io)?; + let mut reader = BufReader::with_capacity(chunk_size, reader); + let mut part_hasher = Md5::new(); + loop { + let n = reader.read(&mut buf).map_err(StorageError::Io)?; + if n == 0 { + break; + } + part_hasher.update(&buf[..n]); + out_file.write_all(&buf[..n]).map_err(StorageError::Io)?; + total_size += n as u64; + } + md5_digest_concat.extend_from_slice(&part_hasher.finalize()); + } + + out_file.flush().map_err(StorageError::Io)?; + let mut composite_hasher = Md5::new(); + composite_hasher.update(&md5_digest_concat); + let etag = format!("{:x}-{}", composite_hasher.finalize(), part_infos.len()); + Ok((etag, total_size)) + }) + .await; + + let (etag, total_size) = match assemble_res { + Ok(Ok(v)) => v, + Ok(Err(e)) => { + let _ = std::fs::remove_file(&tmp_path); + return Err(e); + } + Err(join) => { + let _ = std::fs::remove_file(&tmp_path); + return Err(StorageError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + join, ))); } - let part_reader = tokio::fs::File::open(&part_file) - .await - .map_err(StorageError::Io)?; - let mut part_reader = tokio::io::BufReader::with_capacity(256 * 1024, part_reader); - let mut part_hasher = Md5::new(); - loop { - let n = part_reader.read(&mut buf).await.map_err(StorageError::Io)?; - if n == 0 { - break; - } - part_hasher.update(&buf[..n]); - tokio::io::AsyncWriteExt::write_all(&mut out_file, &buf[..n]) - .await - .map_err(StorageError::Io)?; - total_size += n as u64; + }; + + // Commit to the destination key atomically under its write lock. + // Lock acquisition happens inside run_blocking so the wait runs under + // block_in_place rather than parking the async worker. + let result = run_blocking(|| { + let _guard = self.get_object_lock(bucket, &object_key).write(); + self.finalize_put_sync( + bucket, + &object_key, + &tmp_path, + etag, + total_size, + Some(metadata), + ) + }); + + match result { + Ok(obj) => { + let _ = std::fs::remove_dir_all(&upload_dir); + Ok(obj) + } + Err(e) => { + let _ = std::fs::remove_file(&tmp_path); + Err(e) } - md5_digest_concat.extend_from_slice(&part_hasher.finalize()); } - - tokio::io::AsyncWriteExt::flush(&mut out_file) - .await - .map_err(StorageError::Io)?; - drop(out_file); - - let mut composite_hasher = Md5::new(); - composite_hasher.update(&md5_digest_concat); - let etag = format!("{:x}-{}", composite_hasher.finalize(), part_count); - - let result = self.finalize_put_sync( - bucket, - &object_key, - &tmp_path, - etag, - total_size, - Some(metadata), - )?; - - let _ = std::fs::remove_dir_all(&upload_dir); - - Ok(result) } async fn abort_multipart(&self, bucket: &str, upload_id: &str) -> StorageResult<()> { @@ -3059,50 +3504,53 @@ impl crate::traits::StorageEngine for FsStorageBackend { } async fn get_object_tags(&self, bucket: &str, key: &str) -> StorageResult> { - self.require_bucket(bucket)?; - let obj_path = self.object_path(bucket, key)?; - if !obj_path.exists() { - return Err(StorageError::ObjectNotFound { - bucket: bucket.to_string(), - key: key.to_string(), - }); - } - - let entry = self.read_index_entry_sync(bucket, key); - if let Some(entry) = entry { - if let Some(tags_val) = entry.get("tags") { - if let Ok(tags) = serde_json::from_value::>(tags_val.clone()) { - return Ok(tags); + run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).read(); + self.require_bucket(bucket)?; + let obj_path = self.object_path(bucket, key)?; + if !obj_path.exists() { + return Err(StorageError::ObjectNotFound { + bucket: bucket.to_string(), + key: key.to_string(), + }); + } + let entry = self.read_index_entry_sync(bucket, key); + if let Some(entry) = entry { + if let Some(tags_val) = entry.get("tags") { + if let Ok(tags) = serde_json::from_value::>(tags_val.clone()) { + return Ok(tags); + } } } - } - Ok(Vec::new()) + Ok(Vec::new()) + }) } async fn set_object_tags(&self, bucket: &str, key: &str, tags: &[Tag]) -> StorageResult<()> { - self.require_bucket(bucket)?; - let obj_path = self.object_path(bucket, key)?; - if !obj_path.exists() { - return Err(StorageError::ObjectNotFound { - bucket: bucket.to_string(), - key: key.to_string(), - }); - } - - let mut entry = self.read_index_entry_sync(bucket, key).unwrap_or_default(); - if tags.is_empty() { - entry.remove("tags"); - } else { - entry.insert( - "tags".to_string(), - serde_json::to_value(tags).unwrap_or(Value::Null), - ); - } - - self.write_index_entry_sync(bucket, key, &entry) - .map_err(StorageError::Io)?; - self.invalidate_bucket_caches(bucket); - Ok(()) + run_blocking(|| { + let _guard = self.get_object_lock(bucket, key).write(); + self.require_bucket(bucket)?; + let obj_path = self.object_path(bucket, key)?; + if !obj_path.exists() { + return Err(StorageError::ObjectNotFound { + bucket: bucket.to_string(), + key: key.to_string(), + }); + } + let mut entry = self.read_index_entry_sync(bucket, key).unwrap_or_default(); + if tags.is_empty() { + entry.remove("tags"); + } else { + entry.insert( + "tags".to_string(), + serde_json::to_value(tags).unwrap_or(Value::Null), + ); + } + self.write_index_entry_sync(bucket, key, &entry) + .map_err(StorageError::Io)?; + self.invalidate_bucket_caches(bucket); + Ok(()) + }) } async fn delete_object_tags(&self, bucket: &str, key: &str) -> StorageResult<()> { @@ -3114,6 +3562,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { mod tests { use super::*; use crate::traits::StorageEngine; + use tokio::io::AsyncReadExt; fn create_test_backend() -> (tempfile::TempDir, FsStorageBackend) { let dir = tempfile::tempdir().unwrap(); @@ -3506,4 +3955,499 @@ mod tests { assert_eq!(stats.objects, 1); assert_eq!(stats.bytes, 5); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_snapshot_to_link_matches_meta() { + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc as StdArc; + + let (dir, backend) = create_test_backend(); + let root = dir.path().to_path_buf(); + let backend = StdArc::new(backend); + backend.create_bucket("link-bkt").await.unwrap(); + + let tmp_dir = root.join(".myfsio.sys").join("tmp"); + std::fs::create_dir_all(&tmp_dir).unwrap(); + + // Seed with known content. + let data: AsyncReadStream = + Box::pin(std::io::Cursor::new(vec![b'a'; 4096])); + backend.put_object("link-bkt", "hot", data, None).await.unwrap(); + + let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false)); + let mut handles = Vec::new(); + + // Writers swap the object between three distinct fill bytes with + // distinct known etags; we'll check the snapshot's etag is one of + // them and matches what we read from link_path. + for w in 0..2 { + let b = backend.clone(); + let stop = stop.clone(); + handles.push(tokio::spawn(async move { + let mut i: u32 = 0; + while !stop.load(Ordering::Relaxed) { + let fill = b'a' + (((w + i) % 3) as u8); + let size = 2048 + ((w + i) % 3) as usize * 1024; + let body = vec![fill; size]; + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(body)); + let _ = b.put_object("link-bkt", "hot", data, None).await; + i = i.wrapping_add(1); + } + })); + } + + let reads = StdArc::new(AtomicU64::new(0)); + let mismatches = StdArc::new(AtomicU64::new(0)); + for _ in 0..4 { + let b = backend.clone(); + let stop = stop.clone(); + let tmp_dir = tmp_dir.clone(); + let reads = reads.clone(); + let mismatches = mismatches.clone(); + handles.push(tokio::spawn(async move { + while !stop.load(Ordering::Relaxed) { + let link = tmp_dir.join(format!("lnk-{}", Uuid::new_v4())); + match b.snapshot_object_to_link("link-bkt", "hot", &link).await { + Ok(meta) => { + let bytes = std::fs::read(&link).unwrap_or_default(); + let md5 = format!("{:x}", Md5::digest(&bytes)); + reads.fetch_add(1, Ordering::Relaxed); + if meta.etag.as_deref() != Some(md5.as_str()) + || bytes.len() as u64 != meta.size + { + mismatches.fetch_add(1, Ordering::Relaxed); + } + let _ = std::fs::remove_file(&link); + } + Err(_) => { + let _ = std::fs::remove_file(&link); + } + } + } + })); + } + + tokio::time::sleep(std::time::Duration::from_millis(400)).await; + stop.store(true, Ordering::Relaxed); + for h in handles { + let _ = h.await; + } + + let r = reads.load(Ordering::Relaxed); + let m = mismatches.load(Ordering::Relaxed); + assert!(r > 10, "expected some snapshot reads, got {}", r); + assert_eq!( + m, 0, + "observed {} snapshot_to_link results where meta etag/size didn't match the linked bytes, out of {}", + m, r + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_get_object_snapshot_size_matches_body() { + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc as StdArc; + use tokio::io::AsyncReadExt; + + let (_dir, backend) = create_test_backend(); + let backend = StdArc::new(backend); + backend.create_bucket("snap-bkt").await.unwrap(); + + let data: AsyncReadStream = + Box::pin(std::io::Cursor::new(vec![b'a'; 1024])); + backend + .put_object("snap-bkt", "sz", data, None) + .await + .unwrap(); + + let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false)); + let mut handles = Vec::new(); + + // Writers flip between 1 KiB and 2 KiB bodies so every PUT changes + // the reported size. + for w in 0..2 { + let b = backend.clone(); + let stop = stop.clone(); + handles.push(tokio::spawn(async move { + let mut i: u32 = 0; + while !stop.load(Ordering::Relaxed) { + let fill = b'a' + ((w + i) % 20) as u8; + let size = if i % 2 == 0 { 1024 } else { 2048 }; + let body = vec![fill; size]; + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(body)); + let _ = b.put_object("snap-bkt", "sz", data, None).await; + i = i.wrapping_add(1); + } + })); + } + + let reads = StdArc::new(AtomicU64::new(0)); + let mismatches = StdArc::new(AtomicU64::new(0)); + for _ in 0..4 { + let b = backend.clone(); + let stop = stop.clone(); + let reads = reads.clone(); + let mismatches = mismatches.clone(); + handles.push(tokio::spawn(async move { + while !stop.load(Ordering::Relaxed) { + if let Ok((meta, mut file)) = b.get_object_snapshot("snap-bkt", "sz").await { + let mut buf = Vec::new(); + if file.read_to_end(&mut buf).await.is_ok() { + reads.fetch_add(1, Ordering::Relaxed); + if buf.len() as u64 != meta.size { + mismatches.fetch_add(1, Ordering::Relaxed); + } + } + } + } + })); + } + + tokio::time::sleep(std::time::Duration::from_millis(400)).await; + stop.store(true, Ordering::Relaxed); + for h in handles { + let _ = h.await; + } + + let r = reads.load(Ordering::Relaxed); + let m = mismatches.load(Ordering::Relaxed); + assert!(r > 10, "expected some snapshot reads, got {}", r); + assert_eq!( + m, 0, + "observed {} snapshots where meta.size didn't match body length, out of {} reads", + m, r + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_range_get_snapshot_consistency() { + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc as StdArc; + + let (_dir, backend) = create_test_backend(); + let backend = StdArc::new(backend); + backend.create_bucket("range-bkt").await.unwrap(); + + // Every version is a 256 KiB run of a single byte, so body bytes + // alone identify which version any ranged read came from. We pair + // that with the returned ETag to check they agree. + const SIZE: u64 = 256 * 1024; + let seed = vec![b'a'; SIZE as usize]; + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(seed)); + backend.put_object("range-bkt", "hot", data, None).await.unwrap(); + + let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false)); + let mut handles = Vec::new(); + + for w in 0..2 { + let b = backend.clone(); + let stop = stop.clone(); + handles.push(tokio::spawn(async move { + let mut i: u8 = 0; + while !stop.load(Ordering::Relaxed) { + let fill = b'a' + ((w as u8 * 7 + i) % 20); + let body = vec![fill; SIZE as usize]; + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(body)); + let _ = b.put_object("range-bkt", "hot", data, None).await; + i = i.wrapping_add(1); + } + })); + } + + let reads = StdArc::new(AtomicU64::new(0)); + let mismatches = StdArc::new(AtomicU64::new(0)); + for _ in 0..6 { + let b = backend.clone(); + let stop = stop.clone(); + let reads = reads.clone(); + let mismatches = mismatches.clone(); + handles.push(tokio::spawn(async move { + while !stop.load(Ordering::Relaxed) { + let start = 1000u64; + let len = 4000u64; + if let Ok((meta, mut stream)) = + b.get_object_range("range-bkt", "hot", start, Some(len)).await + { + let mut buf = Vec::with_capacity(len as usize); + if stream.read_to_end(&mut buf).await.is_ok() && !buf.is_empty() { + // Every byte in the range must be the same — all + // writers fill the object uniformly — and the + // etag must be the MD5 of a uniform buffer of + // that byte at full object size. + let fill = buf[0]; + let all_match = buf.iter().all(|b| *b == fill); + let expected_etag = format!( + "{:x}", + Md5::digest(&vec![fill; SIZE as usize]) + ); + let etag_ok = meta.etag.as_deref() == Some(expected_etag.as_str()); + reads.fetch_add(1, Ordering::Relaxed); + if !(all_match && etag_ok) { + mismatches.fetch_add(1, Ordering::Relaxed); + } + } + } + } + })); + } + + tokio::time::sleep(std::time::Duration::from_millis(400)).await; + stop.store(true, Ordering::Relaxed); + for h in handles { + let _ = h.await; + } + + let r = reads.load(Ordering::Relaxed); + let m = mismatches.load(Ordering::Relaxed); + assert!(r > 10, "expected some Range GETs, got {}", r); + assert_eq!( + m, 0, + "observed {} Range GETs where etag and body fill byte disagreed, out of {} reads", + m, r + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_upload_part_copy_snapshot_consistency() { + use myfsio_common::types::PartInfo; + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc as StdArc; + + let (_dir, backend) = create_test_backend(); + let backend = StdArc::new(backend); + backend.create_bucket("mp-bkt").await.unwrap(); + + // Two fixed-size source versions: all 'a' or all 'b'. Writers flip + // between them; readers do upload_part_copy and check that the + // recorded ETag corresponds to exactly one of the two known MD5s + // (not a cross-pollinated value). + const SIZE: u64 = 64 * 1024; + let etag_a = format!("{:x}", Md5::digest(&vec![b'a'; SIZE as usize])); + let etag_b = format!("{:x}", Md5::digest(&vec![b'b'; SIZE as usize])); + + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(vec![b'a'; SIZE as usize])); + backend + .put_object("mp-bkt", "src", data, None) + .await + .unwrap(); + + let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false)); + let mut handles = Vec::new(); + + // Writer flips the source between two fixed contents. + { + let b = backend.clone(); + let stop = stop.clone(); + handles.push(tokio::spawn(async move { + let mut flip = false; + while !stop.load(Ordering::Relaxed) { + flip = !flip; + let fill = if flip { b'a' } else { b'b' }; + let data: AsyncReadStream = + Box::pin(std::io::Cursor::new(vec![fill; SIZE as usize])); + let _ = b.put_object("mp-bkt", "src", data, None).await; + } + })); + } + + let ops = StdArc::new(AtomicU64::new(0)); + let bad = StdArc::new(AtomicU64::new(0)); + for _ in 0..4 { + let b = backend.clone(); + let stop = stop.clone(); + let etag_a = etag_a.clone(); + let etag_b = etag_b.clone(); + let ops = ops.clone(); + let bad = bad.clone(); + handles.push(tokio::spawn(async move { + while !stop.load(Ordering::Relaxed) { + let upload_id = match b.initiate_multipart("mp-bkt", "dst", None).await { + Ok(u) => u, + Err(_) => continue, + }; + let res = b + .upload_part_copy( + "mp-bkt", &upload_id, 1, "mp-bkt", "src", None, + ) + .await; + if let Ok((etag, _lm)) = res { + // The part etag is the MD5 of the copied bytes; it + // must be one of the two known values, never something + // in between (which would signal metadata from one + // version and bytes from another). + if etag != etag_a && etag != etag_b { + bad.fetch_add(1, Ordering::Relaxed); + } + ops.fetch_add(1, Ordering::Relaxed); + } + let _ = b.abort_multipart("mp-bkt", &upload_id).await; + } + })); + } + + tokio::time::sleep(std::time::Duration::from_millis(400)).await; + stop.store(true, Ordering::Relaxed); + for h in handles { + let _ = h.await; + } + + let o = ops.load(Ordering::Relaxed); + let x = bad.load(Ordering::Relaxed); + assert!(o >= 4, "expected at least a few upload_part_copy ops, got {}", o); + assert_eq!( + x, 0, + "observed {} upload_part_copy results with etag unrelated to source content (out of {})", + x, o + ); + // Sanity: make sure the test actually exercised both versions. + let _ = PartInfo { + part_number: 1, + etag: etag_a, + }; + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_contention_does_not_stall_other_async_tasks() { + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc as StdArc; + + let (_dir, backend) = create_test_backend(); + let backend = StdArc::new(backend); + backend.create_bucket("contend").await.unwrap(); + + // Seed a 1 MiB object. + let seed = vec![b'x'; 1_048_576]; + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(seed)); + backend + .put_object("contend", "hot", data, None) + .await + .unwrap(); + + let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false)); + let mut handles = Vec::new(); + + // Hammer the same key with PUTs that each acquire the write lock + // inside block_in_place. On only 2 worker threads, this is a + // worst-case pattern for worker starvation. + for w in 0..4 { + let b = backend.clone(); + let stop = stop.clone(); + handles.push(tokio::spawn(async move { + let mut i: u8 = 0; + while !stop.load(Ordering::Relaxed) { + let fill = b'a' + ((w as u8 + i) % 26); + let body = vec![fill; 1_048_576]; + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(body)); + let _ = b.put_object("contend", "hot", data, None).await; + i = i.wrapping_add(1); + } + })); + } + + // Unrelated async tasks (simulating e.g. health checks, metrics + // emissions) that should keep firing throughout the contention. + let pings = StdArc::new(AtomicU64::new(0)); + for _ in 0..2 { + let stop = stop.clone(); + let pings = pings.clone(); + handles.push(tokio::spawn(async move { + while !stop.load(Ordering::Relaxed) { + tokio::task::yield_now().await; + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + pings.fetch_add(1, Ordering::Relaxed); + } + })); + } + + tokio::time::sleep(std::time::Duration::from_millis(400)).await; + stop.store(true, Ordering::Relaxed); + for h in handles { + let _ = h.await; + } + + // If the worker was getting parked on lock.write() outside of + // block_in_place, the unrelated task's ping counter would stall. We + // expect ~1 ping per ms when healthy; assert a generous floor. + let p = pings.load(Ordering::Relaxed); + assert!( + p >= 50, + "unrelated async tasks stalled during PUT contention: only {} pings in 400ms", + p + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_concurrent_put_get_atomicity() { + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc as StdArc; + + let (_dir, backend) = create_test_backend(); + let backend = StdArc::new(backend); + backend.create_bucket("race-bucket").await.unwrap(); + + const SIZE: usize = 256 * 1024; + // Seed an initial version so GETs can start immediately. + let seed = vec![b'a'; SIZE]; + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(seed)); + backend + .put_object("race-bucket", "hot", data, None) + .await + .unwrap(); + + let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false)); + let mismatches = StdArc::new(AtomicU64::new(0)); + let reads = StdArc::new(AtomicU64::new(0)); + + let mut handles = Vec::new(); + for w in 0..2 { + let b = backend.clone(); + let stop = stop.clone(); + handles.push(tokio::spawn(async move { + let mut i: u8 = 0; + while !stop.load(Ordering::Relaxed) { + let fill = b'a'.wrapping_add((w * 8 + i) as u8); + let body = vec![fill; SIZE]; + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(body)); + let _ = b.put_object("race-bucket", "hot", data, None).await; + i = i.wrapping_add(1); + } + })); + } + for _ in 0..6 { + let b = backend.clone(); + let stop = stop.clone(); + let mismatches = mismatches.clone(); + let reads = reads.clone(); + handles.push(tokio::spawn(async move { + while !stop.load(Ordering::Relaxed) { + if let Ok((obj, mut stream)) = b.get_object("race-bucket", "hot").await { + let mut buf = Vec::with_capacity(SIZE); + if stream.read_to_end(&mut buf).await.is_ok() { + let header_etag = obj.etag.unwrap_or_default(); + let body_md5 = format!("{:x}", Md5::digest(&buf)); + reads.fetch_add(1, Ordering::Relaxed); + if header_etag != body_md5 { + mismatches.fetch_add(1, Ordering::Relaxed); + } + } + } + } + })); + } + + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + stop.store(true, Ordering::Relaxed); + for h in handles { + let _ = h.await; + } + + let r = reads.load(Ordering::Relaxed); + let m = mismatches.load(Ordering::Relaxed); + assert!(r > 10, "expected at least a handful of GETs, got {}", r); + assert_eq!( + m, 0, + "observed {} ETag/body mismatches out of {} reads", + m, r + ); + } } diff --git a/crates/myfsio-storage/src/traits.rs b/crates/myfsio-storage/src/traits.rs index 0c9f3c3..d1a7d03 100644 --- a/crates/myfsio-storage/src/traits.rs +++ b/crates/myfsio-storage/src/traits.rs @@ -30,8 +30,44 @@ pub trait StorageEngine: Send + Sync { key: &str, ) -> StorageResult<(ObjectMeta, AsyncReadStream)>; + async fn get_object_range( + &self, + bucket: &str, + key: &str, + start: u64, + len: Option, + ) -> StorageResult<(ObjectMeta, AsyncReadStream)>; + + async fn get_object_snapshot( + &self, + bucket: &str, + key: &str, + ) -> StorageResult<(ObjectMeta, tokio::fs::File)>; + + async fn get_object_version_snapshot( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult<(ObjectMeta, tokio::fs::File)>; + async fn get_object_path(&self, bucket: &str, key: &str) -> StorageResult; + async fn snapshot_object_to_link( + &self, + bucket: &str, + key: &str, + link_path: &std::path::Path, + ) -> StorageResult; + + async fn snapshot_object_version_to_link( + &self, + bucket: &str, + key: &str, + version_id: &str, + link_path: &std::path::Path, + ) -> StorageResult; + async fn head_object(&self, bucket: &str, key: &str) -> StorageResult; async fn get_object_version( @@ -41,6 +77,15 @@ pub trait StorageEngine: Send + Sync { version_id: &str, ) -> StorageResult<(ObjectMeta, AsyncReadStream)>; + async fn get_object_version_range( + &self, + bucket: &str, + key: &str, + version_id: &str, + start: u64, + len: Option, + ) -> StorageResult<(ObjectMeta, AsyncReadStream)>; + async fn get_object_version_path( &self, bucket: &str,