From f2df64479cbde7c4223b66124945debb1c76c551 Mon Sep 17 00:00:00 2001 From: kqjy Date: Thu, 23 Apr 2026 22:40:38 +0800 Subject: [PATCH] Fix S3 versioning (live-object VersionId, DM PUT/DELETE), harden DeleteObjects/ListObjects conformance, and run hot paths on blocking threads --- crates/myfsio-common/src/error.rs | 6 +- crates/myfsio-server/src/handlers/mod.rs | 4 +- .../myfsio-server/src/middleware/ratelimit.rs | 23 +- crates/myfsio-server/tests/integration.rs | 376 ++++++++- crates/myfsio-storage/src/error.rs | 2 +- crates/myfsio-storage/src/fs_backend.rs | 738 +++++++++++++----- crates/myfsio-storage/src/validation.rs | 6 +- crates/myfsio-xml/src/request.rs | 41 +- crates/myfsio-xml/src/response.rs | 19 +- 9 files changed, 994 insertions(+), 221 deletions(-) diff --git a/crates/myfsio-common/src/error.rs b/crates/myfsio-common/src/error.rs index c1a0c4d..5a39eec 100644 --- a/crates/myfsio-common/src/error.rs +++ b/crates/myfsio-common/src/error.rs @@ -5,6 +5,7 @@ pub enum S3ErrorCode { AccessDenied, BadDigest, BucketAlreadyExists, + BucketAlreadyOwnedByYou, BucketNotEmpty, EntityTooLarge, EntityTooSmall, @@ -43,6 +44,7 @@ impl S3ErrorCode { Self::AccessDenied => 403, Self::BadDigest => 400, Self::BucketAlreadyExists => 409, + Self::BucketAlreadyOwnedByYou => 409, Self::BucketNotEmpty => 409, Self::EntityTooLarge => 413, Self::EntityTooSmall => 400, @@ -72,7 +74,7 @@ impl S3ErrorCode { Self::RequestTimeTooSkewed => 403, Self::ServerSideEncryptionConfigurationNotFoundError => 404, Self::SignatureDoesNotMatch => 403, - Self::SlowDown => 429, + Self::SlowDown => 503, } } @@ -81,6 +83,7 @@ impl S3ErrorCode { Self::AccessDenied => "AccessDenied", Self::BadDigest => "BadDigest", Self::BucketAlreadyExists => "BucketAlreadyExists", + Self::BucketAlreadyOwnedByYou => "BucketAlreadyOwnedByYou", Self::BucketNotEmpty => "BucketNotEmpty", Self::EntityTooLarge => "EntityTooLarge", Self::EntityTooSmall => "EntityTooSmall", @@ -121,6 +124,7 @@ impl S3ErrorCode { Self::AccessDenied => "Access Denied", Self::BadDigest => "The Content-MD5 or checksum value you specified did not match what we received", Self::BucketAlreadyExists => "The requested bucket name is not available", + Self::BucketAlreadyOwnedByYou => "Your previous request to create the named bucket succeeded and you already own it", Self::BucketNotEmpty => "The bucket you tried to delete is not empty", Self::EntityTooLarge => "Your proposed upload exceeds the maximum allowed size", Self::EntityTooSmall => "Your proposed upload is smaller than the minimum allowed object size", diff --git a/crates/myfsio-server/src/handlers/mod.rs b/crates/myfsio-server/src/handlers/mod.rs index c842cbf..a0657b1 100644 --- a/crates/myfsio-server/src/handlers/mod.rs +++ b/crates/myfsio-server/src/handlers/mod.rs @@ -148,6 +148,7 @@ async fn ensure_object_lock_allows_write( Ok(()) } Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()), + Err(myfsio_storage::error::StorageError::DeleteMarker { .. }) => Ok(()), Err(err) => Err(storage_err_response(err)), } } @@ -2666,7 +2667,8 @@ async fn evaluate_put_preconditions( } None } - Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => { + Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) + | Err(myfsio_storage::error::StorageError::DeleteMarker { .. 
}) => { if has_if_match { Some(s3_error_response(S3Error::from_code( S3ErrorCode::PreconditionFailed, diff --git a/crates/myfsio-server/src/middleware/ratelimit.rs b/crates/myfsio-server/src/middleware/ratelimit.rs index 6bf26aa..e1c624d 100644 --- a/crates/myfsio-server/src/middleware/ratelimit.rs +++ b/crates/myfsio-server/src/middleware/ratelimit.rs @@ -162,20 +162,31 @@ pub async fn rate_limit_layer( let limiter = state.select_limiter(&req); match limiter.check(&key) { Ok(()) => next.run(req).await, - Err(retry_after) => too_many_requests(retry_after), + Err(retry_after) => { + let resource = req.uri().path().to_string(); + too_many_requests(retry_after, &resource) + } } } -fn too_many_requests(retry_after: u64) -> Response { - ( - StatusCode::TOO_MANY_REQUESTS, +fn too_many_requests(retry_after: u64, resource: &str) -> Response { + let request_id = uuid::Uuid::new_v4().simple().to_string(); + let body = myfsio_xml::response::rate_limit_exceeded_xml(resource, &request_id); + let mut response = ( + StatusCode::SERVICE_UNAVAILABLE, [ (header::CONTENT_TYPE, "application/xml".to_string()), (header::RETRY_AFTER, retry_after.to_string()), ], - myfsio_xml::response::rate_limit_exceeded_xml(), + body, ) - .into_response() + .into_response(); + if let Ok(value) = request_id.parse() { + response + .headers_mut() + .insert("x-amz-request-id", value); + } + response } fn rate_limit_key(req: &Request, num_trusted_proxies: usize) -> String { diff --git a/crates/myfsio-server/tests/integration.rs b/crates/myfsio-server/tests/integration.rs index 9e2fd8f..f33b8bc 100644 --- a/crates/myfsio-server/tests/integration.rs +++ b/crates/myfsio-server/tests/integration.rs @@ -163,7 +163,7 @@ async fn rate_limit_default_and_admin_are_independent() { ) .await .unwrap(); - assert_eq!(second.status(), StatusCode::TOO_MANY_REQUESTS); + assert_eq!(second.status(), StatusCode::SERVICE_UNAVAILABLE); assert!(second.headers().contains_key("retry-after")); let admin_first = app @@ -199,7 +199,7 @@ async fn rate_limit_default_and_admin_are_independent() { ) .await .unwrap(); - assert_eq!(admin_third.status(), StatusCode::TOO_MANY_REQUESTS); + assert_eq!(admin_third.status(), StatusCode::SERVICE_UNAVAILABLE); } fn test_ui_state() -> (myfsio_server::state::AppState, tempfile::TempDir) { @@ -2311,9 +2311,16 @@ async fn test_versioned_object_can_be_read_and_deleted_by_version_id() { ) .unwrap(); let archived_version_id = list_body - .split("") - .filter_map(|part| part.split_once("").map(|(id, _)| id)) - .find(|id| *id != "null") + .split("") + .skip(1) + .find(|block| block.contains("false")) + .and_then(|block| { + block + .split("") + .nth(1) + .and_then(|s| s.split_once("").map(|(id, _)| id)) + }) + .filter(|id| *id != "null") .expect("archived version id") .to_string(); @@ -2506,6 +2513,352 @@ async fn test_versioned_put_and_delete_emit_version_headers_and_delete_markers() ); } +#[tokio::test] +async fn test_consecutive_slashes_in_key_round_trip() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/slashes-bucket", Body::empty())) + .await + .unwrap(); + + let put_ab = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/slashes-bucket/a/b", + Body::from("single"), + )) + .await + .unwrap(); + assert_eq!(put_ab.status(), StatusCode::OK); + + let put_double = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/slashes-bucket/a//b", + Body::from("double"), + )) + .await + .unwrap(); + assert_eq!(put_double.status(), StatusCode::OK); + + let put_triple = app + 
.clone() + .oneshot(signed_request( + Method::PUT, + "/slashes-bucket/a///b", + Body::from("triple"), + )) + .await + .unwrap(); + assert_eq!(put_triple.status(), StatusCode::OK); + + let get_ab = app + .clone() + .oneshot(signed_request( + Method::GET, + "/slashes-bucket/a/b", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(get_ab.status(), StatusCode::OK); + let body_ab = get_ab.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&body_ab[..], b"single"); + + let get_triple = app + .clone() + .oneshot(signed_request( + Method::GET, + "/slashes-bucket/a///b", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(get_triple.status(), StatusCode::OK); + let body_triple = get_triple.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&body_triple[..], b"triple"); + + let list_resp = app + .oneshot(signed_request( + Method::GET, + "/slashes-bucket?list-type=2", + Body::empty(), + )) + .await + .unwrap(); + let list_body = String::from_utf8( + list_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!( + list_body.contains("a/b"), + "expected a/b in listing: {}", + list_body + ); + assert!( + list_body.contains("a//b"), + "expected a//b in listing: {}", + list_body + ); + assert!( + list_body.contains("a///b"), + "expected a///b in listing: {}", + list_body + ); +} + +#[tokio::test] +async fn test_delete_live_version_restores_previous_to_live_slot() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/restore-bucket", Body::empty())) + .await + .unwrap(); + app.clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/restore-bucket?versioning") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from( + "Enabled", + )) + .unwrap(), + ) + .await + .unwrap(); + + let v1_resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/restore-bucket/k", + Body::from("one"), + )) + .await + .unwrap(); + let v1 = v1_resp + .headers() + .get("x-amz-version-id") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + let v2_resp = app + .clone() + .oneshot(signed_request( + Method::PUT, + "/restore-bucket/k", + Body::from("two"), + )) + .await + .unwrap(); + let v2 = v2_resp + .headers() + .get("x-amz-version-id") + .unwrap() + .to_str() + .unwrap() + .to_string(); + assert_ne!(v1, v2); + + let del = app + .clone() + .oneshot(signed_request( + Method::DELETE, + &format!("/restore-bucket/k?versionId={}", v2), + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(del.status(), StatusCode::NO_CONTENT); + + let get_live = app + .clone() + .oneshot(signed_request( + Method::GET, + "/restore-bucket/k", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(get_live.status(), StatusCode::OK); + let body = get_live.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&body[..], b"one"); + + let get_v1 = app + .oneshot(signed_request( + Method::GET, + &format!("/restore-bucket/k?versionId={}", v1), + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(get_v1.status(), StatusCode::OK); +} + +#[tokio::test] +async fn test_delete_active_delete_marker_restores_previous_to_live_slot() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/undel-bucket", Body::empty())) + .await + .unwrap(); + app.clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/undel-bucket?versioning") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", 
TEST_SECRET_KEY) + .body(Body::from( + "Enabled", + )) + .unwrap(), + ) + .await + .unwrap(); + + app.clone() + .oneshot(signed_request( + Method::PUT, + "/undel-bucket/k", + Body::from("only"), + )) + .await + .unwrap(); + + let del = app + .clone() + .oneshot(signed_request( + Method::DELETE, + "/undel-bucket/k", + Body::empty(), + )) + .await + .unwrap(); + let dm_version = del + .headers() + .get("x-amz-version-id") + .unwrap() + .to_str() + .unwrap() + .to_string(); + assert_eq!( + del.headers() + .get("x-amz-delete-marker") + .and_then(|v| v.to_str().ok()), + Some("true") + ); + + let shadowed = app + .clone() + .oneshot(signed_request( + Method::GET, + "/undel-bucket/k", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(shadowed.status(), StatusCode::NOT_FOUND); + + let del_dm = app + .clone() + .oneshot(signed_request( + Method::DELETE, + &format!("/undel-bucket/k?versionId={}", dm_version), + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(del_dm.status(), StatusCode::NO_CONTENT); + + let restored = app + .oneshot(signed_request( + Method::GET, + "/undel-bucket/k", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(restored.status(), StatusCode::OK); + let body = restored.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&body[..], b"only"); +} + +#[tokio::test] +async fn test_versioned_get_on_delete_marker_returns_method_not_allowed() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/dm-bucket", Body::empty())) + .await + .unwrap(); + app.clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/dm-bucket?versioning") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from( + "Enabled", + )) + .unwrap(), + ) + .await + .unwrap(); + + app.clone() + .oneshot(signed_request( + Method::PUT, + "/dm-bucket/k", + Body::from("x"), + )) + .await + .unwrap(); + + let del = app + .clone() + .oneshot(signed_request( + Method::DELETE, + "/dm-bucket/k", + Body::empty(), + )) + .await + .unwrap(); + let dm_version = del + .headers() + .get("x-amz-version-id") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + let versioned = app + .oneshot(signed_request( + Method::GET, + &format!("/dm-bucket/k?versionId={}", dm_version), + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(versioned.status(), StatusCode::METHOD_NOT_ALLOWED); +} + #[tokio::test] async fn test_retention_is_enforced_when_deleting_archived_version() { let (app, _tmp) = test_app(); @@ -2586,9 +2939,16 @@ async fn test_retention_is_enforced_when_deleting_archived_version() { ) .unwrap(); let archived_version_id = list_body - .split("") - .filter_map(|part| part.split_once("").map(|(id, _)| id)) - .find(|id| *id != "null") + .split("") + .skip(1) + .find(|block| block.contains("false")) + .and_then(|block| { + block + .split("") + .nth(1) + .and_then(|s| s.split_once("").map(|(id, _)| id)) + }) + .filter(|id| *id != "null") .expect("archived version id") .to_string(); diff --git a/crates/myfsio-storage/src/error.rs b/crates/myfsio-storage/src/error.rs index 69b11d1..d1789fc 100644 --- a/crates/myfsio-storage/src/error.rs +++ b/crates/myfsio-storage/src/error.rs @@ -50,7 +50,7 @@ impl From for S3Error { S3Error::from_code(S3ErrorCode::NoSuchBucket).with_resource(format!("/{}", name)) } StorageError::BucketAlreadyExists(name) => { - S3Error::from_code(S3ErrorCode::BucketAlreadyExists) + S3Error::from_code(S3ErrorCode::BucketAlreadyOwnedByYou) .with_resource(format!("/{}", name)) } 
StorageError::BucketNotEmpty(name) => { diff --git a/crates/myfsio-storage/src/fs_backend.rs b/crates/myfsio-storage/src/fs_backend.rs index af6061e..8fcaabf 100644 --- a/crates/myfsio-storage/src/fs_backend.rs +++ b/crates/myfsio-storage/src/fs_backend.rs @@ -16,6 +16,55 @@ use std::time::Instant; use tokio::io::AsyncReadExt; use uuid::Uuid; +const EMPTY_SEGMENT_SENTINEL: &str = ".__myfsio_empty__"; + +fn fs_encode_key(key: &str) -> String { + if key.is_empty() { + return String::new(); + } + let trailing = key.ends_with('/'); + let body = if trailing { &key[..key.len() - 1] } else { key }; + if body.is_empty() { + return if trailing { "/".to_string() } else { String::new() }; + } + let encoded: Vec = body + .split('/') + .map(|seg| { + if seg.is_empty() { + EMPTY_SEGMENT_SENTINEL.to_string() + } else { + seg.to_string() + } + }) + .collect(); + let mut result = encoded.join("/"); + if trailing { + result.push('/'); + } + result +} + +fn fs_decode_key(rel_path: &str) -> String { + let normalized: String; + let input = if cfg!(windows) && rel_path.contains('\\') { + normalized = rel_path.replace('\\', "/"); + normalized.as_str() + } else { + rel_path + }; + input + .split('/') + .map(|seg| { + if seg == EMPTY_SEGMENT_SENTINEL { + "" + } else { + seg + } + }) + .collect::>() + .join("/") +} + fn validate_list_prefix(prefix: &str) -> StorageResult<()> { if prefix.contains('\0') { return Err(StorageError::InvalidObjectKey( @@ -88,7 +137,7 @@ fn path_is_within(candidate: &Path, root: &Path) -> bool { } } -type ListCacheEntry = (String, u64, f64, Option); +type ListCacheEntry = (String, u64, f64, Option, Option); #[derive(Clone, Default)] struct ShallowCacheEntry { @@ -213,13 +262,27 @@ impl FsStorageBackend { fn object_path(&self, bucket_name: &str, object_key: &str) -> StorageResult { self.validate_key(object_key)?; + let encoded = fs_encode_key(object_key); if object_key.ends_with('/') { + let trimmed = encoded.trim_end_matches('/'); Ok(self .bucket_path(bucket_name) - .join(object_key) + .join(trimmed) .join(DIR_MARKER_FILE)) } else { - Ok(self.bucket_path(bucket_name).join(object_key)) + Ok(self.bucket_path(bucket_name).join(&encoded)) + } + } + + fn object_live_path(&self, bucket_name: &str, object_key: &str) -> PathBuf { + let encoded = fs_encode_key(object_key); + if object_key.ends_with('/') { + let trimmed = encoded.trim_end_matches('/'); + self.bucket_path(bucket_name) + .join(trimmed) + .join(DIR_MARKER_FILE) + } else { + self.bucket_path(bucket_name).join(&encoded) } } @@ -247,7 +310,8 @@ impl FsStorageBackend { fn index_file_for_key(&self, bucket_name: &str, key: &str) -> (PathBuf, String) { let meta_root = self.bucket_meta_root(bucket_name); if key.ends_with('/') { - let trimmed = key.trim_end_matches('/'); + let encoded = fs_encode_key(key); + let trimmed = encoded.trim_end_matches('/'); if trimmed.is_empty() { return (meta_root.join(INDEX_FILE), DIR_MARKER_FILE.to_string()); } @@ -256,13 +320,14 @@ impl FsStorageBackend { DIR_MARKER_FILE.to_string(), ); } - let key_path = Path::new(key); - let entry_name = key_path + let encoded = fs_encode_key(key); + let encoded_path = Path::new(&encoded); + let entry_name = encoded_path .file_name() .map(|n| n.to_string_lossy().to_string()) - .unwrap_or_else(|| key.to_string()); + .unwrap_or_else(|| encoded.clone()); - let parent = key_path.parent(); + let parent = encoded_path.parent(); match parent { Some(p) if p != Path::new("") && p != Path::new(".") => { (meta_root.join(p).join(INDEX_FILE), entry_name) @@ -304,6 +369,37 @@ impl 
FsStorageBackend { out } + fn load_dir_index_full_sync( + &self, + bucket_name: &str, + rel_dir: &Path, + ) -> HashMap, Option)> { + let index_path = self.index_file_for_dir(bucket_name, rel_dir); + if !index_path.exists() { + return HashMap::new(); + } + let Ok(text) = std::fs::read_to_string(&index_path) else { + return HashMap::new(); + }; + let Ok(index) = serde_json::from_str::>(&text) else { + return HashMap::new(); + }; + let mut out = HashMap::with_capacity(index.len()); + for (name, entry) in index { + let meta = entry.get("metadata").and_then(|m| m.as_object()); + let etag = meta + .and_then(|m| m.get("__etag__")) + .and_then(|v| v.as_str()) + .map(ToOwned::to_owned); + let version_id = meta + .and_then(|m| m.get("__version_id__")) + .and_then(|v| v.as_str()) + .map(ToOwned::to_owned); + out.insert(name, (etag, version_id)); + } + out + } + fn get_meta_index_lock(&self, index_path: &str) -> Arc> { self.meta_index_locks .entry(index_path.to_string()) @@ -344,7 +440,9 @@ impl FsStorageBackend { } fn version_dir(&self, bucket_name: &str, key: &str) -> PathBuf { - self.bucket_versions_root(bucket_name).join(key) + let encoded = fs_encode_key(key); + let trimmed = encoded.trim_end_matches('/'); + self.bucket_versions_root(bucket_name).join(trimmed) } fn delete_markers_root(&self, bucket_name: &str) -> PathBuf { @@ -352,8 +450,10 @@ impl FsStorageBackend { } fn delete_marker_path(&self, bucket_name: &str, key: &str) -> PathBuf { + let encoded = fs_encode_key(key); + let trimmed = encoded.trim_end_matches('/'); self.delete_markers_root(bucket_name) - .join(format!("{}.json", key)) + .join(format!("{}.json", trimmed)) } fn read_delete_marker_sync( @@ -804,8 +904,7 @@ impl FsStorageBackend { key: &str, reason: &str, ) -> std::io::Result<(u64, Option)> { - let bucket_path = self.bucket_path(bucket_name); - let source = bucket_path.join(key); + let source = self.object_live_path(bucket_name, key); if !source.exists() { return Ok((0, None)); } @@ -845,6 +944,101 @@ impl FsStorageBackend { Ok((source_size, Some(version_id))) } + fn promote_latest_archived_to_live_sync( + &self, + bucket_name: &str, + key: &str, + ) -> std::io::Result> { + let version_dir = self.version_dir(bucket_name, key); + if !version_dir.exists() { + return Ok(None); + } + + let entries = match std::fs::read_dir(&version_dir) { + Ok(e) => e, + Err(_) => return Ok(None), + }; + + let mut candidates: Vec<(DateTime, String, PathBuf, Value)> = Vec::new(); + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().and_then(|e| e.to_str()) != Some("json") { + continue; + } + let Ok(content) = std::fs::read_to_string(&path) else { + continue; + }; + let Ok(record) = serde_json::from_str::(&content) else { + continue; + }; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + continue; + } + let version_id = record + .get("version_id") + .and_then(Value::as_str) + .unwrap_or("") + .to_string(); + if version_id.is_empty() { + continue; + } + let archived_at = record + .get("archived_at") + .and_then(Value::as_str) + .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .map(|d| d.with_timezone(&Utc)) + .unwrap_or_else(Utc::now); + candidates.push((archived_at, version_id, path, record)); + } + + candidates.sort_by(|a, b| b.0.cmp(&a.0)); + let Some((_, version_id, manifest_path, record)) = candidates.into_iter().next() else { + return Ok(None); + }; + + let (_, data_path) = self.version_record_paths(bucket_name, key, &version_id); + if !data_path.is_file() { + return 
Ok(None); + } + + let live_path = self.object_live_path(bucket_name, key); + if let Some(parent) = live_path.parent() { + std::fs::create_dir_all(parent)?; + } + if live_path.exists() { + std::fs::remove_file(&live_path).ok(); + } + std::fs::rename(&data_path, &live_path)?; + + let mut meta: HashMap = record + .get("metadata") + .and_then(Value::as_object) + .map(|m| { + m.iter() + .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string()))) + .collect() + }) + .unwrap_or_default(); + meta.insert("__version_id__".to_string(), version_id.clone()); + if !meta.contains_key("__etag__") { + if let Some(etag) = record.get("etag").and_then(Value::as_str) { + if !etag.is_empty() { + meta.insert("__etag__".to_string(), etag.to_string()); + } + } + } + self.write_metadata_sync(bucket_name, key, &meta)?; + + Self::safe_unlink(&manifest_path)?; + Self::cleanup_empty_parents(&manifest_path, &self.bucket_versions_root(bucket_name)); + + Ok(Some(version_id)) + } + fn write_delete_marker_sync( &self, bucket_name: &str, @@ -918,8 +1112,13 @@ impl FsStorageBackend { self.require_bucket(bucket_name)?; self.validate_key(key)?; Self::validate_version_id(bucket_name, key, version_id)?; + + if let Some(record_and_path) = self.try_live_version_record_sync(bucket_name, key, version_id) { + return Ok(record_and_path); + } + let (manifest_path, data_path) = self.version_record_paths(bucket_name, key, version_id); - if !manifest_path.is_file() || !data_path.is_file() { + if !manifest_path.is_file() { return Err(StorageError::VersionNotFound { bucket: bucket_name.to_string(), key: key.to_string(), @@ -929,9 +1128,64 @@ impl FsStorageBackend { let content = std::fs::read_to_string(&manifest_path).map_err(StorageError::Io)?; let record = serde_json::from_str::(&content).map_err(StorageError::Json)?; + let is_delete_marker = record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false); + if !is_delete_marker && !data_path.is_file() { + return Err(StorageError::VersionNotFound { + bucket: bucket_name.to_string(), + key: key.to_string(), + version_id: version_id.to_string(), + }); + } Ok((record, data_path)) } + fn try_live_version_record_sync( + &self, + bucket_name: &str, + key: &str, + version_id: &str, + ) -> Option<(Value, PathBuf)> { + let live_path = self.object_live_path(bucket_name, key); + if !live_path.is_file() { + return None; + } + let metadata = self.read_metadata_sync(bucket_name, key); + let live_version = metadata.get("__version_id__")?.clone(); + if live_version != version_id { + return None; + } + let file_meta = std::fs::metadata(&live_path).ok()?; + let mtime = file_meta + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + let archived_at = Utc + .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) + .single() + .unwrap_or_else(Utc::now); + let etag = metadata.get("__etag__").cloned().unwrap_or_default(); + let mut meta_json = serde_json::Map::new(); + for (k, v) in &metadata { + meta_json.insert(k.clone(), Value::String(v.clone())); + } + let record = serde_json::json!({ + "version_id": live_version, + "key": key, + "size": file_meta.len(), + "archived_at": archived_at.to_rfc3339(), + "etag": etag, + "metadata": Value::Object(meta_json), + "reason": "current", + "is_delete_marker": false, + }); + Some((record, live_path)) + } + fn version_metadata_from_record(record: &Value) -> HashMap { record .get("metadata") @@ -1125,7 +1379,8 @@ impl FsStorageBackend { let bucket_path = 
self.require_bucket(bucket_name)?; let mut all_keys: Vec = Vec::new(); - let mut dir_etag_cache: HashMap> = HashMap::new(); + let mut dir_idx_cache: HashMap, Option)>> = + HashMap::new(); let internal = INTERNAL_FOLDERS; let bucket_str = bucket_path.to_string_lossy().to_string(); let bucket_prefix_len = bucket_str.len() + 1; @@ -1150,12 +1405,16 @@ impl FsStorageBackend { stack.push(entry.path().to_string_lossy().to_string()); } else if ft.is_file() { let full_path = entry.path().to_string_lossy().to_string(); - let mut key = full_path[bucket_prefix_len..].replace('\\', "/"); + let mut fs_rel = full_path[bucket_prefix_len..].to_string(); + #[cfg(windows)] + { + fs_rel = fs_rel.replace('\\', "/"); + } let is_dir_marker = name_str.as_ref() == DIR_MARKER_FILE; if is_dir_marker { - key = key + fs_rel = fs_rel .strip_suffix(DIR_MARKER_FILE) - .unwrap_or(&key) + .unwrap_or(&fs_rel) .to_string(); } if let Ok(meta) = entry.metadata() { @@ -1166,20 +1425,23 @@ impl FsStorageBackend { .map(|d| d.as_secs_f64()) .unwrap_or(0.0); - let rel_dir = Path::new(&key) + let rel_dir = Path::new(&fs_rel) .parent() .map(|p| p.to_path_buf()) .unwrap_or_default(); - let etags = dir_etag_cache - .entry(rel_dir.clone()) - .or_insert_with(|| self.load_dir_index_sync(bucket_name, &rel_dir)); - let etag = if is_dir_marker { - None + let idx = dir_idx_cache.entry(rel_dir.clone()).or_insert_with(|| { + self.load_dir_index_full_sync(bucket_name, &rel_dir) + }); + let (etag, version_id) = if is_dir_marker { + (None, None) } else { - etags.get(name_str.as_ref()).cloned() + idx.get(name_str.as_ref()) + .cloned() + .unwrap_or((None, None)) }; - all_keys.push((key, meta.len(), mtime, etag)); + let key = fs_decode_key(&fs_rel); + all_keys.push((key, meta.len(), mtime, etag, version_id)); } } } @@ -1256,13 +1518,14 @@ impl FsStorageBackend { let objects: Vec = prefix_filter[start_idx..end_idx] .iter() - .map(|(key, size, mtime, etag)| { + .map(|(key, size, mtime, etag, version_id)| { let lm = Utc .timestamp_opt(*mtime as i64, ((*mtime % 1.0) * 1_000_000_000.0) as u32) .single() .unwrap_or_else(Utc::now); let mut obj = ObjectMeta::new(key.clone(), *size, lm); obj.etag = etag.clone(); + obj.version_id = version_id.clone(); obj }) .collect(); @@ -1307,11 +1570,16 @@ impl FsStorageBackend { let rel_dir_prefix = if rel_dir.as_os_str().is_empty() { String::new() } else { - let mut s = rel_dir.to_string_lossy().replace('\\', "/"); - if !s.ends_with('/') { - s.push('/'); + let mut s = rel_dir.to_string_lossy().into_owned(); + #[cfg(windows)] + { + s = s.replace('\\', "/"); } - s + let mut decoded = fs_decode_key(&s); + if !decoded.ends_with('/') { + decoded.push('/'); + } + decoded }; let entries = std::fs::read_dir(&target_dir).map_err(StorageError::Io)?; @@ -1328,6 +1596,7 @@ impl FsStorageBackend { Err(_) => continue, }; + let display_name = fs_decode_key(&name_str); if ft.is_dir() { let subdir_path = entry.path(); let marker_path = subdir_path.join(DIR_MARKER_FILE); @@ -1344,7 +1613,7 @@ impl FsStorageBackend { .single() .unwrap_or_else(Utc::now); let mut obj = ObjectMeta::new( - format!("{}{}/", rel_dir_prefix, name_str), + format!("{}{}/", rel_dir_prefix, display_name), meta.len(), lm, ); @@ -1352,12 +1621,12 @@ impl FsStorageBackend { files.push(obj); } } - dirs.push(format!("{}{}{}", rel_dir_prefix, name_str, delimiter)); + dirs.push(format!("{}{}{}", rel_dir_prefix, display_name, delimiter)); } else if ft.is_file() { if name_str == DIR_MARKER_FILE { continue; } - let rel = format!("{}{}", rel_dir_prefix, name_str); + let rel = 
format!("{}{}", rel_dir_prefix, display_name); if let Ok(meta) = entry.metadata() { let mtime = meta .modified() @@ -1431,7 +1700,8 @@ impl FsStorageBackend { PathBuf::new() } else { validate_list_prefix(¶ms.prefix)?; - let prefix_path = Path::new(¶ms.prefix); + let encoded_prefix = fs_encode_key(¶ms.prefix); + let prefix_path = Path::new(&encoded_prefix); if params.prefix.ends_with(¶ms.delimiter) { prefix_path.to_path_buf() } else { @@ -1524,8 +1794,8 @@ impl FsStorageBackend { new_size: u64, metadata: Option>, ) -> StorageResult { - let bucket_path = self.require_bucket(bucket_name)?; - let destination = bucket_path.join(key); + self.require_bucket(bucket_name)?; + let destination = self.object_live_path(bucket_name, key); if let Some(parent) = destination.parent() { std::fs::create_dir_all(parent).map_err(StorageError::Io)?; } @@ -1788,7 +2058,9 @@ impl crate::traits::StorageEngine for FsStorageBackend { drop(writer); let etag = format!("{:x}", hasher.finalize()); - self.finalize_put_sync(bucket, key, &tmp_path, etag, total_size, metadata) + run_blocking(|| { + self.finalize_put_sync(bucket, key, &tmp_path, etag, total_size, metadata) + }) } async fn get_object( @@ -1796,49 +2068,52 @@ impl crate::traits::StorageEngine for FsStorageBackend { bucket: &str, key: &str, ) -> StorageResult<(ObjectMeta, AsyncReadStream)> { - self.require_bucket(bucket)?; - let path = self.object_path(bucket, key)?; - if !path.is_file() { - if self.read_bucket_config_sync(bucket).versioning_enabled { - if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { - return Err(StorageError::DeleteMarker { - bucket: bucket.to_string(), - key: key.to_string(), - version_id: dm_version_id, - }); + let (obj, path) = run_blocking(|| -> StorageResult<(ObjectMeta, PathBuf)> { + self.require_bucket(bucket)?; + let path = self.object_path(bucket, key)?; + if !path.is_file() { + if self.read_bucket_config_sync(bucket).versioning_enabled { + if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { + return Err(StorageError::DeleteMarker { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: dm_version_id, + }); + } } + return Err(StorageError::ObjectNotFound { + bucket: bucket.to_string(), + key: key.to_string(), + }); } - return Err(StorageError::ObjectNotFound { - bucket: bucket.to_string(), - key: key.to_string(), - }); - } - let meta = std::fs::metadata(&path).map_err(StorageError::Io)?; - let mtime = meta - .modified() - .ok() - .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) - .map(|d| d.as_secs_f64()) - .unwrap_or(0.0); - let lm = Utc - .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) - .single() - .unwrap_or_else(Utc::now); + let meta = std::fs::metadata(&path).map_err(StorageError::Io)?; + let mtime = meta + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + let lm = Utc + .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) + .single() + .unwrap_or_else(Utc::now); - let stored_meta = self.read_metadata_sync(bucket, key); - let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); - obj.etag = stored_meta.get("__etag__").cloned(); - obj.content_type = stored_meta.get("__content_type__").cloned(); - obj.storage_class = stored_meta - .get("__storage_class__") - .cloned() - .or_else(|| Some("STANDARD".to_string())); - obj.version_id = stored_meta.get("__version_id__").cloned(); - obj.metadata = stored_meta - .into_iter() - 
.filter(|(k, _)| !k.starts_with("__")) - .collect(); + let stored_meta = self.read_metadata_sync(bucket, key); + let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); + obj.etag = stored_meta.get("__etag__").cloned(); + obj.content_type = stored_meta.get("__content_type__").cloned(); + obj.storage_class = stored_meta + .get("__storage_class__") + .cloned() + .or_else(|| Some("STANDARD".to_string())); + obj.version_id = stored_meta.get("__version_id__").cloned(); + obj.metadata = stored_meta + .into_iter() + .filter(|(k, _)| !k.starts_with("__")) + .collect(); + Ok((obj, path)) + })?; let file = tokio::fs::File::open(&path) .await @@ -1869,50 +2144,52 @@ impl crate::traits::StorageEngine for FsStorageBackend { } async fn head_object(&self, bucket: &str, key: &str) -> StorageResult { - self.require_bucket(bucket)?; - let path = self.object_path(bucket, key)?; - if !path.is_file() { - if self.read_bucket_config_sync(bucket).versioning_enabled { - if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { - return Err(StorageError::DeleteMarker { - bucket: bucket.to_string(), - key: key.to_string(), - version_id: dm_version_id, - }); + run_blocking(|| { + self.require_bucket(bucket)?; + let path = self.object_path(bucket, key)?; + if !path.is_file() { + if self.read_bucket_config_sync(bucket).versioning_enabled { + if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { + return Err(StorageError::DeleteMarker { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: dm_version_id, + }); + } } + return Err(StorageError::ObjectNotFound { + bucket: bucket.to_string(), + key: key.to_string(), + }); } - return Err(StorageError::ObjectNotFound { - bucket: bucket.to_string(), - key: key.to_string(), - }); - } - let meta = std::fs::metadata(&path).map_err(StorageError::Io)?; - let mtime = meta - .modified() - .ok() - .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) - .map(|d| d.as_secs_f64()) - .unwrap_or(0.0); - let lm = Utc - .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) - .single() - .unwrap_or_else(Utc::now); + let meta = std::fs::metadata(&path).map_err(StorageError::Io)?; + let mtime = meta + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + let lm = Utc + .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) + .single() + .unwrap_or_else(Utc::now); - let stored_meta = self.read_metadata_sync(bucket, key); - let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); - obj.etag = stored_meta.get("__etag__").cloned(); - obj.content_type = stored_meta.get("__content_type__").cloned(); - obj.storage_class = stored_meta - .get("__storage_class__") - .cloned() - .or_else(|| Some("STANDARD".to_string())); - obj.version_id = stored_meta.get("__version_id__").cloned(); - obj.metadata = stored_meta - .into_iter() - .filter(|(k, _)| !k.starts_with("__")) - .collect(); - Ok(obj) + let stored_meta = self.read_metadata_sync(bucket, key); + let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); + obj.etag = stored_meta.get("__etag__").cloned(); + obj.content_type = stored_meta.get("__content_type__").cloned(); + obj.storage_class = stored_meta + .get("__storage_class__") + .cloned() + .or_else(|| Some("STANDARD".to_string())); + obj.version_id = stored_meta.get("__version_id__").cloned(); + obj.metadata = stored_meta + .into_iter() + .filter(|(k, _)| !k.starts_with("__")) + .collect(); + Ok(obj) + }) } 
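// `run_blocking` is used by the hunks above and below but is not defined anywhere in this
// diff. A minimal sketch, assuming it is a thin wrapper over Tokio's `block_in_place` so
// the synchronous filesystem work runs on a blocking-capable thread without stalling the
// async reactor (the helper name and body here are an assumption, not part of the patch):
//
//     fn run_blocking<T>(f: impl FnOnce() -> T) -> T {
//         // Returns the closure's value directly, matching the non-awaited call sites
//         // such as `run_blocking(|| self.read_metadata_sync(bucket, key))`.
//         tokio::task::block_in_place(f)
//     }
//
// `block_in_place` requires the multi-threaded runtime flavor; on a current-thread runtime
// the same call sites would instead need `tokio::task::spawn_blocking` with 'static closures.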
async fn get_object_version( @@ -1922,6 +2199,15 @@ impl crate::traits::StorageEngine for FsStorageBackend { version_id: &str, ) -> StorageResult<(ObjectMeta, AsyncReadStream)> { let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Err(StorageError::MethodNotAllowed( + "The specified method is not allowed against a delete marker".to_string(), + )); + } let obj = self.object_meta_from_version_record(key, &record, &data_path)?; let file = tokio::fs::File::open(&data_path) .await @@ -1936,7 +2222,16 @@ impl crate::traits::StorageEngine for FsStorageBackend { key: &str, version_id: &str, ) -> StorageResult { - let (_record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Err(StorageError::MethodNotAllowed( + "The specified method is not allowed against a delete marker".to_string(), + )); + } Ok(data_path) } @@ -1947,6 +2242,15 @@ impl crate::traits::StorageEngine for FsStorageBackend { version_id: &str, ) -> StorageResult { let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + if record + .get("is_delete_marker") + .and_then(Value::as_bool) + .unwrap_or(false) + { + return Err(StorageError::MethodNotAllowed( + "The specified method is not allowed against a delete marker".to_string(), + )); + } self.object_meta_from_version_record(key, &record, &data_path) } @@ -1961,44 +2265,46 @@ impl crate::traits::StorageEngine for FsStorageBackend { } async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult { - let bucket_path = self.require_bucket(bucket)?; - let path = self.object_path(bucket, key)?; - let versioning_enabled = self.read_bucket_config_sync(bucket).versioning_enabled; + run_blocking(|| { + let bucket_path = self.require_bucket(bucket)?; + let path = self.object_path(bucket, key)?; + let versioning_enabled = self.read_bucket_config_sync(bucket).versioning_enabled; - if versioning_enabled { - if path.exists() { - self.archive_current_version_sync(bucket, key, "delete") + if versioning_enabled { + if path.exists() { + self.archive_current_version_sync(bucket, key, "delete") + .map_err(StorageError::Io)?; + Self::safe_unlink(&path).map_err(StorageError::Io)?; + self.delete_metadata_sync(bucket, key) + .map_err(StorageError::Io)?; + Self::cleanup_empty_parents(&path, &bucket_path); + } + let dm_version_id = self + .write_delete_marker_sync(bucket, key) .map_err(StorageError::Io)?; - Self::safe_unlink(&path).map_err(StorageError::Io)?; - self.delete_metadata_sync(bucket, key) - .map_err(StorageError::Io)?; - Self::cleanup_empty_parents(&path, &bucket_path); + self.invalidate_bucket_caches(bucket); + return Ok(DeleteOutcome { + version_id: Some(dm_version_id), + is_delete_marker: true, + existed: true, + }); } - let dm_version_id = self - .write_delete_marker_sync(bucket, key) + + if !path.exists() { + return Ok(DeleteOutcome::default()); + } + + Self::safe_unlink(&path).map_err(StorageError::Io)?; + self.delete_metadata_sync(bucket, key) .map_err(StorageError::Io)?; + + Self::cleanup_empty_parents(&path, &bucket_path); self.invalidate_bucket_caches(bucket); - return Ok(DeleteOutcome { - version_id: Some(dm_version_id), - is_delete_marker: true, + Ok(DeleteOutcome { + version_id: None, + is_delete_marker: false, existed: 
true, - }); - } - - if !path.exists() { - return Ok(DeleteOutcome::default()); - } - - Self::safe_unlink(&path).map_err(StorageError::Io)?; - self.delete_metadata_sync(bucket, key) - .map_err(StorageError::Io)?; - - Self::cleanup_empty_parents(&path, &bucket_path); - self.invalidate_bucket_caches(bucket); - Ok(DeleteOutcome { - version_id: None, - is_delete_marker: false, - existed: true, + }) }) } @@ -2008,46 +2314,75 @@ impl crate::traits::StorageEngine for FsStorageBackend { key: &str, version_id: &str, ) -> StorageResult { - self.require_bucket(bucket)?; - self.validate_key(key)?; - Self::validate_version_id(bucket, key, version_id)?; - let (manifest_path, data_path) = self.version_record_paths(bucket, key, version_id); - if !manifest_path.is_file() && !data_path.is_file() { - return Err(StorageError::VersionNotFound { - bucket: bucket.to_string(), - key: key.to_string(), - version_id: version_id.to_string(), - }); - } + run_blocking(|| { + let bucket_path = self.require_bucket(bucket)?; + self.validate_key(key)?; + Self::validate_version_id(bucket, key, version_id)?; - let is_delete_marker = if manifest_path.is_file() { - std::fs::read_to_string(&manifest_path) - .ok() - .and_then(|content| serde_json::from_str::(&content).ok()) - .and_then(|record| record.get("is_delete_marker").and_then(Value::as_bool)) - .unwrap_or(false) - } else { - false - }; - - Self::safe_unlink(&data_path).map_err(StorageError::Io)?; - Self::safe_unlink(&manifest_path).map_err(StorageError::Io)?; - let versions_root = self.bucket_versions_root(bucket); - Self::cleanup_empty_parents(&manifest_path, &versions_root); - - if is_delete_marker { - if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) { - if dm_version_id == version_id { - self.clear_delete_marker_sync(bucket, key); + let live_path = self.object_live_path(bucket, key); + if live_path.is_file() { + let metadata = self.read_metadata_sync(bucket, key); + if metadata.get("__version_id__").map(String::as_str) == Some(version_id) { + Self::safe_unlink(&live_path).map_err(StorageError::Io)?; + self.delete_metadata_sync(bucket, key) + .map_err(StorageError::Io)?; + Self::cleanup_empty_parents(&live_path, &bucket_path); + self.promote_latest_archived_to_live_sync(bucket, key) + .map_err(StorageError::Io)?; + self.invalidate_bucket_caches(bucket); + return Ok(DeleteOutcome { + version_id: Some(version_id.to_string()), + is_delete_marker: false, + existed: true, + }); } } - } - self.invalidate_bucket_caches(bucket); - Ok(DeleteOutcome { - version_id: Some(version_id.to_string()), - is_delete_marker, - existed: true, + let (manifest_path, data_path) = self.version_record_paths(bucket, key, version_id); + if !manifest_path.is_file() && !data_path.is_file() { + return Err(StorageError::VersionNotFound { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: version_id.to_string(), + }); + } + + let is_delete_marker = if manifest_path.is_file() { + std::fs::read_to_string(&manifest_path) + .ok() + .and_then(|content| serde_json::from_str::(&content).ok()) + .and_then(|record| record.get("is_delete_marker").and_then(Value::as_bool)) + .unwrap_or(false) + } else { + false + }; + + Self::safe_unlink(&data_path).map_err(StorageError::Io)?; + Self::safe_unlink(&manifest_path).map_err(StorageError::Io)?; + let versions_root = self.bucket_versions_root(bucket); + Self::cleanup_empty_parents(&manifest_path, &versions_root); + + let mut was_active_dm = false; + if is_delete_marker { + if let Some((dm_version_id, _)) = 
self.read_delete_marker_sync(bucket, key) { + if dm_version_id == version_id { + self.clear_delete_marker_sync(bucket, key); + was_active_dm = true; + } + } + } + + if was_active_dm && !live_path.is_file() { + self.promote_latest_archived_to_live_sync(bucket, key) + .map_err(StorageError::Io)?; + } + + self.invalidate_bucket_caches(bucket); + Ok(DeleteOutcome { + version_id: Some(version_id.to_string()), + is_delete_marker, + existed: true, + }) }) } @@ -2076,7 +2411,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { bucket: &str, key: &str, ) -> StorageResult> { - Ok(self.read_metadata_sync(bucket, key)) + Ok(run_blocking(|| self.read_metadata_sync(bucket, key))) } async fn put_object_metadata( @@ -2085,16 +2420,18 @@ impl crate::traits::StorageEngine for FsStorageBackend { key: &str, metadata: &HashMap, ) -> StorageResult<()> { - let mut entry = self.read_index_entry_sync(bucket, key).unwrap_or_default(); - let meta_map: serde_json::Map = metadata - .iter() - .map(|(k, v)| (k.clone(), Value::String(v.clone()))) - .collect(); - entry.insert("metadata".to_string(), Value::Object(meta_map)); - self.write_index_entry_sync(bucket, key, &entry) - .map_err(StorageError::Io)?; - self.invalidate_bucket_caches(bucket); - Ok(()) + run_blocking(|| { + let mut entry = self.read_index_entry_sync(bucket, key).unwrap_or_default(); + let meta_map: serde_json::Map = metadata + .iter() + .map(|(k, v)| (k.clone(), Value::String(v.clone()))) + .collect(); + entry.insert("metadata".to_string(), Value::Object(meta_map)); + self.write_index_entry_sync(bucket, key, &entry) + .map_err(StorageError::Io)?; + self.invalidate_bucket_caches(bucket); + Ok(()) + }) } async fn list_objects( @@ -2607,7 +2944,14 @@ impl crate::traits::StorageEngine for FsStorageBackend { let fallback_key = path .parent() .and_then(|parent| parent.strip_prefix(&root).ok()) - .map(|rel| rel.to_string_lossy().replace('\\', "/")) + .map(|rel| { + let mut s = rel.to_string_lossy().into_owned(); + #[cfg(windows)] + { + s = s.replace('\\', "/"); + } + fs_decode_key(&s) + }) .unwrap_or_default(); let info = self.version_info_from_record(&fallback_key, &record); if prefix.is_some_and(|value| !info.key.starts_with(value)) { diff --git a/crates/myfsio-storage/src/validation.rs b/crates/myfsio-storage/src/validation.rs index a5688b5..14030ae 100644 --- a/crates/myfsio-storage/src/validation.rs +++ b/crates/myfsio-storage/src/validation.rs @@ -47,6 +47,7 @@ pub fn validate_object_key( normalized.split('/').collect() }; + for part in &parts { if part.is_empty() { continue; @@ -105,7 +106,10 @@ pub fn validate_object_key( } for part in &non_empty_parts { - if *part == ".__myfsio_dirobj__" || part.starts_with("_index.json") { + if *part == ".__myfsio_dirobj__" + || *part == ".__myfsio_empty__" + || part.starts_with("_index.json") + { return Some("Object key segment uses a reserved internal name".to_string()); } } diff --git a/crates/myfsio-xml/src/request.rs b/crates/myfsio-xml/src/request.rs index cf7ff72..dc53d8c 100644 --- a/crates/myfsio-xml/src/request.rs +++ b/crates/myfsio-xml/src/request.rs @@ -86,6 +86,11 @@ pub fn parse_complete_multipart_upload(xml: &str) -> Result Result { + let trimmed = xml.trim(); + if trimmed.is_empty() { + return Err("Request body is empty".to_string()); + } + let mut reader = Reader::from_str(xml); let mut result = DeleteObjectsRequest::default(); let mut buf = Vec::new(); @@ -93,18 +98,43 @@ pub fn parse_delete_objects(xml: &str) -> Result { let mut current_key: Option = None; let mut current_version_id: 
Option<String> = None;
     let mut in_object = false;
+    let mut saw_delete_root = false;
+    let mut first_element_seen = false;
 
     loop {
-        match reader.read_event_into(&mut buf) {
+        let event = reader.read_event_into(&mut buf);
+        match event {
             Ok(Event::Start(ref e)) => {
                 let name = String::from_utf8_lossy(e.name().as_ref()).to_string();
                 current_tag = name.clone();
-                if name == "Object" {
+                if !first_element_seen {
+                    first_element_seen = true;
+                    if name != "Delete" {
+                        return Err(format!(
+                            "Expected <Delete> root element, found <{}>",
+                            name
+                        ));
+                    }
+                    saw_delete_root = true;
+                } else if name == "Object" {
                     in_object = true;
                     current_key = None;
                     current_version_id = None;
                 }
             }
+            Ok(Event::Empty(ref e)) => {
+                let name = String::from_utf8_lossy(e.name().as_ref()).to_string();
+                if !first_element_seen {
+                    first_element_seen = true;
+                    if name != "Delete" {
+                        return Err(format!(
+                            "Expected <Delete> root element, found <{}>",
+                            name
+                        ));
+                    }
+                    saw_delete_root = true;
+                }
+            }
             Ok(Event::Text(ref e)) => {
                 let text = e.unescape().map_err(|e| e.to_string())?.to_string();
                 match current_tag.as_str() {
@@ -139,6 +169,13 @@ pub fn parse_delete_objects(xml: &str) -> Result<DeleteObjectsRequest, String>
         buf.clear();
     }
 
+    if !saw_delete_root {
+        return Err("Expected <Delete> root element".to_string());
+    }
+    if result.objects.is_empty() {
+        return Err("Delete request must contain at least one <Object>".to_string());
+    }
+
     Ok(result)
 }
 
diff --git a/crates/myfsio-xml/src/response.rs b/crates/myfsio-xml/src/response.rs
index dcf84b2..ccf0480 100644
--- a/crates/myfsio-xml/src/response.rs
+++ b/crates/myfsio-xml/src/response.rs
@@ -8,10 +8,21 @@ pub fn format_s3_datetime(dt: &DateTime<Utc>) -> String {
     dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
 }
 
-pub fn rate_limit_exceeded_xml() -> String {
-    "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
-<Error><Code>SlowDown</Code><Message>Rate limit exceeded</Message></Error>"
-        .to_string()
+pub fn rate_limit_exceeded_xml(resource: &str, request_id: &str) -> String {
+    format!(
+        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<Error><Code>SlowDown</Code><Message>Please reduce your request rate</Message><Resource>{}</Resource><RequestId>{}</RequestId></Error>",
+        xml_escape(resource),
+        xml_escape(request_id),
+    )
+}
+
+fn xml_escape(s: &str) -> String {
+    s.replace('&', "&amp;")
+        .replace('<', "&lt;")
+        .replace('>', "&gt;")
+        .replace('"', "&quot;")
+        .replace('\'', "&apos;")
 }
 
 pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta]) -> String {
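The hardened parser now insists on a <Delete> root element and at least one <Object> entry. A minimal, illustrative test sketch of that behavior (the module name and assertions below are hypothetical and not part of the patch):

#[cfg(test)]
mod delete_objects_conformance_sketch {
    use super::parse_delete_objects;

    #[test]
    fn accepts_a_minimal_delete_body() {
        // A well-formed body with a <Delete> root and a single <Object> parses.
        let xml = "<Delete><Object><Key>photos/a.png</Key></Object></Delete>";
        let parsed = parse_delete_objects(xml).expect("minimal body should parse");
        assert_eq!(parsed.objects.len(), 1);
    }

    #[test]
    fn rejects_malformed_bodies() {
        assert!(parse_delete_objects("").is_err()); // empty body
        assert!(parse_delete_objects("<Foo/>").is_err()); // wrong root element
        assert!(parse_delete_objects("<Delete></Delete>").is_err()); // no <Object> entries
    }
}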