From 9ec5797919c1928da1edc3bf18865f7cef7219e5 Mon Sep 17 00:00:00 2001 From: kqjy Date: Wed, 22 Apr 2026 00:12:22 +0800 Subject: [PATCH] Applied max-keys to combined current + archived ListObjectVersions output and reports truncation --- README.md | 2 +- docs.md | 4 +- python/README.md | 14 + python/app/__init__.py | 2 +- python/run.py | 7 + python/tests/test_website_hosting.py | 2 +- rust/myfsio-engine/Cargo.lock | 2 + .../crates/myfsio-common/src/error.rs | 8 + .../crates/myfsio-common/src/types.rs | 2 + .../crates/myfsio-server/Cargo.toml | 2 + .../myfsio-server/src/handlers/config.rs | 68 +- .../crates/myfsio-server/src/handlers/mod.rs | 848 ++++++++++++++++-- .../myfsio-server/src/handlers/ui_api.rs | 1 + .../crates/myfsio-server/src/lib.rs | 2 +- .../myfsio-server/src/middleware/auth.rs | 99 +- .../crates/myfsio-server/tests/integration.rs | 441 ++++++++- .../crates/myfsio-storage/src/error.rs | 12 + .../crates/myfsio-storage/src/fs_backend.rs | 310 ++++++- .../crates/myfsio-storage/src/traits.rs | 41 + 19 files changed, 1750 insertions(+), 117 deletions(-) create mode 100644 python/README.md diff --git a/README.md b/README.md index b9a6318..8d59b81 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ MyFSIO is an S3-compatible object storage server with a Rust runtime and a filesystem-backed storage engine. The active server lives under `rust/myfsio-engine` and serves both the S3 API and the built-in web UI from a single process. -The repository still contains a `python/` tree, but you do not need Python to run the current server. +The `python/` implementation is deprecated as of 2026-04-21. It remains in the repository for migration reference and legacy tests, but new development and supported runtime usage should target the Rust server. ## Features diff --git a/docs.md b/docs.md index b2438b7..7b236a1 100644 --- a/docs.md +++ b/docs.md @@ -2,6 +2,8 @@ This document describes the current Rust server in `rust/myfsio-engine`. It replaces the older Python-oriented runbook. +The `python/` implementation is deprecated as of 2026-04-21. It is retained for migration reference and legacy validation only; production usage and new development should use the Rust server. + ## 1. What Changed The active runtime is now Rust: @@ -11,7 +13,7 @@ The active runtime is now Rust: - The main development workflow is `cargo run -p myfsio-server --`. - API-only mode is controlled with `UI_ENABLED=false`. -The `python/` directory may still contain older implementation code, templates, and tests, but it is not required to run the current server. +The deprecated `python/` directory may still contain older implementation code, templates, and tests, but it is not required to run the current server. ## 2. Quick Start diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000..853b88a --- /dev/null +++ b/python/README.md @@ -0,0 +1,14 @@ +# Deprecated Python Implementation + +The Python implementation of MyFSIO is deprecated as of 2026-04-21. + +The supported server runtime now lives in `../rust/myfsio-engine` and serves the S3 API and web UI from the Rust `myfsio-server` binary. Keep this tree for migration reference, compatibility checks, and legacy tests only. + +For normal development and operations, run: + +```bash +cd ../rust/myfsio-engine +cargo run -p myfsio-server -- +``` + +Do not add new product features to the Python implementation unless they are needed to unblock a migration or compare behavior with the Rust server. diff --git a/python/app/__init__.py b/python/app/__init__.py index f37a33a..fc465fb 100644 --- a/python/app/__init__.py +++ b/python/app/__init__.py @@ -720,7 +720,7 @@ def _configure_logging(app: Flask) -> None: def _website_error_response(status_code, message): if status_code == 404: - body = "404 page not found" + body = "

404 page not found

