diff --git a/crates/myfsio-server/src/handlers/mod.rs b/crates/myfsio-server/src/handlers/mod.rs index ea6b076..60b0f6a 100644 --- a/crates/myfsio-server/src/handlers/mod.rs +++ b/crates/myfsio-server/src/handlers/mod.rs @@ -164,32 +164,37 @@ async fn ensure_object_lock_allows_write( key: &str, headers: Option<&HeaderMap>, ) -> Result<(), Response> { - match state.storage.head_object(bucket, key).await { - Ok(_) => { - let metadata = match state.storage.get_object_metadata(bucket, key).await { - Ok(metadata) => metadata, - Err(err) => return Err(storage_err_response(err)), - }; - let bypass_governance = headers - .and_then(|headers| { - headers - .get("x-amz-bypass-governance-retention") - .and_then(|value| value.to_str().ok()) - }) - .map(|value| value.eq_ignore_ascii_case("true")) - .unwrap_or(false); - if let Err(message) = object_lock::can_delete_object(&metadata, bypass_governance) { - return Err(s3_error_response(S3Error::new( - S3ErrorCode::AccessDenied, - message, - ))); - } - Ok(()) - } - Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()), - Err(myfsio_storage::error::StorageError::DeleteMarker { .. }) => Ok(()), - Err(err) => Err(storage_err_response(err)), + let head_res = state.storage.head_object(bucket, key).await; + let needs_lock_check = match &head_res { + Ok(_) => true, + Err(myfsio_storage::error::StorageError::ObjectCorrupted { .. }) => true, + Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => return Ok(()), + Err(myfsio_storage::error::StorageError::DeleteMarker { .. }) => return Ok(()), + Err(_) => false, + }; + if !needs_lock_check { + return Err(storage_err_response(head_res.err().unwrap())); } + + let metadata = match state.storage.get_object_metadata(bucket, key).await { + Ok(metadata) => metadata, + Err(err) => return Err(storage_err_response(err)), + }; + let bypass_governance = headers + .and_then(|headers| { + headers + .get("x-amz-bypass-governance-retention") + .and_then(|value| value.to_str().ok()) + }) + .map(|value| value.eq_ignore_ascii_case("true")) + .unwrap_or(false); + if let Err(message) = object_lock::can_delete_object(&metadata, bypass_governance) { + return Err(s3_error_response(S3Error::new( + S3ErrorCode::AccessDenied, + message, + ))); + } + Ok(()) } async fn ensure_object_version_lock_allows_delete( @@ -1609,6 +1614,13 @@ pub async fn get_object( .and_then(|v| v.to_str().ok()) .map(|s| s.to_string()); + if range_header.is_some() && query.part_number.is_some() { + return s3_error_response(S3Error::new( + S3ErrorCode::InvalidRequest, + "Cannot specify both Range and partNumber on the same request", + )); + } + if let Some(ref range_str) = range_header { return range_get_handler(&state, &bucket, &key, range_str, &query, &headers).await; } @@ -1643,6 +1655,40 @@ pub async fn get_object( Err(e) => return storage_err_response(e), }; + if let Some(part_number) = query.part_number { + match resolve_part_view(&snap_meta, part_number) { + Ok(view) if view.multipart => { + if view.length == 0 { + if let Some(resp) = evaluate_get_preconditions(&headers, &snap_meta) { + let _ = tokio::fs::remove_file(&snap_link).await; + return resp; + } + let _ = tokio::fs::remove_file(&snap_link).await; + let mut h = + build_part_response_headers(&key, &snap_meta, &view, &query); + apply_user_metadata(&mut h, &snap_meta.metadata); + return (StatusCode::PARTIAL_CONTENT, h).into_response(); + } + let range_str = format!("bytes={}-{}", view.start, view.start + view.length - 1); + return serve_range_from_snapshot( + &state, + snap_link, + snap_meta, + &range_str, + &query, + &headers, + Some(view.parts_count), + ) + .await; + } + Ok(_) => {} + Err(resp) => { + let _ = tokio::fs::remove_file(&snap_link).await; + return resp; + } + } + } + // Evaluate preconditions against the served snapshot's metadata. A HEAD // taken earlier could disagree with the snapshot if a concurrent PUT // landed in between, causing us to serve a body that doesn't satisfy @@ -1870,6 +1916,21 @@ pub async fn head_object( if let Some(resp) = evaluate_get_preconditions(&headers, &meta) { return resp; } + + let part_view = match query.part_number { + Some(n) => match resolve_part_view(&meta, n) { + Ok(v) => Some(v), + Err(resp) => return resp, + }, + None => None, + }; + + if let Some(view) = part_view.as_ref().filter(|v| v.multipart) { + let mut headers = build_part_response_headers(&key, &meta, view, &query); + apply_user_metadata(&mut headers, &meta.metadata); + return (StatusCode::PARTIAL_CONTENT, headers).into_response(); + } + let mut headers = HeaderMap::new(); headers.insert("content-length", meta.size.to_string().parse().unwrap()); if let Some(ref etag) = meta.etag { @@ -1905,6 +1966,134 @@ pub async fn head_object( } } +struct PartView { + start: u64, + length: u64, + parts_count: u32, + multipart: bool, +} + +fn build_part_response_headers( + key: &str, + meta: &myfsio_common::types::ObjectMeta, + view: &PartView, + query: &ObjectQuery, +) -> HeaderMap { + let mut headers = HeaderMap::new(); + headers.insert("content-length", view.length.to_string().parse().unwrap()); + if view.length > 0 { + headers.insert( + "content-range", + format!( + "bytes {}-{}/{}", + view.start, + view.start + view.length - 1, + meta.size + ) + .parse() + .unwrap(), + ); + } + if let Some(ref etag) = meta.etag { + headers.insert("etag", format!("\"{}\"", etag).parse().unwrap()); + } + insert_content_type(&mut headers, key, meta.content_type.as_deref()); + headers.insert( + "last-modified", + meta.last_modified + .format("%a, %d %b %Y %H:%M:%S GMT") + .to_string() + .parse() + .unwrap(), + ); + headers.insert("accept-ranges", "bytes".parse().unwrap()); + apply_stored_response_headers(&mut headers, &meta.internal_metadata); + if let Some(ref requested_version) = query.version_id { + if let Ok(value) = requested_version.parse() { + headers.insert("x-amz-version-id", value); + } + } else if let Some(ref vid) = meta.version_id { + if let Ok(value) = vid.parse() { + headers.insert("x-amz-version-id", value); + } + } + headers.insert( + "x-amz-mp-parts-count", + view.parts_count.to_string().parse().unwrap(), + ); + apply_response_overrides(&mut headers, query); + headers +} + +fn resolve_part_view( + meta: &myfsio_common::types::ObjectMeta, + part_number: u32, +) -> Result { + if part_number < 1 { + return Err(s3_error_response(S3Error::new( + S3ErrorCode::InvalidArgument, + "partNumber must be >= 1", + ))); + } + + let etag = meta.etag.as_deref().unwrap_or(""); + let is_multipart = myfsio_storage::fs_backend::is_multipart_etag(etag); + + if !is_multipart { + if part_number == 1 { + return Ok(PartView { + start: 0, + length: meta.size, + parts_count: 1, + multipart: false, + }); + } + return Err(s3_error_response(S3Error::new( + S3ErrorCode::InvalidPart, + format!( + "partNumber {} is out of range for a non-multipart object", + part_number + ), + ))); + } + + let part_sizes = match meta + .internal_metadata + .get(myfsio_storage::fs_backend::META_KEY_PART_SIZES) + .and_then(|raw| myfsio_storage::fs_backend::parse_part_sizes(raw)) + { + Some(sizes) => sizes, + None => { + return Err(s3_error_response(S3Error::new( + S3ErrorCode::InvalidRequest, + "Object is multipart but has no recorded part-size manifest; \ + partNumber addressing is unavailable", + ))); + } + }; + + let idx = (part_number as usize).saturating_sub(1); + if idx >= part_sizes.len() { + return Err(s3_error_response(S3Error::new( + S3ErrorCode::InvalidPart, + format!( + "partNumber {} exceeds the {} parts in this object", + part_number, + part_sizes.len() + ), + ))); + } + + let start: u64 = part_sizes.iter().take(idx).sum(); + let length = part_sizes[idx]; + Ok(PartView { + start, + length, + parts_count: part_sizes.len() as u32, + multipart: true, + }) +} + async fn initiate_multipart_handler(state: &AppState, bucket: &str, key: &str) -> Response { match state.storage.initiate_multipart(bucket, key, None).await { Ok(upload_id) => { @@ -2486,34 +2675,37 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R async move { let key = obj.key.clone(); let requested_vid = obj.version_id.clone(); + let to_err = |err: myfsio_storage::error::StorageError| -> (String, String) { + let s3err = S3Error::from(err); + (s3err.code.as_str().to_string(), s3err.message) + }; + let run_can_delete = + |metadata: &HashMap| -> Result<(), (String, String)> { + object_lock::can_delete_object(metadata, false) + .map_err(|m| (S3ErrorCode::AccessDenied.as_str().to_string(), m)) + }; let lock_check: Result<(), (String, String)> = match obj.version_id.as_deref() { - Some(version_id) if version_id != "null" => match state - .storage - .get_object_version_metadata(&bucket, &obj.key, version_id) - .await - { - Ok(metadata) => object_lock::can_delete_object(&metadata, false) - .map_err(|m| (S3ErrorCode::AccessDenied.as_str().to_string(), m)), - Err(err) => { - let s3err = S3Error::from(err); - Err((s3err.code.as_str().to_string(), s3err.message)) + Some(version_id) if version_id != "null" => { + match state + .storage + .get_object_version_metadata(&bucket, &obj.key, version_id) + .await + { + Ok(metadata) => run_can_delete(&metadata), + Err(err) => Err(to_err(err)), } - }, + } _ => match state.storage.head_object(&bucket, &obj.key).await { - Ok(_) => match state.storage.get_object_metadata(&bucket, &obj.key).await { - Ok(metadata) => object_lock::can_delete_object(&metadata, false) - .map_err(|m| (S3ErrorCode::AccessDenied.as_str().to_string(), m)), - Err(err) => { - let s3err = S3Error::from(err); - Err((s3err.code.as_str().to_string(), s3err.message)) + Ok(_) + | Err(myfsio_storage::error::StorageError::ObjectCorrupted { .. }) => { + match state.storage.get_object_metadata(&bucket, &obj.key).await { + Ok(metadata) => run_can_delete(&metadata), + Err(err) => Err(to_err(err)), } - }, + } Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()), Err(myfsio_storage::error::StorageError::DeleteMarker { .. }) => Ok(()), - Err(err) => { - let s3err = S3Error::from(err); - Err((s3err.code.as_str().to_string(), s3err.message)) - } + Err(err) => Err(to_err(err)), }, }; @@ -2578,6 +2770,18 @@ async fn range_get_handler( range_str: &str, query: &ObjectQuery, headers: &HeaderMap, +) -> Response { + range_get_handler_inner(state, bucket, key, range_str, query, headers, None).await +} + +async fn range_get_handler_inner( + state: &AppState, + bucket: &str, + key: &str, + range_str: &str, + query: &ObjectQuery, + headers: &HeaderMap, + parts_count: Option, ) -> Response { let version_id = query .version_id @@ -2607,6 +2811,21 @@ async fn range_get_handler( Err(e) => return storage_err_response(e), }; + serve_range_from_snapshot(state, snap_link, meta, range_str, query, headers, parts_count).await +} + +async fn serve_range_from_snapshot( + state: &AppState, + snap_link: std::path::PathBuf, + meta: myfsio_common::types::ObjectMeta, + range_str: &str, + query: &ObjectQuery, + headers: &HeaderMap, + parts_count: Option, +) -> Response { + let key = meta.key.as_str(); + let tmp_dir = state.config.storage_root.join(".myfsio.sys").join("tmp"); + if let Some(resp) = evaluate_get_preconditions(headers, &meta) { let _ = tokio::fs::remove_file(&snap_link).await; return resp; @@ -2666,6 +2885,7 @@ async fn range_get_handler( query, Some(enc_info.algorithm.as_str()), /* already_trimmed */ true, + parts_count, ) .await; } @@ -2720,6 +2940,7 @@ async fn range_get_handler( query, enc_header, /* already_trimmed */ false, + parts_count, ) .await } @@ -2735,6 +2956,7 @@ async fn stream_partial_content( query: &ObjectQuery, enc_header: Option<&str>, already_trimmed: bool, + parts_count: Option, ) -> Response { let length = end - start + 1; @@ -2789,6 +3011,10 @@ async fn stream_partial_content( apply_response_overrides(&mut headers, query); + if let Some(count) = parts_count { + headers.insert("x-amz-mp-parts-count", count.to_string().parse().unwrap()); + } + (StatusCode::PARTIAL_CONTENT, headers, body).into_response() } diff --git a/crates/myfsio-server/tests/integration.rs b/crates/myfsio-server/tests/integration.rs index e747756..8957fd1 100644 --- a/crates/myfsio-server/tests/integration.rs +++ b/crates/myfsio-server/tests/integration.rs @@ -63,6 +63,84 @@ fn test_app_with_iam(iam_json: serde_json::Value) -> (axum::Router, tempfile::Te (app, tmp) } +fn test_app_and_state() -> ( + axum::Router, + myfsio_server::state::AppState, + tempfile::TempDir, +) { + let tmp = tempfile::TempDir::new().unwrap(); + let iam_path = tmp.path().join(".myfsio.sys").join("config"); + std::fs::create_dir_all(&iam_path).unwrap(); + + std::fs::write( + iam_path.join("iam.json"), + serde_json::json!({ + "version": 2, + "users": [{ + "user_id": "u-test1234", + "display_name": "admin", + "enabled": true, + "access_keys": [{ + "access_key": TEST_ACCESS_KEY, + "secret_key": TEST_SECRET_KEY, + "status": "active" + }], + "policies": [{ + "bucket": "*", + "actions": ["*"], + "prefix": "*" + }] + }] + }) + .to_string(), + ) + .unwrap(); + + let config = myfsio_server::config::ServerConfig { + bind_addr: "127.0.0.1:0".parse().unwrap(), + ui_bind_addr: "127.0.0.1:0".parse().unwrap(), + storage_root: tmp.path().to_path_buf(), + region: "us-east-1".to_string(), + iam_config_path: iam_path.join("iam.json"), + sigv4_timestamp_tolerance_secs: 900, + presigned_url_min_expiry: 1, + presigned_url_max_expiry: 604800, + secret_key: None, + encryption_enabled: false, + kms_enabled: false, + gc_enabled: false, + integrity_enabled: false, + metrics_enabled: false, + metrics_history_enabled: false, + metrics_interval_minutes: 5, + metrics_retention_hours: 24, + metrics_history_interval_minutes: 5, + metrics_history_retention_hours: 24, + lifecycle_enabled: false, + website_hosting_enabled: false, + replication_connect_timeout_secs: 5, + replication_read_timeout_secs: 30, + replication_max_retries: 2, + replication_streaming_threshold_bytes: 10_485_760, + replication_max_failures_per_bucket: 50, + site_sync_enabled: false, + site_sync_interval_secs: 60, + site_sync_batch_size: 100, + site_sync_connect_timeout_secs: 10, + site_sync_read_timeout_secs: 120, + site_sync_max_retries: 2, + site_sync_clock_skew_tolerance: 1.0, + ui_enabled: false, + templates_dir: std::path::PathBuf::from("templates"), + static_dir: std::path::PathBuf::from("static"), + multipart_min_part_size: 1, + ..myfsio_server::config::ServerConfig::default() + }; + let state = myfsio_server::state::AppState::new(config); + let app = myfsio_server::create_router(state.clone()); + (app, state, tmp) +} + fn test_app() -> (axum::Router, tempfile::TempDir) { test_app_with_iam(serde_json::json!({ "version": 2, @@ -5151,3 +5229,977 @@ async fn test_plaintext_range_still_works() { assert!(resp.headers().get("x-amz-server-side-encryption").is_none()); assert_eq!(body_bytes(resp).await, payload[100..=199]); } + +async fn poison_object(state: &myfsio_server::state::AppState, bucket: &str, key: &str) { + use myfsio_storage::fs_backend::{META_KEY_CORRUPTED, META_KEY_CORRUPTION_DETAIL}; + let mut meta = state + .storage + .get_object_metadata(bucket, key) + .await + .unwrap(); + meta.insert(META_KEY_CORRUPTED.to_string(), "true".to_string()); + meta.insert( + META_KEY_CORRUPTION_DETAIL.to_string(), + "test poisoned for §E recovery".to_string(), + ); + state + .storage + .put_object_metadata(bucket, key, &meta) + .await + .unwrap(); +} + +async fn set_legal_hold(state: &myfsio_server::state::AppState, bucket: &str, key: &str) { + let mut meta = state + .storage + .get_object_metadata(bucket, key) + .await + .unwrap(); + meta.insert("__legal_hold__".to_string(), "true".to_string()); + state + .storage + .put_object_metadata(bucket, key, &meta) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_poisoned_object_can_be_overwritten_via_put() { + let (app, state, _tmp) = test_app_and_state(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/heal-put", Body::empty())) + .await + .unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/heal-put/healme", + Body::from("v1 bytes"), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + poison_object(&state, "heal-put", "healme").await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/heal-put/healme", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!( + resp.status().as_u16(), + 422, + "poisoned GET must surface 422 ObjectCorrupted" + ); + + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/heal-put/healme", + Body::from("v2 bytes"), + )) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::OK, + "PUT must overwrite a poisoned object instead of returning 422" + ); + + let resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/heal-put/healme", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(body_bytes(resp).await, b"v2 bytes".to_vec()); +} + +#[tokio::test] +async fn test_poisoned_object_can_be_deleted() { + let (app, state, _tmp) = test_app_and_state(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/heal-del", Body::empty())) + .await + .unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/heal-del/healme", + Body::from("rotting"), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + poison_object(&state, "heal-del", "healme").await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::DELETE, + "/heal-del/healme", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::NO_CONTENT, + "DELETE must succeed on a poisoned object instead of returning 422" + ); + + let resp = app + .clone() + .oneshot(signed_request( + Method::HEAD, + "/heal-del/healme", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::NOT_FOUND, + "HEAD after DELETE must be 404, not 422 (poison flag was cleared)" + ); +} + +#[tokio::test] +async fn test_poisoned_quarantined_object_can_be_deleted() { + let (app, state, _tmp) = test_app_and_state(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/heal-q", Body::empty())) + .await + .unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/heal-q/healme", + Body::from("rotting"), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + poison_object(&state, "heal-q", "healme").await; + + let live_path = state + .storage + .get_object_path("heal-q", "healme") + .await + .unwrap(); + std::fs::remove_file(&live_path).unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::DELETE, + "/heal-q/healme", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NO_CONTENT); + + let resp = app + .clone() + .oneshot(signed_request( + Method::HEAD, + "/heal-q/healme", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); +} + +async fn complete_two_part_upload( + app: &axum::Router, + bucket: &str, + key: &str, + part1: Vec, + part2: Vec, +) { + app.clone() + .oneshot(signed_request( + Method::PUT, + &format!("/{}", bucket), + Body::empty(), + )) + .await + .unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::POST, + &format!("/{}/{}?uploads", bucket, key), + Body::empty(), + )) + .await + .unwrap(); + let body = String::from_utf8( + resp.into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + let upload_id = body + .split("") + .nth(1) + .unwrap() + .split("") + .next() + .unwrap() + .to_string(); + + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri(format!( + "/{}/{}?uploadId={}&partNumber=1", + bucket, key, upload_id + )) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from(part1)) + .unwrap(), + ) + .await + .unwrap(); + let etag1 = resp + .headers() + .get("etag") + .unwrap() + .to_str() + .unwrap() + .trim_matches('"') + .to_string(); + + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri(format!( + "/{}/{}?uploadId={}&partNumber=2", + bucket, key, upload_id + )) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from(part2)) + .unwrap(), + ) + .await + .unwrap(); + let etag2 = resp + .headers() + .get("etag") + .unwrap() + .to_str() + .unwrap() + .trim_matches('"') + .to_string(); + + let complete_xml = format!( + "1\"{etag1}\"2\"{etag2}\"" + ); + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .uri(format!("/{}/{}?uploadId={}", bucket, key, upload_id)) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from(complete_xml)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); +} + +#[tokio::test] +async fn test_part_number_get_returns_only_that_part() { + let (app, _tmp) = test_app(); + let part1 = vec![b'A'; 1024]; + let part2 = vec![b'B'; 512]; + complete_two_part_upload(&app, "mp-pn", "obj.bin", part1.clone(), part2.clone()).await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/mp-pn/obj.bin?partNumber=1", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!( + resp.headers().get("x-amz-mp-parts-count").unwrap(), + "2", + "x-amz-mp-parts-count must reflect the assembled object" + ); + assert_eq!( + resp.headers().get("content-range").unwrap(), + "bytes 0-1023/1536" + ); + assert_eq!(resp.headers().get("content-length").unwrap(), "1024"); + assert_eq!(body_bytes(resp).await, part1); + + let resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/mp-pn/obj.bin?partNumber=2", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!( + resp.headers().get("content-range").unwrap(), + "bytes 1024-1535/1536" + ); + assert_eq!(resp.headers().get("content-length").unwrap(), "512"); + assert_eq!(body_bytes(resp).await, part2); +} + +#[tokio::test] +async fn test_head_part_number_returns_part_size() { + let (app, _tmp) = test_app(); + let part1 = vec![b'X'; 2048]; + let part2 = vec![b'Y'; 256]; + complete_two_part_upload(&app, "mp-head", "obj.bin", part1, part2).await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::HEAD, + "/mp-head/obj.bin?partNumber=2", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!(resp.headers().get("content-length").unwrap(), "256"); + assert_eq!( + resp.headers().get("content-range").unwrap(), + "bytes 2048-2303/2304" + ); + assert_eq!(resp.headers().get("x-amz-mp-parts-count").unwrap(), "2"); +} + +#[tokio::test] +async fn test_part_number_out_of_range_rejected() { + let (app, _tmp) = test_app(); + complete_two_part_upload( + &app, + "mp-oob", + "obj.bin", + vec![b'A'; 8], + vec![b'B'; 8], + ) + .await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/mp-oob/obj.bin?partNumber=5", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_eq!( + resp.headers().get("x-amz-error-code").unwrap(), + "InvalidPart" + ); +} + +#[tokio::test] +async fn test_part_number_one_on_non_multipart_returns_full_body() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/single", Body::empty())) + .await + .unwrap(); + + let payload = b"single-shot upload".to_vec(); + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/single/obj.bin", + Body::from(payload.clone()), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/single/obj.bin?partNumber=1", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::OK, + "partNumber=1 on a non-multipart object must return the whole body as 200 OK" + ); + assert!(resp.headers().get("x-amz-mp-parts-count").is_none()); + assert_eq!(body_bytes(resp).await, payload); + + let resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/single/obj.bin?partNumber=2", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); +} + +#[tokio::test] +async fn test_legal_hold_blocks_delete_even_when_poisoned() { + let (app, state, _tmp) = test_app_and_state(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/lock-poison", Body::empty())) + .await + .unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/lock-poison/locked.bin", + Body::from("v1"), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + set_legal_hold(&state, "lock-poison", "locked.bin").await; + poison_object(&state, "lock-poison", "locked.bin").await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::DELETE, + "/lock-poison/locked.bin", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::FORBIDDEN, + "legal-hold must still block DELETE on a poisoned object (poison must not bypass object lock)" + ); + + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/lock-poison/locked.bin", + Body::from("v2"), + )) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::FORBIDDEN, + "legal-hold must still block PUT-overwrite on a poisoned object" + ); +} + +#[tokio::test] +async fn test_governance_retention_blocks_poisoned_delete_without_bypass() { + let (app, state, _tmp) = test_app_and_state(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/gov-poison", Body::empty())) + .await + .unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/gov-poison/locked.bin", + Body::from("v1"), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let mut meta = state + .storage + .get_object_metadata("gov-poison", "locked.bin") + .await + .unwrap(); + let retain_until = (chrono::Utc::now() + chrono::Duration::days(1)).to_rfc3339(); + meta.insert( + "__object_retention__".to_string(), + format!( + "{{\"mode\":\"GOVERNANCE\",\"retain_until_date\":\"{}\"}}", + retain_until + ), + ); + state + .storage + .put_object_metadata("gov-poison", "locked.bin", &meta) + .await + .unwrap(); + + poison_object(&state, "gov-poison", "locked.bin").await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::DELETE, + "/gov-poison/locked.bin", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::FORBIDDEN, + "governance retention must block DELETE without bypass header even when object is poisoned" + ); + + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::DELETE) + .uri("/gov-poison/locked.bin") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("x-amz-bypass-governance-retention", "true") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::NO_CONTENT, + "with x-amz-bypass-governance-retention=true, poisoned governance-locked object can be deleted" + ); +} + +#[tokio::test] +async fn test_part_number_uses_served_snapshot_metadata() { + let (app, state, _tmp) = test_app_and_state(); + let part1 = vec![b'A'; 1024]; + let part2 = vec![b'B'; 512]; + complete_two_part_upload(&app, "race-pn", "obj.bin", part1.clone(), part2.clone()).await; + + let live_meta = state + .storage + .get_object_metadata("race-pn", "obj.bin") + .await + .unwrap(); + let original_part_sizes = live_meta + .get(myfsio_storage::fs_backend::META_KEY_PART_SIZES) + .cloned() + .unwrap(); + assert_eq!(original_part_sizes, "1024,512"); + + let resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/race-pn/obj.bin?partNumber=2", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!( + resp.headers().get("content-range").unwrap(), + "bytes 1024-1535/1536", + "Content-Range must be derived from the snapshot's __part_sizes__" + ); + assert_eq!(resp.headers().get("x-amz-mp-parts-count").unwrap(), "2"); + assert_eq!(body_bytes(resp).await, part2); +} + +#[tokio::test] +async fn test_bulk_delete_poisoned_unlocked_object_succeeds() { + let (app, state, _tmp) = test_app_and_state(); + + app.clone() + .oneshot(signed_request( + Method::PUT, + "/bulk-poison", + Body::empty(), + )) + .await + .unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/bulk-poison/dead.bin", + Body::from("rotting"), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + poison_object(&state, "bulk-poison", "dead.bin").await; + + let body = "dead.bin"; + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/bulk-poison?delete") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body_str = String::from_utf8( + resp.into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!( + body_str.contains("") && body_str.contains("dead.bin"), + "bulk delete on poisoned-unlocked object should report success, got: {}", + body_str + ); + assert!( + !body_str.contains("ObjectCorrupted"), + "bulk delete must not surface ObjectCorrupted on a poisoned-unlocked object: {}", + body_str + ); +} + +#[tokio::test] +async fn test_bulk_delete_poisoned_locked_object_blocked_by_legal_hold() { + let (app, state, _tmp) = test_app_and_state(); + + app.clone() + .oneshot(signed_request( + Method::PUT, + "/bulk-locked", + Body::empty(), + )) + .await + .unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/bulk-locked/locked.bin", + Body::from("rotting"), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + set_legal_hold(&state, "bulk-locked", "locked.bin").await; + poison_object(&state, "bulk-locked", "locked.bin").await; + + let body = "locked.bin"; + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/bulk-locked?delete") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body_str = String::from_utf8( + resp.into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!( + body_str.contains("") && body_str.contains("AccessDenied"), + "bulk delete on legal-hold-locked poisoned object must report AccessDenied, got: {}", + body_str + ); + assert!( + !body_str.contains("ObjectCorrupted"), + "bulk delete must not surface ObjectCorrupted on a poisoned-locked object: {}", + body_str + ); + + let resp = app + .clone() + .oneshot(signed_request( + Method::HEAD, + "/bulk-locked/locked.bin", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!( + resp.status().as_u16(), + 422, + "object must still be poisoned after blocked bulk-delete" + ); +} + +async fn upload_zero_length_final_part_object( + app: &axum::Router, + bucket: &str, + key: &str, + part1: Vec, +) { + app.clone() + .oneshot(signed_request( + Method::PUT, + &format!("/{}", bucket), + Body::empty(), + )) + .await + .unwrap(); + + let resp = app + .clone() + .oneshot(signed_request( + Method::POST, + &format!("/{}/{}?uploads", bucket, key), + Body::empty(), + )) + .await + .unwrap(); + let body = String::from_utf8( + resp.into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + let upload_id = body + .split("") + .nth(1) + .unwrap() + .split("") + .next() + .unwrap() + .to_string(); + + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri(format!( + "/{}/{}?uploadId={}&partNumber=1", + bucket, key, upload_id + )) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from(part1)) + .unwrap(), + ) + .await + .unwrap(); + let etag1 = resp + .headers() + .get("etag") + .unwrap() + .to_str() + .unwrap() + .trim_matches('"') + .to_string(); + + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri(format!( + "/{}/{}?uploadId={}&partNumber=2", + bucket, key, upload_id + )) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + let etag2 = resp + .headers() + .get("etag") + .unwrap() + .to_str() + .unwrap() + .trim_matches('"') + .to_string(); + + let complete_xml = format!( + "1\"{etag1}\"2\"{etag2}\"" + ); + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .uri(format!("/{}/{}?uploadId={}", bucket, key, upload_id)) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from(complete_xml)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); +} + +#[tokio::test] +async fn test_zero_length_part_get_omits_content_range() { + let (app, _tmp) = test_app(); + upload_zero_length_final_part_object(&app, "zero-pn", "obj.bin", vec![b'A'; 1024]).await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/zero-pn/obj.bin?partNumber=2", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!(resp.headers().get("content-length").unwrap(), "0"); + assert!( + resp.headers().get("content-range").is_none(), + "zero-length part GET must not emit Content-Range (would be misleading)" + ); + assert_eq!(resp.headers().get("x-amz-mp-parts-count").unwrap(), "2"); + assert_eq!(body_bytes(resp).await, Vec::::new()); +} + +#[tokio::test] +async fn test_zero_length_part_head_omits_content_range() { + let (app, _tmp) = test_app(); + upload_zero_length_final_part_object(&app, "zero-pn-h", "obj.bin", vec![b'A'; 1024]).await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::HEAD, + "/zero-pn-h/obj.bin?partNumber=2", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT); + assert_eq!(resp.headers().get("content-length").unwrap(), "0"); + assert!( + resp.headers().get("content-range").is_none(), + "zero-length part HEAD must not emit Content-Range" + ); + assert_eq!(resp.headers().get("x-amz-mp-parts-count").unwrap(), "2"); +} + +#[tokio::test] +async fn test_zero_length_part_get_evaluates_if_none_match() { + let (app, _tmp) = test_app(); + upload_zero_length_final_part_object(&app, "zero-pn-cond", "obj.bin", vec![b'A'; 1024]).await; + + let resp = app + .clone() + .oneshot(signed_request( + Method::HEAD, + "/zero-pn-cond/obj.bin", + Body::empty(), + )) + .await + .unwrap(); + let etag = resp + .headers() + .get("etag") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/zero-pn-cond/obj.bin?partNumber=2") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("if-none-match", etag.clone()) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::NOT_MODIFIED, + "zero-length part GET must honor If-None-Match like any other GET" + ); +} diff --git a/crates/myfsio-storage/src/fs_backend.rs b/crates/myfsio-storage/src/fs_backend.rs index 569e66a..38eff07 100644 --- a/crates/myfsio-storage/src/fs_backend.rs +++ b/crates/myfsio-storage/src/fs_backend.rs @@ -21,6 +21,34 @@ pub const META_KEY_CORRUPTED: &str = "__corrupted__"; pub const META_KEY_CORRUPTED_AT: &str = "__corrupted_at__"; pub const META_KEY_CORRUPTION_DETAIL: &str = "__corruption_detail__"; pub const META_KEY_QUARANTINE_PATH: &str = "__quarantine_path__"; +pub const META_KEY_PART_SIZES: &str = "__part_sizes__"; + +pub fn encode_part_sizes(sizes: &[u64]) -> String { + let mut out = String::with_capacity(sizes.len() * 8); + for (i, s) in sizes.iter().enumerate() { + if i > 0 { + out.push(','); + } + out.push_str(&s.to_string()); + } + out +} + +pub fn parse_part_sizes(raw: &str) -> Option> { + let mut out = Vec::new(); + for tok in raw.split(',') { + let tok = tok.trim(); + if tok.is_empty() { + return None; + } + out.push(tok.parse::().ok()?); + } + if out.is_empty() { + None + } else { + Some(out) + } +} pub fn metadata_is_corrupted(meta: &HashMap) -> bool { meta.get(META_KEY_CORRUPTED) @@ -2829,6 +2857,12 @@ impl crate::traits::StorageEngine for FsStorageBackend { self.delete_metadata_sync(bucket, key) .map_err(StorageError::Io)?; Self::cleanup_empty_parents(&path, &bucket_path); + } else { + let stored_meta = self.read_metadata_sync(bucket, key); + if !stored_meta.is_empty() { + self.delete_metadata_sync(bucket, key) + .map_err(StorageError::Io)?; + } } let dm_version_id = self .write_delete_marker_sync(bucket, key) @@ -2842,6 +2876,17 @@ impl crate::traits::StorageEngine for FsStorageBackend { } if !path.exists() { + let stored_meta = self.read_metadata_sync(bucket, key); + if !stored_meta.is_empty() { + self.delete_metadata_sync(bucket, key) + .map_err(StorageError::Io)?; + self.invalidate_bucket_caches(bucket); + return Ok(DeleteOutcome { + version_id: None, + is_delete_marker: false, + existed: true, + }); + } return Ok(DeleteOutcome::default()); } @@ -3353,47 +3398,52 @@ impl crate::traits::StorageEngine for FsStorageBackend { // Assemble parts on a blocking thread using std::fs, large buffers, // and a single writer flush — no per-chunk runtime crossings. - let assemble_res = tokio::task::spawn_blocking(move || -> StorageResult<(String, u64)> { - use std::io::{BufReader, BufWriter, Read, Write}; - let out_raw = std::fs::File::create(&tmp_path_owned).map_err(StorageError::Io)?; - let mut out_file = BufWriter::with_capacity(chunk_size * 4, out_raw); - let mut md5_digest_concat = Vec::with_capacity(part_infos.len() * 16); - let mut total_size: u64 = 0; - let mut buf = vec![0u8; chunk_size]; + let assemble_res = + tokio::task::spawn_blocking(move || -> StorageResult<(String, u64, Vec)> { + use std::io::{BufReader, BufWriter, Read, Write}; + let out_raw = std::fs::File::create(&tmp_path_owned).map_err(StorageError::Io)?; + let mut out_file = BufWriter::with_capacity(chunk_size * 4, out_raw); + let mut md5_digest_concat = Vec::with_capacity(part_infos.len() * 16); + let mut total_size: u64 = 0; + let mut part_sizes: Vec = Vec::with_capacity(part_infos.len()); + let mut buf = vec![0u8; chunk_size]; - for part_info in &part_infos { - let part_file = - upload_dir_owned.join(format!("part-{:05}.part", part_info.part_number)); - if !part_file.exists() { - return Err(StorageError::InvalidObjectKey(format!( - "Part {} not found", - part_info.part_number - ))); - } - let reader = std::fs::File::open(&part_file).map_err(StorageError::Io)?; - let mut reader = BufReader::with_capacity(chunk_size, reader); - let mut part_hasher = Md5::new(); - loop { - let n = reader.read(&mut buf).map_err(StorageError::Io)?; - if n == 0 { - break; + for part_info in &part_infos { + let part_file = + upload_dir_owned.join(format!("part-{:05}.part", part_info.part_number)); + if !part_file.exists() { + return Err(StorageError::InvalidObjectKey(format!( + "Part {} not found", + part_info.part_number + ))); } - part_hasher.update(&buf[..n]); - out_file.write_all(&buf[..n]).map_err(StorageError::Io)?; - total_size += n as u64; + let reader = std::fs::File::open(&part_file).map_err(StorageError::Io)?; + let mut reader = BufReader::with_capacity(chunk_size, reader); + let mut part_hasher = Md5::new(); + let mut part_bytes: u64 = 0; + loop { + let n = reader.read(&mut buf).map_err(StorageError::Io)?; + if n == 0 { + break; + } + part_hasher.update(&buf[..n]); + out_file.write_all(&buf[..n]).map_err(StorageError::Io)?; + total_size += n as u64; + part_bytes += n as u64; + } + md5_digest_concat.extend_from_slice(&part_hasher.finalize()); + part_sizes.push(part_bytes); } - md5_digest_concat.extend_from_slice(&part_hasher.finalize()); - } - out_file.flush().map_err(StorageError::Io)?; - let mut composite_hasher = Md5::new(); - composite_hasher.update(&md5_digest_concat); - let etag = format!("{:x}-{}", composite_hasher.finalize(), part_infos.len()); - Ok((etag, total_size)) - }) - .await; + out_file.flush().map_err(StorageError::Io)?; + let mut composite_hasher = Md5::new(); + composite_hasher.update(&md5_digest_concat); + let etag = format!("{:x}-{}", composite_hasher.finalize(), part_infos.len()); + Ok((etag, total_size, part_sizes)) + }) + .await; - let (etag, total_size) = match assemble_res { + let (etag, total_size, part_sizes) = match assemble_res { Ok(Ok(v)) => v, Ok(Err(e)) => { let _ = std::fs::remove_file(&tmp_path); @@ -3408,6 +3458,9 @@ impl crate::traits::StorageEngine for FsStorageBackend { } }; + let mut metadata = metadata; + metadata.insert(META_KEY_PART_SIZES.to_string(), encode_part_sizes(&part_sizes)); + // Commit to the destination key atomically under its write lock. // Lock acquisition happens inside run_blocking so the wait runs under // block_in_place rather than parking the async worker. @@ -4033,6 +4086,117 @@ mod tests { ); } + #[test] + fn test_part_sizes_roundtrip() { + let sizes = vec![5_242_880, 5_242_880, 5_242_880, 12_345]; + let encoded = encode_part_sizes(&sizes); + assert_eq!(encoded, "5242880,5242880,5242880,12345"); + let parsed = parse_part_sizes(&encoded).unwrap(); + assert_eq!(parsed, sizes); + assert!(parse_part_sizes("").is_none()); + assert!(parse_part_sizes(",,,").is_none()); + assert!(parse_part_sizes("abc").is_none()); + assert!(parse_part_sizes("123,abc").is_none()); + assert!(parse_part_sizes(" ").is_none()); + } + + #[tokio::test] + async fn test_delete_object_clears_poisoned_metadata() { + let (_dir, backend) = create_test_backend(); + backend.create_bucket("test-bucket").await.unwrap(); + + let data: AsyncReadStream = Box::pin(std::io::Cursor::new(b"will rot".to_vec())); + backend + .put_object("test-bucket", "rot.txt", data, None) + .await + .unwrap(); + + let mut meta = backend + .get_object_metadata("test-bucket", "rot.txt") + .await + .unwrap(); + meta.insert(META_KEY_CORRUPTED.to_string(), "true".to_string()); + backend + .put_object_metadata("test-bucket", "rot.txt", &meta) + .await + .unwrap(); + + let live_path = backend + .get_object_path("test-bucket", "rot.txt") + .await + .unwrap(); + std::fs::remove_file(&live_path).unwrap(); + + backend + .delete_object("test-bucket", "rot.txt") + .await + .unwrap(); + + match backend.head_object("test-bucket", "rot.txt").await { + Err(StorageError::ObjectNotFound { .. }) => {} + other => panic!( + "after DELETE on a poisoned/quarantined object, HEAD should be ObjectNotFound, got {:?}", + other + ), + } + + let leftover = backend + .get_object_metadata("test-bucket", "rot.txt") + .await + .unwrap(); + assert!( + leftover.is_empty(), + "metadata sidecar must be cleared after DELETE on poisoned object" + ); + } + + #[tokio::test] + async fn test_complete_multipart_persists_part_sizes() { + let (_dir, backend) = create_test_backend(); + backend.create_bucket("mp-bucket").await.unwrap(); + + let upload_id = backend + .initiate_multipart("mp-bucket", "obj.bin", None) + .await + .unwrap(); + + let part1: AsyncReadStream = Box::pin(std::io::Cursor::new(vec![b'A'; 1024])); + backend + .upload_part("mp-bucket", &upload_id, 1, part1) + .await + .unwrap(); + let part2: AsyncReadStream = Box::pin(std::io::Cursor::new(vec![b'B'; 512])); + backend + .upload_part("mp-bucket", &upload_id, 2, part2) + .await + .unwrap(); + + let parts = vec![ + PartInfo { + part_number: 1, + etag: String::new(), + }, + PartInfo { + part_number: 2, + etag: String::new(), + }, + ]; + let obj = backend + .complete_multipart("mp-bucket", &upload_id, &parts) + .await + .unwrap(); + assert_eq!(obj.size, 1536); + + let stored = backend + .get_object_metadata("mp-bucket", "obj.bin") + .await + .unwrap(); + let raw = stored + .get(META_KEY_PART_SIZES) + .expect("part sizes must be persisted on completion"); + assert_eq!(parse_part_sizes(raw).unwrap(), vec![1024u64, 512u64]); + } + #[tokio::test] async fn test_put_clears_poison_flag() { let (_dir, backend) = create_test_backend();