diff --git a/README.md b/README.md
index b9a6318..8d59b81 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
MyFSIO is an S3-compatible object storage server with a Rust runtime and a filesystem-backed storage engine. The active server lives under `rust/myfsio-engine` and serves both the S3 API and the built-in web UI from a single process.
-The repository still contains a `python/` tree, but you do not need Python to run the current server.
+The `python/` implementation is deprecated as of 2026-04-21. It remains in the repository for migration reference and legacy tests, but new development and supported runtime usage should target the Rust server.
## Features
diff --git a/docs.md b/docs.md
index b2438b7..7b236a1 100644
--- a/docs.md
+++ b/docs.md
@@ -2,6 +2,8 @@
This document describes the current Rust server in `rust/myfsio-engine`. It replaces the older Python-oriented runbook.
+The `python/` implementation is deprecated as of 2026-04-21. It is retained for migration reference and legacy validation only; production usage and new development should use the Rust server.
+
## 1. What Changed
The active runtime is now Rust:
@@ -11,7 +13,7 @@ The active runtime is now Rust:
- The main development workflow is `cargo run -p myfsio-server --`.
- API-only mode is controlled with `UI_ENABLED=false`.
-The `python/` directory may still contain older implementation code, templates, and tests, but it is not required to run the current server.
+The deprecated `python/` directory may still contain older implementation code, templates, and tests, but it is not required to run the current server.
## 2. Quick Start
diff --git a/python/README.md b/python/README.md
new file mode 100644
index 0000000..853b88a
--- /dev/null
+++ b/python/README.md
@@ -0,0 +1,14 @@
+# Deprecated Python Implementation
+
+The Python implementation of MyFSIO is deprecated as of 2026-04-21.
+
+The supported server runtime now lives in `../rust/myfsio-engine` and serves the S3 API and web UI from the Rust `myfsio-server` binary. Keep this tree for migration reference, compatibility checks, and legacy tests only.
+
+For normal development and operations, run:
+
+```bash
+cd ../rust/myfsio-engine
+cargo run -p myfsio-server --
+```
+
+Do not add new product features to the Python implementation unless they are needed to unblock a migration or compare behavior with the Rust server.
diff --git a/python/app/__init__.py b/python/app/__init__.py
index f37a33a..fc465fb 100644
--- a/python/app/__init__.py
+++ b/python/app/__init__.py
@@ -720,7 +720,7 @@ def _configure_logging(app: Flask) -> None:
def _website_error_response(status_code, message):
if status_code == 404:
- body = "404 page not found"
+        body = "<h1>404 page not found</h1>"
else:
body = f"{status_code} {message}"
return Response(body, status=status_code, mimetype="text/html")
diff --git a/python/run.py b/python/run.py
index 40ab540..1e3c0c0 100644
--- a/python/run.py
+++ b/python/run.py
@@ -28,6 +28,11 @@ from app.config import AppConfig
from app.iam import IamService, IamError, ALLOWED_ACTIONS, _derive_fernet_key
from app.version import get_version
+PYTHON_DEPRECATION_MESSAGE = (
+ "The Python MyFSIO runtime is deprecated as of 2026-04-21. "
+ "Use the Rust server in rust/myfsio-engine for supported development and production usage."
+)
+
def _server_host() -> str:
"""Return the bind host for API and UI servers."""
@@ -233,6 +238,8 @@ if __name__ == "__main__":
parser.add_argument("--version", action="version", version=f"MyFSIO {get_version()}")
args = parser.parse_args()
+ warnings.warn(PYTHON_DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=1)
+
if args.reset_cred or args.mode == "reset-cred":
reset_credentials()
sys.exit(0)
diff --git a/python/tests/test_website_hosting.py b/python/tests/test_website_hosting.py
index ae000b9..a12e65f 100644
--- a/python/tests/test_website_hosting.py
+++ b/python/tests/test_website_hosting.py
@@ -439,4 +439,4 @@ class TestWebsiteServing:
store.set_mapping("noerr.example.com", "no-err")
resp = website_client.get("/missing.html", headers={"Host": "noerr.example.com"})
assert resp.status_code == 404
- assert resp.data == b"404 page not found"
+        assert resp.data == b"<h1>404 page not found</h1>"
diff --git a/rust/myfsio-engine/Cargo.lock b/rust/myfsio-engine/Cargo.lock
index d8c061c..1c0817f 100644
--- a/rust/myfsio-engine/Cargo.lock
+++ b/rust/myfsio-engine/Cargo.lock
@@ -2707,6 +2707,7 @@ dependencies = [
"futures",
"http-body-util",
"hyper 1.9.0",
+ "md-5 0.10.6",
"mime_guess",
"multer",
"myfsio-auth",
@@ -2723,6 +2724,7 @@ dependencies = [
"roxmltree",
"serde",
"serde_json",
+ "sha2 0.10.9",
"subtle",
"sysinfo",
"tempfile",
diff --git a/rust/myfsio-engine/crates/myfsio-common/src/error.rs b/rust/myfsio-engine/crates/myfsio-common/src/error.rs
index 64e9110..5785d5f 100644
--- a/rust/myfsio-engine/crates/myfsio-common/src/error.rs
+++ b/rust/myfsio-engine/crates/myfsio-common/src/error.rs
@@ -3,6 +3,7 @@ use std::fmt;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum S3ErrorCode {
AccessDenied,
+ BadDigest,
BucketAlreadyExists,
BucketNotEmpty,
EntityTooLarge,
@@ -14,6 +15,7 @@ pub enum S3ErrorCode {
InvalidPolicyDocument,
InvalidRange,
InvalidRequest,
+ InvalidTag,
MalformedXML,
MethodNotAllowed,
NoSuchBucket,
@@ -32,6 +34,7 @@ impl S3ErrorCode {
pub fn http_status(&self) -> u16 {
match self {
Self::AccessDenied => 403,
+ Self::BadDigest => 400,
Self::BucketAlreadyExists => 409,
Self::BucketNotEmpty => 409,
Self::EntityTooLarge => 413,
@@ -43,6 +46,7 @@ impl S3ErrorCode {
Self::InvalidPolicyDocument => 400,
Self::InvalidRange => 416,
Self::InvalidRequest => 400,
+ Self::InvalidTag => 400,
Self::MalformedXML => 400,
Self::MethodNotAllowed => 405,
Self::NoSuchBucket => 404,
@@ -61,6 +65,7 @@ impl S3ErrorCode {
pub fn as_str(&self) -> &'static str {
match self {
Self::AccessDenied => "AccessDenied",
+ Self::BadDigest => "BadDigest",
Self::BucketAlreadyExists => "BucketAlreadyExists",
Self::BucketNotEmpty => "BucketNotEmpty",
Self::EntityTooLarge => "EntityTooLarge",
@@ -72,6 +77,7 @@ impl S3ErrorCode {
Self::InvalidPolicyDocument => "InvalidPolicyDocument",
Self::InvalidRange => "InvalidRange",
Self::InvalidRequest => "InvalidRequest",
+ Self::InvalidTag => "InvalidTag",
Self::MalformedXML => "MalformedXML",
Self::MethodNotAllowed => "MethodNotAllowed",
Self::NoSuchBucket => "NoSuchBucket",
@@ -90,6 +96,7 @@ impl S3ErrorCode {
pub fn default_message(&self) -> &'static str {
match self {
Self::AccessDenied => "Access Denied",
+ Self::BadDigest => "The Content-MD5 or checksum value you specified did not match what we received",
Self::BucketAlreadyExists => "The requested bucket name is not available",
Self::BucketNotEmpty => "The bucket you tried to delete is not empty",
Self::EntityTooLarge => "Your proposed upload exceeds the maximum allowed size",
@@ -101,6 +108,7 @@ impl S3ErrorCode {
Self::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document",
Self::InvalidRange => "The requested range is not satisfiable",
Self::InvalidRequest => "Invalid request",
+ Self::InvalidTag => "The Tagging header is invalid",
Self::MalformedXML => "The XML you provided was not well-formed",
Self::MethodNotAllowed => "The specified method is not allowed against this resource",
Self::NoSuchBucket => "The specified bucket does not exist",
diff --git a/rust/myfsio-engine/crates/myfsio-common/src/types.rs b/rust/myfsio-engine/crates/myfsio-common/src/types.rs
index a07d565..91195f7 100644
--- a/rust/myfsio-engine/crates/myfsio-common/src/types.rs
+++ b/rust/myfsio-engine/crates/myfsio-common/src/types.rs
@@ -112,6 +112,8 @@ pub struct VersionInfo {
pub last_modified: DateTime,
     pub etag: Option<String>,
pub is_latest: bool,
+ #[serde(default)]
+ pub is_delete_marker: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/rust/myfsio-engine/crates/myfsio-server/Cargo.toml b/rust/myfsio-engine/crates/myfsio-server/Cargo.toml
index 6a1092b..27d8f75 100644
--- a/rust/myfsio-engine/crates/myfsio-server/Cargo.toml
+++ b/rust/myfsio-engine/crates/myfsio-server/Cargo.toml
@@ -10,6 +10,7 @@ myfsio-crypto = { path = "../myfsio-crypto" }
myfsio-storage = { path = "../myfsio-storage" }
myfsio-xml = { path = "../myfsio-xml" }
base64 = { workspace = true }
+md-5 = { workspace = true }
axum = { workspace = true }
tokio = { workspace = true }
tower = { workspace = true }
@@ -29,6 +30,7 @@ percent-encoding = { workspace = true }
quick-xml = { workspace = true }
mime_guess = "2"
crc32fast = { workspace = true }
+sha2 = { workspace = true }
duckdb = { workspace = true }
roxmltree = "0.20"
parking_lot = { workspace = true }
diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs
index 973c4db..5bb6163 100644
--- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs
+++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs
@@ -1038,7 +1038,12 @@ fn s3_error_response(code: S3ErrorCode, message: &str, status: StatusCode) -> Re
(status, [("content-type", "application/xml")], err.to_xml()).into_response()
}
-pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response {
+pub async fn list_object_versions(
+ state: &AppState,
+ bucket: &str,
+ prefix: Option<&str>,
+ max_keys: usize,
+) -> Response {
match state.storage.list_buckets().await {
Ok(buckets) => {
if !buckets.iter().any(|b| b.name == bucket) {
@@ -1050,13 +1055,24 @@ pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response {
Err(e) => return storage_err(e),
}
+ let fetch_limit = max_keys.saturating_add(1).max(1);
let params = myfsio_common::types::ListParams {
- max_keys: 1000,
+ max_keys: fetch_limit,
+ prefix: prefix.map(ToOwned::to_owned),
..Default::default()
};
-    let objects = match state.storage.list_objects(bucket, &params).await {
-        Ok(result) => result.objects,
+    let object_result = match state.storage.list_objects(bucket, &params).await {
+ Ok(result) => result,
+ Err(e) => return storage_err(e),
+ };
+ let objects = object_result.objects;
+ let archived_versions = match state
+ .storage
+ .list_bucket_object_versions(bucket, prefix)
+ .await
+ {
+ Ok(versions) => versions,
Err(e) => return storage_err(e),
};
@@ -1064,11 +1080,24 @@ pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response {
         "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
          <ListVersionsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">",
     );
-    xml.push_str(&format!("<Name>{}</Name>", bucket));
+    xml.push_str(&format!("<Name>{}</Name>", xml_escape(bucket)));
+    xml.push_str(&format!(
+        "<Prefix>{}</Prefix>",
+        xml_escape(prefix.unwrap_or(""))
+    ));
+    xml.push_str(&format!("<MaxKeys>{}</MaxKeys>", max_keys));
-    for obj in &objects {
+    let current_count = objects.len().min(max_keys);
+    let remaining = max_keys.saturating_sub(current_count);
+    let archived_count = archived_versions.len().min(remaining);
+    let is_truncated = object_result.is_truncated
+        || objects.len() > current_count
+        || archived_versions.len() > archived_count;
+    xml.push_str(&format!("<IsTruncated>{}</IsTruncated>", is_truncated));
+
+    for obj in objects.iter().take(current_count) {
         xml.push_str("<Version>");
-        xml.push_str(&format!("<Key>{}</Key>", obj.key));
+        xml.push_str(&format!("<Key>{}</Key>", xml_escape(&obj.key)));
         xml.push_str("<VersionId>null</VersionId>");
         xml.push_str("<IsLatest>true</IsLatest>");
         xml.push_str(&format!(
@@ -1076,9 +1105,32 @@ pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response {
             "<LastModified>{}</LastModified>",
             myfsio_xml::response::format_s3_datetime(&obj.last_modified)
         ));
         if let Some(ref etag) = obj.etag {
-            xml.push_str(&format!("<ETag>\"{}\"</ETag>", etag));
+            xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
         }
         xml.push_str(&format!("<Size>{}</Size>", obj.size));
+        xml.push_str(&format!(
+            "<StorageClass>{}</StorageClass>",
+            xml_escape(obj.storage_class.as_deref().unwrap_or("STANDARD"))
+        ));
+        xml.push_str("</Version>");
+    }
+
+    for version in archived_versions.iter().take(archived_count) {
+        xml.push_str("<Version>");
+        xml.push_str(&format!("<Key>{}</Key>", xml_escape(&version.key)));
+        xml.push_str(&format!(
+            "<VersionId>{}</VersionId>",
+            xml_escape(&version.version_id)
+        ));
+        xml.push_str("<IsLatest>false</IsLatest>");
+        xml.push_str(&format!(
+            "<LastModified>{}</LastModified>",
+            myfsio_xml::response::format_s3_datetime(&version.last_modified)
+        ));
+        if let Some(ref etag) = version.etag {
+            xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
+        }
+        xml.push_str(&format!("<Size>{}</Size>", version.size));
         xml.push_str("<StorageClass>STANDARD</StorageClass>");
         xml.push_str("</Version>");
     }
diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs
index de6f02e..45d1b51 100644
--- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs
+++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs
@@ -13,10 +13,13 @@ use axum::body::Body;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
-use base64::engine::general_purpose::URL_SAFE;
+use base64::engine::general_purpose::{STANDARD, URL_SAFE};
use base64::Engine;
use chrono::{DateTime, Utc};
+use md5::Md5;
+use percent_encoding::percent_decode_str;
use serde_json::json;
+use sha2::{Digest, Sha256};
use myfsio_common::error::{S3Error, S3ErrorCode};
use myfsio_common::types::PartInfo;
@@ -90,7 +93,44 @@ async fn ensure_object_lock_allows_write(
}
}
-pub async fn list_buckets(State(state): State<AppState>) -> Response {
+async fn ensure_object_version_lock_allows_delete(
+ state: &AppState,
+ bucket: &str,
+ key: &str,
+ version_id: &str,
+ headers: &HeaderMap,
+) -> Result<(), Response> {
+ let metadata = match state
+ .storage
+ .get_object_version_metadata(bucket, key, version_id)
+ .await
+ {
+ Ok(metadata) => metadata,
+ Err(err) => return Err(storage_err_response(err)),
+ };
+ let bypass_governance = headers
+ .get("x-amz-bypass-governance-retention")
+ .and_then(|value| value.to_str().ok())
+ .map(|value| value.eq_ignore_ascii_case("true"))
+ .unwrap_or(false);
+ if let Err(message) = object_lock::can_delete_object(&metadata, bypass_governance) {
+ return Err(s3_error_response(S3Error::new(
+ S3ErrorCode::AccessDenied,
+ message,
+ )));
+ }
+ Ok(())
+}
+
+pub async fn list_buckets(
+    State(state): State<AppState>,
+    Query(query): Query<BucketQuery>,
+    headers: HeaderMap,
+) -> Response {
+ if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await {
+ return get_bucket(State(state), Path(host_bucket), Query(query), headers).await;
+ }
+
match state.storage.list_buckets().await {
Ok(buckets) => {
let xml = myfsio_xml::response::list_buckets_xml("myfsio", "myfsio", &buckets);
@@ -117,8 +157,22 @@ pub async fn create_bucket(
     State(state): State<AppState>,
     Path(bucket): Path<String>,
     Query(query): Query<BucketQuery>,
+ headers: HeaderMap,
body: Body,
) -> Response {
+ if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await {
+ if host_bucket != bucket {
+ return put_object(
+ State(state),
+ Path((host_bucket, bucket)),
+ Query(ObjectQuery::default()),
+ headers,
+ body,
+ )
+ .await;
+ }
+ }
+
if query.quota.is_some() {
return config::put_quota(&state, &bucket, body).await;
}
@@ -205,11 +259,41 @@ pub struct BucketQuery {
     pub versions: Option<String>,
}
+async fn virtual_host_bucket_from_headers(state: &AppState, headers: &HeaderMap) -> Option<String> {
+ let host = headers
+ .get("host")
+ .and_then(|value| value.to_str().ok())
+ .and_then(|value| value.split(':').next())?
+ .trim()
+ .to_ascii_lowercase();
+ let (candidate, _) = host.split_once('.')?;
+ if myfsio_storage::validation::validate_bucket_name(candidate).is_some() {
+ return None;
+ }
+ match state.storage.bucket_exists(candidate).await {
+ Ok(true) => Some(candidate.to_string()),
+ _ => None,
+ }
+}
+
pub async fn get_bucket(
     State(state): State<AppState>,
     Path(bucket): Path<String>,
     Query(query): Query<BucketQuery>,
+ headers: HeaderMap,
) -> Response {
+ if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await {
+ if host_bucket != bucket {
+ return get_object(
+ State(state),
+ Path((host_bucket, bucket)),
+ Query(ObjectQuery::default()),
+ headers,
+ )
+ .await;
+ }
+ }
+
if !matches!(state.storage.bucket_exists(&bucket).await, Ok(true)) {
return storage_err_response(myfsio_storage::error::StorageError::BucketNotFound(bucket));
}
@@ -260,7 +344,13 @@ pub async fn get_bucket(
return config::get_logging(&state, &bucket).await;
}
if query.versions.is_some() {
- return config::list_object_versions(&state, &bucket).await;
+ return config::list_object_versions(
+ &state,
+ &bucket,
+ query.prefix.as_deref(),
+ query.max_keys.unwrap_or(1000),
+ )
+ .await;
}
if query.uploads.is_some() {
return list_multipart_uploads_handler(&state, &bucket).await;
@@ -408,6 +498,19 @@ pub async fn post_bucket(
headers: HeaderMap,
body: Body,
) -> Response {
+ if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await {
+ if host_bucket != bucket {
+ return post_object(
+ State(state),
+ Path((host_bucket, bucket)),
+ Query(ObjectQuery::default()),
+ headers,
+ body,
+ )
+ .await;
+ }
+ }
+
if query.delete.is_some() {
return delete_objects_handler(&state, &bucket, body).await;
}
@@ -425,7 +528,20 @@ pub async fn delete_bucket(
     State(state): State<AppState>,
     Path(bucket): Path<String>,
     Query(query): Query<BucketQuery>,
+ headers: HeaderMap,
) -> Response {
+ if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await {
+ if host_bucket != bucket {
+ return delete_object(
+ State(state),
+ Path((host_bucket, bucket)),
+ Query(ObjectQuery::default()),
+ headers,
+ )
+ .await;
+ }
+ }
+
if query.quota.is_some() {
return config::delete_quota(&state, &bucket).await;
}
@@ -466,7 +582,23 @@ pub async fn delete_bucket(
}
}
-pub async fn head_bucket(State(state): State<AppState>, Path(bucket): Path<String>) -> Response {
+pub async fn head_bucket(
+    State(state): State<AppState>,
+    Path(bucket): Path<String>,
+    headers: HeaderMap,
+) -> Response {
+ if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await {
+ if host_bucket != bucket {
+ return head_object(
+ State(state),
+ Path((host_bucket, bucket)),
+ Query(ObjectQuery::default()),
+ headers,
+ )
+ .await;
+ }
+ }
+
match state.storage.bucket_exists(&bucket).await {
Ok(true) => {
let mut headers = HeaderMap::new();
@@ -489,6 +621,8 @@ pub struct ObjectQuery {
     pub upload_id: Option<String>,
     #[serde(rename = "partNumber")]
     pub part_number: Option<u32>,
+    #[serde(rename = "versionId")]
+    pub version_id: Option<String>,
     pub tagging: Option<String>,
     pub acl: Option<String>,
     pub retention: Option<String>,
@@ -583,6 +717,314 @@ fn insert_content_type(headers: &mut HeaderMap, key: &str, explicit: Option<&str
}
}
+fn internal_header_pairs() -> &'static [(&'static str, &'static str, &'static str)] {
+ &[
+ ("cache-control", "__cache_control__", "cache-control"),
+ (
+ "content-disposition",
+ "__content_disposition__",
+ "content-disposition",
+ ),
+ (
+ "content-language",
+ "__content_language__",
+ "content-language",
+ ),
+ (
+ "content-encoding",
+ "__content_encoding__",
+ "content-encoding",
+ ),
+ ("expires", "__expires__", "expires"),
+ (
+ "x-amz-website-redirect-location",
+ "__website_redirect_location__",
+ "x-amz-website-redirect-location",
+ ),
+ ]
+}
+
+fn decoded_content_encoding(value: &str) -> Option<String> {
+ let filtered: Vec<&str> = value
+ .split(',')
+ .map(str::trim)
+ .filter(|part| !part.is_empty() && !part.eq_ignore_ascii_case("aws-chunked"))
+ .collect();
+ if filtered.is_empty() {
+ None
+ } else {
+ Some(filtered.join(", "))
+ }
+}
+
+fn insert_standard_object_metadata(
+    headers: &HeaderMap,
+    metadata: &mut HashMap<String, String>,
+) -> Result<(), Response> {
+ for (request_header, metadata_key, _) in internal_header_pairs() {
+ if let Some(value) = headers.get(*request_header).and_then(|v| v.to_str().ok()) {
+ if *request_header == "content-encoding" {
+ if let Some(decoded_encoding) = decoded_content_encoding(value) {
+ metadata.insert((*metadata_key).to_string(), decoded_encoding);
+ }
+ } else {
+ metadata.insert((*metadata_key).to_string(), value.to_string());
+ }
+ }
+ }
+ if let Some(value) = headers
+ .get("x-amz-storage-class")
+ .and_then(|v| v.to_str().ok())
+ {
+ metadata.insert("__storage_class__".to_string(), value.to_ascii_uppercase());
+ }
+
+ if let Some(value) = headers
+ .get("x-amz-object-lock-legal-hold")
+ .and_then(|v| v.to_str().ok())
+ {
+ object_lock::set_legal_hold(metadata, value.eq_ignore_ascii_case("ON"));
+ }
+
+ let retention_mode = headers
+ .get("x-amz-object-lock-mode")
+ .and_then(|v| v.to_str().ok());
+ let retain_until = headers
+ .get("x-amz-object-lock-retain-until-date")
+ .and_then(|v| v.to_str().ok());
+ if let (Some(mode), Some(retain_until)) = (retention_mode, retain_until) {
+ let mode = match mode.to_ascii_uppercase().as_str() {
+ "GOVERNANCE" => object_lock::RetentionMode::GOVERNANCE,
+ "COMPLIANCE" => object_lock::RetentionMode::COMPLIANCE,
+ _ => {
+ return Err(s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidArgument,
+ "Invalid x-amz-object-lock-mode",
+ )))
+ }
+ };
+ let retain_until_date = DateTime::parse_from_rfc3339(retain_until)
+ .map(|value| value.with_timezone(&Utc))
+ .map_err(|_| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidArgument,
+ "Invalid x-amz-object-lock-retain-until-date",
+ ))
+ })?;
+ object_lock::set_object_retention(
+ metadata,
+ &object_lock::ObjectLockRetention {
+ mode,
+ retain_until_date,
+ },
+ )
+ .map_err(|message| {
+ s3_error_response(S3Error::new(S3ErrorCode::InvalidArgument, message))
+ })?;
+ }
+ Ok(())
+}
+
+fn apply_stored_response_headers(headers: &mut HeaderMap, metadata: &HashMap<String, String>) {
+ for (_, metadata_key, response_header) in internal_header_pairs() {
+ if let Some(value) = metadata
+ .get(*metadata_key)
+ .and_then(|value| value.parse().ok())
+ {
+ headers.insert(*response_header, value);
+ }
+ }
+ if let Some(value) = metadata
+ .get("__storage_class__")
+ .and_then(|value| value.parse().ok())
+ {
+ headers.insert("x-amz-storage-class", value);
+ }
+}
+
+fn apply_user_metadata(headers: &mut HeaderMap, metadata: &HashMap<String, String>) {
+    for (k, v) in metadata {
+        if let Ok(header_val) = v.parse() {
+            let header_name = format!("x-amz-meta-{}", k);
+            if let Ok(name) = header_name.parse::<HeaderName>() {
+                headers.insert(name, header_val);
+            }
+        }
+    }
+}
+
+fn is_null_version(version_id: Option<&str>) -> bool {
+ version_id.is_none_or(|value| value == "null")
+}
+
+fn bad_digest_response(message: impl Into<String>) -> Response {
+ s3_error_response(S3Error::new(S3ErrorCode::BadDigest, message))
+}
+
+fn base64_header_bytes(headers: &HeaderMap, name: &str) -> Result<Option<Vec<u8>>, Response> {
+ let Some(value) = headers.get(name).and_then(|v| v.to_str().ok()) else {
+ return Ok(None);
+ };
+ STANDARD
+ .decode(value.trim())
+ .map(Some)
+ .map_err(|_| bad_digest_response(format!("Invalid base64 value for {}", name)))
+}
+
+fn has_upload_checksum(headers: &HeaderMap) -> bool {
+ headers.contains_key("content-md5")
+ || headers.contains_key("x-amz-checksum-sha256")
+ || headers.contains_key("x-amz-checksum-crc32")
+}
+
+fn validate_upload_checksums(headers: &HeaderMap, data: &[u8]) -> Result<(), Response> {
+ if let Some(expected) = base64_header_bytes(headers, "content-md5")? {
+ if expected.len() != 16 || Md5::digest(data).as_slice() != expected.as_slice() {
+ return Err(bad_digest_response(
+ "The Content-MD5 you specified did not match what we received",
+ ));
+ }
+ }
+
+ if let Some(expected) = base64_header_bytes(headers, "x-amz-checksum-sha256")? {
+ if Sha256::digest(data).as_slice() != expected.as_slice() {
+ return Err(bad_digest_response(
+ "The x-amz-checksum-sha256 you specified did not match what we received",
+ ));
+ }
+ }
+
+ if let Some(expected) = base64_header_bytes(headers, "x-amz-checksum-crc32")? {
+ let actual = crc32fast::hash(data).to_be_bytes();
+ if expected.as_slice() != actual {
+ return Err(bad_digest_response(
+ "The x-amz-checksum-crc32 you specified did not match what we received",
+ ));
+ }
+ }
+
+ Ok(())
+}
+
+async fn collect_upload_body(body: Body, aws_chunked: bool) -> Result<Vec<u8>, Response> {
+ if aws_chunked {
+ let mut reader = chunked::decode_body(body);
+ let mut data = Vec::new();
+ reader.read_to_end(&mut data).await.map_err(|_| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidRequest,
+ "Failed to read aws-chunked request body",
+ ))
+ })?;
+ return Ok(data);
+ }
+
+ http_body_util::BodyExt::collect(body)
+ .await
+ .map(|collected| collected.to_bytes().to_vec())
+ .map_err(|_| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidRequest,
+ "Failed to read request body",
+ ))
+ })
+}
+
+fn parse_tagging_header(value: &str) -> Result<Vec<myfsio_common::types::Tag>, Response> {
+ let mut tags = Vec::new();
+ if value.trim().is_empty() {
+ return Ok(tags);
+ }
+
+ for pair in value.split('&') {
+ let (raw_key, raw_value) = pair.split_once('=').ok_or_else(|| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidTag,
+ "The x-amz-tagging header must use query-string key=value pairs",
+ ))
+ })?;
+ let key = percent_decode_str(raw_key)
+ .decode_utf8()
+ .map_err(|_| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidTag,
+ "Tag keys must be valid UTF-8",
+ ))
+ })?
+ .to_string();
+ let value = percent_decode_str(raw_value)
+ .decode_utf8()
+ .map_err(|_| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidTag,
+ "Tag values must be valid UTF-8",
+ ))
+ })?
+ .to_string();
+ tags.push(myfsio_common::types::Tag { key, value });
+ }
+
+ Ok(tags)
+}
+
+fn parse_copy_source(copy_source: &str) -> Result<(String, String, Option<String>), Response> {
+ let source = copy_source.strip_prefix('/').unwrap_or(copy_source);
+ let (bucket_raw, key_and_query) = source.split_once('/').ok_or_else(|| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidArgument,
+ "Invalid x-amz-copy-source",
+ ))
+ })?;
+ let (key_raw, query) = key_and_query
+ .split_once('?')
+ .map(|(key, query)| (key, Some(query)))
+ .unwrap_or((key_and_query, None));
+
+ let bucket = percent_decode_str(bucket_raw)
+ .decode_utf8()
+ .map_err(|_| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidArgument,
+ "Invalid x-amz-copy-source bucket encoding",
+ ))
+ })?
+ .to_string();
+ let key = percent_decode_str(key_raw)
+ .decode_utf8()
+ .map_err(|_| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidArgument,
+ "Invalid x-amz-copy-source key encoding",
+ ))
+ })?
+ .to_string();
+
+ let mut version_id = None;
+ if let Some(query) = query {
+ for pair in query.split('&') {
+ let Some((name, value)) = pair.split_once('=') else {
+ continue;
+ };
+ if name == "versionId" {
+ version_id = Some(
+ percent_decode_str(value)
+ .decode_utf8()
+ .map_err(|_| {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidArgument,
+ "Invalid x-amz-copy-source versionId encoding",
+ ))
+ })?
+ .to_string(),
+ );
+ break;
+ }
+ }
+ }
+
+ Ok((bucket, key, version_id))
+}
+
pub async fn put_object(
     State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
@@ -647,6 +1089,9 @@ pub async fn put_object(
{
return response;
}
+ if let Some(response) = evaluate_put_preconditions(&state, &bucket, &key, &headers).await {
+ return response;
+ }
let content_type = guessed_content_type(
&key,
@@ -655,6 +1100,9 @@ pub async fn put_object(
let mut metadata = HashMap::new();
metadata.insert("__content_type__".to_string(), content_type);
+ if let Err(response) = insert_standard_object_metadata(&headers, &mut metadata) {
+ return response;
+ }
for (name, value) in headers.iter() {
let name_str = name.as_str();
@@ -665,7 +1113,27 @@ pub async fn put_object(
}
}
- let boxed: myfsio_storage::traits::AsyncReadStream = if is_aws_chunked(&headers) {
+ let tags = match headers
+ .get("x-amz-tagging")
+ .and_then(|value| value.to_str().ok())
+ .map(parse_tagging_header)
+ .transpose()
+ {
+ Ok(tags) => tags,
+ Err(response) => return response,
+ };
+
+ let aws_chunked = is_aws_chunked(&headers);
+ let boxed: myfsio_storage::traits::AsyncReadStream = if has_upload_checksum(&headers) {
+ let data = match collect_upload_body(body, aws_chunked).await {
+ Ok(data) => data,
+ Err(response) => return response,
+ };
+ if let Err(response) = validate_upload_checksums(&headers, &data) {
+ return response;
+ }
+ Box::pin(std::io::Cursor::new(data))
+ } else if aws_chunked {
Box::pin(chunked::decode_body(body))
} else {
let stream = tokio_util::io::StreamReader::new(
@@ -682,6 +1150,11 @@ pub async fn put_object(
.await
{
Ok(meta) => {
+ if let Some(ref tags) = tags {
+ if let Err(e) = state.storage.set_object_tags(&bucket, &key, tags).await {
+ return storage_err_response(e);
+ }
+ }
if let Some(enc_ctx) = resolve_encryption_context(&state, &bucket, &headers).await {
if let Some(ref enc_svc) = state.encryption {
let obj_path = match state.storage.get_object_path(&bucket, &key).await {
@@ -801,9 +1274,23 @@ pub async fn get_object(
return list_parts_handler(&state, &bucket, &key, upload_id).await;
}
- let head_meta = match state.storage.head_object(&bucket, &key).await {
- Ok(m) => m,
- Err(e) => return storage_err_response(e),
+ let version_id = query
+ .version_id
+ .as_deref()
+ .filter(|value| !is_null_version(Some(*value)));
+ let head_meta = match version_id {
+ Some(version_id) => match state
+ .storage
+ .head_object_version(&bucket, &key, version_id)
+ .await
+ {
+ Ok(m) => m,
+ Err(e) => return storage_err_response(e),
+ },
+ None => match state.storage.head_object(&bucket, &key).await {
+ Ok(m) => m,
+ Err(e) => return storage_err_response(e),
+ },
};
if let Some(resp) = evaluate_get_preconditions(&headers, &head_meta) {
return resp;
@@ -818,17 +1305,34 @@ pub async fn get_object(
return range_get_handler(&state, &bucket, &key, range_str, &query).await;
}
- let all_meta = state
- .storage
- .get_object_metadata(&bucket, &key)
- .await
- .unwrap_or_default();
+ let all_meta = match version_id {
+ Some(version_id) => state
+ .storage
+ .get_object_version_metadata(&bucket, &key, version_id)
+ .await
+ .unwrap_or_default(),
+ None => state
+ .storage
+ .get_object_metadata(&bucket, &key)
+ .await
+ .unwrap_or_default(),
+ };
let enc_meta = myfsio_crypto::encryption::EncryptionMetadata::from_metadata(&all_meta);
if let (Some(ref enc_info), Some(ref enc_svc)) = (&enc_meta, &state.encryption) {
- let obj_path = match state.storage.get_object_path(&bucket, &key).await {
- Ok(p) => p,
- Err(e) => return storage_err_response(e),
+ let obj_path = match version_id {
+ Some(version_id) => match state
+ .storage
+ .get_object_version_path(&bucket, &key, version_id)
+ .await
+ {
+ Ok(p) => p,
+ Err(e) => return storage_err_response(e),
+ },
+ None => match state.storage.get_object_path(&bucket, &key).await {
+ Ok(p) => p,
+ Err(e) => return storage_err_response(e),
+ },
};
let tmp_dir = state.config.storage_root.join(".myfsio.sys").join("tmp");
let _ = tokio::fs::create_dir_all(&tmp_dir).await;
@@ -886,22 +1390,31 @@ pub async fn get_object(
"x-amz-server-side-encryption",
enc_info.algorithm.parse().unwrap(),
);
-
- for (k, v) in &meta.metadata {
- if let Ok(header_val) = v.parse() {
- let header_name = format!("x-amz-meta-{}", k);
-            if let Ok(name) = header_name.parse::<HeaderName>() {
- resp_headers.insert(name, header_val);
- }
+ apply_stored_response_headers(&mut resp_headers, &all_meta);
+ if let Some(ref requested_version) = query.version_id {
+ if let Ok(value) = requested_version.parse() {
+ resp_headers.insert("x-amz-version-id", value);
}
}
+ apply_user_metadata(&mut resp_headers, &meta.metadata);
+
apply_response_overrides(&mut resp_headers, &query);
return (StatusCode::OK, resp_headers, body).into_response();
}
- match state.storage.get_object(&bucket, &key).await {
+ let object_result = match version_id {
+ Some(version_id) => {
+ state
+ .storage
+ .get_object_version(&bucket, &key, version_id)
+ .await
+ }
+ None => state.storage.get_object(&bucket, &key).await,
+ };
+
+ match object_result {
Ok((meta, reader)) => {
let stream = ReaderStream::new(reader);
let body = Body::from_stream(stream);
@@ -921,16 +1434,15 @@ pub async fn get_object(
.unwrap(),
);
headers.insert("accept-ranges", "bytes".parse().unwrap());
-
- for (k, v) in &meta.metadata {
- if let Ok(header_val) = v.parse() {
- let header_name = format!("x-amz-meta-{}", k);
-                if let Ok(name) = header_name.parse::<HeaderName>() {
- headers.insert(name, header_val);
- }
+ apply_stored_response_headers(&mut headers, &all_meta);
+ if let Some(ref requested_version) = query.version_id {
+ if let Ok(value) = requested_version.parse() {
+ headers.insert("x-amz-version-id", value);
}
}
+ apply_user_metadata(&mut headers, &meta.metadata);
+
apply_response_overrides(&mut headers, &query);
(StatusCode::OK, headers, body).into_response()
@@ -978,6 +1490,35 @@ pub async fn delete_object(
return abort_multipart_handler(&state, &bucket, upload_id).await;
}
+ if let Some(version_id) = query
+ .version_id
+ .as_deref()
+ .filter(|value| !is_null_version(Some(*value)))
+ {
+ if let Err(response) =
+ ensure_object_version_lock_allows_delete(&state, &bucket, &key, version_id, &headers)
+ .await
+ {
+ return response;
+ }
+ return match state
+ .storage
+ .delete_object_version(&bucket, &key, version_id)
+ .await
+ {
+ Ok(()) => {
+ let mut resp_headers = HeaderMap::new();
+ if let Ok(value) = version_id.parse() {
+ resp_headers.insert("x-amz-version-id", value);
+ }
+ notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete");
+ trigger_replication(&state, &bucket, &key, "delete");
+ (StatusCode::NO_CONTENT, resp_headers).into_response()
+ }
+ Err(e) => storage_err_response(e),
+ };
+ }
+
if let Err(response) =
ensure_object_lock_allows_write(&state, &bucket, &key, Some(&headers)).await
{
@@ -997,13 +1538,40 @@ pub async fn delete_object(
pub async fn head_object(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
+ Query(query): Query<ObjectQuery>,
headers: HeaderMap,
) -> Response {
- match state.storage.head_object(&bucket, &key).await {
+ let version_id = query
+ .version_id
+ .as_deref()
+ .filter(|value| !is_null_version(Some(*value)));
+ let result = match version_id {
+ Some(version_id) => {
+ state
+ .storage
+ .head_object_version(&bucket, &key, version_id)
+ .await
+ }
+ None => state.storage.head_object(&bucket, &key).await,
+ };
+
+ match result {
Ok(meta) => {
if let Some(resp) = evaluate_get_preconditions(&headers, &meta) {
return resp;
}
+ let all_meta = match version_id {
+ Some(version_id) => state
+ .storage
+ .get_object_version_metadata(&bucket, &key, version_id)
+ .await
+ .unwrap_or_default(),
+ None => state
+ .storage
+ .get_object_metadata(&bucket, &key)
+ .await
+ .unwrap_or_default(),
+ };
let mut headers = HeaderMap::new();
headers.insert("content-length", meta.size.to_string().parse().unwrap());
if let Some(ref etag) = meta.etag {
@@ -1019,16 +1587,15 @@ pub async fn head_object(
.unwrap(),
);
headers.insert("accept-ranges", "bytes".parse().unwrap());
-
- for (k, v) in &meta.metadata {
- if let Ok(header_val) = v.parse() {
- let header_name = format!("x-amz-meta-{}", k);
- if let Ok(name) = header_name.parse::<HeaderName>() {
- headers.insert(name, header_val);
- }
+ apply_stored_response_headers(&mut headers, &all_meta);
+ if let Some(ref requested_version) = query.version_id {
+ if let Ok(value) = requested_version.parse() {
+ headers.insert("x-amz-version-id", value);
}
}
+ apply_user_metadata(&mut headers, &meta.metadata);
+
(StatusCode::OK, headers).into_response()
}
Err(e) => storage_err_response(e),
@@ -1311,30 +1878,70 @@ async fn copy_object_handler(
return response;
}
- let source = copy_source.strip_prefix('/').unwrap_or(copy_source);
- let (src_bucket, src_key) = match source.split_once('/') {
- Some(parts) => parts,
- None => {
- return s3_error_response(S3Error::new(
- myfsio_common::error::S3ErrorCode::InvalidArgument,
- "Invalid x-amz-copy-source",
- ));
- }
+ let (src_bucket, src_key, src_version_id) = match parse_copy_source(copy_source) {
+ Ok(parts) => parts,
+ Err(response) => return response,
};
- let source_meta = match state.storage.head_object(src_bucket, src_key).await {
- Ok(m) => m,
- Err(e) => return storage_err_response(e),
+ let source_meta = match src_version_id.as_deref() {
+ Some(version_id) if version_id != "null" => match state
+ .storage
+ .head_object_version(&src_bucket, &src_key, version_id)
+ .await
+ {
+ Ok(m) => m,
+ Err(e) => return storage_err_response(e),
+ },
+ _ => match state.storage.head_object(&src_bucket, &src_key).await {
+ Ok(m) => m,
+ Err(e) => return storage_err_response(e),
+ },
};
if let Some(resp) = evaluate_copy_preconditions(headers, &source_meta) {
return resp;
}
- match state
- .storage
- .copy_object(src_bucket, src_key, dst_bucket, dst_key)
- .await
+ let copy_result = if let Some(version_id) = src_version_id
+ .as_deref()
+ .filter(|value| !is_null_version(Some(*value)))
{
+ let (_meta, mut reader) = match state
+ .storage
+ .get_object_version(&src_bucket, &src_key, version_id)
+ .await
+ {
+ Ok(result) => result,
+ Err(e) => return storage_err_response(e),
+ };
+ let mut data = Vec::new();
+ if let Err(e) = reader.read_to_end(&mut data).await {
+ return storage_err_response(myfsio_storage::error::StorageError::Io(e));
+ }
+ let metadata = match state
+ .storage
+ .get_object_version_metadata(&src_bucket, &src_key, version_id)
+ .await
+ {
+ Ok(metadata) => metadata,
+ Err(e) => return storage_err_response(e),
+ };
+ state
+ .storage
+ .put_object(
+ dst_bucket,
+ dst_key,
+ Box::pin(std::io::Cursor::new(data)),
+ Some(metadata),
+ )
+ .await
+ } else {
+ state
+ .storage
+ .copy_object(&src_bucket, &src_key, dst_bucket, dst_key)
+ .await
+ };
+
+ match copy_result {
Ok(meta) => {
let etag = meta.etag.as_deref().unwrap_or("");
let last_modified = myfsio_xml::response::format_s3_datetime(&meta.last_modified);
@@ -1372,13 +1979,23 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R
let mut errors = Vec::new();
for obj in &parsed.objects {
- if let Err(message) = match state.storage.head_object(bucket, &obj.key).await {
- Ok(_) => match state.storage.get_object_metadata(bucket, &obj.key).await {
+ if let Err(message) = match obj.version_id.as_deref() {
+ Some(version_id) if version_id != "null" => match state
+ .storage
+ .get_object_version_metadata(bucket, &obj.key, version_id)
+ .await
+ {
Ok(metadata) => object_lock::can_delete_object(&metadata, false),
Err(err) => Err(S3Error::from(err).message),
},
- Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()),
- Err(err) => Err(S3Error::from(err).message),
+ _ => match state.storage.head_object(bucket, &obj.key).await {
+ Ok(_) => match state.storage.get_object_metadata(bucket, &obj.key).await {
+ Ok(metadata) => object_lock::can_delete_object(&metadata, false),
+ Err(err) => Err(S3Error::from(err).message),
+ },
+ Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()),
+ Err(err) => Err(S3Error::from(err).message),
+ },
} {
errors.push((
obj.key.clone(),
@@ -1387,7 +2004,20 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R
));
continue;
}
- match state.storage.delete_object(bucket, &obj.key).await {
+ let delete_result = if let Some(version_id) = obj.version_id.as_deref() {
+ if version_id == "null" {
+ state.storage.delete_object(bucket, &obj.key).await
+ } else {
+ state
+ .storage
+ .delete_object_version(bucket, &obj.key, version_id)
+ .await
+ }
+ } else {
+ state.storage.delete_object(bucket, &obj.key).await
+ };
+
+ match delete_result {
Ok(()) => {
notifications::emit_object_removed(state, bucket, &obj.key, "", "", "", "Delete");
trigger_replication(state, bucket, &obj.key, "delete");
@@ -1415,9 +2045,23 @@ async fn range_get_handler(
range_str: &str,
query: &ObjectQuery,
) -> Response {
- let meta = match state.storage.head_object(bucket, key).await {
- Ok(m) => m,
- Err(e) => return storage_err_response(e),
+ let version_id = query
+ .version_id
+ .as_deref()
+ .filter(|value| !is_null_version(Some(*value)));
+ let meta = match version_id {
+ Some(version_id) => match state
+ .storage
+ .head_object_version(bucket, key, version_id)
+ .await
+ {
+ Ok(m) => m,
+ Err(e) => return storage_err_response(e),
+ },
+ None => match state.storage.head_object(bucket, key).await {
+ Ok(m) => m,
+ Err(e) => return storage_err_response(e),
+ },
};
let total_size = meta.size;
@@ -1431,9 +2075,19 @@ async fn range_get_handler(
}
};
- let path = match state.storage.get_object_path(bucket, key).await {
- Ok(p) => p,
- Err(e) => return storage_err_response(e),
+ let path = match version_id {
+ Some(version_id) => match state
+ .storage
+ .get_object_version_path(bucket, key, version_id)
+ .await
+ {
+ Ok(p) => p,
+ Err(e) => return storage_err_response(e),
+ },
+ None => match state.storage.get_object_path(bucket, key).await {
+ Ok(p) => p,
+ Err(e) => return storage_err_response(e),
+ },
};
let mut file = match tokio::fs::File::open(&path).await {
@@ -1463,6 +2117,11 @@ async fn range_get_handler(
}
insert_content_type(&mut headers, key, meta.content_type.as_deref());
headers.insert("accept-ranges", "bytes".parse().unwrap());
+ if let Some(ref requested_version) = query.version_id {
+ if let Ok(value) = requested_version.parse() {
+ headers.insert("x-amz-version-id", value);
+ }
+ }
apply_response_overrides(&mut headers, query);
@@ -1514,6 +2173,49 @@ fn evaluate_get_preconditions(
None
}
+async fn evaluate_put_preconditions(
+ state: &AppState,
+ bucket: &str,
+ key: &str,
+ headers: &HeaderMap,
+) -> Option<Response> {
+ let has_if_match = headers.contains_key("if-match");
+ let has_if_none_match = headers.contains_key("if-none-match");
+ if !has_if_match && !has_if_none_match {
+ return None;
+ }
+
+ match state.storage.head_object(bucket, key).await {
+ Ok(meta) => {
+ if let Some(value) = headers.get("if-match").and_then(|v| v.to_str().ok()) {
+ if !etag_condition_matches(value, meta.etag.as_deref()) {
+ return Some(s3_error_response(S3Error::from_code(
+ S3ErrorCode::PreconditionFailed,
+ )));
+ }
+ }
+ if let Some(value) = headers.get("if-none-match").and_then(|v| v.to_str().ok()) {
+ if etag_condition_matches(value, meta.etag.as_deref()) {
+ return Some(s3_error_response(S3Error::from_code(
+ S3ErrorCode::PreconditionFailed,
+ )));
+ }
+ }
+ None
+ }
+ Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => {
+ if has_if_match {
+ Some(s3_error_response(S3Error::from_code(
+ S3ErrorCode::PreconditionFailed,
+ )))
+ } else {
+ None
+ }
+ }
+ Err(err) => Some(storage_err_response(err)),
+ }
+}
+
fn evaluate_copy_preconditions(
headers: &HeaderMap,
source_meta: &myfsio_common::types::ObjectMeta,
@@ -2176,6 +2878,20 @@ mod tests {
.unwrap()
}
+ #[test]
+ fn aws_chunked_wire_encoding_is_not_persisted_as_object_encoding() {
+ let mut headers = HeaderMap::new();
+ headers.insert("content-encoding", "aws-chunked".parse().unwrap());
+ let mut metadata = HashMap::new();
+ insert_standard_object_metadata(&headers, &mut metadata).unwrap();
+ assert!(!metadata.contains_key("__content_encoding__"));
+
+ headers.insert("content-encoding", "aws-chunked, gzip".parse().unwrap());
+ let mut metadata = HashMap::new();
+ insert_standard_object_metadata(&headers, &mut metadata).unwrap();
+ assert_eq!(metadata.get("__content_encoding__").unwrap(), "gzip");
+ }
+
#[tokio::test]
async fn public_bucket_acl_allows_anonymous_reads() {
let (state, _tmp) = test_state();
diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs
index 82da4df..0217117 100644
--- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs
+++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs
@@ -117,6 +117,7 @@ fn storage_status(err: &StorageError) -> StatusCode {
match err {
StorageError::BucketNotFound(_)
| StorageError::ObjectNotFound { .. }
+ | StorageError::VersionNotFound { .. }
| StorageError::UploadNotFound(_) => StatusCode::NOT_FOUND,
StorageError::InvalidBucketName(_)
| StorageError::InvalidObjectKey(_)
diff --git a/rust/myfsio-engine/crates/myfsio-server/src/lib.rs b/rust/myfsio-engine/crates/myfsio-server/src/lib.rs
index 30ed1ec..8199121 100644
--- a/rust/myfsio-engine/crates/myfsio-server/src/lib.rs
+++ b/rust/myfsio-engine/crates/myfsio-server/src/lib.rs
@@ -9,7 +9,7 @@ pub mod templates;
use axum::Router;
-pub const SERVER_HEADER: &str = "MyFSIO";
+pub const SERVER_HEADER: &str = concat!("MyFSIO-Rust/", env!("CARGO_PKG_VERSION"));
pub fn create_ui_router(state: state::AppState) -> Router {
use axum::routing::{delete, get, post, put};
diff --git a/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs b/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs
index 9a8f189..dce2ea4 100644
--- a/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs
+++ b/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs
@@ -1,5 +1,5 @@
use axum::extract::{Request, State};
-use axum::http::{header, HeaderMap, Method, StatusCode};
+use axum::http::{header, HeaderMap, Method, StatusCode, Uri};
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
@@ -15,6 +15,9 @@ use tokio::io::AsyncReadExt;
use crate::services::acl::acl_from_bucket_config;
use crate::state::AppState;
+#[derive(Clone, Debug)]
+struct OriginalCanonicalPath(String);
+
fn website_error_response(
status: StatusCode,
body: Option>,
@@ -45,7 +48,7 @@ fn website_error_response(
fn default_website_error_body(status: StatusCode) -> String {
let code = status.as_u16();
if status == StatusCode::NOT_FOUND {
- "404 page not found".to_string()
+ "404 page not found ".to_string()
} else {
let reason = status.canonical_reason().unwrap_or("Error");
format!("{code} {reason}")
@@ -324,6 +327,67 @@ async fn maybe_serve_website(
.await
}
+fn virtual_host_candidate(host: &str) -> Option<String> {
+ let (candidate, _) = host.split_once('.')?;
+ if candidate.is_empty() || matches!(candidate, "www" | "s3" | "api" | "admin" | "kms") {
+ return None;
+ }
+ if myfsio_storage::validation::validate_bucket_name(candidate).is_some() {
+ return None;
+ }
+ Some(candidate.to_string())
+}
+
+async fn virtual_host_bucket(
+ state: &AppState,
+ host: &str,
+ path: &str,
+ method: &Method,
+) -> Option<String> {
+ if path.starts_with("/ui")
+ || path.starts_with("/admin")
+ || path.starts_with("/kms")
+ || path.starts_with("/myfsio")
+ {
+ return None;
+ }
+
+ let bucket = virtual_host_candidate(host)?;
+ if path == format!("/{}", bucket) || path.starts_with(&format!("/{}/", bucket)) {
+ return None;
+ }
+
+ match state.storage.bucket_exists(&bucket).await {
+ Ok(true) => Some(bucket),
+ Ok(false) if *method == Method::PUT && path == "/" => Some(bucket),
+ _ => None,
+ }
+}
+
+fn rewrite_uri_for_virtual_host(uri: &Uri, bucket: &str) -> Option<Uri> {
+ let path = uri.path();
+ let rewritten_path = if path == "/" {
+ format!("/{}/", bucket)
+ } else {
+ format!("/{}{}", bucket, path)
+ };
+ let path_and_query = match uri.query() {
+ Some(query) => format!("{}?{}", rewritten_path, query),
+ None => rewritten_path,
+ };
+
+ let mut parts = uri.clone().into_parts();
+ parts.path_and_query = Some(path_and_query.parse().ok()?);
+ Uri::from_parts(parts).ok()
+}
+
+fn sigv4_canonical_path(req: &Request) -> &str {
+ req.extensions()
+ .get::<OriginalCanonicalPath>()
+ .map(|path| path.0.as_str())
+ .unwrap_or_else(|| req.uri().path())
+}
+
pub async fn auth_layer(State(state): State<AppState>, mut req: Request, next: Next) -> Response {
let start = Instant::now();
let uri = req.uri().clone();
@@ -360,7 +424,7 @@ pub async fn auth_layer(State(state): State, mut req: Request, next: N
} else if let Some(response) = maybe_serve_website(
&state,
method.clone(),
- host.unwrap_or_default(),
+ host.clone().unwrap_or_default(),
path.clone(),
range_header,
)
@@ -368,38 +432,53 @@ pub async fn auth_layer(State(state): State, mut req: Request, next: N
{
response
} else {
+ let auth_path = if let Some(bucket) =
+ virtual_host_bucket(&state, host.as_deref().unwrap_or_default(), &path, &method).await
+ {
+ if let Some(rewritten) = rewrite_uri_for_virtual_host(req.uri(), &bucket) {
+ req.extensions_mut()
+ .insert(OriginalCanonicalPath(path.clone()));
+ *req.uri_mut() = rewritten;
+ req.uri().path().to_string()
+ } else {
+ path.clone()
+ }
+ } else {
+ path.clone()
+ };
+
match try_auth(&state, &req) {
AuthResult::NoAuth => match authorize_request(
&state,
None,
&method,
- &path,
+ &auth_path,
&query,
copy_source.as_deref(),
)
.await
{
Ok(()) => next.run(req).await,
- Err(err) => error_response(err, &path),
+ Err(err) => error_response(err, &auth_path),
},
AuthResult::Ok(principal) => {
if let Err(err) = authorize_request(
&state,
Some(&principal),
&method,
- &path,
+ &auth_path,
&query,
copy_source.as_deref(),
)
.await
{
- error_response(err, &path)
+ error_response(err, &auth_path)
} else {
req.extensions_mut().insert(principal);
next.run(req).await
}
}
- AuthResult::Denied(err) => error_response(err, &path),
+ AuthResult::Denied(err) => error_response(err, &auth_path),
}
};
@@ -1078,7 +1157,7 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR
};
let method = req.method().as_str();
- let canonical_uri = req.uri().path();
+ let canonical_uri = sigv4_canonical_path(req);
let query_params = parse_query_params(req.uri().query().unwrap_or(""));
@@ -1234,7 +1313,7 @@ fn verify_sigv4_query(state: &AppState, req: &Request) -> AuthResult {
};
let method = req.method().as_str();
- let canonical_uri = req.uri().path();
+ let canonical_uri = sigv4_canonical_path(req);
let query_params_no_sig: Vec<(String, String)> = params
.iter()
diff --git a/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs b/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs
index 09767f9..8a50ece 100644
--- a/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs
+++ b/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs
@@ -2121,6 +2121,445 @@ async fn test_bucket_versioning() {
assert!(body.contains("<Status>Enabled</Status>"));
}
+#[tokio::test]
+async fn test_versioned_object_can_be_read_and_deleted_by_version_id() {
+ let (app, _tmp) = test_app();
+
+ app.clone()
+ .oneshot(signed_request(
+ Method::PUT,
+ "/versions-bucket",
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method(Method::PUT)
+ .uri("/versions-bucket?versioning")
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .body(Body::from(
+ "<VersioningConfiguration><Status>Enabled</Status></VersioningConfiguration>",
+ ))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ app.clone()
+ .oneshot(signed_request(
+ Method::PUT,
+ "/versions-bucket/doc.txt",
+ Body::from("first"),
+ ))
+ .await
+ .unwrap();
+ app.clone()
+ .oneshot(signed_request(
+ Method::PUT,
+ "/versions-bucket/doc.txt",
+ Body::from("second"),
+ ))
+ .await
+ .unwrap();
+
+ let list_resp = app
+ .clone()
+ .oneshot(signed_request(
+ Method::GET,
+ "/versions-bucket?versions",
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(list_resp.status(), StatusCode::OK);
+ let list_body = String::from_utf8(
+ list_resp
+ .into_body()
+ .collect()
+ .await
+ .unwrap()
+ .to_bytes()
+ .to_vec(),
+ )
+ .unwrap();
+ let archived_version_id = list_body
+ .split("<VersionId>")
+ .filter_map(|part| part.split_once("</VersionId>").map(|(id, _)| id))
+ .find(|id| *id != "null")
+ .expect("archived version id")
+ .to_string();
+
+ let version_resp = app
+ .clone()
+ .oneshot(signed_request(
+ Method::GET,
+ &format!("/versions-bucket/doc.txt?versionId={}", archived_version_id),
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(version_resp.status(), StatusCode::OK);
+ assert_eq!(
+ version_resp.headers()["x-amz-version-id"].to_str().unwrap(),
+ archived_version_id
+ );
+ let version_body = version_resp.into_body().collect().await.unwrap().to_bytes();
+ assert_eq!(&version_body[..], b"first");
+
+ let traversal_resp = app
+ .clone()
+ .oneshot(signed_request(
+ Method::GET,
+ &format!(
+ "/versions-bucket/doc.txt?versionId=../other/{}",
+ archived_version_id
+ ),
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(traversal_resp.status(), StatusCode::NOT_FOUND);
+
+ app.clone()
+ .oneshot(signed_request(
+ Method::PUT,
+ "/versions-bucket/doc.txt",
+ Body::from("third"),
+ ))
+ .await
+ .unwrap();
+ let limited_resp = app
+ .clone()
+ .oneshot(signed_request(
+ Method::GET,
+ "/versions-bucket?versions&max-keys=1",
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(limited_resp.status(), StatusCode::OK);
+ let limited_body = String::from_utf8(
+ limited_resp
+ .into_body()
+ .collect()
+ .await
+ .unwrap()
+ .to_bytes()
+ .to_vec(),
+ )
+ .unwrap();
+ assert_eq!(limited_body.matches("<Version>").count(), 1);
+ assert!(limited_body.contains("<IsTruncated>true</IsTruncated>"));
+
+ let delete_resp = app
+ .clone()
+ .oneshot(signed_request(
+ Method::DELETE,
+ &format!("/versions-bucket/doc.txt?versionId={}", archived_version_id),
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(delete_resp.status(), StatusCode::NO_CONTENT);
+
+ let missing_resp = app
+ .oneshot(signed_request(
+ Method::GET,
+ &format!("/versions-bucket/doc.txt?versionId={}", archived_version_id),
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(missing_resp.status(), StatusCode::NOT_FOUND);
+}
+
+#[tokio::test]
+async fn test_retention_is_enforced_when_deleting_archived_version() {
+ let (app, _tmp) = test_app();
+
+ app.clone()
+ .oneshot(signed_request(
+ Method::PUT,
+ "/locked-versions",
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method(Method::PUT)
+ .uri("/locked-versions?versioning")
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .body(Body::from(
+ "<VersioningConfiguration><Status>Enabled</Status></VersioningConfiguration>",
+ ))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method(Method::PUT)
+ .uri("/locked-versions/doc.txt")
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .header("x-amz-object-lock-mode", "GOVERNANCE")
+ .header(
+ "x-amz-object-lock-retain-until-date",
+ "2099-01-01T00:00:00Z",
+ )
+ .body(Body::from("locked"))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method(Method::PUT)
+ .uri("/locked-versions/doc.txt")
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .header("x-amz-bypass-governance-retention", "true")
+ .body(Body::from("replacement"))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let list_resp = app
+ .clone()
+ .oneshot(signed_request(
+ Method::GET,
+ "/locked-versions?versions",
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(list_resp.status(), StatusCode::OK);
+ let list_body = String::from_utf8(
+ list_resp
+ .into_body()
+ .collect()
+ .await
+ .unwrap()
+ .to_bytes()
+ .to_vec(),
+ )
+ .unwrap();
+ let archived_version_id = list_body
+ .split("<VersionId>")
+ .filter_map(|part| part.split_once("</VersionId>").map(|(id, _)| id))
+ .find(|id| *id != "null")
+ .expect("archived version id")
+ .to_string();
+
+ let denied = app
+ .clone()
+ .oneshot(signed_request(
+ Method::DELETE,
+ &format!("/locked-versions/doc.txt?versionId={}", archived_version_id),
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(denied.status(), StatusCode::FORBIDDEN);
+
+ let allowed = app
+ .oneshot(
+ Request::builder()
+ .method(Method::DELETE)
+ .uri(format!(
+ "/locked-versions/doc.txt?versionId={}",
+ archived_version_id
+ ))
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .header("x-amz-bypass-governance-retention", "true")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+ assert_eq!(allowed.status(), StatusCode::NO_CONTENT);
+}
+
+#[tokio::test]
+async fn test_put_object_validates_content_md5() {
+ let (app, _tmp) = test_app();
+
+ app.clone()
+ .oneshot(signed_request(Method::PUT, "/md5-bucket", Body::empty()))
+ .await
+ .unwrap();
+
+ let bad_resp = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method(Method::PUT)
+ .uri("/md5-bucket/object.txt")
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .header("content-md5", "AAAAAAAAAAAAAAAAAAAAAA==")
+ .body(Body::from("hello"))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+ assert_eq!(bad_resp.status(), StatusCode::BAD_REQUEST);
+ let bad_body = String::from_utf8(
+ bad_resp
+ .into_body()
+ .collect()
+ .await
+ .unwrap()
+ .to_bytes()
+ .to_vec(),
+ )
+ .unwrap();
+ assert!(bad_body.contains("BadDigest"));
+
+ let good_resp = app
+ .oneshot(
+ Request::builder()
+ .method(Method::PUT)
+ .uri("/md5-bucket/object.txt")
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .header("content-md5", "XUFAKrxLKna5cZ2REBfFkg==")
+ .body(Body::from("hello"))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+ assert_eq!(good_resp.status(), StatusCode::OK);
+}
+
+#[tokio::test]
+async fn test_put_object_tagging_and_standard_headers_are_persisted() {
+ let (app, _tmp) = test_app();
+
+ app.clone()
+ .oneshot(signed_request(
+ Method::PUT,
+ "/headers-bucket",
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+
+ let put_resp = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method(Method::PUT)
+ .uri("/headers-bucket/report.txt")
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .header("x-amz-tagging", "env=prod&name=quarter%201")
+ .header("cache-control", "max-age=60")
+ .header("content-disposition", "attachment")
+ .header("content-language", "en-US")
+ .header("x-amz-storage-class", "STANDARD_IA")
+ .body(Body::from("report"))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+ assert_eq!(put_resp.status(), StatusCode::OK);
+
+ let head_resp = app
+ .clone()
+ .oneshot(signed_request(
+ Method::HEAD,
+ "/headers-bucket/report.txt",
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(head_resp.status(), StatusCode::OK);
+ assert_eq!(head_resp.headers()["cache-control"], "max-age=60");
+ assert_eq!(head_resp.headers()["content-disposition"], "attachment");
+ assert_eq!(head_resp.headers()["content-language"], "en-US");
+ assert_eq!(head_resp.headers()["x-amz-storage-class"], "STANDARD_IA");
+
+ let tags_resp = app
+ .oneshot(signed_request(
+ Method::GET,
+ "/headers-bucket/report.txt?tagging",
+ Body::empty(),
+ ))
+ .await
+ .unwrap();
+ assert_eq!(tags_resp.status(), StatusCode::OK);
+ let tags_body = String::from_utf8(
+ tags_resp
+ .into_body()
+ .collect()
+ .await
+ .unwrap()
+ .to_bytes()
+ .to_vec(),
+ )
+ .unwrap();
+ assert!(tags_body.contains("<Key>env</Key>"));
+ assert!(tags_body.contains("<Value>prod</Value>"));
+ assert!(tags_body.contains("<Key>name</Key>"));
+ assert!(tags_body.contains("<Value>quarter 1</Value>"));
+}
+
+#[tokio::test]
+async fn test_virtual_host_bucket_routes_to_s3_object_handlers() {
+ let (app, _tmp) = test_app();
+
+ app.clone()
+ .oneshot(signed_request(Method::PUT, "/vh-bucket", Body::empty()))
+ .await
+ .unwrap();
+
+ let put_resp = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method(Method::PUT)
+ .uri("/hello.txt")
+ .header("host", "vh-bucket.localhost")
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .body(Body::from("virtual host body"))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+ assert_eq!(put_resp.status(), StatusCode::OK);
+
+ let get_resp = app
+ .oneshot(
+ Request::builder()
+ .method(Method::GET)
+ .uri("/hello.txt")
+ .header("host", "vh-bucket.localhost")
+ .header("x-access-key", TEST_ACCESS_KEY)
+ .header("x-secret-key", TEST_SECRET_KEY)
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+ assert_eq!(get_resp.status(), StatusCode::OK);
+ let body = get_resp.into_body().collect().await.unwrap().to_bytes();
+ assert_eq!(&body[..], b"virtual host body");
+}
+
#[tokio::test]
async fn test_bucket_tagging() {
let (app, _tmp) = test_app();
@@ -3323,7 +3762,7 @@ async fn test_static_website_default_404_returns_html_body() {
)
.unwrap();
assert_eq!(body.len(), content_length);
- assert_eq!(body, "404 page not found");
+ assert_eq!(body, "404 page not found ");
let head_resp = app
.oneshot(website_request(Method::HEAD, "/missing.html"))
diff --git a/rust/myfsio-engine/crates/myfsio-storage/src/error.rs b/rust/myfsio-engine/crates/myfsio-storage/src/error.rs
index 992d1fc..4bb652a 100644
--- a/rust/myfsio-engine/crates/myfsio-storage/src/error.rs
+++ b/rust/myfsio-engine/crates/myfsio-storage/src/error.rs
@@ -11,6 +11,12 @@ pub enum StorageError {
BucketNotEmpty(String),
#[error("Object not found: {bucket}/{key}")]
ObjectNotFound { bucket: String, key: String },
+ #[error("Object version not found: {bucket}/{key}?versionId={version_id}")]
+ VersionNotFound {
+ bucket: String,
+ key: String,
+ version_id: String,
+ },
#[error("Invalid bucket name: {0}")]
InvalidBucketName(String),
#[error("Invalid object key: {0}")]
@@ -46,6 +52,12 @@ impl From for S3Error {
S3Error::from_code(S3ErrorCode::NoSuchKey)
.with_resource(format!("/{}/{}", bucket, key))
}
+ StorageError::VersionNotFound {
+ bucket,
+ key,
+ version_id,
+ } => S3Error::from_code(S3ErrorCode::NoSuchVersion)
+ .with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)),
StorageError::InvalidBucketName(msg) => {
S3Error::new(S3ErrorCode::InvalidBucketName, msg)
}
diff --git a/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs b/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs
index 7f5bd36..9f7a72f 100644
--- a/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs
+++ b/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs
@@ -605,6 +605,144 @@ impl FsStorageBackend {
Ok(source_size)
}
+ fn version_record_paths(
+ &self,
+ bucket_name: &str,
+ key: &str,
+ version_id: &str,
+ ) -> (PathBuf, PathBuf) {
+ let version_dir = self.version_dir(bucket_name, key);
+ (
+ version_dir.join(format!("{}.json", version_id)),
+ version_dir.join(format!("{}.bin", version_id)),
+ )
+ }
+
+ fn validate_version_id(bucket_name: &str, key: &str, version_id: &str) -> StorageResult<()> {
+ if version_id.is_empty()
+ || version_id.contains('/')
+ || version_id.contains('\\')
+ || version_id.contains("..")
+ {
+ return Err(StorageError::VersionNotFound {
+ bucket: bucket_name.to_string(),
+ key: key.to_string(),
+ version_id: version_id.to_string(),
+ });
+ }
+ Ok(())
+ }
+
+ fn read_version_record_sync(
+ &self,
+ bucket_name: &str,
+ key: &str,
+ version_id: &str,
+ ) -> StorageResult<(Value, PathBuf)> {
+ self.require_bucket(bucket_name)?;
+ self.validate_key(key)?;
+ Self::validate_version_id(bucket_name, key, version_id)?;
+ let (manifest_path, data_path) = self.version_record_paths(bucket_name, key, version_id);
+ if !manifest_path.is_file() || !data_path.is_file() {
+ return Err(StorageError::VersionNotFound {
+ bucket: bucket_name.to_string(),
+ key: key.to_string(),
+ version_id: version_id.to_string(),
+ });
+ }
+
+ let content = std::fs::read_to_string(&manifest_path).map_err(StorageError::Io)?;
+ let record = serde_json::from_str::<Value>(&content).map_err(StorageError::Json)?;
+ Ok((record, data_path))
+ }
+
+ fn version_metadata_from_record(record: &Value) -> HashMap<String, String> {
+ record
+ .get("metadata")
+ .and_then(Value::as_object)
+ .map(|meta| {
+ meta.iter()
+ .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
+ .collect::<HashMap<String, String>>()
+ })
+ .unwrap_or_default()
+ }
+
+ fn object_meta_from_version_record(
+ &self,
+ key: &str,
+ record: &Value,
+ data_path: &Path,
+ ) -> StorageResult<ObjectMeta> {
+ let metadata = Self::version_metadata_from_record(record);
+
+ let data_len = std::fs::metadata(data_path)
+ .map(|meta| meta.len())
+ .unwrap_or_default();
+ let size = record
+ .get("size")
+ .and_then(Value::as_u64)
+ .unwrap_or(data_len);
+ let last_modified = record
+ .get("archived_at")
+ .and_then(Value::as_str)
+ .and_then(|value| DateTime::parse_from_rfc3339(value).ok())
+ .map(|value| value.with_timezone(&Utc))
+ .unwrap_or_else(Utc::now);
+ let etag = record
+ .get("etag")
+ .and_then(Value::as_str)
+ .map(ToOwned::to_owned)
+ .or_else(|| metadata.get("__etag__").cloned());
+
+ let mut obj = ObjectMeta::new(key.to_string(), size, last_modified);
+ obj.etag = etag;
+ obj.content_type = metadata.get("__content_type__").cloned();
+ obj.storage_class = metadata
+ .get("__storage_class__")
+ .cloned()
+ .or_else(|| Some("STANDARD".to_string()));
+ obj.metadata = metadata
+ .into_iter()
+ .filter(|(k, _)| !k.starts_with("__"))
+ .collect();
+ Ok(obj)
+ }
+
+ fn version_info_from_record(&self, fallback_key: &str, record: &Value) -> VersionInfo {
+ let version_id = record
+ .get("version_id")
+ .and_then(Value::as_str)
+ .unwrap_or("")
+ .to_string();
+ let key = record
+ .get("key")
+ .and_then(Value::as_str)
+ .unwrap_or(fallback_key)
+ .to_string();
+ let size = record.get("size").and_then(Value::as_u64).unwrap_or(0);
+ let archived_at = record
+ .get("archived_at")
+ .and_then(Value::as_str)
+ .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
+ .map(|d| d.with_timezone(&Utc))
+ .unwrap_or_else(Utc::now);
+ let etag = record
+ .get("etag")
+ .and_then(Value::as_str)
+ .map(|s| s.to_string());
+
+ VersionInfo {
+ version_id,
+ key,
+ size,
+ last_modified: archived_at,
+ etag,
+ is_latest: false,
+ is_delete_marker: false,
+ }
+ }
+
fn bucket_stats_sync(&self, bucket_name: &str) -> StorageResult {
let bucket_path = self.require_bucket(bucket_name)?;
@@ -1241,6 +1379,10 @@ impl crate::traits::StorageEngine for FsStorageBackend {
let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm);
obj.etag = stored_meta.get("__etag__").cloned();
obj.content_type = stored_meta.get("__content_type__").cloned();
+ obj.storage_class = stored_meta
+ .get("__storage_class__")
+ .cloned()
+ .or_else(|| Some("STANDARD".to_string()));
obj.metadata = stored_meta
.into_iter()
.filter(|(k, _)| !k.starts_with("__"))
@@ -1289,6 +1431,10 @@ impl crate::traits::StorageEngine for FsStorageBackend {
let mut obj = ObjectMeta::new(key.to_string(), meta.len(), lm);
obj.etag = stored_meta.get("__etag__").cloned();
obj.content_type = stored_meta.get("__content_type__").cloned();
+ obj.storage_class = stored_meta
+ .get("__storage_class__")
+ .cloned()
+ .or_else(|| Some("STANDARD".to_string()));
obj.metadata = stored_meta
.into_iter()
.filter(|(k, _)| !k.starts_with("__"))
@@ -1296,6 +1442,51 @@ impl crate::traits::StorageEngine for FsStorageBackend {
Ok(obj)
}
+ /// Stream an archived object version: resolve its manifest record and
+ /// return the derived `ObjectMeta` plus an async reader over the data
+ /// file. Lookup errors from `read_version_record_sync` (e.g. a missing
+ /// version) propagate unchanged; open failures surface as
+ /// `StorageError::Io`.
+ async fn get_object_version(
+ &self,
+ bucket: &str,
+ key: &str,
+ version_id: &str,
+ ) -> StorageResult<(ObjectMeta, AsyncReadStream)> {
+ let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?;
+ let obj = self.object_meta_from_version_record(key, &record, &data_path)?;
+ // tokio::fs keeps the file open off the async executor's hot path.
+ let file = tokio::fs::File::open(&data_path)
+ .await
+ .map_err(StorageError::Io)?;
+ let stream: AsyncReadStream = Box::pin(file);
+ Ok((obj, stream))
+ }
+
+    /// Resolve the filesystem path of an archived version's data file
+    /// without opening it; the manifest record itself is discarded.
+    async fn get_object_version_path(
+        &self,
+        bucket: &str,
+        key: &str,
+        version_id: &str,
+    ) -> StorageResult<PathBuf> {
+        let (_record, data_path) = self.read_version_record_sync(bucket, key, version_id)?;
+        Ok(data_path)
+    }
+
+    /// HEAD semantics for an archived version: return its `ObjectMeta`
+    /// (the data file is only stat'ed for a size fallback, never read).
+    async fn head_object_version(
+        &self,
+        bucket: &str,
+        key: &str,
+        version_id: &str,
+    ) -> StorageResult<ObjectMeta> {
+        let (record, data_path) = self.read_version_record_sync(bucket, key, version_id)?;
+        self.object_meta_from_version_record(key, &record, &data_path)
+    }
+
+    /// Return the metadata map stored in an archived version's manifest,
+    /// internal keys (e.g. `__etag__`) included as-is.
+    // NOTE(review): the map type was garbled in the patch; restored as
+    // HashMap<String, String> per the nested-generic residue in traits.rs —
+    // confirm against `version_metadata_from_record`'s declared return type.
+    async fn get_object_version_metadata(
+        &self,
+        bucket: &str,
+        key: &str,
+        version_id: &str,
+    ) -> StorageResult<HashMap<String, String>> {
+        let (record, _data_path) = self.read_version_record_sync(bucket, key, version_id)?;
+        Ok(Self::version_metadata_from_record(&record))
+    }
+
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()> {
let bucket_path = self.require_bucket(bucket)?;
let path = self.object_path(bucket, key)?;
@@ -1317,6 +1508,32 @@ impl crate::traits::StorageEngine for FsStorageBackend {
Ok(())
}
+ /// Permanently delete one archived version of `key`: unlinks both the
+ /// data file and its JSON manifest, prunes now-empty parent directories
+ /// under the bucket's versions root, and invalidates the bucket's stats
+ /// cache. Errors with `StorageError::VersionNotFound` when neither file
+ /// exists.
+ async fn delete_object_version(
+ &self,
+ bucket: &str,
+ key: &str,
+ version_id: &str,
+ ) -> StorageResult<()> {
+ self.require_bucket(bucket)?;
+ self.validate_key(key)?;
+ // Rejects path-traversal style version ids before touching the fs.
+ Self::validate_version_id(bucket, key, version_id)?;
+ let (manifest_path, data_path) = self.version_record_paths(bucket, key, version_id);
+ if !manifest_path.is_file() && !data_path.is_file() {
+ return Err(StorageError::VersionNotFound {
+ bucket: bucket.to_string(),
+ key: key.to_string(),
+ version_id: version_id.to_string(),
+ });
+ }
+
+ // Data first, then manifest: a crash in between leaves a manifest
+ // pointing at nothing rather than orphaned data.
+ Self::safe_unlink(&data_path).map_err(StorageError::Io)?;
+ Self::safe_unlink(&manifest_path).map_err(StorageError::Io)?;
+ let versions_root = self.bucket_versions_root(bucket);
+ Self::cleanup_empty_parents(&manifest_path, &versions_root);
+ self.stats_cache.remove(bucket);
+ Ok(())
+ }
+
async fn copy_object(
&self,
src_bucket: &str,
@@ -1817,40 +2034,73 @@ impl crate::traits::StorageEngine for FsStorageBackend {
}
if let Ok(content) = std::fs::read_to_string(entry.path()) {
if let Ok(record) = serde_json::from_str::(&content) {
- let version_id = record
- .get("version_id")
- .and_then(|v| v.as_str())
- .unwrap_or("")
- .to_string();
- let size = record.get("size").and_then(|v| v.as_u64()).unwrap_or(0);
- let archived_at = record
- .get("archived_at")
- .and_then(|v| v.as_str())
- .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
- .map(|d| d.with_timezone(&Utc))
- .unwrap_or_else(Utc::now);
- let etag = record
- .get("etag")
- .and_then(|v| v.as_str())
- .map(|s| s.to_string());
-
- versions.push(VersionInfo {
- version_id,
- key: key.to_string(),
- size,
- last_modified: archived_at,
- etag,
- is_latest: false,
- });
+ versions.push(self.version_info_from_record(key, &record));
}
}
}
versions.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
- if let Some(first) = versions.first_mut() {
- first.is_latest = true;
+
+ Ok(versions)
+ }
+
+    /// List archived versions for every object in `bucket`, optionally
+    /// restricted to keys starting with `prefix`.
+    ///
+    /// Walks the bucket's versions tree iteratively with an explicit stack,
+    /// parsing each `*.json` manifest; unreadable or unparsable entries are
+    /// skipped (best-effort listing). Results are sorted by key ascending,
+    /// then newest first per key. `is_latest` stays `false` on all entries.
+    async fn list_bucket_object_versions(
+        &self,
+        bucket: &str,
+        prefix: Option<&str>,
+    ) -> StorageResult<Vec<VersionInfo>> {
+        self.require_bucket(bucket)?;
+        let root = self.bucket_versions_root(bucket);
+        if !root.exists() {
+            return Ok(Vec::new());
+        }
+        let mut versions = Vec::new();
+        let mut stack = vec![root.clone()];
+        while let Some(current) = stack.pop() {
+            let entries = match std::fs::read_dir(&current) {
+                Ok(entries) => entries,
+                Err(_) => continue,
+            };
+            for entry in entries.flatten() {
+                let path = entry.path();
+                let ft = match entry.file_type() {
+                    Ok(ft) => ft,
+                    Err(_) => continue,
+                };
+                if ft.is_dir() {
+                    stack.push(path);
+                    continue;
+                }
+                if !ft.is_file() || path.extension().and_then(|ext| ext.to_str()) != Some("json") {
+                    continue;
+                }
+                let content = match std::fs::read_to_string(&path) {
+                    Ok(content) => content,
+                    Err(_) => continue,
+                };
+                let record = match serde_json::from_str::<Value>(&content) {
+                    Ok(record) => record,
+                    Err(_) => continue,
+                };
+                // Derive the object key from the manifest's parent directory
+                // (relative to the versions root, normalized to '/') when the
+                // record itself lacks a `key` field.
+                let fallback_key = path
+                    .parent()
+                    .and_then(|parent| parent.strip_prefix(&root).ok())
+                    .map(|rel| rel.to_string_lossy().replace('\\', "/"))
+                    .unwrap_or_default();
+                let info = self.version_info_from_record(&fallback_key, &record);
+                if prefix.is_some_and(|value| !info.key.starts_with(value)) {
+                    continue;
+                }
+                versions.push(info);
+            }
+        }
+
+        versions.sort_by(|a, b| {
+            a.key
+                .cmp(&b.key)
+                .then_with(|| b.last_modified.cmp(&a.last_modified))
+        });
Ok(versions)
}
@@ -2271,6 +2521,12 @@ mod tests {
.unwrap();
assert_eq!(versions.len(), 1);
assert_eq!(versions[0].size, 8);
+
+ let invalid_version = format!("../other/{}", versions[0].version_id);
+ let result = backend
+ .get_object_version("test-bucket", "file.txt", &invalid_version)
+ .await;
+ assert!(matches!(result, Err(StorageError::VersionNotFound { .. })));
}
#[tokio::test]
diff --git a/rust/myfsio-engine/crates/myfsio-storage/src/traits.rs b/rust/myfsio-engine/crates/myfsio-storage/src/traits.rs
index 32ee5cb..af069b9 100644
--- a/rust/myfsio-engine/crates/myfsio-storage/src/traits.rs
+++ b/rust/myfsio-engine/crates/myfsio-storage/src/traits.rs
@@ -34,8 +34,43 @@ pub trait StorageEngine: Send + Sync {
async fn head_object(&self, bucket: &str, key: &str) -> StorageResult;
+ /// Fetch an archived object version's metadata and a readable stream
+ /// over its data.
+ async fn get_object_version(
+ &self,
+ bucket: &str,
+ key: &str,
+ version_id: &str,
+ ) -> StorageResult<(ObjectMeta, AsyncReadStream)>;
+
+    /// Resolve the on-disk data file path for an archived object version.
+    async fn get_object_version_path(
+        &self,
+        bucket: &str,
+        key: &str,
+        version_id: &str,
+    ) -> StorageResult<PathBuf>;
+
+    /// Return metadata for an archived object version (HEAD semantics).
+    async fn head_object_version(
+        &self,
+        bucket: &str,
+        key: &str,
+        version_id: &str,
+    ) -> StorageResult<ObjectMeta>;
+
+    /// Return the stored metadata map for an archived object version.
+    // NOTE(review): map type restored from garbled patch as
+    // HashMap<String, String>; must match the fs backend implementation.
+    async fn get_object_version_metadata(
+        &self,
+        bucket: &str,
+        key: &str,
+        version_id: &str,
+    ) -> StorageResult<HashMap<String, String>>;
+
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()>;
+ /// Permanently delete one archived version of an object.
+ async fn delete_object_version(
+ &self,
+ bucket: &str,
+ key: &str,
+ version_id: &str,
+ ) -> StorageResult<()>;
+
async fn copy_object(
&self,
src_bucket: &str,
@@ -120,6 +155,12 @@ pub trait StorageEngine: Send + Sync {
key: &str,
) -> StorageResult>;
+    /// List archived versions across the whole bucket, optionally filtered
+    /// by key prefix.
+    async fn list_bucket_object_versions(
+        &self,
+        bucket: &str,
+        prefix: Option<&str>,
+    ) -> StorageResult<Vec<VersionInfo>>;
+
async fn get_object_tags(&self, bucket: &str, key: &str) -> StorageResult>;
async fn set_object_tags(&self, bucket: &str, key: &str, tags: &[Tag]) -> StorageResult<()>;