" else: body = f"{status_code} {message}" return Response(body, status=status_code, mimetype="text/html") diff --git a/python/run.py b/python/run.py index 40ab540..1e3c0c0 100644 --- a/python/run.py +++ b/python/run.py @@ -28,6 +28,11 @@ from app.config import AppConfig from app.iam import IamService, IamError, ALLOWED_ACTIONS, _derive_fernet_key from app.version import get_version +PYTHON_DEPRECATION_MESSAGE = ( + "The Python MyFSIO runtime is deprecated as of 2026-04-21. " + "Use the Rust server in rust/myfsio-engine for supported development and production usage." +) + def _server_host() -> str: """Return the bind host for API and UI servers.""" @@ -233,6 +238,8 @@ if __name__ == "__main__": parser.add_argument("--version", action="version", version=f"MyFSIO {get_version()}") args = parser.parse_args() + warnings.warn(PYTHON_DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=1) + if args.reset_cred or args.mode == "reset-cred": reset_credentials() sys.exit(0) diff --git a/python/tests/test_website_hosting.py b/python/tests/test_website_hosting.py index ae000b9..a12e65f 100644 --- a/python/tests/test_website_hosting.py +++ b/python/tests/test_website_hosting.py @@ -439,4 +439,4 @@ class TestWebsiteServing: store.set_mapping("noerr.example.com", "no-err") resp = website_client.get("/missing.html", headers={"Host": "noerr.example.com"}) assert resp.status_code == 404 - assert resp.data == b"404 page not found" + assert resp.data == b"

404 page not found

" diff --git a/rust/myfsio-engine/Cargo.lock b/rust/myfsio-engine/Cargo.lock index d8c061c..1c0817f 100644 --- a/rust/myfsio-engine/Cargo.lock +++ b/rust/myfsio-engine/Cargo.lock @@ -2707,6 +2707,7 @@ dependencies = [ "futures", "http-body-util", "hyper 1.9.0", + "md-5 0.10.6", "mime_guess", "multer", "myfsio-auth", @@ -2723,6 +2724,7 @@ dependencies = [ "roxmltree", "serde", "serde_json", + "sha2 0.10.9", "subtle", "sysinfo", "tempfile", diff --git a/rust/myfsio-engine/crates/myfsio-common/src/error.rs b/rust/myfsio-engine/crates/myfsio-common/src/error.rs index 64e9110..5785d5f 100644 --- a/rust/myfsio-engine/crates/myfsio-common/src/error.rs +++ b/rust/myfsio-engine/crates/myfsio-common/src/error.rs @@ -3,6 +3,7 @@ use std::fmt; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum S3ErrorCode { AccessDenied, + BadDigest, BucketAlreadyExists, BucketNotEmpty, EntityTooLarge, @@ -14,6 +15,7 @@ pub enum S3ErrorCode { InvalidPolicyDocument, InvalidRange, InvalidRequest, + InvalidTag, MalformedXML, MethodNotAllowed, NoSuchBucket, @@ -32,6 +34,7 @@ impl S3ErrorCode { pub fn http_status(&self) -> u16 { match self { Self::AccessDenied => 403, + Self::BadDigest => 400, Self::BucketAlreadyExists => 409, Self::BucketNotEmpty => 409, Self::EntityTooLarge => 413, @@ -43,6 +46,7 @@ impl S3ErrorCode { Self::InvalidPolicyDocument => 400, Self::InvalidRange => 416, Self::InvalidRequest => 400, + Self::InvalidTag => 400, Self::MalformedXML => 400, Self::MethodNotAllowed => 405, Self::NoSuchBucket => 404, @@ -61,6 +65,7 @@ impl S3ErrorCode { pub fn as_str(&self) -> &'static str { match self { Self::AccessDenied => "AccessDenied", + Self::BadDigest => "BadDigest", Self::BucketAlreadyExists => "BucketAlreadyExists", Self::BucketNotEmpty => "BucketNotEmpty", Self::EntityTooLarge => "EntityTooLarge", @@ -72,6 +77,7 @@ impl S3ErrorCode { Self::InvalidPolicyDocument => "InvalidPolicyDocument", Self::InvalidRange => "InvalidRange", Self::InvalidRequest => "InvalidRequest", + Self::InvalidTag => "InvalidTag", Self::MalformedXML => "MalformedXML", Self::MethodNotAllowed => "MethodNotAllowed", Self::NoSuchBucket => "NoSuchBucket", @@ -90,6 +96,7 @@ impl S3ErrorCode { pub fn default_message(&self) -> &'static str { match self { Self::AccessDenied => "Access Denied", + Self::BadDigest => "The Content-MD5 or checksum value you specified did not match what we received", Self::BucketAlreadyExists => "The requested bucket name is not available", Self::BucketNotEmpty => "The bucket you tried to delete is not empty", Self::EntityTooLarge => "Your proposed upload exceeds the maximum allowed size", @@ -101,6 +108,7 @@ impl S3ErrorCode { Self::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document", Self::InvalidRange => "The requested range is not satisfiable", Self::InvalidRequest => "Invalid request", + Self::InvalidTag => "The Tagging header is invalid", Self::MalformedXML => "The XML you provided was not well-formed", Self::MethodNotAllowed => "The specified method is not allowed against this resource", Self::NoSuchBucket => "The specified bucket does not exist", diff --git a/rust/myfsio-engine/crates/myfsio-common/src/types.rs b/rust/myfsio-engine/crates/myfsio-common/src/types.rs index a07d565..91195f7 100644 --- a/rust/myfsio-engine/crates/myfsio-common/src/types.rs +++ b/rust/myfsio-engine/crates/myfsio-common/src/types.rs @@ -112,6 +112,8 @@ pub struct VersionInfo { pub last_modified: DateTime, pub etag: Option, pub is_latest: bool, + #[serde(default)] + pub is_delete_marker: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/rust/myfsio-engine/crates/myfsio-server/Cargo.toml b/rust/myfsio-engine/crates/myfsio-server/Cargo.toml index 6a1092b..27d8f75 100644 --- a/rust/myfsio-engine/crates/myfsio-server/Cargo.toml +++ b/rust/myfsio-engine/crates/myfsio-server/Cargo.toml @@ -10,6 +10,7 @@ myfsio-crypto = { path = "../myfsio-crypto" } myfsio-storage = { path = "../myfsio-storage" } myfsio-xml = { path = "../myfsio-xml" } base64 = { workspace = true } +md-5 = { workspace = true } axum = { workspace = true } tokio = { workspace = true } tower = { workspace = true } @@ -29,6 +30,7 @@ percent-encoding = { workspace = true } quick-xml = { workspace = true } mime_guess = "2" crc32fast = { workspace = true } +sha2 = { workspace = true } duckdb = { workspace = true } roxmltree = "0.20" parking_lot = { workspace = true } diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs index 973c4db..5bb6163 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs @@ -1038,7 +1038,12 @@ fn s3_error_response(code: S3ErrorCode, message: &str, status: StatusCode) -> Re (status, [("content-type", "application/xml")], err.to_xml()).into_response() } -pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response { +pub async fn list_object_versions( + state: &AppState, + bucket: &str, + prefix: Option<&str>, + max_keys: usize, +) -> Response { match state.storage.list_buckets().await { Ok(buckets) => { if !buckets.iter().any(|b| b.name == bucket) { @@ -1050,13 +1055,24 @@ pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response { Err(e) => return storage_err(e), } + let fetch_limit = max_keys.saturating_add(1).max(1); let params = myfsio_common::types::ListParams { - max_keys: 1000, + max_keys: fetch_limit, + prefix: prefix.map(ToOwned::to_owned), ..Default::default() }; - let objects = match state.storage.list_objects(bucket, ¶ms).await { - Ok(result) => result.objects, + let object_result = match state.storage.list_objects(bucket, ¶ms).await { + Ok(result) => result, + Err(e) => return storage_err(e), + }; + let objects = object_result.objects; + let archived_versions = match state + .storage + .list_bucket_object_versions(bucket, prefix) + .await + { + Ok(versions) => versions, Err(e) => return storage_err(e), }; @@ -1064,11 +1080,24 @@ pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response { "\ ", ); - xml.push_str(&format!("{}", bucket)); + xml.push_str(&format!("{}", xml_escape(bucket))); + xml.push_str(&format!( + "{}", + xml_escape(prefix.unwrap_or("")) + )); + xml.push_str(&format!("{}", max_keys)); - for obj in &objects { + let current_count = objects.len().min(max_keys); + let remaining = max_keys.saturating_sub(current_count); + let archived_count = archived_versions.len().min(remaining); + let is_truncated = object_result.is_truncated + || objects.len() > current_count + || archived_versions.len() > archived_count; + xml.push_str(&format!("{}", is_truncated)); + + for obj in objects.iter().take(current_count) { xml.push_str(""); - xml.push_str(&format!("{}", obj.key)); + xml.push_str(&format!("{}", xml_escape(&obj.key))); xml.push_str("null"); xml.push_str("true"); xml.push_str(&format!( @@ -1076,9 +1105,32 @@ pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response { myfsio_xml::response::format_s3_datetime(&obj.last_modified) )); if let Some(ref etag) = obj.etag { - xml.push_str(&format!("\"{}\"", etag)); + xml.push_str(&format!("\"{}\"", xml_escape(etag))); } xml.push_str(&format!("{}", obj.size)); + xml.push_str(&format!( + "{}", + xml_escape(obj.storage_class.as_deref().unwrap_or("STANDARD")) + )); + xml.push_str(""); + } + + for version in archived_versions.iter().take(archived_count) { + xml.push_str(""); + xml.push_str(&format!("{}", xml_escape(&version.key))); + xml.push_str(&format!( + "{}", + xml_escape(&version.version_id) + )); + xml.push_str("false"); + xml.push_str(&format!( + "{}", + myfsio_xml::response::format_s3_datetime(&version.last_modified) + )); + if let Some(ref etag) = version.etag { + xml.push_str(&format!("\"{}\"", xml_escape(etag))); + } + xml.push_str(&format!("{}", version.size)); xml.push_str("STANDARD"); xml.push_str(""); } diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs index de6f02e..45d1b51 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs @@ -13,10 +13,13 @@ use axum::body::Body; use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; -use base64::engine::general_purpose::URL_SAFE; +use base64::engine::general_purpose::{STANDARD, URL_SAFE}; use base64::Engine; use chrono::{DateTime, Utc}; +use md5::Md5; +use percent_encoding::percent_decode_str; use serde_json::json; +use sha2::{Digest, Sha256}; use myfsio_common::error::{S3Error, S3ErrorCode}; use myfsio_common::types::PartInfo; @@ -90,7 +93,44 @@ async fn ensure_object_lock_allows_write( } } -pub async fn list_buckets(State(state): State) -> Response { +async fn ensure_object_version_lock_allows_delete( + state: &AppState, + bucket: &str, + key: &str, + version_id: &str, + headers: &HeaderMap, +) -> Result<(), Response> { + let metadata = match state + .storage + .get_object_version_metadata(bucket, key, version_id) + .await + { + Ok(metadata) => metadata, + Err(err) => return Err(storage_err_response(err)), + }; + let bypass_governance = headers + .get("x-amz-bypass-governance-retention") + .and_then(|value| value.to_str().ok()) + .map(|value| value.eq_ignore_ascii_case("true")) + .unwrap_or(false); + if let Err(message) = object_lock::can_delete_object(&metadata, bypass_governance) { + return Err(s3_error_response(S3Error::new( + S3ErrorCode::AccessDenied, + message, + ))); + } + Ok(()) +} + +pub async fn list_buckets( + State(state): State, + Query(query): Query, + headers: HeaderMap, +) -> Response { + if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await { + return get_bucket(State(state), Path(host_bucket), Query(query), headers).await; + } + match state.storage.list_buckets().await { Ok(buckets) => { let xml = myfsio_xml::response::list_buckets_xml("myfsio", "myfsio", &buckets); @@ -117,8 +157,22 @@ pub async fn create_bucket( State(state): State, Path(bucket): Path, Query(query): Query, + headers: HeaderMap, body: Body, ) -> Response { + if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await { + if host_bucket != bucket { + return put_object( + State(state), + Path((host_bucket, bucket)), + Query(ObjectQuery::default()), + headers, + body, + ) + .await; + } + } + if query.quota.is_some() { return config::put_quota(&state, &bucket, body).await; } @@ -205,11 +259,41 @@ pub struct BucketQuery { pub versions: Option, } +async fn virtual_host_bucket_from_headers(state: &AppState, headers: &HeaderMap) -> Option { + let host = headers + .get("host") + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.split(':').next())? + .trim() + .to_ascii_lowercase(); + let (candidate, _) = host.split_once('.')?; + if myfsio_storage::validation::validate_bucket_name(candidate).is_some() { + return None; + } + match state.storage.bucket_exists(candidate).await { + Ok(true) => Some(candidate.to_string()), + _ => None, + } +} + pub async fn get_bucket( State(state): State, Path(bucket): Path, Query(query): Query, + headers: HeaderMap, ) -> Response { + if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await { + if host_bucket != bucket { + return get_object( + State(state), + Path((host_bucket, bucket)), + Query(ObjectQuery::default()), + headers, + ) + .await; + } + } + if !matches!(state.storage.bucket_exists(&bucket).await, Ok(true)) { return storage_err_response(myfsio_storage::error::StorageError::BucketNotFound(bucket)); } @@ -260,7 +344,13 @@ pub async fn get_bucket( return config::get_logging(&state, &bucket).await; } if query.versions.is_some() { - return config::list_object_versions(&state, &bucket).await; + return config::list_object_versions( + &state, + &bucket, + query.prefix.as_deref(), + query.max_keys.unwrap_or(1000), + ) + .await; } if query.uploads.is_some() { return list_multipart_uploads_handler(&state, &bucket).await; @@ -408,6 +498,19 @@ pub async fn post_bucket( headers: HeaderMap, body: Body, ) -> Response { + if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await { + if host_bucket != bucket { + return post_object( + State(state), + Path((host_bucket, bucket)), + Query(ObjectQuery::default()), + headers, + body, + ) + .await; + } + } + if query.delete.is_some() { return delete_objects_handler(&state, &bucket, body).await; } @@ -425,7 +528,20 @@ pub async fn delete_bucket( State(state): State, Path(bucket): Path, Query(query): Query, + headers: HeaderMap, ) -> Response { + if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await { + if host_bucket != bucket { + return delete_object( + State(state), + Path((host_bucket, bucket)), + Query(ObjectQuery::default()), + headers, + ) + .await; + } + } + if query.quota.is_some() { return config::delete_quota(&state, &bucket).await; } @@ -466,7 +582,23 @@ pub async fn delete_bucket( } } -pub async fn head_bucket(State(state): State, Path(bucket): Path) -> Response { +pub async fn head_bucket( + State(state): State, + Path(bucket): Path, + headers: HeaderMap, +) -> Response { + if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await { + if host_bucket != bucket { + return head_object( + State(state), + Path((host_bucket, bucket)), + Query(ObjectQuery::default()), + headers, + ) + .await; + } + } + match state.storage.bucket_exists(&bucket).await { Ok(true) => { let mut headers = HeaderMap::new(); @@ -489,6 +621,8 @@ pub struct ObjectQuery { pub upload_id: Option, #[serde(rename = "partNumber")] pub part_number: Option, + #[serde(rename = "versionId")] + pub version_id: Option, pub tagging: Option, pub acl: Option, pub retention: Option, @@ -583,6 +717,314 @@ fn insert_content_type(headers: &mut HeaderMap, key: &str, explicit: Option<&str } } +fn internal_header_pairs() -> &'static [(&'static str, &'static str, &'static str)] { + &[ + ("cache-control", "__cache_control__", "cache-control"), + ( + "content-disposition", + "__content_disposition__", + "content-disposition", + ), + ( + "content-language", + "__content_language__", + "content-language", + ), + ( + "content-encoding", + "__content_encoding__", + "content-encoding", + ), + ("expires", "__expires__", "expires"), + ( + "x-amz-website-redirect-location", + "__website_redirect_location__", + "x-amz-website-redirect-location", + ), + ] +} + +fn decoded_content_encoding(value: &str) -> Option { + let filtered: Vec<&str> = value + .split(',') + .map(str::trim) + .filter(|part| !part.is_empty() && !part.eq_ignore_ascii_case("aws-chunked")) + .collect(); + if filtered.is_empty() { + None + } else { + Some(filtered.join(", ")) + } +} + +fn insert_standard_object_metadata( + headers: &HeaderMap, + metadata: &mut HashMap, +) -> Result<(), Response> { + for (request_header, metadata_key, _) in internal_header_pairs() { + if let Some(value) = headers.get(*request_header).and_then(|v| v.to_str().ok()) { + if *request_header == "content-encoding" { + if let Some(decoded_encoding) = decoded_content_encoding(value) { + metadata.insert((*metadata_key).to_string(), decoded_encoding); + } + } else { + metadata.insert((*metadata_key).to_string(), value.to_string()); + } + } + } + if let Some(value) = headers + .get("x-amz-storage-class") + .and_then(|v| v.to_str().ok()) + { + metadata.insert("__storage_class__".to_string(), value.to_ascii_uppercase()); + } + + if let Some(value) = headers + .get("x-amz-object-lock-legal-hold") + .and_then(|v| v.to_str().ok()) + { + object_lock::set_legal_hold(metadata, value.eq_ignore_ascii_case("ON")); + } + + let retention_mode = headers + .get("x-amz-object-lock-mode") + .and_then(|v| v.to_str().ok()); + let retain_until = headers + .get("x-amz-object-lock-retain-until-date") + .and_then(|v| v.to_str().ok()); + if let (Some(mode), Some(retain_until)) = (retention_mode, retain_until) { + let mode = match mode.to_ascii_uppercase().as_str() { + "GOVERNANCE" => object_lock::RetentionMode::GOVERNANCE, + "COMPLIANCE" => object_lock::RetentionMode::COMPLIANCE, + _ => { + return Err(s3_error_response(S3Error::new( + S3ErrorCode::InvalidArgument, + "Invalid x-amz-object-lock-mode", + ))) + } + }; + let retain_until_date = DateTime::parse_from_rfc3339(retain_until) + .map(|value| value.with_timezone(&Utc)) + .map_err(|_| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidArgument, + "Invalid x-amz-object-lock-retain-until-date", + )) + })?; + object_lock::set_object_retention( + metadata, + &object_lock::ObjectLockRetention { + mode, + retain_until_date, + }, + ) + .map_err(|message| { + s3_error_response(S3Error::new(S3ErrorCode::InvalidArgument, message)) + })?; + } + Ok(()) +} + +fn apply_stored_response_headers(headers: &mut HeaderMap, metadata: &HashMap) { + for (_, metadata_key, response_header) in internal_header_pairs() { + if let Some(value) = metadata + .get(*metadata_key) + .and_then(|value| value.parse().ok()) + { + headers.insert(*response_header, value); + } + } + if let Some(value) = metadata + .get("__storage_class__") + .and_then(|value| value.parse().ok()) + { + headers.insert("x-amz-storage-class", value); + } +} + +fn apply_user_metadata(headers: &mut HeaderMap, metadata: &HashMap) { + for (k, v) in metadata { + if let Ok(header_val) = v.parse() { + let header_name = format!("x-amz-meta-{}", k); + if let Ok(name) = header_name.parse::() { + headers.insert(name, header_val); + } + } + } +} + +fn is_null_version(version_id: Option<&str>) -> bool { + version_id.is_none_or(|value| value == "null") +} + +fn bad_digest_response(message: impl Into) -> Response { + s3_error_response(S3Error::new(S3ErrorCode::BadDigest, message)) +} + +fn base64_header_bytes(headers: &HeaderMap, name: &str) -> Result>, Response> { + let Some(value) = headers.get(name).and_then(|v| v.to_str().ok()) else { + return Ok(None); + }; + STANDARD + .decode(value.trim()) + .map(Some) + .map_err(|_| bad_digest_response(format!("Invalid base64 value for {}", name))) +} + +fn has_upload_checksum(headers: &HeaderMap) -> bool { + headers.contains_key("content-md5") + || headers.contains_key("x-amz-checksum-sha256") + || headers.contains_key("x-amz-checksum-crc32") +} + +fn validate_upload_checksums(headers: &HeaderMap, data: &[u8]) -> Result<(), Response> { + if let Some(expected) = base64_header_bytes(headers, "content-md5")? { + if expected.len() != 16 || Md5::digest(data).as_slice() != expected.as_slice() { + return Err(bad_digest_response( + "The Content-MD5 you specified did not match what we received", + )); + } + } + + if let Some(expected) = base64_header_bytes(headers, "x-amz-checksum-sha256")? { + if Sha256::digest(data).as_slice() != expected.as_slice() { + return Err(bad_digest_response( + "The x-amz-checksum-sha256 you specified did not match what we received", + )); + } + } + + if let Some(expected) = base64_header_bytes(headers, "x-amz-checksum-crc32")? { + let actual = crc32fast::hash(data).to_be_bytes(); + if expected.as_slice() != actual { + return Err(bad_digest_response( + "The x-amz-checksum-crc32 you specified did not match what we received", + )); + } + } + + Ok(()) +} + +async fn collect_upload_body(body: Body, aws_chunked: bool) -> Result, Response> { + if aws_chunked { + let mut reader = chunked::decode_body(body); + let mut data = Vec::new(); + reader.read_to_end(&mut data).await.map_err(|_| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidRequest, + "Failed to read aws-chunked request body", + )) + })?; + return Ok(data); + } + + http_body_util::BodyExt::collect(body) + .await + .map(|collected| collected.to_bytes().to_vec()) + .map_err(|_| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidRequest, + "Failed to read request body", + )) + }) +} + +fn parse_tagging_header(value: &str) -> Result, Response> { + let mut tags = Vec::new(); + if value.trim().is_empty() { + return Ok(tags); + } + + for pair in value.split('&') { + let (raw_key, raw_value) = pair.split_once('=').ok_or_else(|| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidTag, + "The x-amz-tagging header must use query-string key=value pairs", + )) + })?; + let key = percent_decode_str(raw_key) + .decode_utf8() + .map_err(|_| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidTag, + "Tag keys must be valid UTF-8", + )) + })? + .to_string(); + let value = percent_decode_str(raw_value) + .decode_utf8() + .map_err(|_| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidTag, + "Tag values must be valid UTF-8", + )) + })? + .to_string(); + tags.push(myfsio_common::types::Tag { key, value }); + } + + Ok(tags) +} + +fn parse_copy_source(copy_source: &str) -> Result<(String, String, Option), Response> { + let source = copy_source.strip_prefix('/').unwrap_or(copy_source); + let (bucket_raw, key_and_query) = source.split_once('/').ok_or_else(|| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidArgument, + "Invalid x-amz-copy-source", + )) + })?; + let (key_raw, query) = key_and_query + .split_once('?') + .map(|(key, query)| (key, Some(query))) + .unwrap_or((key_and_query, None)); + + let bucket = percent_decode_str(bucket_raw) + .decode_utf8() + .map_err(|_| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidArgument, + "Invalid x-amz-copy-source bucket encoding", + )) + })? + .to_string(); + let key = percent_decode_str(key_raw) + .decode_utf8() + .map_err(|_| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidArgument, + "Invalid x-amz-copy-source key encoding", + )) + })? + .to_string(); + + let mut version_id = None; + if let Some(query) = query { + for pair in query.split('&') { + let Some((name, value)) = pair.split_once('=') else { + continue; + }; + if name == "versionId" { + version_id = Some( + percent_decode_str(value) + .decode_utf8() + .map_err(|_| { + s3_error_response(S3Error::new( + S3ErrorCode::InvalidArgument, + "Invalid x-amz-copy-source versionId encoding", + )) + })? + .to_string(), + ); + break; + } + } + } + + Ok((bucket, key, version_id)) +} + pub async fn put_object( State(state): State, Path((bucket, key)): Path<(String, String)>, @@ -647,6 +1089,9 @@ pub async fn put_object( { return response; } + if let Some(response) = evaluate_put_preconditions(&state, &bucket, &key, &headers).await { + return response; + } let content_type = guessed_content_type( &key, @@ -655,6 +1100,9 @@ pub async fn put_object( let mut metadata = HashMap::new(); metadata.insert("__content_type__".to_string(), content_type); + if let Err(response) = insert_standard_object_metadata(&headers, &mut metadata) { + return response; + } for (name, value) in headers.iter() { let name_str = name.as_str(); @@ -665,7 +1113,27 @@ pub async fn put_object( } } - let boxed: myfsio_storage::traits::AsyncReadStream = if is_aws_chunked(&headers) { + let tags = match headers + .get("x-amz-tagging") + .and_then(|value| value.to_str().ok()) + .map(parse_tagging_header) + .transpose() + { + Ok(tags) => tags, + Err(response) => return response, + }; + + let aws_chunked = is_aws_chunked(&headers); + let boxed: myfsio_storage::traits::AsyncReadStream = if has_upload_checksum(&headers) { + let data = match collect_upload_body(body, aws_chunked).await { + Ok(data) => data, + Err(response) => return response, + }; + if let Err(response) = validate_upload_checksums(&headers, &data) { + return response; + } + Box::pin(std::io::Cursor::new(data)) + } else if aws_chunked { Box::pin(chunked::decode_body(body)) } else { let stream = tokio_util::io::StreamReader::new( @@ -682,6 +1150,11 @@ pub async fn put_object( .await { Ok(meta) => { + if let Some(ref tags) = tags { + if let Err(e) = state.storage.set_object_tags(&bucket, &key, tags).await { + return storage_err_response(e); + } + } if let Some(enc_ctx) = resolve_encryption_context(&state, &bucket, &headers).await { if let Some(ref enc_svc) = state.encryption { let obj_path = match state.storage.get_object_path(&bucket, &key).await { @@ -801,9 +1274,23 @@ pub async fn get_object( return list_parts_handler(&state, &bucket, &key, upload_id).await; } - let head_meta = match state.storage.head_object(&bucket, &key).await { - Ok(m) => m, - Err(e) => return storage_err_response(e), + let version_id = query + .version_id + .as_deref() + .filter(|value| !is_null_version(Some(*value))); + let head_meta = match version_id { + Some(version_id) => match state + .storage + .head_object_version(&bucket, &key, version_id) + .await + { + Ok(m) => m, + Err(e) => return storage_err_response(e), + }, + None => match state.storage.head_object(&bucket, &key).await { + Ok(m) => m, + Err(e) => return storage_err_response(e), + }, }; if let Some(resp) = evaluate_get_preconditions(&headers, &head_meta) { return resp; @@ -818,17 +1305,34 @@ pub async fn get_object( return range_get_handler(&state, &bucket, &key, range_str, &query).await; } - let all_meta = state - .storage - .get_object_metadata(&bucket, &key) - .await - .unwrap_or_default(); + let all_meta = match version_id { + Some(version_id) => state + .storage + .get_object_version_metadata(&bucket, &key, version_id) + .await + .unwrap_or_default(), + None => state + .storage + .get_object_metadata(&bucket, &key) + .await + .unwrap_or_default(), + }; let enc_meta = myfsio_crypto::encryption::EncryptionMetadata::from_metadata(&all_meta); if let (Some(ref enc_info), Some(ref enc_svc)) = (&enc_meta, &state.encryption) { - let obj_path = match state.storage.get_object_path(&bucket, &key).await { - Ok(p) => p, - Err(e) => return storage_err_response(e), + let obj_path = match version_id { + Some(version_id) => match state + .storage + .get_object_version_path(&bucket, &key, version_id) + .await + { + Ok(p) => p, + Err(e) => return storage_err_response(e), + }, + None => match state.storage.get_object_path(&bucket, &key).await { + Ok(p) => p, + Err(e) => return storage_err_response(e), + }, }; let tmp_dir = state.config.storage_root.join(".myfsio.sys").join("tmp"); let _ = tokio::fs::create_dir_all(&tmp_dir).await; @@ -886,22 +1390,31 @@ pub async fn get_object( "x-amz-server-side-encryption", enc_info.algorithm.parse().unwrap(), ); - - for (k, v) in &meta.metadata { - if let Ok(header_val) = v.parse() { - let header_name = format!("x-amz-meta-{}", k); - if let Ok(name) = header_name.parse::() { - resp_headers.insert(name, header_val); - } + apply_stored_response_headers(&mut resp_headers, &all_meta); + if let Some(ref requested_version) = query.version_id { + if let Ok(value) = requested_version.parse() { + resp_headers.insert("x-amz-version-id", value); } } + apply_user_metadata(&mut resp_headers, &meta.metadata); + apply_response_overrides(&mut resp_headers, &query); return (StatusCode::OK, resp_headers, body).into_response(); } - match state.storage.get_object(&bucket, &key).await { + let object_result = match version_id { + Some(version_id) => { + state + .storage + .get_object_version(&bucket, &key, version_id) + .await + } + None => state.storage.get_object(&bucket, &key).await, + }; + + match object_result { Ok((meta, reader)) => { let stream = ReaderStream::new(reader); let body = Body::from_stream(stream); @@ -921,16 +1434,15 @@ pub async fn get_object( .unwrap(), ); headers.insert("accept-ranges", "bytes".parse().unwrap()); - - for (k, v) in &meta.metadata { - if let Ok(header_val) = v.parse() { - let header_name = format!("x-amz-meta-{}", k); - if let Ok(name) = header_name.parse::() { - headers.insert(name, header_val); - } + apply_stored_response_headers(&mut headers, &all_meta); + if let Some(ref requested_version) = query.version_id { + if let Ok(value) = requested_version.parse() { + headers.insert("x-amz-version-id", value); } } + apply_user_metadata(&mut headers, &meta.metadata); + apply_response_overrides(&mut headers, &query); (StatusCode::OK, headers, body).into_response() @@ -978,6 +1490,35 @@ pub async fn delete_object( return abort_multipart_handler(&state, &bucket, upload_id).await; } + if let Some(version_id) = query + .version_id + .as_deref() + .filter(|value| !is_null_version(Some(*value))) + { + if let Err(response) = + ensure_object_version_lock_allows_delete(&state, &bucket, &key, version_id, &headers) + .await + { + return response; + } + return match state + .storage + .delete_object_version(&bucket, &key, version_id) + .await + { + Ok(()) => { + let mut resp_headers = HeaderMap::new(); + if let Ok(value) = version_id.parse() { + resp_headers.insert("x-amz-version-id", value); + } + notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete"); + trigger_replication(&state, &bucket, &key, "delete"); + (StatusCode::NO_CONTENT, resp_headers).into_response() + } + Err(e) => storage_err_response(e), + }; + } + if let Err(response) = ensure_object_lock_allows_write(&state, &bucket, &key, Some(&headers)).await { @@ -997,13 +1538,40 @@ pub async fn delete_object( pub async fn head_object( State(state): State, Path((bucket, key)): Path<(String, String)>, + Query(query): Query, headers: HeaderMap, ) -> Response { - match state.storage.head_object(&bucket, &key).await { + let version_id = query + .version_id + .as_deref() + .filter(|value| !is_null_version(Some(*value))); + let result = match version_id { + Some(version_id) => { + state + .storage + .head_object_version(&bucket, &key, version_id) + .await + } + None => state.storage.head_object(&bucket, &key).await, + }; + + match result { Ok(meta) => { if let Some(resp) = evaluate_get_preconditions(&headers, &meta) { return resp; } + let all_meta = match version_id { + Some(version_id) => state + .storage + .get_object_version_metadata(&bucket, &key, version_id) + .await + .unwrap_or_default(), + None => state + .storage + .get_object_metadata(&bucket, &key) + .await + .unwrap_or_default(), + }; let mut headers = HeaderMap::new(); headers.insert("content-length", meta.size.to_string().parse().unwrap()); if let Some(ref etag) = meta.etag { @@ -1019,16 +1587,15 @@ pub async fn head_object( .unwrap(), ); headers.insert("accept-ranges", "bytes".parse().unwrap()); - - for (k, v) in &meta.metadata { - if let Ok(header_val) = v.parse() { - let header_name = format!("x-amz-meta-{}", k); - if let Ok(name) = header_name.parse::() { - headers.insert(name, header_val); - } + apply_stored_response_headers(&mut headers, &all_meta); + if let Some(ref requested_version) = query.version_id { + if let Ok(value) = requested_version.parse() { + headers.insert("x-amz-version-id", value); } } + apply_user_metadata(&mut headers, &meta.metadata); + (StatusCode::OK, headers).into_response() } Err(e) => storage_err_response(e), @@ -1311,30 +1878,70 @@ async fn copy_object_handler( return response; } - let source = copy_source.strip_prefix('/').unwrap_or(copy_source); - let (src_bucket, src_key) = match source.split_once('/') { - Some(parts) => parts, - None => { - return s3_error_response(S3Error::new( - myfsio_common::error::S3ErrorCode::InvalidArgument, - "Invalid x-amz-copy-source", - )); - } + let (src_bucket, src_key, src_version_id) = match parse_copy_source(copy_source) { + Ok(parts) => parts, + Err(response) => return response, }; - let source_meta = match state.storage.head_object(src_bucket, src_key).await { - Ok(m) => m, - Err(e) => return storage_err_response(e), + let source_meta = match src_version_id.as_deref() { + Some(version_id) if version_id != "null" => match state + .storage + .head_object_version(&src_bucket, &src_key, version_id) + .await + { + Ok(m) => m, + Err(e) => return storage_err_response(e), + }, + _ => match state.storage.head_object(&src_bucket, &src_key).await { + Ok(m) => m, + Err(e) => return storage_err_response(e), + }, }; if let Some(resp) = evaluate_copy_preconditions(headers, &source_meta) { return resp; } - match state - .storage - .copy_object(src_bucket, src_key, dst_bucket, dst_key) - .await + let copy_result = if let Some(version_id) = src_version_id + .as_deref() + .filter(|value| !is_null_version(Some(*value))) { + let (_meta, mut reader) = match state + .storage + .get_object_version(&src_bucket, &src_key, version_id) + .await + { + Ok(result) => result, + Err(e) => return storage_err_response(e), + }; + let mut data = Vec::new(); + if let Err(e) = reader.read_to_end(&mut data).await { + return storage_err_response(myfsio_storage::error::StorageError::Io(e)); + } + let metadata = match state + .storage + .get_object_version_metadata(&src_bucket, &src_key, version_id) + .await + { + Ok(metadata) => metadata, + Err(e) => return storage_err_response(e), + }; + state + .storage + .put_object( + dst_bucket, + dst_key, + Box::pin(std::io::Cursor::new(data)), + Some(metadata), + ) + .await + } else { + state + .storage + .copy_object(&src_bucket, &src_key, dst_bucket, dst_key) + .await + }; + + match copy_result { Ok(meta) => { let etag = meta.etag.as_deref().unwrap_or(""); let last_modified = myfsio_xml::response::format_s3_datetime(&meta.last_modified); @@ -1372,13 +1979,23 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R let mut errors = Vec::new(); for obj in &parsed.objects { - if let Err(message) = match state.storage.head_object(bucket, &obj.key).await { - Ok(_) => match state.storage.get_object_metadata(bucket, &obj.key).await { + if let Err(message) = match obj.version_id.as_deref() { + Some(version_id) if version_id != "null" => match state + .storage + .get_object_version_metadata(bucket, &obj.key, version_id) + .await + { Ok(metadata) => object_lock::can_delete_object(&metadata, false), Err(err) => Err(S3Error::from(err).message), }, - Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()), - Err(err) => Err(S3Error::from(err).message), + _ => match state.storage.head_object(bucket, &obj.key).await { + Ok(_) => match state.storage.get_object_metadata(bucket, &obj.key).await { + Ok(metadata) => object_lock::can_delete_object(&metadata, false), + Err(err) => Err(S3Error::from(err).message), + }, + Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()), + Err(err) => Err(S3Error::from(err).message), + }, } { errors.push(( obj.key.clone(), @@ -1387,7 +2004,20 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R )); continue; } - match state.storage.delete_object(bucket, &obj.key).await { + let delete_result = if let Some(version_id) = obj.version_id.as_deref() { + if version_id == "null" { + state.storage.delete_object(bucket, &obj.key).await + } else { + state + .storage + .delete_object_version(bucket, &obj.key, version_id) + .await + } + } else { + state.storage.delete_object(bucket, &obj.key).await + }; + + match delete_result { Ok(()) => { notifications::emit_object_removed(state, bucket, &obj.key, "", "", "", "Delete"); trigger_replication(state, bucket, &obj.key, "delete"); @@ -1415,9 +2045,23 @@ async fn range_get_handler( range_str: &str, query: &ObjectQuery, ) -> Response { - let meta = match state.storage.head_object(bucket, key).await { - Ok(m) => m, - Err(e) => return storage_err_response(e), + let version_id = query + .version_id + .as_deref() + .filter(|value| !is_null_version(Some(*value))); + let meta = match version_id { + Some(version_id) => match state + .storage + .head_object_version(bucket, key, version_id) + .await + { + Ok(m) => m, + Err(e) => return storage_err_response(e), + }, + None => match state.storage.head_object(bucket, key).await { + Ok(m) => m, + Err(e) => return storage_err_response(e), + }, }; let total_size = meta.size; @@ -1431,9 +2075,19 @@ async fn range_get_handler( } }; - let path = match state.storage.get_object_path(bucket, key).await { - Ok(p) => p, - Err(e) => return storage_err_response(e), + let path = match version_id { + Some(version_id) => match state + .storage + .get_object_version_path(bucket, key, version_id) + .await + { + Ok(p) => p, + Err(e) => return storage_err_response(e), + }, + None => match state.storage.get_object_path(bucket, key).await { + Ok(p) => p, + Err(e) => return storage_err_response(e), + }, }; let mut file = match tokio::fs::File::open(&path).await { @@ -1463,6 +2117,11 @@ async fn range_get_handler( } insert_content_type(&mut headers, key, meta.content_type.as_deref()); headers.insert("accept-ranges", "bytes".parse().unwrap()); + if let Some(ref requested_version) = query.version_id { + if let Ok(value) = requested_version.parse() { + headers.insert("x-amz-version-id", value); + } + } apply_response_overrides(&mut headers, query); @@ -1514,6 +2173,49 @@ fn evaluate_get_preconditions( None } +async fn evaluate_put_preconditions( + state: &AppState, + bucket: &str, + key: &str, + headers: &HeaderMap, +) -> Option { + let has_if_match = headers.contains_key("if-match"); + let has_if_none_match = headers.contains_key("if-none-match"); + if !has_if_match && !has_if_none_match { + return None; + } + + match state.storage.head_object(bucket, key).await { + Ok(meta) => { + if let Some(value) = headers.get("if-match").and_then(|v| v.to_str().ok()) { + if !etag_condition_matches(value, meta.etag.as_deref()) { + return Some(s3_error_response(S3Error::from_code( + S3ErrorCode::PreconditionFailed, + ))); + } + } + if let Some(value) = headers.get("if-none-match").and_then(|v| v.to_str().ok()) { + if etag_condition_matches(value, meta.etag.as_deref()) { + return Some(s3_error_response(S3Error::from_code( + S3ErrorCode::PreconditionFailed, + ))); + } + } + None + } + Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => { + if has_if_match { + Some(s3_error_response(S3Error::from_code( + S3ErrorCode::PreconditionFailed, + ))) + } else { + None + } + } + Err(err) => Some(storage_err_response(err)), + } +} + fn evaluate_copy_preconditions( headers: &HeaderMap, source_meta: &myfsio_common::types::ObjectMeta, @@ -2176,6 +2878,20 @@ mod tests { .unwrap() } + #[test] + fn aws_chunked_wire_encoding_is_not_persisted_as_object_encoding() { + let mut headers = HeaderMap::new(); + headers.insert("content-encoding", "aws-chunked".parse().unwrap()); + let mut metadata = HashMap::new(); + insert_standard_object_metadata(&headers, &mut metadata).unwrap(); + assert!(!metadata.contains_key("__content_encoding__")); + + headers.insert("content-encoding", "aws-chunked, gzip".parse().unwrap()); + let mut metadata = HashMap::new(); + insert_standard_object_metadata(&headers, &mut metadata).unwrap(); + assert_eq!(metadata.get("__content_encoding__").unwrap(), "gzip"); + } + #[tokio::test] async fn public_bucket_acl_allows_anonymous_reads() { let (state, _tmp) = test_state(); diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs index 82da4df..0217117 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs @@ -117,6 +117,7 @@ fn storage_status(err: &StorageError) -> StatusCode { match err { StorageError::BucketNotFound(_) | StorageError::ObjectNotFound { .. } + | StorageError::VersionNotFound { .. } | StorageError::UploadNotFound(_) => StatusCode::NOT_FOUND, StorageError::InvalidBucketName(_) | StorageError::InvalidObjectKey(_) diff --git a/rust/myfsio-engine/crates/myfsio-server/src/lib.rs b/rust/myfsio-engine/crates/myfsio-server/src/lib.rs index 30ed1ec..8199121 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/lib.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/lib.rs @@ -9,7 +9,7 @@ pub mod templates; use axum::Router; -pub const SERVER_HEADER: &str = "MyFSIO"; +pub const SERVER_HEADER: &str = concat!("MyFSIO-Rust/", env!("CARGO_PKG_VERSION")); pub fn create_ui_router(state: state::AppState) -> Router { use axum::routing::{delete, get, post, put}; diff --git a/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs b/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs index 9a8f189..dce2ea4 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs @@ -1,5 +1,5 @@ use axum::extract::{Request, State}; -use axum::http::{header, HeaderMap, Method, StatusCode}; +use axum::http::{header, HeaderMap, Method, StatusCode, Uri}; use axum::middleware::Next; use axum::response::{IntoResponse, Response}; @@ -15,6 +15,9 @@ use tokio::io::AsyncReadExt; use crate::services::acl::acl_from_bucket_config; use crate::state::AppState; +#[derive(Clone, Debug)] +struct OriginalCanonicalPath(String); + fn website_error_response( status: StatusCode, body: Option>, @@ -45,7 +48,7 @@ fn website_error_response( fn default_website_error_body(status: StatusCode) -> String { let code = status.as_u16(); if status == StatusCode::NOT_FOUND { - "404 page not found".to_string() + "

404 page not found

".to_string() } else { let reason = status.canonical_reason().unwrap_or("Error"); format!("{code} {reason}") @@ -324,6 +327,67 @@ async fn maybe_serve_website( .await } +fn virtual_host_candidate(host: &str) -> Option { + let (candidate, _) = host.split_once('.')?; + if candidate.is_empty() || matches!(candidate, "www" | "s3" | "api" | "admin" | "kms") { + return None; + } + if myfsio_storage::validation::validate_bucket_name(candidate).is_some() { + return None; + } + Some(candidate.to_string()) +} + +async fn virtual_host_bucket( + state: &AppState, + host: &str, + path: &str, + method: &Method, +) -> Option { + if path.starts_with("/ui") + || path.starts_with("/admin") + || path.starts_with("/kms") + || path.starts_with("/myfsio") + { + return None; + } + + let bucket = virtual_host_candidate(host)?; + if path == format!("/{}", bucket) || path.starts_with(&format!("/{}/", bucket)) { + return None; + } + + match state.storage.bucket_exists(&bucket).await { + Ok(true) => Some(bucket), + Ok(false) if *method == Method::PUT && path == "/" => Some(bucket), + _ => None, + } +} + +fn rewrite_uri_for_virtual_host(uri: &Uri, bucket: &str) -> Option { + let path = uri.path(); + let rewritten_path = if path == "/" { + format!("/{}/", bucket) + } else { + format!("/{}{}", bucket, path) + }; + let path_and_query = match uri.query() { + Some(query) => format!("{}?{}", rewritten_path, query), + None => rewritten_path, + }; + + let mut parts = uri.clone().into_parts(); + parts.path_and_query = Some(path_and_query.parse().ok()?); + Uri::from_parts(parts).ok() +} + +fn sigv4_canonical_path(req: &Request) -> &str { + req.extensions() + .get::() + .map(|path| path.0.as_str()) + .unwrap_or_else(|| req.uri().path()) +} + pub async fn auth_layer(State(state): State, mut req: Request, next: Next) -> Response { let start = Instant::now(); let uri = req.uri().clone(); @@ -360,7 +424,7 @@ pub async fn auth_layer(State(state): State, mut req: Request, next: N } else if let Some(response) = maybe_serve_website( &state, method.clone(), - host.unwrap_or_default(), + host.clone().unwrap_or_default(), path.clone(), range_header, ) @@ -368,38 +432,53 @@ pub async fn auth_layer(State(state): State, mut req: Request, next: N { response } else { + let auth_path = if let Some(bucket) = + virtual_host_bucket(&state, host.as_deref().unwrap_or_default(), &path, &method).await + { + if let Some(rewritten) = rewrite_uri_for_virtual_host(req.uri(), &bucket) { + req.extensions_mut() + .insert(OriginalCanonicalPath(path.clone())); + *req.uri_mut() = rewritten; + req.uri().path().to_string() + } else { + path.clone() + } + } else { + path.clone() + }; + match try_auth(&state, &req) { AuthResult::NoAuth => match authorize_request( &state, None, &method, - &path, + &auth_path, &query, copy_source.as_deref(), ) .await { Ok(()) => next.run(req).await, - Err(err) => error_response(err, &path), + Err(err) => error_response(err, &auth_path), }, AuthResult::Ok(principal) => { if let Err(err) = authorize_request( &state, Some(&principal), &method, - &path, + &auth_path, &query, copy_source.as_deref(), ) .await { - error_response(err, &path) + error_response(err, &auth_path) } else { req.extensions_mut().insert(principal); next.run(req).await } } - AuthResult::Denied(err) => error_response(err, &path), + AuthResult::Denied(err) => error_response(err, &auth_path), } }; @@ -1078,7 +1157,7 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR }; let method = req.method().as_str(); - let canonical_uri = req.uri().path(); + let canonical_uri = sigv4_canonical_path(req); let query_params = parse_query_params(req.uri().query().unwrap_or("")); @@ -1234,7 +1313,7 @@ fn verify_sigv4_query(state: &AppState, req: &Request) -> AuthResult { }; let method = req.method().as_str(); - let canonical_uri = req.uri().path(); + let canonical_uri = sigv4_canonical_path(req); let query_params_no_sig: Vec<(String, String)> = params .iter() diff --git a/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs b/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs index 09767f9..8a50ece 100644 --- a/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs +++ b/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs @@ -2121,6 +2121,445 @@ async fn test_bucket_versioning() { assert!(body.contains("Enabled")); } +#[tokio::test] +async fn test_versioned_object_can_be_read_and_deleted_by_version_id() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request( + Method::PUT, + "/versions-bucket", + Body::empty(), + )) + .await + .unwrap(); + app.clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/versions-bucket?versioning") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from( + "Enabled", + )) + .unwrap(), + ) + .await + .unwrap(); + + app.clone() + .oneshot(signed_request( + Method::PUT, + "/versions-bucket/doc.txt", + Body::from("first"), + )) + .await + .unwrap(); + app.clone() + .oneshot(signed_request( + Method::PUT, + "/versions-bucket/doc.txt", + Body::from("second"), + )) + .await + .unwrap(); + + let list_resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/versions-bucket?versions", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(list_resp.status(), StatusCode::OK); + let list_body = String::from_utf8( + list_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + let archived_version_id = list_body + .split("") + .filter_map(|part| part.split_once("").map(|(id, _)| id)) + .find(|id| *id != "null") + .expect("archived version id") + .to_string(); + + let version_resp = app + .clone() + .oneshot(signed_request( + Method::GET, + &format!("/versions-bucket/doc.txt?versionId={}", archived_version_id), + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(version_resp.status(), StatusCode::OK); + assert_eq!( + version_resp.headers()["x-amz-version-id"].to_str().unwrap(), + archived_version_id + ); + let version_body = version_resp.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&version_body[..], b"first"); + + let traversal_resp = app + .clone() + .oneshot(signed_request( + Method::GET, + &format!( + "/versions-bucket/doc.txt?versionId=../other/{}", + archived_version_id + ), + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(traversal_resp.status(), StatusCode::NOT_FOUND); + + app.clone() + .oneshot(signed_request( + Method::PUT, + "/versions-bucket/doc.txt", + Body::from("third"), + )) + .await + .unwrap(); + let limited_resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/versions-bucket?versions&max-keys=1", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(limited_resp.status(), StatusCode::OK); + let limited_body = String::from_utf8( + limited_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert_eq!(limited_body.matches("").count(), 1); + assert!(limited_body.contains("true")); + + let delete_resp = app + .clone() + .oneshot(signed_request( + Method::DELETE, + &format!("/versions-bucket/doc.txt?versionId={}", archived_version_id), + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(delete_resp.status(), StatusCode::NO_CONTENT); + + let missing_resp = app + .oneshot(signed_request( + Method::GET, + &format!("/versions-bucket/doc.txt?versionId={}", archived_version_id), + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(missing_resp.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn test_retention_is_enforced_when_deleting_archived_version() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request( + Method::PUT, + "/locked-versions", + Body::empty(), + )) + .await + .unwrap(); + app.clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/locked-versions?versioning") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from( + "Enabled", + )) + .unwrap(), + ) + .await + .unwrap(); + + app.clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/locked-versions/doc.txt") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("x-amz-object-lock-mode", "GOVERNANCE") + .header( + "x-amz-object-lock-retain-until-date", + "2099-01-01T00:00:00Z", + ) + .body(Body::from("locked")) + .unwrap(), + ) + .await + .unwrap(); + + app.clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/locked-versions/doc.txt") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("x-amz-bypass-governance-retention", "true") + .body(Body::from("replacement")) + .unwrap(), + ) + .await + .unwrap(); + + let list_resp = app + .clone() + .oneshot(signed_request( + Method::GET, + "/locked-versions?versions", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(list_resp.status(), StatusCode::OK); + let list_body = String::from_utf8( + list_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + let archived_version_id = list_body + .split("") + .filter_map(|part| part.split_once("").map(|(id, _)| id)) + .find(|id| *id != "null") + .expect("archived version id") + .to_string(); + + let denied = app + .clone() + .oneshot(signed_request( + Method::DELETE, + &format!("/locked-versions/doc.txt?versionId={}", archived_version_id), + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(denied.status(), StatusCode::FORBIDDEN); + + let allowed = app + .oneshot( + Request::builder() + .method(Method::DELETE) + .uri(format!( + "/locked-versions/doc.txt?versionId={}", + archived_version_id + )) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("x-amz-bypass-governance-retention", "true") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(allowed.status(), StatusCode::NO_CONTENT); +} + +#[tokio::test] +async fn test_put_object_validates_content_md5() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/md5-bucket", Body::empty())) + .await + .unwrap(); + + let bad_resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/md5-bucket/object.txt") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("content-md5", "AAAAAAAAAAAAAAAAAAAAAA==") + .body(Body::from("hello")) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(bad_resp.status(), StatusCode::BAD_REQUEST); + let bad_body = String::from_utf8( + bad_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!(bad_body.contains("BadDigest")); + + let good_resp = app + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/md5-bucket/object.txt") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("content-md5", "XUFAKrxLKna5cZ2REBfFkg==") + .body(Body::from("hello")) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(good_resp.status(), StatusCode::OK); +} + +#[tokio::test] +async fn test_put_object_tagging_and_standard_headers_are_persisted() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request( + Method::PUT, + "/headers-bucket", + Body::empty(), + )) + .await + .unwrap(); + + let put_resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/headers-bucket/report.txt") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("x-amz-tagging", "env=prod&name=quarter%201") + .header("cache-control", "max-age=60") + .header("content-disposition", "attachment") + .header("content-language", "en-US") + .header("x-amz-storage-class", "STANDARD_IA") + .body(Body::from("report")) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(put_resp.status(), StatusCode::OK); + + let head_resp = app + .clone() + .oneshot(signed_request( + Method::HEAD, + "/headers-bucket/report.txt", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(head_resp.status(), StatusCode::OK); + assert_eq!(head_resp.headers()["cache-control"], "max-age=60"); + assert_eq!(head_resp.headers()["content-disposition"], "attachment"); + assert_eq!(head_resp.headers()["content-language"], "en-US"); + assert_eq!(head_resp.headers()["x-amz-storage-class"], "STANDARD_IA"); + + let tags_resp = app + .oneshot(signed_request( + Method::GET, + "/headers-bucket/report.txt?tagging", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(tags_resp.status(), StatusCode::OK); + let tags_body = String::from_utf8( + tags_resp + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!(tags_body.contains("env")); + assert!(tags_body.contains("prod")); + assert!(tags_body.contains("name")); + assert!(tags_body.contains("quarter 1")); +} + +#[tokio::test] +async fn test_virtual_host_bucket_routes_to_s3_object_handlers() { + let (app, _tmp) = test_app(); + + app.clone() + .oneshot(signed_request(Method::PUT, "/vh-bucket", Body::empty())) + .await + .unwrap(); + + let put_resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::PUT) + .uri("/hello.txt") + .header("host", "vh-bucket.localhost") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::from("virtual host body")) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(put_resp.status(), StatusCode::OK); + + let get_resp = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/hello.txt") + .header("host", "vh-bucket.localhost") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(get_resp.status(), StatusCode::OK); + let body = get_resp.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&body[..], b"virtual host body"); +} + #[tokio::test] async fn test_bucket_tagging() { let (app, _tmp) = test_app(); @@ -3323,7 +3762,7 @@ async fn test_static_website_default_404_returns_html_body() { ) .unwrap(); assert_eq!(body.len(), content_length); - assert_eq!(body, "404 page not found"); + assert_eq!(body, "

404 page not found

"); let head_resp = app .oneshot(website_request(Method::HEAD, "/missing.html")) diff --git a/rust/myfsio-engine/crates/myfsio-storage/src/error.rs b/rust/myfsio-engine/crates/myfsio-storage/src/error.rs index 992d1fc..4bb652a 100644 --- a/rust/myfsio-engine/crates/myfsio-storage/src/error.rs +++ b/rust/myfsio-engine/crates/myfsio-storage/src/error.rs @@ -11,6 +11,12 @@ pub enum StorageError { BucketNotEmpty(String), #[error("Object not found: {bucket}/{key}")] ObjectNotFound { bucket: String, key: String }, + #[error("Object version not found: {bucket}/{key}?versionId={version_id}")] + VersionNotFound { + bucket: String, + key: String, + version_id: String, + }, #[error("Invalid bucket name: {0}")] InvalidBucketName(String), #[error("Invalid object key: {0}")] @@ -46,6 +52,12 @@ impl From for S3Error { S3Error::from_code(S3ErrorCode::NoSuchKey) .with_resource(format!("/{}/{}", bucket, key)) } + StorageError::VersionNotFound { + bucket, + key, + version_id, + } => S3Error::from_code(S3ErrorCode::NoSuchVersion) + .with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)), StorageError::InvalidBucketName(msg) => { S3Error::new(S3ErrorCode::InvalidBucketName, msg) } diff --git a/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs b/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs index 7f5bd36..9f7a72f 100644 --- a/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs +++ b/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs @@ -605,6 +605,144 @@ impl FsStorageBackend { Ok(source_size) } + fn version_record_paths( + &self, + bucket_name: &str, + key: &str, + version_id: &str, + ) -> (PathBuf, PathBuf) { + let version_dir = self.version_dir(bucket_name, key); + ( + version_dir.join(format!("{}.json", version_id)), + version_dir.join(format!("{}.bin", version_id)), + ) + } + + fn validate_version_id(bucket_name: &str, key: &str, version_id: &str) -> StorageResult<()> { + if version_id.is_empty() + || version_id.contains('/') + || version_id.contains('\\') + || version_id.contains("..") + { + return Err(StorageError::VersionNotFound { + bucket: bucket_name.to_string(), + key: key.to_string(), + version_id: version_id.to_string(), + }); + } + Ok(()) + } + + fn read_version_record_sync( + &self, + bucket_name: &str, + key: &str, + version_id: &str, + ) -> StorageResult<(Value, PathBuf)> { + self.require_bucket(bucket_name)?; + self.validate_key(key)?; + Self::validate_version_id(bucket_name, key, version_id)?; + let (manifest_path, data_path) = self.version_record_paths(bucket_name, key, version_id); + if !manifest_path.is_file() || !data_path.is_file() { + return Err(StorageError::VersionNotFound { + bucket: bucket_name.to_string(), + key: key.to_string(), + version_id: version_id.to_string(), + }); + } + + let content = std::fs::read_to_string(&manifest_path).map_err(StorageError::Io)?; + let record = serde_json::from_str::(&content).map_err(StorageError::Json)?; + Ok((record, data_path)) + } + + fn version_metadata_from_record(record: &Value) -> HashMap { + record + .get("metadata") + .and_then(Value::as_object) + .map(|meta| { + meta.iter() + .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string()))) + .collect::>() + }) + .unwrap_or_default() + } + + fn object_meta_from_version_record( + &self, + key: &str, + record: &Value, + data_path: &Path, + ) -> StorageResult { + let metadata = Self::version_metadata_from_record(record); + + let data_len = std::fs::metadata(data_path) + .map(|meta| meta.len()) + .unwrap_or_default(); + let size = record + .get("size") + .and_then(Value::as_u64) + .unwrap_or(data_len); + let last_modified = record + .get("archived_at") + .and_then(Value::as_str) + .and_then(|value| DateTime::parse_from_rfc3339(value).ok()) + .map(|value| value.with_timezone(&Utc)) + .unwrap_or_else(Utc::now); + let etag = record + .get("etag") + .and_then(Value::as_str) + .map(ToOwned::to_owned) + .or_else(|| metadata.get("__etag__").cloned()); + + let mut obj = ObjectMeta::new(key.to_string(), size, last_modified); + obj.etag = etag; + obj.content_type = metadata.get("__content_type__").cloned(); + obj.storage_class = metadata + .get("__storage_class__") + .cloned() + .or_else(|| Some("STANDARD".to_string())); + obj.metadata = metadata + .into_iter() + .filter(|(k, _)| !k.starts_with("__")) + .collect(); + Ok(obj) + } + + fn version_info_from_record(&self, fallback_key: &str, record: &Value) -> VersionInfo { + let version_id = record + .get("version_id") + .and_then(Value::as_str) + .unwrap_or("") + .to_string(); + let key = record + .get("key") + .and_then(Value::as_str) + .unwrap_or(fallback_key) + .to_string(); + let size = record.get("size").and_then(Value::as_u64).unwrap_or(0); + let archived_at = record + .get("archived_at") + .and_then(Value::as_str) + .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .map(|d| d.with_timezone(&Utc)) + .unwrap_or_else(Utc::now); + let etag = record + .get("etag") + .and_then(Value::as_str) + .map(|s| s.to_string()); + + VersionInfo { + version_id, + key, + size, + last_modified: archived_at, + etag, + is_latest: false, + is_delete_marker: false, + } + } + fn bucket_stats_sync(&self, bucket_name: &str) -> StorageResult { let bucket_path = self.require_bucket(bucket_name)?; @@ -1241,6 +1379,10 @@ impl crate::traits::StorageEngine for FsStorageBackend { let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); obj.etag = stored_meta.get("__etag__").cloned(); obj.content_type = stored_meta.get("__content_type__").cloned(); + obj.storage_class = stored_meta + .get("__storage_class__") + .cloned() + .or_else(|| Some("STANDARD".to_string())); obj.metadata = stored_meta .into_iter() .filter(|(k, _)| !k.starts_with("__")) @@ -1289,6 +1431,10 @@ impl crate::traits::StorageEngine for FsStorageBackend { let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm); obj.etag = stored_meta.get("__etag__").cloned(); obj.content_type = stored_meta.get("__content_type__").cloned(); + obj.storage_class = stored_meta + .get("__storage_class__") + .cloned() + .or_else(|| Some("STANDARD".to_string())); obj.metadata = stored_meta .into_iter() .filter(|(k, _)| !k.starts_with("__")) @@ -1296,6 +1442,51 @@ impl crate::traits::StorageEngine for FsStorageBackend { Ok(obj) } + async fn get_object_version( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult<(ObjectMeta, AsyncReadStream)> { + let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + let obj = self.object_meta_from_version_record(key, &record, &data_path)?; + let file = tokio::fs::File::open(&data_path) + .await + .map_err(StorageError::Io)?; + let stream: AsyncReadStream = Box::pin(file); + Ok((obj, stream)) + } + + async fn get_object_version_path( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult { + let (_record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + Ok(data_path) + } + + async fn head_object_version( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult { + let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?; + self.object_meta_from_version_record(key, &record, &data_path) + } + + async fn get_object_version_metadata( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult> { + let (record, _data_path) = self.read_version_record_sync(bucket, key, version_id)?; + Ok(Self::version_metadata_from_record(&record)) + } + async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()> { let bucket_path = self.require_bucket(bucket)?; let path = self.object_path(bucket, key)?; @@ -1317,6 +1508,32 @@ impl crate::traits::StorageEngine for FsStorageBackend { Ok(()) } + async fn delete_object_version( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult<()> { + self.require_bucket(bucket)?; + self.validate_key(key)?; + Self::validate_version_id(bucket, key, version_id)?; + let (manifest_path, data_path) = self.version_record_paths(bucket, key, version_id); + if !manifest_path.is_file() && !data_path.is_file() { + return Err(StorageError::VersionNotFound { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: version_id.to_string(), + }); + } + + Self::safe_unlink(&data_path).map_err(StorageError::Io)?; + Self::safe_unlink(&manifest_path).map_err(StorageError::Io)?; + let versions_root = self.bucket_versions_root(bucket); + Self::cleanup_empty_parents(&manifest_path, &versions_root); + self.stats_cache.remove(bucket); + Ok(()) + } + async fn copy_object( &self, src_bucket: &str, @@ -1817,40 +2034,73 @@ impl crate::traits::StorageEngine for FsStorageBackend { } if let Ok(content) = std::fs::read_to_string(entry.path()) { if let Ok(record) = serde_json::from_str::(&content) { - let version_id = record - .get("version_id") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let size = record.get("size").and_then(|v| v.as_u64()).unwrap_or(0); - let archived_at = record - .get("archived_at") - .and_then(|v| v.as_str()) - .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) - .map(|d| d.with_timezone(&Utc)) - .unwrap_or_else(Utc::now); - let etag = record - .get("etag") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - - versions.push(VersionInfo { - version_id, - key: key.to_string(), - size, - last_modified: archived_at, - etag, - is_latest: false, - }); + versions.push(self.version_info_from_record(key, &record)); } } } versions.sort_by(|a, b| b.last_modified.cmp(&a.last_modified)); - if let Some(first) = versions.first_mut() { - first.is_latest = true; + + Ok(versions) + } + + async fn list_bucket_object_versions( + &self, + bucket: &str, + prefix: Option<&str>, + ) -> StorageResult> { + self.require_bucket(bucket)?; + let root = self.bucket_versions_root(bucket); + if !root.exists() { + return Ok(Vec::new()); } + let mut versions = Vec::new(); + let mut stack = vec![root.clone()]; + while let Some(current) = stack.pop() { + let entries = match std::fs::read_dir(¤t) { + Ok(entries) => entries, + Err(_) => continue, + }; + for entry in entries.flatten() { + let path = entry.path(); + let ft = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if ft.is_dir() { + stack.push(path); + continue; + } + if !ft.is_file() || path.extension().and_then(|ext| ext.to_str()) != Some("json") { + continue; + } + let content = match std::fs::read_to_string(&path) { + Ok(content) => content, + Err(_) => continue, + }; + let record = match serde_json::from_str::(&content) { + Ok(record) => record, + Err(_) => continue, + }; + let fallback_key = path + .parent() + .and_then(|parent| parent.strip_prefix(&root).ok()) + .map(|rel| rel.to_string_lossy().replace('\\', "/")) + .unwrap_or_default(); + let info = self.version_info_from_record(&fallback_key, &record); + if prefix.is_some_and(|value| !info.key.starts_with(value)) { + continue; + } + versions.push(info); + } + } + + versions.sort_by(|a, b| { + a.key + .cmp(&b.key) + .then_with(|| b.last_modified.cmp(&a.last_modified)) + }); Ok(versions) } @@ -2271,6 +2521,12 @@ mod tests { .unwrap(); assert_eq!(versions.len(), 1); assert_eq!(versions[0].size, 8); + + let invalid_version = format!("../other/{}", versions[0].version_id); + let result = backend + .get_object_version("test-bucket", "file.txt", &invalid_version) + .await; + assert!(matches!(result, Err(StorageError::VersionNotFound { .. }))); } #[tokio::test] diff --git a/rust/myfsio-engine/crates/myfsio-storage/src/traits.rs b/rust/myfsio-engine/crates/myfsio-storage/src/traits.rs index 32ee5cb..af069b9 100644 --- a/rust/myfsio-engine/crates/myfsio-storage/src/traits.rs +++ b/rust/myfsio-engine/crates/myfsio-storage/src/traits.rs @@ -34,8 +34,43 @@ pub trait StorageEngine: Send + Sync { async fn head_object(&self, bucket: &str, key: &str) -> StorageResult; + async fn get_object_version( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult<(ObjectMeta, AsyncReadStream)>; + + async fn get_object_version_path( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult; + + async fn head_object_version( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult; + + async fn get_object_version_metadata( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult>; + async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()>; + async fn delete_object_version( + &self, + bucket: &str, + key: &str, + version_id: &str, + ) -> StorageResult<()>; + async fn copy_object( &self, src_bucket: &str, @@ -120,6 +155,12 @@ pub trait StorageEngine: Send + Sync { key: &str, ) -> StorageResult>; + async fn list_bucket_object_versions( + &self, + bucket: &str, + prefix: Option<&str>, + ) -> StorageResult>; + async fn get_object_tags(&self, bucket: &str, key: &str) -> StorageResult>; async fn set_object_tags(&self, bucket: &str, key: &str, tags: &[Tag]) -> StorageResult<()>;