Fix SigV4/SHA256/TCP_NODELAY critical paths; tighten multipart, copy, versioning, and S3 error conformance

This commit is contained in:
2026-04-23 17:52:30 +08:00
parent e1fb225034
commit 7ef3820f6e
16 changed files with 821 additions and 80 deletions

14
Cargo.lock generated
View File

@@ -2639,7 +2639,7 @@ dependencies = [
[[package]]
name = "myfsio-auth"
version = "0.4.3"
version = "0.4.4"
dependencies = [
"aes",
"base64",
@@ -2664,7 +2664,7 @@ dependencies = [
[[package]]
name = "myfsio-common"
version = "0.4.3"
version = "0.4.4"
dependencies = [
"chrono",
"serde",
@@ -2675,7 +2675,7 @@ dependencies = [
[[package]]
name = "myfsio-crypto"
version = "0.4.3"
version = "0.4.4"
dependencies = [
"aes-gcm",
"base64",
@@ -2696,7 +2696,7 @@ dependencies = [
[[package]]
name = "myfsio-server"
version = "0.4.3"
version = "0.4.4"
dependencies = [
"aes-gcm",
"async-trait",
@@ -2714,6 +2714,8 @@ dependencies = [
"dotenvy",
"duckdb",
"futures",
"hex",
"http-body 1.0.1",
"http-body-util",
"hyper 1.9.0",
"md-5 0.10.6",
@@ -2751,7 +2753,7 @@ dependencies = [
[[package]]
name = "myfsio-storage"
version = "0.4.3"
version = "0.4.4"
dependencies = [
"chrono",
"dashmap",
@@ -2774,7 +2776,7 @@ dependencies = [
[[package]]
name = "myfsio-xml"
version = "0.4.3"
version = "0.4.4"
dependencies = [
"chrono",
"myfsio-common",

View File

@@ -10,14 +10,14 @@ members = [
]
[workspace.package]
version = "0.4.3"
version = "0.4.4"
edition = "2021"
[workspace.dependencies]
tokio = { version = "1", features = ["full"] }
axum = { version = "0.8" }
tower = { version = "0.5" }
tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip"] }
tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip", "timeout"] }
hyper = { version = "1" }
bytes = "1"
serde = { version = "1", features = ["derive"] }

View File

@@ -7,11 +7,14 @@ pub enum S3ErrorCode {
BucketAlreadyExists,
BucketNotEmpty,
EntityTooLarge,
EntityTooSmall,
InternalError,
InvalidAccessKeyId,
InvalidArgument,
InvalidBucketName,
InvalidKey,
InvalidPart,
InvalidPartOrder,
InvalidPolicyDocument,
InvalidRange,
InvalidRequest,
@@ -19,13 +22,16 @@ pub enum S3ErrorCode {
MalformedXML,
MethodNotAllowed,
NoSuchBucket,
NoSuchBucketPolicy,
NoSuchKey,
NoSuchLifecycleConfiguration,
NoSuchUpload,
NoSuchVersion,
NoSuchTagSet,
PreconditionFailed,
NotModified,
QuotaExceeded,
ServerSideEncryptionConfigurationNotFoundError,
SignatureDoesNotMatch,
SlowDown,
}
@@ -38,11 +44,14 @@ impl S3ErrorCode {
Self::BucketAlreadyExists => 409,
Self::BucketNotEmpty => 409,
Self::EntityTooLarge => 413,
Self::EntityTooSmall => 400,
Self::InternalError => 500,
Self::InvalidAccessKeyId => 403,
Self::InvalidArgument => 400,
Self::InvalidBucketName => 400,
Self::InvalidKey => 400,
Self::InvalidPart => 400,
Self::InvalidPartOrder => 400,
Self::InvalidPolicyDocument => 400,
Self::InvalidRange => 416,
Self::InvalidRequest => 400,
@@ -50,13 +59,16 @@ impl S3ErrorCode {
Self::MalformedXML => 400,
Self::MethodNotAllowed => 405,
Self::NoSuchBucket => 404,
Self::NoSuchBucketPolicy => 404,
Self::NoSuchKey => 404,
Self::NoSuchLifecycleConfiguration => 404,
Self::NoSuchUpload => 404,
Self::NoSuchVersion => 404,
Self::NoSuchTagSet => 404,
Self::PreconditionFailed => 412,
Self::NotModified => 304,
Self::QuotaExceeded => 403,
Self::ServerSideEncryptionConfigurationNotFoundError => 404,
Self::SignatureDoesNotMatch => 403,
Self::SlowDown => 429,
}
@@ -69,11 +81,14 @@ impl S3ErrorCode {
Self::BucketAlreadyExists => "BucketAlreadyExists",
Self::BucketNotEmpty => "BucketNotEmpty",
Self::EntityTooLarge => "EntityTooLarge",
Self::EntityTooSmall => "EntityTooSmall",
Self::InternalError => "InternalError",
Self::InvalidAccessKeyId => "InvalidAccessKeyId",
Self::InvalidArgument => "InvalidArgument",
Self::InvalidBucketName => "InvalidBucketName",
Self::InvalidKey => "InvalidKey",
Self::InvalidPart => "InvalidPart",
Self::InvalidPartOrder => "InvalidPartOrder",
Self::InvalidPolicyDocument => "InvalidPolicyDocument",
Self::InvalidRange => "InvalidRange",
Self::InvalidRequest => "InvalidRequest",
@@ -81,13 +96,18 @@ impl S3ErrorCode {
Self::MalformedXML => "MalformedXML",
Self::MethodNotAllowed => "MethodNotAllowed",
Self::NoSuchBucket => "NoSuchBucket",
Self::NoSuchBucketPolicy => "NoSuchBucketPolicy",
Self::NoSuchKey => "NoSuchKey",
Self::NoSuchLifecycleConfiguration => "NoSuchLifecycleConfiguration",
Self::NoSuchUpload => "NoSuchUpload",
Self::NoSuchVersion => "NoSuchVersion",
Self::NoSuchTagSet => "NoSuchTagSet",
Self::PreconditionFailed => "PreconditionFailed",
Self::NotModified => "NotModified",
Self::QuotaExceeded => "QuotaExceeded",
Self::ServerSideEncryptionConfigurationNotFoundError => {
"ServerSideEncryptionConfigurationNotFoundError"
}
Self::SignatureDoesNotMatch => "SignatureDoesNotMatch",
Self::SlowDown => "SlowDown",
}
@@ -100,11 +120,14 @@ impl S3ErrorCode {
Self::BucketAlreadyExists => "The requested bucket name is not available",
Self::BucketNotEmpty => "The bucket you tried to delete is not empty",
Self::EntityTooLarge => "Your proposed upload exceeds the maximum allowed size",
Self::EntityTooSmall => "Your proposed upload is smaller than the minimum allowed object size",
Self::InternalError => "We encountered an internal error. Please try again.",
Self::InvalidAccessKeyId => "The access key ID you provided does not exist",
Self::InvalidArgument => "Invalid argument",
Self::InvalidBucketName => "The specified bucket is not valid",
Self::InvalidKey => "The specified key is not valid",
Self::InvalidPart => "One or more of the specified parts could not be found",
Self::InvalidPartOrder => "The list of parts was not in ascending order",
Self::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document",
Self::InvalidRange => "The requested range is not satisfiable",
Self::InvalidRequest => "Invalid request",
@@ -112,13 +135,16 @@ impl S3ErrorCode {
Self::MalformedXML => "The XML you provided was not well-formed",
Self::MethodNotAllowed => "The specified method is not allowed against this resource",
Self::NoSuchBucket => "The specified bucket does not exist",
Self::NoSuchBucketPolicy => "The bucket policy does not exist",
Self::NoSuchKey => "The specified key does not exist",
Self::NoSuchLifecycleConfiguration => "The lifecycle configuration does not exist",
Self::NoSuchUpload => "The specified multipart upload does not exist",
Self::NoSuchVersion => "The specified version does not exist",
Self::NoSuchTagSet => "The TagSet does not exist",
Self::PreconditionFailed => "At least one of the preconditions you specified did not hold",
Self::NotModified => "Not Modified",
Self::QuotaExceeded => "The bucket quota has been exceeded",
Self::ServerSideEncryptionConfigurationNotFoundError => "The server side encryption configuration was not found",
Self::SignatureDoesNotMatch => "The request signature we calculated does not match the signature you provided",
Self::SlowDown => "Please reduce your request rate",
}

View File

@@ -27,12 +27,14 @@ tokio-stream = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
futures = { workspace = true }
http-body = "1"
http-body-util = "0.1"
percent-encoding = { workspace = true }
quick-xml = { workspace = true }
mime_guess = "2"
crc32fast = { workspace = true }
sha2 = { workspace = true }
hex = { workspace = true }
duckdb = { workspace = true }
roxmltree = "0.20"
parking_lot = { workspace = true }

View File

@@ -81,6 +81,7 @@ pub struct ServerConfig {
pub multipart_min_part_size: u64,
pub bulk_delete_max_keys: usize,
pub stream_chunk_size: usize,
pub request_body_timeout_secs: u64,
pub ratelimit_default: RateLimitSetting,
pub ratelimit_admin: RateLimitSetting,
pub ratelimit_storage_uri: String,
@@ -225,6 +226,7 @@ impl ServerConfig {
let multipart_min_part_size = parse_u64_env("MULTIPART_MIN_PART_SIZE", 5_242_880);
let bulk_delete_max_keys = parse_usize_env("BULK_DELETE_MAX_KEYS", 1000);
let stream_chunk_size = parse_usize_env("STREAM_CHUNK_SIZE", 1_048_576);
let request_body_timeout_secs = parse_u64_env("REQUEST_BODY_TIMEOUT_SECONDS", 60);
let ratelimit_default =
parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(200, 60));
let ratelimit_admin =
@@ -304,6 +306,7 @@ impl ServerConfig {
multipart_min_part_size,
bulk_delete_max_keys,
stream_chunk_size,
request_body_timeout_secs,
ratelimit_default,
ratelimit_admin,
ratelimit_storage_uri,
@@ -387,6 +390,7 @@ impl Default for ServerConfig {
multipart_min_part_size: 5_242_880,
bulk_delete_max_keys: 1000,
stream_chunk_size: 1_048_576,
request_body_timeout_secs: 60,
ratelimit_default: RateLimitSetting::new(200, 60),
ratelimit_admin: RateLimitSetting::new(60, 60),
ratelimit_storage_uri: "memory://".to_string(),

View File

@@ -218,11 +218,8 @@ pub async fn get_encryption(state: &AppState, bucket: &str) -> Response {
} else {
xml_response(
StatusCode::NOT_FOUND,
S3Error::new(
S3ErrorCode::InvalidRequest,
"The server side encryption configuration was not found",
)
.to_xml(),
S3Error::from_code(S3ErrorCode::ServerSideEncryptionConfigurationNotFoundError)
.to_xml(),
)
}
}
@@ -270,11 +267,7 @@ pub async fn get_lifecycle(state: &AppState, bucket: &str) -> Response {
} else {
xml_response(
StatusCode::NOT_FOUND,
S3Error::new(
S3ErrorCode::NoSuchKey,
"The lifecycle configuration does not exist",
)
.to_xml(),
S3Error::from_code(S3ErrorCode::NoSuchLifecycleConfiguration).to_xml(),
)
}
}
@@ -421,7 +414,7 @@ pub async fn get_policy(state: &AppState, bucket: &str) -> Response {
} else {
xml_response(
StatusCode::NOT_FOUND,
S3Error::new(S3ErrorCode::NoSuchKey, "No bucket policy attached").to_xml(),
S3Error::from_code(S3ErrorCode::NoSuchBucketPolicy).to_xml(),
)
}
}
@@ -1095,6 +1088,35 @@ pub async fn list_object_versions(
|| archived_versions.len() > archived_count;
xml.push_str(&format!("<IsTruncated>{}</IsTruncated>", is_truncated));
let current_keys: std::collections::HashSet<String> = objects
.iter()
.take(current_count)
.map(|o| o.key.clone())
.collect();
let mut latest_archived_per_key: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
for v in archived_versions.iter().take(archived_count) {
if current_keys.contains(&v.key) {
continue;
}
let existing = latest_archived_per_key.get(&v.key).cloned();
match existing {
None => {
latest_archived_per_key.insert(v.key.clone(), v.version_id.clone());
}
Some(existing_id) => {
let existing_ts = archived_versions
.iter()
.find(|x| x.key == v.key && x.version_id == existing_id)
.map(|x| x.last_modified)
.unwrap_or(v.last_modified);
if v.last_modified > existing_ts {
latest_archived_per_key.insert(v.key.clone(), v.version_id.clone());
}
}
}
}
for obj in objects.iter().take(current_count) {
xml.push_str("<Version>");
xml.push_str(&format!("<Key>{}</Key>", xml_escape(&obj.key)));
@@ -1116,23 +1138,34 @@ pub async fn list_object_versions(
}
for version in archived_versions.iter().take(archived_count) {
xml.push_str("<Version>");
let is_latest = latest_archived_per_key
.get(&version.key)
.map(|id| id == &version.version_id)
.unwrap_or(false);
let tag = if version.is_delete_marker {
"DeleteMarker"
} else {
"Version"
};
xml.push_str(&format!("<{}>", tag));
xml.push_str(&format!("<Key>{}</Key>", xml_escape(&version.key)));
xml.push_str(&format!(
"<VersionId>{}</VersionId>",
xml_escape(&version.version_id)
));
xml.push_str("<IsLatest>false</IsLatest>");
xml.push_str(&format!("<IsLatest>{}</IsLatest>", is_latest));
xml.push_str(&format!(
"<LastModified>{}</LastModified>",
myfsio_xml::response::format_s3_datetime(&version.last_modified)
));
if let Some(ref etag) = version.etag {
xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
if !version.is_delete_marker {
if let Some(ref etag) = version.etag {
xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
}
xml.push_str(&format!("<Size>{}</Size>", version.size));
xml.push_str("<StorageClass>STANDARD</StorageClass>");
}
xml.push_str(&format!("<Size>{}</Size>", version.size));
xml.push_str("<StorageClass>STANDARD</StorageClass>");
xml.push_str("</Version>");
xml.push_str(&format!("</{}>", tag));
}
xml.push_str("</ListVersionsResult>");
@@ -1182,6 +1215,26 @@ pub async fn put_object_tagging(state: &AppState, bucket: &str, key: &str, body:
.to_xml(),
);
}
for tag in &tags {
if tag.key.is_empty() || tag.key.len() > 128 {
return xml_response(
StatusCode::BAD_REQUEST,
S3Error::new(S3ErrorCode::InvalidTag, "Tag key length must be 1-128").to_xml(),
);
}
if tag.value.len() > 256 {
return xml_response(
StatusCode::BAD_REQUEST,
S3Error::new(S3ErrorCode::InvalidTag, "Tag value length must be 0-256").to_xml(),
);
}
if tag.key.contains('=') {
return xml_response(
StatusCode::BAD_REQUEST,
S3Error::new(S3ErrorCode::InvalidTag, "Tag keys must not contain '='").to_xml(),
);
}
}
match state.storage.set_object_tags(bucket, key, &tags).await {
Ok(()) => StatusCode::OK.into_response(),

View File

@@ -47,6 +47,11 @@ fn s3_error_response(err: S3Error) -> Response {
}
fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response {
if let myfsio_storage::error::StorageError::Io(io_err) = &err {
if let Some(message) = crate::middleware::sha_body::sha256_mismatch_message(io_err) {
return bad_digest_response(message);
}
}
s3_error_response(S3Error::from(err))
}
@@ -391,6 +396,75 @@ pub async fn get_bucket(
Some(marker.clone())
};
if max_keys == 0 {
let has_any = if delimiter.is_empty() {
state
.storage
.list_objects(
&bucket,
&myfsio_common::types::ListParams {
max_keys: 1,
continuation_token: effective_start.clone(),
prefix: if prefix.is_empty() {
None
} else {
Some(prefix.clone())
},
start_after: if is_v2 {
query.start_after.clone()
} else {
None
},
},
)
.await
.map(|r| !r.objects.is_empty())
.unwrap_or(false)
} else {
state
.storage
.list_objects_shallow(
&bucket,
&myfsio_common::types::ShallowListParams {
prefix: prefix.clone(),
delimiter: delimiter.clone(),
max_keys: 1,
continuation_token: effective_start.clone(),
},
)
.await
.map(|r| !r.objects.is_empty() || !r.common_prefixes.is_empty())
.unwrap_or(false)
};
let xml = if is_v2 {
myfsio_xml::response::list_objects_v2_xml(
&bucket,
&prefix,
&delimiter,
0,
&[],
&[],
has_any,
query.continuation_token.as_deref(),
None,
0,
)
} else {
myfsio_xml::response::list_objects_v1_xml(
&bucket,
&prefix,
&marker,
&delimiter,
0,
&[],
&[],
has_any,
None,
)
};
return (StatusCode::OK, [("content-type", "application/xml")], xml).into_response();
}
if delimiter.is_empty() {
let params = myfsio_common::types::ListParams {
max_keys,
@@ -408,10 +482,14 @@ pub async fn get_bucket(
};
match state.storage.list_objects(&bucket, &params).await {
Ok(result) => {
let next_marker = result
.next_continuation_token
.clone()
.or_else(|| result.objects.last().map(|o| o.key.clone()));
let next_marker = if result.is_truncated {
result
.next_continuation_token
.clone()
.or_else(|| result.objects.last().map(|o| o.key.clone()))
} else {
None
};
let xml = if is_v2 {
let next_token = next_marker
.as_deref()
@@ -922,11 +1000,15 @@ async fn collect_upload_body(body: Body, aws_chunked: bool) -> Result<Vec<u8>, R
http_body_util::BodyExt::collect(body)
.await
.map(|collected| collected.to_bytes().to_vec())
.map_err(|_| {
s3_error_response(S3Error::new(
S3ErrorCode::InvalidRequest,
"Failed to read request body",
))
.map_err(|err| {
if let Some(message) = crate::middleware::sha_body::sha256_mismatch_message(&err) {
bad_digest_response(message)
} else {
s3_error_response(S3Error::new(
S3ErrorCode::InvalidRequest,
"Failed to read request body",
))
}
})
}
@@ -1537,7 +1619,7 @@ pub async fn delete_object(
Ok(()) => {
notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete");
trigger_replication(&state, &bucket, &key, "delete");
StatusCode::NO_CONTENT.into_response()
(StatusCode::NO_CONTENT, HeaderMap::new()).into_response()
}
Err(e) => storage_err_response(e),
}
@@ -1764,6 +1846,70 @@ async fn complete_multipart_handler(
}
};
if parsed.parts.is_empty() {
return s3_error_response(S3Error::new(
S3ErrorCode::MalformedXML,
"CompleteMultipartUpload requires at least one part",
));
}
let mut last_part_num: u32 = 0;
for p in &parsed.parts {
if p.part_number == 0 {
return s3_error_response(S3Error::new(
S3ErrorCode::InvalidPartOrder,
"Part numbers must be greater than zero",
));
}
if p.part_number <= last_part_num {
return s3_error_response(S3Error::new(
S3ErrorCode::InvalidPartOrder,
"Parts must be specified in ascending order with no duplicates",
));
}
last_part_num = p.part_number;
}
let stored_parts = match state.storage.list_parts(bucket, upload_id).await {
Ok(list) => list,
Err(e) => return storage_err_response(e),
};
let stored_map: HashMap<u32, (String, u64)> = stored_parts
.iter()
.map(|p| (p.part_number, (p.etag.clone(), p.size)))
.collect();
let min_part_size: u64 = state.config.multipart_min_part_size;
let total_parts = parsed.parts.len();
for (idx, p) in parsed.parts.iter().enumerate() {
let stored = match stored_map.get(&p.part_number) {
Some(s) => s,
None => {
return s3_error_response(S3Error::new(
S3ErrorCode::InvalidPart,
format!("Part {} not found", p.part_number),
));
}
};
let client_etag = p.etag.trim().trim_matches('"').to_ascii_lowercase();
let stored_etag = stored.0.trim().trim_matches('"').to_ascii_lowercase();
if !client_etag.is_empty() && client_etag != stored_etag {
return s3_error_response(S3Error::new(
S3ErrorCode::InvalidPart,
format!("ETag mismatch for part {}", p.part_number),
));
}
let is_final = idx + 1 == total_parts;
if !is_final && stored.1 < min_part_size {
return s3_error_response(S3Error::new(
S3ErrorCode::EntityTooSmall,
format!(
"Part {} is smaller than the minimum allowed size of {} bytes",
p.part_number, min_part_size
),
));
}
}
let parts: Vec<PartInfo> = parsed
.parts
.iter()
@@ -1852,7 +1998,8 @@ async fn object_attributes_handler(
if all || attrs.contains("etag") {
if let Some(etag) = &meta.etag {
xml.push_str(&format!("<ETag>{}</ETag>", xml_escape(etag)));
let trimmed = etag.trim_matches('"');
xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(trimmed)));
}
}
if all || attrs.contains("storageclass") {
@@ -1909,48 +2056,151 @@ async fn copy_object_handler(
return resp;
}
let copy_result = if let Some(version_id) = src_version_id
.as_deref()
.filter(|value| !is_null_version(Some(*value)))
{
let (_meta, mut reader) = match state
let metadata_directive = headers
.get("x-amz-metadata-directive")
.and_then(|v| v.to_str().ok())
.map(|v| v.trim().to_ascii_uppercase())
.unwrap_or_else(|| "COPY".to_string());
let tagging_directive = headers
.get("x-amz-tagging-directive")
.and_then(|v| v.to_str().ok())
.map(|v| v.trim().to_ascii_uppercase())
.unwrap_or_else(|| "COPY".to_string());
let replace_metadata = metadata_directive == "REPLACE";
let replace_tagging = tagging_directive == "REPLACE";
let same_object = src_bucket == dst_bucket
&& src_key == dst_key
&& src_version_id.as_deref().unwrap_or("") == "";
if same_object && !replace_metadata && !replace_tagging {
return s3_error_response(S3Error::new(
S3ErrorCode::InvalidRequest,
"This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes.",
));
}
let source_metadata_existing = match src_version_id.as_deref() {
Some(version_id) if version_id != "null" => {
match state
.storage
.get_object_version_metadata(&src_bucket, &src_key, version_id)
.await
{
Ok(metadata) => metadata,
Err(e) => return storage_err_response(e),
}
}
_ => match state
.storage
.get_object_version(&src_bucket, &src_key, version_id)
.get_object_metadata(&src_bucket, &src_key)
.await
{
Ok(m) => m,
Err(e) => return storage_err_response(e),
},
};
let dst_metadata = if replace_metadata {
let mut m: HashMap<String, String> = HashMap::new();
for (request_header, metadata_key, _) in internal_header_pairs() {
if let Some(value) = headers.get(*request_header).and_then(|v| v.to_str().ok()) {
if *request_header == "content-encoding" {
if let Some(decoded_encoding) = decoded_content_encoding(value) {
m.insert((*metadata_key).to_string(), decoded_encoding);
}
} else {
m.insert((*metadata_key).to_string(), value.to_string());
}
}
}
let content_type = guessed_content_type(
dst_key,
headers.get("content-type").and_then(|v| v.to_str().ok()),
);
m.insert("__content_type__".to_string(), content_type);
for (name, value) in headers.iter() {
let name_str = name.as_str();
if let Some(meta_key) = name_str.strip_prefix("x-amz-meta-") {
if let Ok(val) = value.to_str() {
m.insert(meta_key.to_string(), val.to_string());
}
}
}
if let Some(value) = headers
.get("x-amz-storage-class")
.and_then(|v| v.to_str().ok())
{
m.insert("__storage_class__".to_string(), value.to_ascii_uppercase());
}
m
} else {
source_metadata_existing.clone()
};
let (_meta, mut reader) = match src_version_id.as_deref() {
Some(version_id) if version_id != "null" => {
match state
.storage
.get_object_version(&src_bucket, &src_key, version_id)
.await
{
Ok(result) => result,
Err(e) => return storage_err_response(e),
}
}
_ => match state.storage.get_object(&src_bucket, &src_key).await {
Ok(result) => result,
Err(e) => return storage_err_response(e),
};
let mut data = Vec::new();
if let Err(e) = reader.read_to_end(&mut data).await {
return storage_err_response(myfsio_storage::error::StorageError::Io(e));
}
let metadata = match state
.storage
.get_object_version_metadata(&src_bucket, &src_key, version_id)
.await
{
Ok(metadata) => metadata,
Err(e) => return storage_err_response(e),
};
state
.storage
.put_object(
dst_bucket,
dst_key,
Box::pin(std::io::Cursor::new(data)),
Some(metadata),
)
.await
} else {
state
.storage
.copy_object(&src_bucket, &src_key, dst_bucket, dst_key)
.await
},
};
let mut data = Vec::new();
if let Err(e) = reader.read_to_end(&mut data).await {
return storage_err_response(myfsio_storage::error::StorageError::Io(e));
}
let copy_result = state
.storage
.put_object(
dst_bucket,
dst_key,
Box::pin(std::io::Cursor::new(data)),
Some(dst_metadata),
)
.await;
match copy_result {
Ok(meta) => {
if replace_tagging {
let tags = match headers
.get("x-amz-tagging")
.and_then(|value| value.to_str().ok())
.map(parse_tagging_header)
.transpose()
{
Ok(tags) => tags,
Err(response) => return response,
};
if let Some(ref tags) = tags {
if tags.len() > state.config.object_tag_limit {
return s3_error_response(S3Error::new(
S3ErrorCode::InvalidTag,
format!("Maximum {} tags allowed", state.config.object_tag_limit),
));
}
if let Err(e) = state
.storage
.set_object_tags(dst_bucket, dst_key, tags)
.await
{
return storage_err_response(e);
}
} else {
let _ = state
.storage
.set_object_tags(dst_bucket, dst_key, &[])
.await;
}
}
let etag = meta.etag.as_deref().unwrap_or("");
let last_modified = myfsio_xml::response::format_s3_datetime(&meta.last_modified);
let xml = myfsio_xml::response::copy_object_result_xml(etag, &last_modified);
@@ -1983,6 +2233,13 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R
}
};
if parsed.objects.len() > 1000 {
return s3_error_response(S3Error::new(
S3ErrorCode::MalformedXML,
"The request must not contain more than 1000 keys",
));
}
let mut deleted = Vec::new();
let mut errors = Vec::new();
@@ -2280,9 +2537,20 @@ fn evaluate_copy_preconditions(
}
fn parse_http_date(value: &str) -> Option<DateTime<Utc>> {
DateTime::parse_from_rfc2822(value)
.ok()
.map(|dt| dt.with_timezone(&Utc))
let trimmed = value.trim();
if let Ok(dt) = DateTime::parse_from_rfc2822(trimmed) {
return Some(dt.with_timezone(&Utc));
}
if let Ok(dt) = DateTime::parse_from_rfc3339(trimmed) {
return Some(dt.with_timezone(&Utc));
}
if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(trimmed, "%A, %d-%b-%y %H:%M:%S GMT") {
return Some(naive.and_utc());
}
if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(trimmed, "%a %b %e %H:%M:%S %Y") {
return Some(naive.and_utc());
}
None
}
fn etag_condition_matches(condition: &str, etag: Option<&str>) -> bool {

View File

@@ -577,11 +577,17 @@ pub fn create_router(state: state::AppState) -> Router {
middleware::rate_limit_layer,
));
let request_body_timeout =
std::time::Duration::from_secs(state.config.request_body_timeout_secs);
api_router
.merge(admin_router)
.layer(axum::middleware::from_fn(middleware::server_header))
.layer(cors_layer(&state.config))
.layer(tower_http::compression::CompressionLayer::new())
.layer(tower_http::timeout::RequestBodyTimeoutLayer::new(
request_body_timeout,
))
.with_state(state)
}

View File

@@ -189,6 +189,11 @@ async fn main() {
let shutdown = shutdown_signal_shared();
let api_shutdown = shutdown.clone();
let api_listener = axum::serve::ListenerExt::tap_io(api_listener, |stream| {
if let Err(err) = stream.set_nodelay(true) {
tracing::trace!("failed to set TCP_NODELAY on api socket: {}", err);
}
});
let api_task = tokio::spawn(async move {
axum::serve(
api_listener,
@@ -202,6 +207,11 @@ async fn main() {
let ui_task = if let (Some(listener), Some(app)) = (ui_listener, ui_app) {
let ui_shutdown = shutdown.clone();
let listener = axum::serve::ListenerExt::tap_io(listener, |stream| {
if let Err(err) = stream.set_nodelay(true) {
tracing::trace!("failed to set TCP_NODELAY on ui socket: {}", err);
}
});
Some(tokio::spawn(async move {
axum::serve(listener, app)
.with_graceful_shutdown(async move {

View File

@@ -12,9 +12,36 @@ use serde_json::Value;
use std::time::Instant;
use tokio::io::AsyncReadExt;
use crate::middleware::sha_body::{is_hex_sha256, Sha256VerifyBody};
use crate::services::acl::acl_from_bucket_config;
use crate::state::AppState;
/// If the client declared a concrete hex SHA-256 digest for the payload,
/// swap the request body for a `Sha256VerifyBody` that checks the digest as
/// the bytes stream through.
///
/// No wrapping happens when the `x-amz-content-sha256` header is absent,
/// when it is not 64 hex chars (covers sentinels like `UNSIGNED-PAYLOAD`
/// and `STREAMING-...`), or when the body is `aws-chunked` encoded — in the
/// chunked case the raw wire bytes would not hash to the declared value.
fn wrap_body_for_sha256_verification(req: &mut Request) {
    let declared = req
        .headers()
        .get("x-amz-content-sha256")
        .and_then(|v| v.to_str().ok())
        .map(str::to_string);
    let Some(declared) = declared else { return };
    if !is_hex_sha256(&declared) {
        return;
    }
    let aws_chunked = req
        .headers()
        .get("content-encoding")
        .and_then(|v| v.to_str().ok())
        .is_some_and(|v| v.to_ascii_lowercase().contains("aws-chunked"));
    if aws_chunked {
        return;
    }
    // Take ownership of the current body and replace it with the verifying
    // wrapper; the placeholder empty body is immediately overwritten.
    let original = std::mem::replace(req.body_mut(), axum::body::Body::empty());
    *req.body_mut() = axum::body::Body::new(Sha256VerifyBody::new(original, declared));
}
#[derive(Clone, Debug)]
struct OriginalCanonicalPath(String);
@@ -475,6 +502,7 @@ pub async fn auth_layer(State(state): State<AppState>, mut req: Request, next: N
error_response(err, &auth_path)
} else {
req.extensions_mut().insert(principal);
wrap_body_for_sha256_verification(&mut req);
next.run(req).await
}
}
@@ -1102,7 +1130,9 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR
let parts: Vec<&str> = auth_str
.strip_prefix("AWS4-HMAC-SHA256 ")
.unwrap()
.split(", ")
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.collect();
if parts.len() != 3 {
@@ -1112,9 +1142,24 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR
));
}
let credential = parts[0].strip_prefix("Credential=").unwrap_or("");
let signed_headers_str = parts[1].strip_prefix("SignedHeaders=").unwrap_or("");
let provided_signature = parts[2].strip_prefix("Signature=").unwrap_or("");
let mut credential: &str = "";
let mut signed_headers_str: &str = "";
let mut provided_signature: &str = "";
for part in &parts {
if let Some(v) = part.strip_prefix("Credential=") {
credential = v;
} else if let Some(v) = part.strip_prefix("SignedHeaders=") {
signed_headers_str = v;
} else if let Some(v) = part.strip_prefix("Signature=") {
provided_signature = v;
}
}
if credential.is_empty() || signed_headers_str.is_empty() || provided_signature.is_empty() {
return AuthResult::Denied(S3Error::new(
S3ErrorCode::InvalidArgument,
"Malformed Authorization header",
));
}
let cred_parts: Vec<&str> = credential.split('/').collect();
if cred_parts.len() != 5 {

View File

@@ -1,6 +1,7 @@
mod auth;
pub mod ratelimit;
pub mod session;
pub(crate) mod sha_body;
pub use auth::auth_layer;
pub use ratelimit::{rate_limit_layer, RateLimitLayerState};

View File

@@ -181,8 +181,8 @@ pub async fn csrf_layer(
.unwrap_or("");
let is_form_submit = content_type.starts_with("application/x-www-form-urlencoded")
|| content_type.starts_with("multipart/form-data");
let wants_json = accept.contains("application/json")
|| content_type.starts_with("application/json");
let wants_json =
accept.contains("application/json") || content_type.starts_with("application/json");
if is_form_submit && !wants_json {
let ctx = crate::handlers::ui::base_context(&handle, None);

View File

@@ -0,0 +1,107 @@
use axum::body::Body;
use bytes::Bytes;
use http_body::{Body as HttpBody, Frame};
use sha2::{Digest, Sha256};
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Error raised when the request body's computed SHA-256 digest does not
/// match the value the client declared in `x-amz-content-sha256`.
#[derive(Debug)]
struct Sha256MismatchError {
    expected: String,
    computed: String,
}

impl Sha256MismatchError {
    /// Client-facing message embedded in the S3 error response body.
    fn message(&self) -> String {
        let Self { expected, computed } = self;
        format!(
            "The x-amz-content-sha256 you specified did not match what we received (expected {}, computed {})",
            expected, computed
        )
    }
}

impl fmt::Display for Sha256MismatchError {
    // Compact single-line form for logs / error chains.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { expected, computed } = self;
        write!(
            f,
            "XAmzContentSHA256Mismatch: expected {}, computed {}",
            expected, computed
        )
    }
}

impl Error for Sha256MismatchError {}
/// Body wrapper that hashes every data frame passing through it and, at end
/// of stream, errors out if the digest differs from the client-declared
/// SHA-256 hex value.
pub struct Sha256VerifyBody {
    inner: Body,
    // Declared digest, stored lowercased for case-insensitive comparison.
    expected: String,
    // `Some` until the stream ends; taken exactly once at finalization.
    hasher: Option<Sha256>,
}

impl Sha256VerifyBody {
    /// Wraps `inner` so it is verified against `expected_hex`.
    pub fn new(inner: Body, expected_hex: String) -> Self {
        let expected = expected_hex.to_ascii_lowercase();
        Self {
            inner,
            expected,
            hasher: Some(Sha256::new()),
        }
    }
}
impl HttpBody for Sha256VerifyBody {
    type Data = Bytes;
    // Boxed so the typed mismatch error can travel through hyper's body-error
    // plumbing and be recovered downstream via downcast (sha256_mismatch_message).
    type Error = Box<dyn std::error::Error + Send + Sync>;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let this = self.as_mut().get_mut();
        match Pin::new(&mut this.inner).poll_frame(cx) {
            Poll::Pending => Poll::Pending,
            // Inner body errors are re-boxed and forwarded unchanged.
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e)))),
            Poll::Ready(Some(Ok(frame))) => {
                // Only data frames contribute payload bytes to the digest;
                // trailer frames have no data_ref and are passed through.
                if let Some(data) = frame.data_ref() {
                    if let Some(h) = this.hasher.as_mut() {
                        h.update(data);
                    }
                }
                Poll::Ready(Some(Ok(frame)))
            }
            Poll::Ready(None) => {
                // End of stream: take() ensures we finalize and compare at
                // most once even if the body is polled again after completion.
                if let Some(hasher) = this.hasher.take() {
                    let computed = hex::encode(hasher.finalize());
                    // `expected` was lowercased in new(); hex::encode emits
                    // lowercase, so a plain equality check suffices.
                    if computed != this.expected {
                        return Poll::Ready(Some(Err(Box::new(Sha256MismatchError {
                            expected: this.expected.clone(),
                            computed,
                        }))));
                    }
                }
                Poll::Ready(None)
            }
        }
    }

    // NOTE(review): delegating is_end_stream means a consumer that
    // short-circuits on it without polling to None could skip the digest
    // check (e.g. for an empty body) — confirm collect() always polls to None.
    fn is_end_stream(&self) -> bool {
        self.inner.is_end_stream()
    }

    fn size_hint(&self) -> http_body::SizeHint {
        self.inner.size_hint()
    }
}
/// Returns true when `s` is exactly 64 ASCII hex digits, i.e. a plausible
/// hex-encoded SHA-256 digest (either letter case accepted).
pub fn is_hex_sha256(s: &str) -> bool {
    s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit())
}
/// Walks the error's source chain looking for a `Sha256MismatchError` and,
/// when one is found, returns its client-facing message.
pub fn sha256_mismatch_message(err: &(dyn Error + 'static)) -> Option<String> {
    let mut current: Option<&(dyn Error + 'static)> = Some(err);
    while let Some(e) = current {
        if let Some(mismatch) = e.downcast_ref::<Sha256MismatchError>() {
            return Some(mismatch.message());
        }
        current = e.source();
    }
    None
}

View File

@@ -1,5 +1,7 @@
use axum::body::Body;
use axum::http::{Method, Request, StatusCode};
use base64::engine::general_purpose::URL_SAFE;
use base64::Engine;
use http_body_util::BodyExt;
use myfsio_storage::traits::{AsyncReadStream, StorageEngine};
use serde_json::Value;
@@ -53,6 +55,7 @@ fn test_app_with_iam(iam_json: serde_json::Value) -> (axum::Router, tempfile::Te
ui_enabled: false,
templates_dir: std::path::PathBuf::from("templates"),
static_dir: std::path::PathBuf::from("static"),
multipart_min_part_size: 1,
..myfsio_server::config::ServerConfig::default()
};
let state = myfsio_server::state::AppState::new(config);
@@ -2394,6 +2397,87 @@ async fn test_versioned_object_can_be_read_and_deleted_by_version_id() {
assert_eq!(missing_resp.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_versioned_put_and_delete_do_not_advertise_unstored_ids() {
    // Regression test: on a versioning-enabled bucket, plain PUT/DELETE
    // responses must not advertise x-amz-version-id / x-amz-delete-marker
    // headers, and the ?versions listing must not contain fabricated
    // <DeleteMarker> entries.
    let (app, _tmp) = test_app();
    // Create the bucket.
    app.clone()
        .oneshot(signed_request(Method::PUT, "/compat-bucket", Body::empty()))
        .await
        .unwrap();
    // Enable versioning on the bucket.
    app.clone()
        .oneshot(
            Request::builder()
                .method(Method::PUT)
                .uri("/compat-bucket?versioning")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .body(Body::from(
                    "<VersioningConfiguration><Status>Enabled</Status></VersioningConfiguration>",
                ))
                .unwrap(),
        )
        .await
        .unwrap();
    // Initial PUT succeeds without a version-id header.
    let put_resp = app
        .clone()
        .oneshot(signed_request(
            Method::PUT,
            "/compat-bucket/doc.txt",
            Body::from("first"),
        ))
        .await
        .unwrap();
    assert_eq!(put_resp.status(), StatusCode::OK);
    assert!(!put_resp.headers().contains_key("x-amz-version-id"));
    // Overwriting the same key also stays silent about version ids.
    let overwrite_resp = app
        .clone()
        .oneshot(signed_request(
            Method::PUT,
            "/compat-bucket/doc.txt",
            Body::from("second"),
        ))
        .await
        .unwrap();
    assert_eq!(overwrite_resp.status(), StatusCode::OK);
    assert!(!overwrite_resp.headers().contains_key("x-amz-version-id"));
    // DELETE returns 204 with neither version-id nor delete-marker headers.
    let delete_resp = app
        .clone()
        .oneshot(signed_request(
            Method::DELETE,
            "/compat-bucket/doc.txt",
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(delete_resp.status(), StatusCode::NO_CONTENT);
    assert!(!delete_resp.headers().contains_key("x-amz-version-id"));
    assert!(!delete_resp.headers().contains_key("x-amz-delete-marker"));
    // The version listing body must not invent DeleteMarker entries.
    let versions_resp = app
        .oneshot(signed_request(
            Method::GET,
            "/compat-bucket?versions",
            Body::empty(),
        ))
        .await
        .unwrap();
    let versions_body = String::from_utf8(
        versions_resp
            .into_body()
            .collect()
            .await
            .unwrap()
            .to_bytes()
            .to_vec(),
    )
    .unwrap();
    assert!(!versions_body.contains("<DeleteMarker>"));
}
#[tokio::test]
async fn test_retention_is_enforced_when_deleting_archived_version() {
let (app, _tmp) = test_app();
@@ -2562,6 +2646,132 @@ async fn test_put_object_validates_content_md5() {
assert_eq!(good_resp.status(), StatusCode::OK);
}
/// A PUT whose `x-amz-content-sha256` header does not match the digest of
/// the actual body must be rejected with HTTP 400 and an S3 `BadDigest`
/// error document that names the offending header.
#[tokio::test]
async fn test_x_amz_content_sha256_mismatch_returns_bad_digest() {
    let (app, _tmp) = test_app();
    // Create the target bucket first.
    app.clone()
        .oneshot(signed_request(Method::PUT, "/sha256-bucket", Body::empty()))
        .await
        .unwrap();
    // Upload "hello" while claiming an all-zero SHA-256 digest.
    let request = Request::builder()
        .method(Method::PUT)
        .uri("/sha256-bucket/object.txt")
        .header("x-access-key", TEST_ACCESS_KEY)
        .header("x-secret-key", TEST_SECRET_KEY)
        .header(
            "x-amz-content-sha256",
            "0000000000000000000000000000000000000000000000000000000000000000",
        )
        .body(Body::from("hello"))
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    // The XML error body must identify both the code and the header.
    let raw_bytes = response.into_body().collect().await.unwrap().to_bytes();
    let body_text = String::from_utf8(raw_bytes.to_vec()).unwrap();
    assert!(body_text.contains("<Code>BadDigest</Code>"));
    assert!(body_text.contains("x-amz-content-sha256"));
}
// With max-keys=0 a listing returns no keys; this test checks that all
// three cursor mechanisms (v1 `marker`, v2 `start-after`, v2
// `continuation-token`) still position past the final key so the response
// reports <IsTruncated>false</IsTruncated> instead of claiming more data.
#[tokio::test]
async fn test_max_keys_zero_respects_marker_and_v2_cursors() {
let (app, _tmp) = test_app();
// Bucket with two keys, "a.txt" and "b.txt" ("b.txt" is the last key).
app.clone()
.oneshot(signed_request(Method::PUT, "/cursor-bucket", Body::empty()))
.await
.unwrap();
for key in ["a.txt", "b.txt"] {
app.clone()
.oneshot(signed_request(
Method::PUT,
&format!("/cursor-bucket/{}", key),
Body::from(key.to_string()),
))
.await
.unwrap();
}
// v1 listing: marker at the last key + max-keys=0 → not truncated.
let marker_resp = app
.clone()
.oneshot(signed_request(
Method::GET,
"/cursor-bucket?max-keys=0&marker=b.txt",
Body::empty(),
))
.await
.unwrap();
let marker_body = String::from_utf8(
marker_resp
.into_body()
.collect()
.await
.unwrap()
.to_bytes()
.to_vec(),
)
.unwrap();
assert!(marker_body.contains("<IsTruncated>false</IsTruncated>"));
// v2 listing: start-after at the last key → not truncated.
let start_after_resp = app
.clone()
.oneshot(signed_request(
Method::GET,
"/cursor-bucket?list-type=2&max-keys=0&start-after=b.txt",
Body::empty(),
))
.await
.unwrap();
let start_after_body = String::from_utf8(
start_after_resp
.into_body()
.collect()
.await
.unwrap()
.to_bytes()
.to_vec(),
)
.unwrap();
assert!(start_after_body.contains("<IsTruncated>false</IsTruncated>"));
// v2 listing: continuation token (URL-safe base64 of the last key,
// matching this server's token encoding) → not truncated.
let token = URL_SAFE.encode("b.txt");
let token_resp = app
.oneshot(signed_request(
Method::GET,
&format!(
"/cursor-bucket?list-type=2&max-keys=0&continuation-token={}",
token
),
Body::empty(),
))
.await
.unwrap();
let token_body = String::from_utf8(
token_resp
.into_body()
.collect()
.await
.unwrap()
.to_bytes()
.to_vec(),
)
.unwrap();
assert!(token_body.contains("<IsTruncated>false</IsTruncated>"));
}
#[tokio::test]
async fn test_put_object_tagging_and_standard_headers_are_persisted() {
let (app, _tmp) = test_app();

View File

@@ -132,6 +132,13 @@ pub fn validate_bucket_name(bucket_name: &str) -> Option<String> {
return Some("Bucket name must not be formatted as an IP address".to_string());
}
if bucket_name.starts_with("xn--") {
return Some("Bucket name must not start with the reserved prefix 'xn--'".to_string());
}
if bucket_name.ends_with("-s3alias") || bucket_name.ends_with("--ol-s3") {
return Some("Bucket name must not end with a reserved suffix".to_string());
}
None
}

View File

@@ -10,7 +10,7 @@ pub fn format_s3_datetime(dt: &DateTime<Utc>) -> String {
pub fn rate_limit_exceeded_xml() -> String {
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<Error><Code>SlowDown</Code><Message>Rate limit exceeded</Message></Error>"
<Error><Code>SlowDown</Code><Message>Rate limit exceeded</Message><Resource></Resource><RequestId></RequestId></Error>"
.to_string()
}