Update myfsio rust engines - added more implementations

This commit is contained in:
2026-04-02 21:57:16 +08:00
parent 926a7e6366
commit 4c30efd802
12 changed files with 4154 additions and 118 deletions

1685
myfsio-engine/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -41,3 +41,5 @@ base64 = "0.22"
tokio-util = { version = "0.7", features = ["io"] }
futures = "0.3"
dashmap = "6"
crc32fast = "1"
duckdb = { version = "1", features = ["bundled"] }

View File

@@ -274,6 +274,61 @@ impl IamService {
self.get_principal(access_key)
}
/// Checks whether `principal` may perform `action` on `bucket_name` /
/// `object_key`.
///
/// Admins are always allowed. Otherwise the user record must exist, be
/// enabled and unexpired, and at least one attached policy must match the
/// (trimmed, lowercased) bucket and action and — when a key is supplied —
/// the policy's key prefix.
pub fn authorize(
    &self,
    principal: &Principal,
    bucket_name: Option<&str>,
    action: &str,
    object_key: Option<&str>,
) -> bool {
    // Pick up external edits to the IAM file before evaluating.
    self.reload_if_needed();
    if principal.is_admin {
        return true;
    }
    // A missing bucket is treated as the wildcard "*".
    let normalized_bucket = bucket_name
        .unwrap_or("*")
        .trim()
        .to_ascii_lowercase();
    let normalized_action = action.trim().to_ascii_lowercase();
    let state = self.state.read();
    let user = match state.user_records.get(&principal.user_id) {
        Some(u) => u,
        None => return false, // principal no longer present in the store
    };
    if !user.enabled {
        return false;
    }
    // Expiry strings that fail to parse are ignored (account stays usable).
    if let Some(ref expires_at) = user.expires_at {
        if let Ok(exp) = expires_at.parse::<DateTime<Utc>>() {
            if Utc::now() > exp {
                return false;
            }
        }
    }
    // First policy matching bucket + action (+ key prefix when a key was
    // given) grants access; the prefix check is skipped for bucket-level
    // operations with no object key.
    for policy in &user.policies {
        if !bucket_matches(&policy.bucket, &normalized_bucket) {
            continue;
        }
        if !action_matches(&policy.actions, &normalized_action) {
            continue;
        }
        if let Some(key) = object_key {
            if !prefix_matches(&policy.prefix, key) {
                continue;
            }
        }
        return true;
    }
    false
}
pub async fn list_users(&self) -> Vec<serde_json::Value> {
self.reload_if_needed();
let state = self.state.read();
@@ -353,6 +408,33 @@ impl IamService {
}
}
/// True when a policy's bucket pattern covers `bucket` (which the caller
/// has already trimmed and lowercased); "*" matches every bucket.
fn bucket_matches(policy_bucket: &str, bucket: &str) -> bool {
    match policy_bucket.trim().to_ascii_lowercase().as_str() {
        "*" => true,
        pb => pb == bucket,
    }
}
/// Returns true when any policy action grants `action` (which the caller
/// has already trimmed and lowercased).
///
/// Supported policy-action forms, after trimming/lowercasing each entry:
/// - `"*"` — grants everything;
/// - exact match — e.g. `"read"` grants `"read"`;
/// - namespace wildcard `"ns:*"` — e.g. `"iam:*"` grants every `"iam:…"`
///   action. Generalized from the previous iam-only special case so other
///   namespaces (e.g. `"s3:*"`) work the same way; all previously granted
///   actions are still granted.
fn action_matches(policy_actions: &[String], action: &str) -> bool {
    policy_actions.iter().any(|policy_action| {
        let pa = policy_action.trim().to_ascii_lowercase();
        if pa == "*" || pa == action {
            return true;
        }
        // "ns:*" grants all actions in that namespace.
        match pa.strip_suffix('*') {
            Some(ns) if ns.ends_with(':') => action.starts_with(ns),
            _ => false,
        }
    })
}
/// True when `object_key` falls under the policy's key prefix.
///
/// An empty or "*" prefix matches every key; otherwise any trailing '*'
/// characters are stripped and the key must start with the remainder.
fn prefix_matches(policy_prefix: &str, object_key: &str) -> bool {
    let trimmed = policy_prefix.trim();
    match trimmed {
        "" | "*" => true,
        pattern => object_key.starts_with(pattern.trim_end_matches('*')),
    }
}
#[cfg(test)]
mod tests {
use super::*;
@@ -496,4 +578,59 @@ mod tests {
let svc = IamService::new(tmp.path().to_path_buf());
assert!(svc.get_secret_key("INACTIVE_KEY").is_none());
}
#[test]
fn test_authorize_allows_matching_policy() {
    // One user with a single policy: "read" on bucket "docs", limited to
    // keys under the "reports/" prefix.
    let json = serde_json::json!({
        "version": 2,
        "users": [{
            "user_id": "u-reader",
            "display_name": "reader",
            "enabled": true,
            "access_keys": [{
                "access_key": "READER_KEY",
                "secret_key": "reader-secret",
                "status": "active"
            }],
            "policies": [{
                "bucket": "docs",
                "actions": ["read"],
                "prefix": "reports/"
            }]
        }]
    })
    .to_string();
    // The service loads its state from a file path, so write the fixture
    // to a temp file first.
    let mut tmp = tempfile::NamedTempFile::new().unwrap();
    tmp.write_all(json.as_bytes()).unwrap();
    tmp.flush().unwrap();
    let svc = IamService::new(tmp.path().to_path_buf());
    let principal = svc.get_principal("READER_KEY").unwrap();
    // Matching bucket + action + key prefix → allowed.
    assert!(svc.authorize(
        &principal,
        Some("docs"),
        "read",
        Some("reports/2026.csv"),
    ));
    // Action not granted by the policy → denied.
    assert!(!svc.authorize(
        &principal,
        Some("docs"),
        "write",
        Some("reports/2026.csv"),
    ));
    // Key outside the policy's prefix → denied.
    assert!(!svc.authorize(
        &principal,
        Some("docs"),
        "read",
        Some("private/2026.csv"),
    ));
    // Different bucket → denied.
    assert!(!svc.authorize(
        &principal,
        Some("other"),
        "read",
        Some("reports/2026.csv"),
    ));
}
}

View File

