diff --git a/myfsio-engine/Cargo.lock b/myfsio-engine/Cargo.lock index f6291fa..9a52661 100644 --- a/myfsio-engine/Cargo.lock +++ b/myfsio-engine/Cargo.lock @@ -1513,6 +1513,7 @@ dependencies = [ "tempfile", "thiserror", "tracing", + "uuid", ] [[package]] @@ -1566,8 +1567,10 @@ dependencies = [ "myfsio-crypto", "myfsio-storage", "myfsio-xml", + "parking_lot", "percent-encoding", "quick-xml", + "regex", "roxmltree", "serde", "serde_json", diff --git a/myfsio-engine/crates/myfsio-auth/Cargo.toml b/myfsio-engine/crates/myfsio-auth/Cargo.toml index 0676e6e..384427a 100644 --- a/myfsio-engine/crates/myfsio-auth/Cargo.toml +++ b/myfsio-engine/crates/myfsio-auth/Cargo.toml @@ -20,6 +20,7 @@ serde_json = { workspace = true } thiserror = { workspace = true } chrono = { workspace = true } tracing = { workspace = true } +uuid = { workspace = true } [dev-dependencies] tempfile = "3" diff --git a/myfsio-engine/crates/myfsio-auth/src/iam.rs b/myfsio-engine/crates/myfsio-auth/src/iam.rs index 18a7e51..34f41bd 100644 --- a/myfsio-engine/crates/myfsio-auth/src/iam.rs +++ b/myfsio-engine/crates/myfsio-auth/src/iam.rs @@ -33,6 +33,58 @@ pub struct IamUser { pub policies: Vec, } +#[derive(Debug, Clone, Deserialize)] +struct RawIamConfig { + #[serde(default)] + pub users: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +struct RawIamUser { + pub user_id: Option, + pub display_name: Option, + #[serde(default = "default_enabled")] + pub enabled: bool, + #[serde(default)] + pub expires_at: Option, + pub access_key: Option, + pub secret_key: Option, + #[serde(default)] + pub access_keys: Vec, + #[serde(default)] + pub policies: Vec, +} + +impl RawIamUser { + fn normalize(self) -> IamUser { + let mut access_keys = self.access_keys; + if access_keys.is_empty() { + if let (Some(ak), Some(sk)) = (self.access_key, self.secret_key) { + access_keys.push(AccessKey { + access_key: ak, + secret_key: sk, + status: "active".to_string(), + created_at: None, + }); + } + } + let display_name = self.display_name.unwrap_or_else(|| { + access_keys.first().map(|k| k.access_key.clone()).unwrap_or_else(|| "unknown".to_string()) + }); + let user_id = self.user_id.unwrap_or_else(|| { + format!("u-{}", display_name.to_ascii_lowercase().replace(' ', "-")) + }); + IamUser { + user_id, + display_name, + enabled: self.enabled, + expires_at: self.expires_at, + access_keys, + policies: self.policies, + } + } +} + fn default_enabled() -> bool { true } @@ -166,7 +218,7 @@ impl IamService { content }; - let config: IamConfig = match serde_json::from_str(&raw) { + let raw_config: RawIamConfig = match serde_json::from_str(&raw) { Ok(c) => c, Err(e) => { tracing::error!("Failed to parse IAM config: {}", e); @@ -174,12 +226,14 @@ impl IamService { } }; + let users: Vec = raw_config.users.into_iter().map(|u| u.normalize()).collect(); + let mut key_secrets = HashMap::new(); let mut key_index = HashMap::new(); let mut key_status = HashMap::new(); let mut user_records = HashMap::new(); - for user in &config.users { + for user in &users { user_records.insert(user.user_id.clone(), user.clone()); for ak in &user.access_keys { key_secrets.insert(ak.access_key.clone(), ak.secret_key.clone()); @@ -201,7 +255,7 @@ impl IamService { state.last_check = Instant::now(); tracing::info!("IAM config reloaded: {} users, {} keys", - config.users.len(), + users.len(), state.key_secrets.len()); } @@ -384,8 +438,12 @@ impl IamService { let content = std::fs::read_to_string(&self.config_path) .map_err(|e| format!("Failed to read IAM config: {}", e))?; - let mut config: IamConfig = serde_json::from_str(&content) + let raw: RawIamConfig = serde_json::from_str(&content) .map_err(|e| format!("Failed to parse IAM config: {}", e))?; + let mut config = IamConfig { + version: 2, + users: raw.users.into_iter().map(|u| u.normalize()).collect(), + }; let user = config .users @@ -406,6 +464,99 @@ impl IamService { self.reload(); Ok(()) } + + pub fn get_user_policies(&self, identifier: &str) -> Option> { + self.reload_if_needed(); + let state = self.state.read(); + let user = state + .user_records + .get(identifier) + .or_else(|| { + state.key_index.get(identifier).and_then(|uid| state.user_records.get(uid)) + })?; + Some( + user.policies + .iter() + .map(|p| serde_json::to_value(p).unwrap_or_default()) + .collect(), + ) + } + + pub fn create_access_key(&self, identifier: &str) -> Result { + let content = std::fs::read_to_string(&self.config_path) + .map_err(|e| format!("Failed to read IAM config: {}", e))?; + let raw: RawIamConfig = serde_json::from_str(&content) + .map_err(|e| format!("Failed to parse IAM config: {}", e))?; + let mut config = IamConfig { + version: 2, + users: raw.users.into_iter().map(|u| u.normalize()).collect(), + }; + + let user = config + .users + .iter_mut() + .find(|u| { + u.user_id == identifier + || u.access_keys.iter().any(|k| k.access_key == identifier) + }) + .ok_or_else(|| format!("User '{}' not found", identifier))?; + + let new_ak = format!("AK{}", uuid::Uuid::new_v4().simple()); + let new_sk = format!("SK{}", uuid::Uuid::new_v4().simple()); + + let key = AccessKey { + access_key: new_ak.clone(), + secret_key: new_sk.clone(), + status: "active".to_string(), + created_at: Some(chrono::Utc::now().to_rfc3339()), + }; + user.access_keys.push(key); + + let json = serde_json::to_string_pretty(&config) + .map_err(|e| format!("Failed to serialize IAM config: {}", e))?; + std::fs::write(&self.config_path, json) + .map_err(|e| format!("Failed to write IAM config: {}", e))?; + + self.reload(); + Ok(serde_json::json!({ + "access_key": new_ak, + "secret_key": new_sk, + })) + } + + pub fn delete_access_key(&self, access_key: &str) -> Result<(), String> { + let content = std::fs::read_to_string(&self.config_path) + .map_err(|e| format!("Failed to read IAM config: {}", e))?; + let raw: RawIamConfig = serde_json::from_str(&content) + .map_err(|e| format!("Failed to parse IAM config: {}", e))?; + let mut config = IamConfig { + version: 2, + users: raw.users.into_iter().map(|u| u.normalize()).collect(), + }; + + let mut found = false; + for user in &mut config.users { + if user.access_keys.iter().any(|k| k.access_key == access_key) { + if user.access_keys.len() <= 1 { + return Err("Cannot delete the last access key".to_string()); + } + user.access_keys.retain(|k| k.access_key != access_key); + found = true; + break; + } + } + if !found { + return Err(format!("Access key '{}' not found", access_key)); + } + + let json = serde_json::to_string_pretty(&config) + .map_err(|e| format!("Failed to serialize IAM config: {}", e))?; + std::fs::write(&self.config_path, json) + .map_err(|e| format!("Failed to write IAM config: {}", e))?; + + self.reload(); + Ok(()) + } } fn bucket_matches(policy_bucket: &str, bucket: &str) -> bool { @@ -579,6 +730,31 @@ mod tests { assert!(svc.get_secret_key("INACTIVE_KEY").is_none()); } + #[test] + fn test_v1_flat_format() { + let json = serde_json::json!({ + "users": [{ + "access_key": "test", + "secret_key": "secret", + "display_name": "Test User", + "policies": [{"bucket": "*", "actions": ["*"], "prefix": "*"}] + }] + }) + .to_string(); + + let mut tmp = tempfile::NamedTempFile::new().unwrap(); + tmp.write_all(json.as_bytes()).unwrap(); + tmp.flush().unwrap(); + + let svc = IamService::new(tmp.path().to_path_buf()); + let secret = svc.get_secret_key("test"); + assert_eq!(secret.unwrap(), "secret"); + + let principal = svc.get_principal("test").unwrap(); + assert_eq!(principal.display_name, "Test User"); + assert!(principal.is_admin); + } + #[test] fn test_authorize_allows_matching_policy() { let json = serde_json::json!({ diff --git a/myfsio-engine/crates/myfsio-server/Cargo.toml b/myfsio-engine/crates/myfsio-server/Cargo.toml index 78ab4ea..a188dae 100644 --- a/myfsio-engine/crates/myfsio-server/Cargo.toml +++ b/myfsio-engine/crates/myfsio-server/Cargo.toml @@ -31,6 +31,8 @@ mime_guess = "2" crc32fast = { workspace = true } duckdb = { workspace = true } roxmltree = "0.20" +parking_lot = { workspace = true } +regex = "1" [dev-dependencies] tempfile = "3" diff --git a/myfsio-engine/crates/myfsio-server/src/config.rs b/myfsio-engine/crates/myfsio-server/src/config.rs index fea4d56..d4bd82d 100644 --- a/myfsio-engine/crates/myfsio-server/src/config.rs +++ b/myfsio-engine/crates/myfsio-server/src/config.rs @@ -17,6 +17,7 @@ pub struct ServerConfig { pub integrity_enabled: bool, pub metrics_enabled: bool, pub lifecycle_enabled: bool, + pub website_hosting_enabled: bool, } impl ServerConfig { @@ -91,6 +92,10 @@ impl ServerConfig { .unwrap_or_else(|_| "false".to_string()) .to_lowercase() == "true"; + let website_hosting_enabled = std::env::var("WEBSITE_HOSTING_ENABLED") + .unwrap_or_else(|_| "false".to_string()) + .to_lowercase() == "true"; + Self { bind_addr: SocketAddr::new(host.parse().unwrap(), port), storage_root: storage_path, @@ -106,6 +111,7 @@ impl ServerConfig { integrity_enabled, metrics_enabled, lifecycle_enabled, + website_hosting_enabled, } } } diff --git a/myfsio-engine/crates/myfsio-server/src/handlers/admin.rs b/myfsio-engine/crates/myfsio-server/src/handlers/admin.rs new file mode 100644 index 0000000..e09f83f --- /dev/null +++ b/myfsio-engine/crates/myfsio-server/src/handlers/admin.rs @@ -0,0 +1,704 @@ +use axum::body::Body; +use axum::extract::{Path, State}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::Extension; +use myfsio_common::types::Principal; +use myfsio_storage::traits::StorageEngine; + +use crate::services::site_registry::{PeerSite, SiteInfo}; +use crate::services::website_domains::{is_valid_domain, normalize_domain}; +use crate::state::AppState; + +fn json_response(status: StatusCode, value: serde_json::Value) -> Response { + ( + status, + [("content-type", "application/json")], + value.to_string(), + ) + .into_response() +} + +fn json_error(code: &str, message: &str, status: StatusCode) -> Response { + json_response( + status, + serde_json::json!({"error": {"code": code, "message": message}}), + ) +} + +fn require_admin(principal: &Principal) -> Option { + if !principal.is_admin { + return Some(json_error("AccessDenied", "Admin access required", StatusCode::FORBIDDEN)); + } + None +} + +async fn read_json_body(body: Body) -> Option { + let bytes = http_body_util::BodyExt::collect(body).await.ok()?.to_bytes(); + serde_json::from_slice(&bytes).ok() +} + +fn validate_site_id(site_id: &str) -> Option { + if site_id.is_empty() || site_id.len() > 63 { + return Some("site_id must be 1-63 characters".to_string()); + } + let first = site_id.chars().next().unwrap(); + if !first.is_ascii_alphanumeric() { + return Some("site_id must start with alphanumeric".to_string()); + } + if !site_id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') { + return Some("site_id must contain only alphanumeric, hyphens, underscores".to_string()); + } + None +} + +fn validate_endpoint(endpoint: &str) -> Option { + if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") { + return Some("Endpoint must be http or https URL".to_string()); + } + None +} + +fn validate_region(region: &str) -> Option { + let re = regex::Regex::new(r"^[a-z]{2,}-[a-z]+-\d+$").unwrap(); + if !re.is_match(region) { + return Some("Region must match format like us-east-1".to_string()); + } + None +} + +fn validate_priority(priority: i64) -> Option { + if priority < 0 || priority > 1000 { + return Some("Priority must be between 0 and 1000".to_string()); + } + None +} + +pub async fn get_local_site( + State(state): State, + Extension(principal): Extension, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + + if let Some(ref registry) = state.site_registry { + if let Some(local) = registry.get_local_site() { + return json_response(StatusCode::OK, serde_json::to_value(&local).unwrap()); + } + } + + json_error("NotFound", "Local site not configured", StatusCode::NOT_FOUND) +} + +pub async fn update_local_site( + State(state): State, + Extension(principal): Extension, + body: Body, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let registry = match &state.site_registry { + Some(r) => r, + None => return json_error("InvalidRequest", "Site registry not available", StatusCode::BAD_REQUEST), + }; + + let payload = match read_json_body(body).await { + Some(v) => v, + None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST), + }; + + let site_id = match payload.get("site_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => return json_error("ValidationError", "site_id is required", StatusCode::BAD_REQUEST), + }; + + if let Some(err) = validate_site_id(&site_id) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + + let endpoint = payload.get("endpoint").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if !endpoint.is_empty() { + if let Some(err) = validate_endpoint(&endpoint) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + } + + if let Some(p) = payload.get("priority").and_then(|v| v.as_i64()) { + if let Some(err) = validate_priority(p) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + } + + if let Some(r) = payload.get("region").and_then(|v| v.as_str()) { + if let Some(err) = validate_region(r) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + } + + let existing = registry.get_local_site(); + let site = SiteInfo { + site_id: site_id.clone(), + endpoint, + region: payload.get("region").and_then(|v| v.as_str()).unwrap_or("us-east-1").to_string(), + priority: payload.get("priority").and_then(|v| v.as_i64()).unwrap_or(100) as i32, + display_name: payload.get("display_name").and_then(|v| v.as_str()).unwrap_or(&site_id).to_string(), + created_at: existing.and_then(|e| e.created_at), + }; + + registry.set_local_site(site.clone()); + json_response(StatusCode::OK, serde_json::to_value(&site).unwrap()) +} + +pub async fn list_all_sites( + State(state): State, + Extension(principal): Extension, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let registry = match &state.site_registry { + Some(r) => r, + None => return json_response(StatusCode::OK, serde_json::json!({"local": null, "peers": [], "total_peers": 0})), + }; + + let local = registry.get_local_site(); + let peers = registry.list_peers(); + + json_response(StatusCode::OK, serde_json::json!({ + "local": local, + "peers": peers, + "total_peers": peers.len(), + })) +} + +pub async fn register_peer_site( + State(state): State, + Extension(principal): Extension, + body: Body, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let registry = match &state.site_registry { + Some(r) => r, + None => return json_error("InvalidRequest", "Site registry not available", StatusCode::BAD_REQUEST), + }; + + let payload = match read_json_body(body).await { + Some(v) => v, + None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST), + }; + + let site_id = match payload.get("site_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => return json_error("ValidationError", "site_id is required", StatusCode::BAD_REQUEST), + }; + if let Some(err) = validate_site_id(&site_id) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + + let endpoint = match payload.get("endpoint").and_then(|v| v.as_str()) { + Some(e) => e.to_string(), + None => return json_error("ValidationError", "endpoint is required", StatusCode::BAD_REQUEST), + }; + if let Some(err) = validate_endpoint(&endpoint) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + + let region = payload.get("region").and_then(|v| v.as_str()).unwrap_or("us-east-1").to_string(); + if let Some(err) = validate_region(®ion) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + + let priority = payload.get("priority").and_then(|v| v.as_i64()).unwrap_or(100); + if let Some(err) = validate_priority(priority) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + + if registry.get_peer(&site_id).is_some() { + return json_error("AlreadyExists", &format!("Peer site '{}' already exists", site_id), StatusCode::CONFLICT); + } + + let peer = PeerSite { + site_id: site_id.clone(), + endpoint, + region, + priority: priority as i32, + display_name: payload.get("display_name").and_then(|v| v.as_str()).unwrap_or(&site_id).to_string(), + connection_id: payload.get("connection_id").and_then(|v| v.as_str()).map(|s| s.to_string()), + created_at: Some(chrono::Utc::now().to_rfc3339()), + is_healthy: false, + last_health_check: None, + }; + + registry.add_peer(peer.clone()); + json_response(StatusCode::CREATED, serde_json::to_value(&peer).unwrap()) +} + +pub async fn get_peer_site( + State(state): State, + Extension(principal): Extension, + Path(site_id): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let registry = match &state.site_registry { + Some(r) => r, + None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND), + }; + + match registry.get_peer(&site_id) { + Some(peer) => json_response(StatusCode::OK, serde_json::to_value(&peer).unwrap()), + None => json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND), + } +} + +pub async fn update_peer_site( + State(state): State, + Extension(principal): Extension, + Path(site_id): Path, + body: Body, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let registry = match &state.site_registry { + Some(r) => r, + None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND), + }; + + let existing = match registry.get_peer(&site_id) { + Some(p) => p, + None => return json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND), + }; + + let payload = match read_json_body(body).await { + Some(v) => v, + None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST), + }; + + if let Some(ep) = payload.get("endpoint").and_then(|v| v.as_str()) { + if let Some(err) = validate_endpoint(ep) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + } + if let Some(p) = payload.get("priority").and_then(|v| v.as_i64()) { + if let Some(err) = validate_priority(p) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + } + if let Some(r) = payload.get("region").and_then(|v| v.as_str()) { + if let Some(err) = validate_region(r) { + return json_error("ValidationError", &err, StatusCode::BAD_REQUEST); + } + } + + let peer = PeerSite { + site_id: site_id.clone(), + endpoint: payload.get("endpoint").and_then(|v| v.as_str()).unwrap_or(&existing.endpoint).to_string(), + region: payload.get("region").and_then(|v| v.as_str()).unwrap_or(&existing.region).to_string(), + priority: payload.get("priority").and_then(|v| v.as_i64()).unwrap_or(existing.priority as i64) as i32, + display_name: payload.get("display_name").and_then(|v| v.as_str()).unwrap_or(&existing.display_name).to_string(), + connection_id: payload.get("connection_id").and_then(|v| v.as_str()).map(|s| s.to_string()).or(existing.connection_id), + created_at: existing.created_at, + is_healthy: existing.is_healthy, + last_health_check: existing.last_health_check, + }; + + registry.update_peer(peer.clone()); + json_response(StatusCode::OK, serde_json::to_value(&peer).unwrap()) +} + +pub async fn delete_peer_site( + State(state): State, + Extension(principal): Extension, + Path(site_id): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let registry = match &state.site_registry { + Some(r) => r, + None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND), + }; + + if !registry.delete_peer(&site_id) { + return json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND); + } + StatusCode::NO_CONTENT.into_response() +} + +pub async fn check_peer_health( + State(state): State, + Extension(principal): Extension, + Path(site_id): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let registry = match &state.site_registry { + Some(r) => r, + None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND), + }; + + if registry.get_peer(&site_id).is_none() { + return json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND); + } + + json_response(StatusCode::OK, serde_json::json!({ + "site_id": site_id, + "is_healthy": false, + "error": "Health check not implemented in standalone mode", + "checked_at": chrono::Utc::now().timestamp_millis() as f64 / 1000.0, + })) +} + +pub async fn get_topology( + State(state): State, + Extension(principal): Extension, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let registry = match &state.site_registry { + Some(r) => r, + None => return json_response(StatusCode::OK, serde_json::json!({"sites": [], "total": 0, "healthy_count": 0})), + }; + + let local = registry.get_local_site(); + let peers = registry.list_peers(); + + let mut sites: Vec = Vec::new(); + if let Some(l) = local { + let mut v = serde_json::to_value(&l).unwrap(); + v.as_object_mut().unwrap().insert("is_local".to_string(), serde_json::json!(true)); + v.as_object_mut().unwrap().insert("is_healthy".to_string(), serde_json::json!(true)); + sites.push(v); + } + for p in &peers { + let mut v = serde_json::to_value(p).unwrap(); + v.as_object_mut().unwrap().insert("is_local".to_string(), serde_json::json!(false)); + sites.push(v); + } + + sites.sort_by_key(|s| s.get("priority").and_then(|v| v.as_i64()).unwrap_or(100)); + + let healthy_count = sites.iter().filter(|s| s.get("is_healthy").and_then(|v| v.as_bool()).unwrap_or(false)).count(); + + json_response(StatusCode::OK, serde_json::json!({ + "sites": sites, + "total": sites.len(), + "healthy_count": healthy_count, + })) +} + +pub async fn check_bidirectional_status( + State(state): State, + Extension(principal): Extension, + Path(site_id): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let registry = match &state.site_registry { + Some(r) => r, + None => return json_error("NotFound", "Site registry not available", StatusCode::NOT_FOUND), + }; + + if registry.get_peer(&site_id).is_none() { + return json_error("NotFound", &format!("Peer site '{}' not found", site_id), StatusCode::NOT_FOUND); + } + + let local = registry.get_local_site(); + json_response(StatusCode::OK, serde_json::json!({ + "site_id": site_id, + "local_site_id": local.as_ref().map(|l| &l.site_id), + "local_endpoint": local.as_ref().map(|l| &l.endpoint), + "local_bidirectional_rules": [], + "local_site_sync_enabled": false, + "remote_status": null, + "issues": [{"code": "NOT_IMPLEMENTED", "message": "Bidirectional status check not implemented in standalone mode", "severity": "warning"}], + "is_fully_configured": false, + })) +} + +pub async fn iam_list_users( + State(state): State, + Extension(principal): Extension, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let users = state.iam.list_users().await; + json_response(StatusCode::OK, serde_json::json!({"users": users})) +} + +pub async fn iam_get_user( + State(state): State, + Extension(principal): Extension, + Path(identifier): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match state.iam.get_user(&identifier).await { + Some(user) => json_response(StatusCode::OK, user), + None => json_error("NotFound", &format!("User '{}' not found", identifier), StatusCode::NOT_FOUND), + } +} + +pub async fn iam_get_user_policies( + State(state): State, + Extension(principal): Extension, + Path(identifier): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match state.iam.get_user_policies(&identifier) { + Some(policies) => json_response(StatusCode::OK, serde_json::json!({"policies": policies})), + None => json_error("NotFound", &format!("User '{}' not found", identifier), StatusCode::NOT_FOUND), + } +} + +pub async fn iam_create_access_key( + State(state): State, + Extension(principal): Extension, + Path(identifier): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match state.iam.create_access_key(&identifier) { + Ok(result) => json_response(StatusCode::CREATED, result), + Err(e) => json_error("InvalidRequest", &e, StatusCode::BAD_REQUEST), + } +} + +pub async fn iam_delete_access_key( + State(state): State, + Extension(principal): Extension, + Path((_identifier, access_key)): Path<(String, String)>, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match state.iam.delete_access_key(&access_key) { + Ok(()) => StatusCode::NO_CONTENT.into_response(), + Err(e) => json_error("InvalidRequest", &e, StatusCode::BAD_REQUEST), + } +} + +pub async fn iam_disable_user( + State(state): State, + Extension(principal): Extension, + Path(identifier): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match state.iam.set_user_enabled(&identifier, false).await { + Ok(()) => json_response(StatusCode::OK, serde_json::json!({"status": "disabled"})), + Err(e) => json_error("InvalidRequest", &e, StatusCode::BAD_REQUEST), + } +} + +pub async fn iam_enable_user( + State(state): State, + Extension(principal): Extension, + Path(identifier): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match state.iam.set_user_enabled(&identifier, true).await { + Ok(()) => json_response(StatusCode::OK, serde_json::json!({"status": "enabled"})), + Err(e) => json_error("InvalidRequest", &e, StatusCode::BAD_REQUEST), + } +} + +pub async fn list_website_domains( + State(state): State, + Extension(principal): Extension, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let store = match &state.website_domains { + Some(s) => s, + None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST), + }; + json_response(StatusCode::OK, serde_json::json!(store.list_all())) +} + +pub async fn create_website_domain( + State(state): State, + Extension(principal): Extension, + body: Body, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let store = match &state.website_domains { + Some(s) => s, + None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST), + }; + + let payload = match read_json_body(body).await { + Some(v) => v, + None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST), + }; + + let domain = normalize_domain(payload.get("domain").and_then(|v| v.as_str()).unwrap_or("")); + if domain.is_empty() { + return json_error("ValidationError", "domain is required", StatusCode::BAD_REQUEST); + } + if !is_valid_domain(&domain) { + return json_error("ValidationError", &format!("Invalid domain: '{}'", domain), StatusCode::BAD_REQUEST); + } + + let bucket = payload.get("bucket").and_then(|v| v.as_str()).unwrap_or("").trim().to_string(); + if bucket.is_empty() { + return json_error("ValidationError", "bucket is required", StatusCode::BAD_REQUEST); + } + + match state.storage.bucket_exists(&bucket).await { + Ok(true) => {} + _ => return json_error("NoSuchBucket", &format!("Bucket '{}' does not exist", bucket), StatusCode::NOT_FOUND), + } + + if store.get_bucket(&domain).is_some() { + return json_error("Conflict", &format!("Domain '{}' is already mapped", domain), StatusCode::CONFLICT); + } + + store.set_mapping(&domain, &bucket); + json_response(StatusCode::CREATED, serde_json::json!({"domain": domain, "bucket": bucket})) +} + +pub async fn get_website_domain( + State(state): State, + Extension(principal): Extension, + Path(domain): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let store = match &state.website_domains { + Some(s) => s, + None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST), + }; + + let domain = normalize_domain(&domain); + match store.get_bucket(&domain) { + Some(bucket) => json_response(StatusCode::OK, serde_json::json!({"domain": domain, "bucket": bucket})), + None => json_error("NotFound", &format!("No mapping found for domain '{}'", domain), StatusCode::NOT_FOUND), + } +} + +pub async fn update_website_domain( + State(state): State, + Extension(principal): Extension, + Path(domain): Path, + body: Body, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let store = match &state.website_domains { + Some(s) => s, + None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST), + }; + + let domain = normalize_domain(&domain); + let payload = match read_json_body(body).await { + Some(v) => v, + None => return json_error("MalformedJSON", "Invalid JSON body", StatusCode::BAD_REQUEST), + }; + + let bucket = payload.get("bucket").and_then(|v| v.as_str()).unwrap_or("").trim().to_string(); + if bucket.is_empty() { + return json_error("ValidationError", "bucket is required", StatusCode::BAD_REQUEST); + } + + match state.storage.bucket_exists(&bucket).await { + Ok(true) => {} + _ => return json_error("NoSuchBucket", &format!("Bucket '{}' does not exist", bucket), StatusCode::NOT_FOUND), + } + + if store.get_bucket(&domain).is_none() { + return json_error("NotFound", &format!("No mapping found for domain '{}'", domain), StatusCode::NOT_FOUND); + } + + store.set_mapping(&domain, &bucket); + json_response(StatusCode::OK, serde_json::json!({"domain": domain, "bucket": bucket})) +} + +pub async fn delete_website_domain( + State(state): State, + Extension(principal): Extension, + Path(domain): Path, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let store = match &state.website_domains { + Some(s) => s, + None => return json_error("InvalidRequest", "Website hosting is not enabled", StatusCode::BAD_REQUEST), + }; + + let domain = normalize_domain(&domain); + if !store.delete_mapping(&domain) { + return json_error("NotFound", &format!("No mapping found for domain '{}'", domain), StatusCode::NOT_FOUND); + } + StatusCode::NO_CONTENT.into_response() +} + +#[derive(serde::Deserialize, Default)] +pub struct PaginationQuery { + pub limit: Option, + pub offset: Option, +} + +pub async fn gc_status( + State(state): State, + Extension(principal): Extension, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match &state.gc { + Some(gc) => json_response(StatusCode::OK, gc.status().await), + None => json_response(StatusCode::OK, serde_json::json!({"enabled": false, "message": "GC is not enabled. Set GC_ENABLED=true to enable."})), + } +} + +pub async fn gc_run( + State(state): State, + Extension(principal): Extension, + body: Body, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let gc = match &state.gc { + Some(gc) => gc, + None => return json_error("InvalidRequest", "GC is not enabled", StatusCode::BAD_REQUEST), + }; + + let payload = read_json_body(body).await.unwrap_or(serde_json::json!({})); + let dry_run = payload.get("dry_run").and_then(|v| v.as_bool()).unwrap_or(false); + + match gc.run_now(dry_run).await { + Ok(result) => json_response(StatusCode::OK, result), + Err(e) => json_error("Conflict", &e, StatusCode::CONFLICT), + } +} + +pub async fn gc_history( + State(state): State, + Extension(principal): Extension, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match &state.gc { + Some(gc) => json_response(StatusCode::OK, serde_json::json!({"executions": gc.history().await})), + None => json_response(StatusCode::OK, serde_json::json!({"executions": []})), + } +} + +pub async fn integrity_status( + State(state): State, + Extension(principal): Extension, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match &state.integrity { + Some(checker) => json_response(StatusCode::OK, checker.status().await), + None => json_response(StatusCode::OK, serde_json::json!({"enabled": false, "message": "Integrity checker is not enabled. Set INTEGRITY_ENABLED=true to enable."})), + } +} + +pub async fn integrity_run( + State(state): State, + Extension(principal): Extension, + body: Body, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + let checker = match &state.integrity { + Some(c) => c, + None => return json_error("InvalidRequest", "Integrity checker is not enabled", StatusCode::BAD_REQUEST), + }; + + let payload = read_json_body(body).await.unwrap_or(serde_json::json!({})); + let dry_run = payload.get("dry_run").and_then(|v| v.as_bool()).unwrap_or(false); + let auto_heal = payload.get("auto_heal").and_then(|v| v.as_bool()).unwrap_or(false); + + match checker.run_now(dry_run, auto_heal).await { + Ok(result) => json_response(StatusCode::OK, result), + Err(e) => json_error("Conflict", &e, StatusCode::CONFLICT), + } +} + +pub async fn integrity_history( + State(state): State, + Extension(principal): Extension, +) -> Response { + if let Some(err) = require_admin(&principal) { return err; } + match &state.integrity { + Some(checker) => json_response(StatusCode::OK, serde_json::json!({"executions": checker.history().await})), + None => json_response(StatusCode::OK, serde_json::json!({"executions": []})), + } +} diff --git a/myfsio-engine/crates/myfsio-server/src/handlers/config.rs b/myfsio-engine/crates/myfsio-server/src/handlers/config.rs index 6b95c3e..a28b3b5 100644 --- a/myfsio-engine/crates/myfsio-server/src/handlers/config.rs +++ b/myfsio-engine/crates/myfsio-server/src/handlers/config.rs @@ -692,6 +692,102 @@ pub async fn get_logging(state: &AppState, bucket: &str) -> Response { } } +pub async fn put_object_lock(state: &AppState, bucket: &str, body: Body) -> Response { + let body_bytes = match http_body_util::BodyExt::collect(body).await { + Ok(collected) => collected.to_bytes(), + Err(_) => return StatusCode::BAD_REQUEST.into_response(), + }; + let value = serde_json::Value::String(String::from_utf8_lossy(&body_bytes).to_string()); + + match state.storage.get_bucket_config(bucket).await { + Ok(mut config) => { + config.object_lock = Some(value); + match state.storage.set_bucket_config(bucket, &config).await { + Ok(()) => StatusCode::OK.into_response(), + Err(e) => storage_err(e), + } + } + Err(e) => storage_err(e), + } +} + +pub async fn delete_object_lock(state: &AppState, bucket: &str) -> Response { + match state.storage.get_bucket_config(bucket).await { + Ok(mut config) => { + config.object_lock = None; + match state.storage.set_bucket_config(bucket, &config).await { + Ok(()) => StatusCode::NO_CONTENT.into_response(), + Err(e) => storage_err(e), + } + } + Err(e) => storage_err(e), + } +} + +pub async fn put_notification(state: &AppState, bucket: &str, body: Body) -> Response { + let body_bytes = match http_body_util::BodyExt::collect(body).await { + Ok(collected) => collected.to_bytes(), + Err(_) => return StatusCode::BAD_REQUEST.into_response(), + }; + let value = serde_json::Value::String(String::from_utf8_lossy(&body_bytes).to_string()); + + match state.storage.get_bucket_config(bucket).await { + Ok(mut config) => { + config.notification = Some(value); + match state.storage.set_bucket_config(bucket, &config).await { + Ok(()) => StatusCode::OK.into_response(), + Err(e) => storage_err(e), + } + } + Err(e) => storage_err(e), + } +} + +pub async fn delete_notification(state: &AppState, bucket: &str) -> Response { + match state.storage.get_bucket_config(bucket).await { + Ok(mut config) => { + config.notification = None; + match state.storage.set_bucket_config(bucket, &config).await { + Ok(()) => StatusCode::NO_CONTENT.into_response(), + Err(e) => storage_err(e), + } + } + Err(e) => storage_err(e), + } +} + +pub async fn put_logging(state: &AppState, bucket: &str, body: Body) -> Response { + let body_bytes = match http_body_util::BodyExt::collect(body).await { + Ok(collected) => collected.to_bytes(), + Err(_) => return StatusCode::BAD_REQUEST.into_response(), + }; + let value = serde_json::Value::String(String::from_utf8_lossy(&body_bytes).to_string()); + + match state.storage.get_bucket_config(bucket).await { + Ok(mut config) => { + config.logging = Some(value); + match state.storage.set_bucket_config(bucket, &config).await { + Ok(()) => StatusCode::OK.into_response(), + Err(e) => storage_err(e), + } + } + Err(e) => storage_err(e), + } +} + +pub async fn delete_logging(state: &AppState, bucket: &str) -> Response { + match state.storage.get_bucket_config(bucket).await { + Ok(mut config) => { + config.logging = None; + match state.storage.set_bucket_config(bucket, &config).await { + Ok(()) => StatusCode::NO_CONTENT.into_response(), + Err(e) => storage_err(e), + } + } + Err(e) => storage_err(e), + } +} + pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response { match state.storage.list_buckets().await { Ok(buckets) => { @@ -727,7 +823,7 @@ pub async fn list_object_versions(state: &AppState, bucket: &str) -> Response { xml.push_str("true"); xml.push_str(&format!( "{}", - obj.last_modified.to_rfc3339() + myfsio_xml::response::format_s3_datetime(&obj.last_modified) )); if let Some(ref etag) = obj.etag { xml.push_str(&format!("\"{}\"", etag)); diff --git a/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs b/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs index c6ce18f..0964c8b 100644 --- a/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs +++ b/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs @@ -1,3 +1,4 @@ +pub mod admin; mod config; pub mod kms; mod select; @@ -94,6 +95,15 @@ pub async fn create_bucket( if query.website.is_some() { return config::put_website(&state, &bucket, body).await; } + if query.object_lock.is_some() { + return config::put_object_lock(&state, &bucket, body).await; + } + if query.notification.is_some() { + return config::put_notification(&state, &bucket, body).await; + } + if query.logging.is_some() { + return config::put_logging(&state, &bucket, body).await; + } match state.storage.create_bucket(&bucket).await { Ok(()) => { @@ -375,6 +385,15 @@ pub async fn delete_bucket( if query.replication.is_some() { return config::delete_replication(&state, &bucket).await; } + if query.object_lock.is_some() { + return config::delete_object_lock(&state, &bucket).await; + } + if query.notification.is_some() { + return config::delete_notification(&state, &bucket).await; + } + if query.logging.is_some() { + return config::delete_logging(&state, &bucket).await; + } match state.storage.delete_bucket(&bucket).await { Ok(()) => StatusCode::NO_CONTENT.into_response(), @@ -1026,7 +1045,7 @@ async fn copy_object_handler( match state.storage.copy_object(src_bucket, src_key, dst_bucket, dst_key).await { Ok(meta) => { let etag = meta.etag.as_deref().unwrap_or(""); - let last_modified = meta.last_modified.to_rfc3339(); + let last_modified = myfsio_xml::response::format_s3_datetime(&meta.last_modified); let xml = myfsio_xml::response::copy_object_result_xml(etag, &last_modified); (StatusCode::OK, [("content-type", "application/xml")], xml).into_response() } diff --git a/myfsio-engine/crates/myfsio-server/src/lib.rs b/myfsio-engine/crates/myfsio-server/src/lib.rs index 47be587..e517956 100644 --- a/myfsio-engine/crates/myfsio-server/src/lib.rs +++ b/myfsio-engine/crates/myfsio-server/src/lib.rs @@ -39,6 +39,30 @@ pub fn create_router(state: state::AppState) -> Router { .route("/kms/generate-data-key", axum::routing::post(handlers::kms::generate_data_key)); } + router = router + .route("/admin/site/local", axum::routing::get(handlers::admin::get_local_site).put(handlers::admin::update_local_site)) + .route("/admin/site/all", axum::routing::get(handlers::admin::list_all_sites)) + .route("/admin/site/peers", axum::routing::post(handlers::admin::register_peer_site)) + .route("/admin/site/peers/{site_id}", axum::routing::get(handlers::admin::get_peer_site).put(handlers::admin::update_peer_site).delete(handlers::admin::delete_peer_site)) + .route("/admin/site/peers/{site_id}/health", axum::routing::post(handlers::admin::check_peer_health)) + .route("/admin/site/topology", axum::routing::get(handlers::admin::get_topology)) + .route("/admin/site/peers/{site_id}/bidirectional-status", axum::routing::get(handlers::admin::check_bidirectional_status)) + .route("/admin/iam/users", axum::routing::get(handlers::admin::iam_list_users)) + .route("/admin/iam/users/{identifier}", axum::routing::get(handlers::admin::iam_get_user)) + .route("/admin/iam/users/{identifier}/policies", axum::routing::get(handlers::admin::iam_get_user_policies)) + .route("/admin/iam/users/{identifier}/access-keys", axum::routing::post(handlers::admin::iam_create_access_key)) + .route("/admin/iam/users/{identifier}/access-keys/{access_key}", axum::routing::delete(handlers::admin::iam_delete_access_key)) + .route("/admin/iam/users/{identifier}/disable", axum::routing::post(handlers::admin::iam_disable_user)) + .route("/admin/iam/users/{identifier}/enable", axum::routing::post(handlers::admin::iam_enable_user)) + .route("/admin/website-domains", axum::routing::get(handlers::admin::list_website_domains).post(handlers::admin::create_website_domain)) + .route("/admin/website-domains/{domain}", axum::routing::get(handlers::admin::get_website_domain).put(handlers::admin::update_website_domain).delete(handlers::admin::delete_website_domain)) + .route("/admin/gc/status", axum::routing::get(handlers::admin::gc_status)) + .route("/admin/gc/run", axum::routing::post(handlers::admin::gc_run)) + .route("/admin/gc/history", axum::routing::get(handlers::admin::gc_history)) + .route("/admin/integrity/status", axum::routing::get(handlers::admin::integrity_status)) + .route("/admin/integrity/run", axum::routing::post(handlers::admin::integrity_run)) + .route("/admin/integrity/history", axum::routing::get(handlers::admin::integrity_history)); + router .layer(axum::middleware::from_fn_with_state( state.clone(), diff --git a/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs b/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs index 9a1bbef..5dfa0d5 100644 --- a/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs +++ b/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs @@ -70,6 +70,10 @@ fn authorize_request(state: &AppState, principal: &Principal, req: &Request) -> return Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied")); } + if path.starts_with("/admin/") || path.starts_with("/kms/") { + return Ok(()); + } + let mut segments = path.trim_start_matches('/').split('/').filter(|s| !s.is_empty()); let bucket = match segments.next() { Some(b) => b, diff --git a/myfsio-engine/crates/myfsio-server/src/services/mod.rs b/myfsio-engine/crates/myfsio-server/src/services/mod.rs index 08a673c..5d05fc6 100644 --- a/myfsio-engine/crates/myfsio-server/src/services/mod.rs +++ b/myfsio-engine/crates/myfsio-server/src/services/mod.rs @@ -2,3 +2,5 @@ pub mod gc; pub mod lifecycle; pub mod integrity; pub mod metrics; +pub mod site_registry; +pub mod website_domains; diff --git a/myfsio-engine/crates/myfsio-server/src/services/site_registry.rs b/myfsio-engine/crates/myfsio-server/src/services/site_registry.rs new file mode 100644 index 0000000..f7a2b48 --- /dev/null +++ b/myfsio-engine/crates/myfsio-server/src/services/site_registry.rs @@ -0,0 +1,143 @@ +use chrono::Utc; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::sync::Arc; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SiteInfo { + pub site_id: String, + pub endpoint: String, + #[serde(default = "default_region")] + pub region: String, + #[serde(default = "default_priority")] + pub priority: i32, + #[serde(default)] + pub display_name: String, + #[serde(default)] + pub created_at: Option, +} + +fn default_region() -> String { + "us-east-1".to_string() +} +fn default_priority() -> i32 { + 100 +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PeerSite { + pub site_id: String, + pub endpoint: String, + #[serde(default = "default_region")] + pub region: String, + #[serde(default = "default_priority")] + pub priority: i32, + #[serde(default)] + pub display_name: String, + #[serde(default)] + pub connection_id: Option, + #[serde(default)] + pub created_at: Option, + #[serde(default)] + pub is_healthy: bool, + #[serde(default)] + pub last_health_check: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct RegistryData { + #[serde(default)] + local: Option, + #[serde(default)] + peers: Vec, +} + +pub struct SiteRegistry { + path: PathBuf, + data: Arc>, +} + +impl SiteRegistry { + pub fn new(storage_root: &std::path::Path) -> Self { + let path = storage_root + .join(".myfsio.sys") + .join("config") + .join("site_registry.json"); + let data = if path.exists() { + std::fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_default() + } else { + RegistryData::default() + }; + Self { + path, + data: Arc::new(RwLock::new(data)), + } + } + + fn save(&self) { + let data = self.data.read(); + if let Some(parent) = self.path.parent() { + let _ = std::fs::create_dir_all(parent); + } + if let Ok(json) = serde_json::to_string_pretty(&*data) { + let _ = std::fs::write(&self.path, json); + } + } + + pub fn get_local_site(&self) -> Option { + self.data.read().local.clone() + } + + pub fn set_local_site(&self, site: SiteInfo) { + self.data.write().local = Some(site); + self.save(); + } + + pub fn list_peers(&self) -> Vec { + self.data.read().peers.clone() + } + + pub fn get_peer(&self, site_id: &str) -> Option { + self.data.read().peers.iter().find(|p| p.site_id == site_id).cloned() + } + + pub fn add_peer(&self, peer: PeerSite) { + self.data.write().peers.push(peer); + self.save(); + } + + pub fn update_peer(&self, peer: PeerSite) { + let mut data = self.data.write(); + if let Some(existing) = data.peers.iter_mut().find(|p| p.site_id == peer.site_id) { + *existing = peer; + } + drop(data); + self.save(); + } + + pub fn delete_peer(&self, site_id: &str) -> bool { + let mut data = self.data.write(); + let len_before = data.peers.len(); + data.peers.retain(|p| p.site_id != site_id); + let removed = data.peers.len() < len_before; + drop(data); + if removed { + self.save(); + } + removed + } + + pub fn update_health(&self, site_id: &str, is_healthy: bool) { + let mut data = self.data.write(); + if let Some(peer) = data.peers.iter_mut().find(|p| p.site_id == site_id) { + peer.is_healthy = is_healthy; + peer.last_health_check = Some(Utc::now().to_rfc3339()); + } + drop(data); + self.save(); + } +} diff --git a/myfsio-engine/crates/myfsio-server/src/services/website_domains.rs b/myfsio-engine/crates/myfsio-server/src/services/website_domains.rs new file mode 100644 index 0000000..7ce27cf --- /dev/null +++ b/myfsio-engine/crates/myfsio-server/src/services/website_domains.rs @@ -0,0 +1,104 @@ +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct DomainData { + #[serde(default)] + mappings: HashMap, +} + +pub struct WebsiteDomainStore { + path: PathBuf, + data: Arc>, +} + +impl WebsiteDomainStore { + pub fn new(storage_root: &std::path::Path) -> Self { + let path = storage_root + .join(".myfsio.sys") + .join("config") + .join("website_domains.json"); + let data = if path.exists() { + std::fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_default() + } else { + DomainData::default() + }; + Self { + path, + data: Arc::new(RwLock::new(data)), + } + } + + fn save(&self) { + let data = self.data.read(); + if let Some(parent) = self.path.parent() { + let _ = std::fs::create_dir_all(parent); + } + if let Ok(json) = serde_json::to_string_pretty(&*data) { + let _ = std::fs::write(&self.path, json); + } + } + + pub fn list_all(&self) -> Vec { + self.data + .read() + .mappings + .iter() + .map(|(domain, bucket)| { + serde_json::json!({ + "domain": domain, + "bucket": bucket, + }) + }) + .collect() + } + + pub fn get_bucket(&self, domain: &str) -> Option { + self.data.read().mappings.get(domain).cloned() + } + + pub fn set_mapping(&self, domain: &str, bucket: &str) { + self.data.write().mappings.insert(domain.to_string(), bucket.to_string()); + self.save(); + } + + pub fn delete_mapping(&self, domain: &str) -> bool { + let removed = self.data.write().mappings.remove(domain).is_some(); + if removed { + self.save(); + } + removed + } +} + +pub fn normalize_domain(domain: &str) -> String { + domain.trim().to_ascii_lowercase() +} + +pub fn is_valid_domain(domain: &str) -> bool { + if domain.is_empty() || domain.len() > 253 { + return false; + } + let labels: Vec<&str> = domain.split('.').collect(); + if labels.len() < 2 { + return false; + } + for label in &labels { + if label.is_empty() || label.len() > 63 { + return false; + } + if !label.chars().all(|c| c.is_ascii_alphanumeric() || c == '-') { + return false; + } + if label.starts_with('-') || label.ends_with('-') { + return false; + } + } + true +} diff --git a/myfsio-engine/crates/myfsio-server/src/state.rs b/myfsio-engine/crates/myfsio-server/src/state.rs index 945a061..963f74b 100644 --- a/myfsio-engine/crates/myfsio-server/src/state.rs +++ b/myfsio-engine/crates/myfsio-server/src/state.rs @@ -4,6 +4,8 @@ use crate::config::ServerConfig; use crate::services::gc::GcService; use crate::services::integrity::IntegrityService; use crate::services::metrics::MetricsService; +use crate::services::site_registry::SiteRegistry; +use crate::services::website_domains::WebsiteDomainStore; use myfsio_auth::iam::IamService; use myfsio_crypto::encryption::EncryptionService; use myfsio_crypto::kms::KmsService; @@ -19,6 +21,8 @@ pub struct AppState { pub gc: Option>, pub integrity: Option>, pub metrics: Option>, + pub site_registry: Option>, + pub website_domains: Option>, } impl AppState { @@ -57,6 +61,14 @@ impl AppState { None }; + let site_registry = Some(Arc::new(SiteRegistry::new(&config.storage_root))); + + let website_domains = if config.website_hosting_enabled { + Some(Arc::new(WebsiteDomainStore::new(&config.storage_root))) + } else { + None + }; + Self { config, storage, @@ -66,6 +78,8 @@ impl AppState { gc, integrity, metrics, + site_registry, + website_domains, } } diff --git a/myfsio-engine/crates/myfsio-server/tests/integration.rs b/myfsio-engine/crates/myfsio-server/tests/integration.rs index e9a2fbf..64b71fa 100644 --- a/myfsio-engine/crates/myfsio-server/tests/integration.rs +++ b/myfsio-engine/crates/myfsio-server/tests/integration.rs @@ -29,6 +29,7 @@ fn test_app_with_iam(iam_json: serde_json::Value) -> (axum::Router, tempfile::Te integrity_enabled: false, metrics_enabled: false, lifecycle_enabled: false, + website_hosting_enabled: false, }; let state = myfsio_server::state::AppState::new(config); let app = myfsio_server::create_router(state); @@ -1955,6 +1956,7 @@ async fn test_non_admin_authorization_enforced() { integrity_enabled: false, metrics_enabled: false, lifecycle_enabled: false, + website_hosting_enabled: false, }; let state = myfsio_server::state::AppState::new(config); state.storage.create_bucket("authz-bucket").await.unwrap(); @@ -2014,6 +2016,7 @@ async fn test_app_encrypted() -> (axum::Router, tempfile::TempDir) { integrity_enabled: false, metrics_enabled: false, lifecycle_enabled: false, + website_hosting_enabled: false, }; let state = myfsio_server::state::AppState::new_with_encryption(config).await; let app = myfsio_server::create_router(state); diff --git a/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs b/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs index e3c2b0b..0fadb8a 100644 --- a/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs +++ b/myfsio-engine/crates/myfsio-storage/src/fs_backend.rs @@ -899,34 +899,22 @@ impl FsStorageBackend { }) } - fn put_object_sync( + fn finalize_put_sync( &self, bucket_name: &str, key: &str, - data: &[u8], + tmp_path: &Path, + etag: String, + new_size: u64, metadata: Option>, ) -> StorageResult { let bucket_path = self.require_bucket(bucket_name)?; - self.validate_key(key)?; - let destination = bucket_path.join(key); if let Some(parent) = destination.parent() { std::fs::create_dir_all(parent).map_err(StorageError::Io)?; } let is_overwrite = destination.exists(); - - let tmp_dir = self.tmp_dir(); - std::fs::create_dir_all(&tmp_dir).map_err(StorageError::Io)?; - let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4())); - - let mut hasher = Md5::new(); - hasher.update(data); - let etag = format!("{:x}", hasher.finalize()); - - std::fs::write(&tmp_path, data).map_err(StorageError::Io)?; - let new_size = data.len() as u64; - let lock_dir = self.system_bucket_root(bucket_name).join("locks"); std::fs::create_dir_all(&lock_dir).map_err(StorageError::Io)?; @@ -936,8 +924,8 @@ impl FsStorageBackend { .map_err(StorageError::Io)?; } - std::fs::rename(&tmp_path, &destination).map_err(|e| { - let _ = std::fs::remove_file(&tmp_path); + std::fs::rename(tmp_path, &destination).map_err(|e| { + let _ = std::fs::remove_file(tmp_path); StorageError::Io(e) })?; @@ -973,6 +961,29 @@ impl FsStorageBackend { obj.metadata = metadata.unwrap_or_default(); Ok(obj) } + + fn put_object_sync( + &self, + bucket_name: &str, + key: &str, + data: &[u8], + metadata: Option>, + ) -> StorageResult { + self.validate_key(key)?; + + let tmp_dir = self.tmp_dir(); + std::fs::create_dir_all(&tmp_dir).map_err(StorageError::Io)?; + let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4())); + + let mut hasher = Md5::new(); + hasher.update(data); + let etag = format!("{:x}", hasher.finalize()); + + std::fs::write(&tmp_path, data).map_err(StorageError::Io)?; + let new_size = data.len() as u64; + + self.finalize_put_sync(bucket_name, key, &tmp_path, etag, new_size, metadata) + } } enum Either { @@ -1082,12 +1093,38 @@ impl crate::traits::StorageEngine for FsStorageBackend { mut stream: AsyncReadStream, metadata: Option>, ) -> StorageResult { - let mut data = Vec::new(); - stream - .read_to_end(&mut data) + self.validate_key(key)?; + + let tmp_dir = self.tmp_dir(); + tokio::fs::create_dir_all(&tmp_dir) .await .map_err(StorageError::Io)?; - self.put_object_sync(bucket, key, &data, metadata) + let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4())); + + let mut file = tokio::fs::File::create(&tmp_path) + .await + .map_err(StorageError::Io)?; + let mut hasher = Md5::new(); + let mut total_size: u64 = 0; + let mut buf = [0u8; 65536]; + loop { + let n = stream.read(&mut buf).await.map_err(StorageError::Io)?; + if n == 0 { + break; + } + hasher.update(&buf[..n]); + tokio::io::AsyncWriteExt::write_all(&mut file, &buf[..n]) + .await + .map_err(StorageError::Io)?; + total_size += n as u64; + } + tokio::io::AsyncWriteExt::flush(&mut file) + .await + .map_err(StorageError::Io)?; + drop(file); + + let etag = format!("{:x}", hasher.finalize()); + self.finalize_put_sync(bucket, key, &tmp_path, etag, total_size, metadata) } async fn get_object( @@ -1295,18 +1332,36 @@ impl crate::traits::StorageEngine for FsStorageBackend { return Err(StorageError::UploadNotFound(upload_id.to_string())); } - let mut data = Vec::new(); - stream - .read_to_end(&mut data) + let part_file = upload_dir.join(format!("part-{:05}.part", part_number)); + let tmp_file = upload_dir.join(format!("part-{:05}.part.tmp", part_number)); + + let mut file = tokio::fs::File::create(&tmp_file) .await .map_err(StorageError::Io)?; - let mut hasher = Md5::new(); - hasher.update(&data); + let mut part_size: u64 = 0; + let mut buf = [0u8; 65536]; + loop { + let n = stream.read(&mut buf).await.map_err(StorageError::Io)?; + if n == 0 { + break; + } + hasher.update(&buf[..n]); + tokio::io::AsyncWriteExt::write_all(&mut file, &buf[..n]) + .await + .map_err(StorageError::Io)?; + part_size += n as u64; + } + tokio::io::AsyncWriteExt::flush(&mut file) + .await + .map_err(StorageError::Io)?; + drop(file); + let etag = format!("{:x}", hasher.finalize()); - let part_file = upload_dir.join(format!("part-{:05}.part", part_number)); - std::fs::write(&part_file, &data).map_err(StorageError::Io)?; + tokio::fs::rename(&tmp_file, &part_file) + .await + .map_err(StorageError::Io)?; let lock_path = upload_dir.join(".manifest.lock"); let lock = self.get_meta_index_lock(&lock_path.to_string_lossy()); @@ -1322,7 +1377,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { part_number.to_string(), serde_json::json!({ "etag": etag, - "size": data.len(), + "size": part_size, "filename": format!("part-{:05}.part", part_number), }), ); @@ -1362,20 +1417,57 @@ impl crate::traits::StorageEngine for FsStorageBackend { .and_then(|v| serde_json::from_value(v.clone()).ok()) .unwrap_or_default(); - let mut combined_data = Vec::new(); + let tmp_dir = self.tmp_dir(); + tokio::fs::create_dir_all(&tmp_dir) + .await + .map_err(StorageError::Io)?; + let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4())); + + let mut out_file = tokio::fs::File::create(&tmp_path) + .await + .map_err(StorageError::Io)?; + let mut md5_digest_concat = Vec::new(); + let mut total_size: u64 = 0; + let part_count = parts.len(); + for part_info in parts { let part_file = upload_dir.join(format!("part-{:05}.part", part_info.part_number)); if !part_file.exists() { + let _ = tokio::fs::remove_file(&tmp_path).await; return Err(StorageError::InvalidObjectKey(format!( "Part {} not found", part_info.part_number ))); } - let part_data = std::fs::read(&part_file).map_err(StorageError::Io)?; - combined_data.extend_from_slice(&part_data); + let mut part_reader = tokio::fs::File::open(&part_file) + .await + .map_err(StorageError::Io)?; + let mut part_hasher = Md5::new(); + let mut buf = [0u8; 65536]; + loop { + let n = part_reader.read(&mut buf).await.map_err(StorageError::Io)?; + if n == 0 { + break; + } + part_hasher.update(&buf[..n]); + tokio::io::AsyncWriteExt::write_all(&mut out_file, &buf[..n]) + .await + .map_err(StorageError::Io)?; + total_size += n as u64; + } + md5_digest_concat.extend_from_slice(&part_hasher.finalize()); } - let result = self.put_object_sync(bucket, &object_key, &combined_data, Some(metadata))?; + tokio::io::AsyncWriteExt::flush(&mut out_file) + .await + .map_err(StorageError::Io)?; + drop(out_file); + + let mut composite_hasher = Md5::new(); + composite_hasher.update(&md5_digest_concat); + let etag = format!("{:x}-{}", composite_hasher.finalize(), part_count); + + let result = self.finalize_put_sync(bucket, &object_key, &tmp_path, etag, total_size, Some(metadata))?; let _ = std::fs::remove_dir_all(&upload_dir); diff --git a/myfsio-engine/crates/myfsio-xml/src/response.rs b/myfsio-engine/crates/myfsio-xml/src/response.rs index 791e3f0..346f715 100644 --- a/myfsio-engine/crates/myfsio-xml/src/response.rs +++ b/myfsio-engine/crates/myfsio-xml/src/response.rs @@ -1,8 +1,13 @@ +use chrono::{DateTime, Utc}; use myfsio_common::types::{BucketMeta, ObjectMeta}; use quick_xml::events::{BytesDecl, BytesEnd, BytesStart, BytesText, Event}; use quick_xml::Writer; use std::io::Cursor; +pub fn format_s3_datetime(dt: &DateTime) -> String { + dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() +} + pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta]) -> String { let mut writer = Writer::new(Cursor::new(Vec::new())); @@ -21,7 +26,7 @@ pub fn list_buckets_xml(owner_id: &str, owner_name: &str, buckets: &[BucketMeta] for bucket in buckets { writer.write_event(Event::Start(BytesStart::new("Bucket"))).unwrap(); write_text_element(&mut writer, "Name", &bucket.name); - write_text_element(&mut writer, "CreationDate", &bucket.creation_date.to_rfc3339()); + write_text_element(&mut writer, "CreationDate", &format_s3_datetime(&bucket.creation_date)); writer.write_event(Event::End(BytesEnd::new("Bucket"))).unwrap(); } writer.write_event(Event::End(BytesEnd::new("Buckets"))).unwrap(); @@ -70,7 +75,7 @@ pub fn list_objects_v2_xml( for obj in objects { writer.write_event(Event::Start(BytesStart::new("Contents"))).unwrap(); write_text_element(&mut writer, "Key", &obj.key); - write_text_element(&mut writer, "LastModified", &obj.last_modified.to_rfc3339()); + write_text_element(&mut writer, "LastModified", &format_s3_datetime(&obj.last_modified)); if let Some(ref etag) = obj.etag { write_text_element(&mut writer, "ETag", &format!("\"{}\"", etag)); } @@ -133,7 +138,7 @@ pub fn list_objects_v1_xml( .write_event(Event::Start(BytesStart::new("Contents"))) .unwrap(); write_text_element(&mut writer, "Key", &obj.key); - write_text_element(&mut writer, "LastModified", &obj.last_modified.to_rfc3339()); + write_text_element(&mut writer, "LastModified", &format_s3_datetime(&obj.last_modified)); if let Some(ref etag) = obj.etag { write_text_element(&mut writer, "ETag", &format!("\"{}\"", etag)); } @@ -268,7 +273,7 @@ pub fn list_multipart_uploads_xml( writer.write_event(Event::Start(BytesStart::new("Upload"))).unwrap(); write_text_element(&mut writer, "Key", &upload.key); write_text_element(&mut writer, "UploadId", &upload.upload_id); - write_text_element(&mut writer, "Initiated", &upload.initiated.to_rfc3339()); + write_text_element(&mut writer, "Initiated", &format_s3_datetime(&upload.initiated)); writer.write_event(Event::End(BytesEnd::new("Upload"))).unwrap(); } @@ -299,7 +304,7 @@ pub fn list_parts_xml( write_text_element(&mut writer, "ETag", &format!("\"{}\"", part.etag)); write_text_element(&mut writer, "Size", &part.size.to_string()); if let Some(ref lm) = part.last_modified { - write_text_element(&mut writer, "LastModified", &lm.to_rfc3339()); + write_text_element(&mut writer, "LastModified", &format_s3_datetime(lm)); } writer.write_event(Event::End(BytesEnd::new("Part"))).unwrap(); }