diff --git a/Cargo.lock b/Cargo.lock
index 682f84e..e8cdfee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2639,7 +2639,7 @@ dependencies = [
[[package]]
name = "myfsio-auth"
-version = "0.4.3"
+version = "0.4.4"
dependencies = [
"aes",
"base64",
@@ -2664,7 +2664,7 @@ dependencies = [
[[package]]
name = "myfsio-common"
-version = "0.4.3"
+version = "0.4.4"
dependencies = [
"chrono",
"serde",
@@ -2675,7 +2675,7 @@ dependencies = [
[[package]]
name = "myfsio-crypto"
-version = "0.4.3"
+version = "0.4.4"
dependencies = [
"aes-gcm",
"base64",
@@ -2696,7 +2696,7 @@ dependencies = [
[[package]]
name = "myfsio-server"
-version = "0.4.3"
+version = "0.4.4"
dependencies = [
"aes-gcm",
"async-trait",
@@ -2714,6 +2714,8 @@ dependencies = [
"dotenvy",
"duckdb",
"futures",
+ "hex",
+ "http-body 1.0.1",
"http-body-util",
"hyper 1.9.0",
"md-5 0.10.6",
@@ -2751,7 +2753,7 @@ dependencies = [
[[package]]
name = "myfsio-storage"
-version = "0.4.3"
+version = "0.4.4"
dependencies = [
"chrono",
"dashmap",
@@ -2774,7 +2776,7 @@ dependencies = [
[[package]]
name = "myfsio-xml"
-version = "0.4.3"
+version = "0.4.4"
dependencies = [
"chrono",
"myfsio-common",
diff --git a/Cargo.toml b/Cargo.toml
index e025cc3..f7bb2e4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,14 +10,14 @@ members = [
]
[workspace.package]
-version = "0.4.3"
+version = "0.4.4"
edition = "2021"
[workspace.dependencies]
tokio = { version = "1", features = ["full"] }
axum = { version = "0.8" }
tower = { version = "0.5" }
-tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip"] }
+tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip", "timeout"] }
hyper = { version = "1" }
bytes = "1"
serde = { version = "1", features = ["derive"] }
diff --git a/crates/myfsio-common/src/error.rs b/crates/myfsio-common/src/error.rs
index 5785d5f..c8e506e 100644
--- a/crates/myfsio-common/src/error.rs
+++ b/crates/myfsio-common/src/error.rs
@@ -7,11 +7,14 @@ pub enum S3ErrorCode {
BucketAlreadyExists,
BucketNotEmpty,
EntityTooLarge,
+ EntityTooSmall,
InternalError,
InvalidAccessKeyId,
InvalidArgument,
InvalidBucketName,
InvalidKey,
+ InvalidPart,
+ InvalidPartOrder,
InvalidPolicyDocument,
InvalidRange,
InvalidRequest,
@@ -19,13 +22,16 @@ pub enum S3ErrorCode {
MalformedXML,
MethodNotAllowed,
NoSuchBucket,
+ NoSuchBucketPolicy,
NoSuchKey,
+ NoSuchLifecycleConfiguration,
NoSuchUpload,
NoSuchVersion,
NoSuchTagSet,
PreconditionFailed,
NotModified,
QuotaExceeded,
+ ServerSideEncryptionConfigurationNotFoundError,
SignatureDoesNotMatch,
SlowDown,
}
@@ -38,11 +44,14 @@ impl S3ErrorCode {
Self::BucketAlreadyExists => 409,
Self::BucketNotEmpty => 409,
Self::EntityTooLarge => 413,
+ Self::EntityTooSmall => 400,
Self::InternalError => 500,
Self::InvalidAccessKeyId => 403,
Self::InvalidArgument => 400,
Self::InvalidBucketName => 400,
Self::InvalidKey => 400,
+ Self::InvalidPart => 400,
+ Self::InvalidPartOrder => 400,
Self::InvalidPolicyDocument => 400,
Self::InvalidRange => 416,
Self::InvalidRequest => 400,
@@ -50,13 +59,16 @@ impl S3ErrorCode {
Self::MalformedXML => 400,
Self::MethodNotAllowed => 405,
Self::NoSuchBucket => 404,
+ Self::NoSuchBucketPolicy => 404,
Self::NoSuchKey => 404,
+ Self::NoSuchLifecycleConfiguration => 404,
Self::NoSuchUpload => 404,
Self::NoSuchVersion => 404,
Self::NoSuchTagSet => 404,
Self::PreconditionFailed => 412,
Self::NotModified => 304,
Self::QuotaExceeded => 403,
+ Self::ServerSideEncryptionConfigurationNotFoundError => 404,
Self::SignatureDoesNotMatch => 403,
Self::SlowDown => 429,
}
@@ -69,11 +81,14 @@ impl S3ErrorCode {
Self::BucketAlreadyExists => "BucketAlreadyExists",
Self::BucketNotEmpty => "BucketNotEmpty",
Self::EntityTooLarge => "EntityTooLarge",
+ Self::EntityTooSmall => "EntityTooSmall",
Self::InternalError => "InternalError",
Self::InvalidAccessKeyId => "InvalidAccessKeyId",
Self::InvalidArgument => "InvalidArgument",
Self::InvalidBucketName => "InvalidBucketName",
Self::InvalidKey => "InvalidKey",
+ Self::InvalidPart => "InvalidPart",
+ Self::InvalidPartOrder => "InvalidPartOrder",
Self::InvalidPolicyDocument => "InvalidPolicyDocument",
Self::InvalidRange => "InvalidRange",
Self::InvalidRequest => "InvalidRequest",
@@ -81,13 +96,18 @@ impl S3ErrorCode {
Self::MalformedXML => "MalformedXML",
Self::MethodNotAllowed => "MethodNotAllowed",
Self::NoSuchBucket => "NoSuchBucket",
+ Self::NoSuchBucketPolicy => "NoSuchBucketPolicy",
Self::NoSuchKey => "NoSuchKey",
+ Self::NoSuchLifecycleConfiguration => "NoSuchLifecycleConfiguration",
Self::NoSuchUpload => "NoSuchUpload",
Self::NoSuchVersion => "NoSuchVersion",
Self::NoSuchTagSet => "NoSuchTagSet",
Self::PreconditionFailed => "PreconditionFailed",
Self::NotModified => "NotModified",
Self::QuotaExceeded => "QuotaExceeded",
+ Self::ServerSideEncryptionConfigurationNotFoundError => {
+ "ServerSideEncryptionConfigurationNotFoundError"
+ }
Self::SignatureDoesNotMatch => "SignatureDoesNotMatch",
Self::SlowDown => "SlowDown",
}
@@ -100,11 +120,14 @@ impl S3ErrorCode {
Self::BucketAlreadyExists => "The requested bucket name is not available",
Self::BucketNotEmpty => "The bucket you tried to delete is not empty",
Self::EntityTooLarge => "Your proposed upload exceeds the maximum allowed size",
+ Self::EntityTooSmall => "Your proposed upload is smaller than the minimum allowed object size",
Self::InternalError => "We encountered an internal error. Please try again.",
Self::InvalidAccessKeyId => "The access key ID you provided does not exist",
Self::InvalidArgument => "Invalid argument",
Self::InvalidBucketName => "The specified bucket is not valid",
Self::InvalidKey => "The specified key is not valid",
+ Self::InvalidPart => "One or more of the specified parts could not be found",
+ Self::InvalidPartOrder => "The list of parts was not in ascending order",
Self::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document",
Self::InvalidRange => "The requested range is not satisfiable",
Self::InvalidRequest => "Invalid request",
@@ -112,13 +135,16 @@ impl S3ErrorCode {
Self::MalformedXML => "The XML you provided was not well-formed",
Self::MethodNotAllowed => "The specified method is not allowed against this resource",
Self::NoSuchBucket => "The specified bucket does not exist",
+ Self::NoSuchBucketPolicy => "The bucket policy does not exist",
Self::NoSuchKey => "The specified key does not exist",
+ Self::NoSuchLifecycleConfiguration => "The lifecycle configuration does not exist",
Self::NoSuchUpload => "The specified multipart upload does not exist",
Self::NoSuchVersion => "The specified version does not exist",
Self::NoSuchTagSet => "The TagSet does not exist",
Self::PreconditionFailed => "At least one of the preconditions you specified did not hold",
Self::NotModified => "Not Modified",
Self::QuotaExceeded => "The bucket quota has been exceeded",
+ Self::ServerSideEncryptionConfigurationNotFoundError => "The server side encryption configuration was not found",
Self::SignatureDoesNotMatch => "The request signature we calculated does not match the signature you provided",
Self::SlowDown => "Please reduce your request rate",
}
diff --git a/crates/myfsio-server/Cargo.toml b/crates/myfsio-server/Cargo.toml
index b1e305a..eb0cd8a 100644
--- a/crates/myfsio-server/Cargo.toml
+++ b/crates/myfsio-server/Cargo.toml
@@ -27,12 +27,14 @@ tokio-stream = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
futures = { workspace = true }
+http-body = "1"
http-body-util = "0.1"
percent-encoding = { workspace = true }
quick-xml = { workspace = true }
mime_guess = "2"
crc32fast = { workspace = true }
sha2 = { workspace = true }
+hex = { workspace = true }
duckdb = { workspace = true }
roxmltree = "0.20"
parking_lot = { workspace = true }
diff --git a/crates/myfsio-server/src/config.rs b/crates/myfsio-server/src/config.rs
index 6bfd147..1264b3f 100644
--- a/crates/myfsio-server/src/config.rs
+++ b/crates/myfsio-server/src/config.rs
@@ -81,6 +81,7 @@ pub struct ServerConfig {
pub multipart_min_part_size: u64,
pub bulk_delete_max_keys: usize,
pub stream_chunk_size: usize,
+ pub request_body_timeout_secs: u64,
pub ratelimit_default: RateLimitSetting,
pub ratelimit_admin: RateLimitSetting,
pub ratelimit_storage_uri: String,
@@ -225,6 +226,7 @@ impl ServerConfig {
let multipart_min_part_size = parse_u64_env("MULTIPART_MIN_PART_SIZE", 5_242_880);
let bulk_delete_max_keys = parse_usize_env("BULK_DELETE_MAX_KEYS", 1000);
let stream_chunk_size = parse_usize_env("STREAM_CHUNK_SIZE", 1_048_576);
+ let request_body_timeout_secs = parse_u64_env("REQUEST_BODY_TIMEOUT_SECONDS", 60);
let ratelimit_default =
parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(200, 60));
let ratelimit_admin =
@@ -304,6 +306,7 @@ impl ServerConfig {
multipart_min_part_size,
bulk_delete_max_keys,
stream_chunk_size,
+ request_body_timeout_secs,
ratelimit_default,
ratelimit_admin,
ratelimit_storage_uri,
@@ -387,6 +390,7 @@ impl Default for ServerConfig {
multipart_min_part_size: 5_242_880,
bulk_delete_max_keys: 1000,
stream_chunk_size: 1_048_576,
+ request_body_timeout_secs: 60,
ratelimit_default: RateLimitSetting::new(200, 60),
ratelimit_admin: RateLimitSetting::new(60, 60),
ratelimit_storage_uri: "memory://".to_string(),
diff --git a/crates/myfsio-server/src/handlers/config.rs b/crates/myfsio-server/src/handlers/config.rs
index 4547374..16ac05e 100644
--- a/crates/myfsio-server/src/handlers/config.rs
+++ b/crates/myfsio-server/src/handlers/config.rs
@@ -218,11 +218,8 @@ pub async fn get_encryption(state: &AppState, bucket: &str) -> Response {
} else {
xml_response(
StatusCode::NOT_FOUND,
- S3Error::new(
- S3ErrorCode::InvalidRequest,
- "The server side encryption configuration was not found",
- )
- .to_xml(),
+ S3Error::from_code(S3ErrorCode::ServerSideEncryptionConfigurationNotFoundError)
+ .to_xml(),
)
}
}
@@ -270,11 +267,7 @@ pub async fn get_lifecycle(state: &AppState, bucket: &str) -> Response {
} else {
xml_response(
StatusCode::NOT_FOUND,
- S3Error::new(
- S3ErrorCode::NoSuchKey,
- "The lifecycle configuration does not exist",
- )
- .to_xml(),
+ S3Error::from_code(S3ErrorCode::NoSuchLifecycleConfiguration).to_xml(),
)
}
}
@@ -421,7 +414,7 @@ pub async fn get_policy(state: &AppState, bucket: &str) -> Response {
} else {
xml_response(
StatusCode::NOT_FOUND,
- S3Error::new(S3ErrorCode::NoSuchKey, "No bucket policy attached").to_xml(),
+ S3Error::from_code(S3ErrorCode::NoSuchBucketPolicy).to_xml(),
)
}
}
@@ -1095,6 +1088,35 @@ pub async fn list_object_versions(
|| archived_versions.len() > archived_count;
xml.push_str(&format!("<IsTruncated>{}</IsTruncated>", is_truncated));
+ let current_keys: std::collections::HashSet<String> = objects
+ .iter()
+ .take(current_count)
+ .map(|o| o.key.clone())
+ .collect();
+ let mut latest_archived_per_key: std::collections::HashMap<String, String> =
+ std::collections::HashMap::new();
+ for v in archived_versions.iter().take(archived_count) {
+ if current_keys.contains(&v.key) {
+ continue;
+ }
+ let existing = latest_archived_per_key.get(&v.key).cloned();
+ match existing {
+ None => {
+ latest_archived_per_key.insert(v.key.clone(), v.version_id.clone());
+ }
+ Some(existing_id) => {
+ let existing_ts = archived_versions
+ .iter()
+ .find(|x| x.key == v.key && x.version_id == existing_id)
+ .map(|x| x.last_modified)
+ .unwrap_or(v.last_modified);
+ if v.last_modified > existing_ts {
+ latest_archived_per_key.insert(v.key.clone(), v.version_id.clone());
+ }
+ }
+ }
+ }
+
for obj in objects.iter().take(current_count) {
xml.push_str("<Version>");
xml.push_str(&format!("<Key>{}</Key>", xml_escape(&obj.key)));
@@ -1116,23 +1138,34 @@ pub async fn list_object_versions(
}
for version in archived_versions.iter().take(archived_count) {
- xml.push_str("<Version>");
+ let is_latest = latest_archived_per_key
+ .get(&version.key)
+ .map(|id| id == &version.version_id)
+ .unwrap_or(false);
+ let tag = if version.is_delete_marker {
+ "DeleteMarker"
+ } else {
+ "Version"
+ };
+ xml.push_str(&format!("<{}>", tag));
xml.push_str(&format!("<Key>{}</Key>", xml_escape(&version.key)));
xml.push_str(&format!(
"<VersionId>{}</VersionId>",
xml_escape(&version.version_id)
));
- xml.push_str("<IsLatest>false</IsLatest>");
+ xml.push_str(&format!("<IsLatest>{}</IsLatest>", is_latest));
xml.push_str(&format!(
"<LastModified>{}</LastModified>",
myfsio_xml::response::format_s3_datetime(&version.last_modified)
));
- if let Some(ref etag) = version.etag {
- xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
+ if !version.is_delete_marker {
+ if let Some(ref etag) = version.etag {
+ xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
+ }
+ xml.push_str(&format!("<Size>{}</Size>", version.size));
+ xml.push_str("<StorageClass>STANDARD</StorageClass>");
}
- xml.push_str(&format!("<Size>{}</Size>", version.size));
- xml.push_str("<StorageClass>STANDARD</StorageClass>");
- xml.push_str("</Version>");
+ xml.push_str(&format!("</{}>", tag));
}
xml.push_str("</ListVersionsResult>");
@@ -1182,6 +1215,26 @@ pub async fn put_object_tagging(state: &AppState, bucket: &str, key: &str, body:
.to_xml(),
);
}
+ for tag in &tags {
+ if tag.key.is_empty() || tag.key.len() > 128 {
+ return xml_response(
+ StatusCode::BAD_REQUEST,
+ S3Error::new(S3ErrorCode::InvalidTag, "Tag key length must be 1-128").to_xml(),
+ );
+ }
+ if tag.value.len() > 256 {
+ return xml_response(
+ StatusCode::BAD_REQUEST,
+ S3Error::new(S3ErrorCode::InvalidTag, "Tag value length must be 0-256").to_xml(),
+ );
+ }
+ if tag.key.contains('=') {
+ return xml_response(
+ StatusCode::BAD_REQUEST,
+ S3Error::new(S3ErrorCode::InvalidTag, "Tag keys must not contain '='").to_xml(),
+ );
+ }
+ }
match state.storage.set_object_tags(bucket, key, &tags).await {
Ok(()) => StatusCode::OK.into_response(),
diff --git a/crates/myfsio-server/src/handlers/mod.rs b/crates/myfsio-server/src/handlers/mod.rs
index 1acd085..4ed1df8 100644
--- a/crates/myfsio-server/src/handlers/mod.rs
+++ b/crates/myfsio-server/src/handlers/mod.rs
@@ -47,6 +47,11 @@ fn s3_error_response(err: S3Error) -> Response {
}
fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response {
+ if let myfsio_storage::error::StorageError::Io(io_err) = &err {
+ if let Some(message) = crate::middleware::sha_body::sha256_mismatch_message(io_err) {
+ return bad_digest_response(message);
+ }
+ }
s3_error_response(S3Error::from(err))
}
@@ -391,6 +396,75 @@ pub async fn get_bucket(
Some(marker.clone())
};
+ if max_keys == 0 {
+ let has_any = if delimiter.is_empty() {
+ state
+ .storage
+ .list_objects(
+ &bucket,
+ &myfsio_common::types::ListParams {
+ max_keys: 1,
+ continuation_token: effective_start.clone(),
+ prefix: if prefix.is_empty() {
+ None
+ } else {
+ Some(prefix.clone())
+ },
+ start_after: if is_v2 {
+ query.start_after.clone()
+ } else {
+ None
+ },
+ },
+ )
+ .await
+ .map(|r| !r.objects.is_empty())
+ .unwrap_or(false)
+ } else {
+ state
+ .storage
+ .list_objects_shallow(
+ &bucket,
+ &myfsio_common::types::ShallowListParams {
+ prefix: prefix.clone(),
+ delimiter: delimiter.clone(),
+ max_keys: 1,
+ continuation_token: effective_start.clone(),
+ },
+ )
+ .await
+ .map(|r| !r.objects.is_empty() || !r.common_prefixes.is_empty())
+ .unwrap_or(false)
+ };
+ let xml = if is_v2 {
+ myfsio_xml::response::list_objects_v2_xml(
+ &bucket,
+ &prefix,
+ &delimiter,
+ 0,
+ &[],
+ &[],
+ has_any,
+ query.continuation_token.as_deref(),
+ None,
+ 0,
+ )
+ } else {
+ myfsio_xml::response::list_objects_v1_xml(
+ &bucket,
+ &prefix,
+ &marker,
+ &delimiter,
+ 0,
+ &[],
+ &[],
+ has_any,
+ None,
+ )
+ };
+ return (StatusCode::OK, [("content-type", "application/xml")], xml).into_response();
+ }
+
if delimiter.is_empty() {
let params = myfsio_common::types::ListParams {
max_keys,
@@ -408,10 +482,14 @@ pub async fn get_bucket(
};
match state.storage.list_objects(&bucket, ¶ms).await {
Ok(result) => {
- let next_marker = result
- .next_continuation_token
- .clone()
- .or_else(|| result.objects.last().map(|o| o.key.clone()));
+ let next_marker = if result.is_truncated {
+ result
+ .next_continuation_token
+ .clone()
+ .or_else(|| result.objects.last().map(|o| o.key.clone()))
+ } else {
+ None
+ };
let xml = if is_v2 {
let next_token = next_marker
.as_deref()
@@ -922,11 +1000,15 @@ async fn collect_upload_body(body: Body, aws_chunked: bool) -> Result<Vec<u8>, Response> {
http_body_util::BodyExt::collect(body)
.await
.map(|collected| collected.to_bytes().to_vec())
- .map_err(|_| {
- s3_error_response(S3Error::new(
- S3ErrorCode::InvalidRequest,
- "Failed to read request body",
- ))
+ .map_err(|err| {
+ if let Some(message) = crate::middleware::sha_body::sha256_mismatch_message(&err) {
+ bad_digest_response(message)
+ } else {
+ s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidRequest,
+ "Failed to read request body",
+ ))
+ }
})
}
@@ -1537,7 +1619,7 @@ pub async fn delete_object(
Ok(()) => {
notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete");
trigger_replication(&state, &bucket, &key, "delete");
- StatusCode::NO_CONTENT.into_response()
+ (StatusCode::NO_CONTENT, HeaderMap::new()).into_response()
}
Err(e) => storage_err_response(e),
}
@@ -1764,6 +1846,70 @@ async fn complete_multipart_handler(
}
};
+ if parsed.parts.is_empty() {
+ return s3_error_response(S3Error::new(
+ S3ErrorCode::MalformedXML,
+ "CompleteMultipartUpload requires at least one part",
+ ));
+ }
+
+ let mut last_part_num: u32 = 0;
+ for p in &parsed.parts {
+ if p.part_number == 0 {
+ return s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidPartOrder,
+ "Part numbers must be greater than zero",
+ ));
+ }
+ if p.part_number <= last_part_num {
+ return s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidPartOrder,
+ "Parts must be specified in ascending order with no duplicates",
+ ));
+ }
+ last_part_num = p.part_number;
+ }
+
+ let stored_parts = match state.storage.list_parts(bucket, upload_id).await {
+ Ok(list) => list,
+ Err(e) => return storage_err_response(e),
+ };
+ let stored_map: HashMap<u32, (String, u64)> = stored_parts
+ .iter()
+ .map(|p| (p.part_number, (p.etag.clone(), p.size)))
+ .collect();
+ let min_part_size: u64 = state.config.multipart_min_part_size;
+ let total_parts = parsed.parts.len();
+ for (idx, p) in parsed.parts.iter().enumerate() {
+ let stored = match stored_map.get(&p.part_number) {
+ Some(s) => s,
+ None => {
+ return s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidPart,
+ format!("Part {} not found", p.part_number),
+ ));
+ }
+ };
+ let client_etag = p.etag.trim().trim_matches('"').to_ascii_lowercase();
+ let stored_etag = stored.0.trim().trim_matches('"').to_ascii_lowercase();
+ if !client_etag.is_empty() && client_etag != stored_etag {
+ return s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidPart,
+ format!("ETag mismatch for part {}", p.part_number),
+ ));
+ }
+ let is_final = idx + 1 == total_parts;
+ if !is_final && stored.1 < min_part_size {
+ return s3_error_response(S3Error::new(
+ S3ErrorCode::EntityTooSmall,
+ format!(
+ "Part {} is smaller than the minimum allowed size of {} bytes",
+ p.part_number, min_part_size
+ ),
+ ));
+ }
+ }
+
let parts: Vec = parsed
.parts
.iter()
@@ -1852,7 +1998,8 @@ async fn object_attributes_handler(
if all || attrs.contains("etag") {
if let Some(etag) = &meta.etag {
- xml.push_str(&format!("<ETag>{}</ETag>", xml_escape(etag)));
+ let trimmed = etag.trim_matches('"');
+ xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(trimmed)));
}
}
if all || attrs.contains("storageclass") {
@@ -1909,48 +2056,151 @@ async fn copy_object_handler(
return resp;
}
- let copy_result = if let Some(version_id) = src_version_id
- .as_deref()
- .filter(|value| !is_null_version(Some(*value)))
- {
- let (_meta, mut reader) = match state
+ let metadata_directive = headers
+ .get("x-amz-metadata-directive")
+ .and_then(|v| v.to_str().ok())
+ .map(|v| v.trim().to_ascii_uppercase())
+ .unwrap_or_else(|| "COPY".to_string());
+ let tagging_directive = headers
+ .get("x-amz-tagging-directive")
+ .and_then(|v| v.to_str().ok())
+ .map(|v| v.trim().to_ascii_uppercase())
+ .unwrap_or_else(|| "COPY".to_string());
+ let replace_metadata = metadata_directive == "REPLACE";
+ let replace_tagging = tagging_directive == "REPLACE";
+
+ let same_object = src_bucket == dst_bucket
+ && src_key == dst_key
+ && src_version_id.as_deref().unwrap_or("") == "";
+ if same_object && !replace_metadata && !replace_tagging {
+ return s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidRequest,
+ "This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes.",
+ ));
+ }
+
+ let source_metadata_existing = match src_version_id.as_deref() {
+ Some(version_id) if version_id != "null" => {
+ match state
+ .storage
+ .get_object_version_metadata(&src_bucket, &src_key, version_id)
+ .await
+ {
+ Ok(metadata) => metadata,
+ Err(e) => return storage_err_response(e),
+ }
+ }
+ _ => match state
.storage
- .get_object_version(&src_bucket, &src_key, version_id)
+ .get_object_metadata(&src_bucket, &src_key)
.await
{
+ Ok(m) => m,
+ Err(e) => return storage_err_response(e),
+ },
+ };
+
+ let dst_metadata = if replace_metadata {
+ let mut m: HashMap<String, String> = HashMap::new();
+ for (request_header, metadata_key, _) in internal_header_pairs() {
+ if let Some(value) = headers.get(*request_header).and_then(|v| v.to_str().ok()) {
+ if *request_header == "content-encoding" {
+ if let Some(decoded_encoding) = decoded_content_encoding(value) {
+ m.insert((*metadata_key).to_string(), decoded_encoding);
+ }
+ } else {
+ m.insert((*metadata_key).to_string(), value.to_string());
+ }
+ }
+ }
+ let content_type = guessed_content_type(
+ dst_key,
+ headers.get("content-type").and_then(|v| v.to_str().ok()),
+ );
+ m.insert("__content_type__".to_string(), content_type);
+ for (name, value) in headers.iter() {
+ let name_str = name.as_str();
+ if let Some(meta_key) = name_str.strip_prefix("x-amz-meta-") {
+ if let Ok(val) = value.to_str() {
+ m.insert(meta_key.to_string(), val.to_string());
+ }
+ }
+ }
+ if let Some(value) = headers
+ .get("x-amz-storage-class")
+ .and_then(|v| v.to_str().ok())
+ {
+ m.insert("__storage_class__".to_string(), value.to_ascii_uppercase());
+ }
+ m
+ } else {
+ source_metadata_existing.clone()
+ };
+
+ let (_meta, mut reader) = match src_version_id.as_deref() {
+ Some(version_id) if version_id != "null" => {
+ match state
+ .storage
+ .get_object_version(&src_bucket, &src_key, version_id)
+ .await
+ {
+ Ok(result) => result,
+ Err(e) => return storage_err_response(e),
+ }
+ }
+ _ => match state.storage.get_object(&src_bucket, &src_key).await {
Ok(result) => result,
Err(e) => return storage_err_response(e),
- };
- let mut data = Vec::new();
- if let Err(e) = reader.read_to_end(&mut data).await {
- return storage_err_response(myfsio_storage::error::StorageError::Io(e));
- }
- let metadata = match state
- .storage
- .get_object_version_metadata(&src_bucket, &src_key, version_id)
- .await
- {
- Ok(metadata) => metadata,
- Err(e) => return storage_err_response(e),
- };
- state
- .storage
- .put_object(
- dst_bucket,
- dst_key,
- Box::pin(std::io::Cursor::new(data)),
- Some(metadata),
- )
- .await
- } else {
- state
- .storage
- .copy_object(&src_bucket, &src_key, dst_bucket, dst_key)
- .await
+ },
};
+ let mut data = Vec::new();
+ if let Err(e) = reader.read_to_end(&mut data).await {
+ return storage_err_response(myfsio_storage::error::StorageError::Io(e));
+ }
+
+ let copy_result = state
+ .storage
+ .put_object(
+ dst_bucket,
+ dst_key,
+ Box::pin(std::io::Cursor::new(data)),
+ Some(dst_metadata),
+ )
+ .await;
match copy_result {
Ok(meta) => {
+ if replace_tagging {
+ let tags = match headers
+ .get("x-amz-tagging")
+ .and_then(|value| value.to_str().ok())
+ .map(parse_tagging_header)
+ .transpose()
+ {
+ Ok(tags) => tags,
+ Err(response) => return response,
+ };
+ if let Some(ref tags) = tags {
+ if tags.len() > state.config.object_tag_limit {
+ return s3_error_response(S3Error::new(
+ S3ErrorCode::InvalidTag,
+ format!("Maximum {} tags allowed", state.config.object_tag_limit),
+ ));
+ }
+ if let Err(e) = state
+ .storage
+ .set_object_tags(dst_bucket, dst_key, tags)
+ .await
+ {
+ return storage_err_response(e);
+ }
+ } else {
+ let _ = state
+ .storage
+ .set_object_tags(dst_bucket, dst_key, &[])
+ .await;
+ }
+ }
let etag = meta.etag.as_deref().unwrap_or("");
let last_modified = myfsio_xml::response::format_s3_datetime(&meta.last_modified);
let xml = myfsio_xml::response::copy_object_result_xml(etag, &last_modified);
@@ -1983,6 +2233,13 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R
}
};
+ if parsed.objects.len() > 1000 {
+ return s3_error_response(S3Error::new(
+ S3ErrorCode::MalformedXML,
+ "The request must not contain more than 1000 keys",
+ ));
+ }
+
let mut deleted = Vec::new();
let mut errors = Vec::new();
@@ -2280,9 +2537,20 @@ fn evaluate_copy_preconditions(
}
fn parse_http_date(value: &str) -> Option<DateTime<Utc>> {
- DateTime::parse_from_rfc2822(value)
- .ok()
- .map(|dt| dt.with_timezone(&Utc))
+ let trimmed = value.trim();
+ if let Ok(dt) = DateTime::parse_from_rfc2822(trimmed) {
+ return Some(dt.with_timezone(&Utc));
+ }
+ if let Ok(dt) = DateTime::parse_from_rfc3339(trimmed) {
+ return Some(dt.with_timezone(&Utc));
+ }
+ if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(trimmed, "%A, %d-%b-%y %H:%M:%S GMT") {
+ return Some(naive.and_utc());
+ }
+ if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(trimmed, "%a %b %e %H:%M:%S %Y") {
+ return Some(naive.and_utc());
+ }
+ None
}
fn etag_condition_matches(condition: &str, etag: Option<&str>) -> bool {
diff --git a/crates/myfsio-server/src/lib.rs b/crates/myfsio-server/src/lib.rs
index 93c7b77..e613b18 100644
--- a/crates/myfsio-server/src/lib.rs
+++ b/crates/myfsio-server/src/lib.rs
@@ -577,11 +577,17 @@ pub fn create_router(state: state::AppState) -> Router {
middleware::rate_limit_layer,
));
+ let request_body_timeout =
+ std::time::Duration::from_secs(state.config.request_body_timeout_secs);
+
api_router
.merge(admin_router)
.layer(axum::middleware::from_fn(middleware::server_header))
.layer(cors_layer(&state.config))
.layer(tower_http::compression::CompressionLayer::new())
+ .layer(tower_http::timeout::RequestBodyTimeoutLayer::new(
+ request_body_timeout,
+ ))
.with_state(state)
}
diff --git a/crates/myfsio-server/src/main.rs b/crates/myfsio-server/src/main.rs
index a77175f..0732f3c 100644
--- a/crates/myfsio-server/src/main.rs
+++ b/crates/myfsio-server/src/main.rs
@@ -189,6 +189,11 @@ async fn main() {
let shutdown = shutdown_signal_shared();
let api_shutdown = shutdown.clone();
+ let api_listener = axum::serve::ListenerExt::tap_io(api_listener, |stream| {
+ if let Err(err) = stream.set_nodelay(true) {
+ tracing::trace!("failed to set TCP_NODELAY on api socket: {}", err);
+ }
+ });
let api_task = tokio::spawn(async move {
axum::serve(
api_listener,
@@ -202,6 +207,11 @@ async fn main() {
let ui_task = if let (Some(listener), Some(app)) = (ui_listener, ui_app) {
let ui_shutdown = shutdown.clone();
+ let listener = axum::serve::ListenerExt::tap_io(listener, |stream| {
+ if let Err(err) = stream.set_nodelay(true) {
+ tracing::trace!("failed to set TCP_NODELAY on ui socket: {}", err);
+ }
+ });
Some(tokio::spawn(async move {
axum::serve(listener, app)
.with_graceful_shutdown(async move {
diff --git a/crates/myfsio-server/src/middleware/auth.rs b/crates/myfsio-server/src/middleware/auth.rs
index dce2ea4..202c03c 100644
--- a/crates/myfsio-server/src/middleware/auth.rs
+++ b/crates/myfsio-server/src/middleware/auth.rs
@@ -12,9 +12,36 @@ use serde_json::Value;
use std::time::Instant;
use tokio::io::AsyncReadExt;
+use crate::middleware::sha_body::{is_hex_sha256, Sha256VerifyBody};
use crate::services::acl::acl_from_bucket_config;
use crate::state::AppState;
+fn wrap_body_for_sha256_verification(req: &mut Request) {
+ let declared = match req
+ .headers()
+ .get("x-amz-content-sha256")
+ .and_then(|v| v.to_str().ok())
+ {
+ Some(v) => v.to_string(),
+ None => return,
+ };
+ if !is_hex_sha256(&declared) {
+ return;
+ }
+ let is_chunked = req
+ .headers()
+ .get("content-encoding")
+ .and_then(|v| v.to_str().ok())
+ .map(|v| v.to_ascii_lowercase().contains("aws-chunked"))
+ .unwrap_or(false);
+ if is_chunked {
+ return;
+ }
+ let body = std::mem::replace(req.body_mut(), axum::body::Body::empty());
+ let wrapped = Sha256VerifyBody::new(body, declared);
+ *req.body_mut() = axum::body::Body::new(wrapped);
+}
+
#[derive(Clone, Debug)]
struct OriginalCanonicalPath(String);
@@ -475,6 +502,7 @@ pub async fn auth_layer(State(state): State, mut req: Request, next: N
error_response(err, &auth_path)
} else {
req.extensions_mut().insert(principal);
+ wrap_body_for_sha256_verification(&mut req);
next.run(req).await
}
}
@@ -1102,7 +1130,9 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR
let parts: Vec<&str> = auth_str
.strip_prefix("AWS4-HMAC-SHA256 ")
.unwrap()
- .split(", ")
+ .split(',')
+ .map(str::trim)
+ .filter(|s| !s.is_empty())
.collect();
if parts.len() != 3 {
@@ -1112,9 +1142,24 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR
));
}
- let credential = parts[0].strip_prefix("Credential=").unwrap_or("");
- let signed_headers_str = parts[1].strip_prefix("SignedHeaders=").unwrap_or("");
- let provided_signature = parts[2].strip_prefix("Signature=").unwrap_or("");
+ let mut credential: &str = "";
+ let mut signed_headers_str: &str = "";
+ let mut provided_signature: &str = "";
+ for part in &parts {
+ if let Some(v) = part.strip_prefix("Credential=") {
+ credential = v;
+ } else if let Some(v) = part.strip_prefix("SignedHeaders=") {
+ signed_headers_str = v;
+ } else if let Some(v) = part.strip_prefix("Signature=") {
+ provided_signature = v;
+ }
+ }
+ if credential.is_empty() || signed_headers_str.is_empty() || provided_signature.is_empty() {
+ return AuthResult::Denied(S3Error::new(
+ S3ErrorCode::InvalidArgument,
+ "Malformed Authorization header",
+ ));
+ }
let cred_parts: Vec<&str> = credential.split('/').collect();
if cred_parts.len() != 5 {
diff --git a/crates/myfsio-server/src/middleware/mod.rs b/crates/myfsio-server/src/middleware/mod.rs
index 2f3727d..aa0c96b 100644
--- a/crates/myfsio-server/src/middleware/mod.rs
+++ b/crates/myfsio-server/src/middleware/mod.rs
@@ -1,6 +1,7 @@
mod auth;
pub mod ratelimit;
pub mod session;
+pub(crate) mod sha_body;
pub use auth::auth_layer;
pub use ratelimit::{rate_limit_layer, RateLimitLayerState};
diff --git a/crates/myfsio-server/src/middleware/session.rs b/crates/myfsio-server/src/middleware/session.rs
index 68f8207..ee2bcc4 100644
--- a/crates/myfsio-server/src/middleware/session.rs
+++ b/crates/myfsio-server/src/middleware/session.rs
@@ -181,8 +181,8 @@ pub async fn csrf_layer(
.unwrap_or("");
let is_form_submit = content_type.starts_with("application/x-www-form-urlencoded")
|| content_type.starts_with("multipart/form-data");
- let wants_json = accept.contains("application/json")
- || content_type.starts_with("application/json");
+ let wants_json =
+ accept.contains("application/json") || content_type.starts_with("application/json");
if is_form_submit && !wants_json {
let ctx = crate::handlers::ui::base_context(&handle, None);
diff --git a/crates/myfsio-server/src/middleware/sha_body.rs b/crates/myfsio-server/src/middleware/sha_body.rs
new file mode 100644
index 0000000..cd2a97f
--- /dev/null
+++ b/crates/myfsio-server/src/middleware/sha_body.rs
@@ -0,0 +1,107 @@
+use axum::body::Body;
+use bytes::Bytes;
+use http_body::{Body as HttpBody, Frame};
+use sha2::{Digest, Sha256};
+use std::error::Error;
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+#[derive(Debug)]
+struct Sha256MismatchError {
+ expected: String,
+ computed: String,
+}
+
+impl Sha256MismatchError {
+ fn message(&self) -> String {
+ format!(
+ "The x-amz-content-sha256 you specified did not match what we received (expected {}, computed {})",
+ self.expected, self.computed
+ )
+ }
+}
+
+impl fmt::Display for Sha256MismatchError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "XAmzContentSHA256Mismatch: expected {}, computed {}",
+ self.expected, self.computed
+ )
+ }
+}
+
+impl Error for Sha256MismatchError {}
+
+pub struct Sha256VerifyBody {
+ inner: Body,
+ expected: String,
+ hasher: Option<Sha256>,
+}
+
+impl Sha256VerifyBody {
+ pub fn new(inner: Body, expected_hex: String) -> Self {
+ Self {
+ inner,
+ expected: expected_hex.to_ascii_lowercase(),
+ hasher: Some(Sha256::new()),
+ }
+ }
+}
+
+impl HttpBody for Sha256VerifyBody {
+ type Data = Bytes;
+ type Error = Box<dyn Error + Send + Sync>;
+
+ fn poll_frame(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll