diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs index 971e55c..e54b8a3 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/config.rs @@ -1,10 +1,19 @@ use axum::body::Body; -use axum::http::StatusCode; +use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; +use chrono::{DateTime, Utc}; use myfsio_common::error::{S3Error, S3ErrorCode}; use myfsio_storage::traits::StorageEngine; +use crate::services::acl::{ + acl_from_object_metadata, acl_to_xml, create_canned_acl, store_object_acl, +}; +use crate::services::notifications::parse_notification_configurations; +use crate::services::object_lock::{ + ensure_retention_mutable, get_legal_hold, get_object_retention as retention_from_metadata, + set_legal_hold, set_object_retention as store_retention, ObjectLockRetention, RetentionMode, +}; use crate::state::AppState; fn xml_response(status: StatusCode, xml: String) -> Response { @@ -32,6 +41,16 @@ fn json_response(status: StatusCode, value: serde_json::Value) -> Response { .into_response() } +fn custom_xml_error(status: StatusCode, code: &str, message: &str) -> Response { + let xml = format!( + "\ + {}{}", + xml_escape(code), + xml_escape(message), + ); + xml_response(status, xml) +} + pub async fn get_versioning(state: &AppState, bucket: &str) -> Response { match state.storage.is_versioning_enabled(bucket).await { Ok(enabled) => { @@ -847,13 +866,34 @@ pub async fn delete_object_lock(state: &AppState, bucket: &str) -> Response { pub async fn put_notification(state: &AppState, bucket: &str, body: Body) -> Response { let body_bytes = match http_body_util::BodyExt::collect(body).await { Ok(collected) => collected.to_bytes(), - Err(_) => return StatusCode::BAD_REQUEST.into_response(), + Err(_) => { + return custom_xml_error( + StatusCode::BAD_REQUEST, + "MalformedXML", + "Unable to parse XML document", + ) + } + }; + let raw = String::from_utf8_lossy(&body_bytes).to_string(); + let notification = if raw.trim().is_empty() { + None + } else { + match parse_notification_configurations(&raw) { + Ok(_) => Some(serde_json::Value::String(raw)), + Err(message) => { + let code = if message.contains("Destination URL is required") { + "InvalidArgument" + } else { + "MalformedXML" + }; + return custom_xml_error(StatusCode::BAD_REQUEST, code, &message); + } + } }; - let value = serde_json::Value::String(String::from_utf8_lossy(&body_bytes).to_string()); match state.storage.get_bucket_config(bucket).await { Ok(mut config) => { - config.notification = Some(value); + config.notification = notification; match state.storage.set_bucket_config(bucket, &config).await { Ok(()) => StatusCode::OK.into_response(), Err(e) => storage_err(e), @@ -1094,40 +1134,64 @@ pub async fn delete_object_tagging(state: &AppState, bucket: &str, key: &str) -> } } -pub async fn get_object_acl(state: &AppState, bucket: &str, key: &str) -> Response { +pub async fn put_object_acl( + state: &AppState, + bucket: &str, + key: &str, + headers: &HeaderMap, + _body: Body, +) -> Response { match state.storage.head_object(bucket, key).await { Ok(_) => { - let xml = "\ - \ - myfsiomyfsio\ - \ - \ - myfsiomyfsio\ - FULL_CONTROL\ - "; - xml_response(StatusCode::OK, xml.to_string()) + let canned_acl = headers + .get("x-amz-acl") + .and_then(|value| value.to_str().ok()) + .unwrap_or("private"); + let mut metadata = match state.storage.get_object_metadata(bucket, key).await { + Ok(metadata) => metadata, + Err(err) => return storage_err(err), + }; + let owner = acl_from_object_metadata(&metadata) + .map(|acl| acl.owner) + .unwrap_or_else(|| "myfsio".to_string()); + let acl = create_canned_acl(canned_acl, &owner); + store_object_acl(&mut metadata, &acl); + match state.storage.put_object_metadata(bucket, key, &metadata).await { + Ok(()) => StatusCode::OK.into_response(), + Err(err) => storage_err(err), + } } Err(e) => storage_err(e), } } -pub async fn put_object_acl(state: &AppState, bucket: &str, key: &str, _body: Body) -> Response { - match state.storage.head_object(bucket, key).await { - Ok(_) => StatusCode::OK.into_response(), - Err(e) => storage_err(e), - } -} - pub async fn get_object_retention(state: &AppState, bucket: &str, key: &str) -> Response { match state.storage.head_object(bucket, key).await { - Ok(_) => xml_response( - StatusCode::NOT_FOUND, - S3Error::new( - S3ErrorCode::InvalidRequest, - "No retention policy configured", - ) - .to_xml(), - ), + Ok(_) => { + let metadata = match state.storage.get_object_metadata(bucket, key).await { + Ok(metadata) => metadata, + Err(err) => return storage_err(err), + }; + if let Some(retention) = retention_from_metadata(&metadata) { + let xml = format!( + "\ + \ + {}{}", + match retention.mode { + RetentionMode::GOVERNANCE => "GOVERNANCE", + RetentionMode::COMPLIANCE => "COMPLIANCE", + }, + retention.retain_until_date.format("%Y-%m-%dT%H:%M:%S.000Z"), + ); + xml_response(StatusCode::OK, xml) + } else { + custom_xml_error( + StatusCode::NOT_FOUND, + "NoSuchObjectLockConfiguration", + "No retention policy", + ) + } + } Err(e) => storage_err(e), } } @@ -1136,21 +1200,108 @@ pub async fn put_object_retention( state: &AppState, bucket: &str, key: &str, - _body: Body, + headers: &HeaderMap, + body: Body, ) -> Response { match state.storage.head_object(bucket, key).await { - Ok(_) => StatusCode::OK.into_response(), - Err(e) => storage_err(e), + Ok(_) => {} + Err(e) => return storage_err(e), + } + + let body_bytes = match http_body_util::BodyExt::collect(body).await { + Ok(collected) => collected.to_bytes(), + Err(_) => { + return custom_xml_error( + StatusCode::BAD_REQUEST, + "MalformedXML", + "Unable to parse XML document", + ) + } + }; + let body_str = String::from_utf8_lossy(&body_bytes); + let doc = match roxmltree::Document::parse(&body_str) { + Ok(doc) => doc, + Err(_) => { + return custom_xml_error( + StatusCode::BAD_REQUEST, + "MalformedXML", + "Unable to parse XML document", + ) + } + }; + let mode = find_xml_text(&doc, "Mode").unwrap_or_default(); + let retain_until = find_xml_text(&doc, "RetainUntilDate").unwrap_or_default(); + if mode.is_empty() || retain_until.is_empty() { + return custom_xml_error( + StatusCode::BAD_REQUEST, + "InvalidArgument", + "Mode and RetainUntilDate are required", + ); + } + let mode = match mode.as_str() { + "GOVERNANCE" => RetentionMode::GOVERNANCE, + "COMPLIANCE" => RetentionMode::COMPLIANCE, + other => { + return custom_xml_error( + StatusCode::BAD_REQUEST, + "InvalidArgument", + &format!("Invalid retention mode: {}", other), + ) + } + }; + let retain_until_date = match DateTime::parse_from_rfc3339(&retain_until) { + Ok(value) => value.with_timezone(&Utc), + Err(_) => { + return custom_xml_error( + StatusCode::BAD_REQUEST, + "InvalidArgument", + &format!("Invalid date format: {}", retain_until), + ) + } + }; + + let bypass_governance = headers + .get("x-amz-bypass-governance-retention") + .and_then(|value| value.to_str().ok()) + .map(|value| value.eq_ignore_ascii_case("true")) + .unwrap_or(false); + let mut metadata = match state.storage.get_object_metadata(bucket, key).await { + Ok(metadata) => metadata, + Err(err) => return storage_err(err), + }; + if let Err(message) = ensure_retention_mutable(&metadata, bypass_governance) { + return custom_xml_error(StatusCode::FORBIDDEN, "AccessDenied", &message); + } + if let Err(message) = store_retention( + &mut metadata, + &ObjectLockRetention { + mode, + retain_until_date, + }, + ) { + return custom_xml_error(StatusCode::BAD_REQUEST, "InvalidArgument", &message); + } + match state.storage.put_object_metadata(bucket, key, &metadata).await { + Ok(()) => StatusCode::OK.into_response(), + Err(err) => storage_err(err), } } pub async fn get_object_legal_hold(state: &AppState, bucket: &str, key: &str) -> Response { match state.storage.head_object(bucket, key).await { Ok(_) => { - let xml = "\ - \ - OFF"; - xml_response(StatusCode::OK, xml.to_string()) + let metadata = match state.storage.get_object_metadata(bucket, key).await { + Ok(metadata) => metadata, + Err(err) => return storage_err(err), + }; + let status = if get_legal_hold(&metadata) { "ON" } else { "OFF" }; + let xml = format!( + "\ + \ + {}", + status + ); + xml_response(StatusCode::OK, xml) } Err(e) => storage_err(e), } @@ -1160,14 +1311,80 @@ pub async fn put_object_legal_hold( state: &AppState, bucket: &str, key: &str, - _body: Body, + body: Body, ) -> Response { match state.storage.head_object(bucket, key).await { - Ok(_) => StatusCode::OK.into_response(), + Ok(_) => {} + Err(e) => return storage_err(e), + } + + let body_bytes = match http_body_util::BodyExt::collect(body).await { + Ok(collected) => collected.to_bytes(), + Err(_) => { + return custom_xml_error( + StatusCode::BAD_REQUEST, + "MalformedXML", + "Unable to parse XML document", + ) + } + }; + let body_str = String::from_utf8_lossy(&body_bytes); + let doc = match roxmltree::Document::parse(&body_str) { + Ok(doc) => doc, + Err(_) => { + return custom_xml_error( + StatusCode::BAD_REQUEST, + "MalformedXML", + "Unable to parse XML document", + ) + } + }; + let status = find_xml_text(&doc, "Status").unwrap_or_default(); + let enabled = match status.as_str() { + "ON" => true, + "OFF" => false, + _ => { + return custom_xml_error( + StatusCode::BAD_REQUEST, + "InvalidArgument", + "Status must be ON or OFF", + ) + } + }; + let mut metadata = match state.storage.get_object_metadata(bucket, key).await { + Ok(metadata) => metadata, + Err(err) => return storage_err(err), + }; + set_legal_hold(&mut metadata, enabled); + match state.storage.put_object_metadata(bucket, key, &metadata).await { + Ok(()) => StatusCode::OK.into_response(), + Err(err) => storage_err(err), + } +} + +pub async fn get_object_acl(state: &AppState, bucket: &str, key: &str) -> Response { + match state.storage.head_object(bucket, key).await { + Ok(_) => { + let metadata = match state.storage.get_object_metadata(bucket, key).await { + Ok(metadata) => metadata, + Err(err) => return storage_err(err), + }; + let acl = acl_from_object_metadata(&metadata) + .unwrap_or_else(|| create_canned_acl("private", "myfsio")); + xml_response(StatusCode::OK, acl_to_xml(&acl)) + } Err(e) => storage_err(e), } } +fn find_xml_text(doc: &roxmltree::Document<'_>, name: &str) -> Option { + doc.descendants() + .find(|node| node.is_element() && node.tag_name().name() == name) + .and_then(|node| node.text()) + .map(|text| text.trim().to_string()) + .filter(|text| !text.is_empty()) +} + #[cfg(test)] mod tests { use super::{legacy_logging_config, parse_logging_config_xml}; diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs index 2f63435..e00d978 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/mod.rs @@ -24,6 +24,8 @@ use myfsio_storage::traits::StorageEngine; use tokio::io::AsyncSeekExt; use tokio_util::io::ReaderStream; +use crate::services::notifications; +use crate::services::object_lock; use crate::state::AppState; fn s3_error_response(err: S3Error) -> Response { @@ -45,6 +47,39 @@ fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response { s3_error_response(S3Error::from(err)) } +async fn ensure_object_lock_allows_write( + state: &AppState, + bucket: &str, + key: &str, + headers: Option<&HeaderMap>, +) -> Result<(), Response> { + match state.storage.head_object(bucket, key).await { + Ok(_) => { + let metadata = match state.storage.get_object_metadata(bucket, key).await { + Ok(metadata) => metadata, + Err(err) => return Err(storage_err_response(err)), + }; + let bypass_governance = headers + .and_then(|headers| { + headers + .get("x-amz-bypass-governance-retention") + .and_then(|value| value.to_str().ok()) + }) + .map(|value| value.eq_ignore_ascii_case("true")) + .unwrap_or(false); + if let Err(message) = object_lock::can_delete_object(&metadata, bypass_governance) { + return Err(s3_error_response(S3Error::new( + S3ErrorCode::AccessDenied, + message, + ))); + } + Ok(()) + } + Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()), + Err(err) => Err(storage_err_response(err)), + } +} + pub async fn list_buckets(State(state): State) -> Response { match state.storage.list_buckets().await { Ok(buckets) => { @@ -549,10 +584,10 @@ pub async fn put_object( return config::put_object_tagging(&state, &bucket, &key, body).await; } if query.acl.is_some() { - return config::put_object_acl(&state, &bucket, &key, body).await; + return config::put_object_acl(&state, &bucket, &key, &headers, body).await; } if query.retention.is_some() { - return config::put_object_retention(&state, &bucket, &key, body).await; + return config::put_object_retention(&state, &bucket, &key, &headers, body).await; } if query.legal_hold.is_some() { return config::put_object_legal_hold(&state, &bucket, &key, body).await; @@ -597,6 +632,11 @@ pub async fn put_object( return copy_object_handler(&state, copy_source, &bucket, &key, &headers).await; } + if let Err(response) = ensure_object_lock_allows_write(&state, &bucket, &key, Some(&headers)).await + { + return response; + } + let content_type = guessed_content_type( &key, headers.get("content-type").and_then(|v| v.to_str().ok()), @@ -678,6 +718,17 @@ pub async fn put_object( "x-amz-server-side-encryption", enc_ctx.algorithm.as_str().parse().unwrap(), ); + notifications::emit_object_created( + &state, + &bucket, + &key, + meta.size, + meta.etag.as_deref(), + "", + "", + "", + "Put", + ); return (StatusCode::OK, resp_headers).into_response(); } Err(e) => { @@ -695,6 +746,17 @@ pub async fn put_object( if let Some(ref etag) = meta.etag { resp_headers.insert("etag", format!("\"{}\"", etag).parse().unwrap()); } + notifications::emit_object_created( + &state, + &bucket, + &key, + meta.size, + meta.etag.as_deref(), + "", + "", + "", + "Put", + ); (StatusCode::OK, resp_headers).into_response() } Err(e) => storage_err_response(e), @@ -890,6 +952,7 @@ pub async fn delete_object( State(state): State, Path((bucket, key)): Path<(String, String)>, Query(query): Query, + headers: HeaderMap, ) -> Response { if query.tagging.is_some() { return config::delete_object_tagging(&state, &bucket, &key).await; @@ -902,8 +965,16 @@ pub async fn delete_object( return abort_multipart_handler(&state, &bucket, upload_id).await; } + if let Err(response) = ensure_object_lock_allows_write(&state, &bucket, &key, Some(&headers)).await + { + return response; + } + match state.storage.delete_object(&bucket, &key).await { - Ok(()) => StatusCode::NO_CONTENT.into_response(), + Ok(()) => { + notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete"); + StatusCode::NO_CONTENT.into_response() + } Err(e) => storage_err_response(e), } } @@ -1218,6 +1289,10 @@ async fn copy_object_handler( dst_key: &str, headers: &HeaderMap, ) -> Response { + if let Err(response) = ensure_object_lock_allows_write(state, dst_bucket, dst_key, Some(headers)).await { + return response; + } + let source = copy_source.strip_prefix('/').unwrap_or(copy_source); let (src_bucket, src_key) = match source.split_once('/') { Some(parts) => parts, @@ -1278,8 +1353,26 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R let mut errors = Vec::new(); for obj in &parsed.objects { + if let Err(message) = match state.storage.head_object(bucket, &obj.key).await { + Ok(_) => match state.storage.get_object_metadata(bucket, &obj.key).await { + Ok(metadata) => object_lock::can_delete_object(&metadata, false), + Err(err) => Err(S3Error::from(err).message), + }, + Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()), + Err(err) => Err(S3Error::from(err).message), + } { + errors.push(( + obj.key.clone(), + S3ErrorCode::AccessDenied.as_str().to_string(), + message, + )); + continue; + } match state.storage.delete_object(bucket, &obj.key).await { - Ok(()) => deleted.push((obj.key.clone(), obj.version_id.clone())), + Ok(()) => { + notifications::emit_object_removed(state, bucket, &obj.key, "", "", "", "Delete"); + deleted.push((obj.key.clone(), obj.version_id.clone())) + } Err(e) => { let s3err = S3Error::from(e); errors.push(( @@ -1966,3 +2059,247 @@ fn validate_post_policy_conditions( } Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::ServerConfig; + use crate::services::acl::{acl_to_xml, create_canned_acl}; + use http_body_util::BodyExt; + use serde_json::Value; + use tower::ServiceExt; + + const TEST_ACCESS_KEY: &str = "AKIAIOSFODNN7EXAMPLE"; + const TEST_SECRET_KEY: &str = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"; + + fn test_state() -> (AppState, tempfile::TempDir) { + let tmp = tempfile::tempdir().unwrap(); + let config_dir = tmp.path().join(".myfsio.sys").join("config"); + std::fs::create_dir_all(&config_dir).unwrap(); + std::fs::write( + config_dir.join("iam.json"), + serde_json::json!({ + "version": 2, + "users": [{ + "user_id": "u-test1234", + "display_name": "admin", + "enabled": true, + "access_keys": [{ + "access_key": TEST_ACCESS_KEY, + "secret_key": TEST_SECRET_KEY, + "status": "active" + }], + "policies": [{ + "bucket": "*", + "actions": ["*"], + "prefix": "*" + }] + }] + }) + .to_string(), + ) + .unwrap(); + + let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let config = ServerConfig { + bind_addr: "127.0.0.1:0".parse().unwrap(), + ui_bind_addr: "127.0.0.1:0".parse().unwrap(), + storage_root: tmp.path().to_path_buf(), + region: "us-east-1".to_string(), + iam_config_path: config_dir.join("iam.json"), + sigv4_timestamp_tolerance_secs: 900, + presigned_url_min_expiry: 1, + presigned_url_max_expiry: 604800, + secret_key: None, + encryption_enabled: false, + kms_enabled: false, + gc_enabled: false, + integrity_enabled: false, + metrics_enabled: false, + metrics_history_enabled: false, + metrics_interval_minutes: 5, + metrics_retention_hours: 24, + metrics_history_interval_minutes: 5, + metrics_history_retention_hours: 24, + lifecycle_enabled: false, + website_hosting_enabled: false, + replication_connect_timeout_secs: 1, + replication_read_timeout_secs: 1, + replication_max_retries: 1, + replication_streaming_threshold_bytes: 10_485_760, + replication_max_failures_per_bucket: 50, + site_sync_enabled: false, + site_sync_interval_secs: 60, + site_sync_batch_size: 100, + site_sync_connect_timeout_secs: 10, + site_sync_read_timeout_secs: 120, + site_sync_max_retries: 2, + site_sync_clock_skew_tolerance: 1.0, + ui_enabled: false, + templates_dir: manifest_dir.join("templates"), + static_dir: manifest_dir.join("static"), + }; + (AppState::new(config), tmp) + } + + fn auth_request( + method: axum::http::Method, + uri: &str, + body: Body, + ) -> axum::http::Request { + axum::http::Request::builder() + .method(method) + .uri(uri) + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .body(body) + .unwrap() + } + + #[tokio::test] + async fn public_bucket_acl_allows_anonymous_reads() { + let (state, _tmp) = test_state(); + state.storage.create_bucket("public").await.unwrap(); + state + .storage + .put_object( + "public", + "hello.txt", + Box::pin(std::io::Cursor::new(b"hello".to_vec())), + None, + ) + .await + .unwrap(); + + let mut config = state.storage.get_bucket_config("public").await.unwrap(); + config.acl = Some(Value::String(acl_to_xml(&create_canned_acl("public-read", "myfsio")))); + state.storage.set_bucket_config("public", &config).await.unwrap(); + + let app = crate::create_router(state); + let response = app + .oneshot( + axum::http::Request::builder() + .method(axum::http::Method::GET) + .uri("/public/hello.txt") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn object_retention_blocks_delete_without_bypass() { + let (state, _tmp) = test_state(); + state.storage.create_bucket("locked").await.unwrap(); + state + .storage + .put_object( + "locked", + "obj.txt", + Box::pin(std::io::Cursor::new(b"data".to_vec())), + None, + ) + .await + .unwrap(); + let app = crate::create_router(state); + + let retention_xml = r#" + + GOVERNANCE + 2099-01-01T00:00:00Z + "#; + let response = app + .clone() + .oneshot(auth_request( + axum::http::Method::PUT, + "/locked/obj.txt?retention", + Body::from(retention_xml), + )) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let response = app + .clone() + .oneshot(auth_request( + axum::http::Method::DELETE, + "/locked/obj.txt", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::FORBIDDEN); + + let response = app + .oneshot( + axum::http::Request::builder() + .method(axum::http::Method::DELETE) + .uri("/locked/obj.txt") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("x-amz-bypass-governance-retention", "true") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + } + + #[tokio::test] + async fn object_acl_round_trip_uses_metadata() { + let (state, _tmp) = test_state(); + state.storage.create_bucket("acl").await.unwrap(); + state + .storage + .put_object( + "acl", + "photo.jpg", + Box::pin(std::io::Cursor::new(b"image".to_vec())), + None, + ) + .await + .unwrap(); + let app = crate::create_router(state); + + let response = app + .clone() + .oneshot( + axum::http::Request::builder() + .method(axum::http::Method::PUT) + .uri("/acl/photo.jpg?acl") + .header("x-access-key", TEST_ACCESS_KEY) + .header("x-secret-key", TEST_SECRET_KEY) + .header("x-amz-acl", "public-read") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let response = app + .oneshot(auth_request( + axum::http::Method::GET, + "/acl/photo.jpg?acl", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = String::from_utf8( + response + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(), + ) + .unwrap(); + assert!(body.contains("AllUsers")); + assert!(body.contains("READ")); + } +} diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs index c4b3db7..8bdfa13 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_api.rs @@ -1153,13 +1153,6 @@ pub async fn list_copy_targets( Json(json!({ "buckets": buckets })).into_response() } -pub async fn json_not_implemented() -> Response { - json_error( - StatusCode::NOT_IMPLEMENTED, - "This feature is not implemented yet", - ) -} - #[derive(Deserialize)] pub struct ConnectionTestPayload { pub endpoint_url: String, @@ -3163,20 +3156,36 @@ fn apply_history_limit(mut value: Value, limit: Option) -> Value { value } -pub async fn bucket_stub_json(Extension(_session): Extension) -> Response { - Json(json!({"status": "not_implemented", "items": []})).into_response() -} - -pub async fn lifecycle_history_stub( +pub async fn lifecycle_history( State(state): State, Extension(_session): Extension, - Path(_bucket_name): Path, + Path(bucket_name): Path, + Query(params): Query>, ) -> Response { - Json(json!({ - "enabled": state.config.lifecycle_enabled, - "executions": [], - "total": 0, - })) + let limit = params + .get("limit") + .and_then(|value| value.parse::().ok()) + .unwrap_or(50); + let offset = params + .get("offset") + .and_then(|value| value.parse::().ok()) + .unwrap_or(0); + if !state.config.lifecycle_enabled { + return Json(json!({ + "executions": [], + "total": 0, + "limit": limit, + "offset": offset, + "enabled": false, + })) + .into_response(); + } + Json(crate::services::lifecycle::read_history( + &state.config.storage_root, + &bucket_name, + limit, + offset, + )) .into_response() } diff --git a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_pages.rs b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_pages.rs index 9a59b1a..3be88f7 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_pages.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/handlers/ui_pages.rs @@ -3015,15 +3015,3 @@ pub async fn update_bucket_website( .into_response(), } } - -pub async fn stub_post(Extension(session): Extension) -> Response { - session.write(|s| s.push_flash("info", "This action is not yet implemented in the Rust UI.")); - Redirect::to("/ui/buckets").into_response() -} - -#[derive(serde::Deserialize)] -pub struct QueryArgs(#[serde(default)] pub HashMap); - -pub async fn json_stub(Query(_q): Query) -> Response { - axum::Json(json!({"status": "not_implemented", "items": []})).into_response() -} diff --git a/rust/myfsio-engine/crates/myfsio-server/src/lib.rs b/rust/myfsio-engine/crates/myfsio-server/src/lib.rs index 93ee066..3ba3d1a 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/lib.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/lib.rs @@ -106,7 +106,7 @@ pub fn create_ui_router(state: state::AppState) -> Router { ) .route( "/ui/buckets/{bucket_name}/lifecycle/history", - get(ui_api::lifecycle_history_stub), + get(ui_api::lifecycle_history), ) .route( "/ui/buckets/{bucket_name}/replication/status", diff --git a/rust/myfsio-engine/crates/myfsio-server/src/main.rs b/rust/myfsio-engine/crates/myfsio-server/src/main.rs index 204511b..24ca56e 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/main.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/main.rs @@ -113,6 +113,7 @@ async fn main() { let lifecycle = std::sync::Arc::new(myfsio_server::services::lifecycle::LifecycleService::new( state.storage.clone(), + config.storage_root.clone(), myfsio_server::services::lifecycle::LifecycleConfig::default(), )); bg_handles.push(lifecycle.start_background()); diff --git a/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs b/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs index fe658bd..09f796d 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/middleware/auth.rs @@ -12,6 +12,7 @@ use serde_json::Value; use std::time::Instant; use tokio::io::AsyncReadExt; +use crate::services::acl::acl_from_bucket_config; use crate::state::AppState; fn website_error_response( @@ -589,6 +590,17 @@ async fn authorize_action( if iam_allowed || matches!(policy_decision, PolicyDecision::Allow) { return Ok(()); } + if evaluate_bucket_acl( + state, + bucket, + principal.map(|principal| principal.access_key.as_str()), + action, + principal.is_some(), + ) + .await + { + return Ok(()); + } if principal.is_some() { Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied")) @@ -600,6 +612,27 @@ async fn authorize_action( } } +async fn evaluate_bucket_acl( + state: &AppState, + bucket: &str, + principal_id: Option<&str>, + action: &str, + is_authenticated: bool, +) -> bool { + let config = match state.storage.get_bucket_config(bucket).await { + Ok(config) => config, + Err(_) => return false, + }; + let Some(value) = config.acl.as_ref() else { + return false; + }; + let Some(acl) = acl_from_bucket_config(value) else { + return false; + }; + acl.allowed_actions(principal_id, is_authenticated) + .contains(action) +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum PolicyDecision { Allow, diff --git a/rust/myfsio-engine/crates/myfsio-server/src/services/acl.rs b/rust/myfsio-engine/crates/myfsio-server/src/services/acl.rs new file mode 100644 index 0000000..2d26b6e --- /dev/null +++ b/rust/myfsio-engine/crates/myfsio-server/src/services/acl.rs @@ -0,0 +1,278 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::{HashMap, HashSet}; + +pub const ACL_METADATA_KEY: &str = "__acl__"; +pub const GRANTEE_ALL_USERS: &str = "*"; +pub const GRANTEE_AUTHENTICATED_USERS: &str = "authenticated"; + +const ACL_PERMISSION_FULL_CONTROL: &str = "FULL_CONTROL"; +const ACL_PERMISSION_WRITE: &str = "WRITE"; +const ACL_PERMISSION_WRITE_ACP: &str = "WRITE_ACP"; +const ACL_PERMISSION_READ: &str = "READ"; +const ACL_PERMISSION_READ_ACP: &str = "READ_ACP"; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AclGrant { + pub grantee: String, + pub permission: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Acl { + pub owner: String, + #[serde(default)] + pub grants: Vec, +} + +impl Acl { + pub fn allowed_actions( + &self, + principal_id: Option<&str>, + is_authenticated: bool, + ) -> HashSet<&'static str> { + let mut actions = HashSet::new(); + if let Some(principal_id) = principal_id { + if principal_id == self.owner { + actions.extend(permission_to_actions(ACL_PERMISSION_FULL_CONTROL)); + } + } + for grant in &self.grants { + if grant.grantee == GRANTEE_ALL_USERS { + actions.extend(permission_to_actions(&grant.permission)); + } else if grant.grantee == GRANTEE_AUTHENTICATED_USERS && is_authenticated { + actions.extend(permission_to_actions(&grant.permission)); + } else if let Some(principal_id) = principal_id { + if grant.grantee == principal_id { + actions.extend(permission_to_actions(&grant.permission)); + } + } + } + actions + } +} + +pub fn create_canned_acl(canned_acl: &str, owner: &str) -> Acl { + let owner_grant = AclGrant { + grantee: owner.to_string(), + permission: ACL_PERMISSION_FULL_CONTROL.to_string(), + }; + match canned_acl { + "public-read" => Acl { + owner: owner.to_string(), + grants: vec![ + owner_grant, + AclGrant { + grantee: GRANTEE_ALL_USERS.to_string(), + permission: ACL_PERMISSION_READ.to_string(), + }, + ], + }, + "public-read-write" => Acl { + owner: owner.to_string(), + grants: vec![ + owner_grant, + AclGrant { + grantee: GRANTEE_ALL_USERS.to_string(), + permission: ACL_PERMISSION_READ.to_string(), + }, + AclGrant { + grantee: GRANTEE_ALL_USERS.to_string(), + permission: ACL_PERMISSION_WRITE.to_string(), + }, + ], + }, + "authenticated-read" => Acl { + owner: owner.to_string(), + grants: vec![ + owner_grant, + AclGrant { + grantee: GRANTEE_AUTHENTICATED_USERS.to_string(), + permission: ACL_PERMISSION_READ.to_string(), + }, + ], + }, + "bucket-owner-read" | "bucket-owner-full-control" | "private" | _ => Acl { + owner: owner.to_string(), + grants: vec![owner_grant], + }, + } +} + +pub fn acl_to_xml(acl: &Acl) -> String { + let mut xml = format!( + "\ + \ + {}{}\ + ", + xml_escape(&acl.owner), + xml_escape(&acl.owner), + ); + for grant in &acl.grants { + xml.push_str(""); + match grant.grantee.as_str() { + GRANTEE_ALL_USERS => { + xml.push_str( + "\ + http://acs.amazonaws.com/groups/global/AllUsers\ + ", + ); + } + GRANTEE_AUTHENTICATED_USERS => { + xml.push_str( + "\ + http://acs.amazonaws.com/groups/global/AuthenticatedUsers\ + ", + ); + } + other => { + xml.push_str(&format!( + "\ + {}{}\ + ", + xml_escape(other), + xml_escape(other), + )); + } + } + xml.push_str(&format!( + "{}", + xml_escape(&grant.permission) + )); + } + xml.push_str(""); + xml +} + +pub fn acl_from_bucket_config(value: &Value) -> Option { + match value { + Value::String(raw) => acl_from_xml(raw).or_else(|| serde_json::from_str(raw).ok()), + Value::Object(_) => serde_json::from_value(value.clone()).ok(), + _ => None, + } +} + +pub fn acl_from_object_metadata(metadata: &HashMap) -> Option { + metadata + .get(ACL_METADATA_KEY) + .and_then(|raw| serde_json::from_str::(raw).ok()) +} + +pub fn store_object_acl(metadata: &mut HashMap, acl: &Acl) { + if let Ok(serialized) = serde_json::to_string(acl) { + metadata.insert(ACL_METADATA_KEY.to_string(), serialized); + } +} + +fn acl_from_xml(xml: &str) -> Option { + let doc = roxmltree::Document::parse(xml).ok()?; + let owner = doc + .descendants() + .find(|node| node.is_element() && node.tag_name().name() == "Owner") + .and_then(|node| { + node.children() + .find(|child| child.is_element() && child.tag_name().name() == "ID") + .and_then(|child| child.text()) + }) + .unwrap_or("myfsio") + .trim() + .to_string(); + + let mut grants = Vec::new(); + for grant in doc + .descendants() + .filter(|node| node.is_element() && node.tag_name().name() == "Grant") + { + let permission = grant + .children() + .find(|child| child.is_element() && child.tag_name().name() == "Permission") + .and_then(|child| child.text()) + .unwrap_or_default() + .trim() + .to_string(); + if permission.is_empty() { + continue; + } + let grantee_node = grant + .children() + .find(|child| child.is_element() && child.tag_name().name() == "Grantee"); + let grantee = grantee_node + .and_then(|node| { + let uri = node + .children() + .find(|child| child.is_element() && child.tag_name().name() == "URI") + .and_then(|child| child.text()) + .map(|text| text.trim().to_string()); + match uri.as_deref() { + Some("http://acs.amazonaws.com/groups/global/AllUsers") => { + Some(GRANTEE_ALL_USERS.to_string()) + } + Some("http://acs.amazonaws.com/groups/global/AuthenticatedUsers") => { + Some(GRANTEE_AUTHENTICATED_USERS.to_string()) + } + _ => node + .children() + .find(|child| child.is_element() && child.tag_name().name() == "ID") + .and_then(|child| child.text()) + .map(|text| text.trim().to_string()), + } + }) + .unwrap_or_default(); + if grantee.is_empty() { + continue; + } + grants.push(AclGrant { + grantee, + permission, + }); + } + + Some(Acl { owner, grants }) +} + +fn permission_to_actions(permission: &str) -> &'static [&'static str] { + match permission { + ACL_PERMISSION_FULL_CONTROL => &["read", "write", "delete", "list", "share"], + ACL_PERMISSION_WRITE => &["write", "delete"], + ACL_PERMISSION_WRITE_ACP => &["share"], + ACL_PERMISSION_READ => &["read", "list"], + ACL_PERMISSION_READ_ACP => &["share"], + _ => &[], + } +} + +fn xml_escape(s: &str) -> String { + s.replace('&', "&") + .replace('<', "<") + .replace('>', ">") + .replace('"', """) + .replace('\'', "'") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn canned_acl_grants_public_read() { + let acl = create_canned_acl("public-read", "owner"); + let actions = acl.allowed_actions(None, false); + assert!(actions.contains("read")); + assert!(actions.contains("list")); + assert!(!actions.contains("write")); + } + + #[test] + fn xml_round_trip_preserves_grants() { + let acl = create_canned_acl("authenticated-read", "owner"); + let parsed = acl_from_bucket_config(&Value::String(acl_to_xml(&acl))).unwrap(); + assert_eq!(parsed.owner, "owner"); + assert_eq!(parsed.grants.len(), 2); + assert!( + parsed + .grants + .iter() + .any(|grant| grant.grantee == GRANTEE_AUTHENTICATED_USERS) + ); + } +} diff --git a/rust/myfsio-engine/crates/myfsio-server/src/services/lifecycle.rs b/rust/myfsio-engine/crates/myfsio-server/src/services/lifecycle.rs index 071e6ab..ce2430b 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/services/lifecycle.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/services/lifecycle.rs @@ -1,31 +1,75 @@ +use chrono::{DateTime, Duration, Utc}; use myfsio_storage::fs_backend::FsStorageBackend; use myfsio_storage::traits::StorageEngine; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; +use std::collections::VecDeque; +use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::sync::RwLock; pub struct LifecycleConfig { pub interval_seconds: u64, + pub max_history_per_bucket: usize, } impl Default for LifecycleConfig { fn default() -> Self { Self { interval_seconds: 3600, + max_history_per_bucket: 50, } } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LifecycleExecutionRecord { + pub timestamp: f64, + pub bucket_name: String, + pub objects_deleted: u64, + pub versions_deleted: u64, + pub uploads_aborted: u64, + #[serde(default)] + pub errors: Vec, + pub execution_time_seconds: f64, +} + +#[derive(Debug, Clone, Default)] +struct BucketLifecycleResult { + bucket_name: String, + objects_deleted: u64, + versions_deleted: u64, + uploads_aborted: u64, + errors: Vec, + execution_time_seconds: f64, +} + +#[derive(Debug, Clone, Default)] +struct ParsedLifecycleRule { + status: String, + prefix: String, + expiration_days: Option, + expiration_date: Option>, + noncurrent_days: Option, + abort_incomplete_multipart_days: Option, +} + pub struct LifecycleService { storage: Arc, + storage_root: PathBuf, config: LifecycleConfig, running: Arc>, } impl LifecycleService { - pub fn new(storage: Arc, config: LifecycleConfig) -> Self { + pub fn new( + storage: Arc, + storage_root: impl Into, + config: LifecycleConfig, + ) -> Self { Self { storage, + storage_root: storage_root.into(), config, running: Arc::new(RwLock::new(false)), } @@ -47,108 +91,261 @@ impl LifecycleService { async fn evaluate_rules(&self) -> Value { let buckets = match self.storage.list_buckets().await { - Ok(b) => b, - Err(e) => return json!({"error": e.to_string()}), + Ok(buckets) => buckets, + Err(err) => return json!({ "error": err.to_string() }), }; - let mut total_expired = 0u64; - let mut total_multipart_aborted = 0u64; - let mut errors: Vec = Vec::new(); + let mut bucket_results = Vec::new(); + let mut total_objects_deleted = 0u64; + let mut total_versions_deleted = 0u64; + let mut total_uploads_aborted = 0u64; + let mut errors = Vec::new(); for bucket in &buckets { + let started_at = std::time::Instant::now(); + let mut result = BucketLifecycleResult { + bucket_name: bucket.name.clone(), + ..Default::default() + }; + let config = match self.storage.get_bucket_config(&bucket.name).await { - Ok(c) => c, - Err(_) => continue, - }; - - let lifecycle = match &config.lifecycle { - Some(lc) => lc, - None => continue, - }; - - let rules = match lifecycle - .as_str() - .and_then(|s| serde_json::from_str::(s).ok()) - { - Some(v) => v, - None => continue, - }; - - let rules_arr = match rules.get("Rules").and_then(|r| r.as_array()) { - Some(a) => a.clone(), - None => continue, - }; - - for rule in &rules_arr { - if rule.get("Status").and_then(|s| s.as_str()) != Some("Enabled") { + Ok(config) => config, + Err(err) => { + result.errors.push(err.to_string()); + result.execution_time_seconds = started_at.elapsed().as_secs_f64(); + self.append_history(&result); + errors.extend(result.errors.clone()); + bucket_results.push(result); continue; } + }; + let Some(lifecycle) = config.lifecycle.as_ref() else { + continue; + }; + let rules = parse_lifecycle_rules(lifecycle); + if rules.is_empty() { + continue; + } - let prefix = rule - .get("Filter") - .and_then(|f| f.get("Prefix")) - .and_then(|p| p.as_str()) - .or_else(|| rule.get("Prefix").and_then(|p| p.as_str())) - .unwrap_or(""); - - if let Some(exp) = rule.get("Expiration") { - if let Some(days) = exp.get("Days").and_then(|d| d.as_u64()) { - let cutoff = chrono::Utc::now() - chrono::Duration::days(days as i64); - let params = myfsio_common::types::ListParams { - max_keys: 1000, - prefix: if prefix.is_empty() { - None - } else { - Some(prefix.to_string()) - }, - ..Default::default() - }; - if let Ok(result) = self.storage.list_objects(&bucket.name, ¶ms).await { - for obj in &result.objects { - if obj.last_modified < cutoff { - match self.storage.delete_object(&bucket.name, &obj.key).await { - Ok(()) => total_expired += 1, - Err(e) => errors - .push(format!("{}:{}: {}", bucket.name, obj.key, e)), - } - } - } - } - } + for rule in &rules { + if rule.status != "Enabled" { + continue; } - - if let Some(abort) = rule.get("AbortIncompleteMultipartUpload") { - if let Some(days) = abort.get("DaysAfterInitiation").and_then(|d| d.as_u64()) { - let cutoff = chrono::Utc::now() - chrono::Duration::days(days as i64); - if let Ok(uploads) = self.storage.list_multipart_uploads(&bucket.name).await - { - for upload in &uploads { - if upload.initiated < cutoff { - match self - .storage - .abort_multipart(&bucket.name, &upload.upload_id) - .await - { - Ok(()) => total_multipart_aborted += 1, - Err(e) => errors - .push(format!("abort {}: {}", upload.upload_id, e)), - } - } - } - } - } + if let Some(err) = self + .apply_expiration_rule(&bucket.name, rule, &mut result) + .await + { + result.errors.push(err); } + if let Some(err) = self + .apply_noncurrent_expiration_rule(&bucket.name, rule, &mut result) + .await + { + result.errors.push(err); + } + if let Some(err) = self + .apply_abort_incomplete_multipart_rule(&bucket.name, rule, &mut result) + .await + { + result.errors.push(err); + } + } + + result.execution_time_seconds = started_at.elapsed().as_secs_f64(); + if result.objects_deleted > 0 + || result.versions_deleted > 0 + || result.uploads_aborted > 0 + || !result.errors.is_empty() + { + total_objects_deleted += result.objects_deleted; + total_versions_deleted += result.versions_deleted; + total_uploads_aborted += result.uploads_aborted; + errors.extend(result.errors.clone()); + self.append_history(&result); + bucket_results.push(result); } } json!({ - "objects_expired": total_expired, - "multipart_aborted": total_multipart_aborted, + "objects_deleted": total_objects_deleted, + "versions_deleted": total_versions_deleted, + "multipart_aborted": total_uploads_aborted, "buckets_evaluated": buckets.len(), + "results": bucket_results.iter().map(result_to_json).collect::>(), "errors": errors, }) } + async fn apply_expiration_rule( + &self, + bucket: &str, + rule: &ParsedLifecycleRule, + result: &mut BucketLifecycleResult, + ) -> Option { + let cutoff = if let Some(days) = rule.expiration_days { + Some(Utc::now() - Duration::days(days as i64)) + } else { + rule.expiration_date + }; + let Some(cutoff) = cutoff else { + return None; + }; + + let params = myfsio_common::types::ListParams { + max_keys: 10_000, + prefix: if rule.prefix.is_empty() { + None + } else { + Some(rule.prefix.clone()) + }, + ..Default::default() + }; + match self.storage.list_objects(bucket, ¶ms).await { + Ok(objects) => { + for object in &objects.objects { + if object.last_modified < cutoff { + if let Err(err) = self.storage.delete_object(bucket, &object.key).await { + result + .errors + .push(format!("{}:{}: {}", bucket, object.key, err)); + } else { + result.objects_deleted += 1; + } + } + } + None + } + Err(err) => Some(format!("Failed to list objects for {}: {}", bucket, err)), + } + } + + async fn apply_noncurrent_expiration_rule( + &self, + bucket: &str, + rule: &ParsedLifecycleRule, + result: &mut BucketLifecycleResult, + ) -> Option { + let Some(days) = rule.noncurrent_days else { + return None; + }; + let cutoff = Utc::now() - Duration::days(days as i64); + let versions_root = version_root_for_bucket(&self.storage_root, bucket); + if !versions_root.exists() { + return None; + } + + let mut stack = VecDeque::from([versions_root]); + while let Some(current) = stack.pop_front() { + let entries = match std::fs::read_dir(¤t) { + Ok(entries) => entries, + Err(err) => return Some(err.to_string()), + }; + for entry in entries.flatten() { + let file_type = match entry.file_type() { + Ok(file_type) => file_type, + Err(_) => continue, + }; + if file_type.is_dir() { + stack.push_back(entry.path()); + continue; + } + if entry.path().extension().and_then(|ext| ext.to_str()) != Some("json") { + continue; + } + let contents = match std::fs::read_to_string(entry.path()) { + Ok(contents) => contents, + Err(_) => continue, + }; + let Ok(manifest) = serde_json::from_str::(&contents) else { + continue; + }; + let key = manifest + .get("key") + .and_then(|value| value.as_str()) + .unwrap_or_default() + .to_string(); + if !rule.prefix.is_empty() && !key.starts_with(&rule.prefix) { + continue; + } + let archived_at = manifest + .get("archived_at") + .and_then(|value| value.as_str()) + .and_then(|value| DateTime::parse_from_rfc3339(value).ok()) + .map(|value| value.with_timezone(&Utc)); + if archived_at.is_none() || archived_at.unwrap() >= cutoff { + continue; + } + let version_id = manifest + .get("version_id") + .and_then(|value| value.as_str()) + .unwrap_or_default(); + let data_path = entry.path().with_file_name(format!("{}.bin", version_id)); + let _ = std::fs::remove_file(&data_path); + let _ = std::fs::remove_file(entry.path()); + result.versions_deleted += 1; + } + } + None + } + + async fn apply_abort_incomplete_multipart_rule( + &self, + bucket: &str, + rule: &ParsedLifecycleRule, + result: &mut BucketLifecycleResult, + ) -> Option { + let Some(days) = rule.abort_incomplete_multipart_days else { + return None; + }; + let cutoff = Utc::now() - Duration::days(days as i64); + match self.storage.list_multipart_uploads(bucket).await { + Ok(uploads) => { + for upload in &uploads { + if upload.initiated < cutoff { + if let Err(err) = self.storage.abort_multipart(bucket, &upload.upload_id).await + { + result + .errors + .push(format!("abort {}: {}", upload.upload_id, err)); + } else { + result.uploads_aborted += 1; + } + } + } + None + } + Err(err) => Some(format!("Failed to list multipart uploads for {}: {}", bucket, err)), + } + } + + fn append_history(&self, result: &BucketLifecycleResult) { + let path = lifecycle_history_path(&self.storage_root, &result.bucket_name); + let mut history = load_history(&path); + history.insert( + 0, + LifecycleExecutionRecord { + timestamp: Utc::now().timestamp_millis() as f64 / 1000.0, + bucket_name: result.bucket_name.clone(), + objects_deleted: result.objects_deleted, + versions_deleted: result.versions_deleted, + uploads_aborted: result.uploads_aborted, + errors: result.errors.clone(), + execution_time_seconds: result.execution_time_seconds, + }, + ); + history.truncate(self.config.max_history_per_bucket); + let payload = json!({ + "executions": history, + }); + if let Some(parent) = path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let _ = std::fs::write( + &path, + serde_json::to_string_pretty(&payload).unwrap_or_else(|_| "{}".to_string()), + ); + } + pub fn start_background(self: Arc) -> tokio::task::JoinHandle<()> { let interval = std::time::Duration::from_secs(self.config.interval_seconds); tokio::spawn(async move { @@ -159,9 +356,277 @@ impl LifecycleService { tracing::info!("Lifecycle evaluation starting"); match self.run_cycle().await { Ok(result) => tracing::info!("Lifecycle cycle complete: {:?}", result), - Err(e) => tracing::warn!("Lifecycle cycle failed: {}", e), + Err(err) => tracing::warn!("Lifecycle cycle failed: {}", err), } } }) } } + +pub fn read_history(storage_root: &Path, bucket_name: &str, limit: usize, offset: usize) -> Value { + let path = lifecycle_history_path(storage_root, bucket_name); + let mut history = load_history(&path); + let total = history.len(); + let executions = history + .drain(offset.min(total)..) + .take(limit) + .collect::>(); + json!({ + "executions": executions, + "total": total, + "limit": limit, + "offset": offset, + "enabled": true, + }) +} + +fn load_history(path: &Path) -> Vec { + if !path.exists() { + return Vec::new(); + } + std::fs::read_to_string(path) + .ok() + .and_then(|contents| serde_json::from_str::(&contents).ok()) + .and_then(|value| value.get("executions").cloned()) + .and_then(|value| serde_json::from_value::>(value).ok()) + .unwrap_or_default() +} + +fn lifecycle_history_path(storage_root: &Path, bucket_name: &str) -> PathBuf { + storage_root + .join(".myfsio.sys") + .join("buckets") + .join(bucket_name) + .join("lifecycle_history.json") +} + +fn version_root_for_bucket(storage_root: &Path, bucket_name: &str) -> PathBuf { + storage_root + .join(".myfsio.sys") + .join("buckets") + .join(bucket_name) + .join("versions") +} + +fn parse_lifecycle_rules(value: &Value) -> Vec { + match value { + Value::String(raw) => parse_lifecycle_rules_from_string(raw), + Value::Array(items) => items.iter().filter_map(parse_lifecycle_rule).collect(), + Value::Object(map) => map + .get("Rules") + .and_then(|rules| rules.as_array()) + .map(|rules| rules.iter().filter_map(parse_lifecycle_rule).collect()) + .unwrap_or_default(), + _ => Vec::new(), + } +} + +fn parse_lifecycle_rules_from_string(raw: &str) -> Vec { + if let Ok(json) = serde_json::from_str::(raw) { + return parse_lifecycle_rules(&json); + } + let Ok(doc) = roxmltree::Document::parse(raw) else { + return Vec::new(); + }; + doc.descendants() + .filter(|node| node.is_element() && node.tag_name().name() == "Rule") + .map(|rule| ParsedLifecycleRule { + status: child_text(&rule, "Status").unwrap_or_else(|| "Enabled".to_string()), + prefix: child_text(&rule, "Prefix") + .or_else(|| { + rule.descendants() + .find(|node| { + node.is_element() + && node.tag_name().name() == "Filter" + && node + .children() + .any(|child| { + child.is_element() + && child.tag_name().name() == "Prefix" + }) + }) + .and_then(|filter| child_text(&filter, "Prefix")) + }) + .unwrap_or_default(), + expiration_days: rule + .descendants() + .find(|node| node.is_element() && node.tag_name().name() == "Expiration") + .and_then(|expiration| child_text(&expiration, "Days")) + .and_then(|value| value.parse::().ok()), + expiration_date: rule + .descendants() + .find(|node| node.is_element() && node.tag_name().name() == "Expiration") + .and_then(|expiration| child_text(&expiration, "Date")) + .as_deref() + .and_then(parse_datetime), + noncurrent_days: rule + .descendants() + .find(|node| { + node.is_element() && node.tag_name().name() == "NoncurrentVersionExpiration" + }) + .and_then(|node| child_text(&node, "NoncurrentDays")) + .and_then(|value| value.parse::().ok()), + abort_incomplete_multipart_days: rule + .descendants() + .find(|node| { + node.is_element() + && node.tag_name().name() == "AbortIncompleteMultipartUpload" + }) + .and_then(|node| child_text(&node, "DaysAfterInitiation")) + .and_then(|value| value.parse::().ok()), + }) + .collect() +} + +fn parse_lifecycle_rule(value: &Value) -> Option { + let map = value.as_object()?; + Some(ParsedLifecycleRule { + status: map + .get("Status") + .and_then(|value| value.as_str()) + .unwrap_or("Enabled") + .to_string(), + prefix: map + .get("Prefix") + .and_then(|value| value.as_str()) + .or_else(|| { + map.get("Filter") + .and_then(|value| value.get("Prefix")) + .and_then(|value| value.as_str()) + }) + .unwrap_or_default() + .to_string(), + expiration_days: map + .get("Expiration") + .and_then(|value| value.get("Days")) + .and_then(|value| value.as_u64()), + expiration_date: map + .get("Expiration") + .and_then(|value| value.get("Date")) + .and_then(|value| value.as_str()) + .and_then(parse_datetime), + noncurrent_days: map + .get("NoncurrentVersionExpiration") + .and_then(|value| value.get("NoncurrentDays")) + .and_then(|value| value.as_u64()), + abort_incomplete_multipart_days: map + .get("AbortIncompleteMultipartUpload") + .and_then(|value| value.get("DaysAfterInitiation")) + .and_then(|value| value.as_u64()), + }) +} + +fn parse_datetime(value: &str) -> Option> { + DateTime::parse_from_rfc3339(value) + .ok() + .map(|value| value.with_timezone(&Utc)) +} + +fn child_text(node: &roxmltree::Node<'_, '_>, name: &str) -> Option { + node.children() + .find(|child| child.is_element() && child.tag_name().name() == name) + .and_then(|child| child.text()) + .map(|text| text.trim().to_string()) + .filter(|text| !text.is_empty()) +} + +fn result_to_json(result: &BucketLifecycleResult) -> Value { + json!({ + "bucket_name": result.bucket_name, + "objects_deleted": result.objects_deleted, + "versions_deleted": result.versions_deleted, + "uploads_aborted": result.uploads_aborted, + "errors": result.errors, + "execution_time_seconds": result.execution_time_seconds, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Duration; + + #[test] + fn parses_rules_from_xml() { + let xml = r#" + + + Enabled + logs/ + 10 + 30 + 7 + + "#; + let rules = parse_lifecycle_rules(&Value::String(xml.to_string())); + assert_eq!(rules.len(), 1); + assert_eq!(rules[0].prefix, "logs/"); + assert_eq!(rules[0].expiration_days, Some(10)); + assert_eq!(rules[0].noncurrent_days, Some(30)); + assert_eq!(rules[0].abort_incomplete_multipart_days, Some(7)); + } + + #[tokio::test] + async fn run_cycle_writes_history_and_deletes_noncurrent_versions() { + let tmp = tempfile::tempdir().unwrap(); + let storage = Arc::new(FsStorageBackend::new(tmp.path().to_path_buf())); + storage.create_bucket("docs").await.unwrap(); + storage.set_versioning("docs", true).await.unwrap(); + + storage + .put_object( + "docs", + "logs/file.txt", + Box::pin(std::io::Cursor::new(b"old".to_vec())), + None, + ) + .await + .unwrap(); + storage + .put_object( + "docs", + "logs/file.txt", + Box::pin(std::io::Cursor::new(b"new".to_vec())), + None, + ) + .await + .unwrap(); + + let versions_root = version_root_for_bucket(tmp.path(), "docs").join("logs").join("file.txt"); + let manifest = std::fs::read_dir(&versions_root) + .unwrap() + .flatten() + .find(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("json")) + .unwrap() + .path(); + let old_manifest = json!({ + "version_id": "ver-1", + "key": "logs/file.txt", + "size": 3, + "archived_at": (Utc::now() - Duration::days(45)).to_rfc3339(), + "etag": "etag", + }); + std::fs::write(&manifest, serde_json::to_string(&old_manifest).unwrap()).unwrap(); + std::fs::write(manifest.with_file_name("ver-1.bin"), b"old").unwrap(); + + let lifecycle_xml = r#" + + + Enabled + logs/ + 30 + + "#; + let mut config = storage.get_bucket_config("docs").await.unwrap(); + config.lifecycle = Some(Value::String(lifecycle_xml.to_string())); + storage.set_bucket_config("docs", &config).await.unwrap(); + + let service = LifecycleService::new(storage.clone(), tmp.path(), LifecycleConfig::default()); + let result = service.run_cycle().await.unwrap(); + assert_eq!(result["versions_deleted"], 1); + + let history = read_history(tmp.path(), "docs", 50, 0); + assert_eq!(history["total"], 1); + assert_eq!(history["executions"][0]["versions_deleted"], 1); + } +} diff --git a/rust/myfsio-engine/crates/myfsio-server/src/services/mod.rs b/rust/myfsio-engine/crates/myfsio-server/src/services/mod.rs index 9da9f01..7a7f441 100644 --- a/rust/myfsio-engine/crates/myfsio-server/src/services/mod.rs +++ b/rust/myfsio-engine/crates/myfsio-server/src/services/mod.rs @@ -1,8 +1,11 @@ +pub mod acl; pub mod access_logging; pub mod gc; pub mod integrity; pub mod lifecycle; pub mod metrics; +pub mod notifications; +pub mod object_lock; pub mod replication; pub mod s3_client; pub mod site_registry; diff --git a/rust/myfsio-engine/crates/myfsio-server/src/services/notifications.rs b/rust/myfsio-engine/crates/myfsio-server/src/services/notifications.rs new file mode 100644 index 0000000..479663d --- /dev/null +++ b/rust/myfsio-engine/crates/myfsio-server/src/services/notifications.rs @@ -0,0 +1,294 @@ +use crate::state::AppState; +use chrono::{DateTime, Utc}; +use myfsio_storage::traits::StorageEngine; +use serde::Serialize; +use serde_json::json; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WebhookDestination { + pub url: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct NotificationConfiguration { + pub id: String, + pub events: Vec, + pub destination: WebhookDestination, + pub prefix_filter: String, + pub suffix_filter: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct NotificationEvent { + #[serde(rename = "eventVersion")] + event_version: &'static str, + #[serde(rename = "eventSource")] + event_source: &'static str, + #[serde(rename = "awsRegion")] + aws_region: &'static str, + #[serde(rename = "eventTime")] + event_time: String, + #[serde(rename = "eventName")] + event_name: String, + #[serde(rename = "userIdentity")] + user_identity: serde_json::Value, + #[serde(rename = "requestParameters")] + request_parameters: serde_json::Value, + #[serde(rename = "responseElements")] + response_elements: serde_json::Value, + s3: serde_json::Value, +} + +impl NotificationConfiguration { + pub fn matches_event(&self, event_name: &str, object_key: &str) -> bool { + let event_match = self.events.iter().any(|pattern| { + if let Some(prefix) = pattern.strip_suffix('*') { + event_name.starts_with(prefix) + } else { + pattern == event_name + } + }); + if !event_match { + return false; + } + if !self.prefix_filter.is_empty() && !object_key.starts_with(&self.prefix_filter) { + return false; + } + if !self.suffix_filter.is_empty() && !object_key.ends_with(&self.suffix_filter) { + return false; + } + true + } +} + +pub fn parse_notification_configurations(xml: &str) -> Result, String> { + let doc = roxmltree::Document::parse(xml).map_err(|err| err.to_string())?; + let mut configs = Vec::new(); + + for webhook in doc + .descendants() + .filter(|node| node.is_element() && node.tag_name().name() == "WebhookConfiguration") + { + let id = child_text(&webhook, "Id").unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + let events = webhook + .children() + .filter(|node| node.is_element() && node.tag_name().name() == "Event") + .filter_map(|node| node.text()) + .map(|text| text.trim().to_string()) + .filter(|text| !text.is_empty()) + .collect::>(); + + let destination = webhook + .children() + .find(|node| node.is_element() && node.tag_name().name() == "Destination"); + let url = destination + .as_ref() + .and_then(|node| child_text(node, "Url")) + .unwrap_or_default(); + if url.trim().is_empty() { + return Err("Destination URL is required".to_string()); + } + + let mut prefix_filter = String::new(); + let mut suffix_filter = String::new(); + if let Some(filter) = webhook + .children() + .find(|node| node.is_element() && node.tag_name().name() == "Filter") + { + if let Some(key) = filter + .children() + .find(|node| node.is_element() && node.tag_name().name() == "S3Key") + { + for rule in key + .children() + .filter(|node| node.is_element() && node.tag_name().name() == "FilterRule") + { + let name = child_text(&rule, "Name").unwrap_or_default(); + let value = child_text(&rule, "Value").unwrap_or_default(); + if name == "prefix" { + prefix_filter = value; + } else if name == "suffix" { + suffix_filter = value; + } + } + } + } + + configs.push(NotificationConfiguration { + id, + events, + destination: WebhookDestination { url }, + prefix_filter, + suffix_filter, + }); + } + + Ok(configs) +} + +pub fn emit_object_created( + state: &AppState, + bucket: &str, + key: &str, + size: u64, + etag: Option<&str>, + request_id: &str, + source_ip: &str, + user_identity: &str, + operation: &str, +) { + emit_notifications( + state.clone(), + bucket.to_string(), + key.to_string(), + format!("s3:ObjectCreated:{}", operation), + size, + etag.unwrap_or_default().to_string(), + request_id.to_string(), + source_ip.to_string(), + user_identity.to_string(), + ); +} + +pub fn emit_object_removed( + state: &AppState, + bucket: &str, + key: &str, + request_id: &str, + source_ip: &str, + user_identity: &str, + operation: &str, +) { + emit_notifications( + state.clone(), + bucket.to_string(), + key.to_string(), + format!("s3:ObjectRemoved:{}", operation), + 0, + String::new(), + request_id.to_string(), + source_ip.to_string(), + user_identity.to_string(), + ); +} + +fn emit_notifications( + state: AppState, + bucket: String, + key: String, + event_name: String, + size: u64, + etag: String, + request_id: String, + source_ip: String, + user_identity: String, +) { + tokio::spawn(async move { + let config = match state.storage.get_bucket_config(&bucket).await { + Ok(config) => config, + Err(_) => return, + }; + let raw = match config.notification { + Some(serde_json::Value::String(raw)) => raw, + _ => return, + }; + let configs = match parse_notification_configurations(&raw) { + Ok(configs) => configs, + Err(err) => { + tracing::warn!("Invalid notification config for bucket {}: {}", bucket, err); + return; + } + }; + + let record = NotificationEvent { + event_version: "2.1", + event_source: "myfsio:s3", + aws_region: "local", + event_time: format_event_time(Utc::now()), + event_name: event_name.clone(), + user_identity: json!({ "principalId": if user_identity.is_empty() { "ANONYMOUS" } else { &user_identity } }), + request_parameters: json!({ "sourceIPAddress": if source_ip.is_empty() { "127.0.0.1" } else { &source_ip } }), + response_elements: json!({ + "x-amz-request-id": request_id, + "x-amz-id-2": request_id, + }), + s3: json!({ + "s3SchemaVersion": "1.0", + "configurationId": "notification", + "bucket": { + "name": bucket, + "ownerIdentity": { "principalId": "local" }, + "arn": format!("arn:aws:s3:::{}", bucket), + }, + "object": { + "key": key, + "size": size, + "eTag": etag, + "versionId": "null", + "sequencer": format!("{:016X}", Utc::now().timestamp_millis()), + } + }), + }; + let payload = json!({ "Records": [record] }); + let client = reqwest::Client::new(); + + for config in configs { + if !config.matches_event(&event_name, &key) { + continue; + } + let result = client + .post(&config.destination.url) + .header("content-type", "application/json") + .json(&payload) + .send() + .await; + if let Err(err) = result { + tracing::warn!( + "Failed to deliver notification for {} to {}: {}", + event_name, + config.destination.url, + err + ); + } + } + }); +} + +fn format_event_time(value: DateTime) -> String { + value.format("%Y-%m-%dT%H:%M:%S.000Z").to_string() +} + +fn child_text(node: &roxmltree::Node<'_, '_>, name: &str) -> Option { + node.children() + .find(|child| child.is_element() && child.tag_name().name() == name) + .and_then(|child| child.text()) + .map(|text| text.trim().to_string()) + .filter(|text| !text.is_empty()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_webhook_configuration() { + let xml = r#" + + + upload + s3:ObjectCreated:* + https://example.com/hook + + + prefixlogs/ + suffix.txt + + + + "#; + let configs = parse_notification_configurations(xml).unwrap(); + assert_eq!(configs.len(), 1); + assert!(configs[0].matches_event("s3:ObjectCreated:Put", "logs/test.txt")); + assert!(!configs[0].matches_event("s3:ObjectRemoved:Delete", "logs/test.txt")); + } +} diff --git a/rust/myfsio-engine/crates/myfsio-server/src/services/object_lock.rs b/rust/myfsio-engine/crates/myfsio-server/src/services/object_lock.rs new file mode 100644 index 0000000..7e93883 --- /dev/null +++ b/rust/myfsio-engine/crates/myfsio-server/src/services/object_lock.rs @@ -0,0 +1,128 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +pub const LEGAL_HOLD_METADATA_KEY: &str = "__legal_hold__"; +pub const RETENTION_METADATA_KEY: &str = "__object_retention__"; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum RetentionMode { + GOVERNANCE, + COMPLIANCE, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ObjectLockRetention { + pub mode: RetentionMode, + pub retain_until_date: DateTime, +} + +impl ObjectLockRetention { + pub fn is_expired(&self) -> bool { + Utc::now() > self.retain_until_date + } +} + +pub fn get_object_retention(metadata: &HashMap) -> Option { + metadata + .get(RETENTION_METADATA_KEY) + .and_then(|raw| serde_json::from_str::(raw).ok()) +} + +pub fn set_object_retention( + metadata: &mut HashMap, + retention: &ObjectLockRetention, +) -> Result<(), String> { + let encoded = serde_json::to_string(retention).map_err(|err| err.to_string())?; + metadata.insert(RETENTION_METADATA_KEY.to_string(), encoded); + Ok(()) +} + +pub fn get_legal_hold(metadata: &HashMap) -> bool { + metadata + .get(LEGAL_HOLD_METADATA_KEY) + .map(|value| value.eq_ignore_ascii_case("ON") || value.eq_ignore_ascii_case("true")) + .unwrap_or(false) +} + +pub fn set_legal_hold(metadata: &mut HashMap, enabled: bool) { + metadata.insert( + LEGAL_HOLD_METADATA_KEY.to_string(), + if enabled { "ON" } else { "OFF" }.to_string(), + ); +} + +pub fn ensure_retention_mutable( + metadata: &HashMap, + bypass_governance: bool, +) -> Result<(), String> { + let Some(existing) = get_object_retention(metadata) else { + return Ok(()); + }; + if existing.is_expired() { + return Ok(()); + } + match existing.mode { + RetentionMode::COMPLIANCE => Err(format!( + "Cannot modify retention on object with COMPLIANCE mode until retention expires" + )), + RetentionMode::GOVERNANCE if !bypass_governance => Err( + "Cannot modify GOVERNANCE retention without bypass-governance permission".to_string(), + ), + RetentionMode::GOVERNANCE => Ok(()), + } +} + +pub fn can_delete_object( + metadata: &HashMap, + bypass_governance: bool, +) -> Result<(), String> { + if get_legal_hold(metadata) { + return Err("Object is under legal hold".to_string()); + } + if let Some(retention) = get_object_retention(metadata) { + if !retention.is_expired() { + return match retention.mode { + RetentionMode::COMPLIANCE => Err(format!( + "Object is locked in COMPLIANCE mode until {}", + retention.retain_until_date.to_rfc3339() + )), + RetentionMode::GOVERNANCE if !bypass_governance => Err(format!( + "Object is locked in GOVERNANCE mode until {}", + retention.retain_until_date.to_rfc3339() + )), + RetentionMode::GOVERNANCE => Ok(()), + }; + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Duration; + + #[test] + fn legal_hold_blocks_delete() { + let mut metadata = HashMap::new(); + set_legal_hold(&mut metadata, true); + let err = can_delete_object(&metadata, false).unwrap_err(); + assert!(err.contains("legal hold")); + } + + #[test] + fn governance_requires_bypass() { + let mut metadata = HashMap::new(); + set_object_retention( + &mut metadata, + &ObjectLockRetention { + mode: RetentionMode::GOVERNANCE, + retain_until_date: Utc::now() + Duration::hours(1), + }, + ) + .unwrap(); + assert!(can_delete_object(&metadata, false).is_err()); + assert!(can_delete_object(&metadata, true).is_ok()); + } +}