From 51d54b42aca919c6fbe68ee626cfdd6a7f047372 Mon Sep 17 00:00:00 2001 From: kqjy Date: Wed, 22 Apr 2026 15:41:18 +0800 Subject: [PATCH] Rust fixes --- rust/myfsio-engine/Cargo.lock | 14 + rust/myfsio-engine/Cargo.toml | 2 +- .../crates/myfsio-crypto/src/encryption.rs | 35 +- .../crates/myfsio-server/Cargo.toml | 1 + .../crates/myfsio-server/src/config.rs | 341 +++++++++++++++++- .../myfsio-server/src/handlers/config.rs | 10 + .../crates/myfsio-server/src/handlers/kms.rs | 26 +- .../crates/myfsio-server/src/handlers/mod.rs | 9 + .../crates/myfsio-server/src/handlers/ui.rs | 28 +- .../myfsio-server/src/handlers/ui_api.rs | 93 +++-- .../myfsio-server/src/handlers/ui_pages.rs | 91 ++++- .../crates/myfsio-server/src/lib.rs | 98 ++++- .../crates/myfsio-server/src/main.rs | 135 ++++++- .../myfsio-server/src/middleware/mod.rs | 2 + .../myfsio-server/src/middleware/ratelimit.rs | 241 +++++++++++++ .../crates/myfsio-server/src/services/gc.rs | 29 ++ .../crates/myfsio-server/src/state.rs | 51 ++- .../crates/myfsio-server/templates/docs.html | 35 +- .../crates/myfsio-server/templates/iam.html | 2 +- .../crates/myfsio-server/tests/integration.rs | 122 ++++++- .../crates/myfsio-storage/src/fs_backend.rs | 49 ++- .../crates/myfsio-xml/src/response.rs | 6 + 22 files changed, 1312 insertions(+), 108 deletions(-) create mode 100644 rust/myfsio-engine/crates/myfsio-server/src/middleware/ratelimit.rs diff --git a/rust/myfsio-engine/Cargo.lock b/rust/myfsio-engine/Cargo.lock index 1c0817f..d57d448 100644 --- a/rust/myfsio-engine/Cargo.lock +++ b/rust/myfsio-engine/Cargo.lock @@ -2542,6 +2542,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version = "0.8.4" @@ -2724,6 +2733,7 @@ dependencies = [ "roxmltree", "serde", "serde_json", + "serde_urlencoded", "sha2 0.10.9", "subtle", "sysinfo", @@ -4333,10 +4343,14 @@ version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex-automata", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/rust/myfsio-engine/Cargo.toml b/rust/myfsio-engine/Cargo.toml index 6f3bde5..e5360ec 100644 --- a/rust/myfsio-engine/Cargo.toml +++ b/rust/myfsio-engine/Cargo.toml @@ -38,7 +38,7 @@ percent-encoding = "2" regex = "1" unicode-normalization = "0.1" tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } thiserror = "2" chrono = { version = "0.4", features = ["serde"] } base64 = "0.22" diff --git a/rust/myfsio-engine/crates/myfsio-crypto/src/encryption.rs b/rust/myfsio-engine/crates/myfsio-crypto/src/encryption.rs index 37a5193..40ca870 100644 --- a/rust/myfsio-engine/crates/myfsio-crypto/src/encryption.rs +++ b/rust/myfsio-engine/crates/myfsio-crypto/src/encryption.rs @@ -82,11 +82,35 @@ impl EncryptionMetadata { pub struct EncryptionService { master_key: [u8; 32], kms: Option>, + config: EncryptionConfig, +} + +#[derive(Debug, Clone, Copy)] +pub struct EncryptionConfig { + pub chunk_size: usize, +} + +impl Default for EncryptionConfig { + fn default() -> Self { + Self { chunk_size: 65_536 } + } } impl EncryptionService { pub fn new(master_key: [u8; 32], kms: Option>) -> Self { - Self { master_key, kms } + Self::with_config(master_key, kms, EncryptionConfig::default()) + } + + pub fn with_config( + master_key: [u8; 32], + kms: Option>, + config: EncryptionConfig, + ) -> Self { + Self { + master_key, + kms, + config, + } } pub fn generate_data_key(&self) -> ([u8; 32], [u8; 12]) { @@ -192,9 +216,12 @@ impl EncryptionService { let op = output_path.to_owned(); let ak = actual_key; let n = nonce; - tokio::task::spawn_blocking(move || encrypt_stream_chunked(&ip, &op, &ak, &n, None)) - .await - .map_err(|e| CryptoError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))??; + let chunk_size = self.config.chunk_size; + tokio::task::spawn_blocking(move || { + encrypt_stream_chunked(&ip, &op, &ak, &n, Some(chunk_size)) + }) + .await + .map_err(|e| CryptoError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))??; Ok(EncryptionMetadata { algorithm: ctx.algorithm.as_str().to_string(), diff --git a/rust/myfsio-engine/crates/myfsio-server/Cargo.toml b/rust/myfsio-engine/crates/myfsio-server/Cargo.toml index 27d8f75..40ca42e 100644 --- a/rust/myfsio-engine/crates/myfsio-server/Cargo.toml +++ b/rust/myfsio-engine/crates/myfsio-server/Cargo.toml @@ -19,6 +19,7 @@ hyper = { workspace = true } bytes = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +serde_urlencoded = "0.7" tracing = { workspace = true } tracing-subscriber = { workspace = true } tokio-util = { workspace = true } diff --git a/rust/myfsio-engine/crates/myfsio-server/src/config.rs b/rust/myfsio-engine/crates/myfsio-server/src/config.rs index 7e4b587..6bfd147 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/config.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/config.rs @@ -1,6 +1,21 @@ use std::net::SocketAddr; use std::path::PathBuf; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct RateLimitSetting { + pub max_requests: u32, + pub window_seconds: u64, +} + +impl RateLimitSetting { + pub const fn new(max_requests: u32, window_seconds: u64) -> Self { + Self { + max_requests, + window_seconds, + } + } +} + #[derive(Debug, Clone)] pub struct ServerConfig { pub bind_addr: SocketAddr, @@ -13,8 +28,16 @@ pub struct ServerConfig { pub presigned_url_max_expiry: u64, pub secret_key: Option, pub encryption_enabled: bool, + pub encryption_chunk_size_bytes: usize, pub kms_enabled: bool, + pub kms_generate_data_key_min_bytes: usize, + pub kms_generate_data_key_max_bytes: usize, pub gc_enabled: bool, + pub gc_interval_hours: f64, + pub gc_temp_file_max_age_hours: f64, + pub gc_multipart_max_age_days: u64, + pub gc_lock_file_max_age_hours: f64, + pub gc_dry_run: bool, pub integrity_enabled: bool, pub metrics_enabled: bool, pub metrics_history_enabled: bool, @@ -23,7 +46,12 @@ pub struct ServerConfig { pub metrics_history_interval_minutes: u64, pub metrics_history_retention_hours: u64, pub lifecycle_enabled: bool, + pub lifecycle_max_history_per_bucket: usize, pub website_hosting_enabled: bool, + pub object_key_max_length_bytes: usize, + pub object_tag_limit: usize, + pub object_cache_max_size: usize, + pub bucket_config_cache_ttl_seconds: f64, pub replication_connect_timeout_secs: u64, pub replication_read_timeout_secs: u64, pub replication_max_retries: u32, @@ -36,6 +64,26 @@ pub struct ServerConfig { pub site_sync_read_timeout_secs: u64, pub site_sync_max_retries: u32, pub site_sync_clock_skew_tolerance: f64, + pub site_id: Option, + pub site_endpoint: Option, + pub site_region: String, + pub site_priority: i32, + pub api_base_url: String, + pub num_trusted_proxies: usize, + pub allowed_redirect_hosts: Vec, + pub allow_internal_endpoints: bool, + pub cors_origins: Vec, + pub cors_methods: Vec, + pub cors_allow_headers: Vec, + pub cors_expose_headers: Vec, + pub session_lifetime_days: u64, + pub log_level: String, + pub multipart_min_part_size: u64, + pub bulk_delete_max_keys: usize, + pub stream_chunk_size: usize, + pub ratelimit_default: RateLimitSetting, + pub ratelimit_admin: RateLimitSetting, + pub ratelimit_storage_uri: String, pub ui_enabled: bool, pub templates_dir: PathBuf, pub static_dir: PathBuf, @@ -48,6 +96,8 @@ impl ServerConfig { .unwrap_or_else(|_| "5000".to_string()) .parse() .unwrap_or(5000); + let host_ip: std::net::IpAddr = host.parse().unwrap(); + let bind_addr = SocketAddr::new(host_ip, port); let ui_port: u16 = std::env::var("UI_PORT") .unwrap_or_else(|_| "5100".to_string()) .parse() @@ -98,10 +148,19 @@ impl ServerConfig { }; let encryption_enabled = parse_bool_env("ENCRYPTION_ENABLED", false); + let encryption_chunk_size_bytes = parse_usize_env("ENCRYPTION_CHUNK_SIZE_BYTES", 65_536); let kms_enabled = parse_bool_env("KMS_ENABLED", false); + let kms_generate_data_key_min_bytes = parse_usize_env("KMS_GENERATE_DATA_KEY_MIN_BYTES", 1); + let kms_generate_data_key_max_bytes = + parse_usize_env("KMS_GENERATE_DATA_KEY_MAX_BYTES", 1024); let gc_enabled = parse_bool_env("GC_ENABLED", false); + let gc_interval_hours = parse_f64_env("GC_INTERVAL_HOURS", 6.0); + let gc_temp_file_max_age_hours = parse_f64_env("GC_TEMP_FILE_MAX_AGE_HOURS", 24.0); + let gc_multipart_max_age_days = parse_u64_env("GC_MULTIPART_MAX_AGE_DAYS", 7); + let gc_lock_file_max_age_hours = parse_f64_env("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0); + let gc_dry_run = parse_bool_env("GC_DRY_RUN", false); let integrity_enabled = parse_bool_env("INTEGRITY_ENABLED", false); @@ -115,8 +174,15 @@ impl ServerConfig { let metrics_history_retention_hours = parse_u64_env("METRICS_HISTORY_RETENTION_HOURS", 24); let lifecycle_enabled = parse_bool_env("LIFECYCLE_ENABLED", false); + let lifecycle_max_history_per_bucket = + parse_usize_env("LIFECYCLE_MAX_HISTORY_PER_BUCKET", 50); let website_hosting_enabled = parse_bool_env("WEBSITE_HOSTING_ENABLED", false); + let object_key_max_length_bytes = parse_usize_env("OBJECT_KEY_MAX_LENGTH_BYTES", 1024); + let object_tag_limit = parse_usize_env("OBJECT_TAG_LIMIT", 50); + let object_cache_max_size = parse_usize_env("OBJECT_CACHE_MAX_SIZE", 100); + let bucket_config_cache_ttl_seconds = + parse_f64_env("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0); let replication_connect_timeout_secs = parse_u64_env("REPLICATION_CONNECT_TIMEOUT_SECONDS", 5); @@ -139,6 +205,33 @@ impl ServerConfig { .and_then(|s| s.parse().ok()) .unwrap_or(1.0); + let site_id = parse_optional_string_env("SITE_ID"); + let site_endpoint = parse_optional_string_env("SITE_ENDPOINT"); + let site_region = std::env::var("SITE_REGION").unwrap_or_else(|_| region.clone()); + let site_priority = parse_i32_env("SITE_PRIORITY", 100); + let api_base_url = std::env::var("API_BASE_URL") + .unwrap_or_else(|_| format!("http://{}", bind_addr)) + .trim_end_matches('/') + .to_string(); + let num_trusted_proxies = parse_usize_env("NUM_TRUSTED_PROXIES", 0); + let allowed_redirect_hosts = parse_list_env("ALLOWED_REDIRECT_HOSTS", ""); + let allow_internal_endpoints = parse_bool_env("ALLOW_INTERNAL_ENDPOINTS", false); + let cors_origins = parse_list_env("CORS_ORIGINS", "*"); + let cors_methods = parse_list_env("CORS_METHODS", "GET,PUT,POST,DELETE,OPTIONS,HEAD"); + let cors_allow_headers = parse_list_env("CORS_ALLOW_HEADERS", "*"); + let cors_expose_headers = parse_list_env("CORS_EXPOSE_HEADERS", "*"); + let session_lifetime_days = parse_u64_env("SESSION_LIFETIME_DAYS", 1); + let log_level = std::env::var("LOG_LEVEL").unwrap_or_else(|_| "INFO".to_string()); + let multipart_min_part_size = parse_u64_env("MULTIPART_MIN_PART_SIZE", 5_242_880); + let bulk_delete_max_keys = parse_usize_env("BULK_DELETE_MAX_KEYS", 1000); + let stream_chunk_size = parse_usize_env("STREAM_CHUNK_SIZE", 1_048_576); + let ratelimit_default = + parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(200, 60)); + let ratelimit_admin = + parse_rate_limit_env("RATE_LIMIT_ADMIN", RateLimitSetting::new(60, 60)); + let ratelimit_storage_uri = + std::env::var("RATE_LIMIT_STORAGE_URI").unwrap_or_else(|_| "memory://".to_string()); + let ui_enabled = parse_bool_env("UI_ENABLED", true); let templates_dir = std::env::var("TEMPLATES_DIR") .map(PathBuf::from) @@ -147,9 +240,8 @@ impl ServerConfig { .map(PathBuf::from) .unwrap_or_else(|_| default_static_dir()); - let host_ip: std::net::IpAddr = host.parse().unwrap(); Self { - bind_addr: SocketAddr::new(host_ip, port), + bind_addr, ui_bind_addr: SocketAddr::new(host_ip, ui_port), storage_root: storage_path, region, @@ -159,8 +251,16 @@ impl ServerConfig { presigned_url_max_expiry, secret_key, encryption_enabled, + encryption_chunk_size_bytes, kms_enabled, + kms_generate_data_key_min_bytes, + kms_generate_data_key_max_bytes, gc_enabled, + gc_interval_hours, + gc_temp_file_max_age_hours, + gc_multipart_max_age_days, + gc_lock_file_max_age_hours, + gc_dry_run, integrity_enabled, metrics_enabled, metrics_history_enabled, @@ -169,7 +269,12 @@ impl ServerConfig { metrics_history_interval_minutes, metrics_history_retention_hours, lifecycle_enabled, + lifecycle_max_history_per_bucket, website_hosting_enabled, + object_key_max_length_bytes, + object_tag_limit, + object_cache_max_size, + bucket_config_cache_ttl_seconds, replication_connect_timeout_secs, replication_read_timeout_secs, replication_max_retries, @@ -182,6 +287,26 @@ impl ServerConfig { site_sync_read_timeout_secs, site_sync_max_retries, site_sync_clock_skew_tolerance, + site_id, + site_endpoint, + site_region, + site_priority, + api_base_url, + num_trusted_proxies, + allowed_redirect_hosts, + allow_internal_endpoints, + cors_origins, + cors_methods, + cors_allow_headers, + cors_expose_headers, + session_lifetime_days, + log_level, + multipart_min_part_size, + bulk_delete_max_keys, + stream_chunk_size, + ratelimit_default, + ratelimit_admin, + ratelimit_storage_uri, ui_enabled, templates_dir, static_dir, @@ -189,6 +314,89 @@ impl ServerConfig { } } +impl Default for ServerConfig { + fn default() -> Self { + Self { + bind_addr: "127.0.0.1:5000".parse().unwrap(), + ui_bind_addr: "127.0.0.1:5100".parse().unwrap(), + storage_root: PathBuf::from("./data"), + region: "us-east-1".to_string(), + iam_config_path: PathBuf::from("./data/.myfsio.sys/config/iam.json"), + sigv4_timestamp_tolerance_secs: 900, + presigned_url_min_expiry: 1, + presigned_url_max_expiry: 604_800, + secret_key: None, + encryption_enabled: false, + encryption_chunk_size_bytes: 65_536, + kms_enabled: false, + kms_generate_data_key_min_bytes: 1, + kms_generate_data_key_max_bytes: 1024, + gc_enabled: false, + gc_interval_hours: 6.0, + gc_temp_file_max_age_hours: 24.0, + gc_multipart_max_age_days: 7, + gc_lock_file_max_age_hours: 1.0, + gc_dry_run: false, + integrity_enabled: false, + metrics_enabled: false, + metrics_history_enabled: false, + metrics_interval_minutes: 5, + metrics_retention_hours: 24, + metrics_history_interval_minutes: 5, + metrics_history_retention_hours: 24, + lifecycle_enabled: false, + lifecycle_max_history_per_bucket: 50, + website_hosting_enabled: false, + object_key_max_length_bytes: 1024, + object_tag_limit: 50, + object_cache_max_size: 100, + bucket_config_cache_ttl_seconds: 30.0, + replication_connect_timeout_secs: 5, + replication_read_timeout_secs: 30, + replication_max_retries: 2, + replication_streaming_threshold_bytes: 10_485_760, + replication_max_failures_per_bucket: 50, + site_sync_enabled: false, + site_sync_interval_secs: 60, + site_sync_batch_size: 100, + site_sync_connect_timeout_secs: 10, + site_sync_read_timeout_secs: 120, + site_sync_max_retries: 2, + site_sync_clock_skew_tolerance: 1.0, + site_id: None, + site_endpoint: None, + site_region: "us-east-1".to_string(), + site_priority: 100, + api_base_url: "http://127.0.0.1:5000".to_string(), + num_trusted_proxies: 0, + allowed_redirect_hosts: Vec::new(), + allow_internal_endpoints: false, + cors_origins: vec!["*".to_string()], + cors_methods: vec![ + "GET".to_string(), + "PUT".to_string(), + "POST".to_string(), + "DELETE".to_string(), + "OPTIONS".to_string(), + "HEAD".to_string(), + ], + cors_allow_headers: vec!["*".to_string()], + cors_expose_headers: vec!["*".to_string()], + session_lifetime_days: 1, + log_level: "INFO".to_string(), + multipart_min_part_size: 5_242_880, + bulk_delete_max_keys: 1000, + stream_chunk_size: 1_048_576, + ratelimit_default: RateLimitSetting::new(200, 60), + ratelimit_admin: RateLimitSetting::new(60, 60), + ratelimit_storage_uri: "memory://".to_string(), + ui_enabled: true, + templates_dir: default_templates_dir(), + static_dir: default_static_dir(), + } + } +} + fn default_templates_dir() -> PathBuf { let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); manifest_dir.join("templates") @@ -214,6 +422,27 @@ fn parse_u64_env(key: &str, default: u64) -> u64 { .unwrap_or(default) } +fn parse_usize_env(key: &str, default: usize) -> usize { + std::env::var(key) + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(default) +} + +fn parse_i32_env(key: &str, default: i32) -> i32 { + std::env::var(key) + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(default) +} + +fn parse_f64_env(key: &str, default: f64) -> f64 { + std::env::var(key) + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(default) +} + fn parse_bool_env(key: &str, default: bool) -> bool { std::env::var(key) .ok() @@ -225,3 +454,111 @@ fn parse_bool_env(key: &str, default: bool) -> bool { }) .unwrap_or(default) } + +fn parse_optional_string_env(key: &str) -> Option { + std::env::var(key) + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) +} + +fn parse_list_env(key: &str, default: &str) -> Vec { + std::env::var(key) + .unwrap_or_else(|_| default.to_string()) + .split(',') + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .collect() +} + +pub fn parse_rate_limit(value: &str) -> Option { + let parts = value.split_whitespace().collect::>(); + if parts.len() != 3 || !parts[1].eq_ignore_ascii_case("per") { + return None; + } + let max_requests = parts[0].parse::().ok()?; + if max_requests == 0 { + return None; + } + let window_seconds = match parts[2].to_ascii_lowercase().as_str() { + "second" | "seconds" => 1, + "minute" | "minutes" => 60, + "hour" | "hours" => 3600, + "day" | "days" => 86_400, + _ => return None, + }; + Some(RateLimitSetting::new(max_requests, window_seconds)) +} + +fn parse_rate_limit_env(key: &str, default: RateLimitSetting) -> RateLimitSetting { + std::env::var(key) + .ok() + .and_then(|value| parse_rate_limit(&value)) + .unwrap_or(default) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Mutex, OnceLock}; + + fn env_lock() -> &'static Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| Mutex::new(())) + } + + #[test] + fn parses_rate_limit_text() { + assert_eq!( + parse_rate_limit("200 per minute"), + Some(RateLimitSetting::new(200, 60)) + ); + assert_eq!( + parse_rate_limit("3 per hours"), + Some(RateLimitSetting::new(3, 3600)) + ); + assert_eq!(parse_rate_limit("0 per minute"), None); + assert_eq!(parse_rate_limit("bad"), None); + } + + #[test] + fn env_defaults_and_invalid_values_fall_back() { + let _guard = env_lock().lock().unwrap(); + std::env::remove_var("OBJECT_KEY_MAX_LENGTH_BYTES"); + std::env::set_var("OBJECT_TAG_LIMIT", "not-a-number"); + std::env::set_var("RATE_LIMIT_DEFAULT", "invalid"); + + let config = ServerConfig::from_env(); + + assert_eq!(config.object_key_max_length_bytes, 1024); + assert_eq!(config.object_tag_limit, 50); + assert_eq!(config.ratelimit_default, RateLimitSetting::new(200, 60)); + + std::env::remove_var("OBJECT_TAG_LIMIT"); + std::env::remove_var("RATE_LIMIT_DEFAULT"); + } + + #[test] + fn env_overrides_new_values() { + let _guard = env_lock().lock().unwrap(); + std::env::set_var("OBJECT_KEY_MAX_LENGTH_BYTES", "2048"); + std::env::set_var("GC_DRY_RUN", "true"); + std::env::set_var("RATE_LIMIT_ADMIN", "7 per second"); + std::env::set_var("HOST", "127.0.0.1"); + std::env::set_var("PORT", "5501"); + std::env::remove_var("API_BASE_URL"); + + let config = ServerConfig::from_env(); + + assert_eq!(config.object_key_max_length_bytes, 2048); + assert!(config.gc_dry_run); + assert_eq!(config.ratelimit_admin, RateLimitSetting::new(7, 1)); + assert_eq!(config.api_base_url, "http://127.0.0.1:5501"); + + std::env::remove_var("OBJECT_KEY_MAX_LENGTH_BYTES"); + std::env::remove_var("GC_DRY_RUN"); + std::env::remove_var("RATE_LIMIT_ADMIN"); + std::env::remove_var("HOST"); + std::env::remove_var("PORT"); + } +} diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs index 5bb6163..4547374 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs @@ -1172,6 +1172,16 @@ pub async fn put_object_tagging(state: &AppState, bucket: &str, key: &str, body: let xml_str = String::from_utf8_lossy(&body_bytes); let tags = parse_tagging_xml(&xml_str); + if tags.len() > state.config.object_tag_limit { + return xml_response( + StatusCode::BAD_REQUEST, + S3Error::new( + S3ErrorCode::InvalidTag, + format!("Maximum {} tags allowed", state.config.object_tag_limit), + ) + .to_xml(), + ); + } match state.storage.set_object_tags(bucket, key, &tags).await { Ok(()) => StatusCode::OK.into_response(), diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/kms.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/kms.rs index 857382e..b6ca8f0 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/kms.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/kms.rs @@ -294,8 +294,17 @@ async fn generate_data_key_inner(state: AppState, body: Body, include_plaintext: .and_then(|v| v.as_u64()) .unwrap_or(32) as usize; - if !(1..=1024).contains(&num_bytes) { - return json_err(StatusCode::BAD_REQUEST, "NumberOfBytes must be 1-1024"); + if num_bytes < state.config.kms_generate_data_key_min_bytes + || num_bytes > state.config.kms_generate_data_key_max_bytes + { + return json_err( + StatusCode::BAD_REQUEST, + &format!( + "NumberOfBytes must be {}-{}", + state.config.kms_generate_data_key_min_bytes, + state.config.kms_generate_data_key_max_bytes + ), + ); } match kms.generate_data_key(key_id, num_bytes).await { @@ -389,8 +398,17 @@ pub async fn generate_random(State(state): State, body: Body) -> Respo .and_then(|v| v.as_u64()) .unwrap_or(32) as usize; - if !(1..=1024).contains(&num_bytes) { - return json_err(StatusCode::BAD_REQUEST, "NumberOfBytes must be 1-1024"); + if num_bytes < state.config.kms_generate_data_key_min_bytes + || num_bytes > state.config.kms_generate_data_key_max_bytes + { + return json_err( + StatusCode::BAD_REQUEST, + &format!( + "NumberOfBytes must be {}-{}", + state.config.kms_generate_data_key_min_bytes, + state.config.kms_generate_data_key_max_bytes + ), + ); } let mut bytes = vec![0u8; num_bytes]; diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs index 45d1b51..1acd085 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs @@ -1122,6 +1122,14 @@ pub async fn put_object( Ok(tags) => tags, Err(response) => return response, }; + if let Some(ref tags) = tags { + if tags.len() > state.config.object_tag_limit { + return s3_error_response(S3Error::new( + S3ErrorCode::InvalidTag, + format!("Maximum {} tags allowed", state.config.object_tag_limit), + )); + } + } let aws_chunked = is_aws_chunked(&headers); let boxed: myfsio_storage::traits::AsyncReadStream = if has_upload_checksum(&headers) { @@ -2860,6 +2868,7 @@ mod tests { ui_enabled: false, templates_dir: manifest_dir.join("templates"), static_dir: manifest_dir.join("static"), + ..ServerConfig::default() }; (AppState::new(config), tmp) } diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui.rs index 6414d5e..3d94114 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui.rs @@ -66,7 +66,7 @@ pub async fn login_submit( let next = form .next .as_deref() - .filter(|n| n.starts_with("/ui/") || *n == "/ui") + .filter(|n| is_allowed_redirect(n, &state.config.allowed_redirect_hosts)) .unwrap_or("/ui/buckets") .to_string(); Redirect::to(&next).into_response() @@ -80,6 +80,32 @@ pub async fn login_submit( } } +fn is_allowed_redirect(target: &str, allowed_hosts: &[String]) -> bool { + if target == "/ui" || target.starts_with("/ui/") { + return true; + } + let Some(rest) = target + .strip_prefix("https://") + .or_else(|| target.strip_prefix("http://")) + else { + return false; + }; + let host = rest + .split('/') + .next() + .unwrap_or_default() + .split('@') + .last() + .unwrap_or_default() + .split(':') + .next() + .unwrap_or_default() + .to_ascii_lowercase(); + allowed_hosts + .iter() + .any(|allowed| allowed.eq_ignore_ascii_case(&host)) +} + pub async fn logout(Extension(session): Extension) -> Response { session.write(|s| { s.user_id = None; diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs index 0217117..b4b002b 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs @@ -49,6 +49,8 @@ const AWS_QUERY_ENCODE_SET: &AsciiSet = &NON_ALPHANUMERIC .remove(b'.') .remove(b'~'); +const UI_OBJECT_BROWSER_MAX_KEYS: usize = 5000; + fn url_templates_for(bucket: &str) -> Value { json!({ "download": format!("/ui/buckets/{}/objects/KEY_PLACEHOLDER/download", bucket), @@ -185,10 +187,7 @@ fn safe_attachment_filename(key: &str) -> String { } fn parse_api_base(state: &AppState) -> String { - std::env::var("API_BASE_URL") - .unwrap_or_else(|_| format!("http://{}", state.config.bind_addr)) - .trim_end_matches('/') - .to_string() + state.config.api_base_url.trim_end_matches('/').to_string() } fn aws_query_encode(value: &str) -> String { @@ -1022,38 +1021,48 @@ pub async fn stream_bucket_objects( let prefix = q.prefix.clone().unwrap_or_default(); if use_delimiter { - let params = myfsio_common::types::ShallowListParams { - prefix: prefix.clone(), - delimiter: "/".to_string(), - max_keys: 5000, - continuation_token: None, - }; - match state - .storage - .list_objects_shallow(&bucket_name, ¶ms) - .await - { - Ok(res) => { - for p in &res.common_prefixes { - lines.push(json!({ "type": "folder", "prefix": p }).to_string()); + let mut token: Option = None; + loop { + let params = myfsio_common::types::ShallowListParams { + prefix: prefix.clone(), + delimiter: "/".to_string(), + max_keys: UI_OBJECT_BROWSER_MAX_KEYS, + continuation_token: token.clone(), + }; + match state + .storage + .list_objects_shallow(&bucket_name, ¶ms) + .await + { + Ok(res) => { + for p in &res.common_prefixes { + lines.push(json!({ "type": "folder", "prefix": p }).to_string()); + } + for o in &res.objects { + lines.push( + json!({ + "type": "object", + "key": o.key, + "size": o.size, + "last_modified": o.last_modified.to_rfc3339(), + "last_modified_iso": o.last_modified.to_rfc3339(), + "last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(), + "etag": o.etag.clone().unwrap_or_default(), + "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), + }) + .to_string(), + ); + } + if !res.is_truncated || res.next_continuation_token.is_none() { + break; + } + token = res.next_continuation_token; } - for o in &res.objects { - lines.push( - json!({ - "type": "object", - "key": o.key, - "size": o.size, - "last_modified": o.last_modified.to_rfc3339(), - "last_modified_iso": o.last_modified.to_rfc3339(), - "last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(), - "etag": o.etag.clone().unwrap_or_default(), - "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), - }) - .to_string(), - ); + Err(e) => { + lines.push(json!({ "type": "error", "error": e.to_string() }).to_string()); + break; } } - Err(e) => lines.push(json!({ "type": "error", "error": e.to_string() }).to_string()), } } else { let mut token: Option = None; @@ -1123,7 +1132,7 @@ pub async fn list_bucket_folders( let params = myfsio_common::types::ShallowListParams { prefix: prefix.clone(), delimiter: "/".to_string(), - max_keys: 5000, + max_keys: UI_OBJECT_BROWSER_MAX_KEYS, continuation_token: None, }; match state @@ -2408,8 +2417,11 @@ async fn update_object_tags(state: &AppState, bucket: &str, key: &str, body: Bod Err(response) => return response, }; - if payload.tags.len() > 50 { - return json_error(StatusCode::BAD_REQUEST, "Maximum 50 tags allowed"); + if payload.tags.len() > state.config.object_tag_limit { + return json_error( + StatusCode::BAD_REQUEST, + format!("Maximum {} tags allowed", state.config.object_tag_limit), + ); } let tags = payload @@ -2841,6 +2853,15 @@ pub async fn bulk_delete_objects( "No objects found under the selected folders", ); } + if keys.len() > state.config.bulk_delete_max_keys { + return json_error( + StatusCode::BAD_REQUEST, + format!( + "Bulk delete supports at most {} keys", + state.config.bulk_delete_max_keys + ), + ); + } let mut deleted = Vec::new(); let mut errors = Vec::new(); diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_pages.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_pages.rs index 3be88f7..ec86ded 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_pages.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_pages.rs @@ -1,8 +1,10 @@ use std::collections::HashMap; +use axum::body::Body; use axum::extract::{Extension, Form, Path, Query, State}; use axum::http::{header, HeaderMap, StatusCode}; use axum::response::{IntoResponse, Redirect, Response}; +use http_body_util::BodyExt; use serde_json::{json, Value}; use tera::Context; @@ -203,6 +205,59 @@ fn wants_json(headers: &HeaderMap) -> bool { .unwrap_or(false) } +async fn parse_form_any( + headers: &HeaderMap, + body: Body, +) -> Result, String> { + let content_type = headers + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or("") + .to_string(); + let is_multipart = content_type + .to_ascii_lowercase() + .starts_with("multipart/form-data"); + + let bytes = body + .collect() + .await + .map_err(|e| format!("Failed to read request body: {}", e))? + .to_bytes(); + + if is_multipart { + let boundary = multer::parse_boundary(&content_type) + .map_err(|_| "Missing multipart boundary".to_string())?; + let stream = futures::stream::once(async move { + Ok::<_, std::io::Error>(bytes) + }); + let mut multipart = multer::Multipart::new(stream, boundary); + let mut out = HashMap::new(); + while let Some(field) = multipart + .next_field() + .await + .map_err(|e| format!("Malformed multipart body: {}", e))? + { + let name = match field.name() { + Some(name) => name.to_string(), + None => continue, + }; + if field.file_name().is_some() { + continue; + } + let value = field + .text() + .await + .map_err(|e| format!("Invalid multipart field '{}': {}", name, e))?; + out.insert(name, value); + } + Ok(out) + } else { + let parsed: Vec<(String, String)> = serde_urlencoded::from_bytes(&bytes) + .map_err(|e| format!("Invalid form body: {}", e))?; + Ok(parsed.into_iter().collect()) + } +} + fn bucket_tab_redirect(bucket_name: &str, tab: &str) -> Response { Redirect::to(&format!("/ui/buckets/{}?tab={}", bucket_name, tab)).into_response() } @@ -231,10 +286,7 @@ fn default_public_policy(bucket_name: &str) -> String { } fn parse_api_base(state: &AppState) -> (String, String) { - let api_base = std::env::var("API_BASE_URL") - .unwrap_or_else(|_| format!("http://{}", state.config.bind_addr)) - .trim_end_matches('/') - .to_string(); + let api_base = state.config.api_base_url.trim_end_matches('/').to_string(); let api_host = api_base .split("://") .nth(1) @@ -1173,16 +1225,13 @@ pub async fn sites_dashboard( ctx.insert("connections", &conns); ctx.insert( "config_site_id", - &std::env::var("SITE_ID").unwrap_or_default(), + &state.config.site_id.clone().unwrap_or_default(), ); ctx.insert( "config_site_endpoint", - &std::env::var("SITE_ENDPOINT").unwrap_or_default(), - ); - ctx.insert( - "config_site_region", - &std::env::var("SITE_REGION").unwrap_or_else(|_| state.config.region.clone()), + &state.config.site_endpoint.clone().unwrap_or_default(), ); + ctx.insert("config_site_region", &state.config.site_region); ctx.insert("topology", &json!({"sites": [], "connections": []})); render(&state, "sites.html", &ctx) } @@ -2119,9 +2168,29 @@ pub async fn create_bucket( State(state): State, Extension(session): Extension, headers: HeaderMap, - axum::extract::Form(form): axum::extract::Form, + body: Body, ) -> Response { let wants_json = wants_json(&headers); + let form = match parse_form_any(&headers, body).await { + Ok(fields) => CreateBucketForm { + bucket_name: fields + .get("bucket_name") + .cloned() + .unwrap_or_default(), + csrf_token: fields.get("csrf_token").cloned().unwrap_or_default(), + }, + Err(message) => { + if wants_json { + return ( + StatusCode::BAD_REQUEST, + axum::Json(json!({ "error": message })), + ) + .into_response(); + } + session.write(|s| s.push_flash("danger", message)); + return Redirect::to("/ui/buckets").into_response(); + } + }; let bucket_name = form.bucket_name.trim().to_string(); if bucket_name.is_empty() { diff --git a/rust/myfsio-engine/crates/myfsio-server/src/lib.rs b/rust/myfsio-engine/crates/myfsio-server/src/lib.rs index 8199121..217f2b8 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/lib.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/lib.rs @@ -333,7 +333,16 @@ pub fn create_ui_router(state: state::AppState) -> Router { } pub fn create_router(state: state::AppState) -> Router { - let mut router = Router::new() + let default_rate_limit = middleware::RateLimitLayerState::new( + state.config.ratelimit_default, + state.config.num_trusted_proxies, + ); + let admin_rate_limit = middleware::RateLimitLayerState::new( + state.config.ratelimit_admin, + state.config.num_trusted_proxies, + ); + + let mut api_router = Router::new() .route("/myfsio/health", axum::routing::get(handlers::health_check)) .route("/", axum::routing::get(handlers::list_buckets)) .route( @@ -362,7 +371,7 @@ pub fn create_router(state: state::AppState) -> Router { ); if state.config.kms_enabled { - router = router + api_router = api_router .route( "/kms/keys", axum::routing::get(handlers::kms::list_keys).post(handlers::kms::create_key), @@ -415,7 +424,17 @@ pub fn create_router(state: state::AppState) -> Router { ); } - router = router + api_router = api_router + .layer(axum::middleware::from_fn_with_state( + state.clone(), + middleware::auth_layer, + )) + .layer(axum::middleware::from_fn_with_state( + default_rate_limit, + middleware::rate_limit_layer, + )); + + let admin_router = Router::new() .route( "/admin/site", axum::routing::get(handlers::admin::get_local_site) @@ -546,14 +565,81 @@ pub fn create_router(state: state::AppState) -> Router { .route( "/admin/integrity/history", axum::routing::get(handlers::admin::integrity_history), - ); - - router + ) .layer(axum::middleware::from_fn_with_state( state.clone(), middleware::auth_layer, )) + .layer(axum::middleware::from_fn_with_state( + admin_rate_limit, + middleware::rate_limit_layer, + )); + + api_router + .merge(admin_router) .layer(axum::middleware::from_fn(middleware::server_header)) + .layer(cors_layer(&state.config)) .layer(tower_http::compression::CompressionLayer::new()) .with_state(state) } + +fn cors_layer(config: &config::ServerConfig) -> tower_http::cors::CorsLayer { + use axum::http::{HeaderName, HeaderValue, Method}; + use tower_http::cors::{Any, CorsLayer}; + + let mut layer = CorsLayer::new(); + + if config.cors_origins.iter().any(|origin| origin == "*") { + layer = layer.allow_origin(Any); + } else { + let origins = config + .cors_origins + .iter() + .filter_map(|origin| HeaderValue::from_str(origin).ok()) + .collect::>(); + if !origins.is_empty() { + layer = layer.allow_origin(origins); + } + } + + let methods = config + .cors_methods + .iter() + .filter_map(|method| method.parse::().ok()) + .collect::>(); + if !methods.is_empty() { + layer = layer.allow_methods(methods); + } + + if config.cors_allow_headers.iter().any(|header| header == "*") { + layer = layer.allow_headers(Any); + } else { + let headers = config + .cors_allow_headers + .iter() + .filter_map(|header| header.parse::().ok()) + .collect::>(); + if !headers.is_empty() { + layer = layer.allow_headers(headers); + } + } + + if config + .cors_expose_headers + .iter() + .any(|header| header == "*") + { + layer = layer.expose_headers(Any); + } else { + let headers = config + .cors_expose_headers + .iter() + .filter_map(|header| header.parse::().ok()) + .collect::>(); + if !headers.is_empty() { + layer = layer.expose_headers(headers); + } + } + + layer +} diff --git a/rust/myfsio-engine/crates/myfsio-server/src/main.rs b/rust/myfsio-engine/crates/myfsio-server/src/main.rs index 24ca56e..a77175f 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/main.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/main.rs @@ -28,10 +28,19 @@ enum Command { #[tokio::main] async fn main() { load_env_files(); - tracing_subscriber::fmt::init(); + init_tracing(); let cli = Cli::parse(); let config = ServerConfig::from_env(); + if !config + .ratelimit_storage_uri + .eq_ignore_ascii_case("memory://") + { + tracing::warn!( + "RATE_LIMIT_STORAGE_URI={} is not supported yet; using in-memory rate limits", + config.ratelimit_storage_uri + ); + } if cli.reset_cred { reset_admin_credentials(&config); @@ -114,7 +123,10 @@ async fn main() { std::sync::Arc::new(myfsio_server::services::lifecycle::LifecycleService::new( state.storage.clone(), config.storage_root.clone(), - myfsio_server::services::lifecycle::LifecycleConfig::default(), + myfsio_server::services::lifecycle::LifecycleConfig { + interval_seconds: 3600, + max_history_per_bucket: config.lifecycle_max_history_per_bucket, + }, )); bg_handles.push(lifecycle.start_background()); tracing::info!("Lifecycle manager background service started"); @@ -178,11 +190,14 @@ async fn main() { let shutdown = shutdown_signal_shared(); let api_shutdown = shutdown.clone(); let api_task = tokio::spawn(async move { - axum::serve(api_listener, api_app) - .with_graceful_shutdown(async move { - api_shutdown.notified().await; - }) - .await + axum::serve( + api_listener, + api_app.into_make_service_with_connect_info::(), + ) + .with_graceful_shutdown(async move { + api_shutdown.notified().await; + }) + .await }); let ui_task = if let (Some(listener), Some(app)) = (ui_listener, ui_app) { @@ -228,15 +243,43 @@ fn print_config_summary(config: &ServerConfig) { println!("IAM config: {}", config.iam_config_path.display()); println!("Region: {}", config.region); println!("Encryption enabled: {}", config.encryption_enabled); + println!( + "Encryption chunk size: {} bytes", + config.encryption_chunk_size_bytes + ); println!("KMS enabled: {}", config.kms_enabled); + println!( + "KMS data key bounds: {}-{} bytes", + config.kms_generate_data_key_min_bytes, config.kms_generate_data_key_max_bytes + ); println!("GC enabled: {}", config.gc_enabled); + println!( + "GC interval: {} hours, dry run: {}", + config.gc_interval_hours, config.gc_dry_run + ); println!("Integrity enabled: {}", config.integrity_enabled); println!("Lifecycle enabled: {}", config.lifecycle_enabled); + println!( + "Lifecycle history limit: {}", + config.lifecycle_max_history_per_bucket + ); println!( "Website hosting enabled: {}", config.website_hosting_enabled ); println!("Site sync enabled: {}", config.site_sync_enabled); + println!("API base URL: {}", config.api_base_url); + println!( + "Object key max: {} bytes, tag limit: {}", + config.object_key_max_length_bytes, config.object_tag_limit + ); + println!( + "Rate limits: default {} per {}s, admin {} per {}s", + config.ratelimit_default.max_requests, + config.ratelimit_default.window_seconds, + config.ratelimit_admin.max_requests, + config.ratelimit_admin.window_seconds + ); println!( "Metrics history enabled: {}", config.metrics_history_enabled @@ -256,6 +299,32 @@ fn validate_config(config: &ServerConfig) -> Vec { if config.presigned_url_min_expiry > config.presigned_url_max_expiry { issues.push("CRITICAL: PRESIGNED_URL_MIN_EXPIRY_SECONDS cannot exceed PRESIGNED_URL_MAX_EXPIRY_SECONDS.".to_string()); } + if config.encryption_chunk_size_bytes == 0 { + issues.push("CRITICAL: ENCRYPTION_CHUNK_SIZE_BYTES must be greater than zero.".to_string()); + } + if config.kms_generate_data_key_min_bytes == 0 { + issues.push( + "CRITICAL: KMS_GENERATE_DATA_KEY_MIN_BYTES must be greater than zero.".to_string(), + ); + } + if config.kms_generate_data_key_min_bytes > config.kms_generate_data_key_max_bytes { + issues.push("CRITICAL: KMS_GENERATE_DATA_KEY_MIN_BYTES cannot exceed KMS_GENERATE_DATA_KEY_MAX_BYTES.".to_string()); + } + if config.gc_interval_hours <= 0.0 { + issues.push("CRITICAL: GC_INTERVAL_HOURS must be greater than zero.".to_string()); + } + if config.bucket_config_cache_ttl_seconds < 0.0 { + issues.push("CRITICAL: BUCKET_CONFIG_CACHE_TTL_SECONDS cannot be negative.".to_string()); + } + if !config + .ratelimit_storage_uri + .eq_ignore_ascii_case("memory://") + { + issues.push(format!( + "WARNING: RATE_LIMIT_STORAGE_URI={} is not supported yet; using in-memory limits.", + config.ratelimit_storage_uri + )); + } if let Err(err) = std::fs::create_dir_all(&config.storage_root) { issues.push(format!( "CRITICAL: Cannot create storage root {}: {}", @@ -286,6 +355,17 @@ fn validate_config(config: &ServerConfig) -> Vec { issues } +fn init_tracing() { + use tracing_subscriber::EnvFilter; + + let filter = EnvFilter::try_from_env("RUST_LOG") + .or_else(|_| { + EnvFilter::try_new(std::env::var("LOG_LEVEL").unwrap_or_else(|_| "INFO".to_string())) + }) + .unwrap_or_else(|_| EnvFilter::new("INFO")); + tracing_subscriber::fmt().with_env_filter(filter).init(); +} + fn shutdown_signal_shared() -> std::sync::Arc { std::sync::Arc::new(tokio::sync::Notify::new()) } @@ -419,8 +499,49 @@ fn reset_admin_credentials(config: &ServerConfig) { std::process::exit(1); } println!("Backed up existing IAM config to {}", backup.display()); + prune_iam_backups(&config.iam_config_path, 5); } ensure_iam_bootstrap(config); println!("Admin credentials reset."); } + +fn prune_iam_backups(iam_path: &std::path::Path, keep: usize) { + let parent = match iam_path.parent() { + Some(p) => p, + None => return, + }; + let stem = match iam_path.file_stem().and_then(|s| s.to_str()) { + Some(s) => s, + None => return, + }; + let prefix = format!("{}.bak-", stem); + + let entries = match std::fs::read_dir(parent) { + Ok(entries) => entries, + Err(_) => return, + }; + let mut backups: Vec<(i64, std::path::PathBuf)> = entries + .filter_map(|e| e.ok()) + .filter_map(|e| { + let path = e.path(); + let name = path.file_name()?.to_str()?; + let rest = name.strip_prefix(&prefix)?; + let ts: i64 = rest.parse().ok()?; + Some((ts, path)) + }) + .collect(); + backups.sort_by(|a, b| b.0.cmp(&a.0)); + + for (_, path) in backups.into_iter().skip(keep) { + if let Err(err) = std::fs::remove_file(&path) { + eprintln!( + "Failed to remove old IAM backup {}: {}", + path.display(), + err + ); + } else { + println!("Pruned old IAM backup {}", path.display()); + } + } +} diff --git a/rust/myfsio-engine/crates/myfsio-server/src/middleware/mod.rs b/rust/myfsio-engine/crates/myfsio-server/src/middleware/mod.rs index b8ab0ef..2f3727d 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/middleware/mod.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/middleware/mod.rs @@ -1,7 +1,9 @@ mod auth; +pub mod ratelimit; pub mod session; pub use auth::auth_layer; +pub use ratelimit::{rate_limit_layer, RateLimitLayerState}; pub use session::{csrf_layer, session_layer, SessionHandle, SessionLayerState}; use axum::extract::{Request, State}; diff --git a/rust/myfsio-engine/crates/myfsio-server/src/middleware/ratelimit.rs b/rust/myfsio-engine/crates/myfsio-server/src/middleware/ratelimit.rs new file mode 100644 index 0000000..bcdb2c8 --- /dev/null +++ b/rust/myfsio-engine/crates/myfsio-server/src/middleware/ratelimit.rs @@ -0,0 +1,241 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use axum::extract::{ConnectInfo, Request, State}; +use axum::http::{header, StatusCode}; +use axum::middleware::Next; +use axum::response::{IntoResponse, Response}; +use parking_lot::Mutex; + +use crate::config::RateLimitSetting; + +#[derive(Clone)] +pub struct RateLimitLayerState { + limiter: Arc, + num_trusted_proxies: usize, +} + +impl RateLimitLayerState { + pub fn new(setting: RateLimitSetting, num_trusted_proxies: usize) -> Self { + Self { + limiter: Arc::new(FixedWindowLimiter::new(setting)), + num_trusted_proxies, + } + } +} + +#[derive(Debug)] +struct FixedWindowLimiter { + setting: RateLimitSetting, + state: Mutex, +} + +#[derive(Debug)] +struct LimiterState { + entries: HashMap, + last_sweep: Instant, +} + +#[derive(Debug, Clone, Copy)] +struct LimitEntry { + window_started: Instant, + count: u32, +} + +const SWEEP_MIN_INTERVAL: Duration = Duration::from_secs(60); +const SWEEP_ENTRY_THRESHOLD: usize = 1024; + +impl FixedWindowLimiter { + fn new(setting: RateLimitSetting) -> Self { + Self { + setting, + state: Mutex::new(LimiterState { + entries: HashMap::new(), + last_sweep: Instant::now(), + }), + } + } + + fn check(&self, key: &str) -> Result<(), u64> { + let now = Instant::now(); + let window = Duration::from_secs(self.setting.window_seconds.max(1)); + let mut state = self.state.lock(); + + if state.entries.len() >= SWEEP_ENTRY_THRESHOLD + && now.duration_since(state.last_sweep) >= SWEEP_MIN_INTERVAL + { + state + .entries + .retain(|_, entry| now.duration_since(entry.window_started) < window); + state.last_sweep = now; + } + + let entry = state.entries.entry(key.to_string()).or_insert(LimitEntry { + window_started: now, + count: 0, + }); + + if now.duration_since(entry.window_started) >= window { + entry.window_started = now; + entry.count = 0; + } + + if entry.count >= self.setting.max_requests { + let elapsed = now.duration_since(entry.window_started); + let retry_after = window.saturating_sub(elapsed).as_secs().max(1); + return Err(retry_after); + } + + entry.count += 1; + Ok(()) + } +} + +pub async fn rate_limit_layer( + State(state): State, + req: Request, + next: Next, +) -> Response { + let key = rate_limit_key(&req, state.num_trusted_proxies); + match state.limiter.check(&key) { + Ok(()) => next.run(req).await, + Err(retry_after) => too_many_requests(retry_after), + } +} + +fn too_many_requests(retry_after: u64) -> Response { + ( + StatusCode::TOO_MANY_REQUESTS, + [ + (header::CONTENT_TYPE, "application/xml".to_string()), + (header::RETRY_AFTER, retry_after.to_string()), + ], + myfsio_xml::response::rate_limit_exceeded_xml(), + ) + .into_response() +} + +fn rate_limit_key(req: &Request, num_trusted_proxies: usize) -> String { + format!("ip:{}", client_ip(req, num_trusted_proxies)) +} + +fn client_ip(req: &Request, num_trusted_proxies: usize) -> String { + if num_trusted_proxies > 0 { + if let Some(value) = req + .headers() + .get("x-forwarded-for") + .and_then(|v| v.to_str().ok()) + { + let parts = value + .split(',') + .map(|part| part.trim()) + .filter(|part| !part.is_empty()) + .collect::>(); + if parts.len() > num_trusted_proxies { + let index = parts.len() - num_trusted_proxies - 1; + return parts[index].to_string(); + } + } + + if let Some(value) = req.headers().get("x-real-ip").and_then(|v| v.to_str().ok()) { + if !value.trim().is_empty() { + return value.trim().to_string(); + } + } + } + + req.extensions() + .get::>() + .map(|ConnectInfo(addr)| addr.ip().to_string()) + .unwrap_or_else(|| "unknown".to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::body::Body; + + #[test] + fn honors_trusted_proxy_count_for_forwarded_for() { + let req = Request::builder() + .header("x-forwarded-for", "198.51.100.1, 10.0.0.1, 10.0.0.2") + .body(Body::empty()) + .unwrap(); + assert_eq!(rate_limit_key(&req, 2), "ip:198.51.100.1"); + assert_eq!(rate_limit_key(&req, 1), "ip:10.0.0.1"); + } + + #[test] + fn falls_back_to_connect_info_when_forwarded_for_has_too_few_hops() { + let mut req = Request::builder() + .header("x-forwarded-for", "198.51.100.1") + .body(Body::empty()) + .unwrap(); + req.extensions_mut() + .insert(ConnectInfo(SocketAddr::from(([203, 0, 113, 9], 443)))); + + assert_eq!(rate_limit_key(&req, 2), "ip:203.0.113.9"); + } + + #[test] + fn ignores_forwarded_headers_when_no_proxies_are_trusted() { + let mut req = Request::builder() + .header("x-forwarded-for", "198.51.100.1") + .header("x-real-ip", "198.51.100.2") + .body(Body::empty()) + .unwrap(); + req.extensions_mut() + .insert(ConnectInfo(SocketAddr::from(([203, 0, 113, 9], 443)))); + + assert_eq!(rate_limit_key(&req, 0), "ip:203.0.113.9"); + } + + #[test] + fn uses_connect_info_for_direct_clients() { + let mut req = Request::builder().body(Body::empty()).unwrap(); + req.extensions_mut() + .insert(ConnectInfo(SocketAddr::from(([203, 0, 113, 10], 443)))); + + assert_eq!(rate_limit_key(&req, 0), "ip:203.0.113.10"); + } + + #[test] + fn fixed_window_rejects_after_quota() { + let limiter = FixedWindowLimiter::new(RateLimitSetting::new(2, 60)); + assert!(limiter.check("k").is_ok()); + assert!(limiter.check("k").is_ok()); + assert!(limiter.check("k").is_err()); + } + + #[test] + fn sweep_removes_expired_entries() { + let limiter = FixedWindowLimiter::new(RateLimitSetting::new(10, 1)); + let far_past = Instant::now() - (SWEEP_MIN_INTERVAL + Duration::from_secs(5)); + { + let mut state = limiter.state.lock(); + for i in 0..(SWEEP_ENTRY_THRESHOLD + 1024) { + state.entries.insert( + format!("stale-{}", i), + LimitEntry { + window_started: far_past, + count: 5, + }, + ); + } + state.last_sweep = far_past; + } + let seeded = limiter.state.lock().entries.len(); + assert_eq!(seeded, SWEEP_ENTRY_THRESHOLD + 1024); + + assert!(limiter.check("fresh").is_ok()); + + let remaining = limiter.state.lock().entries.len(); + assert_eq!( + remaining, 1, + "expected sweep to leave only the fresh entry, got {}", + remaining + ); + } +} diff --git a/rust/myfsio-engine/crates/myfsio-server/src/services/gc.rs b/rust/myfsio-engine/crates/myfsio-server/src/services/gc.rs index 37bb402..fb345e9 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/services/gc.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/services/gc.rs @@ -24,6 +24,35 @@ impl Default for GcConfig { } } +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn dry_run_reports_but_does_not_delete_temp_files() { + let tmp = tempfile::tempdir().unwrap(); + let tmp_dir = tmp.path().join(".myfsio.sys").join("tmp"); + std::fs::create_dir_all(&tmp_dir).unwrap(); + let file_path = tmp_dir.join("stale.tmp"); + std::fs::write(&file_path, b"temporary").unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + + let service = GcService::new( + tmp.path().to_path_buf(), + GcConfig { + temp_file_max_age_hours: 0.0, + dry_run: true, + ..GcConfig::default() + }, + ); + + let result = service.run_now(false).await.unwrap(); + + assert_eq!(result["temp_files_deleted"], 1); + assert!(file_path.exists()); + } +} + pub struct GcService { storage_root: PathBuf, config: GcConfig, diff --git a/rust/myfsio-engine/crates/myfsio-server/src/state.rs b/rust/myfsio-engine/crates/myfsio-server/src/state.rs index 34c25ab..7e577b3 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/state.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/state.rs @@ -15,9 +15,9 @@ use crate::session::SessionStore; use crate::stores::connections::ConnectionStore; use crate::templates::TemplateEngine; use myfsio_auth::iam::IamService; -use myfsio_crypto::encryption::EncryptionService; +use myfsio_crypto::encryption::{EncryptionConfig, EncryptionService}; use myfsio_crypto::kms::KmsService; -use myfsio_storage::fs_backend::FsStorageBackend; +use myfsio_storage::fs_backend::{FsStorageBackend, FsStorageBackendConfig}; #[derive(Clone)] pub struct AppState { @@ -42,7 +42,16 @@ pub struct AppState { impl AppState { pub fn new(config: ServerConfig) -> Self { - let storage = Arc::new(FsStorageBackend::new(config.storage_root.clone())); + let storage = Arc::new(FsStorageBackend::new_with_config( + config.storage_root.clone(), + FsStorageBackendConfig { + object_key_max_length_bytes: config.object_key_max_length_bytes, + object_cache_max_size: config.object_cache_max_size, + bucket_config_cache_ttl: Duration::from_secs_f64( + config.bucket_config_cache_ttl_seconds, + ), + }, + )); let iam = Arc::new(IamService::new_with_secret( config.iam_config_path.clone(), config.secret_key.clone(), @@ -51,7 +60,13 @@ impl AppState { let gc = if config.gc_enabled { Some(Arc::new(GcService::new( config.storage_root.clone(), - crate::services::gc::GcConfig::default(), + crate::services::gc::GcConfig { + interval_hours: config.gc_interval_hours, + temp_file_max_age_hours: config.gc_temp_file_max_age_hours, + multipart_max_age_days: config.gc_multipart_max_age_days, + lock_file_max_age_hours: config.gc_lock_file_max_age_hours, + dry_run: config.gc_dry_run, + }, ))) } else { None @@ -92,7 +107,22 @@ impl AppState { None }; - let site_registry = Some(Arc::new(SiteRegistry::new(&config.storage_root))); + let site_registry = { + let registry = SiteRegistry::new(&config.storage_root); + if let (Some(site_id), Some(endpoint)) = + (config.site_id.as_deref(), config.site_endpoint.as_deref()) + { + registry.set_local_site(crate::services::site_registry::SiteInfo { + site_id: site_id.to_string(), + endpoint: endpoint.to_string(), + region: config.site_region.clone(), + priority: config.site_priority, + display_name: site_id.to_string(), + created_at: Some(chrono::Utc::now().to_rfc3339()), + }); + } + Some(Arc::new(registry)) + }; let website_domains = if config.website_hosting_enabled { Some(Arc::new(WebsiteDomainStore::new(&config.storage_root))) @@ -132,6 +162,7 @@ impl AppState { let templates = init_templates(&config.templates_dir); let access_logging = Arc::new(AccessLoggingService::new(&config.storage_root)); + let session_ttl = Duration::from_secs(config.session_lifetime_days.saturating_mul(86_400)); Self { config, storage, @@ -148,7 +179,7 @@ impl AppState { replication, site_sync, templates, - sessions: Arc::new(SessionStore::new(Duration::from_secs(60 * 60 * 12))), + sessions: Arc::new(SessionStore::new(session_ttl)), access_logging, } } @@ -172,7 +203,13 @@ impl AppState { let encryption = if config.encryption_enabled { match myfsio_crypto::kms::load_or_create_master_key(&keys_dir).await { - Ok(master_key) => Some(Arc::new(EncryptionService::new(master_key, kms.clone()))), + Ok(master_key) => Some(Arc::new(EncryptionService::with_config( + master_key, + kms.clone(), + EncryptionConfig { + chunk_size: config.encryption_chunk_size_bytes, + }, + ))), Err(e) => { tracing::error!("Failed to initialize encryption: {}", e); None diff --git a/rust/myfsio-engine/crates/myfsio-server/templates/docs.html b/rust/myfsio-engine/crates/myfsio-server/templates/docs.html index 76da65d..ab6cc86 100644 --- a/rust/myfsio-engine/crates/myfsio-server/templates/docs.html +++ b/rust/myfsio-engine/crates/myfsio-server/templates/docs.html @@ -112,7 +112,7 @@ cargo build --release -p myfsio-server API_BASE_URL - http://127.0.0.1:5000 + Derived from HOST/PORT Internal S3 API URL used by the web UI proxy. Also used for presigned URL generation. Set to your public URL if running behind a reverse proxy. @@ -184,33 +184,18 @@ cargo build --release -p myfsio-server RATE_LIMIT_DEFAULT 200 per minute - Default API rate limit. - - - RATE_LIMIT_LIST_BUCKETS - 60 per minute - Rate limit for listing buckets. - - - RATE_LIMIT_BUCKET_OPS - 120 per minute - Rate limit for bucket operations. - - - RATE_LIMIT_OBJECT_OPS - 240 per minute - Rate limit for object operations. - - - RATE_LIMIT_HEAD_OPS - 100 per minute - Rate limit for HEAD requests. + Default rate limit for S3 and KMS API endpoints. RATE_LIMIT_ADMIN 60 per minute Rate limit for admin API endpoints (/admin/*). + + RATE_LIMIT_STORAGE_URI + memory:// + Rate limit storage backend. Only in-memory storage is currently supported. + ADMIN_ACCESS_KEY (none) @@ -377,8 +362,8 @@ cargo build --release -p myfsio-server NUM_TRUSTED_PROXIES - 1 - Number of trusted reverse proxies for X-Forwarded-* headers. + 0 + Number of trusted reverse proxies for X-Forwarded-* headers. Forwarded IP headers are ignored when this is 0. ALLOWED_REDIRECT_HOSTS @@ -2078,7 +2063,7 @@ curl "{{ api_base | replace(from="/api", to="/ui") }}/metrics/operations/history Large folder uploads hitting rate limits (429) RATE_LIMIT_DEFAULT exceeded (200/min) - Increase rate limit in env config, use Redis backend (RATE_LIMIT_STORAGE_URI=redis://host:port) for distributed setups, or upload in smaller batches. + Increase RATE_LIMIT_DEFAULT in env config or upload in smaller batches. Distributed rate-limit storage is not supported yet. diff --git a/rust/myfsio-engine/crates/myfsio-server/templates/iam.html b/rust/myfsio-engine/crates/myfsio-server/templates/iam.html index 15b47f2..f1e8abd 100644 --- a/rust/myfsio-engine/crates/myfsio-server/templates/iam.html +++ b/rust/myfsio-engine/crates/myfsio-server/templates/iam.html @@ -133,7 +133,7 @@ {% endif %}
{% for user in users %} -
+
diff --git a/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs b/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs index 8a50ece..e52d722 100644 --- a/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs +++ b/rust/myfsio-engine/crates/myfsio-server/tests/integration.rs @@ -53,6 +53,7 @@ fn test_app_with_iam(iam_json: serde_json::Value) -> (axum::Router, tempfile::Te ui_enabled: false, templates_dir: std::path::PathBuf::from("templates"), static_dir: std::path::PathBuf::from("static"), + ..myfsio_server::config::ServerConfig::default() }; let state = myfsio_server::state::AppState::new(config); let app = myfsio_server::create_router(state); @@ -80,6 +81,120 @@ fn test_app() -> (axum::Router, tempfile::TempDir) { })) } +fn test_app_with_rate_limits( + default: myfsio_server::config::RateLimitSetting, + admin: myfsio_server::config::RateLimitSetting, +) -> (axum::Router, tempfile::TempDir) { + let tmp = tempfile::TempDir::new().unwrap(); + let iam_path = tmp.path().join(".myfsio.sys").join("config"); + std::fs::create_dir_all(&iam_path).unwrap(); + std::fs::write( + iam_path.join("iam.json"), + serde_json::json!({ + "version": 2, + "users": [{ + "user_id": "u-test1234", + "display_name": "admin", + "enabled": true, + "access_keys": [{ + "access_key": TEST_ACCESS_KEY, + "secret_key": TEST_SECRET_KEY, + "status": "active" + }], + "policies": [{ + "bucket": "*", + "actions": ["*"], + "prefix": "*" + }] + }] + }) + .to_string(), + ) + .unwrap(); + + let config = myfsio_server::config::ServerConfig { + bind_addr: "127.0.0.1:0".parse().unwrap(), + ui_bind_addr: "127.0.0.1:0".parse().unwrap(), + storage_root: tmp.path().to_path_buf(), + iam_config_path: iam_path.join("iam.json"), + ratelimit_default: default, + ratelimit_admin: admin, + ui_enabled: false, + ..myfsio_server::config::ServerConfig::default() + }; + let state = myfsio_server::state::AppState::new(config); + let app = myfsio_server::create_router(state); + (app, tmp) +} + +#[tokio::test] +async fn rate_limit_default_and_admin_are_independent() { + let (app, _tmp) = test_app_with_rate_limits( + myfsio_server::config::RateLimitSetting::new(1, 60), + myfsio_server::config::RateLimitSetting::new(2, 60), + ); + + let first = app + .clone() + .oneshot( + Request::builder() + .uri("/myfsio/health") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(first.status(), StatusCode::OK); + + let second = app + .clone() + .oneshot( + Request::builder() + .uri("/myfsio/health") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(second.status(), StatusCode::TOO_MANY_REQUESTS); + assert!(second.headers().contains_key("retry-after")); + + let admin_first = app + .clone() + .oneshot( + Request::builder() + .uri("/admin/gc/status") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(admin_first.status(), StatusCode::FORBIDDEN); + + let admin_second = app + .clone() + .oneshot( + Request::builder() + .uri("/admin/gc/status") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(admin_second.status(), StatusCode::FORBIDDEN); + + let admin_third = app + .oneshot( + Request::builder() + .uri("/admin/gc/status") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(admin_third.status(), StatusCode::TOO_MANY_REQUESTS); +} + fn test_ui_state() -> (myfsio_server::state::AppState, tempfile::TempDir) { let tmp = tempfile::TempDir::new().unwrap(); let iam_path = tmp.path().join(".myfsio.sys").join("config"); @@ -147,6 +262,7 @@ fn test_ui_state() -> (myfsio_server::state::AppState, tempfile::TempDir) { ui_enabled: true, templates_dir: manifest_dir.join("templates"), static_dir: manifest_dir.join("static"), + ..myfsio_server::config::ServerConfig::default() }; (myfsio_server::state::AppState::new(config), tmp) } @@ -303,6 +419,7 @@ fn test_website_state() -> (myfsio_server::state::AppState, tempfile::TempDir) { ui_enabled: false, templates_dir: std::path::PathBuf::from("templates"), static_dir: std::path::PathBuf::from("static"), + ..myfsio_server::config::ServerConfig::default() }; (myfsio_server::state::AppState::new(config), tmp) } @@ -1082,7 +1199,7 @@ async fn test_ui_metrics_history_endpoint_reads_system_history() { config_root.join("metrics_history.json"), serde_json::json!({ "history": [{ - "timestamp": "2026-04-20T00:00:00Z", + "timestamp": chrono::Utc::now().to_rfc3339(), "cpu_percent": 12.5, "memory_percent": 33.3, "disk_percent": 44.4, @@ -1131,6 +1248,7 @@ async fn test_ui_metrics_history_endpoint_reads_system_history() { ui_enabled: true, templates_dir: manifest_dir.join("templates"), static_dir: manifest_dir.join("static"), + ..myfsio_server::config::ServerConfig::default() }; let state = myfsio_server::state::AppState::new(config); let (session_id, _csrf) = authenticated_ui_session(&state); @@ -3851,6 +3969,7 @@ async fn test_non_admin_authorization_enforced() { ui_enabled: false, templates_dir: std::path::PathBuf::from("templates"), static_dir: std::path::PathBuf::from("static"), + ..myfsio_server::config::ServerConfig::default() }; let state = myfsio_server::state::AppState::new(config); state.storage.create_bucket("authz-bucket").await.unwrap(); @@ -3932,6 +4051,7 @@ async fn test_app_encrypted() -> (axum::Router, tempfile::TempDir) { ui_enabled: false, templates_dir: std::path::PathBuf::from("templates"), static_dir: std::path::PathBuf::from("static"), + ..myfsio_server::config::ServerConfig::default() }; let state = myfsio_server::state::AppState::new_with_encryption(config).await; let app = myfsio_server::create_router(state); diff --git a/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs b/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs index 9f7a72f..40a052b 100644 --- a/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs +++ b/rust/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs @@ -19,6 +19,7 @@ use uuid::Uuid; pub struct FsStorageBackend { root: PathBuf, object_key_max_length_bytes: usize, + object_cache_max_size: usize, bucket_config_cache: DashMap, bucket_config_cache_ttl: std::time::Duration, meta_read_cache: DashMap<(String, String), Option>>, @@ -27,13 +28,35 @@ pub struct FsStorageBackend { stats_cache_ttl: std::time::Duration, } +#[derive(Debug, Clone)] +pub struct FsStorageBackendConfig { + pub object_key_max_length_bytes: usize, + pub object_cache_max_size: usize, + pub bucket_config_cache_ttl: std::time::Duration, +} + +impl Default for FsStorageBackendConfig { + fn default() -> Self { + Self { + object_key_max_length_bytes: DEFAULT_OBJECT_KEY_MAX_BYTES, + object_cache_max_size: 100, + bucket_config_cache_ttl: std::time::Duration::from_secs(30), + } + } +} + impl FsStorageBackend { pub fn new(root: PathBuf) -> Self { + Self::new_with_config(root, FsStorageBackendConfig::default()) + } + + pub fn new_with_config(root: PathBuf, config: FsStorageBackendConfig) -> Self { let backend = Self { root, - object_key_max_length_bytes: DEFAULT_OBJECT_KEY_MAX_BYTES, + object_key_max_length_bytes: config.object_key_max_length_bytes, + object_cache_max_size: config.object_cache_max_size, bucket_config_cache: DashMap::new(), - bucket_config_cache_ttl: std::time::Duration::from_secs(30), + bucket_config_cache_ttl: config.bucket_config_cache_ttl, meta_read_cache: DashMap::new(), meta_index_locks: DashMap::new(), stats_cache: DashMap::new(), @@ -142,6 +165,27 @@ impl FsStorageBackend { .clone() } + fn prune_meta_read_cache(&self) { + if self.object_cache_max_size == 0 { + self.meta_read_cache.clear(); + return; + } + let len = self.meta_read_cache.len(); + if len <= self.object_cache_max_size { + return; + } + let excess = len - self.object_cache_max_size; + let keys = self + .meta_read_cache + .iter() + .take(excess) + .map(|entry| entry.key().clone()) + .collect::>(); + for key in keys { + self.meta_read_cache.remove(&key); + } + } + fn bucket_config_path(&self, bucket_name: &str) -> PathBuf { self.system_bucket_root(bucket_name) .join(BUCKET_CONFIG_FILE) @@ -229,6 +273,7 @@ impl FsStorageBackend { }; self.meta_read_cache.insert(cache_key, result.clone()); + self.prune_meta_read_cache(); result } diff --git a/rust/myfsio-engine/crates/myfsio-xml/src/response.rs b/rust/myfsio-engine/crates/myfsio-xml/src/response.rs index 3474c3e..708db4b 100644 --- a/rust/myfsio-engine/crates/myfsio-xml/src/response.rs +++ b/rust/myfsio-engine/crates/myfsio-xml/src/response.rs @@ -8,6 +8,12 @@ pub fn format_s3_datetime(dt: &DateTime) -> String { dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() } +pub fn rate_limit_exceeded_xml() -> String { + "\ +SlowDownRate limit exceeded" + .to_string() +} + pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta]) -> String { let mut writer = Writer::new(Cursor::new(Vec::new()));