Compare commits
9 Commits
217af6d1c6
...
dev-pyrust
| Author | SHA1 | Date | |
|---|---|---|---|
| 37541ffba1 | |||
| 5aba9ac9e9 | |||
| 4f05192548 | |||
| 1ea6dfae07 | |||
| f2df64479c | |||
| bd405cc2fe | |||
| 7ef3820f6e | |||
| e1fb225034 | |||
| 2767e7e79d |
28
Cargo.lock
generated
28
Cargo.lock
generated
@@ -2639,7 +2639,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "myfsio-auth"
|
||||
version = "0.4.3"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"aes",
|
||||
"base64",
|
||||
@@ -2664,7 +2664,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "myfsio-common"
|
||||
version = "0.4.3"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"serde",
|
||||
@@ -2675,7 +2675,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "myfsio-crypto"
|
||||
version = "0.4.3"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"aes-gcm",
|
||||
"base64",
|
||||
@@ -2696,7 +2696,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "myfsio-server"
|
||||
version = "0.4.3"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"aes-gcm",
|
||||
"async-trait",
|
||||
@@ -2714,6 +2714,8 @@ dependencies = [
|
||||
"dotenvy",
|
||||
"duckdb",
|
||||
"futures",
|
||||
"hex",
|
||||
"http-body 1.0.1",
|
||||
"http-body-util",
|
||||
"hyper 1.9.0",
|
||||
"md-5 0.10.6",
|
||||
@@ -2740,6 +2742,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"tera",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower-http",
|
||||
@@ -2750,7 +2753,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "myfsio-storage"
|
||||
version = "0.4.3"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"dashmap",
|
||||
@@ -2766,6 +2769,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"unicode-normalization",
|
||||
"uuid",
|
||||
@@ -2773,10 +2777,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "myfsio-xml"
|
||||
version = "0.4.3"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"myfsio-common",
|
||||
"percent-encoding",
|
||||
"quick-xml",
|
||||
"serde",
|
||||
]
|
||||
@@ -4193,6 +4198,17 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-stream"
|
||||
version = "0.1.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.18"
|
||||
|
||||
@@ -10,14 +10,14 @@ members = [
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
version = "0.4.3"
|
||||
version = "0.5.0"
|
||||
edition = "2021"
|
||||
|
||||
[workspace.dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
axum = { version = "0.8" }
|
||||
tower = { version = "0.5" }
|
||||
tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip"] }
|
||||
tower-http = { version = "0.6", features = ["cors", "trace", "fs", "compression-gzip", "timeout"] }
|
||||
hyper = { version = "1" }
|
||||
bytes = "1"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
@@ -42,7 +42,8 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
thiserror = "2"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
base64 = "0.22"
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
tokio-util = { version = "0.7", features = ["io", "io-util"] }
|
||||
tokio-stream = "0.1"
|
||||
futures = "0.3"
|
||||
dashmap = "6"
|
||||
crc32fast = "1"
|
||||
|
||||
@@ -8,6 +8,7 @@ pub const STATS_FILE: &str = "stats.json";
|
||||
pub const ETAG_INDEX_FILE: &str = "etag_index.json";
|
||||
pub const INDEX_FILE: &str = "_index.json";
|
||||
pub const MANIFEST_FILE: &str = "manifest.json";
|
||||
pub const DIR_MARKER_FILE: &str = ".__myfsio_dirobj__";
|
||||
|
||||
pub const INTERNAL_FOLDERS: &[&str] = &[".meta", ".versions", ".multipart"];
|
||||
|
||||
|
||||
@@ -5,13 +5,17 @@ pub enum S3ErrorCode {
|
||||
AccessDenied,
|
||||
BadDigest,
|
||||
BucketAlreadyExists,
|
||||
BucketAlreadyOwnedByYou,
|
||||
BucketNotEmpty,
|
||||
EntityTooLarge,
|
||||
EntityTooSmall,
|
||||
InternalError,
|
||||
InvalidAccessKeyId,
|
||||
InvalidArgument,
|
||||
InvalidBucketName,
|
||||
InvalidKey,
|
||||
InvalidPart,
|
||||
InvalidPartOrder,
|
||||
InvalidPolicyDocument,
|
||||
InvalidRange,
|
||||
InvalidRequest,
|
||||
@@ -19,13 +23,17 @@ pub enum S3ErrorCode {
|
||||
MalformedXML,
|
||||
MethodNotAllowed,
|
||||
NoSuchBucket,
|
||||
NoSuchBucketPolicy,
|
||||
NoSuchKey,
|
||||
NoSuchLifecycleConfiguration,
|
||||
NoSuchUpload,
|
||||
NoSuchVersion,
|
||||
NoSuchTagSet,
|
||||
PreconditionFailed,
|
||||
NotModified,
|
||||
QuotaExceeded,
|
||||
RequestTimeTooSkewed,
|
||||
ServerSideEncryptionConfigurationNotFoundError,
|
||||
SignatureDoesNotMatch,
|
||||
SlowDown,
|
||||
}
|
||||
@@ -36,13 +44,17 @@ impl S3ErrorCode {
|
||||
Self::AccessDenied => 403,
|
||||
Self::BadDigest => 400,
|
||||
Self::BucketAlreadyExists => 409,
|
||||
Self::BucketAlreadyOwnedByYou => 409,
|
||||
Self::BucketNotEmpty => 409,
|
||||
Self::EntityTooLarge => 413,
|
||||
Self::EntityTooSmall => 400,
|
||||
Self::InternalError => 500,
|
||||
Self::InvalidAccessKeyId => 403,
|
||||
Self::InvalidArgument => 400,
|
||||
Self::InvalidBucketName => 400,
|
||||
Self::InvalidKey => 400,
|
||||
Self::InvalidPart => 400,
|
||||
Self::InvalidPartOrder => 400,
|
||||
Self::InvalidPolicyDocument => 400,
|
||||
Self::InvalidRange => 416,
|
||||
Self::InvalidRequest => 400,
|
||||
@@ -50,15 +62,19 @@ impl S3ErrorCode {
|
||||
Self::MalformedXML => 400,
|
||||
Self::MethodNotAllowed => 405,
|
||||
Self::NoSuchBucket => 404,
|
||||
Self::NoSuchBucketPolicy => 404,
|
||||
Self::NoSuchKey => 404,
|
||||
Self::NoSuchLifecycleConfiguration => 404,
|
||||
Self::NoSuchUpload => 404,
|
||||
Self::NoSuchVersion => 404,
|
||||
Self::NoSuchTagSet => 404,
|
||||
Self::PreconditionFailed => 412,
|
||||
Self::NotModified => 304,
|
||||
Self::QuotaExceeded => 403,
|
||||
Self::RequestTimeTooSkewed => 403,
|
||||
Self::ServerSideEncryptionConfigurationNotFoundError => 404,
|
||||
Self::SignatureDoesNotMatch => 403,
|
||||
Self::SlowDown => 429,
|
||||
Self::SlowDown => 503,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,13 +83,17 @@ impl S3ErrorCode {
|
||||
Self::AccessDenied => "AccessDenied",
|
||||
Self::BadDigest => "BadDigest",
|
||||
Self::BucketAlreadyExists => "BucketAlreadyExists",
|
||||
Self::BucketAlreadyOwnedByYou => "BucketAlreadyOwnedByYou",
|
||||
Self::BucketNotEmpty => "BucketNotEmpty",
|
||||
Self::EntityTooLarge => "EntityTooLarge",
|
||||
Self::EntityTooSmall => "EntityTooSmall",
|
||||
Self::InternalError => "InternalError",
|
||||
Self::InvalidAccessKeyId => "InvalidAccessKeyId",
|
||||
Self::InvalidArgument => "InvalidArgument",
|
||||
Self::InvalidBucketName => "InvalidBucketName",
|
||||
Self::InvalidKey => "InvalidKey",
|
||||
Self::InvalidPart => "InvalidPart",
|
||||
Self::InvalidPartOrder => "InvalidPartOrder",
|
||||
Self::InvalidPolicyDocument => "InvalidPolicyDocument",
|
||||
Self::InvalidRange => "InvalidRange",
|
||||
Self::InvalidRequest => "InvalidRequest",
|
||||
@@ -81,13 +101,19 @@ impl S3ErrorCode {
|
||||
Self::MalformedXML => "MalformedXML",
|
||||
Self::MethodNotAllowed => "MethodNotAllowed",
|
||||
Self::NoSuchBucket => "NoSuchBucket",
|
||||
Self::NoSuchBucketPolicy => "NoSuchBucketPolicy",
|
||||
Self::NoSuchKey => "NoSuchKey",
|
||||
Self::NoSuchLifecycleConfiguration => "NoSuchLifecycleConfiguration",
|
||||
Self::NoSuchUpload => "NoSuchUpload",
|
||||
Self::NoSuchVersion => "NoSuchVersion",
|
||||
Self::NoSuchTagSet => "NoSuchTagSet",
|
||||
Self::PreconditionFailed => "PreconditionFailed",
|
||||
Self::NotModified => "NotModified",
|
||||
Self::QuotaExceeded => "QuotaExceeded",
|
||||
Self::RequestTimeTooSkewed => "RequestTimeTooSkewed",
|
||||
Self::ServerSideEncryptionConfigurationNotFoundError => {
|
||||
"ServerSideEncryptionConfigurationNotFoundError"
|
||||
}
|
||||
Self::SignatureDoesNotMatch => "SignatureDoesNotMatch",
|
||||
Self::SlowDown => "SlowDown",
|
||||
}
|
||||
@@ -98,13 +124,17 @@ impl S3ErrorCode {
|
||||
Self::AccessDenied => "Access Denied",
|
||||
Self::BadDigest => "The Content-MD5 or checksum value you specified did not match what we received",
|
||||
Self::BucketAlreadyExists => "The requested bucket name is not available",
|
||||
Self::BucketAlreadyOwnedByYou => "Your previous request to create the named bucket succeeded and you already own it",
|
||||
Self::BucketNotEmpty => "The bucket you tried to delete is not empty",
|
||||
Self::EntityTooLarge => "Your proposed upload exceeds the maximum allowed size",
|
||||
Self::EntityTooSmall => "Your proposed upload is smaller than the minimum allowed object size",
|
||||
Self::InternalError => "We encountered an internal error. Please try again.",
|
||||
Self::InvalidAccessKeyId => "The access key ID you provided does not exist",
|
||||
Self::InvalidArgument => "Invalid argument",
|
||||
Self::InvalidBucketName => "The specified bucket is not valid",
|
||||
Self::InvalidKey => "The specified key is not valid",
|
||||
Self::InvalidPart => "One or more of the specified parts could not be found",
|
||||
Self::InvalidPartOrder => "The list of parts was not in ascending order",
|
||||
Self::InvalidPolicyDocument => "The content of the form does not meet the conditions specified in the policy document",
|
||||
Self::InvalidRange => "The requested range is not satisfiable",
|
||||
Self::InvalidRequest => "Invalid request",
|
||||
@@ -112,13 +142,17 @@ impl S3ErrorCode {
|
||||
Self::MalformedXML => "The XML you provided was not well-formed",
|
||||
Self::MethodNotAllowed => "The specified method is not allowed against this resource",
|
||||
Self::NoSuchBucket => "The specified bucket does not exist",
|
||||
Self::NoSuchBucketPolicy => "The bucket policy does not exist",
|
||||
Self::NoSuchKey => "The specified key does not exist",
|
||||
Self::NoSuchLifecycleConfiguration => "The lifecycle configuration does not exist",
|
||||
Self::NoSuchUpload => "The specified multipart upload does not exist",
|
||||
Self::NoSuchVersion => "The specified version does not exist",
|
||||
Self::NoSuchTagSet => "The TagSet does not exist",
|
||||
Self::PreconditionFailed => "At least one of the preconditions you specified did not hold",
|
||||
Self::NotModified => "Not Modified",
|
||||
Self::QuotaExceeded => "The bucket quota has been exceeded",
|
||||
Self::RequestTimeTooSkewed => "The difference between the request time and the server's time is too large",
|
||||
Self::ServerSideEncryptionConfigurationNotFoundError => "The server side encryption configuration was not found",
|
||||
Self::SignatureDoesNotMatch => "The request signature we calculated does not match the signature you provided",
|
||||
Self::SlowDown => "Please reduce your request rate",
|
||||
}
|
||||
|
||||
@@ -12,6 +12,12 @@ pub struct ObjectMeta {
|
||||
pub content_type: Option<String>,
|
||||
pub storage_class: Option<String>,
|
||||
pub metadata: HashMap<String, String>,
|
||||
#[serde(default)]
|
||||
pub version_id: Option<String>,
|
||||
#[serde(default)]
|
||||
pub is_delete_marker: bool,
|
||||
#[serde(default, skip_serializing)]
|
||||
pub internal_metadata: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl ObjectMeta {
|
||||
@@ -24,10 +30,20 @@ impl ObjectMeta {
|
||||
content_type: None,
|
||||
storage_class: Some("STANDARD".to_string()),
|
||||
metadata: HashMap::new(),
|
||||
version_id: None,
|
||||
is_delete_marker: false,
|
||||
internal_metadata: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct DeleteOutcome {
|
||||
pub version_id: Option<String>,
|
||||
pub is_delete_marker: bool,
|
||||
pub existed: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct BucketMeta {
|
||||
pub name: String,
|
||||
@@ -122,11 +138,31 @@ pub struct Tag {
|
||||
pub value: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
|
||||
pub enum VersioningStatus {
|
||||
#[default]
|
||||
Disabled,
|
||||
Enabled,
|
||||
Suspended,
|
||||
}
|
||||
|
||||
impl VersioningStatus {
|
||||
pub fn is_enabled(self) -> bool {
|
||||
matches!(self, VersioningStatus::Enabled)
|
||||
}
|
||||
|
||||
pub fn is_active(self) -> bool {
|
||||
matches!(self, VersioningStatus::Enabled | VersioningStatus::Suspended)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct BucketConfig {
|
||||
#[serde(default)]
|
||||
pub versioning_enabled: bool,
|
||||
#[serde(default)]
|
||||
pub versioning_suspended: bool,
|
||||
#[serde(default)]
|
||||
pub tags: Vec<Tag>,
|
||||
#[serde(default)]
|
||||
pub cors: Option<serde_json::Value>,
|
||||
@@ -152,6 +188,35 @@ pub struct BucketConfig {
|
||||
pub replication: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
impl BucketConfig {
|
||||
pub fn versioning_status(&self) -> VersioningStatus {
|
||||
if self.versioning_enabled {
|
||||
VersioningStatus::Enabled
|
||||
} else if self.versioning_suspended {
|
||||
VersioningStatus::Suspended
|
||||
} else {
|
||||
VersioningStatus::Disabled
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_versioning_status(&mut self, status: VersioningStatus) {
|
||||
match status {
|
||||
VersioningStatus::Enabled => {
|
||||
self.versioning_enabled = true;
|
||||
self.versioning_suspended = false;
|
||||
}
|
||||
VersioningStatus::Suspended => {
|
||||
self.versioning_enabled = false;
|
||||
self.versioning_suspended = true;
|
||||
}
|
||||
VersioningStatus::Disabled => {
|
||||
self.versioning_enabled = false;
|
||||
self.versioning_suspended = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct QuotaConfig {
|
||||
pub max_bytes: Option<u64>,
|
||||
|
||||
@@ -145,6 +145,113 @@ pub fn decrypt_stream_chunked(
|
||||
Ok(chunk_count)
|
||||
}
|
||||
|
||||
const GCM_TAG_LEN: usize = 16;
|
||||
|
||||
pub fn decrypt_stream_chunked_range(
|
||||
input_path: &Path,
|
||||
output_path: &Path,
|
||||
key: &[u8],
|
||||
base_nonce: &[u8],
|
||||
chunk_plain_size: usize,
|
||||
plaintext_size: u64,
|
||||
plain_start: u64,
|
||||
plain_end_inclusive: u64,
|
||||
) -> Result<u64, CryptoError> {
|
||||
if key.len() != 32 {
|
||||
return Err(CryptoError::InvalidKeySize(key.len()));
|
||||
}
|
||||
if base_nonce.len() != 12 {
|
||||
return Err(CryptoError::InvalidNonceSize(base_nonce.len()));
|
||||
}
|
||||
if chunk_plain_size == 0 {
|
||||
return Err(CryptoError::EncryptionFailed(
|
||||
"chunk_plain_size must be > 0".into(),
|
||||
));
|
||||
}
|
||||
if plaintext_size == 0 {
|
||||
let _ = File::create(output_path)?;
|
||||
return Ok(0);
|
||||
}
|
||||
if plain_start > plain_end_inclusive || plain_end_inclusive >= plaintext_size {
|
||||
return Err(CryptoError::EncryptionFailed(format!(
|
||||
"range [{}, {}] invalid for plaintext size {}",
|
||||
plain_start, plain_end_inclusive, plaintext_size
|
||||
)));
|
||||
}
|
||||
|
||||
let key_arr: [u8; 32] = key.try_into().unwrap();
|
||||
let nonce_arr: [u8; 12] = base_nonce.try_into().unwrap();
|
||||
let cipher = Aes256Gcm::new(&key_arr.into());
|
||||
|
||||
let n = chunk_plain_size as u64;
|
||||
let first_chunk = (plain_start / n) as u32;
|
||||
let last_chunk = (plain_end_inclusive / n) as u32;
|
||||
let total_chunks = plaintext_size.div_ceil(n) as u32;
|
||||
let final_chunk_plain = plaintext_size - (total_chunks as u64 - 1) * n;
|
||||
|
||||
let mut infile = File::open(input_path)?;
|
||||
|
||||
let mut header = [0u8; HEADER_SIZE];
|
||||
infile.read_exact(&mut header)?;
|
||||
let stored_chunk_count = u32::from_be_bytes(header);
|
||||
if stored_chunk_count != total_chunks {
|
||||
return Err(CryptoError::EncryptionFailed(format!(
|
||||
"chunk count mismatch: header says {}, plaintext_size implies {}",
|
||||
stored_chunk_count, total_chunks
|
||||
)));
|
||||
}
|
||||
|
||||
let mut outfile = File::create(output_path)?;
|
||||
|
||||
let stride = n + GCM_TAG_LEN as u64 + HEADER_SIZE as u64;
|
||||
let first_offset = HEADER_SIZE as u64 + first_chunk as u64 * stride;
|
||||
infile.seek(SeekFrom::Start(first_offset))?;
|
||||
|
||||
let mut size_buf = [0u8; HEADER_SIZE];
|
||||
let mut bytes_written: u64 = 0;
|
||||
|
||||
for chunk_index in first_chunk..=last_chunk {
|
||||
infile.read_exact(&mut size_buf)?;
|
||||
let ct_len = u32::from_be_bytes(size_buf) as usize;
|
||||
|
||||
let expected_plain = if chunk_index + 1 == total_chunks {
|
||||
final_chunk_plain as usize
|
||||
} else {
|
||||
chunk_plain_size
|
||||
};
|
||||
let expected_ct = expected_plain + GCM_TAG_LEN;
|
||||
if ct_len != expected_ct {
|
||||
return Err(CryptoError::EncryptionFailed(format!(
|
||||
"chunk {} stored length {} != expected {} (corrupt file or chunk_size mismatch)",
|
||||
chunk_index, ct_len, expected_ct
|
||||
)));
|
||||
}
|
||||
|
||||
let mut encrypted = vec![0u8; ct_len];
|
||||
infile.read_exact(&mut encrypted)?;
|
||||
|
||||
let nonce_bytes = derive_chunk_nonce(&nonce_arr, chunk_index)?;
|
||||
let nonce = Nonce::from_slice(&nonce_bytes);
|
||||
let decrypted = cipher
|
||||
.decrypt(nonce, encrypted.as_ref())
|
||||
.map_err(|_| CryptoError::DecryptionFailed(chunk_index))?;
|
||||
|
||||
let chunk_plain_start = chunk_index as u64 * n;
|
||||
let chunk_plain_end_exclusive = chunk_plain_start + decrypted.len() as u64;
|
||||
|
||||
let slice_start = plain_start.saturating_sub(chunk_plain_start) as usize;
|
||||
let slice_end = (plain_end_inclusive + 1).min(chunk_plain_end_exclusive);
|
||||
let slice_end_local = (slice_end - chunk_plain_start) as usize;
|
||||
|
||||
if slice_end_local > slice_start {
|
||||
outfile.write_all(&decrypted[slice_start..slice_end_local])?;
|
||||
bytes_written += (slice_end_local - slice_start) as u64;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(bytes_written)
|
||||
}
|
||||
|
||||
pub async fn encrypt_stream_chunked_async(
|
||||
input_path: &Path,
|
||||
output_path: &Path,
|
||||
@@ -230,6 +337,191 @@ mod tests {
|
||||
assert!(matches!(result, Err(CryptoError::InvalidKeySize(16))));
|
||||
}
|
||||
|
||||
fn write_file(path: &Path, data: &[u8]) {
|
||||
std::fs::File::create(path).unwrap().write_all(data).unwrap();
|
||||
}
|
||||
|
||||
fn make_encrypted_file(
|
||||
dir: &Path,
|
||||
data: &[u8],
|
||||
key: &[u8; 32],
|
||||
nonce: &[u8; 12],
|
||||
chunk: usize,
|
||||
) -> std::path::PathBuf {
|
||||
let input = dir.join("input.bin");
|
||||
let encrypted = dir.join("encrypted.bin");
|
||||
write_file(&input, data);
|
||||
encrypt_stream_chunked(&input, &encrypted, key, nonce, Some(chunk)).unwrap();
|
||||
encrypted
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_within_single_chunk() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let data: Vec<u8> = (0u8..=255).cycle().take(4096).collect();
|
||||
let key = [0x33u8; 32];
|
||||
let nonce = [0x07u8; 12];
|
||||
let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 1024);
|
||||
let out = dir.path().join("range.bin");
|
||||
|
||||
let n = decrypt_stream_chunked_range(
|
||||
&encrypted,
|
||||
&out,
|
||||
&key,
|
||||
&nonce,
|
||||
1024,
|
||||
data.len() as u64,
|
||||
200,
|
||||
399,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(n, 200);
|
||||
let got = std::fs::read(&out).unwrap();
|
||||
assert_eq!(got, &data[200..400]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_spanning_multiple_chunks() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let data: Vec<u8> = (0..5000u32).map(|i| (i % 251) as u8).collect();
|
||||
let key = [0x44u8; 32];
|
||||
let nonce = [0x02u8; 12];
|
||||
let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 512);
|
||||
let out = dir.path().join("range.bin");
|
||||
|
||||
let n = decrypt_stream_chunked_range(
|
||||
&encrypted,
|
||||
&out,
|
||||
&key,
|
||||
&nonce,
|
||||
512,
|
||||
data.len() as u64,
|
||||
100,
|
||||
2999,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(n, 2900);
|
||||
let got = std::fs::read(&out).unwrap();
|
||||
assert_eq!(got, &data[100..3000]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_covers_final_partial_chunk() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let data: Vec<u8> = (0..1300u32).map(|i| (i % 71) as u8).collect();
|
||||
let key = [0x55u8; 32];
|
||||
let nonce = [0x0au8; 12];
|
||||
let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 512);
|
||||
let out = dir.path().join("range.bin");
|
||||
|
||||
let n = decrypt_stream_chunked_range(
|
||||
&encrypted,
|
||||
&out,
|
||||
&key,
|
||||
&nonce,
|
||||
512,
|
||||
data.len() as u64,
|
||||
900,
|
||||
1299,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(n, 400);
|
||||
let got = std::fs::read(&out).unwrap();
|
||||
assert_eq!(got, &data[900..1300]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_full_object() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let data: Vec<u8> = (0..2048u32).map(|i| (i % 13) as u8).collect();
|
||||
let key = [0x11u8; 32];
|
||||
let nonce = [0x33u8; 12];
|
||||
let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 512);
|
||||
let out = dir.path().join("range.bin");
|
||||
|
||||
let n = decrypt_stream_chunked_range(
|
||||
&encrypted,
|
||||
&out,
|
||||
&key,
|
||||
&nonce,
|
||||
512,
|
||||
data.len() as u64,
|
||||
0,
|
||||
data.len() as u64 - 1,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(n, data.len() as u64);
|
||||
let got = std::fs::read(&out).unwrap();
|
||||
assert_eq!(got, data);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_wrong_key_fails() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let data = b"range-auth-check".repeat(100);
|
||||
let key = [0x66u8; 32];
|
||||
let nonce = [0x09u8; 12];
|
||||
let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 256);
|
||||
let out = dir.path().join("range.bin");
|
||||
|
||||
let wrong = [0x67u8; 32];
|
||||
let r = decrypt_stream_chunked_range(
|
||||
&encrypted,
|
||||
&out,
|
||||
&wrong,
|
||||
&nonce,
|
||||
256,
|
||||
data.len() as u64,
|
||||
0,
|
||||
data.len() as u64 - 1,
|
||||
);
|
||||
assert!(matches!(r, Err(CryptoError::DecryptionFailed(_))));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_out_of_bounds_rejected() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let data = vec![0u8; 100];
|
||||
let key = [0x22u8; 32];
|
||||
let nonce = [0x44u8; 12];
|
||||
let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 64);
|
||||
let out = dir.path().join("range.bin");
|
||||
|
||||
let r = decrypt_stream_chunked_range(
|
||||
&encrypted,
|
||||
&out,
|
||||
&key,
|
||||
&nonce,
|
||||
64,
|
||||
data.len() as u64,
|
||||
50,
|
||||
200,
|
||||
);
|
||||
assert!(r.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_mismatched_chunk_size_detected() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let data: Vec<u8> = (0..2048u32).map(|i| i as u8).collect();
|
||||
let key = [0x77u8; 32];
|
||||
let nonce = [0x88u8; 12];
|
||||
let encrypted = make_encrypted_file(dir.path(), &data, &key, &nonce, 512);
|
||||
let out = dir.path().join("range.bin");
|
||||
|
||||
let r = decrypt_stream_chunked_range(
|
||||
&encrypted,
|
||||
&out,
|
||||
&key,
|
||||
&nonce,
|
||||
1024,
|
||||
data.len() as u64,
|
||||
0,
|
||||
1023,
|
||||
);
|
||||
assert!(r.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wrong_key_fails_decrypt() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
||||
@@ -4,7 +4,9 @@ use rand::RngCore;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::aes_gcm::{decrypt_stream_chunked, encrypt_stream_chunked, CryptoError};
|
||||
use crate::aes_gcm::{
|
||||
decrypt_stream_chunked, decrypt_stream_chunked_range, encrypt_stream_chunked, CryptoError,
|
||||
};
|
||||
use crate::kms::KmsService;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
@@ -37,6 +39,8 @@ pub struct EncryptionMetadata {
|
||||
pub nonce: String,
|
||||
pub encrypted_data_key: Option<String>,
|
||||
pub kms_key_id: Option<String>,
|
||||
pub chunk_size: Option<usize>,
|
||||
pub plaintext_size: Option<u64>,
|
||||
}
|
||||
|
||||
impl EncryptionMetadata {
|
||||
@@ -53,6 +57,15 @@ impl EncryptionMetadata {
|
||||
if let Some(ref kid) = self.kms_key_id {
|
||||
map.insert("x-amz-encryption-key-id".to_string(), kid.clone());
|
||||
}
|
||||
if let Some(cs) = self.chunk_size {
|
||||
map.insert("x-amz-encryption-chunk-size".to_string(), cs.to_string());
|
||||
}
|
||||
if let Some(ps) = self.plaintext_size {
|
||||
map.insert(
|
||||
"x-amz-encryption-plaintext-size".to_string(),
|
||||
ps.to_string(),
|
||||
);
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
@@ -64,6 +77,12 @@ impl EncryptionMetadata {
|
||||
nonce: nonce.clone(),
|
||||
encrypted_data_key: meta.get("x-amz-encrypted-data-key").cloned(),
|
||||
kms_key_id: meta.get("x-amz-encryption-key-id").cloned(),
|
||||
chunk_size: meta
|
||||
.get("x-amz-encryption-chunk-size")
|
||||
.and_then(|s| s.parse().ok()),
|
||||
plaintext_size: meta
|
||||
.get("x-amz-encryption-plaintext-size")
|
||||
.and_then(|s| s.parse().ok()),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -76,6 +95,8 @@ impl EncryptionMetadata {
|
||||
meta.remove("x-amz-encryption-nonce");
|
||||
meta.remove("x-amz-encrypted-data-key");
|
||||
meta.remove("x-amz-encryption-key-id");
|
||||
meta.remove("x-amz-encryption-chunk-size");
|
||||
meta.remove("x-amz-encryption-plaintext-size");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,6 +233,11 @@ impl EncryptionService {
|
||||
data_key
|
||||
};
|
||||
|
||||
let plaintext_size = tokio::fs::metadata(input_path)
|
||||
.await
|
||||
.map_err(CryptoError::Io)?
|
||||
.len();
|
||||
|
||||
let ip = input_path.to_owned();
|
||||
let op = output_path.to_owned();
|
||||
let ak = actual_key;
|
||||
@@ -228,22 +254,23 @@ impl EncryptionService {
|
||||
nonce: B64.encode(nonce),
|
||||
encrypted_data_key,
|
||||
kms_key_id,
|
||||
chunk_size: Some(chunk_size),
|
||||
plaintext_size: Some(plaintext_size),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn decrypt_object(
|
||||
async fn resolve_data_key(
|
||||
&self,
|
||||
input_path: &Path,
|
||||
output_path: &Path,
|
||||
enc_meta: &EncryptionMetadata,
|
||||
customer_key: Option<&[u8]>,
|
||||
) -> Result<(), CryptoError> {
|
||||
) -> Result<([u8; 32], [u8; 12]), CryptoError> {
|
||||
let nonce_bytes = B64
|
||||
.decode(&enc_meta.nonce)
|
||||
.map_err(|e| CryptoError::EncryptionFailed(format!("Bad nonce encoding: {}", e)))?;
|
||||
if nonce_bytes.len() != 12 {
|
||||
return Err(CryptoError::InvalidNonceSize(nonce_bytes.len()));
|
||||
}
|
||||
let nonce: [u8; 12] = nonce_bytes.try_into().unwrap();
|
||||
|
||||
let data_key: [u8; 32] = if let Some(ck) = customer_key {
|
||||
if ck.len() != 32 {
|
||||
@@ -281,15 +308,62 @@ impl EncryptionService {
|
||||
self.unwrap_data_key(wrapped)?
|
||||
};
|
||||
|
||||
Ok((data_key, nonce))
|
||||
}
|
||||
|
||||
pub async fn decrypt_object(
|
||||
&self,
|
||||
input_path: &Path,
|
||||
output_path: &Path,
|
||||
enc_meta: &EncryptionMetadata,
|
||||
customer_key: Option<&[u8]>,
|
||||
) -> Result<(), CryptoError> {
|
||||
let (data_key, nonce) = self.resolve_data_key(enc_meta, customer_key).await?;
|
||||
|
||||
let ip = input_path.to_owned();
|
||||
let op = output_path.to_owned();
|
||||
let nb: [u8; 12] = nonce_bytes.try_into().unwrap();
|
||||
tokio::task::spawn_blocking(move || decrypt_stream_chunked(&ip, &op, &data_key, &nb))
|
||||
tokio::task::spawn_blocking(move || decrypt_stream_chunked(&ip, &op, &data_key, &nonce))
|
||||
.await
|
||||
.map_err(|e| CryptoError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn decrypt_object_range(
|
||||
&self,
|
||||
input_path: &Path,
|
||||
output_path: &Path,
|
||||
enc_meta: &EncryptionMetadata,
|
||||
customer_key: Option<&[u8]>,
|
||||
plain_start: u64,
|
||||
plain_end_inclusive: u64,
|
||||
) -> Result<u64, CryptoError> {
|
||||
let chunk_size = enc_meta.chunk_size.ok_or_else(|| {
|
||||
CryptoError::EncryptionFailed("chunk_size missing from encryption metadata".into())
|
||||
})?;
|
||||
let plaintext_size = enc_meta.plaintext_size.ok_or_else(|| {
|
||||
CryptoError::EncryptionFailed("plaintext_size missing from encryption metadata".into())
|
||||
})?;
|
||||
|
||||
let (data_key, nonce) = self.resolve_data_key(enc_meta, customer_key).await?;
|
||||
|
||||
let ip = input_path.to_owned();
|
||||
let op = output_path.to_owned();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
decrypt_stream_chunked_range(
|
||||
&ip,
|
||||
&op,
|
||||
&data_key,
|
||||
&nonce,
|
||||
chunk_size,
|
||||
plaintext_size,
|
||||
plain_start,
|
||||
plain_end_inclusive,
|
||||
)
|
||||
})
|
||||
.await
|
||||
.map_err(|e| CryptoError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -383,12 +457,26 @@ mod tests {
|
||||
nonce: "dGVzdG5vbmNlMTI=".to_string(),
|
||||
encrypted_data_key: Some("c29tZWtleQ==".to_string()),
|
||||
kms_key_id: None,
|
||||
chunk_size: Some(65_536),
|
||||
plaintext_size: Some(1_234_567),
|
||||
};
|
||||
let map = meta.to_metadata_map();
|
||||
let restored = EncryptionMetadata::from_metadata(&map).unwrap();
|
||||
assert_eq!(restored.algorithm, "AES256");
|
||||
assert_eq!(restored.nonce, meta.nonce);
|
||||
assert_eq!(restored.encrypted_data_key, meta.encrypted_data_key);
|
||||
assert_eq!(restored.chunk_size, Some(65_536));
|
||||
assert_eq!(restored.plaintext_size, Some(1_234_567));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encryption_metadata_legacy_missing_sizes() {
|
||||
let mut map = HashMap::new();
|
||||
map.insert("x-amz-server-side-encryption".to_string(), "AES256".into());
|
||||
map.insert("x-amz-encryption-nonce".to_string(), "aGVsbG8=".into());
|
||||
let restored = EncryptionMetadata::from_metadata(&map).unwrap();
|
||||
assert_eq!(restored.chunk_size, None);
|
||||
assert_eq!(restored.plaintext_size, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -23,15 +23,18 @@ serde_urlencoded = "0.7"
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
tokio-util = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
http-body = "1"
|
||||
http-body-util = "0.1"
|
||||
percent-encoding = { workspace = true }
|
||||
quick-xml = { workspace = true }
|
||||
mime_guess = "2"
|
||||
crc32fast = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
duckdb = { workspace = true }
|
||||
roxmltree = "0.20"
|
||||
parking_lot = { workspace = true }
|
||||
|
||||
@@ -81,7 +81,12 @@ pub struct ServerConfig {
|
||||
pub multipart_min_part_size: u64,
|
||||
pub bulk_delete_max_keys: usize,
|
||||
pub stream_chunk_size: usize,
|
||||
pub request_body_timeout_secs: u64,
|
||||
pub ratelimit_default: RateLimitSetting,
|
||||
pub ratelimit_list_buckets: RateLimitSetting,
|
||||
pub ratelimit_bucket_ops: RateLimitSetting,
|
||||
pub ratelimit_object_ops: RateLimitSetting,
|
||||
pub ratelimit_head_ops: RateLimitSetting,
|
||||
pub ratelimit_admin: RateLimitSetting,
|
||||
pub ratelimit_storage_uri: String,
|
||||
pub ui_enabled: bool,
|
||||
@@ -225,8 +230,17 @@ impl ServerConfig {
|
||||
let multipart_min_part_size = parse_u64_env("MULTIPART_MIN_PART_SIZE", 5_242_880);
|
||||
let bulk_delete_max_keys = parse_usize_env("BULK_DELETE_MAX_KEYS", 1000);
|
||||
let stream_chunk_size = parse_usize_env("STREAM_CHUNK_SIZE", 1_048_576);
|
||||
let request_body_timeout_secs = parse_u64_env("REQUEST_BODY_TIMEOUT_SECONDS", 60);
|
||||
let ratelimit_default =
|
||||
parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(200, 60));
|
||||
parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(5000, 60));
|
||||
let ratelimit_list_buckets =
|
||||
parse_rate_limit_env("RATE_LIMIT_LIST_BUCKETS", ratelimit_default);
|
||||
let ratelimit_bucket_ops =
|
||||
parse_rate_limit_env("RATE_LIMIT_BUCKET_OPS", ratelimit_default);
|
||||
let ratelimit_object_ops =
|
||||
parse_rate_limit_env("RATE_LIMIT_OBJECT_OPS", ratelimit_default);
|
||||
let ratelimit_head_ops =
|
||||
parse_rate_limit_env("RATE_LIMIT_HEAD_OPS", ratelimit_default);
|
||||
let ratelimit_admin =
|
||||
parse_rate_limit_env("RATE_LIMIT_ADMIN", RateLimitSetting::new(60, 60));
|
||||
let ratelimit_storage_uri =
|
||||
@@ -304,7 +318,12 @@ impl ServerConfig {
|
||||
multipart_min_part_size,
|
||||
bulk_delete_max_keys,
|
||||
stream_chunk_size,
|
||||
request_body_timeout_secs,
|
||||
ratelimit_default,
|
||||
ratelimit_list_buckets,
|
||||
ratelimit_bucket_ops,
|
||||
ratelimit_object_ops,
|
||||
ratelimit_head_ops,
|
||||
ratelimit_admin,
|
||||
ratelimit_storage_uri,
|
||||
ui_enabled,
|
||||
@@ -387,7 +406,12 @@ impl Default for ServerConfig {
|
||||
multipart_min_part_size: 5_242_880,
|
||||
bulk_delete_max_keys: 1000,
|
||||
stream_chunk_size: 1_048_576,
|
||||
ratelimit_default: RateLimitSetting::new(200, 60),
|
||||
request_body_timeout_secs: 60,
|
||||
ratelimit_default: RateLimitSetting::new(5000, 60),
|
||||
ratelimit_list_buckets: RateLimitSetting::new(5000, 60),
|
||||
ratelimit_bucket_ops: RateLimitSetting::new(5000, 60),
|
||||
ratelimit_object_ops: RateLimitSetting::new(5000, 60),
|
||||
ratelimit_head_ops: RateLimitSetting::new(5000, 60),
|
||||
ratelimit_admin: RateLimitSetting::new(60, 60),
|
||||
ratelimit_storage_uri: "memory://".to_string(),
|
||||
ui_enabled: true,
|
||||
@@ -472,7 +496,31 @@ fn parse_list_env(key: &str, default: &str) -> Vec<String> {
|
||||
}
|
||||
|
||||
pub fn parse_rate_limit(value: &str) -> Option<RateLimitSetting> {
|
||||
let parts = value.split_whitespace().collect::<Vec<_>>();
|
||||
let trimmed = value.trim();
|
||||
if let Some((requests, window)) = trimmed.split_once('/') {
|
||||
let max_requests = requests.trim().parse::<u32>().ok()?;
|
||||
if max_requests == 0 {
|
||||
return None;
|
||||
}
|
||||
let window_str = window.trim().to_ascii_lowercase();
|
||||
let window_seconds = if let Ok(n) = window_str.parse::<u64>() {
|
||||
if n == 0 {
|
||||
return None;
|
||||
}
|
||||
n
|
||||
} else {
|
||||
match window_str.as_str() {
|
||||
"s" | "sec" | "second" | "seconds" => 1,
|
||||
"m" | "min" | "minute" | "minutes" => 60,
|
||||
"h" | "hr" | "hour" | "hours" => 3600,
|
||||
"d" | "day" | "days" => 86_400,
|
||||
_ => return None,
|
||||
}
|
||||
};
|
||||
return Some(RateLimitSetting::new(max_requests, window_seconds));
|
||||
}
|
||||
|
||||
let parts = trimmed.split_whitespace().collect::<Vec<_>>();
|
||||
if parts.len() != 3 || !parts[1].eq_ignore_ascii_case("per") {
|
||||
return None;
|
||||
}
|
||||
@@ -517,6 +565,15 @@ mod tests {
|
||||
parse_rate_limit("3 per hours"),
|
||||
Some(RateLimitSetting::new(3, 3600))
|
||||
);
|
||||
assert_eq!(
|
||||
parse_rate_limit("50000/60"),
|
||||
Some(RateLimitSetting::new(50000, 60))
|
||||
);
|
||||
assert_eq!(
|
||||
parse_rate_limit("100/minute"),
|
||||
Some(RateLimitSetting::new(100, 60))
|
||||
);
|
||||
assert_eq!(parse_rate_limit("0/60"), None);
|
||||
assert_eq!(parse_rate_limit("0 per minute"), None);
|
||||
assert_eq!(parse_rate_limit("bad"), None);
|
||||
}
|
||||
@@ -532,7 +589,7 @@ mod tests {
|
||||
|
||||
assert_eq!(config.object_key_max_length_bytes, 1024);
|
||||
assert_eq!(config.object_tag_limit, 50);
|
||||
assert_eq!(config.ratelimit_default, RateLimitSetting::new(200, 60));
|
||||
assert_eq!(config.ratelimit_default, RateLimitSetting::new(5000, 60));
|
||||
|
||||
std::env::remove_var("OBJECT_TAG_LIMIT");
|
||||
std::env::remove_var("RATE_LIMIT_DEFAULT");
|
||||
|
||||
@@ -20,6 +20,13 @@ fn xml_response(status: StatusCode, xml: String) -> Response {
|
||||
(status, [("content-type", "application/xml")], xml).into_response()
|
||||
}
|
||||
|
||||
fn stored_xml(value: &serde_json::Value) -> String {
|
||||
match value {
|
||||
serde_json::Value::String(s) => s.clone(),
|
||||
other => other.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn storage_err(err: myfsio_storage::error::StorageError) -> Response {
|
||||
let s3err = S3Error::from(err);
|
||||
let status =
|
||||
@@ -52,17 +59,31 @@ fn custom_xml_error(status: StatusCode, code: &str, message: &str) -> Response {
|
||||
}
|
||||
|
||||
pub async fn get_versioning(state: &AppState, bucket: &str) -> Response {
|
||||
match state.storage.is_versioning_enabled(bucket).await {
|
||||
Ok(enabled) => {
|
||||
let status_str = if enabled { "Enabled" } else { "Suspended" };
|
||||
let xml = format!(
|
||||
match state.storage.get_versioning_status(bucket).await {
|
||||
Ok(status) => {
|
||||
let body = match status {
|
||||
myfsio_common::types::VersioningStatus::Enabled => {
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
<Status>{}</Status>\
|
||||
</VersioningConfiguration>",
|
||||
status_str
|
||||
);
|
||||
xml_response(StatusCode::OK, xml)
|
||||
<Status>Enabled</Status>\
|
||||
</VersioningConfiguration>"
|
||||
.to_string()
|
||||
}
|
||||
myfsio_common::types::VersioningStatus::Suspended => {
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
<Status>Suspended</Status>\
|
||||
</VersioningConfiguration>"
|
||||
.to_string()
|
||||
}
|
||||
myfsio_common::types::VersioningStatus::Disabled => {
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<VersioningConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
</VersioningConfiguration>"
|
||||
.to_string()
|
||||
}
|
||||
};
|
||||
xml_response(StatusCode::OK, body)
|
||||
}
|
||||
Err(e) => storage_err(e),
|
||||
}
|
||||
@@ -80,9 +101,22 @@ pub async fn put_versioning(state: &AppState, bucket: &str, body: Body) -> Respo
|
||||
};
|
||||
|
||||
let xml_str = String::from_utf8_lossy(&body_bytes);
|
||||
let enabled = xml_str.contains("<Status>Enabled</Status>");
|
||||
let status = if xml_str.contains("<Status>Enabled</Status>") {
|
||||
myfsio_common::types::VersioningStatus::Enabled
|
||||
} else if xml_str.contains("<Status>Suspended</Status>") {
|
||||
myfsio_common::types::VersioningStatus::Suspended
|
||||
} else {
|
||||
return xml_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
S3Error::new(
|
||||
S3ErrorCode::MalformedXML,
|
||||
"VersioningConfiguration Status must be Enabled or Suspended",
|
||||
)
|
||||
.to_xml(),
|
||||
);
|
||||
};
|
||||
|
||||
match state.storage.set_versioning(bucket, enabled).await {
|
||||
match state.storage.set_versioning_status(bucket, status).await {
|
||||
Ok(()) => StatusCode::OK.into_response(),
|
||||
Err(e) => storage_err(e),
|
||||
}
|
||||
@@ -151,7 +185,7 @@ pub async fn get_cors(state: &AppState, bucket: &str) -> Response {
|
||||
match state.storage.get_bucket_config(bucket).await {
|
||||
Ok(config) => {
|
||||
if let Some(cors) = &config.cors {
|
||||
xml_response(StatusCode::OK, cors.to_string())
|
||||
xml_response(StatusCode::OK, stored_xml(cors))
|
||||
} else {
|
||||
xml_response(
|
||||
StatusCode::NOT_FOUND,
|
||||
@@ -214,14 +248,11 @@ pub async fn get_encryption(state: &AppState, bucket: &str) -> Response {
|
||||
match state.storage.get_bucket_config(bucket).await {
|
||||
Ok(config) => {
|
||||
if let Some(enc) = &config.encryption {
|
||||
xml_response(StatusCode::OK, enc.to_string())
|
||||
xml_response(StatusCode::OK, stored_xml(enc))
|
||||
} else {
|
||||
xml_response(
|
||||
StatusCode::NOT_FOUND,
|
||||
S3Error::new(
|
||||
S3ErrorCode::InvalidRequest,
|
||||
"The server side encryption configuration was not found",
|
||||
)
|
||||
S3Error::from_code(S3ErrorCode::ServerSideEncryptionConfigurationNotFoundError)
|
||||
.to_xml(),
|
||||
)
|
||||
}
|
||||
@@ -266,15 +297,11 @@ pub async fn get_lifecycle(state: &AppState, bucket: &str) -> Response {
|
||||
match state.storage.get_bucket_config(bucket).await {
|
||||
Ok(config) => {
|
||||
if let Some(lc) = &config.lifecycle {
|
||||
xml_response(StatusCode::OK, lc.to_string())
|
||||
xml_response(StatusCode::OK, stored_xml(lc))
|
||||
} else {
|
||||
xml_response(
|
||||
StatusCode::NOT_FOUND,
|
||||
S3Error::new(
|
||||
S3ErrorCode::NoSuchKey,
|
||||
"The lifecycle configuration does not exist",
|
||||
)
|
||||
.to_xml(),
|
||||
S3Error::from_code(S3ErrorCode::NoSuchLifecycleConfiguration).to_xml(),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -421,7 +448,7 @@ pub async fn get_policy(state: &AppState, bucket: &str) -> Response {
|
||||
} else {
|
||||
xml_response(
|
||||
StatusCode::NOT_FOUND,
|
||||
S3Error::new(S3ErrorCode::NoSuchKey, "No bucket policy attached").to_xml(),
|
||||
S3Error::from_code(S3ErrorCode::NoSuchBucketPolicy).to_xml(),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -497,10 +524,7 @@ pub async fn get_replication(state: &AppState, bucket: &str) -> Response {
|
||||
match state.storage.get_bucket_config(bucket).await {
|
||||
Ok(config) => {
|
||||
if let Some(replication) = &config.replication {
|
||||
match replication {
|
||||
serde_json::Value::String(s) => xml_response(StatusCode::OK, s.clone()),
|
||||
other => xml_response(StatusCode::OK, other.to_string()),
|
||||
}
|
||||
xml_response(StatusCode::OK, stored_xml(replication))
|
||||
} else {
|
||||
xml_response(
|
||||
StatusCode::NOT_FOUND,
|
||||
@@ -593,7 +617,7 @@ pub async fn get_acl(state: &AppState, bucket: &str) -> Response {
|
||||
match state.storage.get_bucket_config(bucket).await {
|
||||
Ok(config) => {
|
||||
if let Some(acl) = &config.acl {
|
||||
xml_response(StatusCode::OK, acl.to_string())
|
||||
xml_response(StatusCode::OK, stored_xml(acl))
|
||||
} else {
|
||||
let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
@@ -633,7 +657,7 @@ pub async fn get_website(state: &AppState, bucket: &str) -> Response {
|
||||
match state.storage.get_bucket_config(bucket).await {
|
||||
Ok(config) => {
|
||||
if let Some(ws) = &config.website {
|
||||
xml_response(StatusCode::OK, ws.to_string())
|
||||
xml_response(StatusCode::OK, stored_xml(ws))
|
||||
} else {
|
||||
xml_response(
|
||||
StatusCode::NOT_FOUND,
|
||||
@@ -685,7 +709,7 @@ pub async fn get_object_lock(state: &AppState, bucket: &str) -> Response {
|
||||
match state.storage.get_bucket_config(bucket).await {
|
||||
Ok(config) => {
|
||||
if let Some(ol) = &config.object_lock {
|
||||
xml_response(StatusCode::OK, ol.to_string())
|
||||
xml_response(StatusCode::OK, stored_xml(ol))
|
||||
} else {
|
||||
let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<ObjectLockConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
@@ -702,7 +726,7 @@ pub async fn get_notification(state: &AppState, bucket: &str) -> Response {
|
||||
match state.storage.get_bucket_config(bucket).await {
|
||||
Ok(config) => {
|
||||
if let Some(n) = &config.notification {
|
||||
xml_response(StatusCode::OK, n.to_string())
|
||||
xml_response(StatusCode::OK, stored_xml(n))
|
||||
} else {
|
||||
let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<NotificationConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
|
||||
@@ -1042,22 +1066,23 @@ pub async fn list_object_versions(
|
||||
state: &AppState,
|
||||
bucket: &str,
|
||||
prefix: Option<&str>,
|
||||
delimiter: Option<&str>,
|
||||
key_marker: Option<&str>,
|
||||
version_id_marker: Option<&str>,
|
||||
max_keys: usize,
|
||||
) -> Response {
|
||||
match state.storage.list_buckets().await {
|
||||
Ok(buckets) => {
|
||||
if !buckets.iter().any(|b| b.name == bucket) {
|
||||
match state.storage.bucket_exists(bucket).await {
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
return storage_err(myfsio_storage::error::StorageError::BucketNotFound(
|
||||
bucket.to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
Err(e) => return storage_err(e),
|
||||
}
|
||||
|
||||
let fetch_limit = max_keys.saturating_add(1).max(1);
|
||||
let params = myfsio_common::types::ListParams {
|
||||
max_keys: fetch_limit,
|
||||
max_keys: usize::MAX,
|
||||
prefix: prefix.map(ToOwned::to_owned),
|
||||
..Default::default()
|
||||
};
|
||||
@@ -1066,7 +1091,8 @@ pub async fn list_object_versions(
|
||||
Ok(result) => result,
|
||||
Err(e) => return storage_err(e),
|
||||
};
|
||||
let objects = object_result.objects;
|
||||
let live_objects = object_result.objects;
|
||||
|
||||
let archived_versions = match state
|
||||
.storage
|
||||
.list_bucket_object_versions(bucket, prefix)
|
||||
@@ -1076,63 +1102,215 @@ pub async fn list_object_versions(
|
||||
Err(e) => return storage_err(e),
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Entry {
|
||||
key: String,
|
||||
version_id: String,
|
||||
last_modified: chrono::DateTime<chrono::Utc>,
|
||||
etag: Option<String>,
|
||||
size: u64,
|
||||
storage_class: String,
|
||||
is_delete_marker: bool,
|
||||
}
|
||||
|
||||
let mut entries: Vec<Entry> = Vec::with_capacity(live_objects.len() + archived_versions.len());
|
||||
for obj in &live_objects {
|
||||
entries.push(Entry {
|
||||
key: obj.key.clone(),
|
||||
version_id: obj.version_id.clone().unwrap_or_else(|| "null".to_string()),
|
||||
last_modified: obj.last_modified,
|
||||
etag: obj.etag.clone(),
|
||||
size: obj.size,
|
||||
storage_class: obj
|
||||
.storage_class
|
||||
.clone()
|
||||
.unwrap_or_else(|| "STANDARD".to_string()),
|
||||
is_delete_marker: false,
|
||||
});
|
||||
}
|
||||
for version in &archived_versions {
|
||||
entries.push(Entry {
|
||||
key: version.key.clone(),
|
||||
version_id: version.version_id.clone(),
|
||||
last_modified: version.last_modified,
|
||||
etag: version.etag.clone(),
|
||||
size: version.size,
|
||||
storage_class: "STANDARD".to_string(),
|
||||
is_delete_marker: version.is_delete_marker,
|
||||
});
|
||||
}
|
||||
|
||||
entries.sort_by(|a, b| {
|
||||
a.key
|
||||
.cmp(&b.key)
|
||||
.then_with(|| b.last_modified.cmp(&a.last_modified))
|
||||
.then_with(|| a.version_id.cmp(&b.version_id))
|
||||
});
|
||||
|
||||
let mut latest_marked: std::collections::HashSet<String> = std::collections::HashSet::new();
|
||||
let mut is_latest_flags: Vec<bool> = Vec::with_capacity(entries.len());
|
||||
for entry in &entries {
|
||||
if latest_marked.insert(entry.key.clone()) {
|
||||
is_latest_flags.push(true);
|
||||
} else {
|
||||
is_latest_flags.push(false);
|
||||
}
|
||||
}
|
||||
|
||||
let km = key_marker.unwrap_or("");
|
||||
let vim = version_id_marker.unwrap_or("");
|
||||
let start_index = if km.is_empty() {
|
||||
0
|
||||
} else if vim.is_empty() {
|
||||
entries
|
||||
.iter()
|
||||
.position(|e| e.key.as_str() > km)
|
||||
.unwrap_or(entries.len())
|
||||
} else if let Some(pos) = entries
|
||||
.iter()
|
||||
.position(|e| e.key == km && e.version_id == vim)
|
||||
{
|
||||
pos + 1
|
||||
} else {
|
||||
entries
|
||||
.iter()
|
||||
.position(|e| e.key.as_str() > km)
|
||||
.unwrap_or(entries.len())
|
||||
};
|
||||
|
||||
let delim = delimiter.unwrap_or("");
|
||||
let prefix_str = prefix.unwrap_or("");
|
||||
|
||||
let mut common_prefixes: Vec<String> = Vec::new();
|
||||
let mut seen_prefixes: std::collections::HashSet<String> = std::collections::HashSet::new();
|
||||
let mut rendered = String::new();
|
||||
let mut count = 0usize;
|
||||
let mut is_truncated = false;
|
||||
let mut next_key_marker: Option<String> = None;
|
||||
let mut next_version_id_marker: Option<String> = None;
|
||||
let mut last_emitted: Option<(String, String)> = None;
|
||||
|
||||
let mut idx = start_index;
|
||||
while idx < entries.len() {
|
||||
let entry = &entries[idx];
|
||||
let is_latest = is_latest_flags[idx];
|
||||
|
||||
if !delim.is_empty() {
|
||||
let rest = entry.key.strip_prefix(prefix_str).unwrap_or(&entry.key);
|
||||
if let Some(delim_pos) = rest.find(delim) {
|
||||
let grouped = entry.key[..prefix_str.len() + delim_pos + delim.len()].to_string();
|
||||
if seen_prefixes.contains(&grouped) {
|
||||
idx += 1;
|
||||
continue;
|
||||
}
|
||||
if count >= max_keys {
|
||||
is_truncated = true;
|
||||
if let Some((k, v)) = last_emitted.clone() {
|
||||
next_key_marker = Some(k);
|
||||
next_version_id_marker = Some(v);
|
||||
}
|
||||
break;
|
||||
}
|
||||
common_prefixes.push(grouped.clone());
|
||||
seen_prefixes.insert(grouped.clone());
|
||||
count += 1;
|
||||
|
||||
let mut group_last = (entry.key.clone(), entry.version_id.clone());
|
||||
idx += 1;
|
||||
while idx < entries.len() && entries[idx].key.starts_with(&grouped) {
|
||||
group_last = (entries[idx].key.clone(), entries[idx].version_id.clone());
|
||||
idx += 1;
|
||||
}
|
||||
last_emitted = Some(group_last);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if count >= max_keys {
|
||||
is_truncated = true;
|
||||
if let Some((k, v)) = last_emitted.clone() {
|
||||
next_key_marker = Some(k);
|
||||
next_version_id_marker = Some(v);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
let tag = if entry.is_delete_marker {
|
||||
"DeleteMarker"
|
||||
} else {
|
||||
"Version"
|
||||
};
|
||||
rendered.push_str(&format!("<{}>", tag));
|
||||
rendered.push_str(&format!("<Key>{}</Key>", xml_escape(&entry.key)));
|
||||
rendered.push_str(&format!(
|
||||
"<VersionId>{}</VersionId>",
|
||||
xml_escape(&entry.version_id)
|
||||
));
|
||||
rendered.push_str(&format!("<IsLatest>{}</IsLatest>", is_latest));
|
||||
rendered.push_str(&format!(
|
||||
"<LastModified>{}</LastModified>",
|
||||
myfsio_xml::response::format_s3_datetime(&entry.last_modified)
|
||||
));
|
||||
if !entry.is_delete_marker {
|
||||
if let Some(ref etag) = entry.etag {
|
||||
rendered.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
|
||||
}
|
||||
rendered.push_str(&format!("<Size>{}</Size>", entry.size));
|
||||
rendered.push_str(&format!(
|
||||
"<StorageClass>{}</StorageClass>",
|
||||
xml_escape(&entry.storage_class)
|
||||
));
|
||||
}
|
||||
rendered.push_str(&format!("</{}>", tag));
|
||||
|
||||
last_emitted = Some((entry.key.clone(), entry.version_id.clone()));
|
||||
count += 1;
|
||||
idx += 1;
|
||||
}
|
||||
|
||||
let mut xml = String::from(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<ListVersionsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">",
|
||||
);
|
||||
xml.push_str(&format!("<Name>{}</Name>", xml_escape(bucket)));
|
||||
xml.push_str(&format!("<Prefix>{}</Prefix>", xml_escape(prefix_str)));
|
||||
if !km.is_empty() {
|
||||
xml.push_str(&format!("<KeyMarker>{}</KeyMarker>", xml_escape(km)));
|
||||
} else {
|
||||
xml.push_str("<KeyMarker></KeyMarker>");
|
||||
}
|
||||
if !vim.is_empty() {
|
||||
xml.push_str(&format!(
|
||||
"<Prefix>{}</Prefix>",
|
||||
xml_escape(prefix.unwrap_or(""))
|
||||
"<VersionIdMarker>{}</VersionIdMarker>",
|
||||
xml_escape(vim)
|
||||
));
|
||||
} else {
|
||||
xml.push_str("<VersionIdMarker></VersionIdMarker>");
|
||||
}
|
||||
xml.push_str(&format!("<MaxKeys>{}</MaxKeys>", max_keys));
|
||||
|
||||
let current_count = objects.len().min(max_keys);
|
||||
let remaining = max_keys.saturating_sub(current_count);
|
||||
let archived_count = archived_versions.len().min(remaining);
|
||||
let is_truncated = object_result.is_truncated
|
||||
|| objects.len() > current_count
|
||||
|| archived_versions.len() > archived_count;
|
||||
if !delim.is_empty() {
|
||||
xml.push_str(&format!("<Delimiter>{}</Delimiter>", xml_escape(delim)));
|
||||
}
|
||||
xml.push_str(&format!("<IsTruncated>{}</IsTruncated>", is_truncated));
|
||||
|
||||
for obj in objects.iter().take(current_count) {
|
||||
xml.push_str("<Version>");
|
||||
xml.push_str(&format!("<Key>{}</Key>", xml_escape(&obj.key)));
|
||||
xml.push_str("<VersionId>null</VersionId>");
|
||||
xml.push_str("<IsLatest>true</IsLatest>");
|
||||
if let Some(ref nk) = next_key_marker {
|
||||
xml.push_str(&format!(
|
||||
"<LastModified>{}</LastModified>",
|
||||
myfsio_xml::response::format_s3_datetime(&obj.last_modified)
|
||||
"<NextKeyMarker>{}</NextKeyMarker>",
|
||||
xml_escape(nk)
|
||||
));
|
||||
if let Some(ref etag) = obj.etag {
|
||||
xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
|
||||
}
|
||||
xml.push_str(&format!("<Size>{}</Size>", obj.size));
|
||||
if let Some(ref nv) = next_version_id_marker {
|
||||
xml.push_str(&format!(
|
||||
"<StorageClass>{}</StorageClass>",
|
||||
xml_escape(obj.storage_class.as_deref().unwrap_or("STANDARD"))
|
||||
"<NextVersionIdMarker>{}</NextVersionIdMarker>",
|
||||
xml_escape(nv)
|
||||
));
|
||||
xml.push_str("</Version>");
|
||||
}
|
||||
|
||||
for version in archived_versions.iter().take(archived_count) {
|
||||
xml.push_str("<Version>");
|
||||
xml.push_str(&format!("<Key>{}</Key>", xml_escape(&version.key)));
|
||||
xml.push_str(&rendered);
|
||||
for cp in &common_prefixes {
|
||||
xml.push_str(&format!(
|
||||
"<VersionId>{}</VersionId>",
|
||||
xml_escape(&version.version_id)
|
||||
"<CommonPrefixes><Prefix>{}</Prefix></CommonPrefixes>",
|
||||
xml_escape(cp)
|
||||
));
|
||||
xml.push_str("<IsLatest>false</IsLatest>");
|
||||
xml.push_str(&format!(
|
||||
"<LastModified>{}</LastModified>",
|
||||
myfsio_xml::response::format_s3_datetime(&version.last_modified)
|
||||
));
|
||||
if let Some(ref etag) = version.etag {
|
||||
xml.push_str(&format!("<ETag>\"{}\"</ETag>", xml_escape(etag)));
|
||||
}
|
||||
xml.push_str(&format!("<Size>{}</Size>", version.size));
|
||||
xml.push_str("<StorageClass>STANDARD</StorageClass>");
|
||||
xml.push_str("</Version>");
|
||||
}
|
||||
|
||||
xml.push_str("</ListVersionsResult>");
|
||||
@@ -1182,6 +1360,26 @@ pub async fn put_object_tagging(state: &AppState, bucket: &str, key: &str, body:
|
||||
.to_xml(),
|
||||
);
|
||||
}
|
||||
for tag in &tags {
|
||||
if tag.key.is_empty() || tag.key.len() > 128 {
|
||||
return xml_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
S3Error::new(S3ErrorCode::InvalidTag, "Tag key length must be 1-128").to_xml(),
|
||||
);
|
||||
}
|
||||
if tag.value.len() > 256 {
|
||||
return xml_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
S3Error::new(S3ErrorCode::InvalidTag, "Tag value length must be 0-256").to_xml(),
|
||||
);
|
||||
}
|
||||
if tag.key.contains('=') {
|
||||
return xml_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
S3Error::new(S3ErrorCode::InvalidTag, "Tag keys must not contain '='").to_xml(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
match state.storage.set_object_tags(bucket, key, &tags).await {
|
||||
Ok(()) => StatusCode::OK.into_response(),
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -117,16 +117,6 @@ pub async fn logout(Extension(session): Extension<SessionHandle>) -> Response {
|
||||
Redirect::to("/login").into_response()
|
||||
}
|
||||
|
||||
pub async fn csrf_error_page(
|
||||
State(state): State<AppState>,
|
||||
Extension(session): Extension<SessionHandle>,
|
||||
) -> Response {
|
||||
let ctx = base_context(&session, None);
|
||||
let mut resp = render(&state, "csrf_error.html", &ctx);
|
||||
*resp.status_mut() = StatusCode::FORBIDDEN;
|
||||
resp
|
||||
}
|
||||
|
||||
pub async fn root_redirect() -> Response {
|
||||
Redirect::to("/ui/buckets").into_response()
|
||||
}
|
||||
|
||||
@@ -121,6 +121,8 @@ fn storage_status(err: &StorageError) -> StatusCode {
|
||||
| StorageError::ObjectNotFound { .. }
|
||||
| StorageError::VersionNotFound { .. }
|
||||
| StorageError::UploadNotFound(_) => StatusCode::NOT_FOUND,
|
||||
StorageError::DeleteMarker { .. } => StatusCode::NOT_FOUND,
|
||||
StorageError::MethodNotAllowed(_) => StatusCode::METHOD_NOT_ALLOWED,
|
||||
StorageError::InvalidBucketName(_)
|
||||
| StorageError::InvalidObjectKey(_)
|
||||
| StorageError::InvalidRange
|
||||
@@ -904,6 +906,35 @@ pub struct ListObjectsQuery {
|
||||
pub prefix: Option<String>,
|
||||
#[serde(default)]
|
||||
pub start_after: Option<String>,
|
||||
#[serde(default)]
|
||||
pub delimiter: Option<String>,
|
||||
}
|
||||
|
||||
fn object_json(bucket_name: &str, o: &myfsio_common::types::ObjectMeta) -> Value {
|
||||
json!({
|
||||
"key": o.key,
|
||||
"size": o.size,
|
||||
"last_modified": o.last_modified.to_rfc3339(),
|
||||
"last_modified_iso": o.last_modified.to_rfc3339(),
|
||||
"last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(),
|
||||
"etag": o.etag.clone().unwrap_or_default(),
|
||||
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
|
||||
"content_type": o.content_type.clone().unwrap_or_default(),
|
||||
"download_url": build_ui_object_url(bucket_name, &o.key, "download"),
|
||||
"preview_url": build_ui_object_url(bucket_name, &o.key, "preview"),
|
||||
"delete_endpoint": build_ui_object_url(bucket_name, &o.key, "delete"),
|
||||
"presign_endpoint": build_ui_object_url(bucket_name, &o.key, "presign"),
|
||||
"metadata_url": build_ui_object_url(bucket_name, &o.key, "metadata"),
|
||||
"versions_endpoint": build_ui_object_url(bucket_name, &o.key, "versions"),
|
||||
"restore_template": format!(
|
||||
"/ui/buckets/{}/objects/{}/restore/VERSION_ID_PLACEHOLDER",
|
||||
bucket_name,
|
||||
encode_object_key(&o.key)
|
||||
),
|
||||
"tags_url": build_ui_object_url(bucket_name, &o.key, "tags"),
|
||||
"copy_url": build_ui_object_url(bucket_name, &o.key, "copy"),
|
||||
"move_url": build_ui_object_url(bucket_name, &o.key, "move"),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn list_bucket_objects(
|
||||
@@ -917,6 +948,49 @@ pub async fn list_bucket_objects(
|
||||
}
|
||||
|
||||
let max_keys = q.max_keys.unwrap_or(1000).min(5000);
|
||||
let versioning_enabled = state
|
||||
.storage
|
||||
.is_versioning_enabled(&bucket_name)
|
||||
.await
|
||||
.unwrap_or(false);
|
||||
let stats = state.storage.bucket_stats(&bucket_name).await.ok();
|
||||
let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0);
|
||||
|
||||
let use_shallow = q.delimiter.as_deref() == Some("/");
|
||||
|
||||
if use_shallow {
|
||||
let params = myfsio_common::types::ShallowListParams {
|
||||
prefix: q.prefix.clone().unwrap_or_default(),
|
||||
delimiter: "/".to_string(),
|
||||
max_keys,
|
||||
continuation_token: q.continuation_token.clone(),
|
||||
};
|
||||
return match state
|
||||
.storage
|
||||
.list_objects_shallow(&bucket_name, ¶ms)
|
||||
.await
|
||||
{
|
||||
Ok(res) => {
|
||||
let objects: Vec<Value> = res
|
||||
.objects
|
||||
.iter()
|
||||
.map(|o| object_json(&bucket_name, o))
|
||||
.collect();
|
||||
Json(json!({
|
||||
"versioning_enabled": versioning_enabled,
|
||||
"total_count": total_count,
|
||||
"is_truncated": res.is_truncated,
|
||||
"next_continuation_token": res.next_continuation_token,
|
||||
"url_templates": url_templates_for(&bucket_name),
|
||||
"objects": objects,
|
||||
"common_prefixes": res.common_prefixes,
|
||||
}))
|
||||
.into_response()
|
||||
}
|
||||
Err(e) => storage_json_error(e),
|
||||
};
|
||||
}
|
||||
|
||||
let params = ListParams {
|
||||
max_keys,
|
||||
continuation_token: q.continuation_token.clone(),
|
||||
@@ -924,46 +998,12 @@ pub async fn list_bucket_objects(
|
||||
start_after: q.start_after.clone(),
|
||||
};
|
||||
|
||||
let versioning_enabled = state
|
||||
.storage
|
||||
.is_versioning_enabled(&bucket_name)
|
||||
.await
|
||||
.unwrap_or(false);
|
||||
|
||||
let stats = state.storage.bucket_stats(&bucket_name).await.ok();
|
||||
let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0);
|
||||
|
||||
match state.storage.list_objects(&bucket_name, ¶ms).await {
|
||||
Ok(res) => {
|
||||
let objects: Vec<Value> = res
|
||||
.objects
|
||||
.iter()
|
||||
.map(|o| {
|
||||
json!({
|
||||
"key": o.key,
|
||||
"size": o.size,
|
||||
"last_modified": o.last_modified.to_rfc3339(),
|
||||
"last_modified_iso": o.last_modified.to_rfc3339(),
|
||||
"last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(),
|
||||
"etag": o.etag.clone().unwrap_or_default(),
|
||||
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
|
||||
"content_type": o.content_type.clone().unwrap_or_default(),
|
||||
"download_url": build_ui_object_url(&bucket_name, &o.key, "download"),
|
||||
"preview_url": build_ui_object_url(&bucket_name, &o.key, "preview"),
|
||||
"delete_endpoint": build_ui_object_url(&bucket_name, &o.key, "delete"),
|
||||
"presign_endpoint": build_ui_object_url(&bucket_name, &o.key, "presign"),
|
||||
"metadata_url": build_ui_object_url(&bucket_name, &o.key, "metadata"),
|
||||
"versions_endpoint": build_ui_object_url(&bucket_name, &o.key, "versions"),
|
||||
"restore_template": format!(
|
||||
"/ui/buckets/{}/objects/{}/restore/VERSION_ID_PLACEHOLDER",
|
||||
bucket_name,
|
||||
encode_object_key(&o.key)
|
||||
),
|
||||
"tags_url": build_ui_object_url(&bucket_name, &o.key, "tags"),
|
||||
"copy_url": build_ui_object_url(&bucket_name, &o.key, "copy"),
|
||||
"move_url": build_ui_object_url(&bucket_name, &o.key, "move"),
|
||||
})
|
||||
})
|
||||
.map(|o| object_json(&bucket_name, o))
|
||||
.collect();
|
||||
|
||||
Json(json!({
|
||||
@@ -1006,19 +1046,38 @@ pub async fn stream_bucket_objects(
|
||||
let stats = state.storage.bucket_stats(&bucket_name).await.ok();
|
||||
let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0);
|
||||
|
||||
let mut lines: Vec<String> = Vec::new();
|
||||
lines.push(
|
||||
json!({
|
||||
let use_delimiter = q.delimiter.as_deref() == Some("/");
|
||||
let prefix = q.prefix.clone().unwrap_or_default();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(64);
|
||||
|
||||
let meta_line = json!({
|
||||
"type": "meta",
|
||||
"url_templates": url_templates_for(&bucket_name),
|
||||
"versioning_enabled": versioning_enabled,
|
||||
})
|
||||
.to_string(),
|
||||
);
|
||||
lines.push(json!({ "type": "count", "total_count": total_count }).to_string());
|
||||
.to_string()
|
||||
+ "\n";
|
||||
let count_line = json!({ "type": "count", "total_count": total_count }).to_string() + "\n";
|
||||
|
||||
let use_delimiter = q.delimiter.as_deref() == Some("/");
|
||||
let prefix = q.prefix.clone().unwrap_or_default();
|
||||
let storage = state.storage.clone();
|
||||
let bucket = bucket_name.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if tx
|
||||
.send(Ok(bytes::Bytes::from(meta_line.into_bytes())))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
if tx
|
||||
.send(Ok(bytes::Bytes::from(count_line.into_bytes())))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if use_delimiter {
|
||||
let mut token: Option<String> = None;
|
||||
@@ -1029,18 +1088,20 @@ pub async fn stream_bucket_objects(
|
||||
max_keys: UI_OBJECT_BROWSER_MAX_KEYS,
|
||||
continuation_token: token.clone(),
|
||||
};
|
||||
match state
|
||||
.storage
|
||||
.list_objects_shallow(&bucket_name, ¶ms)
|
||||
.await
|
||||
{
|
||||
match storage.list_objects_shallow(&bucket, ¶ms).await {
|
||||
Ok(res) => {
|
||||
for p in &res.common_prefixes {
|
||||
lines.push(json!({ "type": "folder", "prefix": p }).to_string());
|
||||
let line = json!({ "type": "folder", "prefix": p }).to_string() + "\n";
|
||||
if tx
|
||||
.send(Ok(bytes::Bytes::from(line.into_bytes())))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
for o in &res.objects {
|
||||
lines.push(
|
||||
json!({
|
||||
let line = json!({
|
||||
"type": "object",
|
||||
"key": o.key,
|
||||
"size": o.size,
|
||||
@@ -1050,8 +1111,15 @@ pub async fn stream_bucket_objects(
|
||||
"etag": o.etag.clone().unwrap_or_default(),
|
||||
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
|
||||
})
|
||||
.to_string(),
|
||||
);
|
||||
.to_string()
|
||||
+ "\n";
|
||||
if tx
|
||||
.send(Ok(bytes::Bytes::from(line.into_bytes())))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
if !res.is_truncated || res.next_continuation_token.is_none() {
|
||||
break;
|
||||
@@ -1059,8 +1127,10 @@ pub async fn stream_bucket_objects(
|
||||
token = res.next_continuation_token;
|
||||
}
|
||||
Err(e) => {
|
||||
lines.push(json!({ "type": "error", "error": e.to_string() }).to_string());
|
||||
break;
|
||||
let line =
|
||||
json!({ "type": "error", "error": e.to_string() }).to_string() + "\n";
|
||||
let _ = tx.send(Ok(bytes::Bytes::from(line.into_bytes()))).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1077,11 +1147,10 @@ pub async fn stream_bucket_objects(
|
||||
},
|
||||
start_after: None,
|
||||
};
|
||||
match state.storage.list_objects(&bucket_name, ¶ms).await {
|
||||
match storage.list_objects(&bucket, ¶ms).await {
|
||||
Ok(res) => {
|
||||
for o in &res.objects {
|
||||
lines.push(
|
||||
json!({
|
||||
let line = json!({
|
||||
"type": "object",
|
||||
"key": o.key,
|
||||
"size": o.size,
|
||||
@@ -1091,8 +1160,15 @@ pub async fn stream_bucket_objects(
|
||||
"etag": o.etag.clone().unwrap_or_default(),
|
||||
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
|
||||
})
|
||||
.to_string(),
|
||||
);
|
||||
.to_string()
|
||||
+ "\n";
|
||||
if tx
|
||||
.send(Ok(bytes::Bytes::from(line.into_bytes())))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
if !res.is_truncated || res.next_continuation_token.is_none() {
|
||||
break;
|
||||
@@ -1100,21 +1176,32 @@ pub async fn stream_bucket_objects(
|
||||
token = res.next_continuation_token;
|
||||
}
|
||||
Err(e) => {
|
||||
lines.push(json!({ "type": "error", "error": e.to_string() }).to_string());
|
||||
break;
|
||||
let line =
|
||||
json!({ "type": "error", "error": e.to_string() }).to_string() + "\n";
|
||||
let _ = tx.send(Ok(bytes::Bytes::from(line.into_bytes()))).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lines.push(json!({ "type": "done" }).to_string());
|
||||
let done_line = json!({ "type": "done" }).to_string() + "\n";
|
||||
let _ = tx
|
||||
.send(Ok(bytes::Bytes::from(done_line.into_bytes())))
|
||||
.await;
|
||||
});
|
||||
|
||||
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
|
||||
let body = Body::from_stream(stream);
|
||||
|
||||
let body = lines.join("\n") + "\n";
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
header::CONTENT_TYPE,
|
||||
"application/x-ndjson; charset=utf-8".parse().unwrap(),
|
||||
);
|
||||
headers.insert(header::CACHE_CONTROL, "no-cache".parse().unwrap());
|
||||
headers.insert("x-accel-buffering", "no".parse().unwrap());
|
||||
|
||||
(StatusCode::OK, headers, body).into_response()
|
||||
}
|
||||
|
||||
@@ -2514,7 +2601,7 @@ async fn move_object_json(state: &AppState, bucket: &str, key: &str, body: Body)
|
||||
|
||||
match state.storage.copy_object(bucket, key, dest_bucket, dest_key).await {
|
||||
Ok(_) => match state.storage.delete_object(bucket, key).await {
|
||||
Ok(()) => {
|
||||
Ok(_) => {
|
||||
super::trigger_replication(state, dest_bucket, dest_key, "write");
|
||||
super::trigger_replication(state, bucket, key, "delete");
|
||||
Json(json!({
|
||||
@@ -2589,7 +2676,7 @@ async fn delete_object_json(
|
||||
}
|
||||
|
||||
match state.storage.delete_object(bucket, key).await {
|
||||
Ok(()) => {
|
||||
Ok(_) => {
|
||||
super::trigger_replication(state, bucket, key, "delete");
|
||||
Json(json!({
|
||||
"status": "ok",
|
||||
@@ -2868,7 +2955,7 @@ pub async fn bulk_delete_objects(
|
||||
|
||||
for key in keys {
|
||||
match state.storage.delete_object(&bucket_name, &key).await {
|
||||
Ok(()) => {
|
||||
Ok(_) => {
|
||||
super::trigger_replication(&state, &bucket_name, &key, "delete");
|
||||
if payload.purge_versions {
|
||||
if let Err(err) =
|
||||
|
||||
@@ -227,9 +227,7 @@ async fn parse_form_any(
|
||||
if is_multipart {
|
||||
let boundary = multer::parse_boundary(&content_type)
|
||||
.map_err(|_| "Missing multipart boundary".to_string())?;
|
||||
let stream = futures::stream::once(async move {
|
||||
Ok::<_, std::io::Error>(bytes)
|
||||
});
|
||||
let stream = futures::stream::once(async move { Ok::<_, std::io::Error>(bytes) });
|
||||
let mut multipart = multer::Multipart::new(stream, boundary);
|
||||
let mut out = HashMap::new();
|
||||
while let Some(field) = multipart
|
||||
@@ -2173,10 +2171,7 @@ pub async fn create_bucket(
|
||||
let wants_json = wants_json(&headers);
|
||||
let form = match parse_form_any(&headers, body).await {
|
||||
Ok(fields) => CreateBucketForm {
|
||||
bucket_name: fields
|
||||
.get("bucket_name")
|
||||
.cloned()
|
||||
.unwrap_or_default(),
|
||||
bucket_name: fields.get("bucket_name").cloned().unwrap_or_default(),
|
||||
csrf_token: fields.get("csrf_token").cloned().unwrap_or_default(),
|
||||
},
|
||||
Err(message) => {
|
||||
|
||||
@@ -304,8 +304,7 @@ pub fn create_ui_router(state: state::AppState) -> Router {
|
||||
|
||||
let public = Router::new()
|
||||
.route("/login", get(ui::login_page).post(ui::login_submit))
|
||||
.route("/logout", post(ui::logout).get(ui::logout))
|
||||
.route("/csrf-error", get(ui::csrf_error_page));
|
||||
.route("/logout", post(ui::logout).get(ui::logout));
|
||||
|
||||
let session_state = middleware::SessionLayerState {
|
||||
store: state.sessions.clone(),
|
||||
@@ -317,7 +316,10 @@ pub fn create_ui_router(state: state::AppState) -> Router {
|
||||
protected
|
||||
.merge(public)
|
||||
.fallback(ui::not_found_page)
|
||||
.layer(axum::middleware::from_fn(middleware::csrf_layer))
|
||||
.layer(axum::middleware::from_fn_with_state(
|
||||
state.clone(),
|
||||
middleware::csrf_layer,
|
||||
))
|
||||
.layer(axum::middleware::from_fn_with_state(
|
||||
session_state,
|
||||
middleware::session_layer,
|
||||
@@ -333,8 +335,12 @@ pub fn create_ui_router(state: state::AppState) -> Router {
|
||||
}
|
||||
|
||||
pub fn create_router(state: state::AppState) -> Router {
|
||||
let default_rate_limit = middleware::RateLimitLayerState::new(
|
||||
let default_rate_limit = middleware::RateLimitLayerState::with_per_op(
|
||||
state.config.ratelimit_default,
|
||||
state.config.ratelimit_list_buckets,
|
||||
state.config.ratelimit_bucket_ops,
|
||||
state.config.ratelimit_object_ops,
|
||||
state.config.ratelimit_head_ops,
|
||||
state.config.num_trusted_proxies,
|
||||
);
|
||||
let admin_rate_limit = middleware::RateLimitLayerState::new(
|
||||
@@ -575,11 +581,22 @@ pub fn create_router(state: state::AppState) -> Router {
|
||||
middleware::rate_limit_layer,
|
||||
));
|
||||
|
||||
let request_body_timeout =
|
||||
std::time::Duration::from_secs(state.config.request_body_timeout_secs);
|
||||
|
||||
api_router
|
||||
.merge(admin_router)
|
||||
.layer(axum::middleware::from_fn(middleware::server_header))
|
||||
.layer(cors_layer(&state.config))
|
||||
.layer(axum::middleware::from_fn_with_state(
|
||||
state.clone(),
|
||||
middleware::bucket_cors_layer,
|
||||
))
|
||||
.layer(axum::middleware::from_fn(middleware::request_log_layer))
|
||||
.layer(tower_http::compression::CompressionLayer::new())
|
||||
.layer(tower_http::timeout::RequestBodyTimeoutLayer::new(
|
||||
request_body_timeout,
|
||||
))
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
|
||||
@@ -189,6 +189,11 @@ async fn main() {
|
||||
|
||||
let shutdown = shutdown_signal_shared();
|
||||
let api_shutdown = shutdown.clone();
|
||||
let api_listener = axum::serve::ListenerExt::tap_io(api_listener, |stream| {
|
||||
if let Err(err) = stream.set_nodelay(true) {
|
||||
tracing::trace!("failed to set TCP_NODELAY on api socket: {}", err);
|
||||
}
|
||||
});
|
||||
let api_task = tokio::spawn(async move {
|
||||
axum::serve(
|
||||
api_listener,
|
||||
@@ -202,6 +207,11 @@ async fn main() {
|
||||
|
||||
let ui_task = if let (Some(listener), Some(app)) = (ui_listener, ui_app) {
|
||||
let ui_shutdown = shutdown.clone();
|
||||
let listener = axum::serve::ListenerExt::tap_io(listener, |stream| {
|
||||
if let Err(err) = stream.set_nodelay(true) {
|
||||
tracing::trace!("failed to set TCP_NODELAY on ui socket: {}", err);
|
||||
}
|
||||
});
|
||||
Some(tokio::spawn(async move {
|
||||
axum::serve(listener, app)
|
||||
.with_graceful_shutdown(async move {
|
||||
|
||||
@@ -12,9 +12,36 @@ use serde_json::Value;
|
||||
use std::time::Instant;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use crate::middleware::sha_body::{is_hex_sha256, Sha256VerifyBody};
|
||||
use crate::services::acl::acl_from_bucket_config;
|
||||
use crate::state::AppState;
|
||||
|
||||
fn wrap_body_for_sha256_verification(req: &mut Request) {
|
||||
let declared = match req
|
||||
.headers()
|
||||
.get("x-amz-content-sha256")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
{
|
||||
Some(v) => v.to_string(),
|
||||
None => return,
|
||||
};
|
||||
if !is_hex_sha256(&declared) {
|
||||
return;
|
||||
}
|
||||
let is_chunked = req
|
||||
.headers()
|
||||
.get("content-encoding")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|v| v.to_ascii_lowercase().contains("aws-chunked"))
|
||||
.unwrap_or(false);
|
||||
if is_chunked {
|
||||
return;
|
||||
}
|
||||
let body = std::mem::replace(req.body_mut(), axum::body::Body::empty());
|
||||
let wrapped = Sha256VerifyBody::new(body, declared);
|
||||
*req.body_mut() = axum::body::Body::new(wrapped);
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct OriginalCanonicalPath(String);
|
||||
|
||||
@@ -475,6 +502,7 @@ pub async fn auth_layer(State(state): State<AppState>, mut req: Request, next: N
|
||||
error_response(err, &auth_path)
|
||||
} else {
|
||||
req.extensions_mut().insert(principal);
|
||||
wrap_body_for_sha256_verification(&mut req);
|
||||
next.run(req).await
|
||||
}
|
||||
}
|
||||
@@ -1102,7 +1130,9 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR
|
||||
let parts: Vec<&str> = auth_str
|
||||
.strip_prefix("AWS4-HMAC-SHA256 ")
|
||||
.unwrap()
|
||||
.split(", ")
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect();
|
||||
|
||||
if parts.len() != 3 {
|
||||
@@ -1112,9 +1142,24 @@ fn verify_sigv4_header(state: &AppState, req: &Request, auth_str: &str) -> AuthR
|
||||
));
|
||||
}
|
||||
|
||||
let credential = parts[0].strip_prefix("Credential=").unwrap_or("");
|
||||
let signed_headers_str = parts[1].strip_prefix("SignedHeaders=").unwrap_or("");
|
||||
let provided_signature = parts[2].strip_prefix("Signature=").unwrap_or("");
|
||||
let mut credential: &str = "";
|
||||
let mut signed_headers_str: &str = "";
|
||||
let mut provided_signature: &str = "";
|
||||
for part in &parts {
|
||||
if let Some(v) = part.strip_prefix("Credential=") {
|
||||
credential = v;
|
||||
} else if let Some(v) = part.strip_prefix("SignedHeaders=") {
|
||||
signed_headers_str = v;
|
||||
} else if let Some(v) = part.strip_prefix("Signature=") {
|
||||
provided_signature = v;
|
||||
}
|
||||
}
|
||||
if credential.is_empty() || signed_headers_str.is_empty() || provided_signature.is_empty() {
|
||||
return AuthResult::Denied(S3Error::new(
|
||||
S3ErrorCode::InvalidArgument,
|
||||
"Malformed Authorization header",
|
||||
));
|
||||
}
|
||||
|
||||
let cred_parts: Vec<&str> = credential.split('/').collect();
|
||||
if cred_parts.len() != 5 {
|
||||
@@ -1299,7 +1344,7 @@ fn verify_sigv4_query(state: &AppState, req: &Request) -> AuthResult {
|
||||
}
|
||||
if elapsed < -(state.config.sigv4_timestamp_tolerance_secs as i64) {
|
||||
return AuthResult::Denied(S3Error::new(
|
||||
S3ErrorCode::AccessDenied,
|
||||
S3ErrorCode::RequestTimeTooSkewed,
|
||||
"Request is too far in the future",
|
||||
));
|
||||
}
|
||||
@@ -1369,8 +1414,11 @@ fn check_timestamp_freshness(amz_date: &str, tolerance_secs: u64) -> Option<S3Er
|
||||
|
||||
if diff > tolerance_secs {
|
||||
return Some(S3Error::new(
|
||||
S3ErrorCode::AccessDenied,
|
||||
"Request timestamp too old or too far in the future",
|
||||
S3ErrorCode::RequestTimeTooSkewed,
|
||||
format!(
|
||||
"The difference between the request time and the server's time is too large ({}s, tolerance {}s)",
|
||||
diff, tolerance_secs
|
||||
),
|
||||
));
|
||||
}
|
||||
None
|
||||
|
||||
281
crates/myfsio-server/src/middleware/bucket_cors.rs
Normal file
281
crates/myfsio-server/src/middleware/bucket_cors.rs
Normal file
@@ -0,0 +1,281 @@
|
||||
use axum::extract::{Request, State};
|
||||
use axum::http::{HeaderMap, HeaderValue, Method, StatusCode};
|
||||
use axum::middleware::Next;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use myfsio_storage::traits::StorageEngine;
|
||||
|
||||
use crate::state::AppState;
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct CorsRule {
|
||||
allowed_origins: Vec<String>,
|
||||
allowed_methods: Vec<String>,
|
||||
allowed_headers: Vec<String>,
|
||||
expose_headers: Vec<String>,
|
||||
max_age_seconds: Option<u64>,
|
||||
}
|
||||
|
||||
fn parse_cors_config(xml: &str) -> Vec<CorsRule> {
|
||||
let doc = match roxmltree::Document::parse(xml) {
|
||||
Ok(d) => d,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
let mut rules = Vec::new();
|
||||
for rule_node in doc
|
||||
.descendants()
|
||||
.filter(|node| node.is_element() && node.tag_name().name() == "CORSRule")
|
||||
{
|
||||
let mut rule = CorsRule::default();
|
||||
for child in rule_node.children().filter(|n| n.is_element()) {
|
||||
let text = child.text().unwrap_or("").trim().to_string();
|
||||
match child.tag_name().name() {
|
||||
"AllowedOrigin" => rule.allowed_origins.push(text),
|
||||
"AllowedMethod" => rule.allowed_methods.push(text.to_ascii_uppercase()),
|
||||
"AllowedHeader" => rule.allowed_headers.push(text),
|
||||
"ExposeHeader" => rule.expose_headers.push(text),
|
||||
"MaxAgeSeconds" => {
|
||||
if let Ok(v) = text.parse::<u64>() {
|
||||
rule.max_age_seconds = Some(v);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
rules.push(rule);
|
||||
}
|
||||
rules
|
||||
}
|
||||
|
||||
fn match_origin(pattern: &str, origin: &str) -> bool {
|
||||
if pattern == "*" {
|
||||
return true;
|
||||
}
|
||||
if pattern == origin {
|
||||
return true;
|
||||
}
|
||||
if let Some(suffix) = pattern.strip_prefix('*') {
|
||||
return origin.ends_with(suffix);
|
||||
}
|
||||
if let Some(prefix) = pattern.strip_suffix('*') {
|
||||
return origin.starts_with(prefix);
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
fn match_header(pattern: &str, header: &str) -> bool {
|
||||
if pattern == "*" {
|
||||
return true;
|
||||
}
|
||||
pattern.eq_ignore_ascii_case(header)
|
||||
}
|
||||
|
||||
fn find_matching_rule<'a>(
|
||||
rules: &'a [CorsRule],
|
||||
origin: &str,
|
||||
method: &str,
|
||||
request_headers: &[&str],
|
||||
) -> Option<&'a CorsRule> {
|
||||
rules.iter().find(|rule| {
|
||||
let origin_match = rule
|
||||
.allowed_origins
|
||||
.iter()
|
||||
.any(|p| match_origin(p, origin));
|
||||
if !origin_match {
|
||||
return false;
|
||||
}
|
||||
let method_match = rule
|
||||
.allowed_methods
|
||||
.iter()
|
||||
.any(|m| m.eq_ignore_ascii_case(method));
|
||||
if !method_match {
|
||||
return false;
|
||||
}
|
||||
request_headers.iter().all(|h| {
|
||||
rule.allowed_headers
|
||||
.iter()
|
||||
.any(|pattern| match_header(pattern, h))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn find_matching_rule_for_actual<'a>(
|
||||
rules: &'a [CorsRule],
|
||||
origin: &str,
|
||||
method: &str,
|
||||
) -> Option<&'a CorsRule> {
|
||||
rules.iter().find(|rule| {
|
||||
rule.allowed_origins
|
||||
.iter()
|
||||
.any(|p| match_origin(p, origin))
|
||||
&& rule
|
||||
.allowed_methods
|
||||
.iter()
|
||||
.any(|m| m.eq_ignore_ascii_case(method))
|
||||
})
|
||||
}
|
||||
|
||||
fn bucket_from_path(path: &str) -> Option<&str> {
|
||||
let trimmed = path.trim_start_matches('/');
|
||||
if trimmed.is_empty() {
|
||||
return None;
|
||||
}
|
||||
if trimmed.starts_with("admin/")
|
||||
|| trimmed.starts_with("myfsio/")
|
||||
|| trimmed.starts_with("kms/")
|
||||
{
|
||||
return None;
|
||||
}
|
||||
let first = trimmed.split('/').next().unwrap_or("");
|
||||
if myfsio_storage::validation::validate_bucket_name(first).is_some() {
|
||||
return None;
|
||||
}
|
||||
Some(first)
|
||||
}
|
||||
|
||||
async fn bucket_from_host(state: &AppState, headers: &HeaderMap) -> Option<String> {
|
||||
let host = headers
|
||||
.get("host")
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.split(':').next())?
|
||||
.trim()
|
||||
.to_ascii_lowercase();
|
||||
let (candidate, _) = host.split_once('.')?;
|
||||
if myfsio_storage::validation::validate_bucket_name(candidate).is_some() {
|
||||
return None;
|
||||
}
|
||||
match state.storage.bucket_exists(candidate).await {
|
||||
Ok(true) => Some(candidate.to_string()),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn resolve_bucket(state: &AppState, headers: &HeaderMap, path: &str) -> Option<String> {
|
||||
if let Some(name) = bucket_from_host(state, headers).await {
|
||||
return Some(name);
|
||||
}
|
||||
bucket_from_path(path).map(str::to_string)
|
||||
}
|
||||
|
||||
fn apply_rule_headers(headers: &mut axum::http::HeaderMap, rule: &CorsRule, origin: &str) {
|
||||
headers.remove("access-control-allow-origin");
|
||||
headers.remove("vary");
|
||||
if let Ok(val) = HeaderValue::from_str(origin) {
|
||||
headers.insert("access-control-allow-origin", val);
|
||||
}
|
||||
headers.insert("vary", HeaderValue::from_static("Origin"));
|
||||
if !rule.expose_headers.is_empty() {
|
||||
let value = rule.expose_headers.join(", ");
|
||||
if let Ok(val) = HeaderValue::from_str(&value) {
|
||||
headers.remove("access-control-expose-headers");
|
||||
headers.insert("access-control-expose-headers", val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn strip_cors_response_headers(headers: &mut HeaderMap) {
|
||||
headers.remove("access-control-allow-origin");
|
||||
headers.remove("access-control-allow-credentials");
|
||||
headers.remove("access-control-expose-headers");
|
||||
headers.remove("access-control-allow-methods");
|
||||
headers.remove("access-control-allow-headers");
|
||||
headers.remove("access-control-max-age");
|
||||
}
|
||||
|
||||
pub async fn bucket_cors_layer(
|
||||
State(state): State<AppState>,
|
||||
req: Request,
|
||||
next: Next,
|
||||
) -> Response {
|
||||
let path = req.uri().path().to_string();
|
||||
let bucket = match resolve_bucket(&state, req.headers(), &path).await {
|
||||
Some(name) => name,
|
||||
None => return next.run(req).await,
|
||||
};
|
||||
|
||||
let origin = req
|
||||
.headers()
|
||||
.get("origin")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
let bucket_rules = if origin.is_some() {
|
||||
match state.storage.get_bucket_config(&bucket).await {
|
||||
Ok(cfg) => cfg
|
||||
.cors
|
||||
.as_ref()
|
||||
.map(|v| match v {
|
||||
serde_json::Value::String(s) => s.clone(),
|
||||
other => other.to_string(),
|
||||
})
|
||||
.map(|xml| parse_cors_config(&xml))
|
||||
.filter(|rules| !rules.is_empty()),
|
||||
Err(_) => None,
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let is_preflight = req.method() == Method::OPTIONS
|
||||
&& req.headers().contains_key("access-control-request-method");
|
||||
|
||||
if is_preflight {
|
||||
if let (Some(origin), Some(rules)) = (origin.as_deref(), bucket_rules.as_ref()) {
|
||||
let req_method = req
|
||||
.headers()
|
||||
.get("access-control-request-method")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
let req_headers_raw = req
|
||||
.headers()
|
||||
.get("access-control-request-headers")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
let req_headers: Vec<&str> = req_headers_raw
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect();
|
||||
|
||||
if let Some(rule) = find_matching_rule(rules, origin, req_method, &req_headers) {
|
||||
let mut resp = StatusCode::NO_CONTENT.into_response();
|
||||
apply_rule_headers(resp.headers_mut(), rule, origin);
|
||||
let methods_value = rule.allowed_methods.join(", ");
|
||||
if let Ok(val) = HeaderValue::from_str(&methods_value) {
|
||||
resp.headers_mut()
|
||||
.insert("access-control-allow-methods", val);
|
||||
}
|
||||
let headers_value = if rule.allowed_headers.iter().any(|h| h == "*") {
|
||||
req_headers_raw.to_string()
|
||||
} else {
|
||||
rule.allowed_headers.join(", ")
|
||||
};
|
||||
if !headers_value.is_empty() {
|
||||
if let Ok(val) = HeaderValue::from_str(&headers_value) {
|
||||
resp.headers_mut()
|
||||
.insert("access-control-allow-headers", val);
|
||||
}
|
||||
}
|
||||
if let Some(max_age) = rule.max_age_seconds {
|
||||
if let Ok(val) = HeaderValue::from_str(&max_age.to_string()) {
|
||||
resp.headers_mut().insert("access-control-max-age", val);
|
||||
}
|
||||
}
|
||||
return resp;
|
||||
}
|
||||
return (StatusCode::FORBIDDEN, "CORSResponse: CORS is not enabled").into_response();
|
||||
}
|
||||
}
|
||||
|
||||
let method = req.method().clone();
|
||||
let mut resp = next.run(req).await;
|
||||
|
||||
if let (Some(origin), Some(rules)) = (origin.as_deref(), bucket_rules.as_ref()) {
|
||||
if let Some(rule) = find_matching_rule_for_actual(rules, origin, method.as_str()) {
|
||||
apply_rule_headers(resp.headers_mut(), rule, origin);
|
||||
} else {
|
||||
strip_cors_response_headers(resp.headers_mut());
|
||||
}
|
||||
}
|
||||
|
||||
resp
|
||||
}
|
||||
@@ -1,8 +1,11 @@
|
||||
mod auth;
|
||||
mod bucket_cors;
|
||||
pub mod ratelimit;
|
||||
pub mod session;
|
||||
pub(crate) mod sha_body;
|
||||
|
||||
pub use auth::auth_layer;
|
||||
pub use bucket_cors::bucket_cors_layer;
|
||||
pub use ratelimit::{rate_limit_layer, RateLimitLayerState};
|
||||
pub use session::{csrf_layer, session_layer, SessionHandle, SessionLayerState};
|
||||
|
||||
@@ -20,6 +23,42 @@ pub async fn server_header(req: Request, next: Next) -> Response {
|
||||
resp
|
||||
}
|
||||
|
||||
pub async fn request_log_layer(req: Request, next: Next) -> Response {
|
||||
let start = Instant::now();
|
||||
let method = req.method().clone();
|
||||
let uri = req.uri().clone();
|
||||
let version = req.version();
|
||||
let remote = req
|
||||
.extensions()
|
||||
.get::<axum::extract::ConnectInfo<std::net::SocketAddr>>()
|
||||
.map(|ci| ci.0.ip().to_string())
|
||||
.unwrap_or_else(|| "-".to_string());
|
||||
|
||||
let response = next.run(req).await;
|
||||
|
||||
let status = response.status().as_u16();
|
||||
let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
|
||||
let bytes_out = response
|
||||
.headers()
|
||||
.get(axum::http::header::CONTENT_LENGTH)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|v| v.parse::<u64>().ok());
|
||||
|
||||
tracing::info!(
|
||||
target: "myfsio::access",
|
||||
remote = %remote,
|
||||
method = %method,
|
||||
uri = %uri,
|
||||
version = ?version,
|
||||
status,
|
||||
bytes_out = bytes_out.unwrap_or(0),
|
||||
elapsed_ms = format!("{:.3}", elapsed_ms),
|
||||
"request"
|
||||
);
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
pub async fn ui_metrics_layer(State(state): State<AppState>, req: Request, next: Next) -> Response {
|
||||
let metrics = match state.metrics.clone() {
|
||||
Some(m) => m,
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use axum::extract::{ConnectInfo, Request, State};
|
||||
use axum::http::{header, StatusCode};
|
||||
use axum::http::{header, Method, StatusCode};
|
||||
use axum::middleware::Next;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use parking_lot::Mutex;
|
||||
@@ -13,17 +13,77 @@ use crate::config::RateLimitSetting;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RateLimitLayerState {
|
||||
limiter: Arc<FixedWindowLimiter>,
|
||||
default_limiter: Arc<FixedWindowLimiter>,
|
||||
list_buckets_limiter: Option<Arc<FixedWindowLimiter>>,
|
||||
bucket_ops_limiter: Option<Arc<FixedWindowLimiter>>,
|
||||
object_ops_limiter: Option<Arc<FixedWindowLimiter>>,
|
||||
head_ops_limiter: Option<Arc<FixedWindowLimiter>>,
|
||||
num_trusted_proxies: usize,
|
||||
}
|
||||
|
||||
impl RateLimitLayerState {
|
||||
pub fn new(setting: RateLimitSetting, num_trusted_proxies: usize) -> Self {
|
||||
Self {
|
||||
limiter: Arc::new(FixedWindowLimiter::new(setting)),
|
||||
default_limiter: Arc::new(FixedWindowLimiter::new(setting)),
|
||||
list_buckets_limiter: None,
|
||||
bucket_ops_limiter: None,
|
||||
object_ops_limiter: None,
|
||||
head_ops_limiter: None,
|
||||
num_trusted_proxies,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_per_op(
|
||||
default: RateLimitSetting,
|
||||
list_buckets: RateLimitSetting,
|
||||
bucket_ops: RateLimitSetting,
|
||||
object_ops: RateLimitSetting,
|
||||
head_ops: RateLimitSetting,
|
||||
num_trusted_proxies: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
default_limiter: Arc::new(FixedWindowLimiter::new(default)),
|
||||
list_buckets_limiter: (list_buckets != default)
|
||||
.then(|| Arc::new(FixedWindowLimiter::new(list_buckets))),
|
||||
bucket_ops_limiter: (bucket_ops != default)
|
||||
.then(|| Arc::new(FixedWindowLimiter::new(bucket_ops))),
|
||||
object_ops_limiter: (object_ops != default)
|
||||
.then(|| Arc::new(FixedWindowLimiter::new(object_ops))),
|
||||
head_ops_limiter: (head_ops != default)
|
||||
.then(|| Arc::new(FixedWindowLimiter::new(head_ops))),
|
||||
num_trusted_proxies,
|
||||
}
|
||||
}
|
||||
|
||||
fn select_limiter(&self, req: &Request) -> &Arc<FixedWindowLimiter> {
|
||||
let path = req.uri().path();
|
||||
let method = req.method();
|
||||
if path == "/" && *method == Method::GET {
|
||||
if let Some(ref limiter) = self.list_buckets_limiter {
|
||||
return limiter;
|
||||
}
|
||||
}
|
||||
let segments: Vec<&str> = path
|
||||
.trim_start_matches('/')
|
||||
.split('/')
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect();
|
||||
if *method == Method::HEAD {
|
||||
if let Some(ref limiter) = self.head_ops_limiter {
|
||||
return limiter;
|
||||
}
|
||||
}
|
||||
if segments.len() == 1 {
|
||||
if let Some(ref limiter) = self.bucket_ops_limiter {
|
||||
return limiter;
|
||||
}
|
||||
} else if segments.len() >= 2 {
|
||||
if let Some(ref limiter) = self.object_ops_limiter {
|
||||
return limiter;
|
||||
}
|
||||
}
|
||||
&self.default_limiter
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -99,22 +159,34 @@ pub async fn rate_limit_layer(
|
||||
next: Next,
|
||||
) -> Response {
|
||||
let key = rate_limit_key(&req, state.num_trusted_proxies);
|
||||
match state.limiter.check(&key) {
|
||||
let limiter = state.select_limiter(&req);
|
||||
match limiter.check(&key) {
|
||||
Ok(()) => next.run(req).await,
|
||||
Err(retry_after) => too_many_requests(retry_after),
|
||||
Err(retry_after) => {
|
||||
let resource = req.uri().path().to_string();
|
||||
too_many_requests(retry_after, &resource)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn too_many_requests(retry_after: u64) -> Response {
|
||||
(
|
||||
StatusCode::TOO_MANY_REQUESTS,
|
||||
fn too_many_requests(retry_after: u64, resource: &str) -> Response {
|
||||
let request_id = uuid::Uuid::new_v4().simple().to_string();
|
||||
let body = myfsio_xml::response::rate_limit_exceeded_xml(resource, &request_id);
|
||||
let mut response = (
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
[
|
||||
(header::CONTENT_TYPE, "application/xml".to_string()),
|
||||
(header::RETRY_AFTER, retry_after.to_string()),
|
||||
],
|
||||
myfsio_xml::response::rate_limit_exceeded_xml(),
|
||||
body,
|
||||
)
|
||||
.into_response()
|
||||
.into_response();
|
||||
if let Ok(value) = request_id.parse() {
|
||||
response
|
||||
.headers_mut()
|
||||
.insert("x-amz-request-id", value);
|
||||
}
|
||||
response
|
||||
}
|
||||
|
||||
fn rate_limit_key(req: &Request, num_trusted_proxies: usize) -> String {
|
||||
|
||||
@@ -90,7 +90,11 @@ pub async fn session_layer(
|
||||
resp
|
||||
}
|
||||
|
||||
pub async fn csrf_layer(req: Request, next: Next) -> Response {
|
||||
pub async fn csrf_layer(
|
||||
State(state): State<crate::state::AppState>,
|
||||
req: Request,
|
||||
next: Next,
|
||||
) -> Response {
|
||||
const CSRF_HEADER_ALIAS: &str = "x-csrftoken";
|
||||
|
||||
let method = req.method().clone();
|
||||
@@ -169,7 +173,32 @@ pub async fn csrf_layer(req: Request, next: Next) -> Response {
|
||||
header_present = header_token.is_some(),
|
||||
"CSRF token mismatch"
|
||||
);
|
||||
(StatusCode::FORBIDDEN, "Invalid CSRF token").into_response()
|
||||
|
||||
let accept = parts
|
||||
.headers
|
||||
.get(header::ACCEPT)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
let is_form_submit = content_type.starts_with("application/x-www-form-urlencoded")
|
||||
|| content_type.starts_with("multipart/form-data");
|
||||
let wants_json =
|
||||
accept.contains("application/json") || content_type.starts_with("application/json");
|
||||
|
||||
if is_form_submit && !wants_json {
|
||||
let ctx = crate::handlers::ui::base_context(&handle, None);
|
||||
let mut resp = crate::handlers::ui::render(&state, "csrf_error.html", &ctx);
|
||||
*resp.status_mut() = StatusCode::FORBIDDEN;
|
||||
return resp;
|
||||
}
|
||||
|
||||
let mut resp = (
|
||||
StatusCode::FORBIDDEN,
|
||||
[(header::CONTENT_TYPE, "application/json")],
|
||||
r#"{"error":"Invalid CSRF token"}"#,
|
||||
)
|
||||
.into_response();
|
||||
*resp.status_mut() = StatusCode::FORBIDDEN;
|
||||
resp
|
||||
}
|
||||
|
||||
fn extract_multipart_token(content_type: &str, body: &[u8]) -> Option<String> {
|
||||
|
||||
107
crates/myfsio-server/src/middleware/sha_body.rs
Normal file
107
crates/myfsio-server/src/middleware/sha_body.rs
Normal file
@@ -0,0 +1,107 @@
|
||||
use axum::body::Body;
|
||||
use bytes::Bytes;
|
||||
use http_body::{Body as HttpBody, Frame};
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Sha256MismatchError {
|
||||
expected: String,
|
||||
computed: String,
|
||||
}
|
||||
|
||||
impl Sha256MismatchError {
|
||||
fn message(&self) -> String {
|
||||
format!(
|
||||
"The x-amz-content-sha256 you specified did not match what we received (expected {}, computed {})",
|
||||
self.expected, self.computed
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Sha256MismatchError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"XAmzContentSHA256Mismatch: expected {}, computed {}",
|
||||
self.expected, self.computed
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for Sha256MismatchError {}
|
||||
|
||||
pub struct Sha256VerifyBody {
|
||||
inner: Body,
|
||||
expected: String,
|
||||
hasher: Option<Sha256>,
|
||||
}
|
||||
|
||||
impl Sha256VerifyBody {
|
||||
pub fn new(inner: Body, expected_hex: String) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
expected: expected_hex.to_ascii_lowercase(),
|
||||
hasher: Some(Sha256::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpBody for Sha256VerifyBody {
|
||||
type Data = Bytes;
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
||||
let this = self.as_mut().get_mut();
|
||||
match Pin::new(&mut this.inner).poll_frame(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e)))),
|
||||
Poll::Ready(Some(Ok(frame))) => {
|
||||
if let Some(data) = frame.data_ref() {
|
||||
if let Some(h) = this.hasher.as_mut() {
|
||||
h.update(data);
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(Ok(frame)))
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
if let Some(hasher) = this.hasher.take() {
|
||||
let computed = hex::encode(hasher.finalize());
|
||||
if computed != this.expected {
|
||||
return Poll::Ready(Some(Err(Box::new(Sha256MismatchError {
|
||||
expected: this.expected.clone(),
|
||||
computed,
|
||||
}))));
|
||||
}
|
||||
}
|
||||
Poll::Ready(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_end_stream(&self) -> bool {
|
||||
self.inner.is_end_stream()
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> http_body::SizeHint {
|
||||
self.inner.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_hex_sha256(s: &str) -> bool {
|
||||
s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit())
|
||||
}
|
||||
|
||||
pub fn sha256_mismatch_message(err: &(dyn Error + 'static)) -> Option<String> {
|
||||
if let Some(mismatch) = err.downcast_ref::<Sha256MismatchError>() {
|
||||
return Some(mismatch.message());
|
||||
}
|
||||
|
||||
err.source().and_then(sha256_mismatch_message)
|
||||
}
|
||||
@@ -50,6 +50,7 @@ impl AppState {
|
||||
bucket_config_cache_ttl: Duration::from_secs_f64(
|
||||
config.bucket_config_cache_ttl_seconds,
|
||||
),
|
||||
stream_chunk_size: config.stream_chunk_size,
|
||||
},
|
||||
));
|
||||
let iam = Arc::new(IamService::new_with_secret(
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -9,6 +9,7 @@ myfsio-crypto = { path = "../myfsio-crypto" }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-util = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
|
||||
@@ -17,10 +17,18 @@ pub enum StorageError {
|
||||
key: String,
|
||||
version_id: String,
|
||||
},
|
||||
#[error("Object is a delete marker: {bucket}/{key}")]
|
||||
DeleteMarker {
|
||||
bucket: String,
|
||||
key: String,
|
||||
version_id: String,
|
||||
},
|
||||
#[error("Invalid bucket name: {0}")]
|
||||
InvalidBucketName(String),
|
||||
#[error("Invalid object key: {0}")]
|
||||
InvalidObjectKey(String),
|
||||
#[error("Method not allowed: {0}")]
|
||||
MethodNotAllowed(String),
|
||||
#[error("Upload not found: {0}")]
|
||||
UploadNotFound(String),
|
||||
#[error("Quota exceeded: {0}")]
|
||||
@@ -42,7 +50,7 @@ impl From<StorageError> for S3Error {
|
||||
S3Error::from_code(S3ErrorCode::NoSuchBucket).with_resource(format!("/{}", name))
|
||||
}
|
||||
StorageError::BucketAlreadyExists(name) => {
|
||||
S3Error::from_code(S3ErrorCode::BucketAlreadyExists)
|
||||
S3Error::from_code(S3ErrorCode::BucketAlreadyOwnedByYou)
|
||||
.with_resource(format!("/{}", name))
|
||||
}
|
||||
StorageError::BucketNotEmpty(name) => {
|
||||
@@ -58,10 +66,17 @@ impl From<StorageError> for S3Error {
|
||||
version_id,
|
||||
} => S3Error::from_code(S3ErrorCode::NoSuchVersion)
|
||||
.with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)),
|
||||
StorageError::DeleteMarker {
|
||||
bucket,
|
||||
key,
|
||||
version_id,
|
||||
} => S3Error::from_code(S3ErrorCode::MethodNotAllowed)
|
||||
.with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)),
|
||||
StorageError::InvalidBucketName(msg) => {
|
||||
S3Error::new(S3ErrorCode::InvalidBucketName, msg)
|
||||
}
|
||||
StorageError::InvalidObjectKey(msg) => S3Error::new(S3ErrorCode::InvalidKey, msg),
|
||||
StorageError::MethodNotAllowed(msg) => S3Error::new(S3ErrorCode::MethodNotAllowed, msg),
|
||||
StorageError::UploadNotFound(id) => S3Error::new(
|
||||
S3ErrorCode::NoSuchUpload,
|
||||
format!("Upload {} not found", id),
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -30,8 +30,44 @@ pub trait StorageEngine: Send + Sync {
|
||||
key: &str,
|
||||
) -> StorageResult<(ObjectMeta, AsyncReadStream)>;
|
||||
|
||||
async fn get_object_range(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
start: u64,
|
||||
len: Option<u64>,
|
||||
) -> StorageResult<(ObjectMeta, AsyncReadStream)>;
|
||||
|
||||
async fn get_object_snapshot(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
) -> StorageResult<(ObjectMeta, tokio::fs::File)>;
|
||||
|
||||
async fn get_object_version_snapshot(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
version_id: &str,
|
||||
) -> StorageResult<(ObjectMeta, tokio::fs::File)>;
|
||||
|
||||
async fn get_object_path(&self, bucket: &str, key: &str) -> StorageResult<PathBuf>;
|
||||
|
||||
async fn snapshot_object_to_link(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
link_path: &std::path::Path,
|
||||
) -> StorageResult<ObjectMeta>;
|
||||
|
||||
async fn snapshot_object_version_to_link(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
version_id: &str,
|
||||
link_path: &std::path::Path,
|
||||
) -> StorageResult<ObjectMeta>;
|
||||
|
||||
async fn head_object(&self, bucket: &str, key: &str) -> StorageResult<ObjectMeta>;
|
||||
|
||||
async fn get_object_version(
|
||||
@@ -41,6 +77,15 @@ pub trait StorageEngine: Send + Sync {
|
||||
version_id: &str,
|
||||
) -> StorageResult<(ObjectMeta, AsyncReadStream)>;
|
||||
|
||||
async fn get_object_version_range(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
version_id: &str,
|
||||
start: u64,
|
||||
len: Option<u64>,
|
||||
) -> StorageResult<(ObjectMeta, AsyncReadStream)>;
|
||||
|
||||
async fn get_object_version_path(
|
||||
&self,
|
||||
bucket: &str,
|
||||
@@ -62,14 +107,14 @@ pub trait StorageEngine: Send + Sync {
|
||||
version_id: &str,
|
||||
) -> StorageResult<HashMap<String, String>>;
|
||||
|
||||
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()>;
|
||||
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<DeleteOutcome>;
|
||||
|
||||
async fn delete_object_version(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
version_id: &str,
|
||||
) -> StorageResult<()>;
|
||||
) -> StorageResult<DeleteOutcome>;
|
||||
|
||||
async fn copy_object(
|
||||
&self,
|
||||
@@ -148,6 +193,12 @@ pub trait StorageEngine: Send + Sync {
|
||||
|
||||
async fn is_versioning_enabled(&self, bucket: &str) -> StorageResult<bool>;
|
||||
async fn set_versioning(&self, bucket: &str, enabled: bool) -> StorageResult<()>;
|
||||
async fn get_versioning_status(&self, bucket: &str) -> StorageResult<VersioningStatus>;
|
||||
async fn set_versioning_status(
|
||||
&self,
|
||||
bucket: &str,
|
||||
status: VersioningStatus,
|
||||
) -> StorageResult<()>;
|
||||
|
||||
async fn list_object_versions(
|
||||
&self,
|
||||
|
||||
@@ -47,6 +47,7 @@ pub fn validate_object_key(
|
||||
normalized.split('/').collect()
|
||||
};
|
||||
|
||||
|
||||
for part in &parts {
|
||||
if part.is_empty() {
|
||||
continue;
|
||||
@@ -60,6 +61,13 @@ pub fn validate_object_key(
|
||||
return Some("Object key contains invalid segments".to_string());
|
||||
}
|
||||
|
||||
if part.len() > 255 {
|
||||
return Some(
|
||||
"Object key contains a path segment longer than 255 bytes (filesystem backend limit)"
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
if part.chars().any(|c| (c as u32) < 32) {
|
||||
return Some("Object key contains control characters".to_string());
|
||||
}
|
||||
@@ -98,6 +106,15 @@ pub fn validate_object_key(
|
||||
}
|
||||
}
|
||||
|
||||
for part in &non_empty_parts {
|
||||
if *part == ".__myfsio_dirobj__"
|
||||
|| *part == ".__myfsio_empty__"
|
||||
|| part.starts_with("_index.json")
|
||||
{
|
||||
return Some("Object key segment uses a reserved internal name".to_string());
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
@@ -132,6 +149,13 @@ pub fn validate_bucket_name(bucket_name: &str) -> Option<String> {
|
||||
return Some("Bucket name must not be formatted as an IP address".to_string());
|
||||
}
|
||||
|
||||
if bucket_name.starts_with("xn--") {
|
||||
return Some("Bucket name must not start with the reserved prefix 'xn--'".to_string());
|
||||
}
|
||||
if bucket_name.ends_with("-s3alias") || bucket_name.ends_with("--ol-s3") {
|
||||
return Some("Bucket name must not end with a reserved suffix".to_string());
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
@@ -174,10 +198,18 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_object_key_max_length() {
|
||||
let long_key = "a".repeat(1025);
|
||||
assert!(validate_object_key(&long_key, 1024, false, None).is_some());
|
||||
let ok_key = "a".repeat(1024);
|
||||
let too_long_total = "a/".repeat(513) + "a";
|
||||
assert!(validate_object_key(&too_long_total, 1024, false, None).is_some());
|
||||
|
||||
let too_long_segment = "a".repeat(256);
|
||||
assert!(validate_object_key(&too_long_segment, 1024, false, None).is_some());
|
||||
|
||||
let ok_key = vec!["a".repeat(255); 4].join("/");
|
||||
assert_eq!(ok_key.len(), 255 * 4 + 3);
|
||||
assert!(validate_object_key(&ok_key, 1024, false, None).is_none());
|
||||
|
||||
let ok_max_segment = "a".repeat(255);
|
||||
assert!(validate_object_key(&ok_max_segment, 1024, false, None).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -8,3 +8,4 @@ myfsio-common = { path = "../myfsio-common" }
|
||||
quick-xml = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
percent-encoding = { workspace = true }
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use quick_xml::events::Event;
|
||||
use quick_xml::Reader;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct DeleteObjectsRequest {
|
||||
pub objects: Vec<ObjectIdentifier>,
|
||||
pub quiet: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ObjectIdentifier {
|
||||
pub key: String,
|
||||
pub version_id: Option<String>,
|
||||
@@ -86,6 +86,11 @@ pub fn parse_complete_multipart_upload(xml: &str) -> Result<CompleteMultipartUpl
|
||||
}
|
||||
|
||||
pub fn parse_delete_objects(xml: &str) -> Result<DeleteObjectsRequest, String> {
|
||||
let trimmed = xml.trim();
|
||||
if trimmed.is_empty() {
|
||||
return Err("Request body is empty".to_string());
|
||||
}
|
||||
|
||||
let mut reader = Reader::from_str(xml);
|
||||
let mut result = DeleteObjectsRequest::default();
|
||||
let mut buf = Vec::new();
|
||||
@@ -93,18 +98,43 @@ pub fn parse_delete_objects(xml: &str) -> Result<DeleteObjectsRequest, String> {
|
||||
let mut current_key: Option<String> = None;
|
||||
let mut current_version_id: Option<String> = None;
|
||||
let mut in_object = false;
|
||||
let mut saw_delete_root = false;
|
||||
let mut first_element_seen = false;
|
||||
|
||||
loop {
|
||||
match reader.read_event_into(&mut buf) {
|
||||
let event = reader.read_event_into(&mut buf);
|
||||
match event {
|
||||
Ok(Event::Start(ref e)) => {
|
||||
let name = String::from_utf8_lossy(e.name().as_ref()).to_string();
|
||||
current_tag = name.clone();
|
||||
if name == "Object" {
|
||||
if !first_element_seen {
|
||||
first_element_seen = true;
|
||||
if name != "Delete" {
|
||||
return Err(format!(
|
||||
"Expected <Delete> root element, found <{}>",
|
||||
name
|
||||
));
|
||||
}
|
||||
saw_delete_root = true;
|
||||
} else if name == "Object" {
|
||||
in_object = true;
|
||||
current_key = None;
|
||||
current_version_id = None;
|
||||
}
|
||||
}
|
||||
Ok(Event::Empty(ref e)) => {
|
||||
let name = String::from_utf8_lossy(e.name().as_ref()).to_string();
|
||||
if !first_element_seen {
|
||||
first_element_seen = true;
|
||||
if name != "Delete" {
|
||||
return Err(format!(
|
||||
"Expected <Delete> root element, found <{}>",
|
||||
name
|
||||
));
|
||||
}
|
||||
saw_delete_root = true;
|
||||
}
|
||||
}
|
||||
Ok(Event::Text(ref e)) => {
|
||||
let text = e.unescape().map_err(|e| e.to_string())?.to_string();
|
||||
match current_tag.as_str() {
|
||||
@@ -139,6 +169,13 @@ pub fn parse_delete_objects(xml: &str) -> Result<DeleteObjectsRequest, String> {
|
||||
buf.clear();
|
||||
}
|
||||
|
||||
if !saw_delete_root {
|
||||
return Err("Expected <Delete> root element".to_string());
|
||||
}
|
||||
if result.objects.is_empty() {
|
||||
return Err("Delete request must contain at least one <Object>".to_string());
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,10 +8,21 @@ pub fn format_s3_datetime(dt: &DateTime<Utc>) -> String {
|
||||
dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
|
||||
}
|
||||
|
||||
pub fn rate_limit_exceeded_xml() -> String {
|
||||
pub fn rate_limit_exceeded_xml(resource: &str, request_id: &str) -> String {
|
||||
format!(
|
||||
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
|
||||
<Error><Code>SlowDown</Code><Message>Rate limit exceeded</Message></Error>"
|
||||
.to_string()
|
||||
<Error><Code>SlowDown</Code><Message>Please reduce your request rate</Message><Resource>{}</Resource><RequestId>{}</RequestId></Error>",
|
||||
xml_escape(resource),
|
||||
xml_escape(request_id),
|
||||
)
|
||||
}
|
||||
|
||||
fn xml_escape(s: &str) -> String {
|
||||
s.replace('&', "&")
|
||||
.replace('<', "<")
|
||||
.replace('>', ">")
|
||||
.replace('"', """)
|
||||
.replace('\'', "'")
|
||||
}
|
||||
|
||||
pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta]) -> String {
|
||||
@@ -62,6 +73,21 @@ pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta]
|
||||
String::from_utf8(writer.into_inner().into_inner()).unwrap()
|
||||
}
|
||||
|
||||
fn maybe_url_encode(value: &str, encoding_type: Option<&str>) -> String {
|
||||
if matches!(encoding_type, Some(v) if v.eq_ignore_ascii_case("url")) {
|
||||
percent_encoding::utf8_percent_encode(value, KEY_ENCODE_SET).to_string()
|
||||
} else {
|
||||
value.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
const KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::NON_ALPHANUMERIC
|
||||
.remove(b'-')
|
||||
.remove(b'_')
|
||||
.remove(b'.')
|
||||
.remove(b'~')
|
||||
.remove(b'/');
|
||||
|
||||
pub fn list_objects_v2_xml(
|
||||
bucket_name: &str,
|
||||
prefix: &str,
|
||||
@@ -73,6 +99,34 @@ pub fn list_objects_v2_xml(
|
||||
continuation_token: Option<&str>,
|
||||
next_continuation_token: Option<&str>,
|
||||
key_count: usize,
|
||||
) -> String {
|
||||
list_objects_v2_xml_with_encoding(
|
||||
bucket_name,
|
||||
prefix,
|
||||
delimiter,
|
||||
max_keys,
|
||||
objects,
|
||||
common_prefixes,
|
||||
is_truncated,
|
||||
continuation_token,
|
||||
next_continuation_token,
|
||||
key_count,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn list_objects_v2_xml_with_encoding(
|
||||
bucket_name: &str,
|
||||
prefix: &str,
|
||||
delimiter: &str,
|
||||
max_keys: usize,
|
||||
objects: &[ObjectMeta],
|
||||
common_prefixes: &[String],
|
||||
is_truncated: bool,
|
||||
continuation_token: Option<&str>,
|
||||
next_continuation_token: Option<&str>,
|
||||
key_count: usize,
|
||||
encoding_type: Option<&str>,
|
||||
) -> String {
|
||||
let mut writer = Writer::new(Cursor::new(Vec::new()));
|
||||
|
||||
@@ -85,13 +139,22 @@ pub fn list_objects_v2_xml(
|
||||
writer.write_event(Event::Start(start)).unwrap();
|
||||
|
||||
write_text_element(&mut writer, "Name", bucket_name);
|
||||
write_text_element(&mut writer, "Prefix", prefix);
|
||||
write_text_element(&mut writer, "Prefix", &maybe_url_encode(prefix, encoding_type));
|
||||
if !delimiter.is_empty() {
|
||||
write_text_element(&mut writer, "Delimiter", delimiter);
|
||||
write_text_element(
|
||||
&mut writer,
|
||||
"Delimiter",
|
||||
&maybe_url_encode(delimiter, encoding_type),
|
||||
);
|
||||
}
|
||||
write_text_element(&mut writer, "MaxKeys", &max_keys.to_string());
|
||||
write_text_element(&mut writer, "KeyCount", &key_count.to_string());
|
||||
write_text_element(&mut writer, "IsTruncated", &is_truncated.to_string());
|
||||
if let Some(encoding) = encoding_type {
|
||||
if !encoding.is_empty() {
|
||||
write_text_element(&mut writer, "EncodingType", encoding);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(token) = continuation_token {
|
||||
write_text_element(&mut writer, "ContinuationToken", token);
|
||||
@@ -104,7 +167,7 @@ pub fn list_objects_v2_xml(
|
||||
writer
|
||||
.write_event(Event::Start(BytesStart::new("Contents")))
|
||||
.unwrap();
|
||||
write_text_element(&mut writer, "Key", &obj.key);
|
||||
write_text_element(&mut writer, "Key", &maybe_url_encode(&obj.key, encoding_type));
|
||||
write_text_element(
|
||||
&mut writer,
|
||||
"LastModified",
|
||||
@@ -128,7 +191,7 @@ pub fn list_objects_v2_xml(
|
||||
writer
|
||||
.write_event(Event::Start(BytesStart::new("CommonPrefixes")))
|
||||
.unwrap();
|
||||
write_text_element(&mut writer, "Prefix", prefix);
|
||||
write_text_element(&mut writer, "Prefix", &maybe_url_encode(prefix, encoding_type));
|
||||
writer
|
||||
.write_event(Event::End(BytesEnd::new("CommonPrefixes")))
|
||||
.unwrap();
|
||||
@@ -151,6 +214,32 @@ pub fn list_objects_v1_xml(
|
||||
common_prefixes: &[String],
|
||||
is_truncated: bool,
|
||||
next_marker: Option<&str>,
|
||||
) -> String {
|
||||
list_objects_v1_xml_with_encoding(
|
||||
bucket_name,
|
||||
prefix,
|
||||
marker,
|
||||
delimiter,
|
||||
max_keys,
|
||||
objects,
|
||||
common_prefixes,
|
||||
is_truncated,
|
||||
next_marker,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn list_objects_v1_xml_with_encoding(
|
||||
bucket_name: &str,
|
||||
prefix: &str,
|
||||
marker: &str,
|
||||
delimiter: &str,
|
||||
max_keys: usize,
|
||||
objects: &[ObjectMeta],
|
||||
common_prefixes: &[String],
|
||||
is_truncated: bool,
|
||||
next_marker: Option<&str>,
|
||||
encoding_type: Option<&str>,
|
||||
) -> String {
|
||||
let mut writer = Writer::new(Cursor::new(Vec::new()));
|
||||
|
||||
@@ -163,27 +252,36 @@ pub fn list_objects_v1_xml(
|
||||
writer.write_event(Event::Start(start)).unwrap();
|
||||
|
||||
write_text_element(&mut writer, "Name", bucket_name);
|
||||
write_text_element(&mut writer, "Prefix", prefix);
|
||||
write_text_element(&mut writer, "Marker", marker);
|
||||
write_text_element(&mut writer, "Prefix", &maybe_url_encode(prefix, encoding_type));
|
||||
write_text_element(&mut writer, "Marker", &maybe_url_encode(marker, encoding_type));
|
||||
write_text_element(&mut writer, "MaxKeys", &max_keys.to_string());
|
||||
write_text_element(&mut writer, "IsTruncated", &is_truncated.to_string());
|
||||
|
||||
if !delimiter.is_empty() {
|
||||
write_text_element(&mut writer, "Delimiter", delimiter);
|
||||
write_text_element(
|
||||
&mut writer,
|
||||
"Delimiter",
|
||||
&maybe_url_encode(delimiter, encoding_type),
|
||||
);
|
||||
}
|
||||
if !delimiter.is_empty() && is_truncated {
|
||||
if let Some(nm) = next_marker {
|
||||
if !nm.is_empty() {
|
||||
write_text_element(&mut writer, "NextMarker", nm);
|
||||
write_text_element(&mut writer, "NextMarker", &maybe_url_encode(nm, encoding_type));
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(encoding) = encoding_type {
|
||||
if !encoding.is_empty() {
|
||||
write_text_element(&mut writer, "EncodingType", encoding);
|
||||
}
|
||||
}
|
||||
|
||||
for obj in objects {
|
||||
writer
|
||||
.write_event(Event::Start(BytesStart::new("Contents")))
|
||||
.unwrap();
|
||||
write_text_element(&mut writer, "Key", &obj.key);
|
||||
write_text_element(&mut writer, "Key", &maybe_url_encode(&obj.key, encoding_type));
|
||||
write_text_element(
|
||||
&mut writer,
|
||||
"LastModified",
|
||||
@@ -202,7 +300,7 @@ pub fn list_objects_v1_xml(
|
||||
writer
|
||||
.write_event(Event::Start(BytesStart::new("CommonPrefixes")))
|
||||
.unwrap();
|
||||
write_text_element(&mut writer, "Prefix", cp);
|
||||
write_text_element(&mut writer, "Prefix", &maybe_url_encode(cp, encoding_type));
|
||||
writer
|
||||
.write_event(Event::End(BytesEnd::new("CommonPrefixes")))
|
||||
.unwrap();
|
||||
@@ -325,8 +423,15 @@ pub fn copy_object_result_xml(etag: &str, last_modified: &str) -> String {
|
||||
String::from_utf8(writer.into_inner().into_inner()).unwrap()
|
||||
}
|
||||
|
||||
pub struct DeletedEntry {
|
||||
pub key: String,
|
||||
pub version_id: Option<String>,
|
||||
pub delete_marker: bool,
|
||||
pub delete_marker_version_id: Option<String>,
|
||||
}
|
||||
|
||||
pub fn delete_result_xml(
|
||||
deleted: &[(String, Option<String>)],
|
||||
deleted: &[DeletedEntry],
|
||||
errors: &[(String, String, String)],
|
||||
quiet: bool,
|
||||
) -> String {
|
||||
@@ -340,14 +445,20 @@ pub fn delete_result_xml(
|
||||
writer.write_event(Event::Start(start)).unwrap();
|
||||
|
||||
if !quiet {
|
||||
for (key, version_id) in deleted {
|
||||
for entry in deleted {
|
||||
writer
|
||||
.write_event(Event::Start(BytesStart::new("Deleted")))
|
||||
.unwrap();
|
||||
write_text_element(&mut writer, "Key", key);
|
||||
if let Some(vid) = version_id {
|
||||
write_text_element(&mut writer, "Key", &entry.key);
|
||||
if let Some(ref vid) = entry.version_id {
|
||||
write_text_element(&mut writer, "VersionId", vid);
|
||||
}
|
||||
if entry.delete_marker {
|
||||
write_text_element(&mut writer, "DeleteMarker", "true");
|
||||
if let Some(ref dm_vid) = entry.delete_marker_version_id {
|
||||
write_text_element(&mut writer, "DeleteMarkerVersionId", dm_vid);
|
||||
}
|
||||
}
|
||||
writer
|
||||
.write_event(Event::End(BytesEnd::new("Deleted")))
|
||||
.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user