From 7ef3820f6eb8a27f42f6bb59938f415db91e4d89 Mon Sep 17 00:00:00 2001 From: kqjy Date: Thu, 23 Apr 2026 17:52:30 +0800 Subject: [PATCH] Fix SigV4/SHA256/TCP_NODELAY critical paths; tighten multipart, copy, versioning, and S3 error conformance --- Cargo.lock | 14 +- Cargo.toml | 4 +- crates/myfsio-common/src/error.rs | 26 ++ crates/myfsio-server/Cargo.toml | 2 + crates/myfsio-server/src/config.rs | 4 + crates/myfsio-server/src/handlers/config.rs | 89 ++++- crates/myfsio-server/src/handlers/mod.rs | 362 +++++++++++++++--- crates/myfsio-server/src/lib.rs | 6 + crates/myfsio-server/src/main.rs | 10 + crates/myfsio-server/src/middleware/auth.rs | 53 ++- crates/myfsio-server/src/middleware/mod.rs | 1 + .../myfsio-server/src/middleware/session.rs | 4 +- .../myfsio-server/src/middleware/sha_body.rs | 107 ++++++ crates/myfsio-server/tests/integration.rs | 210 ++++++++++ crates/myfsio-storage/src/validation.rs | 7 + crates/myfsio-xml/src/response.rs | 2 +- 16 files changed, 821 insertions(+), 80 deletions(-) create mode 100644 crates/myfsio-server/src/middleware/sha_body.rs diff --git a/Cargo.lock b/Cargo.lock index 682f84e..e8cdfee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2639,7 +2639,7 @@ dependencies = [ [[package]] name = "myfsio-auth" -version = "0.4.3" +version = "0.4.4" dependencies = [ "aes", "base64", @@ -2664,7 +2664,7 @@ dependencies = [ [[package]] name = "myfsio-common" -version = "0.4.3" +version = "0.4.4" dependencies = [ "chrono", "serde", @@ -2675,7 +2675,7 @@ dependencies = [ [[package]] name = "myfsio-crypto" -version = "0.4.3" +version = "0.4.4" dependencies = [ "aes-gcm", "base64", @@ -2696,7 +2696,7 @@ dependencies = [ [[package]] name = "myfsio-server" -version = "0.4.3" +version = "0.4.4" dependencies = [ "aes-gcm", "async-trait", @@ -2714,6 +2714,8 @@ dependencies = [ "dotenvy", "duckdb", "futures", + "hex", + "http-body 1.0.1", "http-body-util", "hyper 1.9.0", "md-5 0.10.6", @@ -2751,7 +2753,7 @@ dependencies = [ [[package]] name = "myfsio-storage" -version = "0.4.3" +version = "0.4.4" dependencies = [ "chrono", "dashmap", @@ -2774,7 +2776,7 @@ dependencies = [ [[package]] name = "myfsio-xml" -version = "0.4.3" +version = "0.4.4" dependencies = [ "chrono", "myfsio-common", diff --git a/Cargo.toml b/Cargo.toml index e025cc3..f7bb2e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,14 +10,14 @@ members = [ ] [workspace.package] -version = "0.4.3" +version = "0.4.4" edition = "2021" [workspace.dependencies] tokio = { version = "1", features = ["full"] } axum = { version = "0.8" } tower = { version = "0.5" } -tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip"] } +tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip", "timeout"] } hyper = { version = "1" } bytes = "1" serde = { version = "1", features = ["derive"] } diff --git a/crates/myfsio-common/src/error.rs b/crates/myfsio-common/src/error.rs index 5785d5f..c8e506e 100644 --- a/crates/myfsio-common/src/error.rs +++ b/crates/myfsio-common/src/error.rs @@ -7,11 +7,14 @@ pub enum S3ErrorCode { BucketAlreadyExists, BucketNotEmpty, EntityTooLarge, + EntityTooSmall, InternalError, InvalidAccessKeyId, InvalidArgument, InvalidBucketName, InvalidKey, + InvalidPart, + InvalidPartOrder, InvalidPolicyDocument, InvalidRange, InvalidRequest, @@ -19,13 +22,16 @@ pub enum S3ErrorCode { MalformedXML, MethodNotAllowed, NoSuchBucket, + NoSuchBucketPolicy, NoSuchKey, + NoSuchLifecycleConfiguration, NoSuchUpload, NoSuchVersion, NoSuchTagSet, PreconditionFailed, NotModified, QuotaExceeded, + ServerSideEncryptionConfigurationNotFoundError, SignatureDoesNotMatch, SlowDown, } @@ -38,11 +44,14 @@ impl S3ErrorCode { Self::BucketAlreadyExists => 409, Self::BucketNotEmpty => 409, Self::EntityTooLarge => 413, + Self::EntityTooSmall => 400, Self::InternalError => 500, Self::InvalidAccessKeyId => 403, Self::InvalidArgument => 400, Self::InvalidBucketName => 400, Self::InvalidKey => 400, + Self::InvalidPart => 400, + Self::InvalidPartOrder => 400, Self::InvalidPolicyDocument => 400, Self::InvalidRange => 416, Self::InvalidRequest => 400, @@ -50,13 +59,16 @@ impl S3ErrorCode { Self::MalformedXML => 400, Self::MethodNotAllowed => 405, Self::NoSuchBucket => 404, + Self::NoSuchBucketPolicy => 404, Self::NoSuchKey => 404, + Self::NoSuchLifecycleConfiguration => 404, Self::NoSuchUpload => 404, Self::NoSuchVersion => 404, Self::NoSuchTagSet => 404, Self::PreconditionFailed => 412, Self::NotModified => 304, Self::QuotaExceeded => 403, + Self::ServerSideEncryptionConfigurationNotFoundError => 404, Self::SignatureDoesNotMatch => 403, Self::SlowDown => 429, } @@ -69,11 +81,14 @@ impl S3ErrorCode { Self::BucketAlreadyExists => "BucketAlreadyExists", Self::BucketNotEmpty => "BucketNotEmpty", Self::EntityTooLarge => "EntityTooLarge", + Self::EntityTooSmall => "EntityTooSmall", Self::InternalError => "InternalError", Self::InvalidAccessKeyId => "InvalidAccessKeyId", Self::InvalidArgument => "InvalidArgument", Self::InvalidBucketName => "InvalidBucketName", Self::InvalidKey => "InvalidKey", + Self::InvalidPart => "InvalidPart", + Self::InvalidPartOrder => "InvalidPartOrder", Self::InvalidPolicyDocument => "InvalidPolicyDocument", Self::InvalidRange => "InvalidRange", Self::InvalidRequest => "InvalidRequest", @@ -81,13 +96,18 @@ impl S3ErrorCode { Self::MalformedXML => "MalformedXML", Self::MethodNotAllowed => "MethodNotAllowed", Self::NoSuchBucket => "NoSuchBucket", + Self::NoSuchBucketPolicy => "NoSuchBucketPolicy", Self::NoSuchKey => "NoSuchKey", + Self::NoSuchLifecycleConfiguration => "NoSuchLifecycleConfiguration", Self::NoSuchUpload => "NoSuchUpload", Self::NoSuchVersion => "NoSuchVersion", Self::NoSuchTagSet => "NoSuchTagSet", Self::PreconditionFailed => "PreconditionFailed", Self::NotModified => "NotModified", Self::QuotaExceeded => "QuotaExceeded", + Self::ServerSideEncryptionConfigurationNotFoundError => { + "ServerSideEncryptionConfigurationNotFoundError" + } Self::SignatureDoesNotMatch => "SignatureDoesNotMatch", Self::SlowDown => "SlowDown", } @@ -100,11 +120,14 @@ impl S3ErrorCode { Self::BucketAlreadyExists => "The requested bucket name is not available", Self::BucketNotEmpty => "The bucket you tried to delete is not empty", Self::EntityTooLarge => "Your proposed upload exceeds the maximum allowed size", + Self::EntityTooSmall => "Your proposed upload is smaller than the minimum allowed object size", Self::InternalError => "We encountered an internal error. Please try again.", Self::InvalidAccessKeyId => "The access key ID you provided does not exist", Self::InvalidArgument => "Invalid argument", Self::InvalidBucketName => "The specified bucket is not valid", Self::InvalidKey => "The specified key is not valid", + Self::InvalidPart => "One or more of the specified parts could not be found", + Self::InvalidPartOrder => "The list of parts was not in ascending order", Self::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document", Self::InvalidRange => "The requested range is not satisfiable", Self::InvalidRequest => "Invalid request", @@ -112,13 +135,16 @@ impl S3ErrorCode { Self::MalformedXML => "The XML you provided was not well-formed", Self::MethodNotAllowed => "The specified method is not allowed against this resource", Self::NoSuchBucket => "The specified bucket does not exist", + Self::NoSuchBucketPolicy => "The bucket policy does not exist", Self::NoSuchKey => "The specified key does not exist", + Self::NoSuchLifecycleConfiguration => "The lifecycle configuration does not exist", Self::NoSuchUpload => "The specified multipart upload does not exist", Self::NoSuchVersion => "The specified version does not exist", Self::NoSuchTagSet => "The TagSet does not exist", Self::PreconditionFailed => "At least one of the preconditions you specified did not hold", Self::NotModified => "Not Modified", Self::QuotaExceeded => "The bucket quota has been exceeded", + Self::ServerSideEncryptionConfigurationNotFoundError => "The server side encryption configuration was not found", Self::SignatureDoesNotMatch => "The request signature we calculated does not match the signature you provided", Self::SlowDown => "Please reduce your request rate", } diff --git a/crates/myfsio-server/Cargo.toml b/crates/myfsio-server/Cargo.toml index b1e305a..eb0cd8a 100644 --- a/crates/myfsio-server/Cargo.toml +++ b/crates/myfsio-server/Cargo.toml @@ -27,12 +27,14 @@ tokio-stream = { workspace = true } chrono = { workspace = true } uuid = { workspace = true } futures = { workspace = true } +http-body = "1" http-body-util = "0.1" percent-encoding = { workspace = true } quick-xml = { workspace = true } mime_guess = "2" crc32fast = { workspace = true } sha2 = { workspace = true } +hex = { workspace = true } duckdb = { workspace = true } roxmltree = "0.20" parking_lot = { workspace = true } diff --git a/crates/myfsio-server/src/config.rs b/crates/myfsio-server/src/config.rs index 6bfd147..1264b3f 100644 --- a/crates/myfsio-server/src/config.rs +++ b/crates/myfsio-server/src/config.rs @@ -81,6 +81,7 @@ pub struct ServerConfig { pub multipart_min_part_size: u64, pub bulk_delete_max_keys: usize, pub stream_chunk_size: usize, + pub request_body_timeout_secs: u64, pub ratelimit_default: RateLimitSetting, pub ratelimit_admin: RateLimitSetting, pub ratelimit_storage_uri: String, @@ -225,6 +226,7 @@ impl ServerConfig { let multipart_min_part_size = parse_u64_env("MULTIPART_MIN_PART_SIZE", 5_242_880); let bulk_delete_max_keys = parse_usize_env("BULK_DELETE_MAX_KEYS", 1000); let stream_chunk_size = parse_usize_env("STREAM_CHUNK_SIZE", 1_048_576); + let request_body_timeout_secs = parse_u64_env("REQUEST_BODY_TIMEOUT_SECONDS", 60); let ratelimit_default = parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(200, 60)); let ratelimit_admin = @@ -304,6 +306,7 @@ impl ServerConfig { multipart_min_part_size, bulk_delete_max_keys, stream_chunk_size, + request_body_timeout_secs, ratelimit_default, ratelimit_admin, ratelimit_storage_uri, @@ -387,6 +390,7 @@ impl Default for ServerConfig { multipart_min_part_size: 5_242_880, bulk_delete_max_keys: 1000, stream_chunk_size: 1_048_576, + request_body_timeout_secs: 60, ratelimit_default: RateLimitSetting::new(200, 60), ratelimit_admin: RateLimitSetting::new(60, 60), ratelimit_storage_uri: "memory://".to_string(), diff --git a/crates/myfsio-server/src/handlers/config.rs b/crates/myfsio-server/src/handlers/config.rs index 4547374..16ac05e 100644 --- a/crates/myfsio-server/src/handlers/config.rs +++ b/crates/myfsio-server/src/handlers/config.rs @@ -218,11 +218,8 @@ pub async fn get_encryption(state: &AppState, bucket: &str) -> Response { } else { xml_response( StatusCode::NOT_FOUND, - S3Error::new( - S3ErrorCode::InvalidRequest, - "The server side encryption configuration was not found", - ) - .to_xml(), + S3Error::from_code(S3ErrorCode::ServerSideEncryptionConfigurationNotFoundError) + .to_xml(), ) } } @@ -270,11 +267,7 @@ pub async fn get_lifecycle(state: &AppState, bucket: &str) -> Response { } else { xml_response( StatusCode::NOT_FOUND, - S3Error::new( - S3ErrorCode::NoSuchKey, - "The lifecycle configuration does not exist", - ) - .to_xml(), + S3Error::from_code(S3ErrorCode::NoSuchLifecycleConfiguration).to_xml(), ) } } @@ -421,7 +414,7 @@ pub async fn get_policy(state: &AppState, bucket: &str) -> Response { } else { xml_response( StatusCode::NOT_FOUND, - S3Error::new(S3ErrorCode::NoSuchKey, "No bucket policy attached").to_xml(), + S3Error::from_code(S3ErrorCode::NoSuchBucketPolicy).to_xml(), ) } } @@ -1095,6 +1088,35 @@ pub async fn list_object_versions( || archived_versions.len() > archived_count; xml.push_str(&format!("{}", is_truncated)); + let current_keys: std::collections::HashSet = objects + .iter() + .take(current_count) + .map(|o| o.key.clone()) + .collect(); + let mut latest_archived_per_key: std::collections::HashMap = + std::collections::HashMap::new(); + for v in archived_versions.iter().take(archived_count) { + if current_keys.contains(&v.key) { + continue; + } + let existing = latest_archived_per_key.get(&v.key).cloned(); + match existing { + None => { + latest_archived_per_key.insert(v.key.clone(), v.version_id.clone()); + } + Some(existing_id) => { + let existing_ts = archived_versions + .iter() + .find(|x| x.key == v.key && x.version_id == existing_id) + .map(|x| x.last_modified) + .unwrap_or(v.last_modified); + if v.last_modified > existing_ts { + latest_archived_per_key.insert(v.key.clone(), v.version_id.clone()); + } + } + } + } + for obj in objects.iter().take(current_count) { xml.push_str(""); xml.push_str(&format!("{}", xml_escape(&obj.key))); @@ -1116,23 +1138,34 @@ pub async fn list_object_versions( } for version in archived_versions.iter().take(archived_count) { - xml.push_str(""); + let is_latest = latest_archived_per_key + .get(&version.key) + .map(|id| id == &version.version_id) + .unwrap_or(false); + let tag = if version.is_delete_marker { + "DeleteMarker" + } else { + "Version" + }; + xml.push_str(&format!("<{}>", tag)); xml.push_str(&format!("{}", xml_escape(&version.key))); xml.push_str(&format!( "{}", xml_escape(&version.version_id) )); - xml.push_str("false"); + xml.push_str(&format!("{}", is_latest)); xml.push_str(&format!( "{}", myfsio_xml::response::format_s3_datetime(&version.last_modified) )); - if let Some(ref etag) = version.etag { - xml.push_str(&format!("\"{}\"", xml_escape(etag))); + if !version.is_delete_marker { + if let Some(ref etag) = version.etag { + xml.push_str(&format!("\"{}\"", xml_escape(etag))); + } + xml.push_str(&format!("{}", version.size)); + xml.push_str("STANDARD"); } - xml.push_str(&format!("{}", version.size)); - xml.push_str("STANDARD"); - xml.push_str(""); + xml.push_str(&format!("", tag)); } xml.push_str(""); @@ -1182,6 +1215,26 @@ pub async fn put_object_tagging(state: &AppState, bucket: &str, key: &str, body: .to_xml(), ); } + for tag in &tags { + if tag.key.is_empty() || tag.key.len() > 128 { + return xml_response( + StatusCode::BAD_REQUEST, + S3Error::new(S3ErrorCode::InvalidTag, "Tag key length must be 1-128").to_xml(), + ); + } + if tag.value.len() > 256 { + return xml_response( + StatusCode::BAD_REQUEST, + S3Error::new(S3ErrorCode::InvalidTag, "Tag value length must be 0-256").to_xml(), + ); + } + if tag.key.contains('=') { + return xml_response( + StatusCode::BAD_REQUEST, + S3Error::new(S3ErrorCode::InvalidTag, "Tag keys must not contain '='").to_xml(), + ); + } + } match state.storage.set_object_tags(bucket, key, &tags).await { Ok(()) => StatusCode::OK.into_response(), diff --git a/crates/myfsio-server/src/handlers/mod.rs b/crates/myfsio-server/src/handlers/mod.rs index 1acd085..4ed1df8 100644 --- a/crates/myfsio-server/src/handlers/mod.rs +++ b/crates/myfsio-server/src/handlers/mod.rs @@ -47,6 +47,11 @@ fn s3_error_response(err: S3Error) -> Response { } fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response { + if let myfsio_storage::error::StorageError::Io(io_err) = &err { + if let Some(message) = crate::middleware::sha_body::sha256_mismatch_message(io_err) { + return bad_digest_response(message); + } + } s3_error_response(S3Error::from(err)) } @@ -391,6 +396,75 @@ pub async fn get_bucket( Some(marker.clone()) }; + if max_keys == 0 { + let has_any = if delimiter.is_empty() { + state + .storage + .list_objects( + &bucket, + &myfsio_common::types::ListParams { + max_keys: 1, + continuation_token: effective_start.clone(), + prefix: if prefix.is_empty() { + None + } else { + Some(prefix.clone()) + }, + start_after: if is_v2 { + query.start_after.clone() + } else { + None + }, + }, + ) + .await + .map(|r| !r.objects.is_empty()) + .unwrap_or(false) + } else { + state + .storage + .list_objects_shallow( + &bucket, + &myfsio_common::types::ShallowListParams { + prefix: prefix.clone(), + delimiter: delimiter.clone(), + max_keys: 1, + continuation_token: effective_start.clone(), + }, + ) + .await + .map(|r| !r.objects.is_empty() || !r.common_prefixes.is_empty()) + .unwrap_or(false) + }; + let xml = if is_v2 { + myfsio_xml::response::list_objects_v2_xml( + &bucket, + &prefix, + &delimiter, + 0, + &[], + &[], + has_any, + query.continuation_token.as_deref(), + None, + 0, + ) + } else { + myfsio_xml::response::list_objects_v1_xml( + &bucket, + &prefix, + &marker, + &delimiter, + 0, + &[], + &[], + has_any, + None, + ) + }; + return (StatusCode::OK, [("content-type", "application/xml")], xml).into_response(); + } + if delimiter.is_empty() { let params = myfsio_common::types::ListParams { max_keys, @@ -408,10 +482,14 @@ pub async fn get_bucket( }; match state.storage.list_objects(&bucket, ¶ms).await { Ok(result) => { - let next_marker = result - .next_continuation_token - .clone() - .or_else(|| result.objects.last().map(|o| o.key.clone())); + let next_marker = if result.is_truncated { + result + .next_continuation_token + .clone() + .or_else(|| result.objects.last().map(|o| o.key.clone())) + } else { + None + }; let xml = if is_v2 { let next_token = next_marker .as_deref() @@ -922,11 +1000,15 @@ async fn collect_upload_body(body: Body, aws_chunked: bool) -> Result, R http_body_util::BodyExt::collect(body) .await .map(|collected| collected.to_bytes().to_vec()) - .map_err(|_| { - s3_error_response(S3Error::new( - S3ErrorCode::InvalidRequest, - "Failed to read request body", - )) + .map_err(|err| { + if let Some(message) = crate::middleware::sha_body::sha256_mismatch_message(&err) { + bad_digest_response(message) + } else { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidRequest, + "Failed to read request body", + )) + } }) } @@ -1537,7 +1619,7 @@ pub async fn delete_object( Ok(()) => { notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete"); trigger_replication(&state, &bucket, &key, "delete"); - StatusCode::NO_CONTENT.into_response() + (StatusCode::NO_CONTENT, HeaderMap::new()).into_response() } Err(e) => storage_err_response(e), } @@ -1764,6 +1846,70 @@ async fn complete_multipart_handler( } }; + if parsed.parts.is_empty() { + return s3_error_response(S3Error::new( + S3ErrorCode::MalformedXML, + "CompleteMultipartUpload requires at least one part", + )); + } + + let mut last_part_num: u32 = 0; + for p in &parsed.parts { + if p.part_number == 0 { + return s3_error_response(S3Error::new( + S3ErrorCode::InvalidPartOrder, + "Part numbers must be greater than zero", + )); + } + if p.part_number <= last_part_num { + return s3_error_response(S3Error::new( + S3ErrorCode::InvalidPartOrder, + "Parts must be specified in ascending order with no duplicates", + )); + } + last_part_num = p.part_number; + } + + let stored_parts = match state.storage.list_parts(bucket, upload_id).await { + Ok(list) => list, + Err(e) => return storage_err_response(e), + }; + let stored_map: HashMap = stored_parts + .iter() + .map(|p| (p.part_number, (p.etag.clone(), p.size))) + .collect(); + let min_part_size: u64 = state.config.multipart_min_part_size; + let total_parts = parsed.parts.len(); + for (idx, p) in parsed.parts.iter().enumerate() { + let stored = match stored_map.get(&p.part_number) { + Some(s) => s, + None => { + return s3_error_response(S3Error::new( + S3ErrorCode::InvalidPart, + format!("Part {} not found", p.part_number), + )); + } + }; + let client_etag = p.etag.trim().trim_matches('"').to_ascii_lowercase(); + let stored_etag = stored.0.trim().trim_matches('"').to_ascii_lowercase(); + if !client_etag.is_empty() && client_etag != stored_etag { + return s3_error_response(S3Error::new( + S3ErrorCode::InvalidPart, + format!("ETag mismatch for part {}", p.part_number), + )); + } + let is_final = idx + 1 == total_parts; + if !is_final && stored.1 < min_part_size { + return s3_error_response(S3Error::new( + S3ErrorCode::EntityTooSmall, + format!( + "Part {} is smaller than the minimum allowed size of {} bytes", + p.part_number, min_part_size + ), + )); + } + } + let parts: Vec = parsed .parts .iter() @@ -1852,7 +1998,8 @@ async fn object_attributes_handler( if all || attrs.contains("etag") { if let Some(etag) = &meta.etag { - xml.push_str(&format!("{}", xml_escape(etag))); + let trimmed = etag.trim_matches('"'); + xml.push_str(&format!("\"{}\"", xml_escape(trimmed))); } } if all || attrs.contains("storageclass") { @@ -1909,48 +2056,151 @@ async fn copy_object_handler( return resp; } - let copy_result = if let Some(version_id) = src_version_id - .as_deref() - .filter(|value| !is_null_version(Some(*value))) - { - let (_meta, mut reader) = match state + let metadata_directive = headers + .get("x-amz-metadata-directive") + .and_then(|v| v.to_str().ok()) + .map(|v| v.trim().to_ascii_uppercase()) + .unwrap_or_else(|| "COPY".to_string()); + let tagging_directive = headers + .get("x-amz-tagging-directive") + .and_then(|v| v.to_str().ok()) + .map(|v| v.trim().to_ascii_uppercase()) + .unwrap_or_else(|| "COPY".to_string()); + let replace_metadata = metadata_directive == "REPLACE"; + let replace_tagging = tagging_directive == "REPLACE"; + + let same_object = src_bucket == dst_bucket + && src_key == dst_key + && src_version_id.as_deref().unwrap_or("") == ""; + if same_object && !replace_metadata && !replace_tagging { + return s3_error_response(S3Error::new( + S3ErrorCode::InvalidRequest, + "This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes.", + )); + } + + let source_metadata_existing = match src_version_id.as_deref() { + Some(version_id) if version_id != "null" => { + match state + .storage + .get_object_version_metadata(&src_bucket, &src_key, version_id) + .await + { + Ok(metadata) => metadata, + Err(e) => return storage_err_response(e), + } + } + _ => match state .storage - .get_object_version(&src_bucket, &src_key, version_id) + .get_object_metadata(&src_bucket, &src_key) .await { + Ok(m) => m, + Err(e) => return storage_err_response(e), + }, + }; + + let dst_metadata = if replace_metadata { + let mut m: HashMap = HashMap::new(); + for (request_header, metadata_key, _) in internal_header_pairs() { + if let Some(value) = headers.get(*request_header).and_then(|v| v.to_str().ok()) { + if *request_header == "content-encoding" { + if let Some(decoded_encoding) = decoded_content_encoding(value) { + m.insert((*metadata_key).to_string(), decoded_encoding); + } + } else { + m.insert((*metadata_key).to_string(), value.to_string()); + } + } + } + let content_type = guessed_content_type( + dst_key, + headers.get("content-type").and_then(|v| v.to_str().ok()), + ); + m.insert("__content_type__".to_string(), content_type); + for (name, value) in headers.iter() { + let name_str = name.as_str(); + if let Some(meta_key) = name_str.strip_prefix("x-amz-meta-") { + if let Ok(val) = value.to_str() { + m.insert(meta_key.to_string(), val.to_string()); + } + } + } + if let Some(value) = headers + .get("x-amz-storage-class") + .and_then(|v| v.to_str().ok()) + { + m.insert("__storage_class__".to_string(), value.to_ascii_uppercase()); + } + m + } else { + source_metadata_existing.clone() + }; + + let (_meta, mut reader) = match src_version_id.as_deref() { + Some(version_id) if version_id != "null" => { + match state + .storage + .get_object_version(&src_bucket, &src_key, version_id) + .await + { + Ok(result) => result, + Err(e) => return storage_err_response(e), + } + } + _ => match state.storage.get_object(&src_bucket, &src_key).await { Ok(result) => result, Err(e) => return storage_err_response(e), - }; - let mut data = Vec::new(); - if let Err(e) = reader.read_to_end(&mut data).await { - return storage_err_response(myfsio_storage::error::StorageError::Io(e)); - } - let metadata = match state - .storage - .get_object_version_metadata(&src_bucket, &src_key, version_id) - .await - { - Ok(metadata) => metadata, - Err(e) => return storage_err_response(e), - }; - state - .storage - .put_object( - dst_bucket, - dst_key, - Box::pin(std::io::Cursor::new(data)), - Some(metadata), - ) - .await - } else { - state - .storage - .copy_object(&src_bucket, &src_key, dst_bucket, dst_key) - .await + }, }; + let mut data = Vec::new(); + if let Err(e) = reader.read_to_end(&mut data).await { + return storage_err_response(myfsio_storage::error::StorageError::Io(e)); + } + + let copy_result = state + .storage + .put_object( + dst_bucket, + dst_key, + Box::pin(std::io::Cursor::new(data)), + Some(dst_metadata), + ) + .await; match copy_result { Ok(meta) => { + if replace_tagging { + let tags = match headers + .get("x-amz-tagging") + .and_then(|value| value.to_str().ok()) + .map(parse_tagging_header) + .transpose() + { + Ok(tags) => tags, + Err(response) => return response, + }; + if let Some(ref tags) = tags { + if tags.len() > state.config.object_tag_limit { + return s3_error_response(S3Error::new( + S3ErrorCode::InvalidTag, + format!("Maximum {} tags allowed", state.config.object_tag_limit), + )); + } + if let Err(e) = state + .storage + .set_object_tags(dst_bucket, dst_key, tags) + .await + { + return storage_err_response(e); + } + } else { + let _ = state + .storage + .set_object_tags(dst_bucket, dst_key, &[]) + .await; + } + } let etag = meta.etag.as_deref().unwrap_or(""); let last_modified = myfsio_xml::response::format_s3_datetime(&meta.last_modified); let xml = myfsio_xml::response::copy_object_result_xml(etag, &last_modified); @@ -1983,6 +2233,13 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R } }; + if parsed.objects.len() > 1000 { + return s3_error_response(S3Error::new( + S3ErrorCode::MalformedXML, + "The request must not contain more than 1000 keys", + )); + } + let mut deleted = Vec::new(); let mut errors = Vec::new(); @@ -2280,9 +2537,20 @@ fn evaluate_copy_preconditions( } fn parse_http_date(value: &str) -> Option> { - DateTime::parse_from_rfc2822(value) - .ok() - .map(|dt| dt.with_timezone(&Utc)) + let trimmed = value.trim(); + if let Ok(dt) = DateTime::parse_from_rfc2822(trimmed) { + return Some(dt.with_timezone(&Utc)); + } + if let Ok(dt) = DateTime::parse_from_rfc3339(trimmed) { + return Some(dt.with_timezone(&Utc)); + } + if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(trimmed, "%A, %d-%b-%y %H:%M:%S GMT") { + return Some(naive.and_utc()); + } + if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(trimmed, "%a %b %e %H:%M:%S %Y") { + return Some(naive.and_utc()); + } + None } fn etag_condition_matches(condition: &str, etag: Option<&str>) -> bool { diff --git a/crates/myfsio-server/src/lib.rs b/crates/myfsio-server/src/lib.rs index 93c7b77..e613b18 100644 --- a/crates/myfsio-server/src/lib.rs +++ b/crates/myfsio-server/src/lib.rs @@ -577,11 +577,17 @@ pub fn create_router(state: state::AppState) -> Router { middleware::rate_limit_layer, )); + let request_body_timeout = + std::time::Duration::from_secs(state.config.request_body_timeout_secs); + api_router .merge(admin_router) .layer(axum::middleware::from_fn(middleware::server_header)) .layer(cors_layer(&state.config)) .layer(tower_http::compression::CompressionLayer::new()) + .layer(tower_http::timeout::RequestBodyTimeoutLayer::new( + request_body_timeout, + )) .with_state(state) } diff --git a/crates/myfsio-server/src/main.rs b/crates/myfsio-server/src/main.rs index a77175f..0732f3c 100644 --- a/crates/myfsio-server/src/main.rs +++ b/crates/myfsio-server/src/main.rs @@ -189,6 +189,11 @@ async fn main() { let shutdown = shutdown_signal_shared(); let api_shutdown = shutdown.clone(); + let api_listener = axum::serve::ListenerExt::tap_io(api_listener, |stream| { + if let Err(err) = stream.set_nodelay(true) { + tracing::trace!("failed to set TCP_NODELAY on api socket: {}", err); + } + }); let api_task = tokio::spawn(async move { axum::serve( api_listener, @@ -202,6 +207,11 @@ async fn main() { let ui_task = if let (Some(listener), Some(app)) = (ui_listener, ui_app) { let ui_shutdown = shutdown.clone(); + let listener = axum::serve::ListenerExt::tap_io(listener, |stream| { + if let Err(err) = stream.set_nodelay(true) { + tracing::trace!("failed to set TCP_NODELAY on ui socket: {}", err); + } + }); Some(tokio::spawn(async move { axum::serve(listener, app) .with_graceful_shutdown(async move { diff --git a/crates/myfsio-server/src/middleware/auth.rs b/crates/myfsio-server/src/middleware/auth.rs index dce2ea4..202c03c 100644 --- a/crates/myfsio-server/src/middleware/auth.rs +++ b/crates/myfsio-server/src/middleware/auth.rs @@ -12,9 +12,36 @@ use serde_json::Value; use std::time::Instant; use tokio::io::AsyncReadExt; +use crate::middleware::sha_body::{is_hex_sha256, Sha256VerifyBody}; use crate::services::acl::acl_from_bucket_config; use crate::state::AppState; +fn wrap_body_for_sha256_verification(req: &mut Request) { + let declared = match req + .headers() + .get("x-amz-content-sha256") + .and_then(|v| v.to_str().ok()) + { + Some(v) => v.to_string(), + None => return, + }; + if !is_hex_sha256(&declared) { + return; + } + let is_chunked = req + .headers() + .get("content-encoding") + .and_then(|v| v.to_str().ok()) + .map(|v| v.to_ascii_lowercase().contains("aws-chunked")) + .unwrap_or(false); + if is_chunked { + return; + } + let body = std::mem::replace(req.body_mut(), axum::body::Body::empty()); + let wrapped = Sha256VerifyBody::new(body, declared); + *req.body_mut() = axum::body::Body::new(wrapped); +} + #[derive(Clone, Debug)] struct OriginalCanonicalPath(String); @@ -475,6 +502,7 @@ pub async fn auth_layer(State(state): State, mut req: Request, next: N error_response(err, &auth_path) } else { req.extensions_mut().insert(principal); + wrap_body_for_sha256_verification(&mut req); next.run(req).await } } @@ -1102,7 +1130,9 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR let parts: Vec<&str> = auth_str .strip_prefix("AWS4-HMAC-SHA256 ") .unwrap() - .split(", ") + .split(',') + .map(str::trim) + .filter(|s| !s.is_empty()) .collect(); if parts.len() != 3 { @@ -1112,9 +1142,24 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR )); } - let credential = parts[0].strip_prefix("Credential=").unwrap_or(""); - let signed_headers_str = parts[1].strip_prefix("SignedHeaders=").unwrap_or(""); - let provided_signature = parts[2].strip_prefix("Signature=").unwrap_or(""); + let mut credential: &str = ""; + let mut signed_headers_str: &str = ""; + let mut provided_signature: &str = ""; + for part in &parts { + if let Some(v) = part.strip_prefix("Credential=") { + credential = v; + } else if let Some(v) = part.strip_prefix("SignedHeaders=") { + signed_headers_str = v; + } else if let Some(v) = part.strip_prefix("Signature=") { + provided_signature = v; + } + } + if credential.is_empty() || signed_headers_str.is_empty() || provided_signature.is_empty() { + return AuthResult::Denied(S3Error::new( + S3ErrorCode::InvalidArgument, + "Malformed Authorization header", + )); + } let cred_parts: Vec<&str> = credential.split('/').collect(); if cred_parts.len() != 5 { diff --git a/crates/myfsio-server/src/middleware/mod.rs b/crates/myfsio-server/src/middleware/mod.rs index 2f3727d..aa0c96b 100644 --- a/crates/myfsio-server/src/middleware/mod.rs +++ b/crates/myfsio-server/src/middleware/mod.rs @@ -1,6 +1,7 @@ mod auth; pub mod ratelimit; pub mod session; +pub(crate) mod sha_body; pub use auth::auth_layer; pub use ratelimit::{rate_limit_layer, RateLimitLayerState}; diff --git a/crates/myfsio-server/src/middleware/session.rs b/crates/myfsio-server/src/middleware/session.rs index 68f8207..ee2bcc4 100644 --- a/crates/myfsio-server/src/middleware/session.rs +++ b/crates/myfsio-server/src/middleware/session.rs @@ -181,8 +181,8 @@ pub async fn csrf_layer( .unwrap_or(""); let is_form_submit = content_type.starts_with("application/x-www-form-urlencoded") || content_type.starts_with("multipart/form-data"); - let wants_json = accept.contains("application/json") - || content_type.starts_with("application/json"); + let wants_json = + accept.contains("application/json") || content_type.starts_with("application/json"); if is_form_submit && !wants_json { let ctx = crate::handlers::ui::base_context(&handle, None); diff --git a/crates/myfsio-server/src/middleware/sha_body.rs b/crates/myfsio-server/src/middleware/sha_body.rs new file mode 100644 index 0000000..cd2a97f --- /dev/null +++ b/crates/myfsio-server/src/middleware/sha_body.rs @@ -0,0 +1,107 @@ +use axum::body::Body; +use bytes::Bytes; +use http_body::{Body as HttpBody, Frame}; +use sha2::{Digest, Sha256}; +use std::error::Error; +use std::fmt; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug)] +struct Sha256MismatchError { + expected: String, + computed: String, +} + +impl Sha256MismatchError { + fn message(&self) -> String { + format!( + "The x-amz-content-sha256 you specified did not match what we received (expected {}, computed {})", + self.expected, self.computed + ) + } +} + +impl fmt::Display for Sha256MismatchError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "XAmzContentSHA256Mismatch: expected {}, computed {}", + self.expected, self.computed + ) + } +} + +impl Error for Sha256MismatchError {} + +pub struct Sha256VerifyBody { + inner: Body, + expected: String, + hasher: Option, +} + +impl Sha256VerifyBody { + pub fn new(inner: Body, expected_hex: String) -> Self { + Self { + inner, + expected: expected_hex.to_ascii_lowercase(), + hasher: Some(Sha256::new()), + } + } +} + +impl HttpBody for Sha256VerifyBody { + type Data = Bytes; + type Error = Box; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + let this = self.as_mut().get_mut(); + match Pin::new(&mut this.inner).poll_frame(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e)))), + Poll::Ready(Some(Ok(frame))) => { + if let Some(data) = frame.data_ref() { + if let Some(h) = this.hasher.as_mut() { + h.update(data); + } + } + Poll::Ready(Some(Ok(frame))) + } + Poll::Ready(None) => { + if let Some(hasher) = this.hasher.take() { + let computed = hex::encode(hasher.finalize()); + if computed != this.expected { + return Poll::Ready(Some(Err(Box::new(Sha256MismatchError { + expected: this.expected.clone(), + computed, + })))); + } + } + Poll::Ready(None) + } + } + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } + + fn size_hint(&self) -> http_body::SizeHint { + self.inner.size_hint() + } +} + +pub fn is_hex_sha256(s: &str) -> bool { + s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit()) +} + +pub fn sha256_mismatch_message(err: &(dyn Error + 'static)) -> Option { + if let Some(mismatch) = err.downcast_ref::() { + return Some(mismatch.message()); + } + + err.source().and_then(sha256_mismatch_message) +} diff --git a/crates/myfsio-server/tests/integration.rs b/crates/myfsio-server/tests/integration.rs index e52d722..09bba81 100644 --- a/crates/myfsio-server/tests/integration.rs +++ b/crates/myfsio-server/tests/integration.rs @@ -1,5 +1,7 @@ use axum::body::Body; use axum::http::{Method, Request, StatusCode}; +use base64::engine::general_purpose::URL_SAFE; +use base64::Engine; use http_body_util::BodyExt; use myfsio_storage::traits::{AsyncReadStream, StorageEngine}; use serde_json::Value; @@ -53,6 +55,7 @@ fn test_app_with_iam(iam_json: serde_json::Value) -> (axum::Router, tempfile::Te ui_enabled: false, templates_dir: std::path::PathBuf::from("templates"), static_dir: std::path::PathBuf::from("static"), + multipart_min_part_size: 1, ..myfsio_server::config::ServerConfig::default() }; let state = myfsio_server::state::AppState::new(config); @@ -2394,6 +2397,87 @@ async fn test_versioned_object_can_be_read_and_deleted_by_version_id() { assert_eq!(missing_resp.status(), StatusCode::NOT_FOUND); } +#[tokio::test] +async fn test_versioned_put_and_delete_do_not_advertise_unstored_ids() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/compat-bucket", Body::empty())) + .await + .unwrap(); + app.clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/compat-bucket?versioning") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from( + "Enabled", + )) + .unwrap(), + ) + .await + .unwrap(); + + let put_resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/compat-bucket/doc.txt", + Body::from("first"), + )) + .await + .unwrap(); + assert_eq!(put_resp.status(), StatusCode::OK); + assert!(!put_resp.headers().contains_key("x-amz-version-id")); + + let overwrite_resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/compat-bucket/doc.txt", + Body::from("second"), + )) + .await + .unwrap(); + assert_eq!(overwrite_resp.status(), StatusCode::OK); + assert!(!overwrite_resp.headers().contains_key("x-amz-version-id")); + + let delete_resp = app + .clone() + .oneshot(signed_request( + Method::DELETE, + "/compat-bucket/doc.txt", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(delete_resp.status(), StatusCode::NO_CONTENT); + assert!(!delete_resp.headers().contains_key("x-amz-version-id")); + assert!(!delete_resp.headers().contains_key("x-amz-delete-marker")); + + let versions_resp = app + .oneshot(signed_request( + Method::GET, + "/compat-bucket?versions", + Body::empty(), + )) + .await + .unwrap(); + let versions_body = String::from_utf8( + versions_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!(!versions_body.contains("")); +} + #[tokio::test] async fn test_retention_is_enforced_when_deleting_archived_version() { let (app, _tmp) = test_app(); @@ -2562,6 +2646,132 @@ async fn test_put_object_validates_content_md5() { assert_eq!(good_resp.status(), StatusCode::OK); } +#[tokio::test] +async fn test_x_amz_content_sha256_mismatch_returns_bad_digest() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/sha256-bucket", Body::empty())) + .await + .unwrap(); + + let bad_resp = app + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/sha256-bucket/object.txt") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header( + "x-amz-content-sha256", + "0000000000000000000000000000000000000000000000000000000000000000", + ) + .body(Body::from("hello")) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(bad_resp.status(), StatusCode::BAD_REQUEST); + let bad_body = String::from_utf8( + bad_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!(bad_body.contains("BadDigest")); + assert!(bad_body.contains("x-amz-content-sha256")); +} + +#[tokio::test] +async fn test_max_keys_zero_respects_marker_and_v2_cursors() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/cursor-bucket", Body::empty())) + .await + .unwrap(); + for key in ["a.txt", "b.txt"] { + app.clone() + .oneshot(signed_request( + Method::PUT, + &format!("/cursor-bucket/{}", key), + Body::from(key.to_string()), + )) + .await + .unwrap(); + } + + let marker_resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/cursor-bucket?max-keys=0&marker=b.txt", + Body::empty(), + )) + .await + .unwrap(); + let marker_body = String::from_utf8( + marker_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!(marker_body.contains("false")); + + let start_after_resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/cursor-bucket?list-type=2&max-keys=0&start-after=b.txt", + Body::empty(), + )) + .await + .unwrap(); + let start_after_body = String::from_utf8( + start_after_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!(start_after_body.contains("false")); + + let token = URL_SAFE.encode("b.txt"); + let token_resp = app + .oneshot(signed_request( + Method::GET, + &format!( + "/cursor-bucket?list-type=2&max-keys=0&continuation-token={}", + token + ), + Body::empty(), + )) + .await + .unwrap(); + let token_body = String::from_utf8( + token_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!(token_body.contains("false")); +} + #[tokio::test] async fn test_put_object_tagging_and_standard_headers_are_persisted() { let (app, _tmp) = test_app(); diff --git a/crates/myfsio-storage/src/validation.rs b/crates/myfsio-storage/src/validation.rs index c295615..05c9bdf 100644 --- a/crates/myfsio-storage/src/validation.rs +++ b/crates/myfsio-storage/src/validation.rs @@ -132,6 +132,13 @@ pub fn validate_bucket_name(bucket_name: &str) -> Option { return Some("Bucket name must not be formatted as an IP address".to_string()); } + if bucket_name.starts_with("xn--") { + return Some("Bucket name must not start with the reserved prefix 'xn--'".to_string()); + } + if bucket_name.ends_with("-s3alias") || bucket_name.ends_with("--ol-s3") { + return Some("Bucket name must not end with a reserved suffix".to_string()); + } + None } diff --git a/crates/myfsio-xml/src/response.rs b/crates/myfsio-xml/src/response.rs index 708db4b..2a9bd98 100644 --- a/crates/myfsio-xml/src/response.rs +++ b/crates/myfsio-xml/src/response.rs @@ -10,7 +10,7 @@ pub fn format_s3_datetime(dt: &DateTime) -> String { pub fn rate_limit_exceeded_xml() -> String { "\ -SlowDownRate limit exceeded" +SlowDownRate limit exceeded" .to_string() }