@@ -144,6 +144,10 @@ pub struct BucketConfig {
pub logging: Option<serde_json::Value>,
#[serde(default)]
pub object_lock: Option<serde_json::Value>,
#[serde(default)]
pub policy: Option<serde_json::Value>,
#[serde(default)]
pub replication: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -27,6 +27,10 @@ futures = { workspace = true }
http-body-util = "0.1"
percent-encoding = { workspace = true }
quick-xml = { workspace = true }
mime_guess = "2"
crc32fast = { workspace = true }
duckdb = { workspace = true }
roxmltree = "0.20"
[dev-dependencies]
tempfile = "3"

View File

@@ -17,6 +17,15 @@ fn storage_err(err: myfsio_storage::error::StorageError) -> Response {
(status, [("content-type", "application/xml")], s3err.to_xml()).into_response()
}
/// Serializes `value` and wraps it in a response with the given status and
/// an `application/json` content type.
fn json_response(status: StatusCode, value: serde_json::Value) -> Response {
    let payload = value.to_string();
    (status, [("content-type", "application/json")], payload).into_response()
}
pub async fn get_versioning(state: &AppState, bucket: &str) -> Response {
match state.storage.is_versioning_enabled(bucket).await {
Ok(enabled) => {
@@ -271,6 +280,281 @@ pub async fn delete_lifecycle(state: &AppState, bucket: &str) -> Response {
}
}
/// GET `?quota` — reports the bucket's quota limits together with current
/// usage as JSON; responds 404 with a NoSuchKey XML error when the bucket
/// has no quota configured.
pub async fn get_quota(state: &AppState, bucket: &str) -> Response {
    match state.storage.get_bucket_config(bucket).await {
        Ok(config) => {
            if let Some(quota) = &config.quota {
                // Usage is computed live so the numbers reflect the store now.
                let usage = match state.storage.bucket_stats(bucket).await {
                    Ok(s) => s,
                    Err(e) => return storage_err(e),
                };
                json_response(
                    StatusCode::OK,
                    serde_json::json!({
                        "quota": {
                            "max_size_bytes": quota.max_bytes,
                            "max_objects": quota.max_objects,
                        },
                        "usage": {
                            "bytes": usage.bytes,
                            "objects": usage.objects,
                        }
                    }),
                )
            } else {
                xml_response(
                    StatusCode::NOT_FOUND,
                    S3Error::new(S3ErrorCode::NoSuchKey, "No quota configuration found").to_xml(),
                )
            }
        }
        Err(e) => storage_err(e),
    }
}
/// PUT `?quota` — sets the bucket quota from a JSON body.
///
/// The body must be valid JSON with at least one of `max_size_bytes` or
/// `max_objects` (unsigned integers); the quota is stored in the bucket
/// config and 200 is returned. Malformed bodies yield 400 XML errors.
/// Note: a PUT replaces the whole quota, so omitting one field clears it.
pub async fn put_quota(state: &AppState, bucket: &str, body: Body) -> Response {
    let body_bytes = match http_body_util::BodyExt::collect(body).await {
        Ok(collected) => collected.to_bytes(),
        Err(_) => {
            return xml_response(
                StatusCode::BAD_REQUEST,
                S3Error::new(S3ErrorCode::InvalidArgument, "Invalid quota payload").to_xml(),
            );
        }
    };
    let payload: serde_json::Value = match serde_json::from_slice(&body_bytes) {
        Ok(v) => v,
        Err(_) => {
            return xml_response(
                StatusCode::BAD_REQUEST,
                S3Error::new(S3ErrorCode::InvalidArgument, "Request body must be valid JSON").to_xml(),
            );
        }
    };
    // Non-integer or negative values read as None here and are rejected
    // below if both fields end up absent.
    let max_size = payload
        .get("max_size_bytes")
        .and_then(|v| v.as_u64());
    let max_objects = payload
        .get("max_objects")
        .and_then(|v| v.as_u64());
    if max_size.is_none() && max_objects.is_none() {
        return xml_response(
            StatusCode::BAD_REQUEST,
            S3Error::new(
                S3ErrorCode::InvalidArgument,
                "At least one of max_size_bytes or max_objects is required",
            )
            .to_xml(),
        );
    }
    // Read-modify-write of the bucket config; only the quota field changes.
    match state.storage.get_bucket_config(bucket).await {
        Ok(mut config) => {
            config.quota = Some(myfsio_common::types::QuotaConfig {
                max_bytes: max_size,
                max_objects,
            });
            match state.storage.set_bucket_config(bucket, &config).await {
                Ok(()) => StatusCode::OK.into_response(),
                Err(e) => storage_err(e),
            }
        }
        Err(e) => storage_err(e),
    }
}
/// DELETE `?quota` — removes any quota from the bucket's config and
/// responds 204 No Content on success.
pub async fn delete_quota(state: &AppState, bucket: &str) -> Response {
    let mut config = match state.storage.get_bucket_config(bucket).await {
        Ok(c) => c,
        Err(e) => return storage_err(e),
    };
    config.quota = None;
    if let Err(e) = state.storage.set_bucket_config(bucket, &config).await {
        return storage_err(e);
    }
    StatusCode::NO_CONTENT.into_response()
}
pub async fn get_policy(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await {
Ok(config) => {
if let Some(policy) = &config.policy {
json_response(StatusCode::OK, policy.clone())
} else {
xml_response(
StatusCode::NOT_FOUND,
S3Error::new(S3ErrorCode::NoSuchKey, "No bucket policy attached").to_xml(),
)
}
}
Err(e) => storage_err(e),
}
}
/// PUT `?policy` — stores the request body as the bucket policy.
///
/// The body must be valid JSON; it is stored verbatim in the bucket config
/// (no schema validation of Statement/Principal here) and 204 is returned.
pub async fn put_policy(state: &AppState, bucket: &str, body: Body) -> Response {
    let body_bytes = match http_body_util::BodyExt::collect(body).await {
        Ok(collected) => collected.to_bytes(),
        Err(_) => {
            return xml_response(
                StatusCode::BAD_REQUEST,
                S3Error::new(S3ErrorCode::MalformedXML, "Failed to read policy body").to_xml(),
            );
        }
    };
    let policy: serde_json::Value = match serde_json::from_slice(&body_bytes) {
        Ok(v) => v,
        Err(_) => {
            return xml_response(
                StatusCode::BAD_REQUEST,
                S3Error::new(S3ErrorCode::InvalidArgument, "Policy document must be JSON").to_xml(),
            );
        }
    };
    // Read-modify-write of the bucket config; only the policy field changes.
    match state.storage.get_bucket_config(bucket).await {
        Ok(mut config) => {
            config.policy = Some(policy);
            match state.storage.set_bucket_config(bucket, &config).await {
                Ok(()) => StatusCode::NO_CONTENT.into_response(),
                Err(e) => storage_err(e),
            }
        }
        Err(e) => storage_err(e),
    }
}
/// DELETE `?policy` — detaches any bucket policy and responds 204.
pub async fn delete_policy(state: &AppState, bucket: &str) -> Response {
    let mut config = match state.storage.get_bucket_config(bucket).await {
        Ok(c) => c,
        Err(e) => return storage_err(e),
    };
    config.policy = None;
    if let Err(e) = state.storage.set_bucket_config(bucket, &config).await {
        return storage_err(e);
    }
    StatusCode::NO_CONTENT.into_response()
}
/// GET `?policyStatus` — reports whether the attached policy makes the
/// bucket public (i.e. contains an Allow statement with a "*" principal).
///
/// Fix: `IsPublic` is an XML boolean and S3 serializes it lowercase
/// ("true"/"false"); the previous "TRUE"/"FALSE" breaks standard clients
/// that parse it as xs:boolean.
pub async fn get_policy_status(state: &AppState, bucket: &str) -> Response {
    match state.storage.get_bucket_config(bucket).await {
        Ok(config) => {
            // No policy attached means the bucket is not public.
            let is_public = config
                .policy
                .as_ref()
                .map(policy_is_public)
                .unwrap_or(false);
            let xml = format!(
                "<?xml version=\"1.0\" encoding=\"UTF-8\"?><PolicyStatus><IsPublic>{}</IsPublic></PolicyStatus>",
                is_public // bool Display is "true"/"false", matching xs:boolean
            );
            xml_response(StatusCode::OK, xml)
        }
        Err(e) => storage_err(e),
    }
}
/// GET `?replication` — returns the stored replication configuration as
/// the response body, or a 404 NoSuchKey XML error when none is set.
/// JSON string values are returned verbatim (they hold the original XML
/// body from PUT); any other JSON value is serialized.
pub async fn get_replication(state: &AppState, bucket: &str) -> Response {
    let config = match state.storage.get_bucket_config(bucket).await {
        Ok(c) => c,
        Err(e) => return storage_err(e),
    };
    let replication = match config.replication {
        Some(r) => r,
        None => {
            return xml_response(
                StatusCode::NOT_FOUND,
                S3Error::new(
                    S3ErrorCode::NoSuchKey,
                    "Replication configuration not found",
                )
                .to_xml(),
            );
        }
    };
    let body = match replication {
        serde_json::Value::String(s) => s,
        other => other.to_string(),
    };
    xml_response(StatusCode::OK, body)
}
/// PUT `?replication` — stores the raw request body as the bucket's
/// replication configuration.
///
/// The body is kept as an opaque string (no XML validation here) inside
/// the JSON bucket config; `get_replication` echoes it back verbatim.
/// Responds 200 on success, 400 when the body is missing or unreadable.
pub async fn put_replication(state: &AppState, bucket: &str, body: Body) -> Response {
    let body_bytes = match http_body_util::BodyExt::collect(body).await {
        Ok(collected) => collected.to_bytes(),
        Err(_) => {
            return xml_response(
                StatusCode::BAD_REQUEST,
                S3Error::new(S3ErrorCode::MalformedXML, "Failed to read replication body").to_xml(),
            );
        }
    };
    if body_bytes.is_empty() {
        return xml_response(
            StatusCode::BAD_REQUEST,
            S3Error::new(S3ErrorCode::MalformedXML, "Request body is required").to_xml(),
        );
    }
    // Lossy conversion: invalid UTF-8 bytes are replaced rather than rejected.
    let body_str = String::from_utf8_lossy(&body_bytes).to_string();
    match state.storage.get_bucket_config(bucket).await {
        Ok(mut config) => {
            config.replication = Some(serde_json::Value::String(body_str));
            match state.storage.set_bucket_config(bucket, &config).await {
                Ok(()) => StatusCode::OK.into_response(),
                Err(e) => storage_err(e),
            }
        }
        Err(e) => storage_err(e),
    }
}
/// DELETE `?replication` — clears any replication configuration and
/// responds 204.
pub async fn delete_replication(state: &AppState, bucket: &str) -> Response {
    let mut config = match state.storage.get_bucket_config(bucket).await {
        Ok(c) => c,
        Err(e) => return storage_err(e),
    };
    config.replication = None;
    if let Err(e) = state.storage.set_bucket_config(bucket, &config).await {
        return storage_err(e);
    }
    StatusCode::NO_CONTENT.into_response()
}
/// True when the policy document contains at least one public Allow
/// statement. "Statement" may be a single object or an array of objects;
/// a missing "Statement" key means not public.
fn policy_is_public(policy: &serde_json::Value) -> bool {
    match policy.get("Statement") {
        Some(serde_json::Value::Array(items)) => items.iter().any(is_allow_public_statement),
        Some(single) => is_allow_public_statement(single),
        None => false,
    }
}
/// A statement counts as "public" when its Effect is Allow
/// (case-insensitive) and its Principal is the string "*" or an object
/// containing any "*" value (e.g. {"AWS": "*"}).
fn is_allow_public_statement(statement: &serde_json::Value) -> bool {
    let allows = matches!(
        statement.get("Effect").and_then(|v| v.as_str()),
        Some(effect) if effect.eq_ignore_ascii_case("allow")
    );
    if !allows {
        return false;
    }
    match statement.get("Principal") {
        Some(serde_json::Value::String(s)) => s == "*",
        Some(serde_json::Value::Object(map)) => map.values().any(|v| v == "*"),
        _ => false,
    }
}
pub async fn get_acl(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await {
Ok(config) => {

View File

@@ -1,5 +1,6 @@
mod config;
pub mod kms;
mod select;
use std::collections::HashMap;
@@ -7,8 +8,11 @@ use axum::body::Body;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use base64::engine::general_purpose::URL_SAFE;
use base64::Engine;
use chrono::{DateTime, Utc};
use myfsio_common::error::S3Error;
use myfsio_common::error::{S3Error, S3ErrorCode};
use myfsio_common::types::PartInfo;
use myfsio_storage::traits::StorageEngine;
use tokio::io::AsyncSeekExt;
@@ -18,7 +22,15 @@ use crate::state::AppState;
fn s3_error_response(err: S3Error) -> Response {
let status = StatusCode::from_u16(err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let body = err.to_xml();
let resource = if err.resource.is_empty() {
"/".to_string()
} else {
err.resource.clone()
};
let body = err
.with_resource(resource)
.with_request_id(uuid::Uuid::new_v4().simple().to_string())
.to_xml();
(
status,
[("content-type", "application/xml")],
@@ -52,6 +64,9 @@ pub async fn create_bucket(
Query(query): Query<BucketQuery>,
body: Body,
) -> Response {
if query.quota.is_some() {
return config::put_quota(&state, &bucket, body).await;
}
if query.versioning.is_some() {
return config::put_versioning(&state, &bucket, body).await;
}
@@ -70,6 +85,12 @@ pub async fn create_bucket(
if query.acl.is_some() {
return config::put_acl(&state, &bucket, body).await;
}
if query.policy.is_some() {
return config::put_policy(&state, &bucket, body).await;
}
if query.replication.is_some() {
return config::put_replication(&state, &bucket, body).await;
}
if query.website.is_some() {
return config::put_website(&state, &bucket, body).await;
}
@@ -91,6 +112,7 @@ pub async fn create_bucket(
pub struct BucketQuery {
#[serde(rename = "list-type")]
pub list_type: Option<String>,
pub marker: Option<String>,
pub prefix: Option<String>,
pub delimiter: Option<String>,
#[serde(rename = "max-keys")]
@@ -108,7 +130,11 @@ pub struct BucketQuery {
pub encryption: Option<String>,
pub lifecycle: Option<String>,
pub acl: Option<String>,
pub quota: Option<String>,
pub policy: Option<String>,
#[serde(rename = "policyStatus")]
pub policy_status: Option<String>,
pub replication: Option<String>,
pub website: Option<String>,
#[serde(rename = "object-lock")]
pub object_lock: Option<String>,
@@ -128,6 +154,9 @@ pub async fn get_bucket(
);
}
if query.quota.is_some() {
return config::get_quota(&state, &bucket).await;
}
if query.versioning.is_some() {
return config::get_versioning(&state, &bucket).await;
}
@@ -149,6 +178,15 @@ pub async fn get_bucket(
if query.acl.is_some() {
return config::get_acl(&state, &bucket).await;
}
if query.policy.is_some() {
return config::get_policy(&state, &bucket).await;
}
if query.policy_status.is_some() {
return config::get_policy_status(&state, &bucket).await;
}
if query.replication.is_some() {
return config::get_replication(&state, &bucket).await;
}
if query.website.is_some() {
return config::get_website(&state, &bucket).await;
}
@@ -171,17 +209,56 @@ pub async fn get_bucket(
let prefix = query.prefix.clone().unwrap_or_default();
let delimiter = query.delimiter.clone().unwrap_or_default();
let max_keys = query.max_keys.unwrap_or(1000);
let marker = query.marker.clone().unwrap_or_default();
let list_type = query.list_type.clone().unwrap_or_default();
let is_v2 = list_type == "2";
let effective_start = if is_v2 {
if let Some(token) = query.continuation_token.as_deref() {
match URL_SAFE.decode(token) {
Ok(bytes) => match String::from_utf8(bytes) {
Ok(decoded) => Some(decoded),
Err(_) => {
return s3_error_response(S3Error::new(
S3ErrorCode::InvalidArgument,
"Invalid continuation token",
));
}
},
Err(_) => {
return s3_error_response(S3Error::new(
S3ErrorCode::InvalidArgument,
"Invalid continuation token",
));
}
}
} else {
query.start_after.clone()
}
} else if marker.is_empty() {
None
} else {
Some(marker.clone())
};
if delimiter.is_empty() {
let params = myfsio_common::types::ListParams {
max_keys,
continuation_token: query.continuation_token.clone(),
continuation_token: effective_start.clone(),
prefix: if prefix.is_empty() { None } else { Some(prefix.clone()) },
start_after: query.start_after.clone(),
start_after: if is_v2 { query.start_after.clone() } else { None },
};
match state.storage.list_objects(&bucket, &params).await {
Ok(result) => {
let xml = myfsio_xml::response::list_objects_v2_xml(
let next_marker = result
.next_continuation_token
.clone()
.or_else(|| result.objects.last().map(|o| o.key.clone()));
let xml = if is_v2 {
let next_token = next_marker
.as_deref()
.map(|s| URL_SAFE.encode(s.as_bytes()));
myfsio_xml::response::list_objects_v2_xml(
&bucket,
&prefix,
&delimiter,
@@ -190,9 +267,22 @@ pub async fn get_bucket(
&[],
result.is_truncated,
query.continuation_token.as_deref(),
result.next_continuation_token.as_deref(),
next_token.as_deref(),
result.objects.len(),
);
)
} else {
myfsio_xml::response::list_objects_v1_xml(
&bucket,
&prefix,
&marker,
&delimiter,
max_keys,
&result.objects,
&[],
result.is_truncated,
next_marker.as_deref(),
)
};
(StatusCode::OK, [("content-type", "application/xml")], xml).into_response()
}
Err(e) => storage_err_response(e),
@@ -202,11 +292,16 @@ pub async fn get_bucket(
prefix,
delimiter: delimiter.clone(),
max_keys,
continuation_token: query.continuation_token.clone(),
continuation_token: effective_start,
};
match state.storage.list_objects_shallow(&bucket, &params).await {
Ok(result) => {
let xml = myfsio_xml::response::list_objects_v2_xml(
let xml = if is_v2 {
let next_token = result
.next_continuation_token
.as_deref()
.map(|s| URL_SAFE.encode(s.as_bytes()));
myfsio_xml::response::list_objects_v2_xml(
&bucket,
&params.prefix,
&delimiter,
@@ -215,9 +310,22 @@ pub async fn get_bucket(
&result.common_prefixes,
result.is_truncated,
query.continuation_token.as_deref(),
result.next_continuation_token.as_deref(),
next_token.as_deref(),
result.objects.len() + result.common_prefixes.len(),
);
)
} else {
myfsio_xml::response::list_objects_v1_xml(
&bucket,
&params.prefix,
&marker,
&delimiter,
max_keys,
&result.objects,
&result.common_prefixes,
result.is_truncated,
result.next_continuation_token.as_deref(),
)
};
(StatusCode::OK, [("content-type", "application/xml")], xml).into_response()
}
Err(e) => storage_err_response(e),
@@ -243,6 +351,9 @@ pub async fn delete_bucket(
Path(bucket): Path<String>,
Query(query): Query<BucketQuery>,
) -> Response {
if query.quota.is_some() {
return config::delete_quota(&state, &bucket).await;
}
if query.tagging.is_some() {
return config::delete_tagging(&state, &bucket).await;
}
@@ -258,6 +369,12 @@ pub async fn delete_bucket(
if query.website.is_some() {
return config::delete_website(&state, &bucket).await;
}
if query.policy.is_some() {
return config::delete_policy(&state, &bucket).await;
}
if query.replication.is_some() {
return config::delete_replication(&state, &bucket).await;
}
match state.storage.delete_bucket(&bucket).await {
Ok(()) => StatusCode::NO_CONTENT.into_response(),
@@ -285,6 +402,8 @@ pub async fn head_bucket(
#[derive(serde::Deserialize, Default)]
pub struct ObjectQuery {
pub uploads: Option<String>,
pub attributes: Option<String>,
pub select: Option<String>,
#[serde(rename = "uploadId")]
pub upload_id: Option<String>,
#[serde(rename = "partNumber")]
@@ -329,6 +448,27 @@ fn apply_response_overrides(headers: &mut HeaderMap, query: &ObjectQuery) {
}
}
/// Picks a content type for `key`: a non-blank explicit value wins,
/// otherwise the type is guessed from the key's file extension, falling
/// back to application/octet-stream.
fn guessed_content_type(key: &str, explicit: Option<&str>) -> String {
    if let Some(ct) = explicit {
        if !ct.trim().is_empty() {
            return ct.to_string();
        }
    }
    mime_guess::from_path(key)
        .first_raw()
        .unwrap_or("application/octet-stream")
        .to_string()
}
/// Sets the content-type response header for `key`, guessing from the
/// extension when no usable explicit value is present; falls back to
/// application/octet-stream if the chosen value is not a valid header.
fn insert_content_type(headers: &mut HeaderMap, key: &str, explicit: Option<&str>) {
    let chosen = guessed_content_type(key, explicit);
    match chosen.parse() {
        Ok(header_value) => {
            headers.insert("content-type", header_value);
        }
        Err(_) => {
            headers.insert("content-type", "application/octet-stream".parse().unwrap());
        }
    }
}
pub async fn put_object(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
@@ -356,16 +496,18 @@ pub async fn put_object(
}
if let Some(copy_source) = headers.get("x-amz-copy-source").and_then(|v| v.to_str().ok()) {
return copy_object_handler(&state, copy_source, &bucket, &key).await;
return copy_object_handler(&state, copy_source, &bucket, &key, &headers).await;
}
let content_type = headers
let content_type = guessed_content_type(
&key,
headers
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("application/octet-stream");
.and_then(|v| v.to_str().ok()),
);
let mut metadata = HashMap::new();
metadata.insert("__content_type__".to_string(), content_type.to_string());
metadata.insert("__content_type__".to_string(), content_type);
for (name, value) in headers.iter() {
let name_str = name.as_str();
@@ -460,6 +602,20 @@ pub async fn get_object(
if query.legal_hold.is_some() {
return config::get_object_legal_hold(&state, &bucket, &key).await;
}
if query.attributes.is_some() {
return object_attributes_handler(&state, &bucket, &key, &headers).await;
}
if let Some(ref upload_id) = query.upload_id {
return list_parts_handler(&state, &bucket, &key, upload_id).await;
}
let head_meta = match state.storage.head_object(&bucket, &key).await {
Ok(m) => m,
Err(e) => return storage_err_response(e),
};
if let Some(resp) = evaluate_get_preconditions(&headers, &head_meta) {
return resp;
}
let range_header = headers
.get("range")
@@ -504,13 +660,7 @@ pub async fn get_object(
let stream = ReaderStream::new(file);
let body = Body::from_stream(stream);
let meta = match state.storage.head_object(&bucket, &key).await {
Ok(m) => m,
Err(e) => {
let _ = tokio::fs::remove_file(&dec_tmp).await;
return storage_err_response(e);
}
};
let meta = head_meta.clone();
let tmp_path = dec_tmp.clone();
tokio::spawn(async move {
@@ -523,11 +673,7 @@ pub async fn get_object(
if let Some(ref etag) = meta.etag {
resp_headers.insert("etag", format!("\"{}\"", etag).parse().unwrap());
}
if let Some(ref ct) = meta.content_type {
resp_headers.insert("content-type", ct.parse().unwrap());
} else {
resp_headers.insert("content-type", "application/octet-stream".parse().unwrap());
}
insert_content_type(&mut resp_headers, &key, meta.content_type.as_deref());
resp_headers.insert(
"last-modified",
meta.last_modified.format("%a, %d %b %Y %H:%M:%S GMT").to_string().parse().unwrap(),
@@ -559,11 +705,7 @@ pub async fn get_object(
if let Some(ref etag) = meta.etag {
headers.insert("etag", format!("\"{}\"", etag).parse().unwrap());
}
if let Some(ref ct) = meta.content_type {
headers.insert("content-type", ct.parse().unwrap());
} else {
headers.insert("content-type", "application/octet-stream".parse().unwrap());
}
insert_content_type(&mut headers, &key, meta.content_type.as_deref());
headers.insert(
"last-modified",
meta.last_modified
@@ -595,6 +737,7 @@ pub async fn post_object(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
Query(query): Query<ObjectQuery>,
headers: HeaderMap,
body: Body,
) -> Response {
if query.uploads.is_some() {
@@ -605,6 +748,10 @@ pub async fn post_object(
return complete_multipart_handler(&state, &bucket, &key, upload_id, body).await;
}
if query.select.is_some() {
return select::post_select_object_content(&state, &bucket, &key, &headers, body).await;
}
(StatusCode::METHOD_NOT_ALLOWED).into_response()
}
@@ -630,19 +777,19 @@ pub async fn delete_object(
pub async fn head_object(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
headers: HeaderMap,
) -> Response {
match state.storage.head_object(&bucket, &key).await {
Ok(meta) => {
if let Some(resp) = evaluate_get_preconditions(&headers, &meta) {
return resp;
}
let mut headers = HeaderMap::new();
headers.insert("content-length", meta.size.to_string().parse().unwrap());
if let Some(ref etag) = meta.etag {
headers.insert("etag", format!("\"{}\"", etag).parse().unwrap());
}
if let Some(ref ct) = meta.content_type {
headers.insert("content-type", ct.parse().unwrap());
} else {
headers.insert("content-type", "application/octet-stream".parse().unwrap());
}
insert_content_type(&mut headers, &key, meta.content_type.as_deref());
headers.insert(
"last-modified",
meta.last_modified
@@ -782,11 +929,80 @@ async fn list_multipart_uploads_handler(
}
}
/// Renders the ListParts XML for an in-progress multipart upload
/// identified by `upload_id`.
async fn list_parts_handler(
    state: &AppState,
    bucket: &str,
    key: &str,
    upload_id: &str,
) -> Response {
    let parts = match state.storage.list_parts(bucket, upload_id).await {
        Ok(p) => p,
        Err(e) => return storage_err_response(e),
    };
    let xml = myfsio_xml::response::list_parts_xml(bucket, key, upload_id, &parts);
    (StatusCode::OK, [("content-type", "application/xml")], xml).into_response()
}
/// GET `?attributes` — minimal GetObjectAttributes implementation.
///
/// The `x-amz-object-attributes` header carries a comma-separated,
/// case-insensitive list of requested attribute names. With no header (or
/// an empty list) ETag, StorageClass and ObjectSize are all returned.
/// Checksum and ObjectParts are emitted only when explicitly requested,
/// and only as empty elements — no checksum or part data is tracked here.
async fn object_attributes_handler(
    state: &AppState,
    bucket: &str,
    key: &str,
    headers: &HeaderMap,
) -> Response {
    let meta = match state.storage.head_object(bucket, key).await {
        Ok(m) => m,
        Err(e) => return storage_err_response(e),
    };
    let requested = headers
        .get("x-amz-object-attributes")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");
    // Normalize to a lowercase set; empty entries from stray commas are dropped.
    let attrs: std::collections::HashSet<String> = requested
        .split(',')
        .map(|s| s.trim().to_ascii_lowercase())
        .filter(|s| !s.is_empty())
        .collect();
    // An empty request set means "return the default attributes".
    let all = attrs.is_empty();
    let mut xml = String::from(
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
    );
    xml.push_str("<GetObjectAttributesResponse xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
    if all || attrs.contains("etag") {
        // Omitted entirely when the object has no stored etag.
        if let Some(etag) = &meta.etag {
            xml.push_str(&format!("<ETag>{}</ETag>", xml_escape(etag)));
        }
    }
    if all || attrs.contains("storageclass") {
        let sc = meta
            .storage_class
            .as_deref()
            .unwrap_or("STANDARD");
        xml.push_str(&format!("<StorageClass>{}</StorageClass>", xml_escape(sc)));
    }
    if all || attrs.contains("objectsize") {
        xml.push_str(&format!("<ObjectSize>{}</ObjectSize>", meta.size));
    }
    if attrs.contains("checksum") {
        // Placeholder: checksums are not computed/stored by this engine.
        xml.push_str("<Checksum></Checksum>");
    }
    if attrs.contains("objectparts") {
        // Placeholder: per-part metadata is not retained after completion.
        xml.push_str("<ObjectParts></ObjectParts>");
    }
    xml.push_str("</GetObjectAttributesResponse>");
    (StatusCode::OK, [("content-type", "application/xml")], xml).into_response()
}
async fn copy_object_handler(
state: &AppState,
copy_source: &str,
dst_bucket: &str,
dst_key: &str,
headers: &HeaderMap,
) -> Response {
let source = copy_source.strip_prefix('/').unwrap_or(copy_source);
let (src_bucket, src_key) = match source.split_once('/') {
@@ -799,6 +1015,14 @@ async fn copy_object_handler(
}
};
let source_meta = match state.storage.head_object(src_bucket, src_key).await {
Ok(m) => m,
Err(e) => return storage_err_response(e),
};
if let Some(resp) = evaluate_copy_preconditions(headers, &source_meta) {
return resp;
}
match state.storage.copy_object(src_bucket, src_key, dst_bucket, dst_key).await {
Ok(meta) => {
let etag = meta.etag.as_deref().unwrap_or("");
@@ -908,11 +1132,7 @@ async fn range_get_handler(
if let Some(ref etag) = meta.etag {
headers.insert("etag", format!("\"{}\"", etag).parse().unwrap());
}
if let Some(ref ct) = meta.content_type {
headers.insert("content-type", ct.parse().unwrap());
} else {
headers.insert("content-type", "application/octet-stream".parse().unwrap());
}
insert_content_type(&mut headers, key, meta.content_type.as_deref());
headers.insert("accept-ranges", "bytes".parse().unwrap());
apply_response_overrides(&mut headers, query);
@@ -920,6 +1140,138 @@ async fn range_get_handler(
(StatusCode::PARTIAL_CONTENT, headers, body).into_response()
}
/// Applies HTTP conditional-request headers to a GET/HEAD against the
/// object's current metadata.
///
/// Returns `Some(response)` when a precondition short-circuits the request
/// — 412 PreconditionFailed for If-Match / If-Unmodified-Since, 304 Not
/// Modified for If-None-Match / If-Modified-Since — or `None` when the
/// request should proceed. The evaluation order here (If-Match,
/// If-Unmodified-Since, If-None-Match, If-Modified-Since) follows the
/// precedence in RFC 7232 §6. Unparseable dates are ignored, i.e. that
/// condition is treated as absent.
fn evaluate_get_preconditions(
    headers: &HeaderMap,
    meta: &myfsio_common::types::ObjectMeta,
) -> Option<Response> {
    // If-Match: the stored ETag must match, otherwise 412.
    if let Some(value) = headers.get("if-match").and_then(|v| v.to_str().ok()) {
        if !etag_condition_matches(value, meta.etag.as_deref()) {
            return Some(s3_error_response(S3Error::from_code(
                S3ErrorCode::PreconditionFailed,
            )));
        }
    }
    // If-Unmodified-Since: object modified after the given date → 412.
    if let Some(value) = headers
        .get("if-unmodified-since")
        .and_then(|v| v.to_str().ok())
    {
        if let Some(t) = parse_http_date(value) {
            if meta.last_modified > t {
                return Some(s3_error_response(S3Error::from_code(
                    S3ErrorCode::PreconditionFailed,
                )));
            }
        }
    }
    // If-None-Match: an ETag match means the client's copy is current → 304.
    if let Some(value) = headers.get("if-none-match").and_then(|v| v.to_str().ok()) {
        if etag_condition_matches(value, meta.etag.as_deref()) {
            return Some(StatusCode::NOT_MODIFIED.into_response());
        }
    }
    // If-Modified-Since: not modified since the given date → 304.
    if let Some(value) = headers
        .get("if-modified-since")
        .and_then(|v| v.to_str().ok())
    {
        if let Some(t) = parse_http_date(value) {
            if meta.last_modified <= t {
                return Some(StatusCode::NOT_MODIFIED.into_response());
            }
        }
    }
    None
}
/// Applies the `x-amz-copy-source-if-*` conditional headers to a
/// CopyObject request against the source object's metadata.
///
/// Returns `Some(412 response)` when any supplied condition fails — unlike
/// the GET preconditions, S3 copy conditions always answer with
/// PreconditionFailed, never 304 — or `None` when the copy may proceed.
/// Unparseable dates are ignored (condition treated as absent).
fn evaluate_copy_preconditions(
    headers: &HeaderMap,
    source_meta: &myfsio_common::types::ObjectMeta,
) -> Option<Response> {
    // if-match: source ETag must match.
    if let Some(value) = headers
        .get("x-amz-copy-source-if-match")
        .and_then(|v| v.to_str().ok())
    {
        if !etag_condition_matches(value, source_meta.etag.as_deref()) {
            return Some(s3_error_response(S3Error::from_code(
                S3ErrorCode::PreconditionFailed,
            )));
        }
    }
    // if-none-match: source ETag must NOT match.
    if let Some(value) = headers
        .get("x-amz-copy-source-if-none-match")
        .and_then(|v| v.to_str().ok())
    {
        if etag_condition_matches(value, source_meta.etag.as_deref()) {
            return Some(s3_error_response(S3Error::from_code(
                S3ErrorCode::PreconditionFailed,
            )));
        }
    }
    // if-modified-since: source must have been modified after the date.
    if let Some(value) = headers
        .get("x-amz-copy-source-if-modified-since")
        .and_then(|v| v.to_str().ok())
    {
        if let Some(t) = parse_http_date(value) {
            if source_meta.last_modified <= t {
                return Some(s3_error_response(S3Error::from_code(
                    S3ErrorCode::PreconditionFailed,
                )));
            }
        }
    }
    // if-unmodified-since: source must NOT have been modified after the date.
    if let Some(value) = headers
        .get("x-amz-copy-source-if-unmodified-since")
        .and_then(|v| v.to_str().ok())
    {
        if let Some(t) = parse_http_date(value) {
            if source_meta.last_modified > t {
                return Some(s3_error_response(S3Error::from_code(
                    S3ErrorCode::PreconditionFailed,
                )));
            }
        }
    }
    None
}
fn parse_http_date(value: &str) -> Option<DateTime<Utc>> {
DateTime::parse_from_rfc2822(value)
.ok()
.map(|dt| dt.with_timezone(&Utc))
}
/// Evaluates an If-Match / If-None-Match style header value against the
/// stored ETag.
///
/// Accepts "*" (matches whenever used, as before) or a comma-separated
/// list of entity-tags. Fix: comparison now follows the RFC 7232 §2.3.2
/// weak comparison — a `W/` prefix on either side is ignored, so
/// `If-None-Match: W/"abc"` matches a stored etag of `abc`. Surrounding
/// quotes are stripped as before (the stored etag is kept unquoted).
fn etag_condition_matches(condition: &str, etag: Option<&str>) -> bool {
    let trimmed = condition.trim();
    if trimmed == "*" {
        return true;
    }
    let current = match etag {
        Some(e) => normalize_etag(e),
        None => return false,
    };
    trimmed
        .split(',')
        .map(normalize_etag)
        .any(|candidate| candidate == current || candidate == "*")
}

/// Strips whitespace, an optional weak-validator prefix (`W/` or `w/`),
/// and surrounding quotes from an entity-tag so tags compare by opaque
/// value only.
fn normalize_etag(raw: &str) -> &str {
    let s = raw.trim();
    let s = s
        .strip_prefix("W/")
        .or_else(|| s.strip_prefix("w/"))
        .unwrap_or(s);
    s.trim_matches('"')
}
/// Escapes the five XML special characters in `value` for safe use in
/// element content or attribute values.
///
/// Single-pass rewrite: the previous chained `str::replace` version
/// walked the string five times and allocated an intermediate String per
/// step; this builds the result once. Output is identical (replacing '&'
/// first in the old code already prevented double-escaping).
fn xml_escape(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            other => out.push(other),
        }
    }
    out
}
fn parse_range(range_str: &str, total_size: u64) -> Option<(u64, u64)> {
let range_spec = range_str.strip_prefix("bytes=")?;

View File

@@ -0,0 +1,552 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use axum::body::Body;
use axum::http::{HeaderMap, HeaderName, StatusCode};
use axum::response::{IntoResponse, Response};
use base64::Engine;
use bytes::Bytes;
use crc32fast::Hasher;
use duckdb::types::ValueRef;
use duckdb::Connection;
use futures::stream;
use http_body_util::BodyExt;
use myfsio_common::error::{S3Error, S3ErrorCode};
use myfsio_storage::traits::StorageEngine;
use crate::state::AppState;
#[cfg(target_os = "windows")]
#[link(name = "Rstrtmgr")]
extern "system" {}
const CHUNK_SIZE: usize = 65_536;
/// POST `?select` — SelectObjectContent implementation.
///
/// Parses the XML request body into a `SelectRequest`, resolves the
/// object's on-disk path, runs the query on a blocking thread (DuckDB, a
/// synchronous API), and streams the results back as a sequence of S3
/// select events: zero or more Records frames, one Stats frame, then an
/// End frame, each framed by `encode_select_event`.
pub async fn post_select_object_content(
    state: &AppState,
    bucket: &str,
    key: &str,
    headers: &HeaderMap,
    body: Body,
) -> Response {
    // Reject requests without an XML content type up front.
    if let Some(resp) = require_xml_content_type(headers) {
        return resp;
    }
    let body_bytes = match body.collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(_) => {
            return s3_error_response(S3Error::new(
                S3ErrorCode::MalformedXML,
                "Unable to parse XML document",
            ));
        }
    };
    let request = match parse_select_request(&body_bytes) {
        Ok(r) => r,
        Err(err) => return s3_error_response(err),
    };
    // Any path-resolution failure is reported as NoSuchKey.
    let object_path = match state.storage.get_object_path(bucket, key).await {
        Ok(path) => path,
        Err(_) => {
            return s3_error_response(S3Error::new(
                S3ErrorCode::NoSuchKey,
                "Object not found",
            ));
        }
    };
    // DuckDB is synchronous; run the query off the async runtime.
    let join_res = tokio::task::spawn_blocking(move || execute_select_query(object_path, request)).await;
    let chunks = match join_res {
        Ok(Ok(chunks)) => chunks,
        // Query-level failure (bad SQL, unreadable file, …).
        Ok(Err(message)) => {
            return s3_error_response(S3Error::new(S3ErrorCode::InvalidRequest, message));
        }
        // The blocking task panicked or was cancelled.
        Err(_) => {
            return s3_error_response(S3Error::new(
                S3ErrorCode::InternalError,
                "SelectObjectContent execution failed",
            ));
        }
    };
    // BytesReturned for the Stats frame is the sum of all Records payloads.
    let bytes_returned: usize = chunks.iter().map(|c| c.len()).sum();
    let mut events: Vec<Bytes> = Vec::with_capacity(chunks.len() + 2);
    for chunk in chunks {
        events.push(Bytes::from(encode_select_event("Records", &chunk)));
    }
    // NOTE(review): BytesScanned is reported as 0 here — presumably not
    // tracked by execute_select_query; confirm.
    let stats_payload = build_stats_xml(0, bytes_returned);
    events.push(Bytes::from(encode_select_event("Stats", stats_payload.as_bytes())));
    events.push(Bytes::from(encode_select_event("End", b"")));
    let stream = stream::iter(events.into_iter().map(Ok::<Bytes, std::io::Error>));
    let body = Body::from_stream(stream);
    let mut response = (StatusCode::OK, body).into_response();
    response.headers_mut().insert(
        HeaderName::from_static("content-type"),
        "application/octet-stream".parse().unwrap(),
    );
    response.headers_mut().insert(
        HeaderName::from_static("x-amz-request-charged"),
        "requester".parse().unwrap(),
    );
    response
}
/// Parsed form of a SelectObjectContent request body.
#[derive(Clone)]
struct SelectRequest {
    // The SQL expression exactly as supplied by the client.
    expression: String,
    input_format: InputFormat,
    output_format: OutputFormat,
}
/// Input serialization requested by the client.
#[derive(Clone)]
enum InputFormat {
    Csv(CsvInputConfig),
    Json(JsonInputConfig),
    Parquet,
}
/// CSV input options; delimiters keep the raw XML text.
#[derive(Clone)]
struct CsvInputConfig {
    // Normalized to uppercase by the parser: NONE, USE, or IGNORE.
    file_header_info: String,
    field_delimiter: String,
    quote_character: String,
}
/// JSON input options.
#[derive(Clone)]
struct JsonInputConfig {
    // Normalized to uppercase by the parser: DOCUMENT or LINES.
    json_type: String,
}
/// Output serialization requested by the client.
#[derive(Clone)]
enum OutputFormat {
    Csv(CsvOutputConfig),
    Json(JsonOutputConfig),
}
/// CSV output options.
#[derive(Clone)]
struct CsvOutputConfig {
    field_delimiter: String,
    record_delimiter: String,
    quote_character: String,
}
/// JSON (one object per record) output options.
#[derive(Clone)]
struct JsonOutputConfig {
    record_delimiter: String,
}
/// Parses a SelectObjectContent XML body into a `SelectRequest`.
///
/// Returns `MalformedXML` for unparseable documents or a wrong root element,
/// and `InvalidRequest` for missing/unsupported fields.
fn parse_select_request(payload: &[u8]) -> Result<SelectRequest, S3Error> {
    // Decode leniently; invalid UTF-8 is replaced so the XML parser reports
    // the real structural error.
    let xml_text = String::from_utf8_lossy(payload);
    let doc = roxmltree::Document::parse(&xml_text)
        .map_err(|_| S3Error::new(S3ErrorCode::MalformedXML, "Unable to parse XML document"))?;
    let root = doc.root_element();
    if root.tag_name().name() != "SelectObjectContentRequest" {
        return Err(S3Error::new(
            S3ErrorCode::MalformedXML,
            "Root element must be SelectObjectContentRequest",
        ));
    }
    // Expression is mandatory and must be non-empty.
    let expression = match child_text(&root, "Expression") {
        Some(expr) if !expr.is_empty() => expr,
        _ => {
            return Err(S3Error::new(S3ErrorCode::InvalidRequest, "Expression is required"));
        }
    };
    // ExpressionType defaults to SQL, which is also the only supported value.
    let expression_type = child_text(&root, "ExpressionType").unwrap_or_else(|| "SQL".to_string());
    if !expression_type.eq_ignore_ascii_case("SQL") {
        return Err(S3Error::new(
            S3ErrorCode::InvalidRequest,
            "Only SQL expression type is supported",
        ));
    }
    let input_format = match child(&root, "InputSerialization") {
        Some(node) => parse_input_format(&node)?,
        None => {
            return Err(S3Error::new(
                S3ErrorCode::InvalidRequest,
                "InputSerialization is required",
            ));
        }
    };
    let output_format = match child(&root, "OutputSerialization") {
        Some(node) => parse_output_format(&node)?,
        None => {
            return Err(S3Error::new(
                S3ErrorCode::InvalidRequest,
                "OutputSerialization is required",
            ));
        }
    };
    Ok(SelectRequest {
        expression,
        input_format,
        output_format,
    })
}
/// Reads the `InputSerialization` element; exactly one of CSV / JSON /
/// Parquet is expected, checked in that order.
fn parse_input_format(node: &roxmltree::Node<'_, '_>) -> Result<InputFormat, S3Error> {
    if let Some(csv) = child(node, "CSV") {
        let file_header_info = child_text(&csv, "FileHeaderInfo")
            .map(|v| v.to_ascii_uppercase())
            .unwrap_or_else(|| "NONE".to_string());
        let field_delimiter =
            child_text(&csv, "FieldDelimiter").unwrap_or_else(|| ",".to_string());
        let quote_character =
            child_text(&csv, "QuoteCharacter").unwrap_or_else(|| "\"".to_string());
        return Ok(InputFormat::Csv(CsvInputConfig {
            file_header_info,
            field_delimiter,
            quote_character,
        }));
    }
    if let Some(json) = child(node, "JSON") {
        let json_type = child_text(&json, "Type")
            .map(|v| v.to_ascii_uppercase())
            .unwrap_or_else(|| "DOCUMENT".to_string());
        return Ok(InputFormat::Json(JsonInputConfig { json_type }));
    }
    if child(node, "Parquet").is_some() {
        return Ok(InputFormat::Parquet);
    }
    Err(S3Error::new(
        S3ErrorCode::InvalidRequest,
        "InputSerialization must specify CSV, JSON, or Parquet",
    ))
}
/// Reads the `OutputSerialization` element; either CSV or JSON is expected.
fn parse_output_format(node: &roxmltree::Node<'_, '_>) -> Result<OutputFormat, S3Error> {
    if let Some(csv) = child(node, "CSV") {
        let field_delimiter =
            child_text(&csv, "FieldDelimiter").unwrap_or_else(|| ",".to_string());
        let record_delimiter =
            child_text(&csv, "RecordDelimiter").unwrap_or_else(|| "\n".to_string());
        let quote_character =
            child_text(&csv, "QuoteCharacter").unwrap_or_else(|| "\"".to_string());
        return Ok(OutputFormat::Csv(CsvOutputConfig {
            field_delimiter,
            record_delimiter,
            quote_character,
        }));
    }
    if let Some(json) = child(node, "JSON") {
        let record_delimiter =
            child_text(&json, "RecordDelimiter").unwrap_or_else(|| "\n".to_string());
        return Ok(OutputFormat::Json(JsonOutputConfig { record_delimiter }));
    }
    Err(S3Error::new(
        S3ErrorCode::InvalidRequest,
        "OutputSerialization must specify CSV or JSON",
    ))
}
/// First child *element* of `node` with the given tag name, if any.
fn child<'a, 'input>(node: &'a roxmltree::Node<'a, 'input>, name: &str) -> Option<roxmltree::Node<'a, 'input>> {
    node.children()
        .find(|n| n.is_element() && n.tag_name().name() == name)
}
/// Text content of the first child element named `name`. Returns `None`
/// both when the element is absent and when it has no text node.
fn child_text(node: &roxmltree::Node<'_, '_>, name: &str) -> Option<String> {
    child(node, name)
        .and_then(|n| n.text())
        .map(|s| s.to_string())
}
/// Loads the object at `path` into an in-memory DuckDB table named `data`,
/// runs the (rewritten) SELECT expression, and returns the encoded output
/// chunks. Errors are plain strings; the caller maps them to S3 errors.
fn execute_select_query(path: PathBuf, request: SelectRequest) -> Result<Vec<Vec<u8>>, String> {
    let conn = Connection::open_in_memory().map_err(|e| format!("DuckDB connection error: {}", e))?;
    load_input_table(&conn, &path, &request.input_format)?;
    // S3 Select queries address the object as `S3Object`; our table is `data`.
    let expression = rewrite_s3object_alias(&request.expression);
    let mut stmt = conn
        .prepare(&expression)
        .map_err(|e| format!("SQL execution error: {}", e))?;
    let mut rows = stmt
        .query([])
        .map_err(|e| format!("SQL execution error: {}", e))?;
    let stmt_ref = rows
        .as_ref()
        .ok_or_else(|| "SQL execution error: statement metadata unavailable".to_string())?;
    let col_count = stmt_ref.column_count();
    let mut columns: Vec<String> = Vec::with_capacity(col_count);
    for i in 0..col_count {
        // Fall back to a positional name when the driver can't report one.
        let name = stmt_ref
            .column_name(i)
            .map(|s| s.to_string())
            .unwrap_or_else(|_| format!("_{}", i));
        columns.push(name);
    }
    match request.output_format {
        OutputFormat::Csv(cfg) => collect_csv_chunks(&mut rows, col_count, cfg),
        OutputFormat::Json(cfg) => collect_json_chunks(&mut rows, col_count, &columns, cfg),
    }
}

/// Replaces every standalone, case-insensitive occurrence of `s3object` with
/// `data`.
///
/// Fixes two defects of the previous `str::replace("s3object")` /
/// `replace("S3Object")` pair: other capitalizations (`s3OBJECT`, `S3OBJECT`)
/// were not rewritten, and longer identifiers merely containing the word
/// (e.g. `s3object_id`) were mangled.
///
/// NOTE(review): occurrences inside SQL string literals are still rewritten,
/// same as before — confirm that is acceptable.
fn rewrite_s3object_alias(expression: &str) -> String {
    const ALIAS: &str = "s3object";
    // to_ascii_lowercase preserves byte offsets, so indices found in
    // `lowered` are valid char boundaries in `expression` as well.
    let lowered = expression.to_ascii_lowercase();
    let bytes = expression.as_bytes();
    let is_ident = |b: u8| b == b'_' || b.is_ascii_alphanumeric();
    let mut rewritten = String::with_capacity(expression.len());
    let mut pos = 0;
    while let Some(found) = lowered[pos..].find(ALIAS) {
        let start = pos + found;
        let end = start + ALIAS.len();
        rewritten.push_str(&expression[pos..start]);
        let standalone = (start == 0 || !is_ident(bytes[start - 1]))
            && (end == bytes.len() || !is_ident(bytes[end]));
        if standalone {
            rewritten.push_str("data");
        } else {
            rewritten.push_str(&expression[start..end]);
        }
        pos = end;
    }
    rewritten.push_str(&expression[pos..]);
    rewritten
}
/// Creates the `data` table in `conn` from the file at `path` using the
/// DuckDB reader that matches the requested input serialization.
fn load_input_table(conn: &Connection, path: &Path, input: &InputFormat) -> Result<(), String> {
    // DuckDB accepts forward slashes on every platform, including Windows.
    let normalized_path = path.to_string_lossy().replace('\\', "/");
    let escaped_path = sql_escape(&normalized_path);
    let (sql, label) = match input {
        InputFormat::Csv(cfg) => {
            // Both USE and IGNORE mean the file carries a header row.
            let has_header = matches!(cfg.file_header_info.as_str(), "USE" | "IGNORE");
            let delimiter = normalize_single_char(&cfg.field_delimiter, ',');
            let quote = normalize_single_char(&cfg.quote_character, '"');
            let sql = format!(
                "CREATE TABLE data AS SELECT * FROM read_csv('{}', header={}, delim='{}', quote='{}')",
                escaped_path,
                if has_header { "true" } else { "false" },
                sql_escape(&delimiter),
                sql_escape(&quote)
            );
            (sql, "CSV")
        }
        InputFormat::Json(cfg) => {
            let format = if cfg.json_type == "LINES" {
                "newline_delimited"
            } else {
                "array"
            };
            let sql = format!(
                "CREATE TABLE data AS SELECT * FROM read_json_auto('{}', format='{}')",
                escaped_path, format
            );
            (sql, "JSON")
        }
        InputFormat::Parquet => {
            let sql = format!(
                "CREATE TABLE data AS SELECT * FROM read_parquet('{}')",
                escaped_path
            );
            (sql, "Parquet")
        }
    };
    conn.execute_batch(&sql)
        .map_err(|e| format!("Failed loading {} data: {}", label, e))
}
/// Doubles single quotes so `value` can be embedded in a SQL string literal.
fn sql_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
/// Reduces a delimiter/quote option to its first character, falling back to
/// `default_char` when the supplied string is empty.
fn normalize_single_char(value: &str, default_char: char) -> String {
    match value.chars().next() {
        Some(first) => first.to_string(),
        None => default_char.to_string(),
    }
}
/// Streams result rows as delimiter-separated records, splitting the output
/// into chunks of roughly CHUNK_SIZE bytes.
///
/// A field is quoted when it contains the field delimiter, the quote
/// character, or the record delimiter; embedded quote characters are doubled.
fn collect_csv_chunks(
    rows: &mut duckdb::Rows<'_>,
    col_count: usize,
    cfg: CsvOutputConfig,
) -> Result<Vec<Vec<u8>>, String> {
    let delimiter = cfg.field_delimiter;
    let record_delimiter = cfg.record_delimiter;
    let quote = cfg.quote_character;
    let mut chunks: Vec<Vec<u8>> = Vec::new();
    let mut buffer = String::new();
    while let Some(row) = rows.next().map_err(|e| format!("SQL execution error: {}", e))? {
        let mut fields: Vec<String> = Vec::with_capacity(col_count);
        for i in 0..col_count {
            let value = row
                .get_ref(i)
                .map_err(|e| format!("SQL execution error: {}", e))?;
            // NULL renders as an empty, unquoted field.
            if matches!(value, ValueRef::Null) {
                fields.push(String::new());
                continue;
            }
            let mut text = value_ref_to_string(value);
            if text.contains(&delimiter) || text.contains(&quote) || text.contains(&record_delimiter) {
                text = text.replace(&quote, &(quote.clone() + &quote));
                text = format!("{}{}{}", quote, text, quote);
            }
            fields.push(text);
        }
        buffer.push_str(&fields.join(&delimiter));
        buffer.push_str(&record_delimiter);
        while buffer.len() >= CHUNK_SIZE {
            // Fix: split only on a UTF-8 char boundary. `String::split_off`
            // panics when the index lands inside a multi-byte character,
            // which a fixed CHUNK_SIZE byte offset can hit on non-ASCII data.
            let mut split_at = CHUNK_SIZE;
            while !buffer.is_char_boundary(split_at) {
                split_at -= 1;
            }
            let rest = buffer.split_off(split_at);
            chunks.push(buffer.into_bytes());
            buffer = rest;
        }
    }
    if !buffer.is_empty() {
        chunks.push(buffer.into_bytes());
    }
    Ok(chunks)
}
/// Streams result rows as JSON objects separated by `record_delimiter`,
/// splitting the output into chunks of roughly CHUNK_SIZE bytes.
///
/// NOTE(review): rows are built in a HashMap, so key order within each
/// record is unspecified and duplicate column names collapse — confirm
/// callers don't depend on column order.
fn collect_json_chunks(
    rows: &mut duckdb::Rows<'_>,
    col_count: usize,
    columns: &[String],
    cfg: JsonOutputConfig,
) -> Result<Vec<Vec<u8>>, String> {
    let record_delimiter = cfg.record_delimiter;
    let mut chunks: Vec<Vec<u8>> = Vec::new();
    let mut buffer = String::new();
    while let Some(row) = rows.next().map_err(|e| format!("SQL execution error: {}", e))? {
        let mut record: HashMap<String, serde_json::Value> = HashMap::with_capacity(col_count);
        for i in 0..col_count {
            let value = row
                .get_ref(i)
                .map_err(|e| format!("SQL execution error: {}", e))?;
            // Fall back to a positional key if the column list is short.
            let key = columns
                .get(i)
                .cloned()
                .unwrap_or_else(|| format!("_{}", i));
            record.insert(key, value_ref_to_json(value));
        }
        let line = serde_json::to_string(&record)
            .map_err(|e| format!("JSON output encoding failed: {}", e))?;
        buffer.push_str(&line);
        buffer.push_str(&record_delimiter);
        while buffer.len() >= CHUNK_SIZE {
            // Fix: split only on a UTF-8 char boundary. `String::split_off`
            // panics when the index lands inside a multi-byte character;
            // serde_json emits non-ASCII text unescaped, so this can happen.
            let mut split_at = CHUNK_SIZE;
            while !buffer.is_char_boundary(split_at) {
                split_at -= 1;
            }
            let rest = buffer.split_off(split_at);
            chunks.push(buffer.into_bytes());
            buffer = rest;
        }
    }
    if !buffer.is_empty() {
        chunks.push(buffer.into_bytes());
    }
    Ok(chunks)
}
fn value_ref_to_string(value: ValueRef<'_>) -> String {
match value {
ValueRef::Null => String::new(),
ValueRef::Boolean(v) => v.to_string(),
ValueRef::TinyInt(v) => v.to_string(),
ValueRef::SmallInt(v) => v.to_string(),
ValueRef::Int(v) => v.to_string(),
ValueRef::BigInt(v) => v.to_string(),
ValueRef::UTinyInt(v) => v.to_string(),
ValueRef::USmallInt(v) => v.to_string(),
ValueRef::UInt(v) => v.to_string(),
ValueRef::UBigInt(v) => v.to_string(),
ValueRef::Float(v) => v.to_string(),
ValueRef::Double(v) => v.to_string(),
ValueRef::Decimal(v) => v.to_string(),
ValueRef::Text(v) => String::from_utf8_lossy(v).into_owned(),
ValueRef::Blob(v) => base64::engine::general_purpose::STANDARD.encode(v),
_ => format!("{:?}", value),
}
}
fn value_ref_to_json(value: ValueRef<'_>) -> serde_json::Value {
match value {
ValueRef::Null => serde_json::Value::Null,
ValueRef::Boolean(v) => serde_json::Value::Bool(v),
ValueRef::TinyInt(v) => serde_json::json!(v),
ValueRef::SmallInt(v) => serde_json::json!(v),
ValueRef::Int(v) => serde_json::json!(v),
ValueRef::BigInt(v) => serde_json::json!(v),
ValueRef::UTinyInt(v) => serde_json::json!(v),
ValueRef::USmallInt(v) => serde_json::json!(v),
ValueRef::UInt(v) => serde_json::json!(v),
ValueRef::UBigInt(v) => serde_json::json!(v),
ValueRef::Float(v) => serde_json::json!(v),
ValueRef::Double(v) => serde_json::json!(v),
ValueRef::Decimal(v) => serde_json::Value::String(v.to_string()),
ValueRef::Text(v) => serde_json::Value::String(String::from_utf8_lossy(v).into_owned()),
ValueRef::Blob(v) => serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(v)),
_ => serde_json::Value::String(format!("{:?}", value)),
}
}
/// Returns an error response when the request carries a non-XML
/// Content-Type. A missing or empty Content-Type is tolerated (returns
/// `None`), as are `application/xml` and `text/xml` with any parameters.
fn require_xml_content_type(headers: &HeaderMap) -> Option<Response> {
    let raw = headers
        .get("content-type")
        .and_then(|v| v.to_str().ok())
        .map(str::trim)
        .unwrap_or_default();
    if raw.is_empty() {
        return None;
    }
    let normalized = raw.to_ascii_lowercase();
    let is_xml =
        normalized.starts_with("application/xml") || normalized.starts_with("text/xml");
    if is_xml {
        None
    } else {
        Some(s3_error_response(S3Error::new(
            S3ErrorCode::InvalidRequest,
            "Content-Type must be application/xml or text/xml",
        )))
    }
}
/// Renders an `S3Error` as the standard S3 XML error response, stamping a
/// fresh request id and defaulting the resource to "/" when unset.
fn s3_error_response(err: S3Error) -> Response {
    // Unknown status codes degrade to 500 instead of panicking.
    let status =
        StatusCode::from_u16(err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
    let resource = match err.resource.is_empty() {
        true => "/".to_string(),
        false => err.resource.clone(),
    };
    let request_id = uuid::Uuid::new_v4().simple().to_string();
    let body = err
        .with_resource(resource)
        .with_request_id(request_id)
        .to_xml();
    (status, [("content-type", "application/xml")], body).into_response()
}
/// Renders the `<Stats>` event payload. BytesProcessed intentionally mirrors
/// BytesScanned, since the engine processes everything it scans.
fn build_stats_xml(bytes_scanned: usize, bytes_returned: usize) -> String {
    let mut xml = String::with_capacity(128);
    xml.push_str("<Stats>");
    xml.push_str(&format!("<BytesScanned>{}</BytesScanned>", bytes_scanned));
    xml.push_str(&format!("<BytesProcessed>{}</BytesProcessed>", bytes_scanned));
    xml.push_str(&format!("<BytesReturned>{}</BytesReturned>", bytes_returned));
    xml.push_str("</Stats>");
    xml
}
/// Frames `payload` as one AWS event-stream message:
/// [total len u32][headers len u32][prelude crc32][headers][payload][message crc32],
/// all integers big-endian. Records/Stats events carry a `:content-type`
/// header; every event carries `:event-type` and `:message-type`.
fn encode_select_event(event_type: &str, payload: &[u8]) -> Vec<u8> {
    let mut headers = encode_select_header(":event-type", event_type);
    match event_type {
        "Records" => {
            headers.extend(encode_select_header(":content-type", "application/octet-stream"))
        }
        "Stats" => headers.extend(encode_select_header(":content-type", "text/xml")),
        _ => {}
    }
    headers.extend(encode_select_header(":message-type", "event"));
    // 12 bytes of prelude (+ its CRC) precede the headers; 4 bytes of
    // message CRC trail the payload.
    let total_len = 12 + headers.len() + payload.len() + 4;
    let mut message = Vec::with_capacity(total_len);
    message.extend((total_len as u32).to_be_bytes());
    message.extend((headers.len() as u32).to_be_bytes());
    // CRC over the 8-byte prelude written so far.
    let prelude_crc = crc32(&message);
    message.extend(prelude_crc.to_be_bytes());
    message.extend(headers);
    message.extend(payload);
    // CRC over everything written so far, including the prelude CRC.
    let message_crc = crc32(&message);
    message.extend(message_crc.to_be_bytes());
    message
}
/// Encodes one event-stream header: name length (u8), name bytes, value
/// type 7 (string), value length (u16 big-endian), value bytes.
fn encode_select_header(name: &str, value: &str) -> Vec<u8> {
    let mut header = Vec::with_capacity(4 + name.len() + value.len());
    header.push(name.len() as u8);
    header.extend_from_slice(name.as_bytes());
    // Header value type 7 = string, per the event-stream encoding.
    header.push(7);
    header.extend_from_slice(&(value.len() as u16).to_be_bytes());
    header.extend_from_slice(value.as_bytes());
    header
}
fn crc32(data: &[u8]) -> u32 {
let mut hasher = Hasher::new();
hasher.update(data);
hasher.finalize()
}

View File

@@ -57,13 +57,32 @@ async fn main() {
let app = myfsio_server::create_router(state);
let listener = tokio::net::TcpListener::bind(bind_addr).await.unwrap();
let listener = match tokio::net::TcpListener::bind(bind_addr).await {
Ok(listener) => listener,
Err(err) => {
if err.kind() == std::io::ErrorKind::AddrInUse {
tracing::error!("Port already in use: {}", bind_addr);
} else {
tracing::error!("Failed to bind {}: {}", bind_addr, err);
}
for handle in bg_handles {
handle.abort();
}
std::process::exit(1);
}
};
tracing::info!("Listening on {}", bind_addr);
axum::serve(listener, app)
if let Err(err) = axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.unwrap();
{
tracing::error!("Server exited with error: {}", err);
for handle in bg_handles {
handle.abort();
}
std::process::exit(1);
}
for handle in bg_handles {
handle.abort();

View File

@@ -1,5 +1,5 @@
use axum::extract::{Request, State};
use axum::http::StatusCode;
use axum::http::{Method, StatusCode};
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
@@ -16,17 +16,21 @@ pub async fn auth_layer(
next: Next,
) -> Response {
let uri = req.uri().clone();
let path = uri.path();
let path = uri.path().to_string();
if path == "/" && req.method() == axum::http::Method::GET {
match try_auth(&state, &req) {
AuthResult::Ok(principal) => {
if let Err(err) = authorize_request(&state, &principal, &req) {
return error_response(err, &path);
}
req.extensions_mut().insert(principal);
}
AuthResult::Denied(err) => return error_response(err),
AuthResult::Denied(err) => return error_response(err, &path),
AuthResult::NoAuth => {
return error_response(
S3Error::from_code(S3ErrorCode::AccessDenied),
S3Error::new(S3ErrorCode::AccessDenied, "Missing credentials"),
&path,
);
}
}
@@ -35,12 +39,18 @@ pub async fn auth_layer(
match try_auth(&state, &req) {
AuthResult::Ok(principal) => {
if let Err(err) = authorize_request(&state, &principal, &req) {
return error_response(err, &path);
}
req.extensions_mut().insert(principal);
next.run(req).await
}
AuthResult::Denied(err) => error_response(err),
AuthResult::Denied(err) => error_response(err, &path),
AuthResult::NoAuth => {
error_response(S3Error::from_code(S3ErrorCode::AccessDenied))
error_response(
S3Error::new(S3ErrorCode::AccessDenied, "Missing credentials"),
&path,
)
}
}
}
@@ -51,6 +61,167 @@ enum AuthResult {
NoAuth,
}
/// Checks whether `principal` may perform the request described by `req`.
///
/// Resolution order: root listing (`GET /`), bucket-level operations (no
/// object key), CopyObject (needs read on source AND write on destination),
/// then plain object-level operations.
fn authorize_request(state: &AppState, principal: &Principal, req: &Request) -> Result<(), S3Error> {
    let path = req.uri().path();
    // The root path lists buckets; gated by "list" with no bucket scope.
    if path == "/" {
        if state.iam.authorize(principal, None, "list", None) {
            return Ok(());
        }
        return Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"));
    }
    let mut segments = path.trim_start_matches('/').split('/').filter(|s| !s.is_empty());
    let bucket = match segments.next() {
        Some(b) => b,
        None => {
            // Path was only slashes; nothing to authorize against.
            return Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"));
        }
    };
    let remaining: Vec<&str> = segments.collect();
    let query = req.uri().query().unwrap_or("");
    // No object key left: this is a bucket-level operation.
    if remaining.is_empty() {
        let action = resolve_bucket_action(req.method(), query);
        if state.iam.authorize(principal, Some(bucket), action, None) {
            return Ok(());
        }
        return Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"));
    }
    let object_key = remaining.join("/");
    // CopyObject: PUT with x-amz-copy-source requires read on the source
    // object and write on the destination object.
    if req.method() == Method::PUT {
        if let Some(copy_source) = req
            .headers()
            .get("x-amz-copy-source")
            .and_then(|v| v.to_str().ok())
        {
            let source = copy_source.strip_prefix('/').unwrap_or(copy_source);
            if let Some((src_bucket, src_key)) = source.split_once('/') {
                let source_allowed =
                    state.iam.authorize(principal, Some(src_bucket), "read", Some(src_key));
                let dest_allowed =
                    state.iam.authorize(principal, Some(bucket), "write", Some(&object_key));
                if source_allowed && dest_allowed {
                    return Ok(());
                }
                return Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"));
            }
            // NOTE(review): a copy-source without a '/' falls through to the
            // plain object-action check below — confirm that's intended.
        }
    }
    let action = resolve_object_action(req.method(), query);
    if state
        .iam
        .authorize(principal, Some(bucket), action, Some(&object_key))
    {
        return Ok(());
    }
    Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"))
}
/// Maps a bucket-level request (no object key) to an IAM action name.
/// Sub-resource query parameters take priority over the HTTP method and are
/// checked in a fixed order.
fn resolve_bucket_action(method: &Method, query: &str) -> &'static str {
    // (query key, action) pairs, in priority order.
    const QUERY_ACTIONS: &[(&str, &str)] = &[
        ("versioning", "versioning"),
        ("tagging", "tagging"),
        ("cors", "cors"),
        ("location", "list"),
        ("encryption", "encryption"),
        ("lifecycle", "lifecycle"),
        ("acl", "share"),
        ("policy", "policy"),
        ("policyStatus", "policy"),
        ("replication", "replication"),
        ("quota", "quota"),
        ("website", "website"),
        ("object-lock", "object_lock"),
        ("notification", "notification"),
        ("logging", "logging"),
        ("versions", "list"),
        ("uploads", "list"),
        ("delete", "delete"),
    ];
    for (key, action) in QUERY_ACTIONS {
        if has_query_key(query, key) {
            return action;
        }
    }
    // No sub-resource: fall back to the HTTP method.
    match *method {
        Method::GET => "list",
        Method::HEAD => "read",
        Method::PUT => "create_bucket",
        Method::DELETE => "delete_bucket",
        Method::POST => "write",
        _ => "list",
    }
}
/// Maps an object-level request to an IAM action name. Sub-resource query
/// parameters are checked (in a fixed order) before the HTTP method.
fn resolve_object_action(method: &Method, query: &str) -> &'static str {
    let is_get = *method == Method::GET;
    // Tagging/ACL: read on GET, write otherwise.
    if has_query_key(query, "tagging") || has_query_key(query, "acl") {
        return if is_get { "read" } else { "write" };
    }
    if has_query_key(query, "retention") || has_query_key(query, "legal-hold") {
        return "object_lock";
    }
    if has_query_key(query, "attributes") {
        return "read";
    }
    // Multipart upload operations.
    if has_query_key(query, "uploads") || has_query_key(query, "uploadId") {
        return if is_get { "read" } else { "write" };
    }
    if has_query_key(query, "select") {
        return "read";
    }
    match *method {
        Method::GET | Method::HEAD => "read",
        Method::PUT | Method::POST => "write",
        Method::DELETE => "delete",
        _ => "read",
    }
}
/// Returns true when `query` contains `key` either bare (`?acl`) or with a
/// value (`?acl=public`). Comparison is exact: case-sensitive and without
/// percent-decoding.
fn has_query_key(query: &str, key: &str) -> bool {
    query
        .split('&')
        .filter(|part| !part.is_empty())
        .any(|part| {
            // Avoid allocating `format!("{}=", key)` for every segment:
            // a match is the key alone or the key followed by '='.
            match part.strip_prefix(key) {
                Some(rest) => rest.is_empty() || rest.starts_with('='),
                None => false,
            }
        })
}
fn try_auth(state: &AppState, req: &Request) -> AuthResult {
if let Some(auth_header) = req.headers().get("authorization") {
if let Ok(auth_str) = auth_header.to_str() {
@@ -382,9 +553,13 @@ fn urlencoding_decode(s: &str) -> String {
.into_owned()
}
fn error_response(err: S3Error) -> Response {
fn error_response(err: S3Error, resource: &str) -> Response {
let status =
StatusCode::from_u16(err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let body = err.to_xml();
let request_id = uuid::Uuid::new_v4().simple().to_string();
let body = err
.with_resource(resource.to_string())
.with_request_id(request_id)
.to_xml();
(status, [("content-type", "application/xml")], body).into_response()
}

View File

@@ -1,34 +1,17 @@
use axum::body::Body;
use axum::http::{Method, Request, StatusCode};
use http_body_util::BodyExt;
use myfsio_storage::traits::StorageEngine;
use tower::ServiceExt;
const TEST_ACCESS_KEY: &str = "AKIAIOSFODNN7EXAMPLE";
const TEST_SECRET_KEY: &str = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
fn test_app() -> (axum::Router, tempfile::TempDir) {
fn test_app_with_iam(iam_json: serde_json::Value) -> (axum::Router, tempfile::TempDir) {
let tmp = tempfile::TempDir::new().unwrap();
let iam_path = tmp.path().join(".myfsio.sys").join("config");
std::fs::create_dir_all(&iam_path).unwrap();
let iam_json = serde_json::json!({
"version": 2,
"users": [{
"user_id": "u-test1234",
"display_name": "admin",
"enabled": true,
"access_keys": [{
"access_key": TEST_ACCESS_KEY,
"secret_key": TEST_SECRET_KEY,
"status": "active"
}],
"policies": [{
"bucket": "*",
"actions": ["*"],
"prefix": "*"
}]
}]
});
std::fs::write(iam_path.join("iam.json"), iam_json.to_string()).unwrap();
let config = myfsio_server::config::ServerConfig {
@@ -52,6 +35,27 @@ fn test_app() -> (axum::Router, tempfile::TempDir) {
(app, tmp)
}
fn test_app() -> (axum::Router, tempfile::TempDir) {
test_app_with_iam(serde_json::json!({
"version": 2,
"users": [{
"user_id": "u-test1234",
"display_name": "admin",
"enabled": true,
"access_keys": [{
"access_key": TEST_ACCESS_KEY,
"secret_key": TEST_SECRET_KEY,
"status": "active"
}],
"policies": [{
"bucket": "*",
"actions": ["*"],
"prefix": "*"
}]
}]
}))
}
fn signed_request(method: Method, uri: &str, body: Body) -> Request<Body> {
Request::builder()
.method(method)
@@ -62,6 +66,75 @@ fn signed_request(method: Method, uri: &str, body: Body) -> Request<Body> {
.unwrap()
}
/// Decodes a concatenated AWS event-stream body into (event-type, payload)
/// pairs. CRC fields are skipped rather than verified; parsing stops at the
/// first malformed frame.
fn parse_select_events(body: &[u8]) -> Vec<(String, Vec<u8>)> {
    let mut events = Vec::new();
    let mut offset = 0usize;
    loop {
        // A frame needs at least prelude (12) + trailing CRC (4) bytes.
        if offset + 16 > body.len() {
            break;
        }
        let read_u32 = |at: usize| {
            u32::from_be_bytes([body[at], body[at + 1], body[at + 2], body[at + 3]]) as usize
        };
        let total_len = read_u32(offset);
        let headers_len = read_u32(offset + 4);
        if total_len < 16 || offset + total_len > body.len() {
            break;
        }
        let headers_start = offset + 12;
        let headers_end = headers_start + headers_len;
        let payload_end = offset + total_len - 4;
        if headers_end > payload_end {
            break;
        }
        let mut event_type: Option<String> = None;
        let mut cursor = headers_start;
        while cursor < headers_end {
            let name_len = body[cursor] as usize;
            cursor += 1;
            // name + value-type byte + 2-byte value length must fit.
            if cursor + name_len + 3 > headers_end {
                break;
            }
            let name = String::from_utf8_lossy(&body[cursor..cursor + name_len]).to_string();
            cursor += name_len;
            let value_type = body[cursor];
            cursor += 1;
            // Only string-typed (7) header values are understood.
            if value_type != 7 || cursor + 2 > headers_end {
                break;
            }
            let value_len = u16::from_be_bytes([body[cursor], body[cursor + 1]]) as usize;
            cursor += 2;
            if cursor + value_len > headers_end {
                break;
            }
            let value = String::from_utf8_lossy(&body[cursor..cursor + value_len]).to_string();
            cursor += value_len;
            if name == ":event-type" {
                event_type = Some(value);
            }
        }
        let payload = body[headers_end..payload_end].to_vec();
        events.push((event_type.unwrap_or_default(), payload));
        offset += total_len;
    }
    events
}
#[tokio::test]
async fn test_unauthenticated_request_rejected() {
let (app, _tmp) = test_app();
@@ -70,6 +143,34 @@ async fn test_unauthenticated_request_rejected() {
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
let body = String::from_utf8(
resp.into_body().collect().await.unwrap().to_bytes().to_vec(),
)
.unwrap();
assert!(body.contains("<Code>AccessDenied</Code>"));
assert!(body.contains("<Message>Missing credentials</Message>"));
assert!(body.contains("<Resource>/</Resource>"));
assert!(body.contains("<RequestId>"));
assert!(!body.contains("<RequestId></RequestId>"));
}
#[tokio::test]
async fn test_unauthenticated_request_includes_requested_resource_path() {
let (app, _tmp) = test_app();
let resp = app
.oneshot(Request::builder().uri("/ui/").body(Body::empty()).unwrap())
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
let body = String::from_utf8(
resp.into_body().collect().await.unwrap().to_bytes().to_vec(),
)
.unwrap();
assert!(body.contains("<Code>AccessDenied</Code>"));
assert!(body.contains("<Message>Missing credentials</Message>"));
assert!(body.contains("<Resource>/ui/</Resource>"));
assert!(body.contains("<RequestId>"));
assert!(!body.contains("<RequestId></RequestId>"));
}
#[tokio::test]
@@ -200,6 +301,54 @@ async fn test_put_and_get_object() {
assert_eq!(&body[..], b"Hello, World!");
}
#[tokio::test]
async fn test_content_type_falls_back_to_extension() {
let (app, _tmp) = test_app();
app.clone()
.oneshot(signed_request(Method::PUT, "/img-bucket", Body::empty()))
.await
.unwrap();
let resp = app
.clone()
.oneshot(
Request::builder()
.method(Method::PUT)
.uri("/img-bucket/yum.jpg")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.body(Body::from(vec![0_u8, 1, 2, 3, 4]))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(signed_request(
Method::GET,
"/img-bucket/yum.jpg",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers().get("content-type").unwrap(), "image/jpeg");
let resp = app
.oneshot(signed_request(
Method::HEAD,
"/img-bucket/yum.jpg",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers().get("content-type").unwrap(), "image/jpeg");
}
#[tokio::test]
async fn test_head_object() {
let (app, _tmp) = test_app();
@@ -1189,6 +1338,642 @@ async fn test_object_legal_hold() {
assert!(body.contains("<Status>OFF</Status>"));
}
#[tokio::test]
async fn test_list_objects_v1_marker_flow() {
let (app, _tmp) = test_app();
app.clone()
.oneshot(signed_request(Method::PUT, "/v1-bucket", Body::empty()))
.await
.unwrap();
for name in ["a.txt", "b.txt", "c.txt"] {
app.clone()
.oneshot(
Request::builder()
.method(Method::PUT)
.uri(format!("/v1-bucket/{}", name))
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.body(Body::from("data"))
.unwrap(),
)
.await
.unwrap();
}
let resp = app
.clone()
.oneshot(signed_request(
Method::GET,
"/v1-bucket?max-keys=2",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = String::from_utf8(resp.into_body().collect().await.unwrap().to_bytes().to_vec()).unwrap();
assert!(body.contains("<Marker></Marker>"));
assert!(body.contains("<IsTruncated>true</IsTruncated>") || body.contains("<IsTruncated>false</IsTruncated>"));
}
#[tokio::test]
async fn test_bucket_quota_roundtrip() {
let (app, _tmp) = test_app();
app.clone()
.oneshot(signed_request(Method::PUT, "/quota-bucket", Body::empty()))
.await
.unwrap();
let resp = app
.clone()
.oneshot(
Request::builder()
.method(Method::PUT)
.uri("/quota-bucket?quota")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("content-type", "application/json")
.body(Body::from(r#"{"max_size_bytes": 1024, "max_objects": 10}"#))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(signed_request(
Method::GET,
"/quota-bucket?quota",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body: serde_json::Value = serde_json::from_slice(&resp.into_body().collect().await.unwrap().to_bytes()).unwrap();
assert_eq!(body["quota"]["max_size_bytes"], 1024);
assert_eq!(body["quota"]["max_objects"], 10);
let resp = app
.oneshot(signed_request(
Method::DELETE,
"/quota-bucket?quota",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn test_bucket_policy_and_status_roundtrip() {
let (app, _tmp) = test_app();
app.clone()
.oneshot(signed_request(Method::PUT, "/policy-bucket", Body::empty()))
.await
.unwrap();
let policy = r#"{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::policy-bucket/*"
}]
}"#;
let resp = app
.clone()
.oneshot(
Request::builder()
.method(Method::PUT)
.uri("/policy-bucket?policy")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("content-type", "application/json")
.body(Body::from(policy))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
let resp = app
.clone()
.oneshot(signed_request(
Method::GET,
"/policy-bucket?policy",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body: serde_json::Value = serde_json::from_slice(&resp.into_body().collect().await.unwrap().to_bytes()).unwrap();
assert!(body.get("Statement").is_some());
let resp = app
.clone()
.oneshot(signed_request(
Method::GET,
"/policy-bucket?policyStatus",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = String::from_utf8(resp.into_body().collect().await.unwrap().to_bytes().to_vec()).unwrap();
assert!(body.contains("<IsPublic>TRUE</IsPublic>"));
let resp = app
.oneshot(signed_request(
Method::DELETE,
"/policy-bucket?policy",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn test_bucket_replication_roundtrip() {
let (app, _tmp) = test_app();
app.clone()
.oneshot(signed_request(Method::PUT, "/repl-bucket", Body::empty()))
.await
.unwrap();
let repl_xml = "<ReplicationConfiguration><Role>arn:aws:iam::123456789012:role/s3-repl</Role><Rule><ID>rule-1</ID></Rule></ReplicationConfiguration>";
let resp = app
.clone()
.oneshot(
Request::builder()
.method(Method::PUT)
.uri("/repl-bucket?replication")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("content-type", "application/xml")
.body(Body::from(repl_xml))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(signed_request(
Method::GET,
"/repl-bucket?replication",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = String::from_utf8(resp.into_body().collect().await.unwrap().to_bytes().to_vec()).unwrap();
assert!(body.contains("ReplicationConfiguration"));
let resp = app
.oneshot(signed_request(
Method::DELETE,
"/repl-bucket?replication",
Body::empty(),
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn test_list_parts_via_get_upload_id() {
    // Listing the parts of an in-progress multipart upload via GET ?uploadId=...
    let (app, _tmp) = test_app();

    // Bucket that will hold the multipart upload.
    app.clone()
        .oneshot(signed_request(Method::PUT, "/parts-bucket", Body::empty()))
        .await
        .unwrap();

    // Initiate the multipart upload and pull the UploadId out of the XML reply.
    let initiate = app
        .clone()
        .oneshot(signed_request(
            Method::POST,
            "/parts-bucket/large.bin?uploads",
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(initiate.status(), StatusCode::OK);
    let initiate_xml =
        String::from_utf8(initiate.into_body().collect().await.unwrap().to_bytes().to_vec())
            .unwrap();
    let upload_id = initiate_xml
        .split("<UploadId>")
        .nth(1)
        .unwrap()
        .split("</UploadId>")
        .next()
        .unwrap();

    // Upload a single small part under that upload id.
    let part_put = app
        .clone()
        .oneshot(
            Request::builder()
                .method(Method::PUT)
                .uri(format!(
                    "/parts-bucket/large.bin?uploadId={}&partNumber=1",
                    upload_id
                ))
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .body(Body::from(vec![1_u8, 2, 3, 4]))
                .unwrap(),
        )
        .await
        .unwrap();
    assert_eq!(part_put.status(), StatusCode::OK);

    // GET with only the uploadId query must return a ListPartsResult containing part 1.
    let listing = app
        .oneshot(signed_request(
            Method::GET,
            &format!("/parts-bucket/large.bin?uploadId={}", upload_id),
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(listing.status(), StatusCode::OK);
    let listing_xml =
        String::from_utf8(listing.into_body().collect().await.unwrap().to_bytes().to_vec())
            .unwrap();
    assert!(listing_xml.contains("ListPartsResult"));
    assert!(listing_xml.contains("<PartNumber>1</PartNumber>"));
}
#[tokio::test]
async fn test_conditional_get_and_head() {
let (app, _tmp) = test_app();
app.clone()
.oneshot(signed_request(Method::PUT, "/cond-bucket", Body::empty()))
.await
.unwrap();
let put_resp = app
.clone()
.oneshot(
Request::builder()
.method(Method::PUT)
.uri("/cond-bucket/item.txt")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.body(Body::from("abc"))
.unwrap(),
)
.await
.unwrap();
let etag = put_resp.headers().get("etag").unwrap().to_str().unwrap().to_string();
let resp = app
.clone()
.oneshot(
Request::builder()
.method(Method::GET)
.uri("/cond-bucket/item.txt")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("if-none-match", etag.as_str())
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_MODIFIED);
let resp = app
.clone()
.oneshot(
Request::builder()
.method(Method::GET)
.uri("/cond-bucket/item.txt")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("if-match", "\"does-not-match\"")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::PRECONDITION_FAILED);
let resp = app
.oneshot(
Request::builder()
.method(Method::HEAD)
.uri("/cond-bucket/item.txt")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("if-none-match", etag.as_str())
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_MODIFIED);
}
#[tokio::test]
async fn test_copy_source_preconditions() {
    // x-amz-copy-source-if-match gating on a server-side object copy.
    let (app, _tmp) = test_app();
    for bucket in ["/src-pre", "/dst-pre"] {
        app.clone()
            .oneshot(signed_request(Method::PUT, bucket, Body::empty()))
            .await
            .unwrap();
    }

    // Write the source object; keep its ETag for the precondition checks.
    let created = app
        .clone()
        .oneshot(
            Request::builder()
                .method(Method::PUT)
                .uri("/src-pre/original.txt")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .body(Body::from("copy source"))
                .unwrap(),
        )
        .await
        .unwrap();
    let etag = created
        .headers()
        .get("etag")
        .unwrap()
        .to_str()
        .unwrap()
        .to_string();

    // Builder for a copy request carrying a given copy-source-if-match value.
    let copy_with = |if_match: &str| {
        Request::builder()
            .method(Method::PUT)
            .uri("/dst-pre/copied.txt")
            .header("x-access-key", TEST_ACCESS_KEY)
            .header("x-secret-key", TEST_SECRET_KEY)
            .header("x-amz-copy-source", "/src-pre/original.txt")
            .header("x-amz-copy-source-if-match", if_match)
            .body(Body::empty())
            .unwrap()
    };

    // A wrong ETag must abort the copy with 412.
    let denied = app.clone().oneshot(copy_with("\"bad-etag\"")).await.unwrap();
    assert_eq!(denied.status(), StatusCode::PRECONDITION_FAILED);

    // The real ETag lets the copy through.
    let allowed = app.oneshot(copy_with(etag.as_str())).await.unwrap();
    assert_eq!(allowed.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_select_object_content_csv_to_json_events() {
let (app, _tmp) = test_app();
app.clone()
.oneshot(signed_request(Method::PUT, "/sel-bucket", Body::empty()))
.await
.unwrap();
app.clone()
.oneshot(
Request::builder()
.method(Method::PUT)
.uri("/sel-bucket/people.csv")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("content-type", "text/csv")
.body(Body::from("name,age\nalice,30\nbob,40\n"))
.unwrap(),
)
.await
.unwrap();
let select_xml = r#"
<SelectObjectContentRequest>
<Expression>SELECT name, age FROM S3Object WHERE CAST(age AS INTEGER) &gt;= 35</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CSV>
<FileHeaderInfo>USE</FileHeaderInfo>
</CSV>
</InputSerialization>
<OutputSerialization>
<JSON>
<RecordDelimiter>\n</RecordDelimiter>
</JSON>
</OutputSerialization>
</SelectObjectContentRequest>
"#;
let resp = app
.oneshot(
Request::builder()
.method(Method::POST)
.uri("/sel-bucket/people.csv?select&select-type=2")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("content-type", "application/xml")
.body(Body::from(select_xml))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers().get("content-type").unwrap(), "application/octet-stream");
assert_eq!(resp.headers().get("x-amz-request-charged").unwrap(), "requester");
let body = resp.into_body().collect().await.unwrap().to_bytes();
let events = parse_select_events(&body);
assert!(events.iter().any(|(name, _)| name == "Records"));
assert!(events.iter().any(|(name, _)| name == "Stats"));
assert!(events.iter().any(|(name, _)| name == "End"));
let mut records = String::new();
for (name, payload) in events {
if name == "Records" {
records.push_str(&String::from_utf8_lossy(&payload));
}
}
assert!(records.contains("bob"));
assert!(!records.contains("alice"));
}
#[tokio::test]
async fn test_select_object_content_requires_expression() {
    // A select request without an <Expression> element is rejected up front.
    let (app, _tmp) = test_app();
    app.clone()
        .oneshot(signed_request(Method::PUT, "/sel-missing-exp", Body::empty()))
        .await
        .unwrap();
    app.clone()
        .oneshot(
            Request::builder()
                .method(Method::PUT)
                .uri("/sel-missing-exp/file.csv")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .body(Body::from("a,b\n1,2\n"))
                .unwrap(),
        )
        .await
        .unwrap();

    // Valid envelope, but the mandatory Expression element is missing.
    let select_xml = r#"
<SelectObjectContentRequest>
<ExpressionType>SQL</ExpressionType>
<InputSerialization><CSV><FileHeaderInfo>USE</FileHeaderInfo></CSV></InputSerialization>
<OutputSerialization><CSV /></OutputSerialization>
</SelectObjectContentRequest>
"#;
    let response = app
        .oneshot(
            Request::builder()
                .method(Method::POST)
                .uri("/sel-missing-exp/file.csv?select")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .header("content-type", "application/xml")
                .body(Body::from(select_xml))
                .unwrap(),
        )
        .await
        .unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    let error_xml =
        String::from_utf8(response.into_body().collect().await.unwrap().to_bytes().to_vec())
            .unwrap();
    assert!(error_xml.contains("<Code>InvalidRequest</Code>"));
    assert!(error_xml.contains("Expression is required"));
}
#[tokio::test]
async fn test_select_object_content_rejects_non_xml_content_type() {
    // Select requests must be posted as XML; other content types are refused.
    let (app, _tmp) = test_app();
    app.clone()
        .oneshot(signed_request(Method::PUT, "/sel-ct", Body::empty()))
        .await
        .unwrap();
    app.clone()
        .oneshot(
            Request::builder()
                .method(Method::PUT)
                .uri("/sel-ct/file.csv")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .body(Body::from("a,b\n1,2\n"))
                .unwrap(),
        )
        .await
        .unwrap();

    // A perfectly valid request body, sent with the wrong Content-Type.
    let select_xml = r#"
<SelectObjectContentRequest>
<Expression>SELECT * FROM S3Object</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization><CSV><FileHeaderInfo>USE</FileHeaderInfo></CSV></InputSerialization>
<OutputSerialization><CSV /></OutputSerialization>
</SelectObjectContentRequest>
"#;
    let response = app
        .oneshot(
            Request::builder()
                .method(Method::POST)
                .uri("/sel-ct/file.csv?select")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .header("content-type", "application/json")
                .body(Body::from(select_xml))
                .unwrap(),
        )
        .await
        .unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    let error_xml =
        String::from_utf8(response.into_body().collect().await.unwrap().to_bytes().to_vec())
            .unwrap();
    assert!(error_xml.contains("<Code>InvalidRequest</Code>"));
    assert!(error_xml.contains("Content-Type must be application/xml or text/xml"));
}
#[tokio::test]
async fn test_non_admin_authorization_enforced() {
    // A user whose only policy grants list/read on `authz-bucket` must be
    // refused bucket creation elsewhere but allowed to list its own bucket.
    let iam_json = serde_json::json!({
        "version": 2,
        "users": [{
            "user_id": "u-limited",
            "display_name": "limited",
            "enabled": true,
            "access_keys": [{
                "access_key": TEST_ACCESS_KEY,
                "secret_key": TEST_SECRET_KEY,
                "status": "active"
            }],
            "policies": [{
                "bucket": "authz-bucket",
                "actions": ["list", "read"],
                "prefix": "*"
            }]
        }]
    });

    // Write the IAM config where the server expects to find it.
    let tmp = tempfile::TempDir::new().unwrap();
    let config_dir = tmp.path().join(".myfsio.sys").join("config");
    std::fs::create_dir_all(&config_dir).unwrap();
    std::fs::write(config_dir.join("iam.json"), iam_json.to_string()).unwrap();

    // Minimal server config: every optional subsystem switched off.
    let config = myfsio_server::config::ServerConfig {
        bind_addr: "127.0.0.1:0".parse().unwrap(),
        storage_root: tmp.path().to_path_buf(),
        region: "us-east-1".to_string(),
        iam_config_path: config_dir.join("iam.json"),
        sigv4_timestamp_tolerance_secs: 900,
        presigned_url_min_expiry: 1,
        presigned_url_max_expiry: 604800,
        secret_key: None,
        encryption_enabled: false,
        kms_enabled: false,
        gc_enabled: false,
        integrity_enabled: false,
        metrics_enabled: false,
        lifecycle_enabled: false,
    };
    let state = myfsio_server::state::AppState::new(config);
    state.storage.create_bucket("authz-bucket").await.unwrap();
    let app = myfsio_server::create_router(state);

    // Creating an unrelated bucket is outside the granted policy.
    let denied = app
        .clone()
        .oneshot(signed_request(Method::PUT, "/denied-bucket", Body::empty()))
        .await
        .unwrap();
    assert_eq!(denied.status(), StatusCode::FORBIDDEN);

    // Listing the policy-covered bucket succeeds.
    let allowed = app
        .oneshot(signed_request(Method::GET, "/authz-bucket", Body::empty()))
        .await
        .unwrap();
    assert_eq!(allowed.status(), StatusCode::OK);
}
async fn test_app_encrypted() -> (axum::Router, tempfile::TempDir) {
let tmp = tempfile::TempDir::new().unwrap();
let iam_path = tmp.path().join(".myfsio.sys").join("config");

View File

@@ -90,6 +90,76 @@ pub fn list_objects_v2_xml(
String::from_utf8(writer.into_inner().into_inner()).unwrap()
}
/// Renders a ListObjects (v1) `ListBucketResult` XML document.
///
/// `NextMarker` is emitted only for truncated, delimiter-based listings,
/// mirroring the AWS ListObjects (v1) response contract; `Delimiter` is
/// omitted entirely when empty.
pub fn list_objects_v1_xml(
    bucket_name: &str,
    prefix: &str,
    marker: &str,
    delimiter: &str,
    max_keys: usize,
    objects: &[ObjectMeta],
    common_prefixes: &[String],
    is_truncated: bool,
    next_marker: Option<&str>,
) -> String {
    let mut xml = Writer::new(Cursor::new(Vec::new()));
    xml.write_event(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None)))
        .unwrap();
    let root = BytesStart::new("ListBucketResult")
        .with_attributes([("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/")]);
    xml.write_event(Event::Start(root)).unwrap();

    // Listing metadata, in the same order the v2 variant uses.
    write_text_element(&mut xml, "Name", bucket_name);
    write_text_element(&mut xml, "Prefix", prefix);
    write_text_element(&mut xml, "Marker", marker);
    write_text_element(&mut xml, "MaxKeys", &max_keys.to_string());
    write_text_element(&mut xml, "IsTruncated", &is_truncated.to_string());

    let has_delimiter = !delimiter.is_empty();
    if has_delimiter {
        write_text_element(&mut xml, "Delimiter", delimiter);
    }
    if has_delimiter && is_truncated {
        // Skip an empty marker rather than emit an empty element.
        if let Some(nm) = next_marker.filter(|nm| !nm.is_empty()) {
            write_text_element(&mut xml, "NextMarker", nm);
        }
    }

    // One <Contents> entry per object; ETag is optional and quoted when present.
    for obj in objects {
        xml.write_event(Event::Start(BytesStart::new("Contents")))
            .unwrap();
        write_text_element(&mut xml, "Key", &obj.key);
        write_text_element(&mut xml, "LastModified", &obj.last_modified.to_rfc3339());
        if let Some(ref etag) = obj.etag {
            write_text_element(&mut xml, "ETag", &format!("\"{}\"", etag));
        }
        write_text_element(&mut xml, "Size", &obj.size.to_string());
        xml.write_event(Event::End(BytesEnd::new("Contents")))
            .unwrap();
    }

    // Collapsed prefixes produced by the delimiter.
    for cp in common_prefixes {
        xml.write_event(Event::Start(BytesStart::new("CommonPrefixes")))
            .unwrap();
        write_text_element(&mut xml, "Prefix", cp);
        xml.write_event(Event::End(BytesEnd::new("CommonPrefixes")))
            .unwrap();
    }

    xml.write_event(Event::End(BytesEnd::new("ListBucketResult")))
        .unwrap();
    String::from_utf8(xml.into_inner().into_inner()).unwrap()
}
fn write_text_element(writer: &mut Writer<Cursor<Vec<u8>>>, tag: &str, text: &str) {
writer.write_event(Event::Start(BytesStart::new(tag))).unwrap();
writer.write_event(Event::Text(BytesText::new(text))).unwrap();
@@ -266,4 +336,23 @@ mod tests {
assert!(xml.contains("<Size>1024</Size>"));
assert!(xml.contains("<IsTruncated>false</IsTruncated>"));
}
#[test]
fn test_list_objects_v1_xml() {
    // A v1 listing should carry Key/Size for each object plus an (empty)
    // Marker element even when no marker was supplied.
    let objects = [ObjectMeta::new("file.txt".to_string(), 1024, Utc::now())];
    let xml = list_objects_v1_xml("my-bucket", "", "", "/", 1000, &objects, &[], false, None);
    for fragment in [
        "<Key>file.txt</Key>",
        "<Size>1024</Size>",
        "<Marker></Marker>",
    ] {
        assert!(xml.contains(fragment));
    }
}
}