Add missing features - notifications, object lock, ACL

2026-04-21 00:27:50 +08:00
parent ddcdb4026c
commit 501d563df2
12 changed files with 1911 additions and 158 deletions
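As a quick orientation before the per-file diffs: the new object-lock endpoints accept the standard S3 Retention XML body, which the handlers below parse with a roxmltree descendant lookup plus RFC 3339 date parsing. The following is a minimal standalone sketch of that approach, assuming roxmltree and chrono as dependencies (the same crates the diff uses); the helper name find_text is illustrative only, mirroring the new find_xml_text helper.

    // Sketch only: standalone illustration of how the new put_object_retention
    // handler extracts Mode and RetainUntilDate from the request body.
    // Not part of the commit; the handler itself is shown in the diff below.
    use chrono::{DateTime, Utc};

    fn find_text(doc: &roxmltree::Document<'_>, name: &str) -> Option<String> {
        doc.descendants()
            .find(|node| node.is_element() && node.tag_name().name() == name)
            .and_then(|node| node.text())
            .map(|text| text.trim().to_string())
            .filter(|text| !text.is_empty())
    }

    fn main() {
        let body = r#"<?xml version="1.0" encoding="UTF-8"?>
            <Retention xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
                <Mode>GOVERNANCE</Mode>
                <RetainUntilDate>2099-01-01T00:00:00Z</RetainUntilDate>
            </Retention>"#;
        let doc = roxmltree::Document::parse(body).expect("MalformedXML");
        let mode = find_text(&doc, "Mode").expect("Mode is required");
        let retain_until: DateTime<Utc> = DateTime::parse_from_rfc3339(
            &find_text(&doc, "RetainUntilDate").expect("RetainUntilDate is required"),
        )
        .expect("InvalidArgument")
        .with_timezone(&Utc);
        assert!(matches!(mode.as_str(), "GOVERNANCE" | "COMPLIANCE"));
        println!("retention: {} until {}", mode, retain_until);
    }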

View File

@@ -1,10 +1,19 @@
use axum::body::Body;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use chrono::{DateTime, Utc};
use myfsio_common::error::{S3Error, S3ErrorCode};
use myfsio_storage::traits::StorageEngine;
use crate::services::acl::{
acl_from_object_metadata, acl_to_xml, create_canned_acl, store_object_acl,
};
use crate::services::notifications::parse_notification_configurations;
use crate::services::object_lock::{
ensure_retention_mutable, get_legal_hold, get_object_retention as retention_from_metadata,
set_legal_hold, set_object_retention as store_retention, ObjectLockRetention, RetentionMode,
};
use crate::state::AppState;
fn xml_response(status: StatusCode, xml: String) -> Response {
@@ -32,6 +41,16 @@ fn json_response(status: StatusCode, value: serde_json::Value) -> Response {
.into_response()
}
fn custom_xml_error(status: StatusCode, code: &str, message: &str) -> Response {
let xml = format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<Error><Code>{}</Code><Message>{}</Message><Resource></Resource><RequestId></RequestId></Error>",
xml_escape(code),
xml_escape(message),
);
xml_response(status, xml)
}
pub async fn get_versioning(state: &AppState, bucket: &str) -> Response {
match state.storage.is_versioning_enabled(bucket).await {
Ok(enabled) => {
@@ -847,13 +866,34 @@ pub async fn delete_object_lock(state: &AppState, bucket: &str) -> Response {
pub async fn put_notification(state: &AppState, bucket: &str, body: Body) -> Response {
let body_bytes = match http_body_util::BodyExt::collect(body).await {
Ok(collected) => collected.to_bytes(),
Err(_) => {
return custom_xml_error(
StatusCode::BAD_REQUEST,
"MalformedXML",
"Unable to parse XML document",
)
}
};
let raw = String::from_utf8_lossy(&body_bytes).to_string();
let notification = if raw.trim().is_empty() {
None
} else {
match parse_notification_configurations(&raw) {
Ok(_) => Some(serde_json::Value::String(raw)),
Err(message) => {
let code = if message.contains("Destination URL is required") {
"InvalidArgument"
} else {
"MalformedXML"
};
return custom_xml_error(StatusCode::BAD_REQUEST, code, &message);
}
}
};
match state.storage.get_bucket_config(bucket).await {
Ok(mut config) => {
config.notification = notification;
match state.storage.set_bucket_config(bucket, &config).await {
Ok(()) => StatusCode::OK.into_response(),
Err(e) => storage_err(e),
@@ -1094,40 +1134,64 @@ pub async fn delete_object_tagging(state: &AppState, bucket: &str, key: &str) ->
}
}
pub async fn put_object_acl(
state: &AppState,
bucket: &str,
key: &str,
headers: &HeaderMap,
_body: Body,
) -> Response {
match state.storage.head_object(bucket, key).await {
Ok(_) => {
let canned_acl = headers
.get("x-amz-acl")
.and_then(|value| value.to_str().ok())
.unwrap_or("private");
let mut metadata = match state.storage.get_object_metadata(bucket, key).await {
Ok(metadata) => metadata,
Err(err) => return storage_err(err),
};
let owner = acl_from_object_metadata(&metadata)
.map(|acl| acl.owner)
.unwrap_or_else(|| "myfsio".to_string());
let acl = create_canned_acl(canned_acl, &owner);
store_object_acl(&mut metadata, &acl);
match state.storage.put_object_metadata(bucket, key, &metadata).await {
Ok(()) => StatusCode::OK.into_response(),
Err(err) => storage_err(err),
}
}
Err(e) => storage_err(e),
}
}
pub async fn get_object_retention(state: &AppState, bucket: &str, key: &str) -> Response {
match state.storage.head_object(bucket, key).await {
Ok(_) => {
let metadata = match state.storage.get_object_metadata(bucket, key).await {
Ok(metadata) => metadata,
Err(err) => return storage_err(err),
};
if let Some(retention) = retention_from_metadata(&metadata) {
let xml = format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<Retention xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
<Mode>{}</Mode><RetainUntilDate>{}</RetainUntilDate></Retention>",
match retention.mode {
RetentionMode::GOVERNANCE => "GOVERNANCE",
RetentionMode::COMPLIANCE => "COMPLIANCE",
},
retention.retain_until_date.format("%Y-%m-%dT%H:%M:%S.000Z"),
);
xml_response(StatusCode::OK, xml)
} else {
custom_xml_error(
StatusCode::NOT_FOUND,
"NoSuchObjectLockConfiguration",
"No retention policy",
)
}
}
Err(e) => storage_err(e),
}
}
@@ -1136,21 +1200,108 @@ pub async fn put_object_retention(
state: &AppState,
bucket: &str,
key: &str,
headers: &HeaderMap,
body: Body,
) -> Response {
match state.storage.head_object(bucket, key).await {
Ok(_) => {}
Err(e) => return storage_err(e),
}
let body_bytes = match http_body_util::BodyExt::collect(body).await {
Ok(collected) => collected.to_bytes(),
Err(_) => {
return custom_xml_error(
StatusCode::BAD_REQUEST,
"MalformedXML",
"Unable to parse XML document",
)
}
};
let body_str = String::from_utf8_lossy(&body_bytes);
let doc = match roxmltree::Document::parse(&body_str) {
Ok(doc) => doc,
Err(_) => {
return custom_xml_error(
StatusCode::BAD_REQUEST,
"MalformedXML",
"Unable to parse XML document",
)
}
};
let mode = find_xml_text(&doc, "Mode").unwrap_or_default();
let retain_until = find_xml_text(&doc, "RetainUntilDate").unwrap_or_default();
if mode.is_empty() || retain_until.is_empty() {
return custom_xml_error(
StatusCode::BAD_REQUEST,
"InvalidArgument",
"Mode and RetainUntilDate are required",
);
}
let mode = match mode.as_str() {
"GOVERNANCE" => RetentionMode::GOVERNANCE,
"COMPLIANCE" => RetentionMode::COMPLIANCE,
other => {
return custom_xml_error(
StatusCode::BAD_REQUEST,
"InvalidArgument",
&format!("Invalid retention mode: {}", other),
)
}
};
let retain_until_date = match DateTime::parse_from_rfc3339(&retain_until) {
Ok(value) => value.with_timezone(&Utc),
Err(_) => {
return custom_xml_error(
StatusCode::BAD_REQUEST,
"InvalidArgument",
&format!("Invalid date format: {}", retain_until),
)
}
};
let bypass_governance = headers
.get("x-amz-bypass-governance-retention")
.and_then(|value| value.to_str().ok())
.map(|value| value.eq_ignore_ascii_case("true"))
.unwrap_or(false);
let mut metadata = match state.storage.get_object_metadata(bucket, key).await {
Ok(metadata) => metadata,
Err(err) => return storage_err(err),
};
if let Err(message) = ensure_retention_mutable(&metadata, bypass_governance) {
return custom_xml_error(StatusCode::FORBIDDEN, "AccessDenied", &message);
}
if let Err(message) = store_retention(
&mut metadata,
&ObjectLockRetention {
mode,
retain_until_date,
},
) {
return custom_xml_error(StatusCode::BAD_REQUEST, "InvalidArgument", &message);
}
match state.storage.put_object_metadata(bucket, key, &metadata).await {
Ok(()) => StatusCode::OK.into_response(),
Err(err) => storage_err(err),
}
}
pub async fn get_object_legal_hold(state: &AppState, bucket: &str, key: &str) -> Response {
match state.storage.head_object(bucket, key).await {
Ok(_) => {
let metadata = match state.storage.get_object_metadata(bucket, key).await {
Ok(metadata) => metadata,
Err(err) => return storage_err(err),
};
let status = if get_legal_hold(&metadata) { "ON" } else { "OFF" };
let xml = format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<LegalHold xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
<Status>{}</Status></LegalHold>",
status
);
xml_response(StatusCode::OK, xml)
}
Err(e) => storage_err(e),
}
@@ -1160,14 +1311,80 @@ pub async fn put_object_legal_hold(
state: &AppState,
bucket: &str,
key: &str,
body: Body,
) -> Response {
match state.storage.head_object(bucket, key).await {
Ok(_) => {}
Err(e) => return storage_err(e),
}
let body_bytes = match http_body_util::BodyExt::collect(body).await {
Ok(collected) => collected.to_bytes(),
Err(_) => {
return custom_xml_error(
StatusCode::BAD_REQUEST,
"MalformedXML",
"Unable to parse XML document",
)
}
};
let body_str = String::from_utf8_lossy(&body_bytes);
let doc = match roxmltree::Document::parse(&body_str) {
Ok(doc) => doc,
Err(_) => {
return custom_xml_error(
StatusCode::BAD_REQUEST,
"MalformedXML",
"Unable to parse XML document",
)
}
};
let status = find_xml_text(&doc, "Status").unwrap_or_default();
let enabled = match status.as_str() {
"ON" => true,
"OFF" => false,
_ => {
return custom_xml_error(
StatusCode::BAD_REQUEST,
"InvalidArgument",
"Status must be ON or OFF",
)
}
};
let mut metadata = match state.storage.get_object_metadata(bucket, key).await {
Ok(metadata) => metadata,
Err(err) => return storage_err(err),
};
set_legal_hold(&mut metadata, enabled);
match state.storage.put_object_metadata(bucket, key, &metadata).await {
Ok(()) => StatusCode::OK.into_response(),
Err(err) => storage_err(err),
}
}
pub async fn get_object_acl(state: &AppState, bucket: &str, key: &str) -> Response {
match state.storage.head_object(bucket, key).await {
Ok(_) => {
let metadata = match state.storage.get_object_metadata(bucket, key).await {
Ok(metadata) => metadata,
Err(err) => return storage_err(err),
};
let acl = acl_from_object_metadata(&metadata)
.unwrap_or_else(|| create_canned_acl("private", "myfsio"));
xml_response(StatusCode::OK, acl_to_xml(&acl))
}
Err(e) => storage_err(e),
}
}
fn find_xml_text(doc: &roxmltree::Document<'_>, name: &str) -> Option<String> {
doc.descendants()
.find(|node| node.is_element() && node.tag_name().name() == name)
.and_then(|node| node.text())
.map(|text| text.trim().to_string())
.filter(|text| !text.is_empty())
}
#[cfg(test)]
mod tests {
use super::{legacy_logging_config, parse_logging_config_xml};

View File

@@ -24,6 +24,8 @@ use myfsio_storage::traits::StorageEngine;
use tokio::io::AsyncSeekExt;
use tokio_util::io::ReaderStream;
use crate::services::notifications;
use crate::services::object_lock;
use crate::state::AppState;
fn s3_error_response(err: S3Error) -> Response {
@@ -45,6 +47,39 @@ fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response {
s3_error_response(S3Error::from(err))
}
async fn ensure_object_lock_allows_write(
state: &AppState,
bucket: &str,
key: &str,
headers: Option<&HeaderMap>,
) -> Result<(), Response> {
match state.storage.head_object(bucket, key).await {
Ok(_) => {
let metadata = match state.storage.get_object_metadata(bucket, key).await {
Ok(metadata) => metadata,
Err(err) => return Err(storage_err_response(err)),
};
let bypass_governance = headers
.and_then(|headers| {
headers
.get("x-amz-bypass-governance-retention")
.and_then(|value| value.to_str().ok())
})
.map(|value| value.eq_ignore_ascii_case("true"))
.unwrap_or(false);
if let Err(message) = object_lock::can_delete_object(&metadata, bypass_governance) {
return Err(s3_error_response(S3Error::new(
S3ErrorCode::AccessDenied,
message,
)));
}
Ok(())
}
Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()),
Err(err) => Err(storage_err_response(err)),
}
}
pub async fn list_buckets(State(state): State<AppState>) -> Response {
match state.storage.list_buckets().await {
Ok(buckets) => {
@@ -549,10 +584,10 @@ pub async fn put_object(
return config::put_object_tagging(&state, &bucket, &key, body).await;
}
if query.acl.is_some() {
return config::put_object_acl(&state, &bucket, &key, &headers, body).await;
}
if query.retention.is_some() {
return config::put_object_retention(&state, &bucket, &key, &headers, body).await;
}
if query.legal_hold.is_some() {
return config::put_object_legal_hold(&state, &bucket, &key, body).await;
@@ -597,6 +632,11 @@ pub async fn put_object(
return copy_object_handler(&state, copy_source, &bucket, &key, &headers).await;
}
if let Err(response) = ensure_object_lock_allows_write(&state, &bucket, &key, Some(&headers)).await
{
return response;
}
let content_type = guessed_content_type(
&key,
headers.get("content-type").and_then(|v| v.to_str().ok()),
@@ -678,6 +718,17 @@ pub async fn put_object(
"x-amz-server-side-encryption", "x-amz-server-side-encryption",
enc_ctx.algorithm.as_str().parse().unwrap(), enc_ctx.algorithm.as_str().parse().unwrap(),
); );
notifications::emit_object_created(
&state,
&bucket,
&key,
meta.size,
meta.etag.as_deref(),
"",
"",
"",
"Put",
);
return (StatusCode::OK, resp_headers).into_response();
}
Err(e) => {
@@ -695,6 +746,17 @@ pub async fn put_object(
if let Some(ref etag) = meta.etag {
resp_headers.insert("etag", format!("\"{}\"", etag).parse().unwrap());
}
notifications::emit_object_created(
&state,
&bucket,
&key,
meta.size,
meta.etag.as_deref(),
"",
"",
"",
"Put",
);
(StatusCode::OK, resp_headers).into_response()
}
Err(e) => storage_err_response(e),
@@ -890,6 +952,7 @@ pub async fn delete_object(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
Query(query): Query<ObjectQuery>,
headers: HeaderMap,
) -> Response {
if query.tagging.is_some() {
return config::delete_object_tagging(&state, &bucket, &key).await;
@@ -902,8 +965,16 @@ pub async fn delete_object(
return abort_multipart_handler(&state, &bucket, upload_id).await;
}
if let Err(response) = ensure_object_lock_allows_write(&state, &bucket, &key, Some(&headers)).await
{
return response;
}
match state.storage.delete_object(&bucket, &key).await {
Ok(()) => {
notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete");
StatusCode::NO_CONTENT.into_response()
}
Err(e) => storage_err_response(e),
}
}
@@ -1218,6 +1289,10 @@ async fn copy_object_handler(
dst_key: &str,
headers: &HeaderMap,
) -> Response {
if let Err(response) = ensure_object_lock_allows_write(state, dst_bucket, dst_key, Some(headers)).await {
return response;
}
let source = copy_source.strip_prefix('/').unwrap_or(copy_source);
let (src_bucket, src_key) = match source.split_once('/') {
Some(parts) => parts,
@@ -1278,8 +1353,26 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R
let mut errors = Vec::new();
for obj in &parsed.objects {
if let Err(message) = match state.storage.head_object(bucket, &obj.key).await {
Ok(_) => match state.storage.get_object_metadata(bucket, &obj.key).await {
Ok(metadata) => object_lock::can_delete_object(&metadata, false),
Err(err) => Err(S3Error::from(err).message),
},
Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()),
Err(err) => Err(S3Error::from(err).message),
} {
errors.push((
obj.key.clone(),
S3ErrorCode::AccessDenied.as_str().to_string(),
message,
));
continue;
}
match state.storage.delete_object(bucket, &obj.key).await {
Ok(()) => {
notifications::emit_object_removed(state, bucket, &obj.key, "", "", "", "Delete");
deleted.push((obj.key.clone(), obj.version_id.clone()))
}
Err(e) => {
let s3err = S3Error::from(e);
errors.push((
@@ -1966,3 +2059,247 @@ fn validate_post_policy_conditions(
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ServerConfig;
use crate::services::acl::{acl_to_xml, create_canned_acl};
use http_body_util::BodyExt;
use serde_json::Value;
use tower::ServiceExt;
const TEST_ACCESS_KEY: &str = "AKIAIOSFODNN7EXAMPLE";
const TEST_SECRET_KEY: &str = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
fn test_state() -> (AppState, tempfile::TempDir) {
let tmp = tempfile::tempdir().unwrap();
let config_dir = tmp.path().join(".myfsio.sys").join("config");
std::fs::create_dir_all(&config_dir).unwrap();
std::fs::write(
config_dir.join("iam.json"),
serde_json::json!({
"version": 2,
"users": [{
"user_id": "u-test1234",
"display_name": "admin",
"enabled": true,
"access_keys": [{
"access_key": TEST_ACCESS_KEY,
"secret_key": TEST_SECRET_KEY,
"status": "active"
}],
"policies": [{
"bucket": "*",
"actions": ["*"],
"prefix": "*"
}]
}]
})
.to_string(),
)
.unwrap();
let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let config = ServerConfig {
bind_addr: "127.0.0.1:0".parse().unwrap(),
ui_bind_addr: "127.0.0.1:0".parse().unwrap(),
storage_root: tmp.path().to_path_buf(),
region: "us-east-1".to_string(),
iam_config_path: config_dir.join("iam.json"),
sigv4_timestamp_tolerance_secs: 900,
presigned_url_min_expiry: 1,
presigned_url_max_expiry: 604800,
secret_key: None,
encryption_enabled: false,
kms_enabled: false,
gc_enabled: false,
integrity_enabled: false,
metrics_enabled: false,
metrics_history_enabled: false,
metrics_interval_minutes: 5,
metrics_retention_hours: 24,
metrics_history_interval_minutes: 5,
metrics_history_retention_hours: 24,
lifecycle_enabled: false,
website_hosting_enabled: false,
replication_connect_timeout_secs: 1,
replication_read_timeout_secs: 1,
replication_max_retries: 1,
replication_streaming_threshold_bytes: 10_485_760,
replication_max_failures_per_bucket: 50,
site_sync_enabled: false,
site_sync_interval_secs: 60,
site_sync_batch_size: 100,
site_sync_connect_timeout_secs: 10,
site_sync_read_timeout_secs: 120,
site_sync_max_retries: 2,
site_sync_clock_skew_tolerance: 1.0,
ui_enabled: false,
templates_dir: manifest_dir.join("templates"),
static_dir: manifest_dir.join("static"),
};
(AppState::new(config), tmp)
}
fn auth_request(
method: axum::http::Method,
uri: &str,
body: Body,
) -> axum::http::Request<Body> {
axum::http::Request::builder()
.method(method)
.uri(uri)
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.body(body)
.unwrap()
}
#[tokio::test]
async fn public_bucket_acl_allows_anonymous_reads() {
let (state, _tmp) = test_state();
state.storage.create_bucket("public").await.unwrap();
state
.storage
.put_object(
"public",
"hello.txt",
Box::pin(std::io::Cursor::new(b"hello".to_vec())),
None,
)
.await
.unwrap();
let mut config = state.storage.get_bucket_config("public").await.unwrap();
config.acl = Some(Value::String(acl_to_xml(&create_canned_acl("public-read", "myfsio"))));
state.storage.set_bucket_config("public", &config).await.unwrap();
let app = crate::create_router(state);
let response = app
.oneshot(
axum::http::Request::builder()
.method(axum::http::Method::GET)
.uri("/public/hello.txt")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn object_retention_blocks_delete_without_bypass() {
let (state, _tmp) = test_state();
state.storage.create_bucket("locked").await.unwrap();
state
.storage
.put_object(
"locked",
"obj.txt",
Box::pin(std::io::Cursor::new(b"data".to_vec())),
None,
)
.await
.unwrap();
let app = crate::create_router(state);
let retention_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<Retention xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Mode>GOVERNANCE</Mode>
<RetainUntilDate>2099-01-01T00:00:00Z</RetainUntilDate>
</Retention>"#;
let response = app
.clone()
.oneshot(auth_request(
axum::http::Method::PUT,
"/locked/obj.txt?retention",
Body::from(retention_xml),
))
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let response = app
.clone()
.oneshot(auth_request(
axum::http::Method::DELETE,
"/locked/obj.txt",
Body::empty(),
))
.await
.unwrap();
assert_eq!(response.status(), StatusCode::FORBIDDEN);
let response = app
.oneshot(
axum::http::Request::builder()
.method(axum::http::Method::DELETE)
.uri("/locked/obj.txt")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("x-amz-bypass-governance-retention", "true")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn object_acl_round_trip_uses_metadata() {
let (state, _tmp) = test_state();
state.storage.create_bucket("acl").await.unwrap();
state
.storage
.put_object(
"acl",
"photo.jpg",
Box::pin(std::io::Cursor::new(b"image".to_vec())),
None,
)
.await
.unwrap();
let app = crate::create_router(state);
let response = app
.clone()
.oneshot(
axum::http::Request::builder()
.method(axum::http::Method::PUT)
.uri("/acl/photo.jpg?acl")
.header("x-access-key", TEST_ACCESS_KEY)
.header("x-secret-key", TEST_SECRET_KEY)
.header("x-amz-acl", "public-read")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let response = app
.oneshot(auth_request(
axum::http::Method::GET,
"/acl/photo.jpg?acl",
Body::empty(),
))
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = String::from_utf8(
response
.into_body()
.collect()
.await
.unwrap()
.to_bytes()
.to_vec(),
)
.unwrap();
assert!(body.contains("AllUsers"));
assert!(body.contains("READ"));
}
}

View File

@@ -1153,13 +1153,6 @@ pub async fn list_copy_targets(
Json(json!({ "buckets": buckets })).into_response() Json(json!({ "buckets": buckets })).into_response()
} }
pub async fn json_not_implemented() -> Response {
json_error(
StatusCode::NOT_IMPLEMENTED,
"This feature is not implemented yet",
)
}
#[derive(Deserialize)]
pub struct ConnectionTestPayload {
pub endpoint_url: String,
@@ -3163,20 +3156,36 @@ fn apply_history_limit(mut value: Value, limit: Option<usize>) -> Value {
value
}
pub async fn lifecycle_history(
State(state): State<AppState>,
Extension(_session): Extension<SessionHandle>,
Path(bucket_name): Path<String>,
Query(params): Query<HashMap<String, String>>,
) -> Response {
let limit = params
.get("limit")
.and_then(|value| value.parse::<usize>().ok())
.unwrap_or(50);
let offset = params
.get("offset")
.and_then(|value| value.parse::<usize>().ok())
.unwrap_or(0);
if !state.config.lifecycle_enabled {
return Json(json!({
"executions": [], "executions": [],
"total": 0, "total": 0,
"limit": limit,
"offset": offset,
"enabled": false,
}))
.into_response();
}
Json(crate::services::lifecycle::read_history(
&state.config.storage_root,
&bucket_name,
limit,
offset,
))
.into_response()
}
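For reference, a sketch of the JSON shape this endpoint now serves, assembled from read_history and LifecycleExecutionRecord in the lifecycle service further down; the concrete values are invented for illustration.

    // Sketch only: example response for GET /ui/buckets/{bucket}/lifecycle/history?limit=50&offset=0
    // (field names match read_history and LifecycleExecutionRecord; values are made up).
    use serde_json::json;

    fn main() {
        let example = json!({
            "executions": [{
                "timestamp": 1_713_600_000.0,        // seconds since the Unix epoch
                "bucket_name": "docs",
                "objects_deleted": 2,
                "versions_deleted": 1,
                "uploads_aborted": 0,
                "errors": [],
                "execution_time_seconds": 0.04
            }],
            "total": 1,
            "limit": 50,   // ?limit= query parameter, default 50
            "offset": 0,   // ?offset= query parameter, default 0
            "enabled": true
        });
        println!("{}", serde_json::to_string_pretty(&example).unwrap());
    }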

View File

@@ -3015,15 +3015,3 @@ pub async fn update_bucket_website(
.into_response(),
}
}
pub async fn stub_post(Extension(session): Extension<SessionHandle>) -> Response {
session.write(|s| s.push_flash("info", "This action is not yet implemented in the Rust UI."));
Redirect::to("/ui/buckets").into_response()
}
#[derive(serde::Deserialize)]
pub struct QueryArgs(#[serde(default)] pub HashMap<String, String>);
pub async fn json_stub(Query(_q): Query<QueryArgs>) -> Response {
axum::Json(json!({"status": "not_implemented", "items": []})).into_response()
}

View File

@@ -106,7 +106,7 @@ pub fn create_ui_router(state: state::AppState) -> Router {
)
.route(
"/ui/buckets/{bucket_name}/lifecycle/history",
get(ui_api::lifecycle_history),
)
.route(
"/ui/buckets/{bucket_name}/replication/status",

View File

@@ -113,6 +113,7 @@ async fn main() {
let lifecycle =
std::sync::Arc::new(myfsio_server::services::lifecycle::LifecycleService::new(
state.storage.clone(),
config.storage_root.clone(),
myfsio_server::services::lifecycle::LifecycleConfig::default(),
));
bg_handles.push(lifecycle.start_background());

View File

@@ -12,6 +12,7 @@ use serde_json::Value;
use std::time::Instant;
use tokio::io::AsyncReadExt;
use crate::services::acl::acl_from_bucket_config;
use crate::state::AppState;
fn website_error_response(
@@ -589,6 +590,17 @@ async fn authorize_action(
if iam_allowed || matches!(policy_decision, PolicyDecision::Allow) {
return Ok(());
}
if evaluate_bucket_acl(
state,
bucket,
principal.map(|principal| principal.access_key.as_str()),
action,
principal.is_some(),
)
.await
{
return Ok(());
}
if principal.is_some() {
Err(S3Error::new(S3ErrorCode::AccessDenied, "Access denied"))
@@ -600,6 +612,27 @@ async fn authorize_action(
}
}
async fn evaluate_bucket_acl(
state: &AppState,
bucket: &str,
principal_id: Option<&str>,
action: &str,
is_authenticated: bool,
) -> bool {
let config = match state.storage.get_bucket_config(bucket).await {
Ok(config) => config,
Err(_) => return false,
};
let Some(value) = config.acl.as_ref() else {
return false;
};
let Some(acl) = acl_from_bucket_config(value) else {
return false;
};
acl.allowed_actions(principal_id, is_authenticated)
.contains(action)
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PolicyDecision {
Allow,

View File

@@ -0,0 +1,278 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
pub const ACL_METADATA_KEY: &str = "__acl__";
pub const GRANTEE_ALL_USERS: &str = "*";
pub const GRANTEE_AUTHENTICATED_USERS: &str = "authenticated";
const ACL_PERMISSION_FULL_CONTROL: &str = "FULL_CONTROL";
const ACL_PERMISSION_WRITE: &str = "WRITE";
const ACL_PERMISSION_WRITE_ACP: &str = "WRITE_ACP";
const ACL_PERMISSION_READ: &str = "READ";
const ACL_PERMISSION_READ_ACP: &str = "READ_ACP";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AclGrant {
pub grantee: String,
pub permission: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Acl {
pub owner: String,
#[serde(default)]
pub grants: Vec<AclGrant>,
}
impl Acl {
pub fn allowed_actions(
&self,
principal_id: Option<&str>,
is_authenticated: bool,
) -> HashSet<&'static str> {
let mut actions = HashSet::new();
if let Some(principal_id) = principal_id {
if principal_id == self.owner {
actions.extend(permission_to_actions(ACL_PERMISSION_FULL_CONTROL));
}
}
for grant in &self.grants {
if grant.grantee == GRANTEE_ALL_USERS {
actions.extend(permission_to_actions(&grant.permission));
} else if grant.grantee == GRANTEE_AUTHENTICATED_USERS && is_authenticated {
actions.extend(permission_to_actions(&grant.permission));
} else if let Some(principal_id) = principal_id {
if grant.grantee == principal_id {
actions.extend(permission_to_actions(&grant.permission));
}
}
}
actions
}
}
pub fn create_canned_acl(canned_acl: &str, owner: &str) -> Acl {
let owner_grant = AclGrant {
grantee: owner.to_string(),
permission: ACL_PERMISSION_FULL_CONTROL.to_string(),
};
match canned_acl {
"public-read" => Acl {
owner: owner.to_string(),
grants: vec![
owner_grant,
AclGrant {
grantee: GRANTEE_ALL_USERS.to_string(),
permission: ACL_PERMISSION_READ.to_string(),
},
],
},
"public-read-write" => Acl {
owner: owner.to_string(),
grants: vec![
owner_grant,
AclGrant {
grantee: GRANTEE_ALL_USERS.to_string(),
permission: ACL_PERMISSION_READ.to_string(),
},
AclGrant {
grantee: GRANTEE_ALL_USERS.to_string(),
permission: ACL_PERMISSION_WRITE.to_string(),
},
],
},
"authenticated-read" => Acl {
owner: owner.to_string(),
grants: vec![
owner_grant,
AclGrant {
grantee: GRANTEE_AUTHENTICATED_USERS.to_string(),
permission: ACL_PERMISSION_READ.to_string(),
},
],
},
"bucket-owner-read" | "bucket-owner-full-control" | "private" | _ => Acl {
owner: owner.to_string(),
grants: vec![owner_grant],
},
}
}
pub fn acl_to_xml(acl: &Acl) -> String {
let mut xml = format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
<Owner><ID>{}</ID><DisplayName>{}</DisplayName></Owner>\
<AccessControlList>",
xml_escape(&acl.owner),
xml_escape(&acl.owner),
);
for grant in &acl.grants {
xml.push_str("<Grant>");
match grant.grantee.as_str() {
GRANTEE_ALL_USERS => {
xml.push_str(
"<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"Group\">\
<URI>http://acs.amazonaws.com/groups/global/AllUsers</URI>\
</Grantee>",
);
}
GRANTEE_AUTHENTICATED_USERS => {
xml.push_str(
"<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"Group\">\
<URI>http://acs.amazonaws.com/groups/global/AuthenticatedUsers</URI>\
</Grantee>",
);
}
other => {
xml.push_str(&format!(
"<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:type=\"CanonicalUser\">\
<ID>{}</ID><DisplayName>{}</DisplayName>\
</Grantee>",
xml_escape(other),
xml_escape(other),
));
}
}
xml.push_str(&format!(
"<Permission>{}</Permission></Grant>",
xml_escape(&grant.permission)
));
}
xml.push_str("</AccessControlList></AccessControlPolicy>");
xml
}
pub fn acl_from_bucket_config(value: &Value) -> Option<Acl> {
match value {
Value::String(raw) => acl_from_xml(raw).or_else(|| serde_json::from_str(raw).ok()),
Value::Object(_) => serde_json::from_value(value.clone()).ok(),
_ => None,
}
}
pub fn acl_from_object_metadata(metadata: &HashMap<String, String>) -> Option<Acl> {
metadata
.get(ACL_METADATA_KEY)
.and_then(|raw| serde_json::from_str::<Acl>(raw).ok())
}
pub fn store_object_acl(metadata: &mut HashMap<String, String>, acl: &Acl) {
if let Ok(serialized) = serde_json::to_string(acl) {
metadata.insert(ACL_METADATA_KEY.to_string(), serialized);
}
}
fn acl_from_xml(xml: &str) -> Option<Acl> {
let doc = roxmltree::Document::parse(xml).ok()?;
let owner = doc
.descendants()
.find(|node| node.is_element() && node.tag_name().name() == "Owner")
.and_then(|node| {
node.children()
.find(|child| child.is_element() && child.tag_name().name() == "ID")
.and_then(|child| child.text())
})
.unwrap_or("myfsio")
.trim()
.to_string();
let mut grants = Vec::new();
for grant in doc
.descendants()
.filter(|node| node.is_element() && node.tag_name().name() == "Grant")
{
let permission = grant
.children()
.find(|child| child.is_element() && child.tag_name().name() == "Permission")
.and_then(|child| child.text())
.unwrap_or_default()
.trim()
.to_string();
if permission.is_empty() {
continue;
}
let grantee_node = grant
.children()
.find(|child| child.is_element() && child.tag_name().name() == "Grantee");
let grantee = grantee_node
.and_then(|node| {
let uri = node
.children()
.find(|child| child.is_element() && child.tag_name().name() == "URI")
.and_then(|child| child.text())
.map(|text| text.trim().to_string());
match uri.as_deref() {
Some("http://acs.amazonaws.com/groups/global/AllUsers") => {
Some(GRANTEE_ALL_USERS.to_string())
}
Some("http://acs.amazonaws.com/groups/global/AuthenticatedUsers") => {
Some(GRANTEE_AUTHENTICATED_USERS.to_string())
}
_ => node
.children()
.find(|child| child.is_element() && child.tag_name().name() == "ID")
.and_then(|child| child.text())
.map(|text| text.trim().to_string()),
}
})
.unwrap_or_default();
if grantee.is_empty() {
continue;
}
grants.push(AclGrant {
grantee,
permission,
});
}
Some(Acl { owner, grants })
}
fn permission_to_actions(permission: &str) -> &'static [&'static str] {
match permission {
ACL_PERMISSION_FULL_CONTROL => &["read", "write", "delete", "list", "share"],
ACL_PERMISSION_WRITE => &["write", "delete"],
ACL_PERMISSION_WRITE_ACP => &["share"],
ACL_PERMISSION_READ => &["read", "list"],
ACL_PERMISSION_READ_ACP => &["share"],
_ => &[],
}
}
fn xml_escape(s: &str) -> String {
s.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
.replace('"', "&quot;")
.replace('\'', "&apos;")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn canned_acl_grants_public_read() {
let acl = create_canned_acl("public-read", "owner");
let actions = acl.allowed_actions(None, false);
assert!(actions.contains("read"));
assert!(actions.contains("list"));
assert!(!actions.contains("write"));
}
#[test]
fn xml_round_trip_preserves_grants() {
let acl = create_canned_acl("authenticated-read", "owner");
let parsed = acl_from_bucket_config(&Value::String(acl_to_xml(&acl))).unwrap();
assert_eq!(parsed.owner, "owner");
assert_eq!(parsed.grants.len(), 2);
assert!(
parsed
.grants
.iter()
.any(|grant| grant.grantee == GRANTEE_AUTHENTICATED_USERS)
);
}
}

View File

@@ -1,31 +1,75 @@
use chrono::{DateTime, Duration, Utc};
use myfsio_storage::fs_backend::FsStorageBackend;
use myfsio_storage::traits::StorageEngine;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct LifecycleConfig {
pub interval_seconds: u64,
pub max_history_per_bucket: usize,
}
impl Default for LifecycleConfig {
fn default() -> Self {
Self {
interval_seconds: 3600,
max_history_per_bucket: 50,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LifecycleExecutionRecord {
pub timestamp: f64,
pub bucket_name: String,
pub objects_deleted: u64,
pub versions_deleted: u64,
pub uploads_aborted: u64,
#[serde(default)]
pub errors: Vec<String>,
pub execution_time_seconds: f64,
}
#[derive(Debug, Clone, Default)]
struct BucketLifecycleResult {
bucket_name: String,
objects_deleted: u64,
versions_deleted: u64,
uploads_aborted: u64,
errors: Vec<String>,
execution_time_seconds: f64,
}
#[derive(Debug, Clone, Default)]
struct ParsedLifecycleRule {
status: String,
prefix: String,
expiration_days: Option<u64>,
expiration_date: Option<DateTime<Utc>>,
noncurrent_days: Option<u64>,
abort_incomplete_multipart_days: Option<u64>,
}
pub struct LifecycleService {
storage: Arc<FsStorageBackend>,
storage_root: PathBuf,
config: LifecycleConfig,
running: Arc<RwLock<bool>>,
}
impl LifecycleService {
pub fn new(
storage: Arc<FsStorageBackend>,
storage_root: impl Into<PathBuf>,
config: LifecycleConfig,
) -> Self {
Self {
storage,
storage_root: storage_root.into(),
config,
running: Arc::new(RwLock::new(false)),
}
@@ -47,108 +91,261 @@ impl LifecycleService {
async fn evaluate_rules(&self) -> Value {
let buckets = match self.storage.list_buckets().await {
Ok(buckets) => buckets,
Err(err) => return json!({ "error": err.to_string() }),
};
let mut bucket_results = Vec::new();
let mut total_objects_deleted = 0u64;
let mut total_versions_deleted = 0u64;
let mut total_uploads_aborted = 0u64;
let mut errors = Vec::new();
for bucket in &buckets {
let started_at = std::time::Instant::now();
let mut result = BucketLifecycleResult {
bucket_name: bucket.name.clone(),
..Default::default()
};
let config = match self.storage.get_bucket_config(&bucket.name).await {
Ok(config) => config,
Err(err) => {
result.errors.push(err.to_string());
result.execution_time_seconds = started_at.elapsed().as_secs_f64();
self.append_history(&result);
errors.extend(result.errors.clone());
bucket_results.push(result);
continue;
}
};
let Some(lifecycle) = config.lifecycle.as_ref() else {
continue;
};
let rules = parse_lifecycle_rules(lifecycle);
if rules.is_empty() {
continue;
}
for rule in &rules {
if rule.status != "Enabled" {
continue;
}
if let Some(err) = self
.apply_expiration_rule(&bucket.name, rule, &mut result)
.await
{
result.errors.push(err);
}
if let Some(err) = self
.apply_noncurrent_expiration_rule(&bucket.name, rule, &mut result)
.await
{
result.errors.push(err);
}
if let Some(err) = self
.apply_abort_incomplete_multipart_rule(&bucket.name, rule, &mut result)
.await
{
result.errors.push(err);
}
}
result.execution_time_seconds = started_at.elapsed().as_secs_f64();
if result.objects_deleted > 0
|| result.versions_deleted > 0
|| result.uploads_aborted > 0
|| !result.errors.is_empty()
{
total_objects_deleted += result.objects_deleted;
total_versions_deleted += result.versions_deleted;
total_uploads_aborted += result.uploads_aborted;
errors.extend(result.errors.clone());
self.append_history(&result);
bucket_results.push(result);
}
}
json!({
"objects_deleted": total_objects_deleted,
"versions_deleted": total_versions_deleted,
"multipart_aborted": total_uploads_aborted,
"buckets_evaluated": buckets.len(),
"results": bucket_results.iter().map(result_to_json).collect::<Vec<_>>(),
"errors": errors,
})
}
async fn apply_expiration_rule(
&self,
bucket: &str,
rule: &ParsedLifecycleRule,
result: &mut BucketLifecycleResult,
) -> Option<String> {
let cutoff = if let Some(days) = rule.expiration_days {
Some(Utc::now() - Duration::days(days as i64))
} else {
rule.expiration_date
};
let Some(cutoff) = cutoff else {
return None;
};
let params = myfsio_common::types::ListParams {
max_keys: 10_000,
prefix: if rule.prefix.is_empty() {
None
} else {
Some(rule.prefix.clone())
},
..Default::default()
};
match self.storage.list_objects(bucket, &params).await {
Ok(objects) => {
for object in &objects.objects {
if object.last_modified < cutoff {
if let Err(err) = self.storage.delete_object(bucket, &object.key).await {
result
.errors
.push(format!("{}:{}: {}", bucket, object.key, err));
} else {
result.objects_deleted += 1;
}
}
}
None
}
Err(err) => Some(format!("Failed to list objects for {}: {}", bucket, err)),
}
}
async fn apply_noncurrent_expiration_rule(
&self,
bucket: &str,
rule: &ParsedLifecycleRule,
result: &mut BucketLifecycleResult,
) -> Option<String> {
let Some(days) = rule.noncurrent_days else {
return None;
};
let cutoff = Utc::now() - Duration::days(days as i64);
let versions_root = version_root_for_bucket(&self.storage_root, bucket);
if !versions_root.exists() {
return None;
}
let mut stack = VecDeque::from([versions_root]);
while let Some(current) = stack.pop_front() {
let entries = match std::fs::read_dir(&current) {
Ok(entries) => entries,
Err(err) => return Some(err.to_string()),
};
for entry in entries.flatten() {
let file_type = match entry.file_type() {
Ok(file_type) => file_type,
Err(_) => continue,
};
if file_type.is_dir() {
stack.push_back(entry.path());
continue;
}
if entry.path().extension().and_then(|ext| ext.to_str()) != Some("json") {
continue;
}
let contents = match std::fs::read_to_string(entry.path()) {
Ok(contents) => contents,
Err(_) => continue,
};
let Ok(manifest) = serde_json::from_str::<Value>(&contents) else {
continue;
};
let key = manifest
.get("key")
.and_then(|value| value.as_str())
.unwrap_or_default()
.to_string();
if !rule.prefix.is_empty() && !key.starts_with(&rule.prefix) {
continue;
}
let archived_at = manifest
.get("archived_at")
.and_then(|value| value.as_str())
.and_then(|value| DateTime::parse_from_rfc3339(value).ok())
.map(|value| value.with_timezone(&Utc));
if archived_at.is_none() || archived_at.unwrap() >= cutoff {
continue;
}
let version_id = manifest
.get("version_id")
.and_then(|value| value.as_str())
.unwrap_or_default();
let data_path = entry.path().with_file_name(format!("{}.bin", version_id));
let _ = std::fs::remove_file(&data_path);
let _ = std::fs::remove_file(entry.path());
result.versions_deleted += 1;
}
}
None
}
async fn apply_abort_incomplete_multipart_rule(
&self,
bucket: &str,
rule: &ParsedLifecycleRule,
result: &mut BucketLifecycleResult,
) -> Option<String> {
let Some(days) = rule.abort_incomplete_multipart_days else {
return None;
};
let cutoff = Utc::now() - Duration::days(days as i64);
match self.storage.list_multipart_uploads(bucket).await {
Ok(uploads) => {
for upload in &uploads {
if upload.initiated < cutoff {
if let Err(err) = self.storage.abort_multipart(bucket, &upload.upload_id).await
{
result
.errors
.push(format!("abort {}: {}", upload.upload_id, err));
} else {
result.uploads_aborted += 1;
}
}
}
None
}
Err(err) => Some(format!("Failed to list multipart uploads for {}: {}", bucket, err)),
}
}
fn append_history(&self, result: &BucketLifecycleResult) {
let path = lifecycle_history_path(&self.storage_root, &result.bucket_name);
let mut history = load_history(&path);
history.insert(
0,
LifecycleExecutionRecord {
timestamp: Utc::now().timestamp_millis() as f64 / 1000.0,
bucket_name: result.bucket_name.clone(),
objects_deleted: result.objects_deleted,
versions_deleted: result.versions_deleted,
uploads_aborted: result.uploads_aborted,
errors: result.errors.clone(),
execution_time_seconds: result.execution_time_seconds,
},
);
history.truncate(self.config.max_history_per_bucket);
let payload = json!({
"executions": history,
});
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = std::fs::write(
&path,
serde_json::to_string_pretty(&payload).unwrap_or_else(|_| "{}".to_string()),
);
}
pub fn start_background(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
let interval = std::time::Duration::from_secs(self.config.interval_seconds);
tokio::spawn(async move {
@@ -159,9 +356,277 @@ impl LifecycleService {
tracing::info!("Lifecycle evaluation starting"); tracing::info!("Lifecycle evaluation starting");
match self.run_cycle().await { match self.run_cycle().await {
Ok(result) => tracing::info!("Lifecycle cycle complete: {:?}", result), Ok(result) => tracing::info!("Lifecycle cycle complete: {:?}", result),
Err(e) => tracing::warn!("Lifecycle cycle failed: {}", e), Err(err) => tracing::warn!("Lifecycle cycle failed: {}", err),
} }
} }
}) })
} }
} }
pub fn read_history(storage_root: &Path, bucket_name: &str, limit: usize, offset: usize) -> Value {
let path = lifecycle_history_path(storage_root, bucket_name);
let mut history = load_history(&path);
let total = history.len();
let executions = history
.drain(offset.min(total)..)
.take(limit)
.collect::<Vec<_>>();
json!({
"executions": executions,
"total": total,
"limit": limit,
"offset": offset,
"enabled": true,
})
}
fn load_history(path: &Path) -> Vec<LifecycleExecutionRecord> {
if !path.exists() {
return Vec::new();
}
std::fs::read_to_string(path)
.ok()
.and_then(|contents| serde_json::from_str::<Value>(&contents).ok())
.and_then(|value| value.get("executions").cloned())
.and_then(|value| serde_json::from_value::<Vec<LifecycleExecutionRecord>>(value).ok())
.unwrap_or_default()
}
fn lifecycle_history_path(storage_root: &Path, bucket_name: &str) -> PathBuf {
storage_root
.join(".myfsio.sys")
.join("buckets")
.join(bucket_name)
.join("lifecycle_history.json")
}
fn version_root_for_bucket(storage_root: &Path, bucket_name: &str) -> PathBuf {
storage_root
.join(".myfsio.sys")
.join("buckets")
.join(bucket_name)
.join("versions")
}
fn parse_lifecycle_rules(value: &Value) -> Vec<ParsedLifecycleRule> {
match value {
Value::String(raw) => parse_lifecycle_rules_from_string(raw),
Value::Array(items) => items.iter().filter_map(parse_lifecycle_rule).collect(),
Value::Object(map) => map
.get("Rules")
.and_then(|rules| rules.as_array())
.map(|rules| rules.iter().filter_map(parse_lifecycle_rule).collect())
.unwrap_or_default(),
_ => Vec::new(),
}
}
fn parse_lifecycle_rules_from_string(raw: &str) -> Vec<ParsedLifecycleRule> {
if let Ok(json) = serde_json::from_str::<Value>(raw) {
return parse_lifecycle_rules(&json);
}
let Ok(doc) = roxmltree::Document::parse(raw) else {
return Vec::new();
};
doc.descendants()
.filter(|node| node.is_element() && node.tag_name().name() == "Rule")
.map(|rule| ParsedLifecycleRule {
status: child_text(&rule, "Status").unwrap_or_else(|| "Enabled".to_string()),
prefix: child_text(&rule, "Prefix")
.or_else(|| {
rule.descendants()
.find(|node| {
node.is_element()
&& node.tag_name().name() == "Filter"
&& node
.children()
.any(|child| {
child.is_element()
&& child.tag_name().name() == "Prefix"
})
})
.and_then(|filter| child_text(&filter, "Prefix"))
})
.unwrap_or_default(),
expiration_days: rule
.descendants()
.find(|node| node.is_element() && node.tag_name().name() == "Expiration")
.and_then(|expiration| child_text(&expiration, "Days"))
.and_then(|value| value.parse::<u64>().ok()),
expiration_date: rule
.descendants()
.find(|node| node.is_element() && node.tag_name().name() == "Expiration")
.and_then(|expiration| child_text(&expiration, "Date"))
.as_deref()
.and_then(parse_datetime),
noncurrent_days: rule
.descendants()
.find(|node| {
node.is_element() && node.tag_name().name() == "NoncurrentVersionExpiration"
})
.and_then(|node| child_text(&node, "NoncurrentDays"))
.and_then(|value| value.parse::<u64>().ok()),
abort_incomplete_multipart_days: rule
.descendants()
.find(|node| {
node.is_element()
&& node.tag_name().name() == "AbortIncompleteMultipartUpload"
})
.and_then(|node| child_text(&node, "DaysAfterInitiation"))
.and_then(|value| value.parse::<u64>().ok()),
})
.collect()
}
fn parse_lifecycle_rule(value: &Value) -> Option<ParsedLifecycleRule> {
let map = value.as_object()?;
Some(ParsedLifecycleRule {
status: map
.get("Status")
.and_then(|value| value.as_str())
.unwrap_or("Enabled")
.to_string(),
prefix: map
.get("Prefix")
.and_then(|value| value.as_str())
.or_else(|| {
map.get("Filter")
.and_then(|value| value.get("Prefix"))
.and_then(|value| value.as_str())
})
.unwrap_or_default()
.to_string(),
expiration_days: map
.get("Expiration")
.and_then(|value| value.get("Days"))
.and_then(|value| value.as_u64()),
expiration_date: map
.get("Expiration")
.and_then(|value| value.get("Date"))
.and_then(|value| value.as_str())
.and_then(parse_datetime),
noncurrent_days: map
.get("NoncurrentVersionExpiration")
.and_then(|value| value.get("NoncurrentDays"))
.and_then(|value| value.as_u64()),
abort_incomplete_multipart_days: map
.get("AbortIncompleteMultipartUpload")
.and_then(|value| value.get("DaysAfterInitiation"))
.and_then(|value| value.as_u64()),
})
}
fn parse_datetime(value: &str) -> Option<DateTime<Utc>> {
DateTime::parse_from_rfc3339(value)
.ok()
.map(|value| value.with_timezone(&Utc))
}
fn child_text(node: &roxmltree::Node<'_, '_>, name: &str) -> Option<String> {
node.children()
.find(|child| child.is_element() && child.tag_name().name() == name)
.and_then(|child| child.text())
.map(|text| text.trim().to_string())
.filter(|text| !text.is_empty())
}
fn result_to_json(result: &BucketLifecycleResult) -> Value {
json!({
"bucket_name": result.bucket_name,
"objects_deleted": result.objects_deleted,
"versions_deleted": result.versions_deleted,
"uploads_aborted": result.uploads_aborted,
"errors": result.errors,
"execution_time_seconds": result.execution_time_seconds,
})
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration;
#[test]
fn parses_rules_from_xml() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<Status>Enabled</Status>
<Filter><Prefix>logs/</Prefix></Filter>
<Expiration><Days>10</Days></Expiration>
<NoncurrentVersionExpiration><NoncurrentDays>30</NoncurrentDays></NoncurrentVersionExpiration>
<AbortIncompleteMultipartUpload><DaysAfterInitiation>7</DaysAfterInitiation></AbortIncompleteMultipartUpload>
</Rule>
</LifecycleConfiguration>"#;
let rules = parse_lifecycle_rules(&Value::String(xml.to_string()));
assert_eq!(rules.len(), 1);
assert_eq!(rules[0].prefix, "logs/");
assert_eq!(rules[0].expiration_days, Some(10));
assert_eq!(rules[0].noncurrent_days, Some(30));
assert_eq!(rules[0].abort_incomplete_multipart_days, Some(7));
}
#[tokio::test]
async fn run_cycle_writes_history_and_deletes_noncurrent_versions() {
let tmp = tempfile::tempdir().unwrap();
let storage = Arc::new(FsStorageBackend::new(tmp.path().to_path_buf()));
storage.create_bucket("docs").await.unwrap();
storage.set_versioning("docs", true).await.unwrap();
storage
.put_object(
"docs",
"logs/file.txt",
Box::pin(std::io::Cursor::new(b"old".to_vec())),
None,
)
.await
.unwrap();
storage
.put_object(
"docs",
"logs/file.txt",
Box::pin(std::io::Cursor::new(b"new".to_vec())),
None,
)
.await
.unwrap();
let versions_root = version_root_for_bucket(tmp.path(), "docs").join("logs").join("file.txt");
let manifest = std::fs::read_dir(&versions_root)
.unwrap()
.flatten()
.find(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("json"))
.unwrap()
.path();
let old_manifest = json!({
"version_id": "ver-1",
"key": "logs/file.txt",
"size": 3,
"archived_at": (Utc::now() - Duration::days(45)).to_rfc3339(),
"etag": "etag",
});
std::fs::write(&manifest, serde_json::to_string(&old_manifest).unwrap()).unwrap();
std::fs::write(manifest.with_file_name("ver-1.bin"), b"old").unwrap();
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<Status>Enabled</Status>
<Filter><Prefix>logs/</Prefix></Filter>
<NoncurrentVersionExpiration><NoncurrentDays>30</NoncurrentDays></NoncurrentVersionExpiration>
</Rule>
</LifecycleConfiguration>"#;
let mut config = storage.get_bucket_config("docs").await.unwrap();
config.lifecycle = Some(Value::String(lifecycle_xml.to_string()));
storage.set_bucket_config("docs", &config).await.unwrap();
let service = LifecycleService::new(storage.clone(), tmp.path(), LifecycleConfig::default());
let result = service.run_cycle().await.unwrap();
assert_eq!(result["versions_deleted"], 1);
let history = read_history(tmp.path(), "docs", 50, 0);
assert_eq!(history["total"], 1);
assert_eq!(history["executions"][0]["versions_deleted"], 1);
}
}

View File

@@ -1,8 +1,11 @@
pub mod acl;
pub mod access_logging;
pub mod gc;
pub mod integrity;
pub mod lifecycle;
pub mod metrics;
pub mod notifications;
pub mod object_lock;
pub mod replication;
pub mod s3_client;
pub mod site_registry;

View File

@@ -0,0 +1,294 @@
use crate::state::AppState;
use chrono::{DateTime, Utc};
use myfsio_storage::traits::StorageEngine;
use serde::Serialize;
use serde_json::json;
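/// Target endpoint that receives webhook notification payloads.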
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WebhookDestination {
pub url: String,
}
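/// A single notification rule parsed from a bucket's WebhookConfiguration element.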
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NotificationConfiguration {
pub id: String,
pub events: Vec<String>,
pub destination: WebhookDestination,
pub prefix_filter: String,
pub suffix_filter: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct NotificationEvent {
#[serde(rename = "eventVersion")]
event_version: &'static str,
#[serde(rename = "eventSource")]
event_source: &'static str,
#[serde(rename = "awsRegion")]
aws_region: &'static str,
#[serde(rename = "eventTime")]
event_time: String,
#[serde(rename = "eventName")]
event_name: String,
#[serde(rename = "userIdentity")]
user_identity: serde_json::Value,
#[serde(rename = "requestParameters")]
request_parameters: serde_json::Value,
#[serde(rename = "responseElements")]
response_elements: serde_json::Value,
s3: serde_json::Value,
}
impl NotificationConfiguration {
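/// Returns true when the event name matches one of the configured patterns
/// (a trailing `*` acts as a wildcard) and the object key satisfies the
/// optional prefix/suffix filters.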
pub fn matches_event(&self, event_name: &str, object_key: &str) -> bool {
let event_match = self.events.iter().any(|pattern| {
if let Some(prefix) = pattern.strip_suffix('*') {
event_name.starts_with(prefix)
} else {
pattern == event_name
}
});
if !event_match {
return false;
}
if !self.prefix_filter.is_empty() && !object_key.starts_with(&self.prefix_filter) {
return false;
}
if !self.suffix_filter.is_empty() && !object_key.ends_with(&self.suffix_filter) {
return false;
}
true
}
}
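/// Parses an S3-style NotificationConfiguration document and extracts its
/// WebhookConfiguration entries. Returns an error string when the XML is
/// malformed or a webhook is missing its destination URL.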
pub fn parse_notification_configurations(xml: &str) -> Result<Vec<NotificationConfiguration>, String> {
let doc = roxmltree::Document::parse(xml).map_err(|err| err.to_string())?;
let mut configs = Vec::new();
for webhook in doc
.descendants()
.filter(|node| node.is_element() && node.tag_name().name() == "WebhookConfiguration")
{
let id = child_text(&webhook, "Id").unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let events = webhook
.children()
.filter(|node| node.is_element() && node.tag_name().name() == "Event")
.filter_map(|node| node.text())
.map(|text| text.trim().to_string())
.filter(|text| !text.is_empty())
.collect::<Vec<_>>();
let destination = webhook
.children()
.find(|node| node.is_element() && node.tag_name().name() == "Destination");
let url = destination
.as_ref()
.and_then(|node| child_text(node, "Url"))
.unwrap_or_default();
if url.trim().is_empty() {
return Err("Destination URL is required".to_string());
}
let mut prefix_filter = String::new();
let mut suffix_filter = String::new();
if let Some(filter) = webhook
.children()
.find(|node| node.is_element() && node.tag_name().name() == "Filter")
{
if let Some(key) = filter
.children()
.find(|node| node.is_element() && node.tag_name().name() == "S3Key")
{
for rule in key
.children()
.filter(|node| node.is_element() && node.tag_name().name() == "FilterRule")
{
let name = child_text(&rule, "Name").unwrap_or_default();
let value = child_text(&rule, "Value").unwrap_or_default();
if name == "prefix" {
prefix_filter = value;
} else if name == "suffix" {
suffix_filter = value;
}
}
}
}
configs.push(NotificationConfiguration {
id,
events,
destination: WebhookDestination { url },
prefix_filter,
suffix_filter,
});
}
Ok(configs)
}
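/// Emits an `s3:ObjectCreated:<operation>` event for the given object.
/// Delivery is fire-and-forget on a background task and never blocks the
/// request path.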
pub fn emit_object_created(
state: &AppState,
bucket: &str,
key: &str,
size: u64,
etag: Option<&str>,
request_id: &str,
source_ip: &str,
user_identity: &str,
operation: &str,
) {
emit_notifications(
state.clone(),
bucket.to_string(),
key.to_string(),
format!("s3:ObjectCreated:{}", operation),
size,
etag.unwrap_or_default().to_string(),
request_id.to_string(),
source_ip.to_string(),
user_identity.to_string(),
);
}
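/// Emits an `s3:ObjectRemoved:<operation>` event for the given object key.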
pub fn emit_object_removed(
state: &AppState,
bucket: &str,
key: &str,
request_id: &str,
source_ip: &str,
user_identity: &str,
operation: &str,
) {
emit_notifications(
state.clone(),
bucket.to_string(),
key.to_string(),
format!("s3:ObjectRemoved:{}", operation),
0,
String::new(),
request_id.to_string(),
source_ip.to_string(),
user_identity.to_string(),
);
}
fn emit_notifications(
state: AppState,
bucket: String,
key: String,
event_name: String,
size: u64,
etag: String,
request_id: String,
source_ip: String,
user_identity: String,
) {
tokio::spawn(async move {
let config = match state.storage.get_bucket_config(&bucket).await {
Ok(config) => config,
Err(_) => return,
};
let raw = match config.notification {
Some(serde_json::Value::String(raw)) => raw,
_ => return,
};
let configs = match parse_notification_configurations(&raw) {
Ok(configs) => configs,
Err(err) => {
tracing::warn!("Invalid notification config for bucket {}: {}", bucket, err);
return;
}
};
let record = NotificationEvent {
event_version: "2.1",
event_source: "myfsio:s3",
aws_region: "local",
event_time: format_event_time(Utc::now()),
event_name: event_name.clone(),
user_identity: json!({ "principalId": if user_identity.is_empty() { "ANONYMOUS" } else { &user_identity } }),
request_parameters: json!({ "sourceIPAddress": if source_ip.is_empty() { "127.0.0.1" } else { &source_ip } }),
response_elements: json!({
"x-amz-request-id": request_id,
"x-amz-id-2": request_id,
}),
s3: json!({
"s3SchemaVersion": "1.0",
"configurationId": "notification",
"bucket": {
"name": bucket,
"ownerIdentity": { "principalId": "local" },
"arn": format!("arn:aws:s3:::{}", bucket),
},
"object": {
"key": key,
"size": size,
"eTag": etag,
"versionId": "null",
"sequencer": format!("{:016X}", Utc::now().timestamp_millis()),
}
}),
};
let payload = json!({ "Records": [record] });
let client = reqwest::Client::new();
for config in configs {
if !config.matches_event(&event_name, &key) {
continue;
}
let result = client
.post(&config.destination.url)
.header("content-type", "application/json")
.json(&payload)
.send()
.await;
if let Err(err) = result {
tracing::warn!(
"Failed to deliver notification for {} to {}: {}",
event_name,
config.destination.url,
err
);
}
}
});
}
fn format_event_time(value: DateTime<Utc>) -> String {
value.format("%Y-%m-%dT%H:%M:%S.000Z").to_string()
}
fn child_text(node: &roxmltree::Node<'_, '_>, name: &str) -> Option<String> {
node.children()
.find(|child| child.is_element() && child.tag_name().name() == name)
.and_then(|child| child.text())
.map(|text| text.trim().to_string())
.filter(|text| !text.is_empty())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_webhook_configuration() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<WebhookConfiguration>
<Id>upload</Id>
<Event>s3:ObjectCreated:*</Event>
<Destination><Url>https://example.com/hook</Url></Destination>
<Filter>
<S3Key>
<FilterRule><Name>prefix</Name><Value>logs/</Value></FilterRule>
<FilterRule><Name>suffix</Name><Value>.txt</Value></FilterRule>
</S3Key>
</Filter>
</WebhookConfiguration>
</NotificationConfiguration>"#;
let configs = parse_notification_configurations(xml).unwrap();
assert_eq!(configs.len(), 1);
assert!(configs[0].matches_event("s3:ObjectCreated:Put", "logs/test.txt"));
assert!(!configs[0].matches_event("s3:ObjectRemoved:Delete", "logs/test.txt"));
}
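// A minimal extra case (sketch, not part of the original change set):
// a WebhookConfiguration without a <Destination><Url> must be rejected.
#[test]
fn missing_destination_url_is_rejected() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<NotificationConfiguration>
<WebhookConfiguration>
<Id>broken</Id>
<Event>s3:ObjectCreated:*</Event>
</WebhookConfiguration>
</NotificationConfiguration>"#;
let err = parse_notification_configurations(xml).unwrap_err();
assert!(err.contains("Destination URL is required"));
}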
}

View File

@@ -0,0 +1,128 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub const LEGAL_HOLD_METADATA_KEY: &str = "__legal_hold__";
pub const RETENTION_METADATA_KEY: &str = "__object_retention__";
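/// Variant names match the S3 wire values (GOVERNANCE, COMPLIANCE) so the
/// default serde representation stays compatible with what clients send.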
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum RetentionMode {
GOVERNANCE,
COMPLIANCE,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ObjectLockRetention {
pub mode: RetentionMode,
pub retain_until_date: DateTime<Utc>,
}
impl ObjectLockRetention {
pub fn is_expired(&self) -> bool {
Utc::now() > self.retain_until_date
}
}
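/// Reads and decodes the retention record stored under
/// RETENTION_METADATA_KEY, if present and valid.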
pub fn get_object_retention(metadata: &HashMap<String, String>) -> Option<ObjectLockRetention> {
metadata
.get(RETENTION_METADATA_KEY)
.and_then(|raw| serde_json::from_str::<ObjectLockRetention>(raw).ok())
}
pub fn set_object_retention(
metadata: &mut HashMap<String, String>,
retention: &ObjectLockRetention,
) -> Result<(), String> {
let encoded = serde_json::to_string(retention).map_err(|err| err.to_string())?;
metadata.insert(RETENTION_METADATA_KEY.to_string(), encoded);
Ok(())
}
pub fn get_legal_hold(metadata: &HashMap<String, String>) -> bool {
metadata
.get(LEGAL_HOLD_METADATA_KEY)
.map(|value| value.eq_ignore_ascii_case("ON") || value.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
pub fn set_legal_hold(metadata: &mut HashMap<String, String>, enabled: bool) {
metadata.insert(
LEGAL_HOLD_METADATA_KEY.to_string(),
if enabled { "ON" } else { "OFF" }.to_string(),
);
}
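/// Checks whether an existing, unexpired retention record may be replaced:
/// COMPLIANCE mode never allows it, GOVERNANCE mode requires the bypass flag.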
pub fn ensure_retention_mutable(
metadata: &HashMap<String, String>,
bypass_governance: bool,
) -> Result<(), String> {
let Some(existing) = get_object_retention(metadata) else {
return Ok(());
};
if existing.is_expired() {
return Ok(());
}
match existing.mode {
RetentionMode::COMPLIANCE => Err(
"Cannot modify retention on object with COMPLIANCE mode until retention expires".to_string(),
),
RetentionMode::GOVERNANCE if !bypass_governance => Err(
"Cannot modify GOVERNANCE retention without bypass-governance permission".to_string(),
),
RetentionMode::GOVERNANCE => Ok(()),
}
}
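/// Checks whether the object may be deleted: a legal hold always blocks
/// deletion, and unexpired retention blocks it unless the mode is GOVERNANCE
/// and the governance bypass is allowed.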
pub fn can_delete_object(
metadata: &HashMap<String, String>,
bypass_governance: bool,
) -> Result<(), String> {
if get_legal_hold(metadata) {
return Err("Object is under legal hold".to_string());
}
if let Some(retention) = get_object_retention(metadata) {
if !retention.is_expired() {
return match retention.mode {
RetentionMode::COMPLIANCE => Err(format!(
"Object is locked in COMPLIANCE mode until {}",
retention.retain_until_date.to_rfc3339()
)),
RetentionMode::GOVERNANCE if !bypass_governance => Err(format!(
"Object is locked in GOVERNANCE mode until {}",
retention.retain_until_date.to_rfc3339()
)),
RetentionMode::GOVERNANCE => Ok(()),
};
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration;
#[test]
fn legal_hold_blocks_delete() {
let mut metadata = HashMap::new();
set_legal_hold(&mut metadata, true);
let err = can_delete_object(&metadata, false).unwrap_err();
assert!(err.contains("legal hold"));
}
#[test]
fn governance_requires_bypass() {
let mut metadata = HashMap::new();
set_object_retention(
&mut metadata,
&ObjectLockRetention {
mode: RetentionMode::GOVERNANCE,
retain_until_date: Utc::now() + Duration::hours(1),
},
)
.unwrap();
assert!(can_delete_object(&metadata, false).is_err());
assert!(can_delete_object(&metadata, true).is_ok());
}
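// A minimal extra case (sketch, not part of the original change set):
// COMPLIANCE retention blocks deletion even when governance bypass is set.
#[test]
fn compliance_blocks_delete_even_with_bypass() {
let mut metadata = HashMap::new();
set_object_retention(
&mut metadata,
&ObjectLockRetention {
mode: RetentionMode::COMPLIANCE,
retain_until_date: Utc::now() + Duration::hours(1),
},
)
.unwrap();
assert!(can_delete_object(&metadata, false).is_err());
assert!(can_delete_object(&metadata, true).is_err());
}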
}