MyFSIO v0.5.1 Release

Reviewed-on: #38
This commit was merged in pull request #38.
2026-04-28 07:28:59 +00:00
34 changed files with 1916 additions and 261 deletions

Cargo.lock (generated)

@@ -2660,7 +2660,7 @@ dependencies = [
[[package]]
name = "myfsio-auth"
-version = "0.5.0"
+version = "0.5.1"
dependencies = [
"aes",
"base64",
@@ -2685,18 +2685,20 @@ dependencies = [
[[package]]
name = "myfsio-common"
-version = "0.5.0"
+version = "0.5.1"
dependencies = [
"base64",
"chrono",
"serde",
"serde_json",
"sha2 0.10.9",
"thiserror",
"uuid",
]
[[package]]
name = "myfsio-crypto"
-version = "0.5.0"
+version = "0.5.1"
dependencies = [
"aes-gcm",
"base64",
@@ -2717,7 +2719,7 @@ dependencies = [
[[package]]
name = "myfsio-server"
-version = "0.5.0"
+version = "0.5.1"
dependencies = [
"aes-gcm",
"async-trait",
@@ -2729,6 +2731,7 @@ dependencies = [
"base64",
"bytes",
"chrono",
"chrono-tz",
"clap",
"cookie",
"crc32fast",
@@ -2775,7 +2778,7 @@ dependencies = [
[[package]]
name = "myfsio-storage"
-version = "0.5.0"
+version = "0.5.1"
dependencies = [
"chrono",
"dashmap",
@@ -2799,13 +2802,15 @@ dependencies = [
[[package]]
name = "myfsio-xml"
-version = "0.5.0"
+version = "0.5.1"
dependencies = [
"base64",
"chrono",
"myfsio-common",
"percent-encoding",
"quick-xml",
"serde",
"sha2 0.10.9",
]
[[package]]


@@ -10,7 +10,7 @@ members = [
]
[workspace.package]
-version = "0.5.0"
+version = "0.5.1"
edition = "2021"
[workspace.dependencies]
@@ -41,6 +41,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
thiserror = "2"
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = "0.9"
base64 = "0.22"
tokio-util = { version = "0.7", features = ["io", "io-util"] }
tokio-stream = "0.1"


@@ -1,4 +1,4 @@
-mod fernet;
+pub mod fernet;
pub mod iam;
pub mod principal;
pub mod sigv4;


@@ -9,3 +9,5 @@ serde = { workspace = true }
serde_json = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
sha2 = { workspace = true }
base64 = { workspace = true }


@@ -206,6 +206,7 @@ impl S3Error {
}
pub fn to_xml(&self) -> String {
let host_id = derive_host_id(&self.request_id);
format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<Error>\
@@ -213,15 +214,30 @@ impl S3Error {
<Message>{}</Message>\
<Resource>{}</Resource>\
<RequestId>{}</RequestId>\
<HostId>{}</HostId>\
</Error>",
self.code.as_str(),
xml_escape(&self.message),
xml_escape(&self.resource),
xml_escape(&self.request_id),
xml_escape(&host_id),
)
}
}
fn derive_host_id(request_id: &str) -> String {
use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine;
use sha2::{Digest, Sha256};
if request_id.is_empty() {
return String::new();
}
let mut hasher = Sha256::new();
hasher.update(b"myfsio-host-id\0");
hasher.update(request_id.as_bytes());
B64.encode(hasher.finalize())
}
impl fmt::Display for S3Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}: {}", self.code, self.message)
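A quick check of the new HostId derivation, as a standalone sketch (not part of the diff): the helper mirrors derive_host_id above and uses the same sha2/base64 APIs the crate already depends on.

use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine;
use sha2::{Digest, Sha256};

// Same construction as derive_host_id above: a domain-separated
// SHA-256 of the request id, base64-encoded.
fn host_id_for(request_id: &str) -> String {
    if request_id.is_empty() {
        return String::new();
    }
    let mut hasher = Sha256::new();
    hasher.update(b"myfsio-host-id\0");
    hasher.update(request_id.as_bytes());
    B64.encode(hasher.finalize())
}

fn main() {
    // Deterministic: the same request id always yields the same HostId,
    // so retried requests correlate in logs without storing extra state.
    assert_eq!(host_id_for("req-123"), host_id_for("req-123"));
    assert_ne!(host_id_for("req-123"), host_id_for("req-124"));
    assert!(host_id_for("").is_empty());
}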


@@ -25,6 +25,7 @@ tracing-subscriber = { workspace = true }
tokio-util = { workspace = true }
tokio-stream = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
uuid = { workspace = true }
futures = { workspace = true }
http-body = "1"


@@ -84,6 +84,7 @@ pub struct ServerConfig {
pub cors_expose_headers: Vec<String>,
pub session_lifetime_days: u64,
pub log_level: String,
pub display_timezone: String,
pub multipart_min_part_size: u64,
pub bulk_delete_max_keys: usize,
pub stream_chunk_size: usize,
@@ -240,6 +241,19 @@ impl ServerConfig {
let cors_expose_headers = parse_list_env("CORS_EXPOSE_HEADERS", "*");
let session_lifetime_days = parse_u64_env("SESSION_LIFETIME_DAYS", 1);
let log_level = std::env::var("LOG_LEVEL").unwrap_or_else(|_| "INFO".to_string());
let display_timezone = {
let raw = std::env::var("DISPLAY_TIMEZONE").unwrap_or_else(|_| "UTC".to_string());
match raw.parse::<chrono_tz::Tz>() {
Ok(_) => raw,
Err(_) => {
tracing::warn!(
"Invalid DISPLAY_TIMEZONE '{}', falling back to UTC",
raw
);
"UTC".to_string()
}
}
};
let multipart_min_part_size = parse_u64_env("MULTIPART_MIN_PART_SIZE", 5_242_880);
let bulk_delete_max_keys = parse_usize_env("BULK_DELETE_MAX_KEYS", 1000);
let stream_chunk_size = parse_usize_env("STREAM_CHUNK_SIZE", 1_048_576);
@@ -331,6 +345,7 @@ impl ServerConfig {
cors_expose_headers,
session_lifetime_days,
log_level,
display_timezone,
multipart_min_part_size,
bulk_delete_max_keys,
stream_chunk_size,
@@ -425,6 +440,7 @@ impl Default for ServerConfig {
cors_expose_headers: vec!["*".to_string()],
session_lifetime_days: 1,
log_level: "INFO".to_string(),
display_timezone: "UTC".to_string(),
multipart_min_part_size: 5_242_880,
bulk_delete_max_keys: 1000,
stream_chunk_size: 1_048_576,
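The DISPLAY_TIMEZONE handling above can be exercised on its own; a minimal sketch (the env var name and UTC fallback are taken from the diff, the rest is illustrative):

// Parse DISPLAY_TIMEZONE the way the config loader above does: keep the
// value only if chrono-tz recognizes it, otherwise fall back to UTC.
fn resolve_display_timezone() -> chrono_tz::Tz {
    let raw = std::env::var("DISPLAY_TIMEZONE").unwrap_or_else(|_| "UTC".to_string());
    raw.parse::<chrono_tz::Tz>().unwrap_or(chrono_tz::UTC)
}

fn main() {
    std::env::set_var("DISPLAY_TIMEZONE", "Asia/Tokyo");
    assert_eq!(resolve_display_timezone(), chrono_tz::Asia::Tokyo);
    std::env::set_var("DISPLAY_TIMEZONE", "Not/AZone");
    assert_eq!(resolve_display_timezone(), chrono_tz::UTC); // invalid input falls back
}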


@@ -1326,8 +1326,17 @@ pub async fn list_object_versions(
xml_response(StatusCode::OK, xml)
}
-pub async fn get_object_tagging(state: &AppState, bucket: &str, key: &str) -> Response {
-match state.storage.get_object_tags(bucket, key).await {
+pub async fn get_object_tagging(
+state: &AppState,
+bucket: &str,
+key: &str,
+version_id: Option<&str>,
+) -> Response {
+let lookup = match version_id {
+Some(v) => state.storage.get_object_version_tags(bucket, key, v).await,
+None => state.storage.get_object_tags(bucket, key).await,
+};
+match lookup {
Ok(tags) => {
let mut xml = String::from(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\


@@ -14,17 +14,32 @@ use crate::state::AppState;
fn json_ok(value: Value) -> Response {
(
StatusCode::OK,
-[("content-type", "application/json")],
+[("content-type", "application/x-amz-json-1.1")],
value.to_string(),
)
.into_response()
}
fn json_err(status: StatusCode, msg: &str) -> Response {
let type_name = match status {
StatusCode::BAD_REQUEST => "ValidationException",
StatusCode::NOT_FOUND => "NotFoundException",
StatusCode::SERVICE_UNAVAILABLE => "KMSInternalException",
StatusCode::FORBIDDEN => "AccessDeniedException",
_ => "KMSInternalException",
};
json_err_typed(status, type_name, msg)
}
fn json_err_typed(status: StatusCode, type_name: &str, msg: &str) -> Response {
(
status,
-[("content-type", "application/json")],
-json!({"error": msg}).to_string(),
+[("content-type", "application/x-amz-json-1.1")],
+json!({
+"__type": format!("com.amazonaws.kms#{}", type_name),
+"message": msg,
+})
+.to_string(),
)
.into_response()
}
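The new error body follows the AWS JSON 1.1 convention: a namespaced "__type" plus a human-readable "message", which SDK clients map back to an exception class. A small sketch of producing and consuming that shape with serde_json (values illustrative):

use serde_json::json;

fn main() {
    // The shape json_err_typed above emits.
    let body = json!({
        "__type": "com.amazonaws.kms#NotFoundException",
        "message": "Key 'alias/missing' does not exist",
    });
    // Clients typically match on the fragment after '#'.
    let type_field = body["__type"].as_str().unwrap();
    let code = type_field.rsplit('#').next().unwrap();
    assert_eq!(code, "NotFoundException");
}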

File diff suppressed because it is too large.


@@ -1,14 +1,18 @@
use axum::{
body::Body,
extract::{Path, State},
http::{header, HeaderValue, StatusCode},
http::{header, HeaderMap, HeaderValue, StatusCode},
response::{IntoResponse, Response},
};
use crate::embedded;
use crate::state::AppState;
-pub async fn serve(State(state): State<AppState>, Path(path): Path<String>) -> Response {
+pub async fn serve(
+State(state): State<AppState>,
+Path(path): Path<String>,
+headers: HeaderMap,
+) -> Response {
let normalized = path.trim_start_matches('/').to_string();
if normalized.is_empty() || normalized.contains("..") {
return StatusCode::NOT_FOUND.into_response();
@@ -27,7 +31,7 @@ pub async fn serve(State(state): State<AppState>, Path(path): Path<String>) -> R
) {
if let Ok(bytes) = tokio::fs::read(&canonical).await {
let mime = mime_guess::from_path(&canonical).first_or_octet_stream();
-return build_response(&normalized, bytes, mime.as_ref());
+return build_response(&normalized, bytes, mime.as_ref(), &headers);
}
}
}
@@ -37,20 +41,46 @@ pub async fn serve(State(state): State<AppState>, Path(path): Path<String>) -> R
match embedded::static_file(&normalized) {
Some(file) => {
let mime = mime_guess::from_path(&normalized).first_or_octet_stream();
-build_response(&normalized, file.data.into_owned(), mime.as_ref())
+build_response(&normalized, file.data.into_owned(), mime.as_ref(), &headers)
}
None => StatusCode::NOT_FOUND.into_response(),
}
}
-fn build_response(_path: &str, bytes: Vec<u8>, mime: &str) -> Response {
+fn build_response(_path: &str, bytes: Vec<u8>, mime: &str, request_headers: &HeaderMap) -> Response {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(&bytes);
let etag = format!("\"{:.16x}\"", hasher.finalize());
if let Some(if_none_match) = request_headers
.get(header::IF_NONE_MATCH)
.and_then(|v| v.to_str().ok())
{
if if_none_match.split(',').any(|tag| tag.trim() == etag) {
let mut resp = Response::new(Body::empty());
*resp.status_mut() = StatusCode::NOT_MODIFIED;
if let Ok(v) = HeaderValue::from_str(&etag) {
resp.headers_mut().insert(header::ETAG, v);
}
return resp;
}
}
let len = bytes.len();
let mut response = Response::new(Body::from(bytes));
if let Ok(value) = HeaderValue::from_str(mime) {
response.headers_mut().insert(header::CONTENT_TYPE, value);
}
response
.headers_mut()
.insert(header::CONTENT_LENGTH, HeaderValue::from(len));
if let Ok(v) = HeaderValue::from_str(&etag) {
response.headers_mut().insert(header::ETAG, v);
}
response.headers_mut().insert(
header::CACHE_CONTROL,
-HeaderValue::from_static("no-cache"),
+HeaderValue::from_static("public, max-age=300, must-revalidate"),
);
response
}
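The handler now honors conditional GETs: it hashes the bytes, emits a quoted ETag, and short-circuits to 304 Not Modified when any candidate in If-None-Match matches. A distilled sketch of the matching rule (note the handler above does not special-case "*"):

// Distilled from build_response above: a 304 is warranted when any
// comma-separated candidate in If-None-Match equals the computed ETag.
fn if_none_match_hits(header_value: &str, etag: &str) -> bool {
    header_value.split(',').any(|tag| tag.trim() == etag)
}

fn main() {
    let etag = "\"89abcdef01234567\""; // quoted, as the handler formats it
    assert!(if_none_match_hits("\"89abcdef01234567\"", etag));
    assert!(if_none_match_hits("\"other\", \"89abcdef01234567\"", etag));
    assert!(!if_none_match_hits("\"stale\"", etag));
}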


@@ -56,6 +56,7 @@ pub async fn login_submit(
})
.unwrap_or_else(|| access_key.to_string());
session.rotate_id();
session.write(|s| {
s.user_id = Some(access_key.to_string());
s.display_name = Some(display);


@@ -2067,6 +2067,7 @@ pub async fn upload_object(
State(state),
Path((bucket_name.clone(), key.clone())),
Query(ObjectQuery::default()),
None,
upload_headers,
Body::from(bytes),
)


@@ -1951,13 +1951,17 @@ pub async fn metrics_dashboard(
render(&state, "metrics.html", &ctx)
}
-fn format_history_timestamp(timestamp: Option<f64>) -> String {
+fn format_history_timestamp(timestamp: Option<f64>, tz: chrono_tz::Tz) -> String {
let Some(timestamp) = timestamp else {
return "-".to_string();
};
let millis = (timestamp * 1000.0).round() as i64;
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(millis)
-.map(|dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string())
+.map(|dt| {
+dt.with_timezone(&tz)
+.format("%Y-%m-%d %H:%M:%S %Z")
+.to_string()
+})
.unwrap_or_else(|| "-".to_string())
}
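What the timezone-aware formatting produces for a non-UTC zone, as a standalone sketch (the instant is fixed for illustration; only chrono/chrono-tz APIs are used):

use chrono::{DateTime, Utc};

fn main() {
    // 2024-06-15T12:34:56Z, expressed as the fractional epoch seconds the
    // metrics history stores.
    let ts = 1_718_454_896.0_f64;
    let millis = (ts * 1000.0).round() as i64;
    let dt = DateTime::<Utc>::from_timestamp_millis(millis).unwrap();
    let local = dt.with_timezone(&chrono_tz::America::New_York);
    // %Z renders the zone abbreviation: EDT in June, EST in January.
    assert_eq!(
        local.format("%Y-%m-%d %H:%M:%S %Z").to_string(),
        "2024-06-15 08:34:56 EDT"
    );
}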
@@ -1976,7 +1980,7 @@ fn format_byte_count(bytes: u64) -> String {
}
}
-fn decorate_gc_history(executions: &[Value]) -> Vec<Value> {
+fn decorate_gc_history(executions: &[Value], tz: chrono_tz::Tz) -> Vec<Value> {
executions
.iter()
.cloned()
@@ -1990,7 +1994,7 @@ fn decorate_gc_history(executions: &[Value]) -> Vec<Value> {
if let Some(obj) = execution.as_object_mut() {
obj.insert(
"timestamp_display".to_string(),
-Value::String(format_history_timestamp(timestamp)),
+Value::String(format_history_timestamp(timestamp, tz)),
);
obj.insert(
"bytes_freed_display".to_string(),
@@ -2002,7 +2006,7 @@ fn decorate_gc_history(executions: &[Value]) -> Vec<Value> {
.collect()
}
-fn decorate_integrity_history(executions: &[Value]) -> Vec<Value> {
+fn decorate_integrity_history(executions: &[Value], tz: chrono_tz::Tz) -> Vec<Value> {
executions
.iter()
.cloned()
@@ -2011,7 +2015,7 @@ fn decorate_integrity_history(executions: &[Value]) -> Vec<Value> {
if let Some(obj) = execution.as_object_mut() {
obj.insert(
"timestamp_display".to_string(),
-Value::String(format_history_timestamp(timestamp)),
+Value::String(format_history_timestamp(timestamp, tz)),
);
}
execution
@@ -2024,6 +2028,11 @@ pub async fn system_dashboard(
Extension(session): Extension<SessionHandle>,
) -> Response {
let mut ctx = page_context(&state, &session, "ui.system_dashboard");
let display_tz: chrono_tz::Tz = state
.config
.display_timezone
.parse()
.unwrap_or(chrono_tz::UTC);
let gc_status = match &state.gc {
Some(gc) => gc.status().await,
@@ -2045,7 +2054,7 @@ pub async fn system_dashboard(
.await
.get("executions")
.and_then(|value| value.as_array())
-.map(|values| decorate_gc_history(values))
+.map(|values| decorate_gc_history(values, display_tz))
.unwrap_or_default(),
None => Vec::new(),
};
@@ -2069,7 +2078,7 @@ pub async fn system_dashboard(
.await
.get("executions")
.and_then(|value| value.as_array())
-.map(|values| decorate_integrity_history(values))
+.map(|values| decorate_integrity_history(values, display_tz))
.unwrap_or_default(),
None => Vec::new(),
};
@@ -2081,7 +2090,7 @@ pub async fn system_dashboard(
ctx.insert("gc_status", &gc_status);
ctx.insert("integrity_status", &integrity_status);
ctx.insert("app_version", &env!("CARGO_PKG_VERSION"));
-ctx.insert("display_timezone", &"UTC");
+ctx.insert("display_timezone", &state.config.display_timezone);
ctx.insert("platform", &std::env::consts::OS);
ctx.insert(
"storage_root",


@@ -320,6 +320,9 @@ pub fn create_ui_router(state: state::AppState) -> Router {
let session_state = middleware::SessionLayerState {
store: state.sessions.clone(),
secure: false,
ttl: std::time::Duration::from_secs(
state.config.session_lifetime_days.saturating_mul(86_400),
),
};
let static_router = Router::new()


@@ -475,13 +475,17 @@ fn ensure_iam_bootstrap(config: &ServerConfig) {
return;
}
-tracing::info!("============================================================");
-tracing::info!("MYFSIO - ADMIN CREDENTIALS INITIALIZED");
-tracing::info!("============================================================");
-tracing::info!("Access Key: {}", access_key);
-tracing::info!("Secret Key: {}", secret_key);
-tracing::info!("Saved to: {}", iam_path.display());
-tracing::info!("============================================================");
+println!("============================================================");
+println!("MYFSIO - ADMIN CREDENTIALS INITIALIZED");
+println!("============================================================");
+println!("Access Key: {}", access_key);
+println!("Secret Key: {}", secret_key);
+println!("Saved to: {}", iam_path.display());
+println!("============================================================");
+tracing::info!(
+"Admin credentials initialized; access key written to {}",
+iam_path.display()
+);
}
fn reset_admin_credentials(config: &ServerConfig) {


@@ -16,30 +16,68 @@ use crate::middleware::sha_body::{is_hex_sha256, Sha256VerifyBody};
use crate::services::acl::acl_from_bucket_config;
use crate::state::AppState;
-fn wrap_body_for_sha256_verification(req: &mut Request) {
+fn wrap_body_for_sha256_verification(req: &mut Request) -> Option<Response> {
let declared = match req
.headers()
.get("x-amz-content-sha256")
.and_then(|v| v.to_str().ok())
{
Some(v) => v.to_string(),
-None => return,
+None => return None,
};
if !is_hex_sha256(&declared) {
return;
let upper = declared.to_ascii_uppercase();
let is_streaming_signed = upper == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
|| upper == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER";
let is_streaming_unsigned = upper == "STREAMING-UNSIGNED-PAYLOAD-TRAILER";
let is_streaming = is_streaming_signed || is_streaming_unsigned;
if is_streaming {
if std::env::var("STRICT_STREAMING_SIGV4")
.ok()
.as_deref()
.map(|v| v.eq_ignore_ascii_case("true") || v == "1")
.unwrap_or(false)
{
tracing::warn!(
payload_type = %upper,
"Rejecting streaming SigV4 request because STRICT_STREAMING_SIGV4 is enabled"
);
let err = S3Error::new(
S3ErrorCode::SignatureDoesNotMatch,
"Streaming SigV4 chunk-signature validation is not yet implemented; \
resend with x-amz-content-sha256: UNSIGNED-PAYLOAD or disable STRICT_STREAMING_SIGV4",
);
let status = StatusCode::from_u16(err.http_status())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let code_str = err.code.as_str();
return Some(
(
status,
[
("content-type", "application/xml"),
("x-amz-error-code", code_str),
],
err.to_xml(),
)
.into_response(),
);
}
tracing::warn!(
payload_type = %upper,
"Accepting streaming SigV4 request without per-chunk signature validation. \
Set STRICT_STREAMING_SIGV4=true to reject these requests until full validation lands."
);
return None;
}
let is_chunked = req
.headers()
.get("content-encoding")
.and_then(|v| v.to_str().ok())
.map(|v| v.to_ascii_lowercase().contains("aws-chunked"))
.unwrap_or(false);
if is_chunked {
return;
if !is_hex_sha256(&declared) {
return None;
}
let body = std::mem::replace(req.body_mut(), axum::body::Body::empty());
let wrapped = Sha256VerifyBody::new(body, declared);
*req.body_mut() = axum::body::Body::new(wrapped);
None
}
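The middleware now recognizes the three streaming sentinels SigV4 clients can send in x-amz-content-sha256 and only wraps the body for digest verification when the header is a literal hex digest. A sketch of that classification; is_hex_sha256 is approximated here by a 64-hex-character check (an assumption, the real helper lives in middleware::sha_body):

// The x-amz-content-sha256 values the middleware above distinguishes.
#[derive(Debug, PartialEq)]
enum PayloadKind {
    StreamingSigned,   // per-chunk signatures, not yet validated
    StreamingUnsigned, // unsigned chunked payload with trailer
    HexDigest,         // whole-body SHA-256 to verify
    Other,
}

fn classify(declared: &str) -> PayloadKind {
    let upper = declared.to_ascii_uppercase();
    match upper.as_str() {
        "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
        | "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER" => PayloadKind::StreamingSigned,
        "STREAMING-UNSIGNED-PAYLOAD-TRAILER" => PayloadKind::StreamingUnsigned,
        _ if upper.len() == 64 && upper.chars().all(|c| c.is_ascii_hexdigit()) => {
            PayloadKind::HexDigest
        }
        _ => PayloadKind::Other,
    }
}

fn main() {
    assert_eq!(classify("streaming-aws4-hmac-sha256-payload"), PayloadKind::StreamingSigned);
    assert_eq!(classify(&"a".repeat(64)), PayloadKind::HexDigest);
    assert_eq!(classify("UNSIGNED-PAYLOAD"), PayloadKind::Other);
}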
#[derive(Clone, Debug)]
@@ -162,6 +200,39 @@ fn parse_website_config(value: &Value) -> Option<(String, Option<String>)> {
}
}
fn apply_website_object_headers(
headers: &mut HeaderMap,
meta: &myfsio_common::types::ObjectMeta,
) {
if let Some(ref etag) = meta.etag {
if let Ok(value) = format!("\"{}\"", etag).parse() {
headers.insert(header::ETAG, value);
}
}
if let Ok(value) = meta
.last_modified
.format("%a, %d %b %Y %H:%M:%S GMT")
.to_string()
.parse()
{
headers.insert(header::LAST_MODIFIED, value);
}
if let Some(enc_info) =
myfsio_crypto::encryption::EncryptionMetadata::from_metadata(&meta.internal_metadata)
{
if let Ok(value) = enc_info.algorithm.as_str().parse() {
headers.insert("x-amz-server-side-encryption", value);
}
}
for (k, v) in &meta.metadata {
if let Ok(header_val) = v.parse() {
if let Ok(name) = format!("x-amz-meta-{}", k).parse::<axum::http::HeaderName>() {
headers.insert(name, header_val);
}
}
}
}
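Last-Modified is rendered in IMF-fixdate form (RFC 7231), always in GMT. A one-line check of the format string used above, with an illustrative timestamp:

use chrono::{TimeZone, Utc};

fn main() {
    // Same format string apply_website_object_headers uses.
    let dt = Utc.with_ymd_and_hms(2024, 6, 15, 12, 34, 56).unwrap();
    let header = dt.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
    assert_eq!(header, "Sat, 15 Jun 2024 12:34:56 GMT");
}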
async fn serve_website_document(
state: &AppState,
bucket: &str,
@@ -182,6 +253,7 @@ async fn serve_website_document(
meta.size.to_string().parse().unwrap(),
);
headers.insert(header::ACCEPT_RANGES, "bytes".parse().unwrap());
apply_website_object_headers(&mut headers, &meta);
return Some((status, headers).into_response());
}
@@ -193,6 +265,7 @@ async fn serve_website_document(
let mut headers = HeaderMap::new();
headers.insert(header::CONTENT_TYPE, content_type.parse().unwrap());
headers.insert(header::ACCEPT_RANGES, "bytes".parse().unwrap());
apply_website_object_headers(&mut headers, &meta);
if status == StatusCode::OK {
if let Some(range_header) = range_header {
@@ -501,9 +574,18 @@ pub async fn auth_layer(State(state): State<AppState>, mut req: Request, next: N
{
error_response(err, &auth_path)
} else {
if let Some(registry) = state.site_registry.as_ref() {
if registry.is_peer_inbound_access_key(&principal.access_key) {
req.extensions_mut()
.insert(crate::middleware::ReplicationPeerRequest);
}
}
req.extensions_mut().insert(principal);
-wrap_body_for_sha256_verification(&mut req);
-next.run(req).await
+if let Some(rejection) = wrap_body_for_sha256_verification(&mut req) {
+rejection
+} else {
+next.run(req).await
+}
}
}
AuthResult::Denied(err) => error_response(err, &auth_path),


@@ -9,6 +9,9 @@ pub use bucket_cors::bucket_cors_layer;
pub use ratelimit::{rate_limit_layer, RateLimitLayerState};
pub use session::{csrf_layer, session_layer, SessionHandle, SessionLayerState};
#[derive(Clone, Copy, Debug)]
pub struct ReplicationPeerRequest;
use axum::extract::{Request, State};
use axum::middleware::Next;
use axum::response::Response;


@@ -1,10 +1,11 @@
use std::sync::Arc;
use std::time::Duration;
use axum::extract::{Request, State};
use axum::http::{header, HeaderValue, StatusCode};
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use cookie::{Cookie, SameSite};
use cookie::{time::Duration as CookieDuration, Cookie, SameSite};
use parking_lot::Mutex;
use crate::session::{
@@ -16,6 +17,7 @@ use crate::session::{
pub struct SessionLayerState {
pub store: Arc<SessionStore>,
pub secure: bool,
pub ttl: Duration,
}
#[derive(Clone)]
@@ -23,6 +25,8 @@ pub struct SessionHandle {
pub id: String,
inner: Arc<Mutex<SessionData>>,
dirty: Arc<Mutex<bool>>,
rotated_id: Arc<Mutex<Option<String>>>,
destroy_old: Arc<Mutex<Option<String>>>,
}
impl SessionHandle {
@@ -31,6 +35,8 @@ impl SessionHandle {
id,
inner: Arc::new(Mutex::new(data)),
dirty: Arc::new(Mutex::new(false)),
rotated_id: Arc::new(Mutex::new(None)),
destroy_old: Arc::new(Mutex::new(None)),
}
}
@@ -53,6 +59,21 @@ impl SessionHandle {
pub fn is_dirty(&self) -> bool {
*self.dirty.lock()
}
pub fn rotate_id(&self) {
let new_id = crate::session::generate_token(32);
*self.destroy_old.lock() = Some(self.id.clone());
*self.rotated_id.lock() = Some(new_id);
*self.dirty.lock() = true;
}
pub(crate) fn take_rotated_id(&self) -> Option<String> {
self.rotated_id.lock().take()
}
pub(crate) fn take_destroy_old(&self) -> Option<String> {
self.destroy_old.lock().take()
}
}
pub async fn session_layer(
@@ -62,13 +83,10 @@ pub async fn session_layer(
) -> Response {
let cookie_id = extract_session_cookie(&req);
-let (session_id, session_data, is_new) =
+let (session_id, session_data) =
match cookie_id.and_then(|id| state.store.get(&id).map(|data| (id.clone(), data))) {
-Some((id, data)) => (id, data, false),
-None => {
-let (id, data) = state.store.create();
-(id, data, true)
-}
+Some((id, data)) => (id, data),
+None => state.store.create(),
};
let handle = SessionHandle::new(session_id.clone(), session_data);
@@ -76,15 +94,22 @@ pub async fn session_layer(
let mut resp = next.run(req).await;
let rotated = handle.take_rotated_id();
let destroy_old = handle.take_destroy_old();
let effective_id = rotated.unwrap_or_else(|| handle.id.clone());
if handle.is_dirty() {
-state.store.save(&handle.id, handle.snapshot());
+state.store.save(&effective_id, handle.snapshot());
}
if is_new {
let cookie = build_session_cookie(&session_id, state.secure);
if let Ok(value) = HeaderValue::from_str(&cookie.to_string()) {
resp.headers_mut().append(header::SET_COOKIE, value);
}
if let Some(old) = destroy_old {
state.store.destroy(&old);
}
let cookie = build_session_cookie(&effective_id, state.secure, state.ttl);
if let Ok(value) = HeaderValue::from_str(&cookie.to_string()) {
resp.headers_mut().append(header::SET_COOKIE, value);
}
resp
@@ -231,12 +256,14 @@ fn extract_session_cookie(req: &Request) -> Option<String> {
None
}
-fn build_session_cookie(id: &str, secure: bool) -> Cookie<'static> {
+fn build_session_cookie(id: &str, secure: bool, ttl: Duration) -> Cookie<'static> {
let mut cookie = Cookie::new(SESSION_COOKIE_NAME, id.to_string());
cookie.set_http_only(true);
-cookie.set_same_site(SameSite::Lax);
+cookie.set_same_site(SameSite::Strict);
cookie.set_secure(secure);
cookie.set_path("/");
let secs = i64::try_from(ttl.as_secs()).unwrap_or(i64::MAX);
cookie.set_max_age(CookieDuration::seconds(secs));
cookie
}


@@ -48,6 +48,7 @@ struct BucketLifecycleResult {
struct ParsedLifecycleRule {
status: String,
prefix: String,
tags: Vec<(String, String)>,
expiration_days: Option<u64>,
expiration_date: Option<DateTime<Utc>>,
noncurrent_days: Option<u64>,
@@ -203,7 +204,9 @@ impl LifecycleService {
match self.storage.list_objects(bucket, &params).await {
Ok(objects) => {
for object in &objects.objects {
if object.last_modified < cutoff {
if object.last_modified < cutoff
&& self.object_matches_tag_filter(bucket, &object.key, rule).await
{
if let Err(err) = self.storage.delete_object(bucket, &object.key).await {
result
.errors
@@ -219,6 +222,24 @@ impl LifecycleService {
}
}
async fn object_matches_tag_filter(
&self,
bucket: &str,
key: &str,
rule: &ParsedLifecycleRule,
) -> bool {
if rule.tags.is_empty() {
return true;
}
match self.storage.get_object_tags(bucket, key).await {
Ok(tags) => rule
.tags
.iter()
.all(|(k, v)| tags.iter().any(|t| t.key == *k && t.value == *v)),
Err(_) => false,
}
}
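The tag filter has AND semantics: every tag named on the rule must be present on the object, while extra object tags are ignored. The comparison in isolation, as a sketch (the local Tag struct stands in for myfsio_common::types::Tag):

// Stand-in for myfsio_common::types::Tag.
struct Tag { key: String, value: String }

// AND semantics, as in object_matches_tag_filter above: every rule tag
// must appear on the object; extra object tags are ignored.
fn matches(rule_tags: &[(String, String)], object_tags: &[Tag]) -> bool {
    rule_tags.iter().all(|(k, v)| {
        object_tags.iter().any(|t| t.key == *k && t.value == *v)
    })
}

fn main() {
    let obj = vec![
        Tag { key: "env".into(), value: "prod".into() },
        Tag { key: "tier".into(), value: "cold".into() },
    ];
    let rule = vec![("env".to_string(), "prod".to_string())];
    assert!(matches(&rule, &obj)); // rule tags are a subset of object tags
    let rule2 = vec![("env".to_string(), "dev".to_string())];
    assert!(!matches(&rule2, &obj)); // any mismatch fails the whole rule
}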
async fn apply_noncurrent_expiration_rule(
&self,
bucket: &str,
@@ -275,6 +296,21 @@ impl LifecycleService {
if archived_at.is_none() || archived_at.unwrap() >= cutoff {
continue;
}
if !rule.tags.is_empty() {
let Some(version_tags_value) = manifest.get("tags") else {
continue;
};
let version_tags: Vec<myfsio_common::types::Tag> =
serde_json::from_value(version_tags_value.clone()).unwrap_or_default();
let matched = rule.tags.iter().all(|(k, v)| {
version_tags
.iter()
.any(|t| t.key == *k && t.value == *v)
});
if !matched {
continue;
}
}
let version_id = manifest
.get("version_id")
.and_then(|value| value.as_str())
@@ -440,17 +476,49 @@ fn parse_lifecycle_rules_from_string(raw: &str) -> Vec<ParsedLifecycleRule> {
status: child_text(&rule, "Status").unwrap_or_else(|| "Enabled".to_string()),
prefix: child_text(&rule, "Prefix")
.or_else(|| {
-rule.descendants()
-.find(|node| {
-node.is_element()
-&& node.tag_name().name() == "Filter"
-&& node.children().any(|child| {
-child.is_element() && child.tag_name().name() == "Prefix"
-})
-})
-.and_then(|filter| child_text(&filter, "Prefix"))
+let filter = rule.children().find(|node| {
+node.is_element() && node.tag_name().name() == "Filter"
+})?;
+if let Some(prefix) = child_text(&filter, "Prefix") {
+return Some(prefix);
+}
+let and = filter.children().find(|node| {
+node.is_element() && node.tag_name().name() == "And"
+})?;
+child_text(&and, "Prefix")
})
.unwrap_or_default(),
tags: {
let mut collected: Vec<(String, String)> = Vec::new();
if let Some(filter) = rule
.children()
.find(|node| node.is_element() && node.tag_name().name() == "Filter")
{
let direct_tag = filter
.children()
.find(|node| node.is_element() && node.tag_name().name() == "Tag");
if let Some(tag) = direct_tag {
if let Some(key) = child_text(&tag, "Key") {
collected.push((key, child_text(&tag, "Value").unwrap_or_default()));
}
}
if let Some(and) = filter
.children()
.find(|node| node.is_element() && node.tag_name().name() == "And")
{
for tag in and
.children()
.filter(|node| node.is_element() && node.tag_name().name() == "Tag")
{
if let Some(key) = child_text(&tag, "Key") {
collected
.push((key, child_text(&tag, "Value").unwrap_or_default()));
}
}
}
}
collected
},
expiration_days: rule
.descendants()
.find(|node| node.is_element() && node.tag_name().name() == "Expiration")
@@ -482,6 +550,37 @@ fn parse_lifecycle_rules_from_string(raw: &str) -> Vec<ParsedLifecycleRule> {
fn parse_lifecycle_rule(value: &Value) -> Option<ParsedLifecycleRule> {
let map = value.as_object()?;
let mut tags: Vec<(String, String)> = Vec::new();
if let Some(filter) = map.get("Filter").and_then(|v| v.as_object()) {
if let Some(tag) = filter.get("Tag").and_then(|v| v.as_object()) {
if let (Some(k), Some(v)) = (
tag.get("Key").and_then(|v| v.as_str()),
tag.get("Value").and_then(|v| v.as_str()),
) {
tags.push((k.to_string(), v.to_string()));
}
}
if let Some(and) = filter.get("And").and_then(|v| v.as_object()) {
if let Some(arr) = and.get("Tags").and_then(|v| v.as_array()) {
for entry in arr {
if let (Some(k), Some(v)) = (
entry.get("Key").and_then(|v| v.as_str()),
entry.get("Value").and_then(|v| v.as_str()),
) {
tags.push((k.to_string(), v.to_string()));
}
}
}
if let Some(tag) = and.get("Tag").and_then(|v| v.as_object()) {
if let (Some(k), Some(v)) = (
tag.get("Key").and_then(|v| v.as_str()),
tag.get("Value").and_then(|v| v.as_str()),
) {
tags.push((k.to_string(), v.to_string()));
}
}
}
}
Some(ParsedLifecycleRule {
status: map
.get("Status")
@@ -496,8 +595,15 @@ fn parse_lifecycle_rule(value: &Value) -> Option<ParsedLifecycleRule> {
.and_then(|value| value.get("Prefix"))
.and_then(|value| value.as_str())
})
.or_else(|| {
map.get("Filter")
.and_then(|value| value.get("And"))
.and_then(|value| value.get("Prefix"))
.and_then(|value| value.as_str())
})
.unwrap_or_default()
.to_string(),
tags,
expiration_days: map
.get("Expiration")
.and_then(|value| value.get("Days"))
@@ -568,6 +674,74 @@ mod tests {
assert_eq!(rules[0].abort_incomplete_multipart_days, Some(7));
}
#[test]
fn parses_xml_filter_and_with_prefix_and_tags() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<Status>Enabled</Status>
<Filter>
<And>
<Prefix>logs/</Prefix>
<Tag><Key>env</Key><Value>prod</Value></Tag>
<Tag><Key>tier</Key><Value>cold</Value></Tag>
</And>
</Filter>
<Expiration><Days>10</Days></Expiration>
</Rule>
</LifecycleConfiguration>"#;
let rules = parse_lifecycle_rules(&Value::String(xml.to_string()));
assert_eq!(rules.len(), 1);
assert_eq!(rules[0].prefix, "logs/");
assert_eq!(
rules[0].tags,
vec![
("env".to_string(), "prod".to_string()),
("tier".to_string(), "cold".to_string()),
]
);
assert_eq!(rules[0].expiration_days, Some(10));
}
#[test]
fn parses_xml_filter_with_single_tag() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<Status>Enabled</Status>
<Filter><Tag><Key>env</Key><Value>prod</Value></Tag></Filter>
<Expiration><Days>5</Days></Expiration>
</Rule>
</LifecycleConfiguration>"#;
let rules = parse_lifecycle_rules(&Value::String(xml.to_string()));
assert_eq!(rules.len(), 1);
assert_eq!(rules[0].prefix, "");
assert_eq!(
rules[0].tags,
vec![("env".to_string(), "prod".to_string())]
);
}
#[test]
fn xml_tags_outside_filter_are_ignored() {
// a stray <Tag> nested under an action must not be picked up as a filter tag
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<Status>Enabled</Status>
<Filter><Prefix>logs/</Prefix></Filter>
<Expiration>
<Days>10</Days>
<Tag><Key>spurious</Key><Value>nope</Value></Tag>
</Expiration>
</Rule>
</LifecycleConfiguration>"#;
let rules = parse_lifecycle_rules(&Value::String(xml.to_string()));
assert_eq!(rules.len(), 1);
assert!(rules[0].tags.is_empty());
assert_eq!(rules[0].prefix, "logs/");
}
#[tokio::test]
async fn run_cycle_writes_history_and_deletes_noncurrent_versions() {
let tmp = tempfile::tempdir().unwrap();
@@ -634,4 +808,156 @@ mod tests {
assert_eq!(history["total"], 1);
assert_eq!(history["executions"][0]["versions_deleted"], 1);
}
#[tokio::test]
async fn noncurrent_tag_filter_uses_version_tags_not_current_tags() {
let tmp = tempfile::tempdir().unwrap();
let storage = Arc::new(FsStorageBackend::new(tmp.path().to_path_buf()));
storage.create_bucket("docs").await.unwrap();
storage.set_versioning("docs", true).await.unwrap();
storage
.put_object(
"docs",
"logs/file.txt",
Box::pin(std::io::Cursor::new(b"v1".to_vec())),
None,
)
.await
.unwrap();
storage
.put_object(
"docs",
"logs/file.txt",
Box::pin(std::io::Cursor::new(b"v2".to_vec())),
None,
)
.await
.unwrap();
let live_tags = vec![myfsio_common::types::Tag {
key: "env".to_string(),
value: "prod".to_string(),
}];
storage
.set_object_tags("docs", "logs/file.txt", &live_tags)
.await
.unwrap();
let versions_root = version_root_for_bucket(tmp.path(), "docs")
.join("logs")
.join("file.txt");
let manifest = std::fs::read_dir(&versions_root)
.unwrap()
.flatten()
.find(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("json"))
.unwrap()
.path();
let archived_manifest = json!({
"version_id": "ver-untagged",
"key": "logs/file.txt",
"size": 2,
"archived_at": (Utc::now() - Duration::days(45)).to_rfc3339(),
"etag": "etag",
"tags": [],
});
std::fs::write(
&manifest,
serde_json::to_string(&archived_manifest).unwrap(),
)
.unwrap();
std::fs::write(manifest.with_file_name("ver-untagged.bin"), b"v1").unwrap();
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<Status>Enabled</Status>
<Filter><And><Prefix>logs/</Prefix><Tag><Key>env</Key><Value>prod</Value></Tag></And></Filter>
<NoncurrentVersionExpiration><NoncurrentDays>30</NoncurrentDays></NoncurrentVersionExpiration>
</Rule>
</LifecycleConfiguration>"#;
let mut config = storage.get_bucket_config("docs").await.unwrap();
config.lifecycle = Some(Value::String(lifecycle_xml.to_string()));
storage.set_bucket_config("docs", &config).await.unwrap();
let service =
LifecycleService::new(storage.clone(), tmp.path(), LifecycleConfig::default());
let result = service.run_cycle().await.unwrap();
assert_eq!(
result["versions_deleted"], 0,
"noncurrent expiration must consult the version's own tags, not the current key's"
);
assert!(
manifest.exists(),
"the untagged archived version should still be on disk"
);
}
#[tokio::test]
async fn noncurrent_tag_filter_deletes_when_version_tags_match() {
let tmp = tempfile::tempdir().unwrap();
let storage = Arc::new(FsStorageBackend::new(tmp.path().to_path_buf()));
storage.create_bucket("docs").await.unwrap();
storage.set_versioning("docs", true).await.unwrap();
storage
.put_object(
"docs",
"logs/file.txt",
Box::pin(std::io::Cursor::new(b"v1".to_vec())),
None,
)
.await
.unwrap();
storage
.put_object(
"docs",
"logs/file.txt",
Box::pin(std::io::Cursor::new(b"v2".to_vec())),
None,
)
.await
.unwrap();
let versions_root = version_root_for_bucket(tmp.path(), "docs")
.join("logs")
.join("file.txt");
let manifest = std::fs::read_dir(&versions_root)
.unwrap()
.flatten()
.find(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("json"))
.unwrap()
.path();
let archived_manifest = json!({
"version_id": "ver-tagged",
"key": "logs/file.txt",
"size": 2,
"archived_at": (Utc::now() - Duration::days(45)).to_rfc3339(),
"etag": "etag",
"tags": [{ "key": "env", "value": "prod" }],
});
std::fs::write(
&manifest,
serde_json::to_string(&archived_manifest).unwrap(),
)
.unwrap();
std::fs::write(manifest.with_file_name("ver-tagged.bin"), b"v1").unwrap();
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<Status>Enabled</Status>
<Filter><And><Prefix>logs/</Prefix><Tag><Key>env</Key><Value>prod</Value></Tag></And></Filter>
<NoncurrentVersionExpiration><NoncurrentDays>30</NoncurrentDays></NoncurrentVersionExpiration>
</Rule>
</LifecycleConfiguration>"#;
let mut config = storage.get_bucket_config("docs").await.unwrap();
config.lifecycle = Some(Value::String(lifecycle_xml.to_string()));
storage.set_bucket_config("docs", &config).await.unwrap();
let service =
LifecycleService::new(storage.clone(), tmp.path(), LifecycleConfig::default());
let result = service.run_cycle().await.unwrap();
assert_eq!(result["versions_deleted"], 1);
}
}


@@ -319,10 +319,11 @@ impl MetricsService {
let _ = std::fs::create_dir_all(parent);
}
let data = json!({ "snapshots": snapshots });
let _ = std::fs::write(
&self.snapshots_path,
serde_json::to_string_pretty(&data).unwrap_or_default(),
);
let serialized = serde_json::to_string_pretty(&data).unwrap_or_default();
let tmp = self.snapshots_path.with_extension("json.tmp");
if std::fs::write(&tmp, serialized).is_ok() {
let _ = std::fs::rename(&tmp, &self.snapshots_path);
}
}
pub fn start_background(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
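Snapshot persistence switches to write-to-temp-then-rename, so a crash mid-write can no longer truncate the live file; on POSIX filesystems the rename within one filesystem is atomic. The pattern in isolation, as a sketch:

use std::io;
use std::path::Path;

// The pattern the snapshot writer above adopts: write the full payload
// to a sibling temp file, then rename over the target, so readers never
// observe a half-written file.
fn write_atomically(path: &Path, contents: &[u8]) -> io::Result<()> {
    let tmp = path.with_extension("json.tmp");
    std::fs::write(&tmp, contents)?;
    std::fs::rename(&tmp, path)
}

fn main() -> io::Result<()> {
    let target = std::env::temp_dir().join("snapshots.json");
    write_atomically(&target, br#"{"snapshots": []}"#)?;
    println!("wrote {}", target.display());
    Ok(())
}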


@@ -65,6 +65,24 @@ pub fn parse_notification_configurations(
xml: &str,
) -> Result<Vec<NotificationConfiguration>, String> {
let doc = roxmltree::Document::parse(xml).map_err(|err| err.to_string())?;
for unsupported in [
"TopicConfiguration",
"QueueConfiguration",
"CloudFunctionConfiguration",
"LambdaFunctionConfiguration",
] {
if doc
.descendants()
.any(|node| node.is_element() && node.tag_name().name() == unsupported)
{
return Err(format!(
"{} is not supported on this server; only WebhookConfiguration is accepted",
unsupported
));
}
}
let mut configs = Vec::new();
for webhook in doc
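With the guard above, a configuration that names any non-webhook destination is rejected before parsing begins. A sketch reproducing the scan with roxmltree (the XML input is illustrative):

// Reproduces the guard above: any AWS destination type other than
// WebhookConfiguration is reported up front.
fn first_unsupported(xml: &str) -> Option<String> {
    let doc = roxmltree::Document::parse(xml).ok()?;
    for unsupported in [
        "TopicConfiguration",
        "QueueConfiguration",
        "CloudFunctionConfiguration",
        "LambdaFunctionConfiguration",
    ] {
        if doc.descendants().any(|n| n.is_element() && n.tag_name().name() == unsupported) {
            return Some(unsupported.to_string());
        }
    }
    None
}

fn main() {
    let xml = r#"<NotificationConfiguration>
        <TopicConfiguration><Topic>arn:aws:sns:example:alerts</Topic></TopicConfiguration>
    </NotificationConfiguration>"#;
    assert_eq!(first_unsupported(xml).as_deref(), Some("TopicConfiguration"));
}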


@@ -12,7 +12,10 @@ use myfsio_common::types::ListParams;
use myfsio_storage::fs_backend::{metadata_is_corrupted, FsStorageBackend};
use myfsio_storage::traits::StorageEngine;
-use crate::services::s3_client::{build_client, check_endpoint_health, ClientOptions};
+use crate::services::s3_client::{
+build_client, build_health_client, check_endpoint_health, check_target_bucket_reachable,
+ClientOptions,
+};
use crate::stores::connections::{ConnectionStore, RemoteConnection};
pub const MODE_NEW_ONLY: &str = "new_only";
@@ -347,7 +350,10 @@ impl ReplicationManager {
return 0;
}
};
-if !self.check_endpoint(&connection).await {
+if !self
+.check_target_bucket(&connection, &rule.target_bucket)
+.await
+{
tracing::warn!(
"Cannot replicate existing objects for {}: endpoint {} is unreachable",
bucket,
@@ -459,6 +465,7 @@ impl ReplicationManager {
self.failures.remove(bucket, object_key);
}
Err(err) => {
let code = sdk_error_code(&err);
let msg = format!("{:?}", err);
tracing::error!(
"Replication DELETE failed {}/{}: {}",
@@ -475,7 +482,7 @@ impl ReplicationManager {
failure_count: 1,
bucket_name: bucket.to_string(),
action: "delete".to_string(),
-last_error_code: None,
+last_error_code: code,
},
);
}
@@ -505,9 +512,39 @@ impl ReplicationManager {
Ok(m) => m.len(),
Err(_) => 0,
};
let content_type = mime_guess::from_path(&src_path)
.first_raw()
.map(|s| s.to_string());
let stored_meta = self
.storage
.get_object_metadata(bucket, object_key)
.await
.unwrap_or_default();
let mut obj_meta = ReplicationObjectMeta::from_internal_metadata(&stored_meta);
if obj_meta.content_type.is_none() {
obj_meta.content_type = mime_guess::from_path(&src_path)
.first_raw()
.map(|s| s.to_string());
}
if let Ok(tags) = self.storage.get_object_tags(bucket, object_key).await {
if !tags.is_empty() {
obj_meta.tagging_header = Some(
tags.iter()
.map(|t| {
format!(
"{}={}",
percent_encoding::utf8_percent_encode(
&t.key,
percent_encoding::NON_ALPHANUMERIC,
),
percent_encoding::utf8_percent_encode(
&t.value,
percent_encoding::NON_ALPHANUMERIC,
),
)
})
.collect::<Vec<_>>()
.join("&"),
);
}
}
let upload_result = upload_object(
&client,
@@ -516,7 +553,7 @@ impl ReplicationManager {
&src_path,
file_size,
self.streaming_threshold_bytes,
-content_type.as_deref(),
+Some(&obj_meta),
)
.await;
@@ -540,7 +577,7 @@ impl ReplicationManager {
&src_path,
file_size,
self.streaming_threshold_bytes,
-content_type.as_deref(),
+Some(&obj_meta),
)
.await
}
@@ -562,6 +599,7 @@ impl ReplicationManager {
self.failures.remove(bucket, object_key);
}
Err(err) => {
let code = upload_error_code(&err);
let msg = err.to_string();
tracing::error!("Replication failed {}/{}: {}", bucket, object_key, msg);
self.failures.add(
@@ -573,7 +611,7 @@ impl ReplicationManager {
failure_count: 1,
bucket_name: bucket.to_string(),
action: action.to_string(),
-last_error_code: None,
+last_error_code: code,
},
);
}
@@ -581,10 +619,15 @@ impl ReplicationManager {
}
pub async fn check_endpoint(&self, conn: &RemoteConnection) -> bool {
-let client = build_client(conn, &self.client_options);
+let client = build_health_client(conn, &self.client_options);
check_endpoint_health(&client).await
}
pub async fn check_target_bucket(&self, conn: &RemoteConnection, target_bucket: &str) -> bool {
let client = build_client(conn, &self.client_options);
check_target_bucket_reachable(&client, target_bucket).await
}
pub async fn retry_failed(&self, bucket: &str, object_key: &str) -> bool {
let failure = match self.failures.get(bucket, object_key) {
Some(f) => f,
@@ -598,6 +641,15 @@ impl ReplicationManager {
Some(c) => c,
None => return false,
};
if !self.check_target_bucket(&conn, &rule.target_bucket).await {
tracing::warn!(
"Cannot retry {}/{}: endpoint {} is not reachable",
bucket,
object_key,
conn.endpoint_url
);
return false;
}
self.replicate_task(bucket, object_key, &rule, &conn, &failure.action)
.await;
true
@@ -616,6 +668,15 @@ impl ReplicationManager {
Some(c) => c,
None => return (0, failures.len()),
};
if !self.check_target_bucket(&conn, &rule.target_bucket).await {
tracing::warn!(
"Cannot retry {} failure(s) in {}: endpoint {} is not reachable",
failures.len(),
bucket,
conn.endpoint_url
);
return (0, failures.len());
}
let mut submitted = 0;
for failure in failures {
self.replicate_task(bucket, &failure.object_key, &rule, &conn, &failure.action)
@@ -675,6 +736,65 @@ fn is_no_such_bucket<E: std::fmt::Debug>(err: &E) -> bool {
text.contains("NoSuchBucket")
}
fn sdk_error_code<E, R>(err: &aws_sdk_s3::error::SdkError<E, R>) -> Option<String>
where
E: aws_sdk_s3::error::ProvideErrorMetadata,
{
if let aws_sdk_s3::error::SdkError::ServiceError(svc) = err {
if let Some(code) = svc.err().code() {
return Some(code.to_string());
}
}
None
}
fn upload_error_code(
err: &aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::put_object::PutObjectError>,
) -> Option<String> {
sdk_error_code(err)
}
#[derive(Default, Clone)]
pub struct ReplicationObjectMeta {
pub content_type: Option<String>,
pub content_encoding: Option<String>,
pub content_disposition: Option<String>,
pub content_language: Option<String>,
pub cache_control: Option<String>,
pub expires: Option<String>,
pub storage_class: Option<String>,
pub website_redirect_location: Option<String>,
pub user_metadata: HashMap<String, String>,
pub tagging_header: Option<String>,
}
impl ReplicationObjectMeta {
pub fn from_internal_metadata(meta: &HashMap<String, String>) -> Self {
let mut user_metadata = HashMap::new();
for (k, v) in meta {
if k.starts_with("__") {
continue;
}
if k.starts_with("x-amz-") {
continue;
}
user_metadata.insert(k.clone(), v.clone());
}
Self {
content_type: meta.get("__content_type__").cloned(),
content_encoding: meta.get("__content_encoding__").cloned(),
content_disposition: meta.get("__content_disposition__").cloned(),
content_language: meta.get("__content_language__").cloned(),
cache_control: meta.get("__cache_control__").cloned(),
expires: meta.get("__expires__").cloned(),
storage_class: meta.get("__storage_class__").cloned(),
website_redirect_location: meta.get("__website_redirect_location__").cloned(),
user_metadata,
tagging_header: None,
}
}
}
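Tags travel to the replica via the x-amz-tagging header, which is a percent-encoded query string. A sketch of the encoding the replicator above builds, using the same percent-encoding crate:

use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};

// Builds the x-amz-tagging header value the way the replicator does:
// percent-encode each key and value, then join as a query string.
fn tagging_header(tags: &[(&str, &str)]) -> String {
    tags.iter()
        .map(|(k, v)| {
            format!(
                "{}={}",
                utf8_percent_encode(k, NON_ALPHANUMERIC),
                utf8_percent_encode(v, NON_ALPHANUMERIC)
            )
        })
        .collect::<Vec<_>>()
        .join("&")
}

fn main() {
    let header = tagging_header(&[("env", "prod"), ("team", "a/b")]);
    assert_eq!(header, "env=prod&team=a%2Fb");
}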
async fn upload_object(
client: &aws_sdk_s3::Client,
bucket: &str,
@@ -682,11 +802,44 @@ async fn upload_object(
path: &Path,
file_size: u64,
streaming_threshold: u64,
-content_type: Option<&str>,
+obj_meta: Option<&ReplicationObjectMeta>,
) -> Result<(), aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::put_object::PutObjectError>> {
let mut req = client.put_object().bucket(bucket).key(key);
-if let Some(ct) = content_type {
-req = req.content_type(ct);
+if let Some(meta) = obj_meta {
+if let Some(ref ct) = meta.content_type {
+req = req.content_type(ct);
+}
}
if let Some(ref v) = meta.content_encoding {
req = req.content_encoding(v);
}
if let Some(ref v) = meta.content_disposition {
req = req.content_disposition(v);
}
if let Some(ref v) = meta.content_language {
req = req.content_language(v);
}
if let Some(ref v) = meta.cache_control {
req = req.cache_control(v);
}
if let Some(ref v) = meta.expires {
if let Ok(dt) = chrono::DateTime::parse_from_rfc2822(v) {
req = req.expires(aws_smithy_types::DateTime::from_secs(dt.timestamp()));
} else if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(v) {
req = req.expires(aws_smithy_types::DateTime::from_secs(dt.timestamp()));
}
}
if let Some(ref v) = meta.storage_class {
req = req.storage_class(aws_sdk_s3::types::StorageClass::from(v.as_str()));
}
if let Some(ref v) = meta.website_redirect_location {
req = req.website_redirect_location(v);
}
if let Some(ref v) = meta.tagging_header {
req = req.tagging(v);
}
for (k, v) in &meta.user_metadata {
req = req.metadata(k, v);
}
}
let body = if file_size >= streaming_threshold {


@@ -2,11 +2,13 @@ use std::time::Duration;
use aws_config::BehaviorVersion;
use aws_credential_types::Credentials;
-use aws_sdk_s3::config::{Region, SharedCredentialsProvider};
+use aws_sdk_s3::config::{AppName, Region, SharedCredentialsProvider};
use aws_sdk_s3::Client;
use crate::stores::connections::RemoteConnection;
pub const REPLICATION_USER_AGENT_TAG: &str = "MyFSIO-Replication";
pub struct ClientOptions {
pub connect_timeout: Duration,
pub read_timeout: Duration,
@@ -40,17 +42,29 @@ pub fn build_client(connection: &RemoteConnection, options: &ClientOptions) -> C
let retry_config =
aws_smithy_types::retry::RetryConfig::standard().with_max_attempts(options.max_attempts);
-let config = aws_sdk_s3::config::Builder::new()
+let mut builder = aws_sdk_s3::config::Builder::new()
.behavior_version(BehaviorVersion::latest())
.credentials_provider(SharedCredentialsProvider::new(credentials))
.region(Region::new(connection.region.clone()))
.endpoint_url(connection.endpoint_url.clone())
.force_path_style(true)
.timeout_config(timeout_config)
-.retry_config(retry_config)
-.build();
+.retry_config(retry_config);
-Client::from_conf(config)
+if let Ok(app_name) = AppName::new(REPLICATION_USER_AGENT_TAG) {
+builder = builder.app_name(app_name);
+}
+Client::from_conf(builder.build())
}
pub fn build_health_client(connection: &RemoteConnection, options: &ClientOptions) -> Client {
let fast_fail = ClientOptions {
connect_timeout: options.connect_timeout,
read_timeout: options.read_timeout,
max_attempts: 1,
};
build_client(connection, &fast_fail)
}
pub async fn check_endpoint_health(client: &Client) -> bool {
@@ -62,3 +76,28 @@ pub async fn check_endpoint_health(client: &Client) -> bool {
}
}
}
pub async fn check_target_bucket_reachable(client: &Client, target_bucket: &str) -> bool {
match client.head_bucket().bucket(target_bucket).send().await {
Ok(_) => true,
Err(err) => match &err {
aws_sdk_s3::error::SdkError::ServiceError(_)
| aws_sdk_s3::error::SdkError::ResponseError(_) => {
tracing::debug!(
"Target-bucket reachability probe for {} got a server response; treating as reachable: {:?}",
target_bucket,
err
);
true
}
_ => {
tracing::warn!(
"Target-bucket reachability probe for {} failed at transport layer: {:?}",
target_bucket,
err
);
false
}
},
}
}


@@ -147,4 +147,15 @@ impl SiteRegistry {
drop(data);
self.save();
}
pub fn is_peer_inbound_access_key(&self, access_key: &str) -> bool {
if access_key.is_empty() {
return false;
}
self.data
.read()
.peers
.iter()
.any(|p| p.peer_inbound_access_key.as_deref() == Some(access_key))
}
}


@@ -192,7 +192,7 @@ impl AppState {
None
};
-let templates = init_templates(&config.templates_dir);
+let templates = init_templates(&config.templates_dir, &config.display_timezone);
let access_logging = Arc::new(AccessLoggingService::new(&config.storage_root));
let session_ttl = Duration::from_secs(config.session_lifetime_days.saturating_mul(86_400));
Self {
@@ -259,15 +259,18 @@ impl AppState {
}
}
-fn init_templates(templates_dir: &std::path::Path) -> Option<Arc<TemplateEngine>> {
+fn init_templates(
+templates_dir: &std::path::Path,
+display_timezone: &str,
+) -> Option<Arc<TemplateEngine>> {
let use_disk = std::env::var("TEMPLATES_DIR").is_ok() && templates_dir.is_dir();
let result = if use_disk {
let glob = format!("{}/*.html", templates_dir.display()).replace('\\', "/");
tracing::info!("Loading templates from disk: {}", templates_dir.display());
-TemplateEngine::new(&glob)
+TemplateEngine::new(&glob, display_timezone)
} else {
tracing::info!("Loading templates from embedded assets");
-TemplateEngine::from_embedded()
+TemplateEngine::from_embedded(display_timezone)
};
match result {
Ok(engine) => {


@@ -1,9 +1,14 @@
use std::path::{Path, PathBuf};
use std::sync::Arc;
use base64::engine::general_purpose::URL_SAFE;
use base64::Engine;
use parking_lot::RwLock;
use rand::RngCore;
use serde::{Deserialize, Serialize};
const ENCRYPTED_PREFIX: &str = "enc:";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteConnection {
pub id: String,
@@ -21,6 +26,7 @@ fn default_region() -> String {
pub struct ConnectionStore {
path: PathBuf,
encryption_key: String,
inner: Arc<RwLock<Vec<RemoteConnection>>>,
}
@@ -30,12 +36,17 @@ impl ConnectionStore {
.join(".myfsio.sys")
.join("config")
.join("connections.json");
-let inner = Arc::new(RwLock::new(load_from_disk(&path)));
-Self { path, inner }
+let encryption_key = load_or_create_key(storage_root);
+let inner = Arc::new(RwLock::new(load_from_disk(&path, &encryption_key)));
+Self {
+path,
+encryption_key,
+inner,
+}
}
pub fn reload(&self) {
-let loaded = load_from_disk(&self.path);
+let loaded = load_from_disk(&self.path, &self.encryption_key);
*self.inner.write() = loaded;
}
@@ -76,19 +87,76 @@ impl ConnectionStore {
if let Some(parent) = self.path.parent() {
std::fs::create_dir_all(parent)?;
}
-let snapshot = self.inner.read().clone();
+let mut snapshot = self.inner.read().clone();
for conn in &mut snapshot {
if !conn.secret_key.starts_with(ENCRYPTED_PREFIX) {
if let Ok(token) =
myfsio_auth::fernet::encrypt(&self.encryption_key, conn.secret_key.as_bytes())
{
conn.secret_key = format!("{}{}", ENCRYPTED_PREFIX, token);
}
}
}
let bytes = serde_json::to_vec_pretty(&snapshot)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
-std::fs::write(&self.path, bytes)
+let tmp = self.path.with_extension("json.tmp");
+std::fs::write(&tmp, bytes)?;
+std::fs::rename(&tmp, &self.path)
}
}
fn load_from_disk(path: &Path) -> Vec<RemoteConnection> {
fn load_or_create_key(storage_root: &Path) -> String {
let key_path = storage_root
.join(".myfsio.sys")
.join("config")
.join(".connections_key");
if let Ok(text) = std::fs::read_to_string(&key_path) {
let trimmed = text.trim();
if !trimmed.is_empty() {
if let Ok(decoded) = URL_SAFE.decode(trimmed) {
if decoded.len() == 32 {
return trimmed.to_string();
}
}
}
}
let mut key = [0u8; 32];
rand::thread_rng().fill_bytes(&mut key);
let encoded = URL_SAFE.encode(key);
if let Some(parent) = key_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = std::fs::write(&key_path, &encoded);
encoded
}
fn load_from_disk(path: &Path, encryption_key: &str) -> Vec<RemoteConnection> {
if !path.exists() {
return Vec::new();
}
match std::fs::read_to_string(path) {
Ok(text) => serde_json::from_str(&text).unwrap_or_default(),
Err(_) => Vec::new(),
let text = match std::fs::read_to_string(path) {
Ok(text) => text,
Err(_) => return Vec::new(),
};
let mut connections: Vec<RemoteConnection> =
serde_json::from_str(&text).unwrap_or_default();
for conn in &mut connections {
if let Some(token) = conn.secret_key.strip_prefix(ENCRYPTED_PREFIX) {
match myfsio_auth::fernet::decrypt(encryption_key, token) {
Ok(plaintext) => {
if let Ok(s) = String::from_utf8(plaintext) {
conn.secret_key = s;
}
}
Err(err) => {
tracing::error!(
"Failed to decrypt peer secret_key for connection {}: {}",
conn.id,
err
);
}
}
}
}
connections
}
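Peer secrets now round-trip through the enc: scheme: encrypted on save, decrypted on load, with key material kept alongside the config in .connections_key. A sketch of the round trip; the myfsio_auth::fernet signatures (encrypt(key, bytes) -> Result<String>, decrypt(key, token) -> Result<Vec<u8>>) are inferred from this diff, and error handling is elided:

// Round trip of the "enc:" scheme above.
const ENCRYPTED_PREFIX: &str = "enc:";

// Encrypt a plaintext secret and tag it so save() can skip re-encryption.
fn seal(key: &str, secret: &str) -> Option<String> {
    let token = myfsio_auth::fernet::encrypt(key, secret.as_bytes()).ok()?;
    Some(format!("{}{}", ENCRYPTED_PREFIX, token))
}

// Strip the prefix and decrypt; untagged values are returned unchanged
// by the loader above, which keeps old plaintext files readable.
fn open(key: &str, stored: &str) -> Option<String> {
    let token = stored.strip_prefix(ENCRYPTED_PREFIX)?;
    let plaintext = myfsio_auth::fernet::decrypt(key, token).ok()?;
    String::from_utf8(plaintext).ok()
}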


@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use chrono_tz::Tz;
use parking_lot::RwLock;
use serde_json::Value;
use tera::{Context, Error as TeraError, Tera};
@@ -16,10 +17,10 @@ pub struct TemplateEngine {
}
impl TemplateEngine {
-pub fn new(template_glob: &str) -> Result<Self, TeraError> {
+pub fn new(template_glob: &str, display_timezone: &str) -> Result<Self, TeraError> {
let mut tera = Tera::new(template_glob)?;
tera.set_escape_fn(html_escape);
-register_filters(&mut tera);
+register_filters(&mut tera, display_timezone);
let endpoints: Arc<RwLock<HashMap<String, String>>> = Arc::new(RwLock::new(HashMap::new()));
@@ -31,10 +32,10 @@ impl TemplateEngine {
})
}
-pub fn from_embedded() -> Result<Self, TeraError> {
+pub fn from_embedded(display_timezone: &str) -> Result<Self, TeraError> {
let mut tera = Tera::default();
tera.set_escape_fn(html_escape);
-register_filters(&mut tera);
+register_filters(&mut tera, display_timezone);
let names = crate::embedded::template_names();
let mut entries: Vec<(String, String)> = Vec::with_capacity(names.len());
@@ -95,8 +96,14 @@ fn html_escape(input: &str) -> String {
out
}
-fn register_filters(tera: &mut Tera) {
-tera.register_filter("format_datetime", format_datetime_filter);
+fn register_filters(tera: &mut Tera, display_timezone: &str) {
+let tz: Tz = display_timezone.parse().unwrap_or(chrono_tz::UTC);
+tera.register_filter(
+"format_datetime",
+move |value: &Value, args: &HashMap<String, Value>| -> tera::Result<Value> {
+format_datetime_filter(value, args, tz)
+},
+);
tera.register_filter("filesizeformat", filesizeformat_filter);
tera.register_filter("slice", slice_filter);
}
@@ -186,11 +193,15 @@ fn urlencode_query(s: &str) -> String {
percent_encoding::utf8_percent_encode(s, UNRESERVED).to_string()
}
-fn format_datetime_filter(value: &Value, args: &HashMap<String, Value>) -> tera::Result<Value> {
+fn format_datetime_filter(
+value: &Value,
+args: &HashMap<String, Value>,
+tz: Tz,
+) -> tera::Result<Value> {
let format = args
.get("format")
.and_then(|v| v.as_str())
-.unwrap_or("%Y-%m-%d %H:%M:%S UTC");
+.unwrap_or("%Y-%m-%d %H:%M:%S %Z");
let dt: Option<DateTime<Utc>> = match value {
Value::String(s) => DateTime::parse_from_rfc3339(s)
@@ -210,7 +221,7 @@ fn format_datetime_filter(value: &Value, args: &HashMap<String, Value>) -> tera:
};
match dt {
-Some(d) => Ok(Value::String(d.format(format).to_string())),
+Some(d) => Ok(Value::String(d.with_timezone(&tz).format(format).to_string())),
None => Ok(value.clone()),
}
}
@@ -291,7 +302,7 @@ mod tests {
let tpl = tmp.path().join("t.html");
std::fs::write(&tpl, "").unwrap();
let glob = format!("{}/*.html", tmp.path().display());
-let engine = TemplateEngine::new(&glob).unwrap();
+let engine = TemplateEngine::new(&glob, "UTC").unwrap();
engine.register_endpoints(&[
("ui.buckets_overview", "/ui/buckets"),
("ui.bucket_detail", "/ui/buckets/{bucket_name}"),
@@ -356,7 +367,7 @@ mod tests {
path.push("templates");
path.push("*.html");
let glob = path.to_string_lossy().replace('\\', "/");
-let engine = TemplateEngine::new(&glob).expect("Tera parse failed");
+let engine = TemplateEngine::new(&glob, "UTC").expect("Tera parse failed");
let names: Vec<String> = engine
.tera
.read()
@@ -372,7 +383,7 @@ mod tests {
#[test]
fn embedded_templates_parse() {
-let engine = TemplateEngine::from_embedded().expect("Embedded Tera parse failed");
+let engine = TemplateEngine::from_embedded("UTC").expect("Embedded Tera parse failed");
let names: Vec<String> = engine
.tera
.read()
@@ -393,8 +404,21 @@ mod tests {
let v = format_datetime_filter(
&Value::String("2024-06-15T12:34:56Z".into()),
&HashMap::new(),
chrono_tz::UTC,
)
.unwrap();
assert_eq!(v, Value::String("2024-06-15 12:34:56 UTC".into()));
}
#[test]
fn format_datetime_custom_timezone() {
let tz: Tz = "America/New_York".parse().unwrap();
let v = format_datetime_filter(
&Value::String("2024-06-15T12:34:56Z".into()),
&HashMap::new(),
tz,
)
.unwrap();
assert_eq!(v, Value::String("2024-06-15 08:34:56 EDT".into()));
}
}


@@ -10,7 +10,7 @@ fn engine() -> TemplateEngine {
path.push("templates");
path.push("*.html");
let glob = path.to_string_lossy().replace('\\', "/");
-let engine = TemplateEngine::new(&glob).expect("template parse");
+let engine = TemplateEngine::new(&glob, "UTC").expect("template parse");
myfsio_server::handlers::ui_pages::register_ui_endpoints(&engine);
engine
}


@@ -1053,6 +1053,11 @@ impl FsStorageBackend {
})
.unwrap_or(now);
let live_tags = self
.read_index_entry_sync(bucket_name, key)
.and_then(|entry| entry.get("tags").cloned())
.unwrap_or(Value::Array(Vec::new()));
let record = serde_json::json!({
"version_id": version_id,
"key": key,
@@ -1061,6 +1066,7 @@ impl FsStorageBackend {
"last_modified": live_last_modified.to_rfc3339(),
"etag": etag,
"metadata": metadata,
"tags": live_tags,
"reason": reason,
});
@@ -1321,6 +1327,10 @@ impl FsStorageBackend {
for (k, v) in &metadata {
meta_json.insert(k.clone(), Value::String(v.clone()));
}
let tags = self
.read_index_entry_sync(bucket_name, key)
.and_then(|entry| entry.get("tags").cloned())
.unwrap_or(Value::Null);
let record = serde_json::json!({
"version_id": live_version,
"key": key,
@@ -1328,6 +1338,7 @@ impl FsStorageBackend {
"archived_at": archived_at.to_rfc3339(),
"etag": etag,
"metadata": Value::Object(meta_json),
"tags": tags,
"reason": "current",
"is_delete_marker": false,
});
@@ -3838,6 +3849,32 @@ impl crate::traits::StorageEngine for FsStorageBackend {
async fn delete_object_tags(&self, bucket: &str, key: &str) -> StorageResult<()> {
self.set_object_tags(bucket, key, &[]).await
}
async fn get_object_version_tags(
&self,
bucket: &str,
key: &str,
version_id: &str,
) -> StorageResult<Vec<Tag>> {
run_blocking(|| {
let _guard = self.get_object_lock(bucket, key).read();
let (record, _data_path) = self.read_version_record_sync(bucket, key, version_id)?;
if record
.get("is_delete_marker")
.and_then(Value::as_bool)
.unwrap_or(false)
{
return Err(StorageError::MethodNotAllowed(
"The specified method is not allowed against a delete marker".to_string(),
));
}
let tags = record
.get("tags")
.and_then(|v| serde_json::from_value::<Vec<Tag>>(v.clone()).ok())
.unwrap_or_default();
Ok(tags)
})
}
}
#[cfg(test)]


@@ -224,4 +224,11 @@ pub trait StorageEngine: Send + Sync {
async fn set_object_tags(&self, bucket: &str, key: &str, tags: &[Tag]) -> StorageResult<()>;
async fn delete_object_tags(&self, bucket: &str, key: &str) -> StorageResult<()>;
async fn get_object_version_tags(
&self,
bucket: &str,
key: &str,
version_id: &str,
) -> StorageResult<Vec<Tag>>;
}


@@ -67,8 +67,8 @@ pub fn validate_object_key(
);
}
-if part.chars().any(|c| (c as u32) < 32) {
-return Some("Object key contains control characters".to_string());
+if part.contains('\0') {
+return Some("Object key must not contain NUL bytes".to_string());
}
if is_windows {
@@ -78,6 +78,12 @@ pub fn validate_object_key(
.to_string(),
);
}
if part.chars().any(|c| (c as u32) < 32) {
return Some(
"Object key contains control characters not supported on Windows filesystems"
.to_string(),
);
}
if part.ends_with(' ') || part.ends_with('.') {
return Some(
"Object key segments cannot end with spaces or periods on Windows".to_string(),
@@ -144,6 +150,12 @@ pub fn validate_bucket_name(bucket_name: &str) -> Option<String> {
return Some("Bucket name must not contain consecutive periods".to_string());
}
if bucket_name.contains(".-") || bucket_name.contains("-.") {
return Some(
"Bucket name must not contain a period adjacent to a hyphen".to_string(),
);
}
if IP_REGEX.is_match(bucket_name) {
return Some("Bucket name must not be formatted as an IP address".to_string());
}
@@ -179,6 +191,14 @@ mod tests {
assert!(validate_bucket_name("192.168.1.1").is_some());
}
#[test]
fn test_bucket_name_period_hyphen_adjacency_rejected() {
assert!(validate_bucket_name("my-.bucket").is_some());
assert!(validate_bucket_name("my.-bucket").is_some());
assert!(validate_bucket_name("a.-b").is_some());
assert!(validate_bucket_name("a-.b").is_some());
}
#[test]
fn test_valid_object_keys() {
assert!(validate_object_key("file.txt", 1024, false, None).is_none());
@@ -217,4 +237,18 @@ mod tests {
assert!(validate_object_key("file<name", 1024, true, None).is_some());
assert!(validate_object_key("file.txt ", 1024, true, None).is_some());
}
#[test]
fn test_windows_rejects_control_chars() {
assert!(validate_object_key("a\u{0001}b", 1024, true, None).is_some());
assert!(validate_object_key("a\nb", 1024, true, None).is_some());
assert!(validate_object_key("a\u{001f}b", 1024, true, None).is_some());
}
#[test]
fn test_non_windows_allows_control_chars_except_nul() {
assert!(validate_object_key("a\u{0001}b", 1024, false, None).is_none());
assert!(validate_object_key("a\nb", 1024, false, None).is_none());
assert!(validate_object_key("a\0b", 1024, false, None).is_some());
}
}

View File

@@ -9,3 +9,5 @@ quick-xml = { workspace = true }
serde = { workspace = true }
chrono = { workspace = true }
percent-encoding = { workspace = true }
sha2 = { workspace = true }
base64 = { workspace = true }

View File

@@ -9,14 +9,29 @@ pub fn format_s3_datetime(dt: &DateTime<Utc>) -> String {
}
pub fn rate_limit_exceeded_xml(resource: &str, request_id: &str) -> String {
let host_id = derive_host_id(request_id);
format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<Error><Code>SlowDown</Code><Message>Please reduce your request rate</Message><Resource>{}</Resource><RequestId>{}</RequestId><HostId>{}</HostId></Error>",
xml_escape(resource),
xml_escape(request_id),
xml_escape(&host_id),
)
}
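// HostId is derived deterministically from the request id (domain-separated
// SHA-256, base64-encoded), so a given request id always maps to the same HostId.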
fn derive_host_id(request_id: &str) -> String {
use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine;
use sha2::{Digest, Sha256};
if request_id.is_empty() {
return String::new();
}
let mut hasher = Sha256::new();
hasher.update(b"myfsio-host-id\0");
hasher.update(request_id.as_bytes());
B64.encode(hasher.finalize())
}
fn xml_escape(s: &str) -> String {
s.replace('&', "&amp;")
.replace('<', "&lt;")
@@ -25,7 +40,12 @@ fn xml_escape(s: &str) -> String {
.replace('\'', "&apos;")
}
pub fn list_buckets_xml(
    owner_id: &str,
    owner_name: &str,
    buckets: &[BucketMeta],
    region: &str,
) -> String {
let mut writer = Writer::new(Cursor::new(Vec::new()));
writer
@@ -58,6 +78,7 @@ pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta]
"CreationDate",
&format_s3_datetime(&bucket.creation_date),
);
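        // Current ListBuckets responses include a BucketRegion element per bucket.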
write_text_element(&mut writer, "BucketRegion", region);
writer
.write_event(Event::End(BytesEnd::new("Bucket")))
.unwrap();
@@ -112,6 +133,7 @@ pub fn list_objects_v2_xml(
next_continuation_token,
key_count,
None,
false,
)
}
@@ -127,6 +149,7 @@ pub fn list_objects_v2_xml_with_encoding(
next_continuation_token: Option<&str>,
key_count: usize,
encoding_type: Option<&str>,
fetch_owner: bool,
) -> String {
let mut writer = Writer::new(Cursor::new(Vec::new()));
@@ -182,6 +205,16 @@ pub fn list_objects_v2_xml_with_encoding(
"StorageClass",
obj.storage_class.as_deref().unwrap_or("STANDARD"),
);
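        // Owner is emitted only when the client requested it via fetch-owner=true.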
if fetch_owner {
writer
.write_event(Event::Start(BytesStart::new("Owner")))
.unwrap();
write_text_element(&mut writer, "ID", "myfsio");
write_text_element(&mut writer, "DisplayName", "myfsio");
writer
.write_event(Event::End(BytesEnd::new("Owner")))
.unwrap();
}
writer
.write_event(Event::End(BytesEnd::new("Contents")))
.unwrap();
@@ -264,10 +297,24 @@ pub fn list_objects_v1_xml_with_encoding(
&maybe_url_encode(delimiter, encoding_type),
);
}
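    // On truncation, prefer an explicit NextMarker; otherwise fall back to the
    // last CommonPrefix (delimited listings) or the last returned key.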
    if is_truncated {
        let fallback = next_marker
            .filter(|nm| !nm.is_empty())
            .map(|nm| nm.to_string())
            .or_else(|| {
                if !delimiter.is_empty() {
                    common_prefixes.last().cloned()
                } else {
                    objects.last().map(|o| o.key.clone())
                }
            });
        if let Some(nm) = fallback {
            if !nm.is_empty() {
                write_text_element(
                    &mut writer,
                    "NextMarker",
                    &maybe_url_encode(&nm, encoding_type),
                );
            }
        }
    }
@@ -484,10 +531,34 @@ pub fn delete_result_xml(
String::from_utf8(writer.into_inner().into_inner()).unwrap()
}
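// Grouping the pagination fields into a struct keeps the paged writer's signature manageable.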
pub struct ListMultipartUploadsParams<'a> {
pub bucket: &'a str,
pub key_marker: &'a str,
pub upload_id_marker: &'a str,
pub next_key_marker: &'a str,
pub next_upload_id_marker: &'a str,
pub max_uploads: usize,
pub is_truncated: bool,
pub uploads: &'a [myfsio_common::types::MultipartUploadInfo],
}
pub fn list_multipart_uploads_xml(
bucket: &str,
uploads: &[myfsio_common::types::MultipartUploadInfo],
) -> String {
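    // Back-compat wrapper: unpaged callers get the defaults (max-uploads 1000, not truncated).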
list_multipart_uploads_xml_paged(&ListMultipartUploadsParams {
bucket,
key_marker: "",
upload_id_marker: "",
next_key_marker: "",
next_upload_id_marker: "",
max_uploads: 1000,
is_truncated: false,
uploads,
})
}
pub fn list_multipart_uploads_xml_paged(p: &ListMultipartUploadsParams<'_>) -> String {
let mut writer = Writer::new(Cursor::new(Vec::new()));
writer
.write_event(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None)))
@@ -496,9 +567,17 @@ pub fn list_multipart_uploads_xml(
let start = BytesStart::new("ListMultipartUploadsResult")
.with_attributes([("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/")]);
writer.write_event(Event::Start(start)).unwrap();
write_text_element(&mut writer, "Bucket", bucket);
write_text_element(&mut writer, "Bucket", p.bucket);
write_text_element(&mut writer, "KeyMarker", p.key_marker);
write_text_element(&mut writer, "UploadIdMarker", p.upload_id_marker);
if p.is_truncated {
write_text_element(&mut writer, "NextKeyMarker", p.next_key_marker);
write_text_element(&mut writer, "NextUploadIdMarker", p.next_upload_id_marker);
}
write_text_element(&mut writer, "MaxUploads", &p.max_uploads.to_string());
write_text_element(&mut writer, "IsTruncated", &p.is_truncated.to_string());
    for upload in p.uploads {
writer
.write_event(Event::Start(BytesStart::new("Upload")))
.unwrap();
@@ -521,12 +600,36 @@ pub fn list_multipart_uploads_xml(
String::from_utf8(writer.into_inner().into_inner()).unwrap()
}
pub struct ListPartsParams<'a> {
pub bucket: &'a str,
pub key: &'a str,
pub upload_id: &'a str,
pub part_number_marker: u32,
pub next_part_number_marker: u32,
pub max_parts: usize,
pub is_truncated: bool,
pub parts: &'a [myfsio_common::types::PartMeta],
}
pub fn list_parts_xml(
bucket: &str,
key: &str,
upload_id: &str,
parts: &[myfsio_common::types::PartMeta],
) -> String {
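    // Back-compat wrapper: empty marker, max-parts 1000, never truncated;
    // NextPartNumberMarker echoes the last part number.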
list_parts_xml_paged(&ListPartsParams {
bucket,
key,
upload_id,
part_number_marker: 0,
next_part_number_marker: parts.last().map(|p| p.part_number).unwrap_or(0),
max_parts: 1000,
is_truncated: false,
parts,
})
}
pub fn list_parts_xml_paged(p: &ListPartsParams<'_>) -> String {
let mut writer = Writer::new(Cursor::new(Vec::new()));
writer
.write_event(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None)))
@@ -535,11 +638,23 @@ pub fn list_parts_xml(
let start = BytesStart::new("ListPartsResult")
.with_attributes([("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/")]);
writer.write_event(Event::Start(start)).unwrap();
write_text_element(&mut writer, "Bucket", bucket);
write_text_element(&mut writer, "Key", key);
write_text_element(&mut writer, "UploadId", upload_id);
write_text_element(&mut writer, "Bucket", p.bucket);
write_text_element(&mut writer, "Key", p.key);
write_text_element(&mut writer, "UploadId", p.upload_id);
write_text_element(
&mut writer,
"PartNumberMarker",
&p.part_number_marker.to_string(),
);
write_text_element(
&mut writer,
"NextPartNumberMarker",
&p.next_part_number_marker.to_string(),
);
write_text_element(&mut writer, "MaxParts", &p.max_parts.to_string());
write_text_element(&mut writer, "IsTruncated", &p.is_truncated.to_string());
    for part in p.parts {
writer
.write_event(Event::Start(BytesStart::new("Part")))
.unwrap();
@@ -572,9 +687,10 @@ mod tests {
name: "test-bucket".to_string(),
creation_date: Utc::now(),
}];
let xml = list_buckets_xml("owner-id", "owner-name", &buckets);
let xml = list_buckets_xml("owner-id", "owner-name", &buckets, "us-east-1");
assert!(xml.contains("<Name>test-bucket</Name>"));
assert!(xml.contains("<ID>owner-id</ID>"));
assert!(xml.contains("<BucketRegion>us-east-1</BucketRegion>"));
assert!(xml.contains("ListAllMyBucketsResult"));
}