6 Commits

Author SHA1 Message Date
1ea6dfae07 Fix S3 conformance: XML config round-trip, Suspended versioning, ListVersions pagination, per-bucket CORS, canned ACL/SSE rejection, checksum attrs, request logging 2026-04-24 13:09:30 +08:00
f2df64479c Fix S3 versioning (live-object VersionId, DM PUT/DELETE), harden DeleteObjects/ListObjects conformance, and run hot paths on blocking threads 2026-04-23 22:40:38 +08:00
bd405cc2fe Fix S3 versioning/delete markers, path-safety leaks, and error-code conformance; parallelize DeleteObjects; restore per-op rate limits 2026-04-23 20:23:11 +08:00
7ef3820f6e Fix SigV4/SHA256/TCP_NODELAY critical paths; tighten multipart, copy, versioning, and S3 error conformance 2026-04-23 17:52:30 +08:00
e1fb225034 csrf fixes 2026-04-22 23:01:32 +08:00
2767e7e79d Optimize bucket listing for 10K-100K objects
- Shallow listing: read per-directory _index.json once for eTags instead
  of N serial .meta.json reads. Validate prefix for path traversal and
  verify normalized target stays within bucket root.
- Recursive listing: cache full per-directory index during the walk so
  each _index.json is parsed at most once per call.
- Per-bucket listing cache with 5s TTL and per-bucket rebuild mutex.
  Invalidated on put/delete/copy/metadata/tags/multipart-complete.
  Pagination uses partition_point for O(log n) start lookup.
- UI stream endpoint now actually streams via mpsc + Body::from_stream
  instead of buffering into a Vec<String>. Cancels producer on client
  disconnect.
- UI JSON endpoint honors delimiter=/ and returns common_prefixes.
- run_blocking wrapper dispatches sync filesystem work via
  block_in_place on multi-threaded runtimes, falls back to inline on
  current-thread runtimes (unit tests).
2026-04-22 19:55:44 +08:00
28 changed files with 4006 additions and 663 deletions

27
Cargo.lock generated
View File

