From bd405cc2fe5915ec04f0986a1e3b6c45d54e078b Mon Sep 17 00:00:00 2001 From: kqjy Date: Thu, 23 Apr 2026 20:23:11 +0800 Subject: [PATCH] Fix S3 versioning/delete markers, path-safety leaks, and error-code conformance; parallelize DeleteObjects; restore per-op rate limits --- Cargo.lock | 1 + crates/myfsio-common/src/constants.rs | 1 + crates/myfsio-common/src/error.rs | 4 + crates/myfsio-common/src/types.rs | 13 + crates/myfsio-server/src/config.rs | 61 +++- crates/myfsio-server/src/handlers/config.rs | 6 +- crates/myfsio-server/src/handlers/mod.rs | 340 ++++++++++++++---- crates/myfsio-server/src/handlers/ui_api.rs | 8 +- crates/myfsio-server/src/lib.rs | 6 +- crates/myfsio-server/src/middleware/auth.rs | 9 +- .../myfsio-server/src/middleware/ratelimit.rs | 69 +++- crates/myfsio-server/tests/integration.rs | 40 ++- crates/myfsio-storage/src/error.rs | 15 + crates/myfsio-storage/src/fs_backend.rs | 318 ++++++++++++++-- crates/myfsio-storage/src/traits.rs | 4 +- crates/myfsio-storage/src/validation.rs | 12 + crates/myfsio-xml/Cargo.toml | 1 + crates/myfsio-xml/src/request.rs | 4 +- crates/myfsio-xml/src/response.rs | 128 ++++++- 19 files changed, 893 insertions(+), 147 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8cdfee..8ec8b6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2780,6 +2780,7 @@ version = "0.4.4" dependencies = [ "chrono", "myfsio-common", + "percent-encoding", "quick-xml", "serde", ] diff --git a/crates/myfsio-common/src/constants.rs b/crates/myfsio-common/src/constants.rs index 9a9e25c..9951f8d 100644 --- a/crates/myfsio-common/src/constants.rs +++ b/crates/myfsio-common/src/constants.rs @@ -8,6 +8,7 @@ pub const STATS_FILE: &str = "stats.json"; pub const ETAG_INDEX_FILE: &str = "etag_index.json"; pub const INDEX_FILE: &str = "_index.json"; pub const MANIFEST_FILE: &str = "manifest.json"; +pub const DIR_MARKER_FILE: &str = ".__myfsio_dirobj__"; pub const INTERNAL_FOLDERS: &[&str] = &[".meta", ".versions", ".multipart"]; diff --git a/crates/myfsio-common/src/error.rs b/crates/myfsio-common/src/error.rs index c8e506e..c1a0c4d 100644 --- a/crates/myfsio-common/src/error.rs +++ b/crates/myfsio-common/src/error.rs @@ -31,6 +31,7 @@ pub enum S3ErrorCode { PreconditionFailed, NotModified, QuotaExceeded, + RequestTimeTooSkewed, ServerSideEncryptionConfigurationNotFoundError, SignatureDoesNotMatch, SlowDown, @@ -68,6 +69,7 @@ impl S3ErrorCode { Self::PreconditionFailed => 412, Self::NotModified => 304, Self::QuotaExceeded => 403, + Self::RequestTimeTooSkewed => 403, Self::ServerSideEncryptionConfigurationNotFoundError => 404, Self::SignatureDoesNotMatch => 403, Self::SlowDown => 429, @@ -105,6 +107,7 @@ impl S3ErrorCode { Self::PreconditionFailed => "PreconditionFailed", Self::NotModified => "NotModified", Self::QuotaExceeded => "QuotaExceeded", + Self::RequestTimeTooSkewed => "RequestTimeTooSkewed", Self::ServerSideEncryptionConfigurationNotFoundError => { "ServerSideEncryptionConfigurationNotFoundError" } @@ -144,6 +147,7 @@ impl S3ErrorCode { Self::PreconditionFailed => "At least one of the preconditions you specified did not hold", Self::NotModified => "Not Modified", Self::QuotaExceeded => "The bucket quota has been exceeded", + Self::RequestTimeTooSkewed => "The difference between the request time and the server's time is too large", Self::ServerSideEncryptionConfigurationNotFoundError => "The server side encryption configuration was not found", Self::SignatureDoesNotMatch => "The request signature we calculated does not match the signature you provided", 
Self::SlowDown => "Please reduce your request rate", diff --git a/crates/myfsio-common/src/types.rs b/crates/myfsio-common/src/types.rs index 91195f7..e191b3d 100644 --- a/crates/myfsio-common/src/types.rs +++ b/crates/myfsio-common/src/types.rs @@ -12,6 +12,10 @@ pub struct ObjectMeta { pub content_type: Option<String>, pub storage_class: Option<String>, pub metadata: HashMap<String, String>, + #[serde(default)] + pub version_id: Option<String>, + #[serde(default)] + pub is_delete_marker: bool, } impl ObjectMeta { @@ -24,10 +28,19 @@ impl ObjectMeta { content_type: None, storage_class: Some("STANDARD".to_string()), metadata: HashMap::new(), + version_id: None, + is_delete_marker: false, } } } +#[derive(Debug, Clone, Default)] +pub struct DeleteOutcome { + pub version_id: Option<String>, + pub is_delete_marker: bool, + pub existed: bool, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BucketMeta { pub name: String, diff --git a/crates/myfsio-server/src/config.rs b/crates/myfsio-server/src/config.rs index 1264b3f..c786271 100644 --- a/crates/myfsio-server/src/config.rs +++ b/crates/myfsio-server/src/config.rs @@ -83,6 +83,10 @@ pub struct ServerConfig { pub stream_chunk_size: usize, pub request_body_timeout_secs: u64, pub ratelimit_default: RateLimitSetting, + pub ratelimit_list_buckets: RateLimitSetting, + pub ratelimit_bucket_ops: RateLimitSetting, + pub ratelimit_object_ops: RateLimitSetting, + pub ratelimit_head_ops: RateLimitSetting, pub ratelimit_admin: RateLimitSetting, pub ratelimit_storage_uri: String, pub ui_enabled: bool, @@ -228,7 +232,15 @@ impl ServerConfig { let stream_chunk_size = parse_usize_env("STREAM_CHUNK_SIZE", 1_048_576); let request_body_timeout_secs = parse_u64_env("REQUEST_BODY_TIMEOUT_SECONDS", 60); let ratelimit_default = - parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(200, 60)); + parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(500, 60)); + let ratelimit_list_buckets = + parse_rate_limit_env("RATE_LIMIT_LIST_BUCKETS", ratelimit_default); + let ratelimit_bucket_ops = + parse_rate_limit_env("RATE_LIMIT_BUCKET_OPS", ratelimit_default); + let ratelimit_object_ops = + parse_rate_limit_env("RATE_LIMIT_OBJECT_OPS", ratelimit_default); + let ratelimit_head_ops = + parse_rate_limit_env("RATE_LIMIT_HEAD_OPS", ratelimit_default); let ratelimit_admin = parse_rate_limit_env("RATE_LIMIT_ADMIN", RateLimitSetting::new(60, 60)); let ratelimit_storage_uri = @@ -308,6 +320,10 @@ impl ServerConfig { stream_chunk_size, request_body_timeout_secs, ratelimit_default, + ratelimit_list_buckets, + ratelimit_bucket_ops, + ratelimit_object_ops, + ratelimit_head_ops, ratelimit_admin, ratelimit_storage_uri, ui_enabled, @@ -391,7 +407,11 @@ impl Default for ServerConfig { bulk_delete_max_keys: 1000, stream_chunk_size: 1_048_576, request_body_timeout_secs: 60, - ratelimit_default: RateLimitSetting::new(200, 60), + ratelimit_default: RateLimitSetting::new(500, 60), + ratelimit_list_buckets: RateLimitSetting::new(500, 60), + ratelimit_bucket_ops: RateLimitSetting::new(500, 60), + ratelimit_object_ops: RateLimitSetting::new(500, 60), + ratelimit_head_ops: RateLimitSetting::new(500, 60), ratelimit_admin: RateLimitSetting::new(60, 60), ratelimit_storage_uri: "memory://".to_string(), ui_enabled: true, @@ -476,7 +496,31 @@ fn parse_list_env(key: &str, default: &str) -> Vec<String> { } pub fn parse_rate_limit(value: &str) -> Option<RateLimitSetting> { - let parts = value.split_whitespace().collect::<Vec<&str>>(); + let trimmed = value.trim(); + if let Some((requests, window)) = trimmed.split_once('/') { + let max_requests =
requests.trim().parse::<u64>().ok()?; + if max_requests == 0 { + return None; + } + let window_str = window.trim().to_ascii_lowercase(); + let window_seconds = if let Ok(n) = window_str.parse::<u64>() { + if n == 0 { + return None; + } + n + } else { + match window_str.as_str() { + "s" | "sec" | "second" | "seconds" => 1, + "m" | "min" | "minute" | "minutes" => 60, + "h" | "hr" | "hour" | "hours" => 3600, + "d" | "day" | "days" => 86_400, + _ => return None, + } + }; + return Some(RateLimitSetting::new(max_requests, window_seconds)); + } + + let parts = trimmed.split_whitespace().collect::<Vec<&str>>(); if parts.len() != 3 || !parts[1].eq_ignore_ascii_case("per") { return None; } @@ -521,6 +565,15 @@ mod tests { parse_rate_limit("3 per hours"), Some(RateLimitSetting::new(3, 3600)) ); + assert_eq!( + parse_rate_limit("50000/60"), + Some(RateLimitSetting::new(50000, 60)) + ); + assert_eq!( + parse_rate_limit("100/minute"), + Some(RateLimitSetting::new(100, 60)) + ); + assert_eq!(parse_rate_limit("0/60"), None); assert_eq!(parse_rate_limit("0 per minute"), None); assert_eq!(parse_rate_limit("bad"), None); } @@ -536,7 +589,7 @@ mod tests { assert_eq!(config.object_key_max_length_bytes, 1024); assert_eq!(config.object_tag_limit, 50); - assert_eq!(config.ratelimit_default, RateLimitSetting::new(200, 60)); + assert_eq!(config.ratelimit_default, RateLimitSetting::new(500, 60)); std::env::remove_var("OBJECT_TAG_LIMIT"); std::env::remove_var("RATE_LIMIT_DEFAULT"); diff --git a/crates/myfsio-server/src/handlers/config.rs b/crates/myfsio-server/src/handlers/config.rs index 16ac05e..eeccdac 100644 --- a/crates/myfsio-server/src/handlers/config.rs +++ b/crates/myfsio-server/src/handlers/config.rs @@ -1118,9 +1118,13 @@ pub async fn list_object_versions( } for obj in objects.iter().take(current_count) { + let version_id = obj.version_id.clone().unwrap_or_else(|| "null".to_string()); xml.push_str("<Version>"); xml.push_str(&format!("<Key>{}</Key>", xml_escape(&obj.key))); - xml.push_str("<VersionId>null</VersionId>"); + xml.push_str(&format!( + "<VersionId>{}</VersionId>", + xml_escape(&version_id) + )); xml.push_str("<IsLatest>true</IsLatest>"); xml.push_str(&format!( "<LastModified>{}</LastModified>", diff --git a/crates/myfsio-server/src/handlers/mod.rs b/crates/myfsio-server/src/handlers/mod.rs index 4ed1df8..c842cbf 100644 --- a/crates/myfsio-server/src/handlers/mod.rs +++ b/crates/myfsio-server/src/handlers/mod.rs @@ -51,10 +51,64 @@ fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response { if let Some(message) = crate::middleware::sha_body::sha256_mismatch_message(io_err) { return bad_digest_response(message); } + if let Some(response) = io_error_to_s3_response(io_err) { + return response; + } } + if let myfsio_storage::error::StorageError::DeleteMarker { + bucket, + key, + version_id, + } = &err + { + let s3_err = S3Error::from_code(S3ErrorCode::NoSuchKey) + .with_resource(format!("/{}/{}", bucket, key)) + .with_request_id(uuid::Uuid::new_v4().simple().to_string()); + let status = StatusCode::from_u16(s3_err.http_status()) + .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + let mut resp_headers = HeaderMap::new(); + resp_headers.insert("x-amz-delete-marker", "true".parse().unwrap()); + if let Ok(vid) = version_id.parse() { + resp_headers.insert("x-amz-version-id", vid); + } + resp_headers.insert("content-type", "application/xml".parse().unwrap()); + return (status, resp_headers, s3_err.to_xml()).into_response(); } s3_error_response(S3Error::from(err)) } +fn io_error_to_s3_response(err: &std::io::Error) -> Option<Response> { + use std::io::ErrorKind; + let message = err.to_string(); + let lower =
message.to_ascii_lowercase(); + let hit_collision = matches!( + err.kind(), + ErrorKind::NotADirectory + | ErrorKind::IsADirectory + | ErrorKind::AlreadyExists + | ErrorKind::DirectoryNotEmpty + ) || lower.contains("not a directory") + || lower.contains("is a directory") + || lower.contains("file exists") + || lower.contains("directory not empty"); + let hit_name_too_long = matches!(err.kind(), ErrorKind::InvalidFilename) + || lower.contains("file name too long"); + if !hit_collision && !hit_name_too_long { + return None; + } + let code = if hit_name_too_long { + S3ErrorCode::InvalidKey + } else { + S3ErrorCode::InvalidRequest + }; + let detail = if hit_name_too_long { + "Object key exceeds the filesystem's per-segment length limit" + } else { + "Object key collides with an existing object path on the storage backend" + }; + Some(s3_error_response(S3Error::new(code, detail))) +} + fn trigger_replication(state: &AppState, bucket: &str, key: &str, action: &str) { let manager = state.replication.clone(); let bucket = bucket.to_string(); @@ -242,6 +296,8 @@ pub struct BucketQuery { pub continuation_token: Option<String>, #[serde(rename = "start-after")] pub start_after: Option<String>, + #[serde(rename = "encoding-type")] + pub encoding_type: Option<String>, pub uploads: Option<String>, pub delete: Option<String>, pub versioning: Option<String>, @@ -490,11 +546,12 @@ pub async fn get_bucket( } else { None }; + let encoding_type = query.encoding_type.as_deref(); let xml = if is_v2 { let next_token = next_marker .as_deref() .map(|s| URL_SAFE.encode(s.as_bytes())); - myfsio_xml::response::list_objects_v2_xml( + myfsio_xml::response::list_objects_v2_xml_with_encoding( &bucket, &prefix, &delimiter, @@ -505,9 +562,10 @@ query.continuation_token.as_deref(), next_token.as_deref(), result.objects.len(), + encoding_type, ) } else { - myfsio_xml::response::list_objects_v1_xml( + myfsio_xml::response::list_objects_v1_xml_with_encoding( &bucket, &prefix, &marker, @@ -517,6 +575,7 @@ &[], result.is_truncated, next_marker.as_deref(), + encoding_type, ) }; (StatusCode::OK, [("content-type", "application/xml")], xml).into_response() @@ -532,12 +591,13 @@ }; match state.storage.list_objects_shallow(&bucket, &params).await { Ok(result) => { + let encoding_type = query.encoding_type.as_deref(); let xml = if is_v2 { let next_token = result .next_continuation_token .as_deref() .map(|s| URL_SAFE.encode(s.as_bytes())); - myfsio_xml::response::list_objects_v2_xml( + myfsio_xml::response::list_objects_v2_xml_with_encoding( &bucket, &params.prefix, &delimiter, @@ -548,9 +608,10 @@ query.continuation_token.as_deref(), next_token.as_deref(), result.objects.len() + result.common_prefixes.len(), + encoding_type, ) } else { - myfsio_xml::response::list_objects_v1_xml( + myfsio_xml::response::list_objects_v1_xml_with_encoding( &bucket, &params.prefix, &marker, @@ -560,6 +621,7 @@ &result.common_prefixes, result.is_truncated, result.next_continuation_token.as_deref(), + encoding_type, ) }; (StatusCode::OK, [("content-type", "application/xml")], xml).into_response() @@ -955,6 +1017,47 @@ fn has_upload_checksum(headers: &HeaderMap) -> bool { || headers.contains_key("x-amz-checksum-crc32") } +fn persist_additional_checksums(headers: &HeaderMap, metadata: &mut HashMap<String, String>) { + for algo in [ + "sha256", "sha1", "crc32", "crc32c", "crc64nvme", + ] { + let header_name = format!("x-amz-checksum-{}", algo); + if let Some(value) = headers.get(&header_name).and_then(|v|
v.to_str().ok()) { + let trimmed = value.trim(); + if !trimmed.is_empty() { + metadata.insert(format!("__checksum_{}__", algo), trimmed.to_string()); + } + } + } + if let Some(value) = headers + .get("x-amz-sdk-checksum-algorithm") + .and_then(|v| v.to_str().ok()) + { + let trimmed = value.trim().to_ascii_uppercase(); + if !trimmed.is_empty() { + metadata.insert("__checksum_algorithm__".to_string(), trimmed); + } + } +} + +fn apply_stored_checksum_headers(resp_headers: &mut HeaderMap, metadata: &HashMap<String, String>) { + for algo in [ + "sha256", "sha1", "crc32", "crc32c", "crc64nvme", + ] { + if let Some(value) = metadata.get(&format!("__checksum_{}__", algo)) { + if let Ok(parsed) = value.parse() { + resp_headers.insert( + axum::http::HeaderName::from_bytes( + format!("x-amz-checksum-{}", algo).as_bytes(), + ) + .unwrap(), + parsed, + ); + } + } + } +} + fn validate_upload_checksums(headers: &HeaderMap, data: &[u8]) -> Result<(), Response> { if let Some(expected) = base64_header_bytes(headers, "content-md5")? { if expected.len() != 16 || Md5::digest(data).as_slice() != expected.as_slice() { @@ -984,7 +1087,7 @@ fn validate_upload_checksums(headers: &HeaderMap, data: &[u8]) -> Result<(), Res Ok(()) } -async fn collect_upload_body(body: Body, aws_chunked: bool) -> Result<Vec<u8>, Response> { +async fn collect_upload_body(body: Body, aws_chunked: bool) -> Result<bytes::Bytes, Response> { if aws_chunked { let mut reader = chunked::decode_body(body); let mut data = Vec::new(); @@ -994,12 +1097,12 @@ "Failed to read aws-chunked request body", )) })?; - return Ok(data); + return Ok(bytes::Bytes::from(data)); } http_body_util::BodyExt::collect(body) .await - .map(|collected| collected.to_bytes().to_vec()) + .map(|collected| collected.to_bytes()) .map_err(|err| { if let Some(message) = crate::middleware::sha_body::sha256_mismatch_message(&err) { bad_digest_response(message) @@ -1213,6 +1316,8 @@ pub async fn put_object( } } + persist_additional_checksums(&headers, &mut metadata); + let aws_chunked = is_aws_chunked(&headers); let boxed: myfsio_storage::traits::AsyncReadStream = if has_upload_checksum(&headers) { let data = match collect_upload_body(body, aws_chunked).await { @@ -1227,8 +1332,7 @@ Box::pin(chunked::decode_body(body)) } else { let stream = tokio_util::io::StreamReader::new( - http_body_util::BodyStream::new(body) - .map_ok(|frame| frame.into_data().unwrap_or_default()) + body.into_data_stream() .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), ); Box::pin(stream) @@ -1288,10 +1392,16 @@ resp_headers .insert("etag", format!("\"{}\"", etag).parse().unwrap()); } + if let Some(ref vid) = meta.version_id { + if let Ok(value) = vid.parse() { + resp_headers.insert("x-amz-version-id", value); + } + } resp_headers.insert( "x-amz-server-side-encryption", enc_ctx.algorithm.as_str().parse().unwrap(), ); + apply_stored_checksum_headers(&mut resp_headers, &enc_metadata); notifications::emit_object_created( &state, &bucket, @@ -1321,6 +1431,17 @@ if let Some(ref etag) = meta.etag { resp_headers.insert("etag", format!("\"{}\"", etag).parse().unwrap()); } + if let Some(ref vid) = meta.version_id { + if let Ok(value) = vid.parse() { + resp_headers.insert("x-amz-version-id", value); + } + } + let stored = state + .storage + .get_object_metadata(&bucket, &key) + .await + .unwrap_or_default(); + apply_stored_checksum_headers(&mut resp_headers, &stored);
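// Illustrative aside (not part of this patch): persist_additional_checksums and
// apply_stored_checksum_headers are meant to round-trip. A PUT that carries, say,
// "x-amz-checksum-sha256: <base64 digest>" stores the value under the reserved
// metadata key "__checksum_sha256__", and a later GET/HEAD replays it verbatim:
//   let mut meta = HashMap::new();
//   persist_additional_checksums(&headers, &mut meta);       // upload side
//   apply_stored_checksum_headers(&mut resp_headers, &meta); // download side
// Because "__"-prefixed keys are filtered out of user-visible metadata elsewhere
// in the patch, clients only ever see the reconstructed x-amz-checksum-* headers.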
notifications::emit_object_created( &state, &bucket, @@ -1450,7 +1571,7 @@ pub async fn get_object( } }; let file_size = file.metadata().await.map(|m| m.len()).unwrap_or(0); - let stream = ReaderStream::new(file); + let stream = ReaderStream::with_capacity(file, 256 * 1024); let body = Body::from_stream(stream); let meta = head_meta.clone(); @@ -1481,10 +1602,15 @@ pub async fn get_object( enc_info.algorithm.parse().unwrap(), ); apply_stored_response_headers(&mut resp_headers, &all_meta); + apply_stored_checksum_headers(&mut resp_headers, &all_meta); if let Some(ref requested_version) = query.version_id { if let Ok(value) = requested_version.parse() { resp_headers.insert("x-amz-version-id", value); } + } else if let Some(vid) = all_meta.get("__version_id__") { + if let Ok(value) = vid.parse() { + resp_headers.insert("x-amz-version-id", value); + } } apply_user_metadata(&mut resp_headers, &meta.metadata); @@ -1506,7 +1632,7 @@ pub async fn get_object( match object_result { Ok((meta, reader)) => { - let stream = ReaderStream::new(reader); + let stream = ReaderStream::with_capacity(reader, 256 * 1024); let body = Body::from_stream(stream); let mut headers = HeaderMap::new(); @@ -1525,10 +1651,15 @@ pub async fn get_object( ); headers.insert("accept-ranges", "bytes".parse().unwrap()); apply_stored_response_headers(&mut headers, &all_meta); + apply_stored_checksum_headers(&mut headers, &all_meta); if let Some(ref requested_version) = query.version_id { if let Ok(value) = requested_version.parse() { headers.insert("x-amz-version-id", value); } + } else if let Some(ref vid) = meta.version_id { + if let Ok(value) = vid.parse() { + headers.insert("x-amz-version-id", value); + } } apply_user_metadata(&mut headers, &meta.metadata); @@ -1596,10 +1727,15 @@ pub async fn delete_object( .delete_object_version(&bucket, &key, version_id) .await { - Ok(()) => { + Ok(outcome) => { let mut resp_headers = HeaderMap::new(); - if let Ok(value) = version_id.parse() { - resp_headers.insert("x-amz-version-id", value); + if let Some(ref vid) = outcome.version_id { + if let Ok(value) = vid.parse() { + resp_headers.insert("x-amz-version-id", value); + } + } + if outcome.is_delete_marker { + resp_headers.insert("x-amz-delete-marker", "true".parse().unwrap()); } notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete"); trigger_replication(&state, &bucket, &key, "delete"); @@ -1616,10 +1752,19 @@ pub async fn delete_object( } match state.storage.delete_object(&bucket, &key).await { - Ok(()) => { + Ok(outcome) => { + let mut resp_headers = HeaderMap::new(); + if let Some(ref vid) = outcome.version_id { + if let Ok(value) = vid.parse() { + resp_headers.insert("x-amz-version-id", value); + } + } + if outcome.is_delete_marker { + resp_headers.insert("x-amz-delete-marker", "true".parse().unwrap()); + } notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete"); trigger_replication(&state, &bucket, &key, "delete"); - (StatusCode::NO_CONTENT, HeaderMap::new()).into_response() + (StatusCode::NO_CONTENT, resp_headers).into_response() } Err(e) => storage_err_response(e), } @@ -1678,10 +1823,15 @@ pub async fn head_object( ); headers.insert("accept-ranges", "bytes".parse().unwrap()); apply_stored_response_headers(&mut headers, &all_meta); + apply_stored_checksum_headers(&mut headers, &all_meta); if let Some(ref requested_version) = query.version_id { if let Ok(value) = requested_version.parse() { headers.insert("x-amz-version-id", value); } + } else if let Some(ref vid) = meta.version_id 
{ + if let Ok(value) = vid.parse() { + headers.insert("x-amz-version-id", value); + } } apply_user_metadata(&mut headers, &meta.metadata); @@ -1714,8 +1864,7 @@ async fn upload_part_handler_with_chunking( Box::pin(chunked::decode_body(body)) } else { let stream = tokio_util::io::StreamReader::new( - http_body_util::BodyStream::new(body) - .map_ok(|frame| frame.into_data().unwrap_or_default()) + body.into_data_stream() .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), ); Box::pin(stream) @@ -2240,61 +2389,110 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R )); } - let mut deleted = Vec::new(); - let mut errors = Vec::new(); + use futures::stream::{self, StreamExt}; - for obj in &parsed.objects { - if let Err(message) = match obj.version_id.as_deref() { - Some(version_id) if version_id != "null" => match state - .storage - .get_object_version_metadata(bucket, &obj.key, version_id) - .await - { - Ok(metadata) => object_lock::can_delete_object(&metadata, false), - Err(err) => Err(S3Error::from(err).message), - }, - _ => match state.storage.head_object(bucket, &obj.key).await { - Ok(_) => match state.storage.get_object_metadata(bucket, &obj.key).await { - Ok(metadata) => object_lock::can_delete_object(&metadata, false), - Err(err) => Err(S3Error::from(err).message), - }, - Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()), - Err(err) => Err(S3Error::from(err).message), - }, - } { - errors.push(( - obj.key.clone(), - S3ErrorCode::AccessDenied.as_str().to_string(), - message, - )); - continue; - } - let delete_result = if let Some(version_id) = obj.version_id.as_deref() { - if version_id == "null" { - state.storage.delete_object(bucket, &obj.key).await - } else { - state - .storage - .delete_object_version(bucket, &obj.key, version_id) - .await - } - } else { - state.storage.delete_object(bucket, &obj.key).await - }; + let results: Vec<(String, Option<String>, Result<DeleteOutcome, (String, String)>)> = + stream::iter(parsed.objects.iter().cloned()) + .map(|obj| { + let state = state.clone(); + let bucket = bucket.to_string(); + async move { + let key = obj.key.clone(); + let requested_vid = obj.version_id.clone(); + let lock_check: Result<(), (String, String)> = match obj.version_id.as_deref() { + Some(version_id) if version_id != "null" => match state + .storage + .get_object_version_metadata(&bucket, &obj.key, version_id) + .await + { + Ok(metadata) => object_lock::can_delete_object(&metadata, false) + .map_err(|m| { + (S3ErrorCode::AccessDenied.as_str().to_string(), m) + }), + Err(err) => { + let s3err = S3Error::from(err); + Err((s3err.code.as_str().to_string(), s3err.message)) + } + }, + _ => match state.storage.head_object(&bucket, &obj.key).await { + Ok(_) => { + match state + .storage + .get_object_metadata(&bucket, &obj.key) + .await + { + Ok(metadata) => object_lock::can_delete_object(&metadata, false) + .map_err(|m| { + ( + S3ErrorCode::AccessDenied.as_str().to_string(), + m, + ) + }), + Err(err) => { + let s3err = S3Error::from(err); + Err((s3err.code.as_str().to_string(), s3err.message)) + } + } + } + Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => { + Ok(()) + } + Err(myfsio_storage::error::StorageError::DeleteMarker { ..
}) => { + Ok(()) + } + Err(err) => { + let s3err = S3Error::from(err); + Err((s3err.code.as_str().to_string(), s3err.message)) + } + }, + }; - match delete_result { - Ok(()) => { - notifications::emit_object_removed(state, bucket, &obj.key, "", "", "", "Delete"); - trigger_replication(state, bucket, &obj.key, "delete"); - deleted.push((obj.key.clone(), obj.version_id.clone())) + let result = match lock_check { + Err(e) => Err(e), + Ok(()) => { + let outcome = match obj.version_id.as_deref() { + Some(version_id) if version_id != "null" => { + state + .storage + .delete_object_version(&bucket, &obj.key, version_id) + .await + } + _ => state.storage.delete_object(&bucket, &obj.key).await, + }; + outcome.map_err(|e| { + let s3err = S3Error::from(e); + (s3err.code.as_str().to_string(), s3err.message) + }) + } + }; + (key, requested_vid, result) + } + }) + .buffer_unordered(32) + .collect() + .await; + + let mut deleted: Vec<myfsio_xml::response::DeletedEntry> = Vec::new(); + let mut errors: Vec<(String, String, String)> = Vec::new(); + for (key, requested_vid, result) in results { + match result { + Ok(outcome) => { + notifications::emit_object_removed(state, bucket, &key, "", "", "", "Delete"); + trigger_replication(state, bucket, &key, "delete"); + let delete_marker_version_id = if outcome.is_delete_marker { + outcome.version_id.clone() + } else { + None + }; + deleted.push(myfsio_xml::response::DeletedEntry { + key, + version_id: requested_vid, + delete_marker: outcome.is_delete_marker, + delete_marker_version_id, + }); } - Err(e) => { - let s3err = S3Error::from(e); - errors.push(( - obj.key.clone(), - s3err.code.as_str().to_string(), - s3err.message, - )); + Err((code, message)) => { + errors.push((key, code, message)); } } } @@ -2366,7 +2564,7 @@ async fn range_get_handler( let length = end - start + 1; let limited = file.take(length); - let stream = ReaderStream::new(limited); + let stream = ReaderStream::with_capacity(limited, 256 * 1024); let body = Body::from_stream(stream); let mut headers = HeaderMap::new(); diff --git a/crates/myfsio-server/src/handlers/ui_api.rs b/crates/myfsio-server/src/handlers/ui_api.rs index 1d778be..98939a3 100644 --- a/crates/myfsio-server/src/handlers/ui_api.rs +++ b/crates/myfsio-server/src/handlers/ui_api.rs @@ -121,6 +121,8 @@ fn storage_status(err: &StorageError) -> StatusCode { | StorageError::ObjectNotFound { .. } | StorageError::VersionNotFound { .. } | StorageError::UploadNotFound(_) => StatusCode::NOT_FOUND, + StorageError::DeleteMarker { ..
} => StatusCode::NOT_FOUND, + StorageError::MethodNotAllowed(_) => StatusCode::METHOD_NOT_ALLOWED, StorageError::InvalidBucketName(_) | StorageError::InvalidObjectKey(_) | StorageError::InvalidRange @@ -2599,7 +2601,7 @@ async fn move_object_json(state: &AppState, bucket: &str, key: &str, body: Body) match state.storage.copy_object(bucket, key, dest_bucket, dest_key).await { Ok(_) => match state.storage.delete_object(bucket, key).await { - Ok(()) => { + Ok(_) => { super::trigger_replication(state, dest_bucket, dest_key, "write"); super::trigger_replication(state, bucket, key, "delete"); Json(json!({ @@ -2674,7 +2676,7 @@ async fn delete_object_json( } match state.storage.delete_object(bucket, key).await { - Ok(()) => { + Ok(_) => { super::trigger_replication(state, bucket, key, "delete"); Json(json!({ "status": "ok", @@ -2953,7 +2955,7 @@ pub async fn bulk_delete_objects( for key in keys { match state.storage.delete_object(&bucket_name, &key).await { - Ok(()) => { + Ok(_) => { super::trigger_replication(&state, &bucket_name, &key, "delete"); if payload.purge_versions { if let Err(err) = diff --git a/crates/myfsio-server/src/lib.rs b/crates/myfsio-server/src/lib.rs index e613b18..620e0a7 100644 --- a/crates/myfsio-server/src/lib.rs +++ b/crates/myfsio-server/src/lib.rs @@ -335,8 +335,12 @@ pub fn create_ui_router(state: state::AppState) -> Router { } pub fn create_router(state: state::AppState) -> Router { - let default_rate_limit = middleware::RateLimitLayerState::new( + let default_rate_limit = middleware::RateLimitLayerState::with_per_op( state.config.ratelimit_default, + state.config.ratelimit_list_buckets, + state.config.ratelimit_bucket_ops, + state.config.ratelimit_object_ops, + state.config.ratelimit_head_ops, state.config.num_trusted_proxies, ); let admin_rate_limit = middleware::RateLimitLayerState::new( diff --git a/crates/myfsio-server/src/middleware/auth.rs b/crates/myfsio-server/src/middleware/auth.rs index 202c03c..494c4f5 100644 --- a/crates/myfsio-server/src/middleware/auth.rs +++ b/crates/myfsio-server/src/middleware/auth.rs @@ -1344,7 +1344,7 @@ fn verify_sigv4_query(state: &AppState, req: &Request) -> AuthResult { } if elapsed < -(state.config.sigv4_timestamp_tolerance_secs as i64) { return AuthResult::Denied(S3Error::new( - S3ErrorCode::AccessDenied, + S3ErrorCode::RequestTimeTooSkewed, "Request is too far in the future", )); } @@ -1414,8 +1414,11 @@ fn check_timestamp_freshness(amz_date: &str, tolerance_secs: u64) -> Option<S3Error> { if diff > tolerance_secs { return Some(S3Error::new( - S3ErrorCode::AccessDenied, - "Request timestamp too old or too far in the future", + S3ErrorCode::RequestTimeTooSkewed, + format!( + "The difference between the request time and the server's time is too large ({}s, tolerance {}s)", + diff, tolerance_secs + ), )); } None diff --git a/crates/myfsio-server/src/middleware/ratelimit.rs b/crates/myfsio-server/src/middleware/ratelimit.rs index bcdb2c8..6bf26aa 100644 --- a/crates/myfsio-server/src/middleware/ratelimit.rs +++ b/crates/myfsio-server/src/middleware/ratelimit.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use axum::extract::{ConnectInfo, Request, State}; -use axum::http::{header, StatusCode}; +use axum::http::{header, Method, StatusCode}; use axum::middleware::Next; use axum::response::{IntoResponse, Response}; use parking_lot::Mutex; @@ -13,17 +13,77 @@ use crate::config::RateLimitSetting; #[derive(Clone)] pub struct RateLimitLayerState { - limiter: Arc<FixedWindowLimiter>, + default_limiter: Arc<FixedWindowLimiter>, + list_buckets_limiter: Option<Arc<FixedWindowLimiter>>, +
bucket_ops_limiter: Option<Arc<FixedWindowLimiter>>, + object_ops_limiter: Option<Arc<FixedWindowLimiter>>, + head_ops_limiter: Option<Arc<FixedWindowLimiter>>, num_trusted_proxies: usize, } impl RateLimitLayerState { pub fn new(setting: RateLimitSetting, num_trusted_proxies: usize) -> Self { Self { - limiter: Arc::new(FixedWindowLimiter::new(setting)), + default_limiter: Arc::new(FixedWindowLimiter::new(setting)), + list_buckets_limiter: None, + bucket_ops_limiter: None, + object_ops_limiter: None, + head_ops_limiter: None, num_trusted_proxies, } } + + pub fn with_per_op( + default: RateLimitSetting, + list_buckets: RateLimitSetting, + bucket_ops: RateLimitSetting, + object_ops: RateLimitSetting, + head_ops: RateLimitSetting, + num_trusted_proxies: usize, + ) -> Self { + Self { + default_limiter: Arc::new(FixedWindowLimiter::new(default)), + list_buckets_limiter: (list_buckets != default) + .then(|| Arc::new(FixedWindowLimiter::new(list_buckets))), + bucket_ops_limiter: (bucket_ops != default) + .then(|| Arc::new(FixedWindowLimiter::new(bucket_ops))), + object_ops_limiter: (object_ops != default) + .then(|| Arc::new(FixedWindowLimiter::new(object_ops))), + head_ops_limiter: (head_ops != default) + .then(|| Arc::new(FixedWindowLimiter::new(head_ops))), + num_trusted_proxies, + } + } + + fn select_limiter(&self, req: &Request) -> &Arc<FixedWindowLimiter> { + let path = req.uri().path(); + let method = req.method(); + if path == "/" && *method == Method::GET { + if let Some(ref limiter) = self.list_buckets_limiter { + return limiter; + } + } + let segments: Vec<&str> = path + .trim_start_matches('/') + .split('/') + .filter(|s| !s.is_empty()) + .collect(); + if *method == Method::HEAD { + if let Some(ref limiter) = self.head_ops_limiter { + return limiter; + } + } + if segments.len() == 1 { + if let Some(ref limiter) = self.bucket_ops_limiter { + return limiter; + } + } else if segments.len() >= 2 { + if let Some(ref limiter) = self.object_ops_limiter { + return limiter; + } + } + &self.default_limiter + } } #[derive(Debug)] @@ -99,7 +159,8 @@ pub async fn rate_limit_layer( next: Next, ) -> Response { let key = rate_limit_key(&req, state.num_trusted_proxies); - match state.limiter.check(&key) { + let limiter = state.select_limiter(&req); + match limiter.check(&key) { Ok(()) => next.run(req).await, Err(retry_after) => too_many_requests(retry_after), } diff --git a/crates/myfsio-server/tests/integration.rs b/crates/myfsio-server/tests/integration.rs index 09bba81..9e2fd8f 100644 --- a/crates/myfsio-server/tests/integration.rs +++ b/crates/myfsio-server/tests/integration.rs @@ -121,6 +121,10 @@ fn test_app_with_rate_limits( storage_root: tmp.path().to_path_buf(), iam_config_path: iam_path.join("iam.json"), ratelimit_default: default, + ratelimit_list_buckets: default, + ratelimit_bucket_ops: default, + ratelimit_object_ops: default, + ratelimit_head_ops: default, ratelimit_admin: admin, ui_enabled: false, ..myfsio_server::config::ServerConfig::default() } @@ -2398,7 +2402,7 @@ async fn test_versioned_object_can_be_read_and_deleted_by_version_id() { } #[tokio::test] -async fn test_versioned_put_and_delete_do_not_advertise_unstored_ids() { +async fn test_versioned_put_and_delete_emit_version_headers_and_delete_markers() { let (app, _tmp) = test_app(); app.clone() .await .unwrap(); assert_eq!(put_resp.status(), StatusCode::OK); - assert!(!put_resp.headers().contains_key("x-amz-version-id")); + let first_version = put_resp + .headers() + .get("x-amz-version-id") + .expect("PUT on versioned bucket
must emit x-amz-version-id") + .to_str() + .unwrap() + .to_string(); + assert!(!first_version.is_empty()); let overwrite_resp = app .clone() .await .unwrap(); assert_eq!(overwrite_resp.status(), StatusCode::OK); - assert!(!overwrite_resp.headers().contains_key("x-amz-version-id")); + let second_version = overwrite_resp + .headers() + .get("x-amz-version-id") + .expect("overwrite on versioned bucket must emit a new x-amz-version-id") + .to_str() + .unwrap() + .to_string(); + assert_ne!(first_version, second_version); let delete_resp = app .clone() .await .unwrap(); assert_eq!(delete_resp.status(), StatusCode::NO_CONTENT); - assert!(!delete_resp.headers().contains_key("x-amz-version-id")); - assert!(!delete_resp.headers().contains_key("x-amz-delete-marker")); + assert_eq!( + delete_resp + .headers() + .get("x-amz-delete-marker") + .and_then(|v| v.to_str().ok()), + Some("true") + ); + assert!(delete_resp.headers().contains_key("x-amz-version-id")); let versions_resp = app .oneshot(signed_request( @@ -2475,7 +2499,11 @@ .to_vec(), ) .unwrap(); - assert!(!versions_body.contains("<DeleteMarker>")); + assert!( + versions_body.contains("<DeleteMarker>"), + "expected DeleteMarker entry in ListObjectVersions output, got: {}", + versions_body + ); } #[tokio::test] diff --git a/crates/myfsio-storage/src/error.rs b/crates/myfsio-storage/src/error.rs index 4bb652a..69b11d1 100644 --- a/crates/myfsio-storage/src/error.rs +++ b/crates/myfsio-storage/src/error.rs @@ -17,10 +17,18 @@ pub enum StorageError { key: String, version_id: String, }, + #[error("Object is a delete marker: {bucket}/{key}")] + DeleteMarker { + bucket: String, + key: String, + version_id: String, + }, #[error("Invalid bucket name: {0}")] InvalidBucketName(String), #[error("Invalid object key: {0}")] InvalidObjectKey(String), + #[error("Method not allowed: {0}")] + MethodNotAllowed(String), #[error("Upload not found: {0}")] UploadNotFound(String), #[error("Quota exceeded: {0}")] @@ -58,10 +66,17 @@ impl From<StorageError> for S3Error { version_id, } => S3Error::from_code(S3ErrorCode::NoSuchVersion) .with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)), + StorageError::DeleteMarker { + bucket, + key, + version_id, + } => S3Error::from_code(S3ErrorCode::MethodNotAllowed) + .with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)), StorageError::InvalidBucketName(msg) => { S3Error::new(S3ErrorCode::InvalidBucketName, msg) } StorageError::InvalidObjectKey(msg) => S3Error::new(S3ErrorCode::InvalidKey, msg), + StorageError::MethodNotAllowed(msg) => S3Error::new(S3ErrorCode::MethodNotAllowed, msg), StorageError::UploadNotFound(id) => S3Error::new( S3ErrorCode::NoSuchUpload, format!("Upload {} not found", id), diff --git a/crates/myfsio-storage/src/fs_backend.rs b/crates/myfsio-storage/src/fs_backend.rs index 0365beb..af6061e 100644 --- a/crates/myfsio-storage/src/fs_backend.rs +++ b/crates/myfsio-storage/src/fs_backend.rs @@ -213,7 +213,14 @@ impl FsStorageBackend { fn object_path(&self, bucket_name: &str, object_key: &str) -> StorageResult<PathBuf> { self.validate_key(object_key)?; - Ok(self.bucket_path(bucket_name).join(object_key)) + if object_key.ends_with('/') { + Ok(self + .bucket_path(bucket_name) + .join(object_key) + .join(DIR_MARKER_FILE)) + } else { +
Ok(self.bucket_path(bucket_name).join(object_key)) + } } fn validate_key(&self, object_key: &str) -> StorageResult<()> { @@ -239,6 +246,16 @@ impl FsStorageBackend { fn index_file_for_key(&self, bucket_name: &str, key: &str) -> (PathBuf, String) { let meta_root = self.bucket_meta_root(bucket_name); + if key.ends_with('/') { + let trimmed = key.trim_end_matches('/'); + if trimmed.is_empty() { + return (meta_root.join(INDEX_FILE), DIR_MARKER_FILE.to_string()); + } + return ( + meta_root.join(trimmed).join(INDEX_FILE), + DIR_MARKER_FILE.to_string(), + ); + } let key_path = Path::new(key); let entry_name = key_path .file_name() @@ -330,6 +347,55 @@ impl FsStorageBackend { self.bucket_versions_root(bucket_name).join(key) } + fn delete_markers_root(&self, bucket_name: &str) -> PathBuf { + self.system_bucket_root(bucket_name).join("delete_markers") + } + + fn delete_marker_path(&self, bucket_name: &str, key: &str) -> PathBuf { + self.delete_markers_root(bucket_name) + .join(format!("{}.json", key)) + } + + fn read_delete_marker_sync( + &self, + bucket_name: &str, + key: &str, + ) -> Option<(String, chrono::DateTime<Utc>)> { + let path = self.delete_marker_path(bucket_name, key); + if !path.is_file() { + return None; + } + let content = std::fs::read_to_string(&path).ok()?; + let record: Value = serde_json::from_str(&content).ok()?; + let version_id = record + .get("version_id") + .and_then(Value::as_str)? + .to_string(); + let last_modified = record + .get("last_modified") + .and_then(Value::as_str) + .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .map(|d| d.with_timezone(&Utc)) + .unwrap_or_else(Utc::now); + Some((version_id, last_modified)) + } + + fn clear_delete_marker_sync(&self, bucket_name: &str, key: &str) { + let path = self.delete_marker_path(bucket_name, key); + if path.exists() { + let _ = std::fs::remove_file(&path); + } + } + + fn new_version_id_sync() -> String { + let now = Utc::now(); + format!( + "{}-{}", + now.format("%Y%m%dT%H%M%S%6fZ"), + &Uuid::new_v4().to_string()[..8] + ) + } + fn legacy_meta_root(&self, bucket_name: &str) -> PathBuf { self.bucket_path(bucket_name).join(".meta") } @@ -737,22 +803,23 @@ impl FsStorageBackend { bucket_name: &str, key: &str, reason: &str, - ) -> std::io::Result<u64> { + ) -> std::io::Result<(u64, Option<String>)> { let bucket_path = self.bucket_path(bucket_name); let source = bucket_path.join(key); if !source.exists() { - return Ok(0); + return Ok((0, None)); } let version_dir = self.version_dir(bucket_name, key); std::fs::create_dir_all(&version_dir)?; let now = Utc::now(); - let version_id = format!( - "{}-{}", - now.format("%Y%m%dT%H%M%S%6fZ"), - &Uuid::new_v4().to_string()[..8] - ); + let metadata = self.read_metadata_sync(bucket_name, key); + let version_id = metadata + .get("__version_id__") + .cloned() + .filter(|v| !v.is_empty() && !v.contains('/') && !v.contains('\\') && !v.contains("..")) + .unwrap_or_else(Self::new_version_id_sync); let data_path = version_dir.join(format!("{}.bin", version_id)); std::fs::copy(&source, &data_path)?; let source_meta = source.metadata()?; let source_size = source_meta.len(); - let metadata = self.read_metadata_sync(bucket_name, key); let etag = Self::compute_etag_sync(&source).unwrap_or_default(); let record = serde_json::json!({ @@ -776,7 +842,43 @@ let manifest_path = version_dir.join(format!("{}.json", version_id)); Self::atomic_write_json_sync(&manifest_path, &record, true)?; - Ok(source_size) + Ok((source_size, Some(version_id))) + } + +
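// Sketch of the id shape (illustrative value, not taken from the patch):
// new_version_id_sync() returns something like "20260423T122311123456Z-1a2b3c4d",
// a microsecond-resolution UTC timestamp joined to the first 8 characters of a
// UUIDv4, so version ids sort chronologically and stay path-safe. Note that
// archive_current_version_sync above re-validates any id recovered from the
// "__version_id__" metadata (rejecting empty strings, '/', '\\' and "..") before
// reusing it as a file stem under the versions directory.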
fn write_delete_marker_sync( + &self, + bucket_name: &str, + key: &str, + ) -> std::io::Result<String> { + let version_dir = self.version_dir(bucket_name, key); + std::fs::create_dir_all(&version_dir)?; + let now = Utc::now(); + let version_id = Self::new_version_id_sync(); + + let record = serde_json::json!({ + "version_id": version_id, + "key": key, + "size": 0, + "archived_at": now.to_rfc3339(), + "etag": "", + "metadata": HashMap::<String, String>::new(), + "reason": "delete-marker", + "is_delete_marker": true, + }); + + let manifest_path = version_dir.join(format!("{}.json", version_id)); + Self::atomic_write_json_sync(&manifest_path, &record, true)?; + + let marker_path = self.delete_marker_path(bucket_name, key); + if let Some(parent) = marker_path.parent() { + std::fs::create_dir_all(parent)?; + } + let marker_record = serde_json::json!({ + "version_id": version_id, + "last_modified": now.to_rfc3339(), + }); + Self::atomic_write_json_sync(&marker_path, &marker_record, true)?; + Ok(version_id) } fn version_record_paths( @@ -869,6 +971,15 @@ impl FsStorageBackend { .map(ToOwned::to_owned) .or_else(|| metadata.get("__etag__").cloned()); + let version_id = record + .get("version_id") + .and_then(Value::as_str) + .map(|s| s.to_string()); + let is_delete_marker = record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false); + let mut obj = ObjectMeta::new(key.to_string(), size, last_modified); obj.etag = etag; obj.content_type = metadata.get("__content_type__").cloned(); @@ -880,6 +991,8 @@ impl FsStorageBackend { .into_iter() .filter(|(k, _)| !k.starts_with("__")) .collect(); + obj.version_id = version_id; + obj.is_delete_marker = is_delete_marker; Ok(obj) } @@ -905,6 +1018,10 @@ impl FsStorageBackend { .get("etag") .and_then(Value::as_str) .map(|s| s.to_string()); + let is_delete_marker = record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false); VersionInfo { version_id, @@ -913,7 +1030,7 @@ impl FsStorageBackend { last_modified: archived_at, etag, is_latest: false, - is_delete_marker: false, + is_delete_marker, } } @@ -1033,7 +1150,14 @@ impl FsStorageBackend { stack.push(entry.path().to_string_lossy().to_string()); } else if ft.is_file() { let full_path = entry.path().to_string_lossy().to_string(); - let key = full_path[bucket_prefix_len..].replace('\\', "/"); + let mut key = full_path[bucket_prefix_len..].replace('\\', "/"); + let is_dir_marker = name_str.as_ref() == DIR_MARKER_FILE; + if is_dir_marker { + key = key + .strip_suffix(DIR_MARKER_FILE) + .unwrap_or(&key) + .to_string(); + } if let Ok(meta) = entry.metadata() { let mtime = meta .modified() @@ -1049,7 +1173,11 @@ let etags = dir_etag_cache .entry(rel_dir.clone()) .or_insert_with(|| self.load_dir_index_sync(bucket_name, &rel_dir)); - let etag = etags.get(name_str.as_ref()).cloned(); + let etag = if is_dir_marker { + None + } else { + etags.get(name_str.as_ref()).cloned() + }; all_keys.push((key, meta.len(), mtime, etag)); } @@ -1200,11 +1328,36 @@ impl FsStorageBackend { Err(_) => continue, }; - let rel = format!("{}{}", rel_dir_prefix, name_str); - if ft.is_dir() { - dirs.push(format!("{}{}", rel, delimiter)); + let subdir_path = entry.path(); + let marker_path = subdir_path.join(DIR_MARKER_FILE); + if marker_path.is_file() { + if let Ok(meta) = std::fs::metadata(&marker_path) { + let mtime = meta + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + let lm = Utc + .timestamp_opt(mtime as i64,
((mtime % 1.0) * 1_000_000_000.0) as u32) + .single() + .unwrap_or_else(Utc::now); + let mut obj = ObjectMeta::new( + format!("{}{}/", rel_dir_prefix, name_str), + meta.len(), + lm, + ); + obj.etag = None; + files.push(obj); + } + } + dirs.push(format!("{}{}{}", rel_dir_prefix, name_str, delimiter)); } else if ft.is_file() { + if name_str == DIR_MARKER_FILE { + continue; + } + let rel = format!("{}{}", rel_dir_prefix, name_str); if let Ok(meta) = entry.metadata() { let mtime = meta .modified() @@ -1438,10 +1591,19 @@ impl FsStorageBackend { .map(|d| d.as_secs_f64()) .unwrap_or(0.0); + let new_version_id = if versioning_enabled { + Some(Self::new_version_id_sync()) + } else { + None + }; + let mut internal_meta = HashMap::new(); internal_meta.insert("__etag__".to_string(), etag.clone()); internal_meta.insert("__size__".to_string(), new_size.to_string()); internal_meta.insert("__last_modified__".to_string(), mtime.to_string()); + if let Some(ref vid) = new_version_id { + internal_meta.insert("__version_id__".to_string(), vid.clone()); + } if let Some(ref user_meta) = metadata { for (k, v) in user_meta { @@ -1452,6 +1614,10 @@ impl FsStorageBackend { self.write_metadata_sync(bucket_name, key, &internal_meta) .map_err(StorageError::Io)?; + if versioning_enabled { + self.clear_delete_marker_sync(bucket_name, key); + } + let lm = Utc .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) .single() @@ -1460,6 +1626,7 @@ impl FsStorageBackend { let mut obj = ObjectMeta::new(key.to_string(), new_size, lm); obj.etag = Some(etag); obj.metadata = metadata.unwrap_or_default(); + obj.version_id = new_version_id; Ok(obj) } @@ -1597,27 +1764,28 @@ impl crate::traits::StorageEngine for FsStorageBackend { .map_err(StorageError::Io)?; let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4())); - let mut file = tokio::fs::File::create(&tmp_path) + let file = tokio::fs::File::create(&tmp_path) .await .map_err(StorageError::Io)?; + let mut writer = tokio::io::BufWriter::with_capacity(256 * 1024, file); let mut hasher = Md5::new(); let mut total_size: u64 = 0; - let mut buf = [0u8; 65536]; + let mut buf = vec![0u8; 256 * 1024]; loop { let n = stream.read(&mut buf).await.map_err(StorageError::Io)?; if n == 0 { break; } hasher.update(&buf[..n]); - tokio::io::AsyncWriteExt::write_all(&mut file, &buf[..n]) + tokio::io::AsyncWriteExt::write_all(&mut writer, &buf[..n]) .await .map_err(StorageError::Io)?; total_size += n as u64; } - tokio::io::AsyncWriteExt::flush(&mut file) + tokio::io::AsyncWriteExt::flush(&mut writer) .await .map_err(StorageError::Io)?; - drop(file); + drop(writer); let etag = format!("{:x}", hasher.finalize()); self.finalize_put_sync(bucket, key, &tmp_path, etag, total_size, metadata) @@ -1628,8 +1796,18 @@ impl crate::traits::StorageEngine for FsStorageBackend { bucket: &str, key: &str, ) -> StorageResult<(ObjectMeta, AsyncReadStream)> { + self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { + if self.read_bucket_config_sync(bucket).versioning_enabled { + if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { + return Err(StorageError::DeleteMarker { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: dm_version_id, + }); + } + } return Err(StorageError::ObjectNotFound { bucket: bucket.to_string(), key: key.to_string(), @@ -1656,6 +1834,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { .get("__storage_class__") .cloned() .or_else(|| Some("STANDARD".to_string())); + obj.version_id = 
stored_meta.get("__version_id__").cloned(); obj.metadata = stored_meta .into_iter() .filter(|(k, _)| !k.starts_with("__")) .collect(); @@ -1669,8 +1848,18 @@ } async fn get_object_path(&self, bucket: &str, key: &str) -> StorageResult<PathBuf> { + self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { + if self.read_bucket_config_sync(bucket).versioning_enabled { + if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { + return Err(StorageError::DeleteMarker { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: dm_version_id, + }); + } + } return Err(StorageError::ObjectNotFound { bucket: bucket.to_string(), key: key.to_string(), @@ -1680,8 +1869,18 @@ } async fn head_object(&self, bucket: &str, key: &str) -> StorageResult<ObjectMeta> { + self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; if !path.is_file() { + if self.read_bucket_config_sync(bucket).versioning_enabled { + if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { + return Err(StorageError::DeleteMarker { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: dm_version_id, + }); + } + } return Err(StorageError::ObjectNotFound { bucket: bucket.to_string(), key: key.to_string(), @@ -1708,6 +1907,7 @@ .get("__storage_class__") .cloned() .or_else(|| Some("STANDARD".to_string())); + obj.version_id = stored_meta.get("__version_id__").cloned(); obj.metadata = stored_meta .into_iter() .filter(|(k, _)| !k.starts_with("__")) .collect(); @@ -1760,17 +1960,33 @@ Ok(Self::version_metadata_from_record(&record)) } - async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()> { + async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<DeleteOutcome> { let bucket_path = self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; - if !path.exists() { - return Ok(()); + let versioning_enabled = self.read_bucket_config_sync(bucket).versioning_enabled; + + if versioning_enabled { + if path.exists() { + self.archive_current_version_sync(bucket, key, "delete") + .map_err(StorageError::Io)?; + Self::safe_unlink(&path).map_err(StorageError::Io)?; + self.delete_metadata_sync(bucket, key) + .map_err(StorageError::Io)?; + Self::cleanup_empty_parents(&path, &bucket_path); + } + let dm_version_id = self + .write_delete_marker_sync(bucket, key) + .map_err(StorageError::Io)?; + self.invalidate_bucket_caches(bucket); + return Ok(DeleteOutcome { + version_id: Some(dm_version_id), + is_delete_marker: true, + existed: true, + }); } - let versioning_enabled = self.read_bucket_config_sync(bucket).versioning_enabled; - if versioning_enabled { - self.archive_current_version_sync(bucket, key, "delete") - .map_err(StorageError::Io)?; + if !path.exists() { + return Ok(DeleteOutcome::default()); } Self::safe_unlink(&path).map_err(StorageError::Io)?; @@ -1779,7 +1995,11 @@ Self::cleanup_empty_parents(&path, &bucket_path); self.invalidate_bucket_caches(bucket); - Ok(()) + Ok(DeleteOutcome { + version_id: None, + is_delete_marker: false, + existed: true, + }) } async fn delete_object_version( @@ -1787,7 +2007,7 @@ bucket: &str, key: &str, version_id: &str, - ) -> StorageResult<()> { + ) -> StorageResult<DeleteOutcome> {
self.require_bucket(bucket)?; self.validate_key(key)?; Self::validate_version_id(bucket, key, version_id)?; @@ -1800,12 +2020,35 @@ }); } + let is_delete_marker = if manifest_path.is_file() { + std::fs::read_to_string(&manifest_path) + .ok() + .and_then(|content| serde_json::from_str::<Value>(&content).ok()) + .and_then(|record| record.get("is_delete_marker").and_then(Value::as_bool)) + .unwrap_or(false) + } else { + false + }; + Self::safe_unlink(&data_path).map_err(StorageError::Io)?; Self::safe_unlink(&manifest_path).map_err(StorageError::Io)?; let versions_root = self.bucket_versions_root(bucket); Self::cleanup_empty_parents(&manifest_path, &versions_root); + + if is_delete_marker { + if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { + if dm_version_id == version_id { + self.clear_delete_marker_sync(bucket, key); + } + } + } + self.invalidate_bucket_caches(bucket); - Ok(()) + Ok(DeleteOutcome { + version_id: Some(version_id.to_string()), + is_delete_marker, + existed: true, + }) } async fn copy_object( @@ -2033,12 +2276,13 @@ .map_err(StorageError::Io)?; } - let mut dst = tokio::fs::File::create(&tmp_file) .await .map_err(StorageError::Io)?; + let dst_file = tokio::fs::File::create(&tmp_file) .await .map_err(StorageError::Io)?; + let mut dst = tokio::io::BufWriter::with_capacity(256 * 1024, dst_file); let mut hasher = Md5::new(); let mut remaining = length; - let mut buf = vec![0u8; 65536]; + let mut buf = vec![0u8; 256 * 1024]; while remaining > 0 { let to_read = std::cmp::min(remaining as usize, buf.len()); let n = src @@ -2122,12 +2366,14 @@ .map_err(StorageError::Io)?; let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4())); - let mut out_file = tokio::fs::File::create(&tmp_path) .await .map_err(StorageError::Io)?; + let out_raw = tokio::fs::File::create(&tmp_path) .await .map_err(StorageError::Io)?; + let mut out_file = tokio::io::BufWriter::with_capacity(256 * 1024, out_raw); let mut md5_digest_concat = Vec::new(); let mut total_size: u64 = 0; let part_count = parts.len(); + let mut buf = vec![0u8; 256 * 1024]; for part_info in parts { let part_file = upload_dir.join(format!("part-{:05}.part", part_info.part_number)); if !part_file.exists() { return Err(StorageError::InvalidPart(format!( "Part {} not found", part_info.part_number ))); } - let mut part_reader = tokio::fs::File::open(&part_file) .await .map_err(StorageError::Io)?; + let part_reader = tokio::fs::File::open(&part_file) .await .map_err(StorageError::Io)?; + let mut part_reader = tokio::io::BufReader::with_capacity(256 * 1024, part_reader); let mut part_hasher = Md5::new(); - let mut buf = [0u8; 65536]; loop { let n = part_reader.read(&mut buf).await.map_err(StorageError::Io)?; if n == 0 { diff --git a/crates/myfsio-storage/src/traits.rs b/crates/myfsio-storage/src/traits.rs index af069b9..e818fa0 100644 --- a/crates/myfsio-storage/src/traits.rs +++ b/crates/myfsio-storage/src/traits.rs @@ -62,14 +62,14 @@ pub trait StorageEngine: Send + Sync { version_id: &str, ) -> StorageResult<HashMap<String, String>>; - async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()>; + async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<DeleteOutcome>; async fn delete_object_version( &self, bucket: &str, key: &str, version_id: &str, - ) -> StorageResult<()>; + ) -> StorageResult<DeleteOutcome>; async fn copy_object( &self, diff --git a/crates/myfsio-storage/src/validation.rs b/crates/myfsio-storage/src/validation.rs index 05c9bdf..a5688b5 100644 ---
a/crates/myfsio-storage/src/validation.rs +++ b/crates/myfsio-storage/src/validation.rs @@ -60,6 +60,12 @@ pub fn validate_object_key( return Some("Object key contains invalid segments".to_string()); } + if part.len() > 255 { + return Some( + "Object key contains a path segment that exceeds 255 bytes".to_string(), + ); + } + if part.chars().any(|c| (c as u32) < 32) { return Some("Object key contains control characters".to_string()); } @@ -98,6 +104,12 @@ } } + for part in &non_empty_parts { + if *part == ".__myfsio_dirobj__" || part.starts_with("_index.json") { + return Some("Object key segment uses a reserved internal name".to_string()); + } + } + None } diff --git a/crates/myfsio-xml/Cargo.toml b/crates/myfsio-xml/Cargo.toml index 71b65e2..af9b616 100644 --- a/crates/myfsio-xml/Cargo.toml +++ b/crates/myfsio-xml/Cargo.toml @@ -8,3 +8,4 @@ myfsio-common = { path = "../myfsio-common" } quick-xml = { workspace = true } serde = { workspace = true } chrono = { workspace = true } +percent-encoding = { workspace = true } diff --git a/crates/myfsio-xml/src/request.rs b/crates/myfsio-xml/src/request.rs index a97123c..cf7ff72 100644 --- a/crates/myfsio-xml/src/request.rs +++ b/crates/myfsio-xml/src/request.rs @@ -1,13 +1,13 @@ use quick_xml::events::Event; use quick_xml::Reader; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct DeleteObjectsRequest { pub objects: Vec<ObjectIdentifier>, pub quiet: bool, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ObjectIdentifier { pub key: String, pub version_id: Option<String>, } diff --git a/crates/myfsio-xml/src/response.rs b/crates/myfsio-xml/src/response.rs index 2a9bd98..dcf84b2 100644 --- a/crates/myfsio-xml/src/response.rs +++ b/crates/myfsio-xml/src/response.rs @@ -62,6 +62,21 @@ pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta] String::from_utf8(writer.into_inner().into_inner()).unwrap() } +fn maybe_url_encode(value: &str, encoding_type: Option<&str>) -> String { + if matches!(encoding_type, Some(v) if v.eq_ignore_ascii_case("url")) { + percent_encoding::utf8_percent_encode(value, KEY_ENCODE_SET).to_string() + } else { + value.to_string() + } +} + +const KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::NON_ALPHANUMERIC + .remove(b'-') + .remove(b'_') + .remove(b'.') + .remove(b'~') + .remove(b'/'); + pub fn list_objects_v2_xml( bucket_name: &str, prefix: &str, delimiter: &str, max_keys: usize, objects: &[ObjectMeta], common_prefixes: &[String], is_truncated: bool, continuation_token: Option<&str>, next_continuation_token: Option<&str>, key_count: usize, +) -> String { + list_objects_v2_xml_with_encoding( + bucket_name, + prefix, + delimiter, + max_keys, + objects, + common_prefixes, + is_truncated, + continuation_token, + next_continuation_token, + key_count, + None, + ) +} + +pub fn list_objects_v2_xml_with_encoding( + bucket_name: &str, + prefix: &str, + delimiter: &str, + max_keys: usize, + objects: &[ObjectMeta], + common_prefixes: &[String], + is_truncated: bool, + continuation_token: Option<&str>, + next_continuation_token: Option<&str>, + key_count: usize, + encoding_type: Option<&str>, ) -> String { let mut writer = Writer::new(Cursor::new(Vec::new())); @@ -85,13 +128,22 @@ writer.write_event(Event::Start(start)).unwrap(); write_text_element(&mut writer, "Name", bucket_name); - write_text_element(&mut writer, "Prefix", prefix); + write_text_element(&mut writer, "Prefix", &maybe_url_encode(prefix, encoding_type)); if !delimiter.is_empty() { - write_text_element(&mut writer, "Delimiter", delimiter);
+ write_text_element( + &mut writer, + "Delimiter", + &maybe_url_encode(delimiter, encoding_type), + ); } write_text_element(&mut writer, "MaxKeys", &max_keys.to_string()); write_text_element(&mut writer, "KeyCount", &key_count.to_string()); write_text_element(&mut writer, "IsTruncated", &is_truncated.to_string()); + if let Some(encoding) = encoding_type { + if !encoding.is_empty() { + write_text_element(&mut writer, "EncodingType", encoding); + } + } if let Some(token) = continuation_token { write_text_element(&mut writer, "ContinuationToken", token); @@ -104,7 +156,7 @@ pub fn list_objects_v2_xml( writer .write_event(Event::Start(BytesStart::new("Contents"))) .unwrap(); - write_text_element(&mut writer, "Key", &obj.key); + write_text_element(&mut writer, "Key", &maybe_url_encode(&obj.key, encoding_type)); write_text_element( &mut writer, "LastModified", @@ -128,7 +180,7 @@ pub fn list_objects_v2_xml( writer .write_event(Event::Start(BytesStart::new("CommonPrefixes"))) .unwrap(); - write_text_element(&mut writer, "Prefix", prefix); + write_text_element(&mut writer, "Prefix", &maybe_url_encode(prefix, encoding_type)); writer .write_event(Event::End(BytesEnd::new("CommonPrefixes"))) .unwrap(); @@ -151,6 +203,32 @@ pub fn list_objects_v1_xml( common_prefixes: &[String], is_truncated: bool, next_marker: Option<&str>, +) -> String { + list_objects_v1_xml_with_encoding( + bucket_name, + prefix, + marker, + delimiter, + max_keys, + objects, + common_prefixes, + is_truncated, + next_marker, + None, + ) +} + +pub fn list_objects_v1_xml_with_encoding( + bucket_name: &str, + prefix: &str, + marker: &str, + delimiter: &str, + max_keys: usize, + objects: &[ObjectMeta], + common_prefixes: &[String], + is_truncated: bool, + next_marker: Option<&str>, + encoding_type: Option<&str>, ) -> String { let mut writer = Writer::new(Cursor::new(Vec::new())); @@ -163,27 +241,36 @@ pub fn list_objects_v1_xml( writer.write_event(Event::Start(start)).unwrap(); write_text_element(&mut writer, "Name", bucket_name); - write_text_element(&mut writer, "Prefix", prefix); - write_text_element(&mut writer, "Marker", marker); + write_text_element(&mut writer, "Prefix", &maybe_url_encode(prefix, encoding_type)); + write_text_element(&mut writer, "Marker", &maybe_url_encode(marker, encoding_type)); write_text_element(&mut writer, "MaxKeys", &max_keys.to_string()); write_text_element(&mut writer, "IsTruncated", &is_truncated.to_string()); if !delimiter.is_empty() { - write_text_element(&mut writer, "Delimiter", delimiter); + write_text_element( + &mut writer, + "Delimiter", + &maybe_url_encode(delimiter, encoding_type), + ); } if !delimiter.is_empty() && is_truncated { if let Some(nm) = next_marker { if !nm.is_empty() { - write_text_element(&mut writer, "NextMarker", nm); + write_text_element(&mut writer, "NextMarker", &maybe_url_encode(nm, encoding_type)); } } } + if let Some(encoding) = encoding_type { + if !encoding.is_empty() { + write_text_element(&mut writer, "EncodingType", encoding); + } + } for obj in objects { writer .write_event(Event::Start(BytesStart::new("Contents"))) .unwrap(); - write_text_element(&mut writer, "Key", &obj.key); + write_text_element(&mut writer, "Key", &maybe_url_encode(&obj.key, encoding_type)); write_text_element( &mut writer, "LastModified", @@ -202,7 +289,7 @@ pub fn list_objects_v1_xml( writer .write_event(Event::Start(BytesStart::new("CommonPrefixes"))) .unwrap(); - write_text_element(&mut writer, "Prefix", cp); + write_text_element(&mut writer, "Prefix", &maybe_url_encode(cp, 
encoding_type)); writer .write_event(Event::End(BytesEnd::new("CommonPrefixes"))) .unwrap(); @@ -325,8 +412,15 @@ pub fn copy_object_result_xml(etag: &str, last_modified: &str) -> String { String::from_utf8(writer.into_inner().into_inner()).unwrap() } +pub struct DeletedEntry { + pub key: String, + pub version_id: Option<String>, + pub delete_marker: bool, + pub delete_marker_version_id: Option<String>, +} + pub fn delete_result_xml( - deleted: &[(String, Option<String>)], + deleted: &[DeletedEntry], errors: &[(String, String, String)], quiet: bool, ) -> String { @@ -340,14 +434,20 @@ writer.write_event(Event::Start(start)).unwrap(); if !quiet { - for (key, version_id) in deleted { + for entry in deleted { writer .write_event(Event::Start(BytesStart::new("Deleted"))) .unwrap(); - write_text_element(&mut writer, "Key", key); - if let Some(vid) = version_id { + write_text_element(&mut writer, "Key", &entry.key); + if let Some(ref vid) = entry.version_id { write_text_element(&mut writer, "VersionId", vid); } + if entry.delete_marker { + write_text_element(&mut writer, "DeleteMarker", "true"); + if let Some(ref dm_vid) = entry.delete_marker_version_id { + write_text_element(&mut writer, "DeleteMarkerVersionId", dm_vid); + } + } writer .write_event(Event::End(BytesEnd::new("Deleted"))) .unwrap();
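A hedged usage sketch to close (not part of the patch; the key name, version-id value, and expected XML fragments below are assumptions inferred from the writer code above): with the new DeletedEntry shape, a versioned bulk delete that produced a delete marker would serialize roughly like this:

let deleted = vec![myfsio_xml::response::DeletedEntry {
    key: "photos/cat.png".to_string(),
    version_id: None, // the request did not name a specific version
    delete_marker: true,
    delete_marker_version_id: Some("20260423T122311123456Z-1a2b3c4d".to_string()),
}];
let xml = myfsio_xml::response::delete_result_xml(&deleted, &[], false);
assert!(xml.contains("<Deleted>"));
assert!(xml.contains("<Key>photos/cat.png</Key>"));
assert!(xml.contains("<DeleteMarker>true</DeleteMarker>"));
assert!(xml.contains("<DeleteMarkerVersionId>20260423T122311123456Z-1a2b3c4d</DeleteMarkerVersionId>"));

Passing quiet = true would suppress the per-object Deleted entries entirely, matching the if !quiet guard in delete_result_xml.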