Stop search auto-pagination from looping on failure; accept CSRF in JSON body; make replication pause/resume idempotent

This commit is contained in:
2026-04-25 14:06:39 +08:00
parent 7e32ac2a46
commit dd1e6d0409
6 changed files with 169 additions and 61 deletions

View File

@@ -1213,6 +1213,8 @@ pub struct SearchObjectsQuery {
pub prefix: Option<String>,
#[serde(default)]
pub limit: Option<usize>,
#[serde(default)]
pub start_after: Option<String>,
}
pub async fn search_bucket_objects(
@@ -1228,14 +1230,18 @@ pub async fn search_bucket_objects(
let term = q.q.unwrap_or_default().to_lowercase();
let limit = q.limit.unwrap_or(500).clamp(1, 1000);
let prefix = q.prefix.clone().unwrap_or_default();
let start_after = q.start_after.clone().filter(|s| !s.is_empty());
if term.is_empty() {
return Json(json!({ "results": [], "truncated": false })).into_response();
return Json(json!({ "results": [], "truncated": false, "next_token": Value::Null }))
.into_response();
}
let mut results: Vec<Value> = Vec::new();
let mut truncated = false;
let mut last_match_key: Option<String> = None;
let mut token: Option<String> = None;
let mut start_after_arg = start_after;
loop {
let params = ListParams {
max_keys: 1000,
@@ -1245,7 +1251,7 @@ pub async fn search_bucket_objects(
} else {
Some(prefix.clone())
},
start_after: None,
start_after: start_after_arg.take(),
};
match state.storage.list_objects(&bucket_name, &params).await {
Ok(res) => {
@@ -1255,6 +1261,7 @@ pub async fn search_bucket_objects(
truncated = true;
break;
}
last_match_key = Some(o.key.clone());
results.push(object_json(&bucket_name, o));
}
}
@@ -1270,9 +1277,11 @@ pub async fn search_bucket_objects(
}
}
let next_token = if truncated { last_match_key } else { None };
Json(json!({
"results": results,
"truncated": truncated,
"next_token": next_token,
}))
.into_response()
}

View File

@@ -427,11 +427,17 @@ pub async fn bucket_detail(
let target_conn = replication_rule
.as_ref()
.and_then(|rule| state.connections.get(&rule.target_connection_id));
let versioning_enabled = state
let versioning_status_enum = state
.storage
.is_versioning_enabled(&bucket_name)
.get_versioning_status(&bucket_name)
.await
.unwrap_or(false);
.unwrap_or(myfsio_common::types::VersioningStatus::Disabled);
let versioning_enabled =
matches!(versioning_status_enum, myfsio_common::types::VersioningStatus::Enabled);
let versioning_suspended = matches!(
versioning_status_enum,
myfsio_common::types::VersioningStatus::Suspended
);
let encryption_config = config_encryption_to_ui(bucket_config.encryption.as_ref());
let website_config = config_website_to_ui(bucket_config.website.as_ref());
let quota = bucket_config.quota.clone();
@@ -491,12 +497,13 @@ pub async fn bucket_detail(
);
ctx.insert("has_quota", &quota.is_some());
ctx.insert("versioning_enabled", &versioning_enabled);
ctx.insert("versioning_suspended", &versioning_suspended);
ctx.insert(
"versioning_status",
&(if versioning_enabled {
"Enabled"
} else {
"Disabled"
&(match versioning_status_enum {
myfsio_common::types::VersioningStatus::Enabled => "Enabled",
myfsio_common::types::VersioningStatus::Suspended => "Suspended",
myfsio_common::types::VersioningStatus::Disabled => "Disabled",
}),
);
ctx.insert("encryption_config", &encryption_config);
@@ -2361,10 +2368,10 @@ pub async fn update_bucket_replication(
"pause" => {
let Some(mut rule) = state.replication.get_rule(&bucket_name) else {
return respond(
false,
StatusCode::NOT_FOUND,
true,
StatusCode::OK,
"No replication configuration to pause.".to_string(),
json!({ "error": "No replication configuration to pause" }),
json!({ "action": "pause", "enabled": false, "no_op": true }),
);
};
rule.enabled = false;
@@ -2379,10 +2386,10 @@ pub async fn update_bucket_replication(
"resume" => {
let Some(mut rule) = state.replication.get_rule(&bucket_name) else {
return respond(
false,
StatusCode::NOT_FOUND,
true,
StatusCode::OK,
"No replication configuration to resume.".to_string(),
json!({ "error": "No replication configuration to resume" }),
json!({ "action": "resume", "enabled": false, "no_op": true }),
);
};
rule.enabled = true;

View File

@@ -155,6 +155,8 @@ pub async fn csrf_layer(
extract_form_token(&bytes)
} else if content_type.starts_with("multipart/form-data") {
extract_multipart_token(&content_type, &bytes)
} else if content_type.starts_with("application/json") {
extract_json_token(&bytes)
} else {
None
};
@@ -194,7 +196,7 @@ pub async fn csrf_layer(
let mut resp = (
StatusCode::FORBIDDEN,
[(header::CONTENT_TYPE, "application/json")],
r#"{"error":"Invalid CSRF token"}"#,
r#"{"error":"Invalid CSRF token. Send it via the X-CSRF-Token header or a csrf_token field in the form/JSON body."}"#,
)
.into_response();
*resp.status_mut() = StatusCode::FORBIDDEN;
@@ -238,6 +240,14 @@ fn build_session_cookie(id: &str, secure: bool) -> Cookie<'static> {
cookie
}
fn extract_json_token(body: &[u8]) -> Option<String> {
let value: serde_json::Value = serde_json::from_slice(body).ok()?;
value
.get(CSRF_FIELD_NAME)
.and_then(|v| v.as_str())
.map(|s| s.to_string())
}
fn extract_form_token(body: &[u8]) -> Option<String> {
let text = std::str::from_utf8(body).ok()?;
let prefix = format!("{}=", CSRF_FIELD_NAME);