@@ -2639,7 +2639,7 @@ dependencies = [
[[package]] [[package]]
name = "myfsio-auth" name = "myfsio-auth"
version = "0.4.3" version = "0.4.4"
dependencies = [ dependencies = [
"aes", "aes",
"base64", "base64",
@@ -2664,7 +2664,7 @@ dependencies = [
[[package]] [[package]]
name = "myfsio-common" name = "myfsio-common"
version = "0.4.3" version = "0.4.4"
dependencies = [ dependencies = [
"chrono", "chrono",
"serde", "serde",
@@ -2675,7 +2675,7 @@ dependencies = [
[[package]] [[package]]
name = "myfsio-crypto" name = "myfsio-crypto"
version = "0.4.3" version = "0.4.4"
dependencies = [ dependencies = [
"aes-gcm", "aes-gcm",
"base64", "base64",
@@ -2696,7 +2696,7 @@ dependencies = [
[[package]] [[package]]
name = "myfsio-server" name = "myfsio-server"
version = "0.4.3" version = "0.4.4"
dependencies = [ dependencies = [
"aes-gcm", "aes-gcm",
"async-trait", "async-trait",
@@ -2714,6 +2714,8 @@ dependencies = [
"dotenvy", "dotenvy",
"duckdb", "duckdb",
"futures", "futures",
"hex",
"http-body 1.0.1",
"http-body-util", "http-body-util",
"hyper 1.9.0", "hyper 1.9.0",
"md-5 0.10.6", "md-5 0.10.6",
@@ -2740,6 +2742,7 @@ dependencies = [
"tempfile", "tempfile",
"tera", "tera",
"tokio", "tokio",
"tokio-stream",
"tokio-util", "tokio-util",
"tower", "tower",
"tower-http", "tower-http",
@@ -2750,7 +2753,7 @@ dependencies = [
[[package]] [[package]]
name = "myfsio-storage" name = "myfsio-storage"
version = "0.4.3" version = "0.4.4"
dependencies = [ dependencies = [
"chrono", "chrono",
"dashmap", "dashmap",
@@ -2773,10 +2776,11 @@ dependencies = [
[[package]] [[package]]
name = "myfsio-xml" name = "myfsio-xml"
version = "0.4.3" version = "0.4.4"
dependencies = [ dependencies = [
"chrono", "chrono",
"myfsio-common", "myfsio-common",
"percent-encoding",
"quick-xml", "quick-xml",
"serde", "serde",
] ]
@@ -4193,6 +4197,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.18" version = "0.7.18"

View File

@@ -10,14 +10,14 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "0.4.3" version = "0.4.4"
edition = "2021" edition = "2021"
[workspace.dependencies] [workspace.dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
axum = { version = "0.8" } axum = { version = "0.8" }
tower = { version = "0.5" } tower = { version = "0.5" }
tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip"] } tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip", "timeout"] }
hyper = { version = "1" } hyper = { version = "1" }
bytes = "1" bytes = "1"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
@@ -43,6 +43,7 @@ thiserror = "2"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
base64 = "0.22" base64 = "0.22"
tokio-util = { version = "0.7", features = ["io"] } tokio-util = { version = "0.7", features = ["io"] }
tokio-stream = "0.1"
futures = "0.3" futures = "0.3"
dashmap = "6" dashmap = "6"
crc32fast = "1" crc32fast = "1"

View File

@@ -8,6 +8,7 @@ pub const STATS_FILE: &str = "stats.json";
pub const ETAG_INDEX_FILE: &str = "etag_index.json"; pub const ETAG_INDEX_FILE: &str = "etag_index.json";
pub const INDEX_FILE: &str = "_index.json"; pub const INDEX_FILE: &str = "_index.json";
pub const MANIFEST_FILE: &str = "manifest.json"; pub const MANIFEST_FILE: &str = "manifest.json";
pub const DIR_MARKER_FILE: &str = ".__myfsio_dirobj__";
pub const INTERNAL_FOLDERS: &[&str] = &[".meta", ".versions", ".multipart"]; pub const INTERNAL_FOLDERS: &[&str] = &[".meta", ".versions", ".multipart"];

View File

@@ -5,13 +5,17 @@ pub enum S3ErrorCode {
AccessDenied, AccessDenied,
BadDigest, BadDigest,
BucketAlreadyExists, BucketAlreadyExists,
BucketAlreadyOwnedByYou,
BucketNotEmpty, BucketNotEmpty,
EntityTooLarge, EntityTooLarge,
EntityTooSmall,
InternalError, InternalError,
InvalidAccessKeyId, InvalidAccessKeyId,
InvalidArgument, InvalidArgument,
InvalidBucketName, InvalidBucketName,
InvalidKey, InvalidKey,
InvalidPart,
InvalidPartOrder,
InvalidPolicyDocument, InvalidPolicyDocument,
InvalidRange, InvalidRange,
InvalidRequest, InvalidRequest,
@@ -19,13 +23,17 @@ pub enum S3ErrorCode {
MalformedXML, MalformedXML,
MethodNotAllowed, MethodNotAllowed,
NoSuchBucket, NoSuchBucket,
NoSuchBucketPolicy,
NoSuchKey, NoSuchKey,
NoSuchLifecycleConfiguration,
NoSuchUpload, NoSuchUpload,
NoSuchVersion, NoSuchVersion,
NoSuchTagSet, NoSuchTagSet,
PreconditionFailed, PreconditionFailed,
NotModified, NotModified,
QuotaExceeded, QuotaExceeded,
RequestTimeTooSkewed,
ServerSideEncryptionConfigurationNotFoundError,
SignatureDoesNotMatch, SignatureDoesNotMatch,
SlowDown, SlowDown,
} }
@@ -36,13 +44,17 @@ impl S3ErrorCode {
Self::AccessDenied => 403, Self::AccessDenied => 403,
Self::BadDigest => 400, Self::BadDigest => 400,
Self::BucketAlreadyExists => 409, Self::BucketAlreadyExists => 409,
Self::BucketAlreadyOwnedByYou => 409,
Self::BucketNotEmpty => 409, Self::BucketNotEmpty => 409,
Self::EntityTooLarge => 413, Self::EntityTooLarge => 413,
Self::EntityTooSmall => 400,
Self::InternalError => 500, Self::InternalError => 500,
Self::InvalidAccessKeyId => 403, Self::InvalidAccessKeyId => 403,
Self::InvalidArgument => 400, Self::InvalidArgument => 400,
Self::InvalidBucketName => 400, Self::InvalidBucketName => 400,
Self::InvalidKey => 400, Self::InvalidKey => 400,
Self::InvalidPart => 400,
Self::InvalidPartOrder => 400,
Self::InvalidPolicyDocument => 400, Self::InvalidPolicyDocument => 400,
Self::InvalidRange => 416, Self::InvalidRange => 416,
Self::InvalidRequest => 400, Self::InvalidRequest => 400,
@@ -50,15 +62,19 @@ impl S3ErrorCode {
Self::MalformedXML => 400, Self::MalformedXML => 400,
Self::MethodNotAllowed => 405, Self::MethodNotAllowed => 405,
Self::NoSuchBucket => 404, Self::NoSuchBucket => 404,
Self::NoSuchBucketPolicy => 404,
Self::NoSuchKey => 404, Self::NoSuchKey => 404,
Self::NoSuchLifecycleConfiguration => 404,
Self::NoSuchUpload => 404, Self::NoSuchUpload => 404,
Self::NoSuchVersion => 404, Self::NoSuchVersion => 404,
Self::NoSuchTagSet => 404, Self::NoSuchTagSet => 404,
Self::PreconditionFailed => 412, Self::PreconditionFailed => 412,
Self::NotModified => 304, Self::NotModified => 304,
Self::QuotaExceeded => 403, Self::QuotaExceeded => 403,
Self::RequestTimeTooSkewed => 403,
Self::ServerSideEncryptionConfigurationNotFoundError => 404,
Self::SignatureDoesNotMatch => 403, Self::SignatureDoesNotMatch => 403,
Self::SlowDown => 429, Self::SlowDown => 503,
} }
} }
@@ -67,13 +83,17 @@ impl S3ErrorCode {
Self::AccessDenied => "AccessDenied", Self::AccessDenied => "AccessDenied",
Self::BadDigest => "BadDigest", Self::BadDigest => "BadDigest",
Self::BucketAlreadyExists => "BucketAlreadyExists", Self::BucketAlreadyExists => "BucketAlreadyExists",
Self::BucketAlreadyOwnedByYou => "BucketAlreadyOwnedByYou",
Self::BucketNotEmpty => "BucketNotEmpty", Self::BucketNotEmpty => "BucketNotEmpty",
Self::EntityTooLarge => "EntityTooLarge", Self::EntityTooLarge => "EntityTooLarge",
Self::EntityTooSmall => "EntityTooSmall",
Self::InternalError => "InternalError", Self::InternalError => "InternalError",
Self::InvalidAccessKeyId => "InvalidAccessKeyId", Self::InvalidAccessKeyId => "InvalidAccessKeyId",
Self::InvalidArgument => "InvalidArgument", Self::InvalidArgument => "InvalidArgument",
Self::InvalidBucketName => "InvalidBucketName", Self::InvalidBucketName => "InvalidBucketName",
Self::InvalidKey => "InvalidKey", Self::InvalidKey => "InvalidKey",
Self::InvalidPart => "InvalidPart",
Self::InvalidPartOrder => "InvalidPartOrder",
Self::InvalidPolicyDocument => "InvalidPolicyDocument", Self::InvalidPolicyDocument => "InvalidPolicyDocument",
Self::InvalidRange => "InvalidRange", Self::InvalidRange => "InvalidRange",
Self::InvalidRequest => "InvalidRequest", Self::InvalidRequest => "InvalidRequest",
@@ -81,13 +101,19 @@ impl S3ErrorCode {
Self::MalformedXML => "MalformedXML", Self::MalformedXML => "MalformedXML",
Self::MethodNotAllowed => "MethodNotAllowed", Self::MethodNotAllowed => "MethodNotAllowed",
Self::NoSuchBucket => "NoSuchBucket", Self::NoSuchBucket => "NoSuchBucket",
Self::NoSuchBucketPolicy => "NoSuchBucketPolicy",
Self::NoSuchKey => "NoSuchKey", Self::NoSuchKey => "NoSuchKey",
Self::NoSuchLifecycleConfiguration => "NoSuchLifecycleConfiguration",
Self::NoSuchUpload => "NoSuchUpload", Self::NoSuchUpload => "NoSuchUpload",
Self::NoSuchVersion => "NoSuchVersion", Self::NoSuchVersion => "NoSuchVersion",
Self::NoSuchTagSet => "NoSuchTagSet", Self::NoSuchTagSet => "NoSuchTagSet",
Self::PreconditionFailed => "PreconditionFailed", Self::PreconditionFailed => "PreconditionFailed",
Self::NotModified => "NotModified", Self::NotModified => "NotModified",
Self::QuotaExceeded => "QuotaExceeded", Self::QuotaExceeded => "QuotaExceeded",
Self::RequestTimeTooSkewed => "RequestTimeTooSkewed",
Self::ServerSideEncryptionConfigurationNotFoundError => {
"ServerSideEncryptionConfigurationNotFoundError"
}
Self::SignatureDoesNotMatch => "SignatureDoesNotMatch", Self::SignatureDoesNotMatch => "SignatureDoesNotMatch",
Self::SlowDown => "SlowDown", Self::SlowDown => "SlowDown",
} }
@@ -98,13 +124,17 @@ impl S3ErrorCode {
Self::AccessDenied => "Access Denied", Self::AccessDenied => "Access Denied",
Self::BadDigest => "The Content-MD5 or checksum value you specified did not match what we received", Self::BadDigest => "The Content-MD5 or checksum value you specified did not match what we received",
Self::BucketAlreadyExists => "The requested bucket name is not available", Self::BucketAlreadyExists => "The requested bucket name is not available",
Self::BucketAlreadyOwnedByYou => "Your previous request to create the named bucket succeeded and you already own it",
Self::BucketNotEmpty => "The bucket you tried to delete is not empty", Self::BucketNotEmpty => "The bucket you tried to delete is not empty",
Self::EntityTooLarge => "Your proposed upload exceeds the maximum allowed size", Self::EntityTooLarge => "Your proposed upload exceeds the maximum allowed size",
Self::EntityTooSmall => "Your proposed upload is smaller than the minimum allowed object size",
Self::InternalError => "We encountered an internal error. Please try again.", Self::InternalError => "We encountered an internal error. Please try again.",
Self::InvalidAccessKeyId => "The access key ID you provided does not exist", Self::InvalidAccessKeyId => "The access key ID you provided does not exist",
Self::InvalidArgument => "Invalid argument", Self::InvalidArgument => "Invalid argument",
Self::InvalidBucketName => "The specified bucket is not valid", Self::InvalidBucketName => "The specified bucket is not valid",
Self::InvalidKey => "The specified key is not valid", Self::InvalidKey => "The specified key is not valid",
Self::InvalidPart => "One or more of the specified parts could not be found",
Self::InvalidPartOrder => "The list of parts was not in ascending order",
Self::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document", Self::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document",
Self::InvalidRange => "The requested range is not satisfiable", Self::InvalidRange => "The requested range is not satisfiable",
Self::InvalidRequest => "Invalid request", Self::InvalidRequest => "Invalid request",
@@ -112,13 +142,17 @@ impl S3ErrorCode {
Self::MalformedXML => "The XML you provided was not well-formed", Self::MalformedXML => "The XML you provided was not well-formed",
Self::MethodNotAllowed => "The specified method is not allowed against this resource", Self::MethodNotAllowed => "The specified method is not allowed against this resource",
Self::NoSuchBucket => "The specified bucket does not exist", Self::NoSuchBucket => "The specified bucket does not exist",
Self::NoSuchBucketPolicy => "The bucket policy does not exist",
Self::NoSuchKey => "The specified key does not exist", Self::NoSuchKey => "The specified key does not exist",
Self::NoSuchLifecycleConfiguration => "The lifecycle configuration does not exist",
Self::NoSuchUpload => "The specified multipart upload does not exist", Self::NoSuchUpload => "The specified multipart upload does not exist",
Self::NoSuchVersion => "The specified version does not exist", Self::NoSuchVersion => "The specified version does not exist",
Self::NoSuchTagSet => "The TagSet does not exist", Self::NoSuchTagSet => "The TagSet does not exist",
Self::PreconditionFailed => "At least one of the preconditions you specified did not hold", Self::PreconditionFailed => "At least one of the preconditions you specified did not hold",
Self::NotModified => "Not Modified", Self::NotModified => "Not Modified",
Self::QuotaExceeded => "The bucket quota has been exceeded", Self::QuotaExceeded => "The bucket quota has been exceeded",
Self::RequestTimeTooSkewed => "The difference between the request time and the server's time is too large",
Self::ServerSideEncryptionConfigurationNotFoundError => "The server side encryption configuration was not found",
Self::SignatureDoesNotMatch => "The request signature we calculated does not match the signature you provided", Self::SignatureDoesNotMatch => "The request signature we calculated does not match the signature you provided",
Self::SlowDown => "Please reduce your request rate", Self::SlowDown => "Please reduce your request rate",
} }

View File

@@ -12,6 +12,10 @@ pub struct ObjectMeta {
pub content_type: Option<String>, pub content_type: Option<String>,
pub storage_class: Option<String>, pub storage_class: Option<String>,
pub metadata: HashMap<String, String>, pub metadata: HashMap<String, String>,
#[serde(default)]
pub version_id: Option<String>,
#[serde(default)]
pub is_delete_marker: bool,
} }
impl ObjectMeta { impl ObjectMeta {
@@ -24,10 +28,19 @@ impl ObjectMeta {
content_type: None, content_type: None,
storage_class: Some("STANDARD".to_string()), storage_class: Some("STANDARD".to_string()),
metadata: HashMap::new(), metadata: HashMap::new(),
version_id: None,
is_delete_marker: false,
} }
} }
} }
#[derive(Debug, Clone, Default)]
pub struct DeleteOutcome {
pub version_id: Option<String>,
pub is_delete_marker: bool,
pub existed: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketMeta { pub struct BucketMeta {
pub name: String, pub name: String,
@@ -122,11 +135,31 @@ pub struct Tag {
pub value: String, pub value: String,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum VersioningStatus {
#[default]
Disabled,
Enabled,
Suspended,
}
impl VersioningStatus {
pub fn is_enabled(self) -> bool {
matches!(self, VersioningStatus::Enabled)
}
pub fn is_active(self) -> bool {
matches!(self, VersioningStatus::Enabled | VersioningStatus::Suspended)
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BucketConfig { pub struct BucketConfig {
#[serde(default)] #[serde(default)]
pub versioning_enabled: bool, pub versioning_enabled: bool,
#[serde(default)] #[serde(default)]
pub versioning_suspended: bool,
#[serde(default)]
pub tags: Vec<Tag>, pub tags: Vec<Tag>,
#[serde(default)] #[serde(default)]
pub cors: Option<serde_json::Value>, pub cors: Option<serde_json::Value>,
@@ -152,6 +185,35 @@ pub struct BucketConfig {
pub replication: Option<serde_json::Value>, pub replication: Option<serde_json::Value>,
} }
impl BucketConfig {
pub fn versioning_status(&self) -> VersioningStatus {
if self.versioning_enabled {
VersioningStatus::Enabled
} else if self.versioning_suspended {
VersioningStatus::Suspended
} else {
VersioningStatus::Disabled
}
}
pub fn set_versioning_status(&mut self, status: VersioningStatus) {
match status {
VersioningStatus::Enabled => {
self.versioning_enabled = true;
self.versioning_suspended = false;
}
VersioningStatus::Suspended => {
self.versioning_enabled = false;
self.versioning_suspended = true;
}
VersioningStatus::Disabled => {
self.versioning_enabled = false;
self.versioning_suspended = false;
}
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotaConfig { pub struct QuotaConfig {
pub max_bytes: Option<u64>, pub max_bytes: Option<u64>,

View File

@@ -23,15 +23,18 @@ serde_urlencoded = "0.7"
tracing = { workspace = true } tracing = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }
tokio-util = { workspace = true } tokio-util = { workspace = true }
tokio-stream = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
http-body = "1"
http-body-util = "0.1" http-body-util = "0.1"
percent-encoding = { workspace = true } percent-encoding = { workspace = true }
quick-xml = { workspace = true } quick-xml = { workspace = true }
mime_guess = "2" mime_guess = "2"
crc32fast = { workspace = true } crc32fast = { workspace = true }
sha2 = { workspace = true } sha2 = { workspace = true }
hex = { workspace = true }
duckdb = { workspace = true } duckdb = { workspace = true }
roxmltree = "0.20" roxmltree = "0.20"
parking_lot = { workspace = true } parking_lot = { workspace = true }

View File

@@ -81,7 +81,12 @@ pub struct ServerConfig {
pub multipart_min_part_size: u64, pub multipart_min_part_size: u64,
pub bulk_delete_max_keys: usize, pub bulk_delete_max_keys: usize,
pub stream_chunk_size: usize, pub stream_chunk_size: usize,
pub request_body_timeout_secs: u64,
pub ratelimit_default: RateLimitSetting, pub ratelimit_default: RateLimitSetting,
pub ratelimit_list_buckets: RateLimitSetting,
pub ratelimit_bucket_ops: RateLimitSetting,
pub ratelimit_object_ops: RateLimitSetting,
pub ratelimit_head_ops: RateLimitSetting,
pub ratelimit_admin: RateLimitSetting, pub ratelimit_admin: RateLimitSetting,
pub ratelimit_storage_uri: String, pub ratelimit_storage_uri: String,
pub ui_enabled: bool, pub ui_enabled: bool,
@@ -225,8 +230,17 @@ impl ServerConfig {
let multipart_min_part_size = parse_u64_env("MULTIPART_MIN_PART_SIZE", 5_242_880); let multipart_min_part_size = parse_u64_env("MULTIPART_MIN_PART_SIZE", 5_242_880);
let bulk_delete_max_keys = parse_usize_env("BULK_DELETE_MAX_KEYS", 1000); let bulk_delete_max_keys = parse_usize_env("BULK_DELETE_MAX_KEYS", 1000);
let stream_chunk_size = parse_usize_env("STREAM_CHUNK_SIZE", 1_048_576); let stream_chunk_size = parse_usize_env("STREAM_CHUNK_SIZE", 1_048_576);
let request_body_timeout_secs = parse_u64_env("REQUEST_BODY_TIMEOUT_SECONDS", 60);
let ratelimit_default = let ratelimit_default =
parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(200, 60)); parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(5000, 60));
let ratelimit_list_buckets =
parse_rate_limit_env("RATE_LIMIT_LIST_BUCKETS", ratelimit_default);
let ratelimit_bucket_ops =
parse_rate_limit_env("RATE_LIMIT_BUCKET_OPS", ratelimit_default);
let ratelimit_object_ops =
parse_rate_limit_env("RATE_LIMIT_OBJECT_OPS", ratelimit_default);
let ratelimit_head_ops =
parse_rate_limit_env("RATE_LIMIT_HEAD_OPS", ratelimit_default);
let ratelimit_admin = let ratelimit_admin =
parse_rate_limit_env("RATE_LIMIT_ADMIN", RateLimitSetting::new(60, 60)); parse_rate_limit_env("RATE_LIMIT_ADMIN", RateLimitSetting::new(60, 60));
let ratelimit_storage_uri = let ratelimit_storage_uri =
@@ -304,7 +318,12 @@ impl ServerConfig {
multipart_min_part_size, multipart_min_part_size,
bulk_delete_max_keys, bulk_delete_max_keys,
stream_chunk_size, stream_chunk_size,
request_body_timeout_secs,
ratelimit_default, ratelimit_default,
ratelimit_list_buckets,
ratelimit_bucket_ops,
ratelimit_object_ops,
ratelimit_head_ops,
ratelimit_admin, ratelimit_admin,
ratelimit_storage_uri, ratelimit_storage_uri,
ui_enabled, ui_enabled,
@@ -387,7 +406,12 @@ impl Default for ServerConfig {
multipart_min_part_size: 5_242_880, multipart_min_part_size: 5_242_880,
bulk_delete_max_keys: 1000, bulk_delete_max_keys: 1000,
stream_chunk_size: 1_048_576, stream_chunk_size: 1_048_576,
ratelimit_default: RateLimitSetting::new(200, 60), request_body_timeout_secs: 60,
ratelimit_default: RateLimitSetting::new(5000, 60),
ratelimit_list_buckets: RateLimitSetting::new(5000, 60),
ratelimit_bucket_ops: RateLimitSetting::new(5000, 60),
ratelimit_object_ops: RateLimitSetting::new(5000, 60),
ratelimit_head_ops: RateLimitSetting::new(5000, 60),
ratelimit_admin: RateLimitSetting::new(60, 60), ratelimit_admin: RateLimitSetting::new(60, 60),
ratelimit_storage_uri: "memory://".to_string(), ratelimit_storage_uri: "memory://".to_string(),
ui_enabled: true, ui_enabled: true,
@@ -472,7 +496,31 @@ fn parse_list_env(key: &str, default: &str) -> Vec<String> {
} }
pub fn parse_rate_limit(value: &str) -> Option<RateLimitSetting> { pub fn parse_rate_limit(value: &str) -> Option<RateLimitSetting> {
let parts = value.split_whitespace().collect::<Vec<_>>(); let trimmed = value.trim();
if let Some((requests, window)) = trimmed.split_once('/') {
let max_requests = requests.trim().parse::<u32>().ok()?;
if max_requests == 0 {
return None;
}
let window_str = window.trim().to_ascii_lowercase();
let window_seconds = if let Ok(n) = window_str.parse::<u64>() {
if n == 0 {
return None;
}
n
} else {
match window_str.as_str() {
"s" | "sec" | "second" | "seconds" => 1,
"m" | "min" | "minute" | "minutes" => 60,
"h" | "hr" | "hour" | "hours" => 3600,
"d" | "day" | "days" => 86_400,
_ => return None,
}
};
return Some(RateLimitSetting::new(max_requests, window_seconds));
}
let parts = trimmed.split_whitespace().collect::<Vec<_>>();
if parts.len() != 3 || !parts[1].eq_ignore_ascii_case("per") { if parts.len() != 3 || !parts[1].eq_ignore_ascii_case("per") {
return None; return None;
} }
@@ -517,6 +565,15 @@ mod tests {
parse_rate_limit("3 per hours"), parse_rate_limit("3 per hours"),
Some(RateLimitSetting::new(3, 3600)) Some(RateLimitSetting::new(3, 3600))
); );
assert_eq!(
parse_rate_limit("50000/60"),
Some(RateLimitSetting::new(50000, 60))
);
assert_eq!(
parse_rate_limit("100/minute"),
Some(RateLimitSetting::new(100, 60))
);
assert_eq!(parse_rate_limit("0/60"), None);
assert_eq!(parse_rate_limit("0 per minute"), None); assert_eq!(parse_rate_limit("0 per minute"), None);
assert_eq!(parse_rate_limit("bad"), None); assert_eq!(parse_rate_limit("bad"), None);
} }
@@ -532,7 +589,7 @@ mod tests {
assert_eq!(config.object_key_max_length_bytes, 1024); assert_eq!(config.object_key_max_length_bytes, 1024);
assert_eq!(config.object_tag_limit, 50); assert_eq!(config.object_tag_limit, 50);
assert_eq!(config.ratelimit_default, RateLimitSetting::new(200, 60)); assert_eq!(config.ratelimit_default, RateLimitSetting::new(5000, 60));
std::env::remove_var("OBJECT_TAG_LIMIT"); std::env::remove_var("OBJECT_TAG_LIMIT");
std::env::remove_var("RATE_LIMIT_DEFAULT"); std::env::remove_var("RATE_LIMIT_DEFAULT");

View File

@@ -20,6 +20,13 @@ fn xml_response(status: StatusCode, xml: String) -> Response {
(status, [("content-type", "application/xml")], xml).into_response() (status, [("content-type", "application/xml")], xml).into_response()
} }
fn stored_xml(value: &serde_json::Value) -> String {
match value {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
}
}
fn storage_err(err: myfsio_storage::error::StorageError) -> Response { fn storage_err(err: myfsio_storage::error::StorageError) -> Response {
let s3err = S3Error::from(err); let s3err = S3Error::from(err);
let status = let status =
@@ -52,17 +59,31 @@ fn custom_xml_error(status: StatusCode, code: &str, message: &str) -> Response {
} }
pub async fn get_versioning(state: &AppState, bucket: &str) -> Response { pub async fn get_versioning(state: &AppState, bucket: &str) -> Response {
match state.storage.is_versioning_enabled(bucket).await { match state.storage.get_versioning_status(bucket).await {
Ok(enabled) => { Ok(status) => {
let status_str = if enabled { "Enabled" } else { "Suspended" }; let body = match status {
let xml = format!( myfsio_common::types::VersioningStatus::Enabled => {
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\ <VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
<Status>{}</Status>\ <Status>Enabled</Status>\
</VersioningConfiguration>", </VersioningConfiguration>"
status_str .to_string()
); }
xml_response(StatusCode::OK, xml) myfsio_common::types::VersioningStatus::Suspended => {
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
<Status>Suspended</Status>\
</VersioningConfiguration>"
.to_string()
}
myfsio_common::types::VersioningStatus::Disabled => {
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
</VersioningConfiguration>"
.to_string()
}
};
xml_response(StatusCode::OK, body)
} }
Err(e) => storage_err(e), Err(e) => storage_err(e),
} }
@@ -80,9 +101,22 @@ pub async fn put_versioning(state: &AppState, bucket: &str, body: Body) -> Respo
}; };
let xml_str = String::from_utf8_lossy(&body_bytes); let xml_str = String::from_utf8_lossy(&body_bytes);
let enabled = xml_str.contains("<Status>Enabled</Status>"); let status = if xml_str.contains("<Status>Enabled</Status>") {
myfsio_common::types::VersioningStatus::Enabled
} else if xml_str.contains("<Status>Suspended</Status>") {
myfsio_common::types::VersioningStatus::Suspended
} else {
return xml_response(
StatusCode::BAD_REQUEST,
S3Error::new(
S3ErrorCode::MalformedXML,
"VersioningConfiguration Status must be Enabled or Suspended",
)
.to_xml(),
);
};
match state.storage.set_versioning(bucket, enabled).await { match state.storage.set_versioning_status(bucket, status).await {
Ok(()) => StatusCode::OK.into_response(), Ok(()) => StatusCode::OK.into_response(),
Err(e) => storage_err(e), Err(e) => storage_err(e),
} }
@@ -151,7 +185,7 @@ pub async fn get_cors(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await { match state.storage.get_bucket_config(bucket).await {
Ok(config) => { Ok(config) => {
if let Some(cors) = &config.cors { if let Some(cors) = &config.cors {
xml_response(StatusCode::OK, cors.to_string()) xml_response(StatusCode::OK, stored_xml(cors))
} else { } else {
xml_response( xml_response(
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
@@ -214,14 +248,11 @@ pub async fn get_encryption(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await { match state.storage.get_bucket_config(bucket).await {
Ok(config) => { Ok(config) => {
if let Some(enc) = &config.encryption { if let Some(enc) = &config.encryption {
xml_response(StatusCode::OK, enc.to_string()) xml_response(StatusCode::OK, stored_xml(enc))
} else { } else {
xml_response( xml_response(
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
S3Error::new( S3Error::from_code(S3ErrorCode::ServerSideEncryptionConfigurationNotFoundError)
S3ErrorCode::InvalidRequest,
"The server side encryption configuration was not found",
)
.to_xml(), .to_xml(),
) )
} }
@@ -266,15 +297,11 @@ pub async fn get_lifecycle(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await { match state.storage.get_bucket_config(bucket).await {
Ok(config) => { Ok(config) => {
if let Some(lc) = &config.lifecycle { if let Some(lc) = &config.lifecycle {
xml_response(StatusCode::OK, lc.to_string()) xml_response(StatusCode::OK, stored_xml(lc))
} else { } else {
xml_response( xml_response(
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
S3Error::new( S3Error::from_code(S3ErrorCode::NoSuchLifecycleConfiguration).to_xml(),
S3ErrorCode::NoSuchKey,
"The lifecycle configuration does not exist",
)
.to_xml(),
) )
} }
} }
@@ -421,7 +448,7 @@ pub async fn get_policy(state: &AppState, bucket: &str) -> Response {
} else { } else {
xml_response( xml_response(
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
S3Error::new(S3ErrorCode::NoSuchKey, "No bucket policy attached").to_xml(), S3Error::from_code(S3ErrorCode::NoSuchBucketPolicy).to_xml(),
) )
} }
} }
@@ -497,10 +524,7 @@ pub async fn get_replication(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await { match state.storage.get_bucket_config(bucket).await {
Ok(config) => { Ok(config) => {
if let Some(replication) = &config.replication { if let Some(replication) = &config.replication {
match replication { xml_response(StatusCode::OK, stored_xml(replication))
serde_json::Value::String(s) => xml_response(StatusCode::OK, s.clone()),
other => xml_response(StatusCode::OK, other.to_string()),
}
} else { } else {
xml_response( xml_response(
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
@@ -593,7 +617,7 @@ pub async fn get_acl(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await { match state.storage.get_bucket_config(bucket).await {
Ok(config) => { Ok(config) => {
if let Some(acl) = &config.acl { if let Some(acl) = &config.acl {
xml_response(StatusCode::OK, acl.to_string()) xml_response(StatusCode::OK, stored_xml(acl))
} else { } else {
let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\ let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\ <AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
@@ -633,7 +657,7 @@ pub async fn get_website(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await { match state.storage.get_bucket_config(bucket).await {
Ok(config) => { Ok(config) => {
if let Some(ws) = &config.website { if let Some(ws) = &config.website {
xml_response(StatusCode::OK, ws.to_string()) xml_response(StatusCode::OK, stored_xml(ws))
} else { } else {
xml_response( xml_response(
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
@@ -685,7 +709,7 @@ pub async fn get_object_lock(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await { match state.storage.get_bucket_config(bucket).await {
Ok(config) => { Ok(config) => {
if let Some(ol) = &config.object_lock { if let Some(ol) = &config.object_lock {
xml_response(StatusCode::OK, ol.to_string()) xml_response(StatusCode::OK, stored_xml(ol))
} else { } else {
let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\ let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<ObjectLockConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\ <ObjectLockConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
@@ -702,7 +726,7 @@ pub async fn get_notification(state: &AppState, bucket: &str) -> Response {
match state.storage.get_bucket_config(bucket).await { match state.storage.get_bucket_config(bucket).await {
Ok(config) => { Ok(config) => {
if let Some(n) = &config.notification { if let Some(n) = &config.notification {
xml_response(StatusCode::OK, n.to_string()) xml_response(StatusCode::OK, stored_xml(n))
} else { } else {
let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\ let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<NotificationConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\ <NotificationConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
@@ -1042,22 +1066,23 @@ pub async fn list_object_versions(
state: &AppState, state: &AppState,
bucket: &str, bucket: &str,
prefix: Option<&str>, prefix: Option<&str>,
delimiter: Option<&str>,
key_marker: Option<&str>,
version_id_marker: Option<&str>,
max_keys: usize, max_keys: usize,
) -> Response { ) -> Response {
match state.storage.list_buckets().await { match state.storage.bucket_exists(bucket).await {
Ok(buckets) => { Ok(true) => {}
if !buckets.iter().any(|b| b.name == bucket) { Ok(false) => {
return storage_err(myfsio_storage::error::StorageError::BucketNotFound( return storage_err(myfsio_storage::error::StorageError::BucketNotFound(
bucket.to_string(), bucket.to_string(),
)); ));
} }
}
Err(e) => return storage_err(e), Err(e) => return storage_err(e),
} }
let fetch_limit = max_keys.saturating_add(1).max(1);
let params = myfsio_common::types::ListParams { let params = myfsio_common::types::ListParams {
max_keys: fetch_limit, max_keys: usize::MAX,
prefix: prefix.map(ToOwned::to_owned), prefix: prefix.map(ToOwned::to_owned),
..Default::default() ..Default::default()
}; };
@@ -1066,7 +1091,8 @@ pub async fn list_object_versions(
Ok(result) => result, Ok(result) => result,
Err(e) => return storage_err(e), Err(e) => return storage_err(e),
}; };
let objects = object_result.objects; let live_objects = object_result.objects;
let archived_versions = match state let archived_versions = match state
.storage .storage
.list_bucket_object_versions(bucket, prefix) .list_bucket_object_versions(bucket, prefix)
@@ -1076,63 +1102,215 @@ pub async fn list_object_versions(
Err(e) => return storage_err(e), Err(e) => return storage_err(e),
}; };
// One logical row in the merged ListObjectVersions listing: built from both
// live objects and archived versions, then sorted by (key, newest-first).
#[derive(Clone)]
struct Entry {
// Object key this version belongs to.
key: String,
// S3 version id; live unversioned objects fall back to "null" at the call site.
version_id: String,
// Timestamp used both for rendering <LastModified> and for newest-first ordering.
last_modified: chrono::DateTime<chrono::Utc>,
// ETag without surrounding quotes; quotes are added when rendered. May be absent.
etag: Option<String>,
// Object size in bytes; not rendered for delete markers.
size: u64,
// Storage class string; archived versions are hard-coded to "STANDARD" by the caller.
storage_class: String,
// When true the row is rendered as <DeleteMarker> (no ETag/Size/StorageClass).
is_delete_marker: bool,
}
let mut entries: Vec<Entry> = Vec::with_capacity(live_objects.len() + archived_versions.len());
for obj in &live_objects {
entries.push(Entry {
key: obj.key.clone(),
version_id: obj.version_id.clone().unwrap_or_else(|| "null".to_string()),
last_modified: obj.last_modified,
etag: obj.etag.clone(),
size: obj.size,
storage_class: obj
.storage_class
.clone()
.unwrap_or_else(|| "STANDARD".to_string()),
is_delete_marker: false,
});
}
for version in &archived_versions {
entries.push(Entry {
key: version.key.clone(),
version_id: version.version_id.clone(),
last_modified: version.last_modified,
etag: version.etag.clone(),
size: version.size,
storage_class: "STANDARD".to_string(),
is_delete_marker: version.is_delete_marker,
});
}
entries.sort_by(|a, b| {
a.key
.cmp(&b.key)
.then_with(|| b.last_modified.cmp(&a.last_modified))
.then_with(|| a.version_id.cmp(&b.version_id))
});
let mut latest_marked: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut is_latest_flags: Vec<bool> = Vec::with_capacity(entries.len());
for entry in &entries {
if latest_marked.insert(entry.key.clone()) {
is_latest_flags.push(true);
} else {
is_latest_flags.push(false);
}
}
let km = key_marker.unwrap_or("");
let vim = version_id_marker.unwrap_or("");
let start_index = if km.is_empty() {
0
} else if vim.is_empty() {
entries
.iter()
.position(|e| e.key.as_str() > km)
.unwrap_or(entries.len())
} else if let Some(pos) = entries
.iter()
.position(|e| e.key == km && e.version_id == vim)
{
pos + 1
} else {
entries
.iter()
.position(|e| e.key.as_str() > km)
.unwrap_or(entries.len())
};
let delim = delimiter.unwrap_or("");
let prefix_str = prefix.unwrap_or("");
let mut common_prefixes: Vec<String> = Vec::new();
let mut seen_prefixes: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut rendered = String::new();
let mut count = 0usize;
let mut is_truncated = false;
let mut next_key_marker: Option<String> = None;
let mut next_version_id_marker: Option<String> = None;
let mut last_emitted: Option<(String, String)> = None;
let mut idx = start_index;
while idx < entries.len() {
let entry = &entries[idx];
let is_latest = is_latest_flags[idx];
if !delim.is_empty() {
let rest = entry.key.strip_prefix(prefix_str).unwrap_or(&entry.key);
if let Some(delim_pos) = rest.find(delim) {
let grouped = entry.key[..prefix_str.len() + delim_pos + delim.len()].to_string();
if seen_prefixes.contains(&grouped) {
idx += 1;
continue;
}
if count >= max_keys {
is_truncated = true;
if let Some((k, v)) = last_emitted.clone() {
next_key_marker = Some(k);
next_version_id_marker = Some(v);
}
break;
}
common_prefixes.push(grouped.clone());
seen_prefixes.insert(grouped.clone());
count += 1;
let mut group_last = (entry.key.clone(), entry.version_id.clone());
idx += 1;
while idx < entries.len() && entries[idx].key.starts_with(&grouped) {
group_last = (entries[idx].key.clone(), entries[idx].version_id.clone());
idx += 1;
}
last_emitted = Some(group_last);
continue;
}
}
if count >= max_keys {
is_truncated = true;
if let Some((k, v)) = last_emitted.clone() {
next_key_marker = Some(k);
next_version_id_marker = Some(v);
}
break;
}
let tag = if entry.is_delete_marker {
"DeleteMarker"
} else {
"Version"
};
rendered.push_str(&format!("<{}>", tag));
rendered.push_str(&format!("<Key>{}</Key>", xml_escape(&entry.key)));
rendered.push_str(&format!(
"<VersionId>{}</VersionId>",
xml_escape(&entry.version_id)
));
rendered.push_str(&format!("<IsLatest>{}</IsLatest>", is_latest));
rendered.push_str(&format!(
"<LastModified>{}</LastModified>",
myfsio_xml::response::format_s3_datetime(&entry.last_modified)
));
if !entry.is_delete_marker {
if let Some(ref etag) = entry.etag {
rendered.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
}
rendered.push_str(&format!("<Size>{}</Size>", entry.size));
rendered.push_str(&format!(
"<StorageClass>{}</StorageClass>",
xml_escape(&entry.storage_class)
));
}
rendered.push_str(&format!("</{}>", tag));
last_emitted = Some((entry.key.clone(), entry.version_id.clone()));
count += 1;
idx += 1;
}
let mut xml = String::from( let mut xml = String::from(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<ListVersionsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">", <ListVersionsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">",
); );
xml.push_str(&format!("<Name>{}</Name>", xml_escape(bucket))); xml.push_str(&format!("<Name>{}</Name>", xml_escape(bucket)));
xml.push_str(&format!("<Prefix>{}</Prefix>", xml_escape(prefix_str)));
if !km.is_empty() {
xml.push_str(&format!("<KeyMarker>{}</KeyMarker>", xml_escape(km)));
} else {
xml.push_str("<KeyMarker></KeyMarker>");
}
if !vim.is_empty() {
xml.push_str(&format!( xml.push_str(&format!(
"<Prefix>{}</Prefix>", "<VersionIdMarker>{}</VersionIdMarker>",
xml_escape(prefix.unwrap_or("")) xml_escape(vim)
)); ));
} else {
xml.push_str("<VersionIdMarker></VersionIdMarker>");
}
xml.push_str(&format!("<MaxKeys>{}</MaxKeys>", max_keys)); xml.push_str(&format!("<MaxKeys>{}</MaxKeys>", max_keys));
if !delim.is_empty() {
let current_count = objects.len().min(max_keys); xml.push_str(&format!("<Delimiter>{}</Delimiter>", xml_escape(delim)));
let remaining = max_keys.saturating_sub(current_count); }
let archived_count = archived_versions.len().min(remaining);
let is_truncated = object_result.is_truncated
|| objects.len() > current_count
|| archived_versions.len() > archived_count;
xml.push_str(&format!("<IsTruncated>{}</IsTruncated>", is_truncated)); xml.push_str(&format!("<IsTruncated>{}</IsTruncated>", is_truncated));
if let Some(ref nk) = next_key_marker {
for obj in objects.iter().take(current_count) {
xml.push_str("<Version>");
xml.push_str(&format!("<Key>{}</Key>", xml_escape(&obj.key)));
xml.push_str("<VersionId>null</VersionId>");
xml.push_str("<IsLatest>true</IsLatest>");
xml.push_str(&format!( xml.push_str(&format!(
"<LastModified>{}</LastModified>", "<NextKeyMarker>{}</NextKeyMarker>",
myfsio_xml::response::format_s3_datetime(&obj.last_modified) xml_escape(nk)
)); ));
if let Some(ref etag) = obj.etag {
xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
} }
xml.push_str(&format!("<Size>{}</Size>", obj.size)); if let Some(ref nv) = next_version_id_marker {
xml.push_str(&format!( xml.push_str(&format!(
"<StorageClass>{}</StorageClass>", "<NextVersionIdMarker>{}</NextVersionIdMarker>",
xml_escape(obj.storage_class.as_deref().unwrap_or("STANDARD")) xml_escape(nv)
)); ));
xml.push_str("</Version>");
} }
for version in archived_versions.iter().take(archived_count) { xml.push_str(&rendered);
xml.push_str("<Version>"); for cp in &common_prefixes {
xml.push_str(&format!("<Key>{}</Key>", xml_escape(&version.key)));
xml.push_str(&format!( xml.push_str(&format!(
"<VersionId>{}</VersionId>", "<CommonPrefixes><Prefix>{}</Prefix></CommonPrefixes>",
xml_escape(&version.version_id) xml_escape(cp)
)); ));
xml.push_str("<IsLatest>false</IsLatest>");
xml.push_str(&format!(
"<LastModified>{}</LastModified>",
myfsio_xml::response::format_s3_datetime(&version.last_modified)
));
if let Some(ref etag) = version.etag {
xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
}
xml.push_str(&format!("<Size>{}</Size>", version.size));
xml.push_str("<StorageClass>STANDARD</StorageClass>");
xml.push_str("</Version>");
} }
xml.push_str("</ListVersionsResult>"); xml.push_str("</ListVersionsResult>");
@@ -1182,6 +1360,26 @@ pub async fn put_object_tagging(state: &AppState, bucket: &str, key: &str, body:
.to_xml(), .to_xml(),
); );
} }
for tag in &tags {
if tag.key.is_empty() || tag.key.len() > 128 {
return xml_response(
StatusCode::BAD_REQUEST,
S3Error::new(S3ErrorCode::InvalidTag, "Tag key length must be 1-128").to_xml(),
);
}
if tag.value.len() > 256 {
return xml_response(
StatusCode::BAD_REQUEST,
S3Error::new(S3ErrorCode::InvalidTag, "Tag value length must be 0-256").to_xml(),
);
}
if tag.key.contains('=') {
return xml_response(
StatusCode::BAD_REQUEST,
S3Error::new(S3ErrorCode::InvalidTag, "Tag keys must not contain '='").to_xml(),
);
}
}
match state.storage.set_object_tags(bucket, key, &tags).await { match state.storage.set_object_tags(bucket, key, &tags).await {
Ok(()) => StatusCode::OK.into_response(), Ok(()) => StatusCode::OK.into_response(),

File diff suppressed because it is too large Load Diff

View File

@@ -117,16 +117,6 @@ pub async fn logout(Extension(session): Extension<SessionHandle>) -> Response {
Redirect::to("/login").into_response() Redirect::to("/login").into_response()
} }
pub async fn csrf_error_page(
State(state): State<AppState>,
Extension(session): Extension<SessionHandle>,
) -> Response {
let ctx = base_context(&session, None);
let mut resp = render(&state, "csrf_error.html", &ctx);
*resp.status_mut() = StatusCode::FORBIDDEN;
resp
}
pub async fn root_redirect() -> Response { pub async fn root_redirect() -> Response {
Redirect::to("/ui/buckets").into_response() Redirect::to("/ui/buckets").into_response()
} }

View File

@@ -121,6 +121,8 @@ fn storage_status(err: &StorageError) -> StatusCode {
| StorageError::ObjectNotFound { .. } | StorageError::ObjectNotFound { .. }
| StorageError::VersionNotFound { .. } | StorageError::VersionNotFound { .. }
| StorageError::UploadNotFound(_) => StatusCode::NOT_FOUND, | StorageError::UploadNotFound(_) => StatusCode::NOT_FOUND,
StorageError::DeleteMarker { .. } => StatusCode::NOT_FOUND,
StorageError::MethodNotAllowed(_) => StatusCode::METHOD_NOT_ALLOWED,
StorageError::InvalidBucketName(_) StorageError::InvalidBucketName(_)
| StorageError::InvalidObjectKey(_) | StorageError::InvalidObjectKey(_)
| StorageError::InvalidRange | StorageError::InvalidRange
@@ -904,6 +906,35 @@ pub struct ListObjectsQuery {
pub prefix: Option<String>, pub prefix: Option<String>,
#[serde(default)] #[serde(default)]
pub start_after: Option<String>, pub start_after: Option<String>,
#[serde(default)]
pub delimiter: Option<String>,
}
/// Builds the JSON payload describing one object for the bucket-browser UI.
///
/// Shared by the plain listing and the shallow (delimiter) listing so both
/// endpoints emit identical per-object shapes. Besides the object metadata
/// (size, timestamps, etag, storage class, content type) it pre-builds every
/// per-object UI endpoint URL via `build_ui_object_url` so the frontend never
/// has to construct paths itself.
fn object_json(bucket_name: &str, o: &myfsio_common::types::ObjectMeta) -> Value {
json!({
"key": o.key,
"size": o.size,
// Two RFC3339 copies kept for frontend compatibility; display form is separate.
"last_modified": o.last_modified.to_rfc3339(),
"last_modified_iso": o.last_modified.to_rfc3339(),
"last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(),
// Optional fields are flattened to empty string / "STANDARD" defaults for the UI.
"etag": o.etag.clone().unwrap_or_default(),
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
"content_type": o.content_type.clone().unwrap_or_default(),
"download_url": build_ui_object_url(bucket_name, &o.key, "download"),
"preview_url": build_ui_object_url(bucket_name, &o.key, "preview"),
"delete_endpoint": build_ui_object_url(bucket_name, &o.key, "delete"),
"presign_endpoint": build_ui_object_url(bucket_name, &o.key, "presign"),
"metadata_url": build_ui_object_url(bucket_name, &o.key, "metadata"),
"versions_endpoint": build_ui_object_url(bucket_name, &o.key, "versions"),
// NOTE(review): VERSION_ID_PLACEHOLDER is presumably substituted client-side
// with a concrete version id before use — confirm against the frontend code.
"restore_template": format!(
"/ui/buckets/{}/objects/{}/restore/VERSION_ID_PLACEHOLDER",
bucket_name,
encode_object_key(&o.key)
),
"tags_url": build_ui_object_url(bucket_name, &o.key, "tags"),
"copy_url": build_ui_object_url(bucket_name, &o.key, "copy"),
"move_url": build_ui_object_url(bucket_name, &o.key, "move"),
})
} }
pub async fn list_bucket_objects( pub async fn list_bucket_objects(
@@ -917,6 +948,49 @@ pub async fn list_bucket_objects(
} }
let max_keys = q.max_keys.unwrap_or(1000).min(5000); let max_keys = q.max_keys.unwrap_or(1000).min(5000);
let versioning_enabled = state
.storage
.is_versioning_enabled(&bucket_name)
.await
.unwrap_or(false);
let stats = state.storage.bucket_stats(&bucket_name).await.ok();
let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0);
let use_shallow = q.delimiter.as_deref() == Some("/");
if use_shallow {
let params = myfsio_common::types::ShallowListParams {
prefix: q.prefix.clone().unwrap_or_default(),
delimiter: "/".to_string(),
max_keys,
continuation_token: q.continuation_token.clone(),
};
return match state
.storage
.list_objects_shallow(&bucket_name, &params)
.await
{
Ok(res) => {
let objects: Vec<Value> = res
.objects
.iter()
.map(|o| object_json(&bucket_name, o))
.collect();
Json(json!({
"versioning_enabled": versioning_enabled,
"total_count": total_count,
"is_truncated": res.is_truncated,
"next_continuation_token": res.next_continuation_token,
"url_templates": url_templates_for(&bucket_name),
"objects": objects,
"common_prefixes": res.common_prefixes,
}))
.into_response()
}
Err(e) => storage_json_error(e),
};
}
let params = ListParams { let params = ListParams {
max_keys, max_keys,
continuation_token: q.continuation_token.clone(), continuation_token: q.continuation_token.clone(),
@@ -924,46 +998,12 @@ pub async fn list_bucket_objects(
start_after: q.start_after.clone(), start_after: q.start_after.clone(),
}; };
let versioning_enabled = state
.storage
.is_versioning_enabled(&bucket_name)
.await
.unwrap_or(false);
let stats = state.storage.bucket_stats(&bucket_name).await.ok();
let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0);
match state.storage.list_objects(&bucket_name, &params).await { match state.storage.list_objects(&bucket_name, &params).await {
Ok(res) => { Ok(res) => {
let objects: Vec<Value> = res let objects: Vec<Value> = res
.objects .objects
.iter() .iter()
.map(|o| { .map(|o| object_json(&bucket_name, o))
json!({
"key": o.key,
"size": o.size,
"last_modified": o.last_modified.to_rfc3339(),
"last_modified_iso": o.last_modified.to_rfc3339(),
"last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(),
"etag": o.etag.clone().unwrap_or_default(),
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
"content_type": o.content_type.clone().unwrap_or_default(),
"download_url": build_ui_object_url(&bucket_name, &o.key, "download"),
"preview_url": build_ui_object_url(&bucket_name, &o.key, "preview"),
"delete_endpoint": build_ui_object_url(&bucket_name, &o.key, "delete"),
"presign_endpoint": build_ui_object_url(&bucket_name, &o.key, "presign"),
"metadata_url": build_ui_object_url(&bucket_name, &o.key, "metadata"),
"versions_endpoint": build_ui_object_url(&bucket_name, &o.key, "versions"),
"restore_template": format!(
"/ui/buckets/{}/objects/{}/restore/VERSION_ID_PLACEHOLDER",
bucket_name,
encode_object_key(&o.key)
),
"tags_url": build_ui_object_url(&bucket_name, &o.key, "tags"),
"copy_url": build_ui_object_url(&bucket_name, &o.key, "copy"),
"move_url": build_ui_object_url(&bucket_name, &o.key, "move"),
})
})
.collect(); .collect();
Json(json!({ Json(json!({
@@ -1006,19 +1046,38 @@ pub async fn stream_bucket_objects(
let stats = state.storage.bucket_stats(&bucket_name).await.ok(); let stats = state.storage.bucket_stats(&bucket_name).await.ok();
let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0); let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0);
let mut lines: Vec<String> = Vec::new(); let use_delimiter = q.delimiter.as_deref() == Some("/");
lines.push( let prefix = q.prefix.clone().unwrap_or_default();
json!({
let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(64);
let meta_line = json!({
"type": "meta", "type": "meta",
"url_templates": url_templates_for(&bucket_name), "url_templates": url_templates_for(&bucket_name),
"versioning_enabled": versioning_enabled, "versioning_enabled": versioning_enabled,
}) })
.to_string(), .to_string()
); + "\n";
lines.push(json!({ "type": "count", "total_count": total_count }).to_string()); let count_line = json!({ "type": "count", "total_count": total_count }).to_string() + "\n";
let use_delimiter = q.delimiter.as_deref() == Some("/"); let storage = state.storage.clone();
let prefix = q.prefix.clone().unwrap_or_default(); let bucket = bucket_name.clone();
tokio::spawn(async move {
if tx
.send(Ok(bytes::Bytes::from(meta_line.into_bytes())))
.await
.is_err()
{
return;
}
if tx
.send(Ok(bytes::Bytes::from(count_line.into_bytes())))
.await
.is_err()
{
return;
}
if use_delimiter { if use_delimiter {
let mut token: Option<String> = None; let mut token: Option<String> = None;
@@ -1029,18 +1088,20 @@ pub async fn stream_bucket_objects(
max_keys: UI_OBJECT_BROWSER_MAX_KEYS, max_keys: UI_OBJECT_BROWSER_MAX_KEYS,
continuation_token: token.clone(), continuation_token: token.clone(),
}; };
match state match storage.list_objects_shallow(&bucket, &params).await {
.storage
.list_objects_shallow(&bucket_name, &params)
.await
{
Ok(res) => { Ok(res) => {
for p in &res.common_prefixes { for p in &res.common_prefixes {
lines.push(json!({ "type": "folder", "prefix": p }).to_string()); let line = json!({ "type": "folder", "prefix": p }).to_string() + "\n";
if tx
.send(Ok(bytes::Bytes::from(line.into_bytes())))
.await
.is_err()
{
return;
}
} }
for o in &res.objects { for o in &res.objects {
lines.push( let line = json!({
json!({
"type": "object", "type": "object",
"key": o.key, "key": o.key,
"size": o.size, "size": o.size,
@@ -1050,8 +1111,15 @@ pub async fn stream_bucket_objects(
"etag": o.etag.clone().unwrap_or_default(), "etag": o.etag.clone().unwrap_or_default(),
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
}) })
.to_string(), .to_string()
); + "\n";
if tx
.send(Ok(bytes::Bytes::from(line.into_bytes())))
.await
.is_err()
{
return;
}
} }
if !res.is_truncated || res.next_continuation_token.is_none() { if !res.is_truncated || res.next_continuation_token.is_none() {
break; break;
@@ -1059,8 +1127,10 @@ pub async fn stream_bucket_objects(
token = res.next_continuation_token; token = res.next_continuation_token;
} }
Err(e) => { Err(e) => {
lines.push(json!({ "type": "error", "error": e.to_string() }).to_string()); let line =
break; json!({ "type": "error", "error": e.to_string() }).to_string() + "\n";
let _ = tx.send(Ok(bytes::Bytes::from(line.into_bytes()))).await;
return;
} }
} }
} }
@@ -1077,11 +1147,10 @@ pub async fn stream_bucket_objects(
}, },
start_after: None, start_after: None,
}; };
match state.storage.list_objects(&bucket_name, &params).await { match storage.list_objects(&bucket, &params).await {
Ok(res) => { Ok(res) => {
for o in &res.objects { for o in &res.objects {
lines.push( let line = json!({
json!({
"type": "object", "type": "object",
"key": o.key, "key": o.key,
"size": o.size, "size": o.size,
@@ -1091,8 +1160,15 @@ pub async fn stream_bucket_objects(
"etag": o.etag.clone().unwrap_or_default(), "etag": o.etag.clone().unwrap_or_default(),
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
}) })
.to_string(), .to_string()
); + "\n";
if tx
.send(Ok(bytes::Bytes::from(line.into_bytes())))
.await
.is_err()
{
return;
}
} }
if !res.is_truncated || res.next_continuation_token.is_none() { if !res.is_truncated || res.next_continuation_token.is_none() {
break; break;
@@ -1100,21 +1176,32 @@ pub async fn stream_bucket_objects(
token = res.next_continuation_token; token = res.next_continuation_token;
} }
Err(e) => { Err(e) => {
lines.push(json!({ "type": "error", "error": e.to_string() }).to_string()); let line =
break; json!({ "type": "error", "error": e.to_string() }).to_string() + "\n";
let _ = tx.send(Ok(bytes::Bytes::from(line.into_bytes()))).await;
return;
} }
} }
} }
} }
lines.push(json!({ "type": "done" }).to_string()); let done_line = json!({ "type": "done" }).to_string() + "\n";
let _ = tx
.send(Ok(bytes::Bytes::from(done_line.into_bytes())))
.await;
});
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
let body = Body::from_stream(stream);
let body = lines.join("\n") + "\n";
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
headers.insert( headers.insert(
header::CONTENT_TYPE, header::CONTENT_TYPE,
"application/x-ndjson; charset=utf-8".parse().unwrap(), "application/x-ndjson; charset=utf-8".parse().unwrap(),
); );
headers.insert(header::CACHE_CONTROL, "no-cache".parse().unwrap());
headers.insert("x-accel-buffering", "no".parse().unwrap());
(StatusCode::OK, headers, body).into_response() (StatusCode::OK, headers, body).into_response()
} }
@@ -2514,7 +2601,7 @@ async fn move_object_json(state: &AppState, bucket: &str, key: &str, body: Body)
match state.storage.copy_object(bucket, key, dest_bucket, dest_key).await { match state.storage.copy_object(bucket, key, dest_bucket, dest_key).await {
Ok(_) => match state.storage.delete_object(bucket, key).await { Ok(_) => match state.storage.delete_object(bucket, key).await {
Ok(()) => { Ok(_) => {
super::trigger_replication(state, dest_bucket, dest_key, "write"); super::trigger_replication(state, dest_bucket, dest_key, "write");
super::trigger_replication(state, bucket, key, "delete"); super::trigger_replication(state, bucket, key, "delete");
Json(json!({ Json(json!({
@@ -2589,7 +2676,7 @@ async fn delete_object_json(
} }
match state.storage.delete_object(bucket, key).await { match state.storage.delete_object(bucket, key).await {
Ok(()) => { Ok(_) => {
super::trigger_replication(state, bucket, key, "delete"); super::trigger_replication(state, bucket, key, "delete");
Json(json!({ Json(json!({
"status": "ok", "status": "ok",
@@ -2868,7 +2955,7 @@ pub async fn bulk_delete_objects(
for key in keys { for key in keys {
match state.storage.delete_object(&bucket_name, &key).await { match state.storage.delete_object(&bucket_name, &key).await {
Ok(()) => { Ok(_) => {
super::trigger_replication(&state, &bucket_name, &key, "delete"); super::trigger_replication(&state, &bucket_name, &key, "delete");
if payload.purge_versions { if payload.purge_versions {
if let Err(err) = if let Err(err) =

View File

@@ -227,9 +227,7 @@ async fn parse_form_any(
if is_multipart { if is_multipart {
let boundary = multer::parse_boundary(&content_type) let boundary = multer::parse_boundary(&content_type)
.map_err(|_| "Missing multipart boundary".to_string())?; .map_err(|_| "Missing multipart boundary".to_string())?;
let stream = futures::stream::once(async move { let stream = futures::stream::once(async move { Ok::<_, std::io::Error>(bytes) });
Ok::<_, std::io::Error>(bytes)
});
let mut multipart = multer::Multipart::new(stream, boundary); let mut multipart = multer::Multipart::new(stream, boundary);
let mut out = HashMap::new(); let mut out = HashMap::new();
while let Some(field) = multipart while let Some(field) = multipart
@@ -2173,10 +2171,7 @@ pub async fn create_bucket(
let wants_json = wants_json(&headers); let wants_json = wants_json(&headers);
let form = match parse_form_any(&headers, body).await { let form = match parse_form_any(&headers, body).await {
Ok(fields) => CreateBucketForm { Ok(fields) => CreateBucketForm {
bucket_name: fields bucket_name: fields.get("bucket_name").cloned().unwrap_or_default(),
.get("bucket_name")
.cloned()
.unwrap_or_default(),
csrf_token: fields.get("csrf_token").cloned().unwrap_or_default(), csrf_token: fields.get("csrf_token").cloned().unwrap_or_default(),
}, },
Err(message) => { Err(message) => {

View File

@@ -304,8 +304,7 @@ pub fn create_ui_router(state: state::AppState) -> Router {
let public = Router::new() let public = Router::new()
.route("/login", get(ui::login_page).post(ui::login_submit)) .route("/login", get(ui::login_page).post(ui::login_submit))
.route("/logout", post(ui::logout).get(ui::logout)) .route("/logout", post(ui::logout).get(ui::logout));
.route("/csrf-error", get(ui::csrf_error_page));
let session_state = middleware::SessionLayerState { let session_state = middleware::SessionLayerState {
store: state.sessions.clone(), store: state.sessions.clone(),
@@ -317,7 +316,10 @@ pub fn create_ui_router(state: state::AppState) -> Router {
protected protected
.merge(public) .merge(public)
.fallback(ui::not_found_page) .fallback(ui::not_found_page)
.layer(axum::middleware::from_fn(middleware::csrf_layer)) .layer(axum::middleware::from_fn_with_state(
state.clone(),
middleware::csrf_layer,
))
.layer(axum::middleware::from_fn_with_state( .layer(axum::middleware::from_fn_with_state(
session_state, session_state,
middleware::session_layer, middleware::session_layer,
@@ -333,8 +335,12 @@ pub fn create_ui_router(state: state::AppState) -> Router {
} }
pub fn create_router(state: state::AppState) -> Router { pub fn create_router(state: state::AppState) -> Router {
let default_rate_limit = middleware::RateLimitLayerState::new( let default_rate_limit = middleware::RateLimitLayerState::with_per_op(
state.config.ratelimit_default, state.config.ratelimit_default,
state.config.ratelimit_list_buckets,
state.config.ratelimit_bucket_ops,
state.config.ratelimit_object_ops,
state.config.ratelimit_head_ops,
state.config.num_trusted_proxies, state.config.num_trusted_proxies,
); );
let admin_rate_limit = middleware::RateLimitLayerState::new( let admin_rate_limit = middleware::RateLimitLayerState::new(
@@ -575,11 +581,22 @@ pub fn create_router(state: state::AppState) -> Router {
middleware::rate_limit_layer, middleware::rate_limit_layer,
)); ));
let request_body_timeout =
std::time::Duration::from_secs(state.config.request_body_timeout_secs);
api_router api_router
.merge(admin_router) .merge(admin_router)
.layer(axum::middleware::from_fn(middleware::server_header)) .layer(axum::middleware::from_fn(middleware::server_header))
.layer(cors_layer(&state.config)) .layer(cors_layer(&state.config))
.layer(axum::middleware::from_fn_with_state(
state.clone(),
middleware::bucket_cors_layer,
))
.layer(axum::middleware::from_fn(middleware::request_log_layer))
.layer(tower_http::compression::CompressionLayer::new()) .layer(tower_http::compression::CompressionLayer::new())
.layer(tower_http::timeout::RequestBodyTimeoutLayer::new(
request_body_timeout,
))
.with_state(state) .with_state(state)
} }

View File

@@ -189,6 +189,11 @@ async fn main() {
let shutdown = shutdown_signal_shared(); let shutdown = shutdown_signal_shared();
let api_shutdown = shutdown.clone(); let api_shutdown = shutdown.clone();
let api_listener = axum::serve::ListenerExt::tap_io(api_listener, |stream| {
if let Err(err) = stream.set_nodelay(true) {
tracing::trace!("failed to set TCP_NODELAY on api socket: {}", err);
}
});
let api_task = tokio::spawn(async move { let api_task = tokio::spawn(async move {
axum::serve( axum::serve(
api_listener, api_listener,
@@ -202,6 +207,11 @@ async fn main() {
let ui_task = if let (Some(listener), Some(app)) = (ui_listener, ui_app) { let ui_task = if let (Some(listener), Some(app)) = (ui_listener, ui_app) {
let ui_shutdown = shutdown.clone(); let ui_shutdown = shutdown.clone();
let listener = axum::serve::ListenerExt::tap_io(listener, |stream| {
if let Err(err) = stream.set_nodelay(true) {
tracing::trace!("failed to set TCP_NODELAY on ui socket: {}", err);
}
});
Some(tokio::spawn(async move { Some(tokio::spawn(async move {
axum::serve(listener, app) axum::serve(listener, app)
.with_graceful_shutdown(async move { .with_graceful_shutdown(async move {

View File

@@ -12,9 +12,36 @@ use serde_json::Value;
use std::time::Instant; use std::time::Instant;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use crate::middleware::sha_body::{is_hex_sha256, Sha256VerifyBody};
use crate::services::acl::acl_from_bucket_config; use crate::services::acl::acl_from_bucket_config;
use crate::state::AppState; use crate::state::AppState;
/// Wraps the request body in a `Sha256VerifyBody` so the payload digest is
/// checked against the client-declared `x-amz-content-sha256` header as the
/// body streams through.
///
/// Leaves the request untouched (early return) when:
/// - the `x-amz-content-sha256` header is missing or not valid UTF-8;
/// - the header value is not a hex SHA-256 digest per `is_hex_sha256`
///   (presumably this skips sentinels like `UNSIGNED-PAYLOAD` — confirm);
/// - `Content-Encoding` contains `aws-chunked`, since chunked-upload framing
///   means the declared digest does not cover the raw body bytes.
fn wrap_body_for_sha256_verification(req: &mut Request) {
let declared = match req
.headers()
.get("x-amz-content-sha256")
.and_then(|v| v.to_str().ok())
{
Some(v) => v.to_string(),
None => return,
};
if !is_hex_sha256(&declared) {
return;
}
// Case-insensitive substring match so values like "aws-chunked,gzip" are caught.
let is_chunked = req
.headers()
.get("content-encoding")
.and_then(|v| v.to_str().ok())
.map(|v| v.to_ascii_lowercase().contains("aws-chunked"))
.unwrap_or(false);
if is_chunked {
return;
}
// Swap the body out (leaving an empty placeholder), wrap it, and put it back.
let body = std::mem::replace(req.body_mut(), axum::body::Body::empty());
let wrapped = Sha256VerifyBody::new(body, declared);
*req.body_mut() = axum::body::Body::new(wrapped);
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct OriginalCanonicalPath(String); struct OriginalCanonicalPath(String);
@@ -475,6 +502,7 @@ pub async fn auth_layer(State(state): State<AppState>, mut req: Request, next: N
error_response(err, &auth_path) error_response(err, &auth_path)
} else { } else {
req.extensions_mut().insert(principal); req.extensions_mut().insert(principal);
wrap_body_for_sha256_verification(&mut req);
next.run(req).await next.run(req).await
} }
} }
@@ -1102,7 +1130,9 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR
let parts: Vec<&str> = auth_str let parts: Vec<&str> = auth_str
.strip_prefix("AWS4-HMAC-SHA256 ") .strip_prefix("AWS4-HMAC-SHA256 ")
.unwrap() .unwrap()
.split(", ") .split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.collect(); .collect();
if parts.len() != 3 { if parts.len() != 3 {
@@ -1112,9 +1142,24 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR
)); ));
} }
let credential = parts[0].strip_prefix("Credential=").unwrap_or(""); let mut credential: &str = "";
let signed_headers_str = parts[1].strip_prefix("SignedHeaders=").unwrap_or(""); let mut signed_headers_str: &str = "";
let provided_signature = parts[2].strip_prefix("Signature=").unwrap_or(""); let mut provided_signature: &str = "";
for part in &parts {
if let Some(v) = part.strip_prefix("Credential=") {
credential = v;
} else if let Some(v) = part.strip_prefix("SignedHeaders=") {
signed_headers_str = v;
} else if let Some(v) = part.strip_prefix("Signature=") {
provided_signature = v;
}
}
if credential.is_empty() || signed_headers_str.is_empty() || provided_signature.is_empty() {
return AuthResult::Denied(S3Error::new(
S3ErrorCode::InvalidArgument,
"Malformed Authorization header",
));
}
let cred_parts: Vec<&str> = credential.split('/').collect(); let cred_parts: Vec<&str> = credential.split('/').collect();
if cred_parts.len() != 5 { if cred_parts.len() != 5 {
@@ -1299,7 +1344,7 @@ fn verify_sigv4_query(state: &AppState, req: &Request) -> AuthResult {
} }
if elapsed < -(state.config.sigv4_timestamp_tolerance_secs as i64) { if elapsed < -(state.config.sigv4_timestamp_tolerance_secs as i64) {
return AuthResult::Denied(S3Error::new( return AuthResult::Denied(S3Error::new(
S3ErrorCode::AccessDenied, S3ErrorCode::RequestTimeTooSkewed,
"Request is too far in the future", "Request is too far in the future",
)); ));
} }
@@ -1369,8 +1414,11 @@ fn check_timestamp_freshness(amz_date: &str, tolerance_secs: u64) -> Option<S3Er
if diff > tolerance_secs { if diff > tolerance_secs {
return Some(S3Error::new( return Some(S3Error::new(
S3ErrorCode::AccessDenied, S3ErrorCode::RequestTimeTooSkewed,
"Request timestamp too old or too far in the future", format!(
"The difference between the request time and the server's time is too large ({}s, tolerance {}s)",
diff, tolerance_secs
),
)); ));
} }
None None

View File

@@ -0,0 +1,281 @@
use axum::extract::{Request, State};
use axum::http::{HeaderMap, HeaderValue, Method, StatusCode};
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use myfsio_storage::traits::StorageEngine;
use crate::state::AppState;
/// One `<CORSRule>` parsed from a bucket's CORS configuration XML.
#[derive(Debug, Default, Clone)]
struct CorsRule {
    // Origin patterns; `*` matches any origin, and a pattern may carry a
    // `*` wildcard (see `match_origin` for the matching rules).
    allowed_origins: Vec<String>,
    // HTTP method names, uppercased by the parser.
    allowed_methods: Vec<String>,
    // Header patterns for preflight checks; `*` allows any requested header.
    allowed_headers: Vec<String>,
    // Header names to echo back via Access-Control-Expose-Headers.
    expose_headers: Vec<String>,
    // Optional Access-Control-Max-Age, in seconds.
    max_age_seconds: Option<u64>,
}
/// Parses a bucket CORS configuration XML document into rules.
///
/// Unparseable XML yields an empty rule list (CORS simply stays disabled);
/// unknown child elements and non-numeric `MaxAgeSeconds` values are ignored.
fn parse_cors_config(xml: &str) -> Vec<CorsRule> {
    let Ok(doc) = roxmltree::Document::parse(xml) else {
        return Vec::new();
    };
    doc.descendants()
        .filter(|node| node.is_element() && node.tag_name().name() == "CORSRule")
        .map(|rule_node| {
            let mut rule = CorsRule::default();
            for child in rule_node.children().filter(|n| n.is_element()) {
                let text = child.text().unwrap_or("").trim().to_string();
                match child.tag_name().name() {
                    "AllowedOrigin" => rule.allowed_origins.push(text),
                    "AllowedMethod" => rule.allowed_methods.push(text.to_ascii_uppercase()),
                    "AllowedHeader" => rule.allowed_headers.push(text),
                    "ExposeHeader" => rule.expose_headers.push(text),
                    "MaxAgeSeconds" => {
                        if let Ok(secs) = text.parse::<u64>() {
                            rule.max_age_seconds = Some(secs);
                        }
                    }
                    _ => {}
                }
            }
            rule
        })
        .collect()
}
/// Matches an `AllowedOrigin` pattern against a request `Origin` value.
///
/// `*` alone matches every origin. A pattern may contain a single `*`
/// wildcard at any position (AWS S3 CORS semantics), e.g.
/// `http://*.example.com`; the wildcard matches any (possibly empty)
/// substring. Patterns without a wildcard must match exactly.
fn match_origin(pattern: &str, origin: &str) -> bool {
    if pattern == "*" || pattern == origin {
        return true;
    }
    // Previously only a leading or trailing `*` was honored, so patterns like
    // `http://*.example.com` never matched. AWS allows the (single) wildcard
    // anywhere: split on it and require both fixed parts, without overlap.
    if let Some((prefix, suffix)) = pattern.split_once('*') {
        return origin.len() >= prefix.len() + suffix.len()
            && origin.starts_with(prefix)
            && origin.ends_with(suffix);
    }
    false
}
/// Case-insensitive header-name match; the `*` pattern allows any header.
fn match_header(pattern: &str, header: &str) -> bool {
    pattern == "*" || pattern.eq_ignore_ascii_case(header)
}
/// Returns the first rule that permits this preflight request: the origin,
/// the requested method, and every requested header must all be allowed.
/// `None` means no rule applies and the preflight should be rejected.
fn find_matching_rule<'a>(
    rules: &'a [CorsRule],
    origin: &str,
    method: &str,
    request_headers: &[&str],
) -> Option<&'a CorsRule> {
    for rule in rules {
        if !rule.allowed_origins.iter().any(|p| match_origin(p, origin)) {
            continue;
        }
        if !rule
            .allowed_methods
            .iter()
            .any(|m| m.eq_ignore_ascii_case(method))
        {
            continue;
        }
        let headers_ok = request_headers
            .iter()
            .all(|h| rule.allowed_headers.iter().any(|p| match_header(p, h)));
        if headers_ok {
            return Some(rule);
        }
    }
    None
}
/// Finds the first rule matching a non-preflight (actual) request.
/// Header patterns are not consulted for actual requests, per the CORS model.
fn find_matching_rule_for_actual<'a>(
    rules: &'a [CorsRule],
    origin: &str,
    method: &str,
) -> Option<&'a CorsRule> {
    for rule in rules {
        let origin_ok = rule.allowed_origins.iter().any(|p| match_origin(p, origin));
        let method_ok = rule
            .allowed_methods
            .iter()
            .any(|m| m.eq_ignore_ascii_case(method));
        if origin_ok && method_ok {
            return Some(rule);
        }
    }
    None
}
/// Extracts a candidate bucket name from a path-style request URI.
///
/// Returns `None` for the service root, for reserved (non-S3) API prefixes,
/// and for first segments that fail bucket-name validation.
fn bucket_from_path(path: &str) -> Option<&str> {
    let rest = path.trim_start_matches('/');
    if rest.is_empty() {
        return None;
    }
    // Reserved application prefixes are never bucket names.
    const RESERVED: [&str; 3] = ["admin/", "myfsio/", "kms/"];
    if RESERVED.iter().any(|p| rest.starts_with(p)) {
        return None;
    }
    let candidate = rest.split('/').next().unwrap_or("");
    // validate_bucket_name returning Some(..) signals an invalid name here.
    if myfsio_storage::validation::validate_bucket_name(candidate).is_some() {
        return None;
    }
    Some(candidate)
}
/// Derives a bucket name from virtual-hosted-style addressing: the first DNS
/// label of the Host header, accepted only when it passes bucket-name
/// validation AND the bucket actually exists (so plain hostnames are not
/// misclassified as buckets).
async fn bucket_from_host(state: &AppState, headers: &HeaderMap) -> Option<String> {
    let raw = headers.get("host")?.to_str().ok()?;
    // Drop any :port suffix, then normalize.
    let host = raw.split(':').next()?.trim().to_ascii_lowercase();
    // Hosts without a dot (e.g. `localhost`) are never bucket-style.
    let (label, _rest) = host.split_once('.')?;
    if myfsio_storage::validation::validate_bucket_name(label).is_some() {
        return None;
    }
    match state.storage.bucket_exists(label).await {
        Ok(true) => Some(label.to_string()),
        _ => None,
    }
}
/// Resolves the bucket a request targets; virtual-hosted style takes
/// precedence over path style.
async fn resolve_bucket(state: &AppState, headers: &HeaderMap, path: &str) -> Option<String> {
    match bucket_from_host(state, headers).await {
        Some(name) => Some(name),
        None => bucket_from_path(path).map(str::to_string),
    }
}
/// Applies a matched CORS rule to a response: sets
/// `Access-Control-Allow-Origin` to the request origin, ensures `Vary:
/// Origin` is advertised, and sets `Access-Control-Expose-Headers` when the
/// rule lists any.
fn apply_rule_headers(headers: &mut axum::http::HeaderMap, rule: &CorsRule, origin: &str) {
    headers.remove("access-control-allow-origin");
    if let Ok(val) = HeaderValue::from_str(origin) {
        headers.insert("access-control-allow-origin", val);
    }
    // Append `Origin` to Vary instead of clobbering the header: other layers
    // (e.g. compression) may already have set `Vary: Accept-Encoding`, and
    // removing it would let caches serve the wrong representation.
    let vary_has_origin = headers
        .get_all("vary")
        .iter()
        .filter_map(|v| v.to_str().ok())
        .flat_map(|v| v.split(','))
        .any(|member| member.trim().eq_ignore_ascii_case("origin"));
    if !vary_has_origin {
        headers.append("vary", HeaderValue::from_static("Origin"));
    }
    if !rule.expose_headers.is_empty() {
        let value = rule.expose_headers.join(", ");
        if let Ok(val) = HeaderValue::from_str(&value) {
            headers.remove("access-control-expose-headers");
            headers.insert("access-control-expose-headers", val);
        }
    }
}
/// Removes every CORS response header; used when a cross-origin request does
/// not match any configured rule, so handlers cannot leak permissive headers.
fn strip_cors_response_headers(headers: &mut HeaderMap) {
    const CORS_RESPONSE_HEADERS: [&str; 6] = [
        "access-control-allow-origin",
        "access-control-allow-credentials",
        "access-control-expose-headers",
        "access-control-allow-methods",
        "access-control-allow-headers",
        "access-control-max-age",
    ];
    for name in CORS_RESPONSE_HEADERS {
        headers.remove(name);
    }
}
/// Middleware implementing per-bucket CORS: answers preflight OPTIONS
/// requests from the bucket's stored CORS configuration, and decorates (or
/// strips) CORS headers on actual cross-origin responses.
pub async fn bucket_cors_layer(
    State(state): State<AppState>,
    req: Request,
    next: Next,
) -> Response {
    let path = req.uri().path().to_string();
    // Requests that do not target a bucket bypass CORS handling entirely.
    let bucket = match resolve_bucket(&state, req.headers(), &path).await {
        Some(name) => name,
        None => return next.run(req).await,
    };
    let origin = req
        .headers()
        .get("origin")
        .and_then(|v| v.to_str().ok())
        .map(|s| s.to_string());
    // Only load and parse the bucket's CORS config for cross-origin requests
    // (an Origin header is present); same-origin traffic skips the lookup.
    let bucket_rules = if origin.is_some() {
        match state.storage.get_bucket_config(&bucket).await {
            Ok(cfg) => cfg
                .cors
                .as_ref()
                // The config may store the XML as a JSON string or as another
                // JSON value; normalize to a string before parsing.
                .map(|v| match v {
                    serde_json::Value::String(s) => s.clone(),
                    other => other.to_string(),
                })
                .map(|xml| parse_cors_config(&xml))
                .filter(|rules| !rules.is_empty()),
            Err(_) => None,
        }
    } else {
        None
    };
    // A preflight is an OPTIONS request carrying Access-Control-Request-Method.
    let is_preflight = req.method() == Method::OPTIONS
        && req.headers().contains_key("access-control-request-method");
    if is_preflight {
        if let (Some(origin), Some(rules)) = (origin.as_deref(), bucket_rules.as_ref()) {
            let req_method = req
                .headers()
                .get("access-control-request-method")
                .and_then(|v| v.to_str().ok())
                .unwrap_or("");
            let req_headers_raw = req
                .headers()
                .get("access-control-request-headers")
                .and_then(|v| v.to_str().ok())
                .unwrap_or("");
            let req_headers: Vec<&str> = req_headers_raw
                .split(',')
                .map(str::trim)
                .filter(|s| !s.is_empty())
                .collect();
            if let Some(rule) = find_matching_rule(rules, origin, req_method, &req_headers) {
                // Preflight success: answer 204 directly with the negotiated
                // CORS headers; the request is NOT forwarded downstream.
                let mut resp = StatusCode::NO_CONTENT.into_response();
                apply_rule_headers(resp.headers_mut(), rule, origin);
                let methods_value = rule.allowed_methods.join(", ");
                if let Ok(val) = HeaderValue::from_str(&methods_value) {
                    resp.headers_mut()
                        .insert("access-control-allow-methods", val);
                }
                // With a `*` header pattern, echo the requested headers back
                // verbatim; otherwise advertise the configured list.
                let headers_value = if rule.allowed_headers.iter().any(|h| h == "*") {
                    req_headers_raw.to_string()
                } else {
                    rule.allowed_headers.join(", ")
                };
                if !headers_value.is_empty() {
                    if let Ok(val) = HeaderValue::from_str(&headers_value) {
                        resp.headers_mut()
                            .insert("access-control-allow-headers", val);
                    }
                }
                if let Some(max_age) = rule.max_age_seconds {
                    if let Ok(val) = HeaderValue::from_str(&max_age.to_string()) {
                        resp.headers_mut().insert("access-control-max-age", val);
                    }
                }
                return resp;
            }
            // Cross-origin preflight with rules present but no match: reject.
            return (StatusCode::FORBIDDEN, "CORSResponse: CORS is not enabled").into_response();
        }
    }
    // Actual (non-preflight) request: run it, then attach CORS headers only
    // when a rule matches; otherwise scrub any CORS headers a handler set.
    let method = req.method().clone();
    let mut resp = next.run(req).await;
    if let (Some(origin), Some(rules)) = (origin.as_deref(), bucket_rules.as_ref()) {
        if let Some(rule) = find_matching_rule_for_actual(rules, origin, method.as_str()) {
            apply_rule_headers(resp.headers_mut(), rule, origin);
        } else {
            strip_cors_response_headers(resp.headers_mut());
        }
    }
    resp
}

View File

@@ -1,8 +1,11 @@
mod auth; mod auth;
mod bucket_cors;
pub mod ratelimit; pub mod ratelimit;
pub mod session; pub mod session;
pub(crate) mod sha_body;
pub use auth::auth_layer; pub use auth::auth_layer;
pub use bucket_cors::bucket_cors_layer;
pub use ratelimit::{rate_limit_layer, RateLimitLayerState}; pub use ratelimit::{rate_limit_layer, RateLimitLayerState};
pub use session::{csrf_layer, session_layer, SessionHandle, SessionLayerState}; pub use session::{csrf_layer, session_layer, SessionHandle, SessionLayerState};
@@ -20,6 +23,42 @@ pub async fn server_header(req: Request, next: Next) -> Response {
resp resp
} }
/// Access-log middleware: records remote IP, method, URI, HTTP version,
/// status, response Content-Length, and wall-clock latency for every request
/// under the `myfsio::access` tracing target.
pub async fn request_log_layer(req: Request, next: Next) -> Response {
    let start = Instant::now();
    let method = req.method().clone();
    let uri = req.uri().clone();
    let version = req.version();
    // Remote address comes from axum's ConnectInfo extension; "-" when the
    // request did not arrive over a real socket (e.g. in-process tests).
    let remote = req
        .extensions()
        .get::<axum::extract::ConnectInfo<std::net::SocketAddr>>()
        .map(|ci| ci.0.ip().to_string())
        .unwrap_or_else(|| "-".to_string());
    let response = next.run(req).await;
    let status = response.status().as_u16();
    let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
    // Best-effort byte count: streaming responses may carry no Content-Length.
    let bytes_out = response
        .headers()
        .get(axum::http::header::CONTENT_LENGTH)
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.parse::<u64>().ok());
    tracing::info!(
        target: "myfsio::access",
        remote = %remote,
        method = %method,
        uri = %uri,
        version = ?version,
        status,
        bytes_out = bytes_out.unwrap_or(0),
        elapsed_ms = format!("{:.3}", elapsed_ms),
        "request"
    );
    response
}
pub async fn ui_metrics_layer(State(state): State<AppState>, req: Request, next: Next) -> Response { pub async fn ui_metrics_layer(State(state): State<AppState>, req: Request, next: Next) -> Response {
let metrics = match state.metrics.clone() { let metrics = match state.metrics.clone() {
Some(m) => m, Some(m) => m,

View File

@@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use axum::extract::{ConnectInfo, Request, State}; use axum::extract::{ConnectInfo, Request, State};
use axum::http::{header, StatusCode}; use axum::http::{header, Method, StatusCode};
use axum::middleware::Next; use axum::middleware::Next;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use parking_lot::Mutex; use parking_lot::Mutex;
@@ -13,17 +13,77 @@ use crate::config::RateLimitSetting;
#[derive(Clone)] #[derive(Clone)]
pub struct RateLimitLayerState { pub struct RateLimitLayerState {
limiter: Arc<FixedWindowLimiter>, default_limiter: Arc<FixedWindowLimiter>,
list_buckets_limiter: Option<Arc<FixedWindowLimiter>>,
bucket_ops_limiter: Option<Arc<FixedWindowLimiter>>,
object_ops_limiter: Option<Arc<FixedWindowLimiter>>,
head_ops_limiter: Option<Arc<FixedWindowLimiter>>,
num_trusted_proxies: usize, num_trusted_proxies: usize,
} }
impl RateLimitLayerState { impl RateLimitLayerState {
pub fn new(setting: RateLimitSetting, num_trusted_proxies: usize) -> Self { pub fn new(setting: RateLimitSetting, num_trusted_proxies: usize) -> Self {
Self { Self {
limiter: Arc::new(FixedWindowLimiter::new(setting)), default_limiter: Arc::new(FixedWindowLimiter::new(setting)),
list_buckets_limiter: None,
bucket_ops_limiter: None,
object_ops_limiter: None,
head_ops_limiter: None,
num_trusted_proxies, num_trusted_proxies,
} }
} }
/// Builds a limiter state with optional per-operation-class limiters.
///
/// A dedicated limiter is only allocated for an operation class whose setting
/// differs from `default`; classes equal to the default share the single
/// default limiter (one rate window, less memory).
pub fn with_per_op(
    default: RateLimitSetting,
    list_buckets: RateLimitSetting,
    bucket_ops: RateLimitSetting,
    object_ops: RateLimitSetting,
    head_ops: RateLimitSetting,
    num_trusted_proxies: usize,
) -> Self {
    Self {
        default_limiter: Arc::new(FixedWindowLimiter::new(default)),
        list_buckets_limiter: (list_buckets != default)
            .then(|| Arc::new(FixedWindowLimiter::new(list_buckets))),
        bucket_ops_limiter: (bucket_ops != default)
            .then(|| Arc::new(FixedWindowLimiter::new(bucket_ops))),
        object_ops_limiter: (object_ops != default)
            .then(|| Arc::new(FixedWindowLimiter::new(object_ops))),
        head_ops_limiter: (head_ops != default)
            .then(|| Arc::new(FixedWindowLimiter::new(head_ops))),
        num_trusted_proxies,
    }
}
/// Chooses the limiter that applies to this request, classifying by method
/// and path shape; falls back to the shared default limiter whenever no
/// dedicated limiter was configured for the matched class.
fn select_limiter(&self, req: &Request) -> &Arc<FixedWindowLimiter> {
    let path = req.uri().path();
    let method = req.method();
    // GET / is ListBuckets.
    if *method == Method::GET && path == "/" {
        if let Some(limiter) = self.list_buckets_limiter.as_ref() {
            return limiter;
        }
    }
    // HEAD requests form their own class regardless of path depth.
    if *method == Method::HEAD {
        if let Some(limiter) = self.head_ops_limiter.as_ref() {
            return limiter;
        }
    }
    // One non-empty segment => bucket op; two or more => object op.
    let depth = path
        .trim_start_matches('/')
        .split('/')
        .filter(|segment| !segment.is_empty())
        .count();
    if depth == 1 {
        if let Some(limiter) = self.bucket_ops_limiter.as_ref() {
            return limiter;
        }
    } else if depth >= 2 {
        if let Some(limiter) = self.object_ops_limiter.as_ref() {
            return limiter;
        }
    }
    &self.default_limiter
}
} }
#[derive(Debug)] #[derive(Debug)]
@@ -99,22 +159,34 @@ pub async fn rate_limit_layer(
next: Next, next: Next,
) -> Response { ) -> Response {
let key = rate_limit_key(&req, state.num_trusted_proxies); let key = rate_limit_key(&req, state.num_trusted_proxies);
match state.limiter.check(&key) { let limiter = state.select_limiter(&req);
match limiter.check(&key) {
Ok(()) => next.run(req).await, Ok(()) => next.run(req).await,
Err(retry_after) => too_many_requests(retry_after), Err(retry_after) => {
let resource = req.uri().path().to_string();
too_many_requests(retry_after, &resource)
}
} }
} }
fn too_many_requests(retry_after: u64) -> Response { fn too_many_requests(retry_after: u64, resource: &str) -> Response {
( let request_id = uuid::Uuid::new_v4().simple().to_string();
StatusCode::TOO_MANY_REQUESTS, let body = myfsio_xml::response::rate_limit_exceeded_xml(resource, &request_id);
let mut response = (
StatusCode::SERVICE_UNAVAILABLE,
[ [
(header::CONTENT_TYPE, "application/xml".to_string()), (header::CONTENT_TYPE, "application/xml".to_string()),
(header::RETRY_AFTER, retry_after.to_string()), (header::RETRY_AFTER, retry_after.to_string()),
], ],
myfsio_xml::response::rate_limit_exceeded_xml(), body,
) )
.into_response() .into_response();
if let Ok(value) = request_id.parse() {
response
.headers_mut()
.insert("x-amz-request-id", value);
}
response
} }
fn rate_limit_key(req: &Request, num_trusted_proxies: usize) -> String { fn rate_limit_key(req: &Request, num_trusted_proxies: usize) -> String {

View File

@@ -90,7 +90,11 @@ pub async fn session_layer(
resp resp
} }
pub async fn csrf_layer(req: Request, next: Next) -> Response { pub async fn csrf_layer(
State(state): State<crate::state::AppState>,
req: Request,
next: Next,
) -> Response {
const CSRF_HEADER_ALIAS: &str = "x-csrftoken"; const CSRF_HEADER_ALIAS: &str = "x-csrftoken";
let method = req.method().clone(); let method = req.method().clone();
@@ -169,7 +173,32 @@ pub async fn csrf_layer(req: Request, next: Next) -> Response {
header_present = header_token.is_some(), header_present = header_token.is_some(),
"CSRF token mismatch" "CSRF token mismatch"
); );
(StatusCode::FORBIDDEN, "Invalid CSRF token").into_response()
let accept = parts
.headers
.get(header::ACCEPT)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
let is_form_submit = content_type.starts_with("application/x-www-form-urlencoded")
|| content_type.starts_with("multipart/form-data");
let wants_json =
accept.contains("application/json") || content_type.starts_with("application/json");
if is_form_submit && !wants_json {
let ctx = crate::handlers::ui::base_context(&handle, None);
let mut resp = crate::handlers::ui::render(&state, "csrf_error.html", &ctx);
*resp.status_mut() = StatusCode::FORBIDDEN;
return resp;
}
let mut resp = (
StatusCode::FORBIDDEN,
[(header::CONTENT_TYPE, "application/json")],
r#"{"error":"Invalid CSRF token"}"#,
)
.into_response();
*resp.status_mut() = StatusCode::FORBIDDEN;
resp
} }
fn extract_multipart_token(content_type: &str, body: &[u8]) -> Option<String> { fn extract_multipart_token(content_type: &str, body: &[u8]) -> Option<String> {

View File

@@ -0,0 +1,107 @@
use axum::body::Body;
use bytes::Bytes;
use http_body::{Body as HttpBody, Frame};
use sha2::{Digest, Sha256};
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Raised when the streamed payload's computed SHA-256 digest differs from
/// the digest the client declared in `x-amz-content-sha256`.
#[derive(Debug)]
struct Sha256MismatchError {
    // Hex digest the client declared (normalized to lowercase by the wrapper).
    expected: String,
    // Hex digest computed over the bytes actually received.
    computed: String,
}

impl Sha256MismatchError {
    // S3-style user-facing message for the error response body.
    fn message(&self) -> String {
        format!(
            "The x-amz-content-sha256 you specified did not match what we received (expected {}, computed {})",
            self.expected, self.computed
        )
    }
}

impl fmt::Display for Sha256MismatchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Compact diagnostic form for logs; the response body uses message().
        write!(
            f,
            "XAmzContentSHA256Mismatch: expected {}, computed {}",
            self.expected, self.computed
        )
    }
}

impl Error for Sha256MismatchError {}
/// Body adapter that hashes every data frame it forwards and, at end of
/// stream, fails with `Sha256MismatchError` if the computed digest does not
/// equal the expected one.
pub struct Sha256VerifyBody {
    inner: Body,
    // Expected digest, normalized to lowercase hex in `new`.
    expected: String,
    // `Some` until end-of-stream; taken exactly once for the final check.
    hasher: Option<Sha256>,
}

impl Sha256VerifyBody {
    /// Wraps `inner`; `expected_hex` is the client-declared SHA-256 digest.
    pub fn new(inner: Body, expected_hex: String) -> Self {
        Self {
            inner,
            expected: expected_hex.to_ascii_lowercase(),
            hasher: Some(Sha256::new()),
        }
    }
}
impl HttpBody for Sha256VerifyBody {
    type Data = Bytes;
    type Error = Box<dyn std::error::Error + Send + Sync>;

    // Forwards frames from the inner body unchanged while folding each data
    // frame into the running hash; on end-of-stream, finalizes the digest and
    // surfaces a mismatch as a body error.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let this = self.as_mut().get_mut();
        match Pin::new(&mut this.inner).poll_frame(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e)))),
            Poll::Ready(Some(Ok(frame))) => {
                // Trailer frames have no data_ref and pass through untouched.
                if let Some(data) = frame.data_ref() {
                    if let Some(h) = this.hasher.as_mut() {
                        h.update(data);
                    }
                }
                Poll::Ready(Some(Ok(frame)))
            }
            Poll::Ready(None) => {
                // take() ensures the digest is finalized exactly once even if
                // the body is polled again after completion.
                if let Some(hasher) = this.hasher.take() {
                    let computed = hex::encode(hasher.finalize());
                    if computed != this.expected {
                        return Poll::Ready(Some(Err(Box::new(Sha256MismatchError {
                            expected: this.expected.clone(),
                            computed,
                        }))));
                    }
                }
                Poll::Ready(None)
            }
        }
    }

    fn is_end_stream(&self) -> bool {
        self.inner.is_end_stream()
    }

    fn size_hint(&self) -> http_body::SizeHint {
        self.inner.size_hint()
    }
}
/// True when `s` is exactly 64 ASCII hex digits — i.e. a SHA-256 digest in
/// hex form (distinguishes real digests from sentinels like
/// `UNSIGNED-PAYLOAD`).
pub fn is_hex_sha256(s: &str) -> bool {
    s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit())
}
/// Walks an error's `source()` chain and, if any cause is a
/// `Sha256MismatchError`, returns its user-facing message.
pub fn sha256_mismatch_message(err: &(dyn Error + 'static)) -> Option<String> {
    let mut current: Option<&(dyn Error + 'static)> = Some(err);
    while let Some(e) = current {
        if let Some(mismatch) = e.downcast_ref::<Sha256MismatchError>() {
            return Some(mismatch.message());
        }
        current = e.source();
    }
    None
}

View File

@@ -1,5 +1,7 @@
use axum::body::Body; use axum::body::Body;
use axum::http::{Method, Request, StatusCode}; use axum::http::{Method, Request, StatusCode};
use base64::engine::general_purpose::URL_SAFE;
use base64::Engine;
use http_body_util::BodyExt; use http_body_util::BodyExt;
use myfsio_storage::traits::{AsyncReadStream, StorageEngine}; use myfsio_storage::traits::{AsyncReadStream, StorageEngine};
use serde_json::Value; use serde_json::Value;
@@ -53,6 +55,7 @@ fn test_app_with_iam(iam_json: serde_json::Value) -> (axum::Router, tempfile::Te
ui_enabled: false, ui_enabled: false,
templates_dir: std::path::PathBuf::from("templates"), templates_dir: std::path::PathBuf::from("templates"),
static_dir: std::path::PathBuf::from("static"), static_dir: std::path::PathBuf::from("static"),
multipart_min_part_size: 1,
..myfsio_server::config::ServerConfig::default() ..myfsio_server::config::ServerConfig::default()
}; };
let state = myfsio_server::state::AppState::new(config); let state = myfsio_server::state::AppState::new(config);
@@ -118,6 +121,10 @@ fn test_app_with_rate_limits(
storage_root: tmp.path().to_path_buf(), storage_root: tmp.path().to_path_buf(),
iam_config_path: iam_path.join("iam.json"), iam_config_path: iam_path.join("iam.json"),
ratelimit_default: default, ratelimit_default: default,
ratelimit_list_buckets: default,
ratelimit_bucket_ops: default,
ratelimit_object_ops: default,
ratelimit_head_ops: default,
ratelimit_admin: admin, ratelimit_admin: admin,
ui_enabled: false, ui_enabled: false,
..myfsio_server::config::ServerConfig::default() ..myfsio_server::config::ServerConfig::default()
@@ -156,7 +163,7 @@ async fn rate_limit_default_and_admin_are_independent() {
) )
.await .await
.unwrap(); .unwrap();
assert_eq!(second.status(), StatusCode::TOO_MANY_REQUESTS); assert_eq!(second.status(), StatusCode::SERVICE_UNAVAILABLE);
assert!(second.headers().contains_key("retry-after")); assert!(second.headers().contains_key("retry-after"));
let admin_first = app let admin_first = app
@@ -192,7 +199,7 @@ async fn rate_limit_default_and_admin_are_independent() {
) )
.await .await
.unwrap(); .unwrap();
assert_eq!(admin_third.status(), StatusCode::TOO_MANY_REQUESTS); assert_eq!(admin_third.status(), StatusCode::SERVICE_UNAVAILABLE);
} }
fn test_ui_state() -> (myfsio_server::state::AppState, tempfile::TempDir) { fn test_ui_state() -> (myfsio_server::state::AppState, tempfile::TempDir) {
@@ -2202,6 +2209,42 @@ async fn test_bucket_versioning() {
) )
.unwrap(); .unwrap();
assert!(body.contains("VersioningConfiguration")); assert!(body.contains("VersioningConfiguration"));
assert!(!body.contains("<Status>Enabled</Status>"));
assert!(!body.contains("<Status>Suspended</Status>"));
app.clone()
.oneshot(
Request::builder()
.method(Method::PUT)
.uri("/ver-bucket?versioning")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.body(Body::from(
"<VersioningConfiguration><Status>Suspended</Status></VersioningConfiguration>",
))
.unwrap(),
)
.await
.unwrap();
let resp = app
.clone()
.oneshot(signed_request(
Method::GET,
"/ver-bucket?versioning",
Body::empty(),
))
.await
.unwrap();
let body = String::from_utf8(
resp.into_body()
.collect()
.await
.unwrap()
.to_bytes()
.to_vec(),
)
.unwrap();
assert!(body.contains("<Status>Suspended</Status>")); assert!(body.contains("<Status>Suspended</Status>"));
app.clone() app.clone()
@@ -2304,9 +2347,16 @@ async fn test_versioned_object_can_be_read_and_deleted_by_version_id() {
) )
.unwrap(); .unwrap();
let archived_version_id = list_body let archived_version_id = list_body
.split("<Version>")
.skip(1)
.find(|block| block.contains("<IsLatest>false</IsLatest>"))
.and_then(|block| {
block
.split("<VersionId>") .split("<VersionId>")
.filter_map(|part| part.split_once("</VersionId>").map(|(id, _)| id)) .nth(1)
.find(|id| *id != "null") .and_then(|s| s.split_once("</VersionId>").map(|(id, _)| id))
})
.filter(|id| *id != "null")
.expect("archived version id") .expect("archived version id")
.to_string(); .to_string();
@@ -2394,6 +2444,457 @@ async fn test_versioned_object_can_be_read_and_deleted_by_version_id() {
assert_eq!(missing_resp.status(), StatusCode::NOT_FOUND); assert_eq!(missing_resp.status(), StatusCode::NOT_FOUND);
} }
// Wire-level versioning conformance:
// - PUT on a versioning-enabled bucket emits x-amz-version-id,
// - overwriting the same key mints a distinct version id,
// - unversioned DELETE creates a delete marker (x-amz-delete-marker: true)
//   and reports the marker's version id,
// - ListObjectVersions (?versions) includes the <DeleteMarker> entry.
#[tokio::test]
async fn test_versioned_put_and_delete_emit_version_headers_and_delete_markers() {
    let (app, _tmp) = test_app();
    // Create the bucket, then enable versioning via the ?versioning subresource.
    app.clone()
        .oneshot(signed_request(Method::PUT, "/compat-bucket", Body::empty()))
        .await
        .unwrap();
    app.clone()
        .oneshot(
            Request::builder()
                .method(Method::PUT)
                .uri("/compat-bucket?versioning")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .body(Body::from(
                    "<VersioningConfiguration><Status>Enabled</Status></VersioningConfiguration>",
                ))
                .unwrap(),
        )
        .await
        .unwrap();
    let put_resp = app
        .clone()
        .oneshot(signed_request(
            Method::PUT,
            "/compat-bucket/doc.txt",
            Body::from("first"),
        ))
        .await
        .unwrap();
    assert_eq!(put_resp.status(), StatusCode::OK);
    let first_version = put_resp
        .headers()
        .get("x-amz-version-id")
        .expect("PUT on versioned bucket must emit x-amz-version-id")
        .to_str()
        .unwrap()
        .to_string();
    assert!(!first_version.is_empty());
    let overwrite_resp = app
        .clone()
        .oneshot(signed_request(
            Method::PUT,
            "/compat-bucket/doc.txt",
            Body::from("second"),
        ))
        .await
        .unwrap();
    assert_eq!(overwrite_resp.status(), StatusCode::OK);
    let second_version = overwrite_resp
        .headers()
        .get("x-amz-version-id")
        .expect("overwrite on versioned bucket must emit a new x-amz-version-id")
        .to_str()
        .unwrap()
        .to_string();
    assert_ne!(first_version, second_version);
    let delete_resp = app
        .clone()
        .oneshot(signed_request(
            Method::DELETE,
            "/compat-bucket/doc.txt",
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(delete_resp.status(), StatusCode::NO_CONTENT);
    assert_eq!(
        delete_resp
            .headers()
            .get("x-amz-delete-marker")
            .and_then(|v| v.to_str().ok()),
        Some("true")
    );
    assert!(delete_resp.headers().contains_key("x-amz-version-id"));
    let versions_resp = app
        .oneshot(signed_request(
            Method::GET,
            "/compat-bucket?versions",
            Body::empty(),
        ))
        .await
        .unwrap();
    let versions_body = String::from_utf8(
        versions_resp
            .into_body()
            .collect()
            .await
            .unwrap()
            .to_bytes()
            .to_vec(),
    )
    .unwrap();
    assert!(
        versions_body.contains("<DeleteMarker>"),
        "expected DeleteMarker entry in ListObjectVersions output, got: {}",
        versions_body
    );
}
// Keys containing consecutive slashes (a//b, a///b) are legal S3 keys and
// must be stored, fetched, and listed as distinct objects — no path
// normalization may collapse them.
#[tokio::test]
async fn test_consecutive_slashes_in_key_round_trip() {
    let (app, _tmp) = test_app();
    app.clone()
        .oneshot(signed_request(Method::PUT, "/slashes-bucket", Body::empty()))
        .await
        .unwrap();
    let put_ab = app
        .clone()
        .oneshot(signed_request(
            Method::PUT,
            "/slashes-bucket/a/b",
            Body::from("single"),
        ))
        .await
        .unwrap();
    assert_eq!(put_ab.status(), StatusCode::OK);
    let put_double = app
        .clone()
        .oneshot(signed_request(
            Method::PUT,
            "/slashes-bucket/a//b",
            Body::from("double"),
        ))
        .await
        .unwrap();
    assert_eq!(put_double.status(), StatusCode::OK);
    let put_triple = app
        .clone()
        .oneshot(signed_request(
            Method::PUT,
            "/slashes-bucket/a///b",
            Body::from("triple"),
        ))
        .await
        .unwrap();
    assert_eq!(put_triple.status(), StatusCode::OK);
    // Each spelling must round-trip to its own body.
    let get_ab = app
        .clone()
        .oneshot(signed_request(
            Method::GET,
            "/slashes-bucket/a/b",
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(get_ab.status(), StatusCode::OK);
    let body_ab = get_ab.into_body().collect().await.unwrap().to_bytes();
    assert_eq!(&body_ab[..], b"single");
    let get_triple = app
        .clone()
        .oneshot(signed_request(
            Method::GET,
            "/slashes-bucket/a///b",
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(get_triple.status(), StatusCode::OK);
    let body_triple = get_triple.into_body().collect().await.unwrap().to_bytes();
    assert_eq!(&body_triple[..], b"triple");
    // ListObjectsV2 must report all three keys verbatim.
    let list_resp = app
        .oneshot(signed_request(
            Method::GET,
            "/slashes-bucket?list-type=2",
            Body::empty(),
        ))
        .await
        .unwrap();
    let list_body = String::from_utf8(
        list_resp
            .into_body()
            .collect()
            .await
            .unwrap()
            .to_bytes()
            .to_vec(),
    )
    .unwrap();
    assert!(
        list_body.contains("<Key>a/b</Key>"),
        "expected a/b in listing: {}",
        list_body
    );
    assert!(
        list_body.contains("<Key>a//b</Key>"),
        "expected a//b in listing: {}",
        list_body
    );
    assert!(
        list_body.contains("<Key>a///b</Key>"),
        "expected a///b in listing: {}",
        list_body
    );
}
// Deleting the live version by versionId must promote the previous version
// back into the live slot (a versionId-qualified DELETE permanently removes
// that version and creates no delete marker).
#[tokio::test]
async fn test_delete_live_version_restores_previous_to_live_slot() {
    let (app, _tmp) = test_app();
    app.clone()
        .oneshot(signed_request(Method::PUT, "/restore-bucket", Body::empty()))
        .await
        .unwrap();
    app.clone()
        .oneshot(
            Request::builder()
                .method(Method::PUT)
                .uri("/restore-bucket?versioning")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .body(Body::from(
                    "<VersioningConfiguration><Status>Enabled</Status></VersioningConfiguration>",
                ))
                .unwrap(),
        )
        .await
        .unwrap();
    // Two writes to the same key produce two distinct versions.
    let v1_resp = app
        .clone()
        .oneshot(signed_request(
            Method::PUT,
            "/restore-bucket/k",
            Body::from("one"),
        ))
        .await
        .unwrap();
    let v1 = v1_resp
        .headers()
        .get("x-amz-version-id")
        .unwrap()
        .to_str()
        .unwrap()
        .to_string();
    let v2_resp = app
        .clone()
        .oneshot(signed_request(
            Method::PUT,
            "/restore-bucket/k",
            Body::from("two"),
        ))
        .await
        .unwrap();
    let v2 = v2_resp
        .headers()
        .get("x-amz-version-id")
        .unwrap()
        .to_str()
        .unwrap()
        .to_string();
    assert_ne!(v1, v2);
    // Remove the live (latest) version by its id.
    let del = app
        .clone()
        .oneshot(signed_request(
            Method::DELETE,
            &format!("/restore-bucket/k?versionId={}", v2),
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(del.status(), StatusCode::NO_CONTENT);
    // The older version must now serve unversioned GETs...
    let get_live = app
        .clone()
        .oneshot(signed_request(
            Method::GET,
            "/restore-bucket/k",
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(get_live.status(), StatusCode::OK);
    let body = get_live.into_body().collect().await.unwrap().to_bytes();
    assert_eq!(&body[..], b"one");
    // ...and remain addressable by its own version id.
    let get_v1 = app
        .oneshot(signed_request(
            Method::GET,
            &format!("/restore-bucket/k?versionId={}", v1),
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(get_v1.status(), StatusCode::OK);
}
// Deleting the active delete marker by versionId "undeletes" the object:
// the most recent real version must become live again.
#[tokio::test]
async fn test_delete_active_delete_marker_restores_previous_to_live_slot() {
    let (app, _tmp) = test_app();
    app.clone()
        .oneshot(signed_request(Method::PUT, "/undel-bucket", Body::empty()))
        .await
        .unwrap();
    app.clone()
        .oneshot(
            Request::builder()
                .method(Method::PUT)
                .uri("/undel-bucket?versioning")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .body(Body::from(
                    "<VersioningConfiguration><Status>Enabled</Status></VersioningConfiguration>",
                ))
                .unwrap(),
        )
        .await
        .unwrap();
    app.clone()
        .oneshot(signed_request(
            Method::PUT,
            "/undel-bucket/k",
            Body::from("only"),
        ))
        .await
        .unwrap();
    // Unversioned DELETE creates the delete marker and reports its id.
    let del = app
        .clone()
        .oneshot(signed_request(
            Method::DELETE,
            "/undel-bucket/k",
            Body::empty(),
        ))
        .await
        .unwrap();
    let dm_version = del
        .headers()
        .get("x-amz-version-id")
        .unwrap()
        .to_str()
        .unwrap()
        .to_string();
    assert_eq!(
        del.headers()
            .get("x-amz-delete-marker")
            .and_then(|v| v.to_str().ok()),
        Some("true")
    );
    // While the delete marker is live, an unversioned GET must 404.
    let shadowed = app
        .clone()
        .oneshot(signed_request(
            Method::GET,
            "/undel-bucket/k",
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(shadowed.status(), StatusCode::NOT_FOUND);
    // Deleting the marker itself restores the original version.
    let del_dm = app
        .clone()
        .oneshot(signed_request(
            Method::DELETE,
            &format!("/undel-bucket/k?versionId={}", dm_version),
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(del_dm.status(), StatusCode::NO_CONTENT);
    let restored = app
        .oneshot(signed_request(
            Method::GET,
            "/undel-bucket/k",
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(restored.status(), StatusCode::OK);
    let body = restored.into_body().collect().await.unwrap().to_bytes();
    assert_eq!(&body[..], b"only");
}
// GET with a versionId that addresses a delete marker must return 405
// MethodNotAllowed (S3 conformance), not 404.
#[tokio::test]
async fn test_versioned_get_on_delete_marker_returns_method_not_allowed() {
    let (app, _tmp) = test_app();
    app.clone()
        .oneshot(signed_request(Method::PUT, "/dm-bucket", Body::empty()))
        .await
        .unwrap();
    app.clone()
        .oneshot(
            Request::builder()
                .method(Method::PUT)
                .uri("/dm-bucket?versioning")
                .header("x-access-key", TEST_ACCESS_KEY)
                .header("x-secret-key", TEST_SECRET_KEY)
                .body(Body::from(
                    "<VersioningConfiguration><Status>Enabled</Status></VersioningConfiguration>",
                ))
                .unwrap(),
        )
        .await
        .unwrap();
    app.clone()
        .oneshot(signed_request(
            Method::PUT,
            "/dm-bucket/k",
            Body::from("x"),
        ))
        .await
        .unwrap();
    // Unversioned DELETE creates the delete marker; capture its version id.
    let del = app
        .clone()
        .oneshot(signed_request(
            Method::DELETE,
            "/dm-bucket/k",
            Body::empty(),
        ))
        .await
        .unwrap();
    let dm_version = del
        .headers()
        .get("x-amz-version-id")
        .unwrap()
        .to_str()
        .unwrap()
        .to_string();
    let versioned = app
        .oneshot(signed_request(
            Method::GET,
            &format!("/dm-bucket/k?versionId={}", dm_version),
            Body::empty(),
        ))
        .await
        .unwrap();
    assert_eq!(versioned.status(), StatusCode::METHOD_NOT_ALLOWED);
}
#[tokio::test] #[tokio::test]
async fn test_retention_is_enforced_when_deleting_archived_version() { async fn test_retention_is_enforced_when_deleting_archived_version() {
let (app, _tmp) = test_app(); let (app, _tmp) = test_app();
@@ -2474,9 +2975,16 @@ async fn test_retention_is_enforced_when_deleting_archived_version() {
) )
.unwrap(); .unwrap();
let archived_version_id = list_body let archived_version_id = list_body
.split("<Version>")
.skip(1)
.find(|block| block.contains("<IsLatest>false</IsLatest>"))
.and_then(|block| {
block
.split("<VersionId>") .split("<VersionId>")
.filter_map(|part| part.split_once("</VersionId>").map(|(id, _)| id)) .nth(1)
.find(|id| *id != "null") .and_then(|s| s.split_once("</VersionId>").map(|(id, _)| id))
})
.filter(|id| *id != "null")
.expect("archived version id") .expect("archived version id")
.to_string(); .to_string();
@@ -2562,6 +3070,132 @@ async fn test_put_object_validates_content_md5() {
assert_eq!(good_resp.status(), StatusCode::OK); assert_eq!(good_resp.status(), StatusCode::OK);
} }
#[tokio::test]
async fn test_x_amz_content_sha256_mismatch_returns_bad_digest() {
    // A PUT whose x-amz-content-sha256 header disagrees with the body digest
    // must fail with 400 and an S3 BadDigest error document that names the
    // offending header.
    let (app, _tmp) = test_app();
    app.clone()
        .oneshot(signed_request(Method::PUT, "/sha256-bucket", Body::empty()))
        .await
        .unwrap();

    // Declare an all-zero digest that cannot match the body "hello".
    let mismatched_put = Request::builder()
        .method(Method::PUT)
        .uri("/sha256-bucket/object.txt")
        .header("x-access-key", TEST_ACCESS_KEY)
        .header("x-secret-key", TEST_SECRET_KEY)
        .header(
            "x-amz-content-sha256",
            "0000000000000000000000000000000000000000000000000000000000000000",
        )
        .body(Body::from("hello"))
        .unwrap();
    let resp = app.oneshot(mismatched_put).await.unwrap();
    assert_eq!(resp.status(), StatusCode::BAD_REQUEST);

    let payload = resp.into_body().collect().await.unwrap().to_bytes();
    let text = String::from_utf8(payload.to_vec()).unwrap();
    assert!(text.contains("<Code>BadDigest</Code>"));
    assert!(text.contains("x-amz-content-sha256"));
}
#[tokio::test]
async fn test_max_keys_zero_respects_marker_and_v2_cursors() {
    // With max-keys=0, every listing flavor — v1 `marker`, v2 `start-after`,
    // and v2 `continuation-token` — must return a non-truncated page rather
    // than claim more results exist past the cursor.
    //
    // The original version repeated the request/collect/assert boilerplate
    // three times with bare asserts; this drives all three cursors through one
    // loop and attaches the failing URI to the assertion message.
    let (app, _tmp) = test_app();
    app.clone()
        .oneshot(signed_request(Method::PUT, "/cursor-bucket", Body::empty()))
        .await
        .unwrap();
    // Seed two objects so a cursor at "b.txt" is meaningful.
    for key in ["a.txt", "b.txt"] {
        app.clone()
            .oneshot(signed_request(
                Method::PUT,
                &format!("/cursor-bucket/{}", key),
                Body::from(key.to_string()),
            ))
            .await
            .unwrap();
    }

    // continuation-token is the base64url-encoded last-seen key.
    let token = URL_SAFE.encode("b.txt");
    let cursor_uris = [
        "/cursor-bucket?max-keys=0&marker=b.txt".to_string(),
        "/cursor-bucket?list-type=2&max-keys=0&start-after=b.txt".to_string(),
        format!(
            "/cursor-bucket?list-type=2&max-keys=0&continuation-token={}",
            token
        ),
    ];
    for uri in &cursor_uris {
        let resp = app
            .clone()
            .oneshot(signed_request(Method::GET, uri, Body::empty()))
            .await
            .unwrap();
        let body = String::from_utf8(
            resp.into_body().collect().await.unwrap().to_bytes().to_vec(),
        )
        .unwrap();
        assert!(
            body.contains("<IsTruncated>false</IsTruncated>"),
            "listing at {} must report IsTruncated=false when max-keys=0",
            uri
        );
    }
}
#[tokio::test] #[tokio::test]
async fn test_put_object_tagging_and_standard_headers_are_persisted() { async fn test_put_object_tagging_and_standard_headers_are_persisted() {
let (app, _tmp) = test_app(); let (app, _tmp) = test_app();

View File

@@ -17,10 +17,18 @@ pub enum StorageError {
key: String, key: String,
version_id: String, version_id: String,
}, },
#[error("Object is a delete marker: {bucket}/{key}")]
DeleteMarker {
bucket: String,
key: String,
version_id: String,
},
#[error("Invalid bucket name: {0}")] #[error("Invalid bucket name: {0}")]
InvalidBucketName(String), InvalidBucketName(String),
#[error("Invalid object key: {0}")] #[error("Invalid object key: {0}")]
InvalidObjectKey(String), InvalidObjectKey(String),
#[error("Method not allowed: {0}")]
MethodNotAllowed(String),
#[error("Upload not found: {0}")] #[error("Upload not found: {0}")]
UploadNotFound(String), UploadNotFound(String),
#[error("Quota exceeded: {0}")] #[error("Quota exceeded: {0}")]
@@ -42,7 +50,7 @@ impl From<StorageError> for S3Error {
S3Error::from_code(S3ErrorCode::NoSuchBucket).with_resource(format!("/{}", name)) S3Error::from_code(S3ErrorCode::NoSuchBucket).with_resource(format!("/{}", name))
} }
StorageError::BucketAlreadyExists(name) => { StorageError::BucketAlreadyExists(name) => {
S3Error::from_code(S3ErrorCode::BucketAlreadyExists) S3Error::from_code(S3ErrorCode::BucketAlreadyOwnedByYou)
.with_resource(format!("/{}", name)) .with_resource(format!("/{}", name))
} }
StorageError::BucketNotEmpty(name) => { StorageError::BucketNotEmpty(name) => {
@@ -58,10 +66,17 @@ impl From<StorageError> for S3Error {
version_id, version_id,
} => S3Error::from_code(S3ErrorCode::NoSuchVersion) } => S3Error::from_code(S3ErrorCode::NoSuchVersion)
.with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)), .with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)),
StorageError::DeleteMarker {
bucket,
key,
version_id,
} => S3Error::from_code(S3ErrorCode::MethodNotAllowed)
.with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)),
StorageError::InvalidBucketName(msg) => { StorageError::InvalidBucketName(msg) => {
S3Error::new(S3ErrorCode::InvalidBucketName, msg) S3Error::new(S3ErrorCode::InvalidBucketName, msg)
} }
StorageError::InvalidObjectKey(msg) => S3Error::new(S3ErrorCode::InvalidKey, msg), StorageError::InvalidObjectKey(msg) => S3Error::new(S3ErrorCode::InvalidKey, msg),
StorageError::MethodNotAllowed(msg) => S3Error::new(S3ErrorCode::MethodNotAllowed, msg),
StorageError::UploadNotFound(id) => S3Error::new( StorageError::UploadNotFound(id) => S3Error::new(
S3ErrorCode::NoSuchUpload, S3ErrorCode::NoSuchUpload,
format!("Upload {} not found", id), format!("Upload {} not found", id),

File diff suppressed because it is too large Load Diff

View File

@@ -62,14 +62,14 @@ pub trait StorageEngine: Send + Sync {
version_id: &str, version_id: &str,
) -> StorageResult<HashMap<String, String>>; ) -> StorageResult<HashMap<String, String>>;
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()>; async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<DeleteOutcome>;
async fn delete_object_version( async fn delete_object_version(
&self, &self,
bucket: &str, bucket: &str,
key: &str, key: &str,
version_id: &str, version_id: &str,
) -> StorageResult<()>; ) -> StorageResult<DeleteOutcome>;
async fn copy_object( async fn copy_object(
&self, &self,
@@ -148,6 +148,12 @@ pub trait StorageEngine: Send + Sync {
async fn is_versioning_enabled(&self, bucket: &str) -> StorageResult<bool>; async fn is_versioning_enabled(&self, bucket: &str) -> StorageResult<bool>;
async fn set_versioning(&self, bucket: &str, enabled: bool) -> StorageResult<()>; async fn set_versioning(&self, bucket: &str, enabled: bool) -> StorageResult<()>;
async fn get_versioning_status(&self, bucket: &str) -> StorageResult<VersioningStatus>;
async fn set_versioning_status(
&self,
bucket: &str,
status: VersioningStatus,
) -> StorageResult<()>;
async fn list_object_versions( async fn list_object_versions(
&self, &self,

View File

@@ -47,6 +47,7 @@ pub fn validate_object_key(
normalized.split('/').collect() normalized.split('/').collect()
}; };
for part in &parts { for part in &parts {
if part.is_empty() { if part.is_empty() {
continue; continue;
@@ -60,6 +61,12 @@ pub fn validate_object_key(
return Some("Object key contains invalid segments".to_string()); return Some("Object key contains invalid segments".to_string());
} }
if part.len() > 255 {
return Some(
"Object key contains a path segment that exceeds 255 bytes".to_string(),
);
}
if part.chars().any(|c| (c as u32) < 32) { if part.chars().any(|c| (c as u32) < 32) {
return Some("Object key contains control characters".to_string()); return Some("Object key contains control characters".to_string());
} }
@@ -98,6 +105,15 @@ pub fn validate_object_key(
} }
} }
for part in &non_empty_parts {
if *part == ".__myfsio_dirobj__"
|| *part == ".__myfsio_empty__"
|| part.starts_with("_index.json")
{
return Some("Object key segment uses a reserved internal name".to_string());
}
}
None None
} }
@@ -132,6 +148,13 @@ pub fn validate_bucket_name(bucket_name: &str) -> Option<String> {
return Some("Bucket name must not be formatted as an IP address".to_string()); return Some("Bucket name must not be formatted as an IP address".to_string());
} }
if bucket_name.starts_with("xn--") {
return Some("Bucket name must not start with the reserved prefix 'xn--'".to_string());
}
if bucket_name.ends_with("-s3alias") || bucket_name.ends_with("--ol-s3") {
return Some("Bucket name must not end with a reserved suffix".to_string());
}
None None
} }

View File

@@ -8,3 +8,4 @@ myfsio-common = { path = "../myfsio-common" }
quick-xml = { workspace = true } quick-xml = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
percent-encoding = { workspace = true }

View File

@@ -1,13 +1,13 @@
use quick_xml::events::Event; use quick_xml::events::Event;
use quick_xml::Reader; use quick_xml::Reader;
#[derive(Debug, Default)] #[derive(Debug, Default, Clone)]
pub struct DeleteObjectsRequest { pub struct DeleteObjectsRequest {
pub objects: Vec<ObjectIdentifier>, pub objects: Vec<ObjectIdentifier>,
pub quiet: bool, pub quiet: bool,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct ObjectIdentifier { pub struct ObjectIdentifier {
pub key: String, pub key: String,
pub version_id: Option<String>, pub version_id: Option<String>,
@@ -86,6 +86,11 @@ pub fn parse_complete_multipart_upload(xml: &str) -> Result<CompleteMultipartUpl
} }
pub fn parse_delete_objects(xml: &str) -> Result<DeleteObjectsRequest, String> { pub fn parse_delete_objects(xml: &str) -> Result<DeleteObjectsRequest, String> {
let trimmed = xml.trim();
if trimmed.is_empty() {
return Err("Request body is empty".to_string());
}
let mut reader = Reader::from_str(xml); let mut reader = Reader::from_str(xml);
let mut result = DeleteObjectsRequest::default(); let mut result = DeleteObjectsRequest::default();
let mut buf = Vec::new(); let mut buf = Vec::new();
@@ -93,18 +98,43 @@ pub fn parse_delete_objects(xml: &str) -> Result<DeleteObjectsRequest, String> {
let mut current_key: Option<String> = None; let mut current_key: Option<String> = None;
let mut current_version_id: Option<String> = None; let mut current_version_id: Option<String> = None;
let mut in_object = false; let mut in_object = false;
let mut saw_delete_root = false;
let mut first_element_seen = false;
loop { loop {
match reader.read_event_into(&mut buf) { let event = reader.read_event_into(&mut buf);
match event {
Ok(Event::Start(ref e)) => { Ok(Event::Start(ref e)) => {
let name = String::from_utf8_lossy(e.name().as_ref()).to_string(); let name = String::from_utf8_lossy(e.name().as_ref()).to_string();
current_tag = name.clone(); current_tag = name.clone();
if name == "Object" { if !first_element_seen {
first_element_seen = true;
if name != "Delete" {
return Err(format!(
"Expected <Delete> root element, found <{}>",
name
));
}
saw_delete_root = true;
} else if name == "Object" {
in_object = true; in_object = true;
current_key = None; current_key = None;
current_version_id = None; current_version_id = None;
} }
} }
Ok(Event::Empty(ref e)) => {
let name = String::from_utf8_lossy(e.name().as_ref()).to_string();
if !first_element_seen {
first_element_seen = true;
if name != "Delete" {
return Err(format!(
"Expected <Delete> root element, found <{}>",
name
));
}
saw_delete_root = true;
}
}
Ok(Event::Text(ref e)) => { Ok(Event::Text(ref e)) => {
let text = e.unescape().map_err(|e| e.to_string())?.to_string(); let text = e.unescape().map_err(|e| e.to_string())?.to_string();
match current_tag.as_str() { match current_tag.as_str() {
@@ -139,6 +169,13 @@ pub fn parse_delete_objects(xml: &str) -> Result<DeleteObjectsRequest, String> {
buf.clear(); buf.clear();
} }
if !saw_delete_root {
return Err("Expected <Delete> root element".to_string());
}
if result.objects.is_empty() {
return Err("Delete request must contain at least one <Object>".to_string());
}
Ok(result) Ok(result)
} }

View File

@@ -8,10 +8,21 @@ pub fn format_s3_datetime(dt: &DateTime<Utc>) -> String {
dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
} }
pub fn rate_limit_exceeded_xml() -> String { pub fn rate_limit_exceeded_xml(resource: &str, request_id: &str) -> String {
format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<Error><Code>SlowDown</Code><Message>Rate limit exceeded</Message></Error>" <Error><Code>SlowDown</Code><Message>Please reduce your request rate</Message><Resource>{}</Resource><RequestId>{}</RequestId></Error>",
.to_string() xml_escape(resource),
xml_escape(request_id),
)
}
/// Escape the five XML-reserved characters (`& < > " '`) so `s` can be
/// embedded safely in element text or attribute values.
///
/// Single-pass implementation: the chained `str::replace` form allocates a
/// fresh `String` per character class (five intermediate allocations); this
/// version allocates one output buffer and appends directly. Entities that the
/// escaping itself introduces are never re-escaped, matching the original
/// replace-'&'-first ordering.
fn xml_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            other => out.push(other),
        }
    }
    out
}
pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta]) -> String { pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta]) -> String {
@@ -62,6 +73,21 @@ pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta]
String::from_utf8(writer.into_inner().into_inner()).unwrap() String::from_utf8(writer.into_inner().into_inner()).unwrap()
} }
fn maybe_url_encode(value: &str, encoding_type: Option<&str>) -> String {
if matches!(encoding_type, Some(v) if v.eq_ignore_ascii_case("url")) {
percent_encoding::utf8_percent_encode(value, KEY_ENCODE_SET).to_string()
} else {
value.to_string()
}
}
const KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::NON_ALPHANUMERIC
.remove(b'-')
.remove(b'_')
.remove(b'.')
.remove(b'~')
.remove(b'/');
pub fn list_objects_v2_xml( pub fn list_objects_v2_xml(
bucket_name: &str, bucket_name: &str,
prefix: &str, prefix: &str,
@@ -73,6 +99,34 @@ pub fn list_objects_v2_xml(
continuation_token: Option<&str>, continuation_token: Option<&str>,
next_continuation_token: Option<&str>, next_continuation_token: Option<&str>,
key_count: usize, key_count: usize,
) -> String {
list_objects_v2_xml_with_encoding(
bucket_name,
prefix,
delimiter,
max_keys,
objects,
common_prefixes,
is_truncated,
continuation_token,
next_continuation_token,
key_count,
None,
)
}
pub fn list_objects_v2_xml_with_encoding(
bucket_name: &str,
prefix: &str,
delimiter: &str,
max_keys: usize,
objects: &[ObjectMeta],
common_prefixes: &[String],
is_truncated: bool,
continuation_token: Option<&str>,
next_continuation_token: Option<&str>,
key_count: usize,
encoding_type: Option<&str>,
) -> String { ) -> String {
let mut writer = Writer::new(Cursor::new(Vec::new())); let mut writer = Writer::new(Cursor::new(Vec::new()));
@@ -85,13 +139,22 @@ pub fn list_objects_v2_xml(
writer.write_event(Event::Start(start)).unwrap(); writer.write_event(Event::Start(start)).unwrap();
write_text_element(&mut writer, "Name", bucket_name); write_text_element(&mut writer, "Name", bucket_name);
write_text_element(&mut writer, "Prefix", prefix); write_text_element(&mut writer, "Prefix", &maybe_url_encode(prefix, encoding_type));
if !delimiter.is_empty() { if !delimiter.is_empty() {
write_text_element(&mut writer, "Delimiter", delimiter); write_text_element(
&mut writer,
"Delimiter",
&maybe_url_encode(delimiter, encoding_type),
);
} }
write_text_element(&mut writer, "MaxKeys", &max_keys.to_string()); write_text_element(&mut writer, "MaxKeys", &max_keys.to_string());
write_text_element(&mut writer, "KeyCount", &key_count.to_string()); write_text_element(&mut writer, "KeyCount", &key_count.to_string());
write_text_element(&mut writer, "IsTruncated", &is_truncated.to_string()); write_text_element(&mut writer, "IsTruncated", &is_truncated.to_string());
if let Some(encoding) = encoding_type {
if !encoding.is_empty() {
write_text_element(&mut writer, "EncodingType", encoding);
}
}
if let Some(token) = continuation_token { if let Some(token) = continuation_token {
write_text_element(&mut writer, "ContinuationToken", token); write_text_element(&mut writer, "ContinuationToken", token);
@@ -104,7 +167,7 @@ pub fn list_objects_v2_xml(
writer writer
.write_event(Event::Start(BytesStart::new("Contents"))) .write_event(Event::Start(BytesStart::new("Contents")))
.unwrap(); .unwrap();
write_text_element(&mut writer, "Key", &obj.key); write_text_element(&mut writer, "Key", &maybe_url_encode(&obj.key, encoding_type));
write_text_element( write_text_element(
&mut writer, &mut writer,
"LastModified", "LastModified",
@@ -128,7 +191,7 @@ pub fn list_objects_v2_xml(
writer writer
.write_event(Event::Start(BytesStart::new("CommonPrefixes"))) .write_event(Event::Start(BytesStart::new("CommonPrefixes")))
.unwrap(); .unwrap();
write_text_element(&mut writer, "Prefix", prefix); write_text_element(&mut writer, "Prefix", &maybe_url_encode(prefix, encoding_type));
writer writer
.write_event(Event::End(BytesEnd::new("CommonPrefixes"))) .write_event(Event::End(BytesEnd::new("CommonPrefixes")))
.unwrap(); .unwrap();
@@ -151,6 +214,32 @@ pub fn list_objects_v1_xml(
common_prefixes: &[String], common_prefixes: &[String],
is_truncated: bool, is_truncated: bool,
next_marker: Option<&str>, next_marker: Option<&str>,
) -> String {
list_objects_v1_xml_with_encoding(
bucket_name,
prefix,
marker,
delimiter,
max_keys,
objects,
common_prefixes,
is_truncated,
next_marker,
None,
)
}
pub fn list_objects_v1_xml_with_encoding(
bucket_name: &str,
prefix: &str,
marker: &str,
delimiter: &str,
max_keys: usize,
objects: &[ObjectMeta],
common_prefixes: &[String],
is_truncated: bool,
next_marker: Option<&str>,
encoding_type: Option<&str>,
) -> String { ) -> String {
let mut writer = Writer::new(Cursor::new(Vec::new())); let mut writer = Writer::new(Cursor::new(Vec::new()));
@@ -163,27 +252,36 @@ pub fn list_objects_v1_xml(
writer.write_event(Event::Start(start)).unwrap(); writer.write_event(Event::Start(start)).unwrap();
write_text_element(&mut writer, "Name", bucket_name); write_text_element(&mut writer, "Name", bucket_name);
write_text_element(&mut writer, "Prefix", prefix); write_text_element(&mut writer, "Prefix", &maybe_url_encode(prefix, encoding_type));
write_text_element(&mut writer, "Marker", marker); write_text_element(&mut writer, "Marker", &maybe_url_encode(marker, encoding_type));
write_text_element(&mut writer, "MaxKeys", &max_keys.to_string()); write_text_element(&mut writer, "MaxKeys", &max_keys.to_string());
write_text_element(&mut writer, "IsTruncated", &is_truncated.to_string()); write_text_element(&mut writer, "IsTruncated", &is_truncated.to_string());
if !delimiter.is_empty() { if !delimiter.is_empty() {
write_text_element(&mut writer, "Delimiter", delimiter); write_text_element(
&mut writer,
"Delimiter",
&maybe_url_encode(delimiter, encoding_type),
);
} }
if !delimiter.is_empty() && is_truncated { if !delimiter.is_empty() && is_truncated {
if let Some(nm) = next_marker { if let Some(nm) = next_marker {
if !nm.is_empty() { if !nm.is_empty() {
write_text_element(&mut writer, "NextMarker", nm); write_text_element(&mut writer, "NextMarker", &maybe_url_encode(nm, encoding_type));
} }
} }
} }
if let Some(encoding) = encoding_type {
if !encoding.is_empty() {
write_text_element(&mut writer, "EncodingType", encoding);
}
}
for obj in objects { for obj in objects {
writer writer
.write_event(Event::Start(BytesStart::new("Contents"))) .write_event(Event::Start(BytesStart::new("Contents")))
.unwrap(); .unwrap();
write_text_element(&mut writer, "Key", &obj.key); write_text_element(&mut writer, "Key", &maybe_url_encode(&obj.key, encoding_type));
write_text_element( write_text_element(
&mut writer, &mut writer,
"LastModified", "LastModified",
@@ -202,7 +300,7 @@ pub fn list_objects_v1_xml(
writer writer
.write_event(Event::Start(BytesStart::new("CommonPrefixes"))) .write_event(Event::Start(BytesStart::new("CommonPrefixes")))
.unwrap(); .unwrap();
write_text_element(&mut writer, "Prefix", cp); write_text_element(&mut writer, "Prefix", &maybe_url_encode(cp, encoding_type));
writer writer
.write_event(Event::End(BytesEnd::new("CommonPrefixes"))) .write_event(Event::End(BytesEnd::new("CommonPrefixes")))
.unwrap(); .unwrap();
@@ -325,8 +423,15 @@ pub fn copy_object_result_xml(etag: &str, last_modified: &str) -> String {
String::from_utf8(writer.into_inner().into_inner()).unwrap() String::from_utf8(writer.into_inner().into_inner()).unwrap()
} }
/// One successfully deleted object, rendered into the `<Deleted>` section of a
/// DeleteObjects (multi-delete) XML response by `delete_result_xml`.
pub struct DeletedEntry {
    /// Object key; emitted as `<Key>`.
    pub key: String,
    /// Version that was removed, when the request targeted one; emitted as
    /// `<VersionId>` only when `Some`.
    pub version_id: Option<String>,
    /// When true, `<DeleteMarker>true</DeleteMarker>` is emitted for this
    /// entry.
    pub delete_marker: bool,
    /// Emitted as `<DeleteMarkerVersionId>` only when `delete_marker` is true
    /// and this is `Some`.
    pub delete_marker_version_id: Option<String>,
}
pub fn delete_result_xml( pub fn delete_result_xml(
deleted: &[(String, Option<String>)], deleted: &[DeletedEntry],
errors: &[(String, String, String)], errors: &[(String, String, String)],
quiet: bool, quiet: bool,
) -> String { ) -> String {
@@ -340,14 +445,20 @@ pub fn delete_result_xml(
writer.write_event(Event::Start(start)).unwrap(); writer.write_event(Event::Start(start)).unwrap();
if !quiet { if !quiet {
for (key, version_id) in deleted { for entry in deleted {
writer writer
.write_event(Event::Start(BytesStart::new("Deleted"))) .write_event(Event::Start(BytesStart::new("Deleted")))
.unwrap(); .unwrap();
write_text_element(&mut writer, "Key", key); write_text_element(&mut writer, "Key", &entry.key);
if let Some(vid) = version_id { if let Some(ref vid) = entry.version_id {
write_text_element(&mut writer, "VersionId", vid); write_text_element(&mut writer, "VersionId", vid);
} }
if entry.delete_marker {
write_text_element(&mut writer, "DeleteMarker", "true");
if let Some(ref dm_vid) = entry.delete_marker_version_id {
write_text_element(&mut writer, "DeleteMarkerVersionId", dm_vid);
}
}
writer writer
.write_event(Event::End(BytesEnd::new("Deleted"))) .write_event(Event::End(BytesEnd::new("Deleted")))
.unwrap(); .unwrap();