Fix S3 read-path response-header gaps and replication-loop regression

S3 read path (handlers/mod.rs):
- Add Last-Modified + x-amz-meta-* to Range 206 responses
- Add x-amz-server-side-encryption to HEAD/?partNumber= responses
- 304 Not Modified now carries ETag, Last-Modified, version-id, cache headers
- Treat If-Match/If-Unmodified-Since and If-None-Match/If-Modified-Since
  as RFC-9110 pairs on both GET and CopyObject (date header ignored when
  ETag header is present)

Website hosting (middleware/auth.rs):
- Add ETag, Last-Modified, x-amz-server-side-encryption, and x-amz-meta-*
  to website HEAD/200/206 responses so CDN cachers can revalidate

Replication (services/replication.rs, services/s3_client.rs,
middleware/auth.rs, services/site_registry.rs):
- Detect replicated incoming writes via the authenticated principal's
  access key against the site registry's peer_inbound_access_key set.
  The auth middleware inserts a ReplicationPeerRequest extension marker
  on matched requests; handlers skip trigger_replication when set.
  Replaces a forgeable User-Agent substring check.
- Replication retry preflight now probes HeadBucket on the actual target
  bucket (not ListBuckets) and treats any HTTP response as reachable, so
  bucket-scoped credentials no longer block valid retries
- Populate ReplicationFailure.last_error_code from SdkError metadata
- Health probes use a max_attempts=1 client (fast-fail) rather than the
  production retry budget
This commit is contained in:
2026-04-27 23:41:30 +08:00
parent 05a30d2227
commit 4d923df16c
9 changed files with 301 additions and 60 deletions

12
Cargo.lock generated
View File

@@ -2660,7 +2660,7 @@ dependencies = [
[[package]]
name = "myfsio-auth"
version = "0.5.0"
version = "0.5.1"
dependencies = [
"aes",
"base64",
@@ -2685,7 +2685,7 @@ dependencies = [
[[package]]
name = "myfsio-common"
version = "0.5.0"
version = "0.5.1"
dependencies = [
"chrono",
"serde",
@@ -2696,7 +2696,7 @@ dependencies = [
[[package]]
name = "myfsio-crypto"
version = "0.5.0"
version = "0.5.1"
dependencies = [
"aes-gcm",
"base64",
@@ -2717,7 +2717,7 @@ dependencies = [
[[package]]
name = "myfsio-server"
version = "0.5.0"
version = "0.5.1"
dependencies = [
"aes-gcm",
"async-trait",
@@ -2775,7 +2775,7 @@ dependencies = [
[[package]]
name = "myfsio-storage"
version = "0.5.0"
version = "0.5.1"
dependencies = [
"chrono",
"dashmap",
@@ -2799,7 +2799,7 @@ dependencies = [
[[package]]
name = "myfsio-xml"
version = "0.5.0"
version = "0.5.1"
dependencies = [
"chrono",
"myfsio-common",

View File

@@ -10,7 +10,7 @@ members = [
]
[workspace.package]
version = "0.5.0"
version = "0.5.1"
edition = "2021"
[workspace.dependencies]

View File

@@ -159,6 +159,19 @@ fn trigger_replication(state: &AppState, bucket: &str, key: &str, action: &str)
});
}
/// Fire replication for a client-originated mutation, but suppress it when the
/// request was itself produced by a replication peer.
///
/// The auth middleware inserts a `ReplicationPeerRequest` extension when the
/// authenticated access key matches a registered peer's inbound credential;
/// re-replicating such a write would bounce the object back and forth between
/// sites, so a present marker short-circuits the trigger.
fn trigger_replication_for_request(
    state: &AppState,
    peer_marker: Option<&crate::middleware::ReplicationPeerRequest>,
    bucket: &str,
    key: &str,
    action: &str,
) {
    // Only non-peer traffic fans out to replication targets.
    if peer_marker.is_none() {
        trigger_replication(state, bucket, key, action);
    }
}
async fn ensure_object_lock_allows_write(
state: &AppState,
bucket: &str,
@@ -302,6 +315,7 @@ pub async fn create_bucket(
State(state): State<AppState>,
Path(bucket): Path<String>,
Query(query): Query<BucketQuery>,
peer: Option<axum::extract::Extension<crate::middleware::ReplicationPeerRequest>>,
headers: HeaderMap,
body: Body,
) -> Response {
@@ -311,6 +325,7 @@ pub async fn create_bucket(
State(state),
Path((host_bucket, bucket)),
Query(ObjectQuery::default()),
peer,
headers,
body,
)
@@ -728,15 +743,18 @@ pub async fn post_bucket(
State(state): State<AppState>,
Path(bucket): Path<String>,
Query(query): Query<BucketQuery>,
peer: Option<axum::extract::Extension<crate::middleware::ReplicationPeerRequest>>,
headers: HeaderMap,
body: Body,
) -> Response {
let peer_marker = peer.as_ref().map(|e| &e.0);
if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await {
if host_bucket != bucket {
return post_object(
State(state),
Path((host_bucket, bucket)),
Query(ObjectQuery::default()),
peer,
headers,
body,
)
@@ -745,7 +763,7 @@ pub async fn post_bucket(
}
if query.delete.is_some() {
return delete_objects_handler(&state, &bucket, body).await;
return delete_objects_handler(&state, &bucket, peer_marker, body).await;
}
if let Some(ct) = headers.get("content-type").and_then(|v| v.to_str().ok()) {
@@ -762,6 +780,7 @@ pub async fn delete_bucket(
State(state): State<AppState>,
Path(bucket): Path<String>,
Query(query): Query<BucketQuery>,
peer: Option<axum::extract::Extension<crate::middleware::ReplicationPeerRequest>>,
headers: HeaderMap,
) -> Response {
if let Some(host_bucket) = virtual_host_bucket_from_headers(&state, &headers).await {
@@ -770,6 +789,7 @@ pub async fn delete_bucket(
State(state),
Path((host_bucket, bucket)),
Query(ObjectQuery::default()),
peer,
headers,
)
.await;
@@ -1365,9 +1385,11 @@ pub async fn put_object(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
Query(query): Query<ObjectQuery>,
peer: Option<axum::extract::Extension<crate::middleware::ReplicationPeerRequest>>,
headers: HeaderMap,
body: Body,
) -> Response {
let peer_marker = peer.as_ref().map(|e| &e.0);
if query.tagging.is_some() {
return config::put_object_tagging(&state, &bucket, &key, body).await;
}
@@ -1417,7 +1439,15 @@ pub async fn put_object(
.get("x-amz-copy-source")
.and_then(|v| v.to_str().ok())
{
return copy_object_handler(&state, copy_source, &bucket, &key, &headers).await;
return copy_object_handler(
&state,
copy_source,
&bucket,
&key,
peer_marker,
&headers,
)
.await;
}
if let Err(response) =
@@ -1575,7 +1605,13 @@ pub async fn put_object(
"",
"Put",
);
trigger_replication(&state, &bucket, &key, "write");
trigger_replication_for_request(
&state,
peer_marker,
&bucket,
&key,
"write",
);
return (StatusCode::OK, resp_headers).into_response();
}
Err(e) => {
@@ -1615,7 +1651,7 @@ pub async fn put_object(
"",
"Put",
);
trigger_replication(&state, &bucket, &key, "write");
trigger_replication_for_request(&state, peer_marker, &bucket, &key, "write");
(StatusCode::OK, resp_headers).into_response()
}
Err(e) => storage_err_response(e),
@@ -1837,15 +1873,26 @@ pub async fn post_object(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
Query(query): Query<ObjectQuery>,
peer: Option<axum::extract::Extension<crate::middleware::ReplicationPeerRequest>>,
headers: HeaderMap,
body: Body,
) -> Response {
let peer_marker = peer.as_ref().map(|e| &e.0);
if query.uploads.is_some() {
return initiate_multipart_handler(&state, &bucket, &key).await;
}
if let Some(ref upload_id) = query.upload_id {
return complete_multipart_handler(&state, &bucket, &key, upload_id, &headers, body).await;
return complete_multipart_handler(
&state,
&bucket,
&key,
upload_id,
peer_marker,
&headers,
body,
)
.await;
}
if query.select.is_some() {
@@ -1859,8 +1906,10 @@ pub async fn delete_object(
State(state): State<AppState>,
Path((bucket, key)): Path<(String, String)>,
Query(query): Query<ObjectQuery>,
peer: Option<axum::extract::Extension<crate::middleware::ReplicationPeerRequest>>,
headers: HeaderMap,
) -> Response {
let peer_marker = peer.as_ref().map(|e| &e.0);
if query.tagging.is_some() {
return config::delete_object_tagging(&state, &bucket, &key).await;
}
@@ -1895,7 +1944,7 @@ pub async fn delete_object(
resp_headers.insert("x-amz-delete-marker", "true".parse().unwrap());
}
notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete");
trigger_replication(&state, &bucket, &key, "delete");
trigger_replication_for_request(&state, peer_marker, &bucket, &key, "delete");
(StatusCode::NO_CONTENT, resp_headers).into_response()
}
Err(e) => storage_err_response(e),
@@ -1920,7 +1969,7 @@ pub async fn delete_object(
resp_headers.insert("x-amz-delete-marker", "true".parse().unwrap());
}
notifications::emit_object_removed(&state, &bucket, &key, "", "", "", "Delete");
trigger_replication(&state, &bucket, &key, "delete");
trigger_replication_for_request(&state, peer_marker, &bucket, &key, "delete");
(StatusCode::NO_CONTENT, resp_headers).into_response()
}
Err(e) => storage_err_response(e),
@@ -1979,6 +2028,13 @@ pub async fn head_object(
.unwrap(),
);
headers.insert("accept-ranges", "bytes".parse().unwrap());
if let Some(enc_info) = myfsio_crypto::encryption::EncryptionMetadata::from_metadata(
&meta.internal_metadata,
) {
if let Ok(alg) = enc_info.algorithm.as_str().parse() {
headers.insert("x-amz-server-side-encryption", alg);
}
}
apply_stored_response_headers(&mut headers, &meta.internal_metadata);
apply_stored_checksum_headers(&mut headers, &meta.internal_metadata);
if let Some(ref requested_version) = query.version_id {
@@ -2040,6 +2096,13 @@ fn build_part_response_headers(
.unwrap(),
);
headers.insert("accept-ranges", "bytes".parse().unwrap());
if let Some(enc_info) =
myfsio_crypto::encryption::EncryptionMetadata::from_metadata(&meta.internal_metadata)
{
if let Ok(alg) = enc_info.algorithm.as_str().parse() {
headers.insert("x-amz-server-side-encryption", alg);
}
}
apply_stored_response_headers(&mut headers, &meta.internal_metadata);
if let Some(ref requested_version) = query.version_id {
if let Ok(value) = requested_version.parse() {
@@ -2253,6 +2316,7 @@ async fn complete_multipart_handler(
bucket: &str,
key: &str,
upload_id: &str,
peer_marker: Option<&crate::middleware::ReplicationPeerRequest>,
headers: &HeaderMap,
body: Body,
) -> Response {
@@ -2369,7 +2433,7 @@ async fn complete_multipart_handler(
etag,
&format!("/{}/{}", bucket, key),
);
trigger_replication(state, bucket, key, "write");
trigger_replication_for_request(state, peer_marker, bucket, key, "write");
(StatusCode::OK, [("content-type", "application/xml")], xml).into_response()
}
Err(e) => storage_err_response(e),
@@ -2492,6 +2556,7 @@ async fn copy_object_handler(
copy_source: &str,
dst_bucket: &str,
dst_key: &str,
peer_marker: Option<&crate::middleware::ReplicationPeerRequest>,
headers: &HeaderMap,
) -> Response {
if let Err(response) =
@@ -2667,14 +2732,19 @@ async fn copy_object_handler(
let etag = meta.etag.as_deref().unwrap_or("");
let last_modified = myfsio_xml::response::format_s3_datetime(&meta.last_modified);
let xml = myfsio_xml::response::copy_object_result_xml(etag, &last_modified);
trigger_replication(state, dst_bucket, dst_key, "write");
trigger_replication_for_request(state, peer_marker, dst_bucket, dst_key, "write");
(StatusCode::OK, [("content-type", "application/xml")], xml).into_response()
}
Err(e) => storage_err_response(e),
}
}
async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> Response {
async fn delete_objects_handler(
state: &AppState,
bucket: &str,
peer_marker: Option<&crate::middleware::ReplicationPeerRequest>,
body: Body,
) -> Response {
let body_bytes = match http_body_util::BodyExt::collect(body).await {
Ok(collected) => collected.to_bytes(),
Err(_) => {
@@ -2784,7 +2854,7 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R
match result {
Ok(outcome) => {
notifications::emit_object_removed(state, bucket, &key, "", "", "", "Delete");
trigger_replication(state, bucket, &key, "delete");
trigger_replication_for_request(state, peer_marker, bucket, &key, "delete");
let delete_marker_version_id = if outcome.is_delete_marker {
outcome.version_id.clone()
} else {
@@ -3032,6 +3102,14 @@ async fn stream_partial_content(
headers.insert("etag", format!("\"{}\"", etag).parse().unwrap());
}
insert_content_type(&mut headers, key, meta.content_type.as_deref());
headers.insert(
"last-modified",
meta.last_modified
.format("%a, %d %b %Y %H:%M:%S GMT")
.to_string()
.parse()
.unwrap(),
);
headers.insert("accept-ranges", "bytes".parse().unwrap());
if let Some(alg) = enc_header {
headers.insert("x-amz-server-side-encryption", alg.parse().unwrap());
@@ -3050,6 +3128,7 @@ async fn stream_partial_content(
}
}
apply_user_metadata(&mut headers, &meta.metadata);
apply_response_overrides(&mut headers, query);
if let Some(count) = parts_count {
@@ -3063,15 +3142,16 @@ fn evaluate_get_preconditions(
headers: &HeaderMap,
meta: &myfsio_common::types::ObjectMeta,
) -> Option<Response> {
if let Some(value) = headers.get("if-match").and_then(|v| v.to_str().ok()) {
let if_match = headers.get("if-match").and_then(|v| v.to_str().ok());
let if_none_match = headers.get("if-none-match").and_then(|v| v.to_str().ok());
if let Some(value) = if_match {
if !etag_condition_matches(value, meta.etag.as_deref()) {
return Some(s3_error_response(S3Error::from_code(
S3ErrorCode::PreconditionFailed,
)));
}
}
if let Some(value) = headers
} else if let Some(value) = headers
.get("if-unmodified-since")
.and_then(|v| v.to_str().ok())
{
@@ -3084,19 +3164,17 @@ fn evaluate_get_preconditions(
}
}
if let Some(value) = headers.get("if-none-match").and_then(|v| v.to_str().ok()) {
if let Some(value) = if_none_match {
if etag_condition_matches(value, meta.etag.as_deref()) {
return Some(StatusCode::NOT_MODIFIED.into_response());
return Some(not_modified_response(meta));
}
}
if let Some(value) = headers
} else if let Some(value) = headers
.get("if-modified-since")
.and_then(|v| v.to_str().ok())
{
if let Some(t) = parse_http_date(value) {
if meta.last_modified <= t {
return Some(StatusCode::NOT_MODIFIED.into_response());
return Some(not_modified_response(meta));
}
}
}
@@ -3104,6 +3182,28 @@ fn evaluate_get_preconditions(
None
}
fn not_modified_response(meta: &myfsio_common::types::ObjectMeta) -> Response {
let mut headers = HeaderMap::new();
if let Some(ref etag) = meta.etag {
headers.insert("etag", format!("\"{}\"", etag).parse().unwrap());
}
headers.insert(
"last-modified",
meta.last_modified
.format("%a, %d %b %Y %H:%M:%S GMT")
.to_string()
.parse()
.unwrap(),
);
if let Some(ref vid) = meta.version_id {
if let Ok(value) = vid.parse() {
headers.insert("x-amz-version-id", value);
}
}
apply_stored_response_headers(&mut headers, &meta.internal_metadata);
(StatusCode::NOT_MODIFIED, headers).into_response()
}
async fn evaluate_put_preconditions(
state: &AppState,
bucket: &str,
@@ -3152,34 +3252,25 @@ fn evaluate_copy_preconditions(
headers: &HeaderMap,
source_meta: &myfsio_common::types::ObjectMeta,
) -> Option<Response> {
if let Some(value) = headers
let if_match = headers
.get("x-amz-copy-source-if-match")
.and_then(|v| v.to_str().ok())
{
.and_then(|v| v.to_str().ok());
let if_none_match = headers
.get("x-amz-copy-source-if-none-match")
.and_then(|v| v.to_str().ok());
if let Some(value) = if_match {
if !etag_condition_matches(value, source_meta.etag.as_deref()) {
return Some(s3_error_response(S3Error::from_code(
S3ErrorCode::PreconditionFailed,
)));
}
}
if let Some(value) = headers
.get("x-amz-copy-source-if-none-match")
.and_then(|v| v.to_str().ok())
{
if etag_condition_matches(value, source_meta.etag.as_deref()) {
return Some(s3_error_response(S3Error::from_code(
S3ErrorCode::PreconditionFailed,
)));
}
}
if let Some(value) = headers
.get("x-amz-copy-source-if-modified-since")
} else if let Some(value) = headers
.get("x-amz-copy-source-if-unmodified-since")
.and_then(|v| v.to_str().ok())
{
if let Some(t) = parse_http_date(value) {
if source_meta.last_modified <= t {
if source_meta.last_modified > t {
return Some(s3_error_response(S3Error::from_code(
S3ErrorCode::PreconditionFailed,
)));
@@ -3187,12 +3278,18 @@ fn evaluate_copy_preconditions(
}
}
if let Some(value) = headers
.get("x-amz-copy-source-if-unmodified-since")
if let Some(value) = if_none_match {
if etag_condition_matches(value, source_meta.etag.as_deref()) {
return Some(s3_error_response(S3Error::from_code(
S3ErrorCode::PreconditionFailed,
)));
}
} else if let Some(value) = headers
.get("x-amz-copy-source-if-modified-since")
.and_then(|v| v.to_str().ok())
{
if let Some(t) = parse_http_date(value) {
if source_meta.last_modified > t {
if source_meta.last_modified <= t {
return Some(s3_error_response(S3Error::from_code(
S3ErrorCode::PreconditionFailed,
)));

View File

@@ -2067,6 +2067,7 @@ pub async fn upload_object(
State(state),
Path((bucket_name.clone(), key.clone())),
Query(ObjectQuery::default()),
None,
upload_headers,
Body::from(bytes),
)

View File

@@ -162,6 +162,39 @@ fn parse_website_config(value: &Value) -> Option<(String, Option<String>)> {
}
}
/// Copy cache-validation and S3 metadata headers from stored object metadata
/// onto a website-hosting response: ETag, Last-Modified, the SSE algorithm
/// (when the object is encrypted), and every user `x-amz-meta-*` entry.
/// Values that do not parse as valid header names/values are skipped silently.
fn apply_website_object_headers(
    headers: &mut HeaderMap,
    meta: &myfsio_common::types::ObjectMeta,
) {
    if let Some(etag) = meta.etag.as_ref() {
        if let Ok(value) = format!("\"{}\"", etag).parse() {
            headers.insert(header::ETAG, value);
        }
    }
    let http_date = meta
        .last_modified
        .format("%a, %d %b %Y %H:%M:%S GMT")
        .to_string();
    if let Ok(value) = http_date.parse() {
        headers.insert(header::LAST_MODIFIED, value);
    }
    // Surface the encryption algorithm only when encryption metadata exists.
    if let Some(enc_info) =
        myfsio_crypto::encryption::EncryptionMetadata::from_metadata(&meta.internal_metadata)
    {
        if let Ok(value) = enc_info.algorithm.as_str().parse() {
            headers.insert("x-amz-server-side-encryption", value);
        }
    }
    // Re-emit user metadata under the x-amz-meta- prefix.
    for (name, raw) in meta.metadata.iter() {
        if let Ok(header_name) = format!("x-amz-meta-{}", name).parse::<axum::http::HeaderName>() {
            if let Ok(header_val) = raw.parse() {
                headers.insert(header_name, header_val);
            }
        }
    }
}
async fn serve_website_document(
state: &AppState,
bucket: &str,
@@ -182,6 +215,7 @@ async fn serve_website_document(
meta.size.to_string().parse().unwrap(),
);
headers.insert(header::ACCEPT_RANGES, "bytes".parse().unwrap());
apply_website_object_headers(&mut headers, &meta);
return Some((status, headers).into_response());
}
@@ -193,6 +227,7 @@ async fn serve_website_document(
let mut headers = HeaderMap::new();
headers.insert(header::CONTENT_TYPE, content_type.parse().unwrap());
headers.insert(header::ACCEPT_RANGES, "bytes".parse().unwrap());
apply_website_object_headers(&mut headers, &meta);
if status == StatusCode::OK {
if let Some(range_header) = range_header {
@@ -501,6 +536,12 @@ pub async fn auth_layer(State(state): State<AppState>, mut req: Request, next: N
{
error_response(err, &auth_path)
} else {
if let Some(registry) = state.site_registry.as_ref() {
if registry.is_peer_inbound_access_key(&principal.access_key) {
req.extensions_mut()
.insert(crate::middleware::ReplicationPeerRequest);
}
}
req.extensions_mut().insert(principal);
wrap_body_for_sha256_verification(&mut req);
next.run(req).await

View File

@@ -9,6 +9,9 @@ pub use bucket_cors::bucket_cors_layer;
pub use ratelimit::{rate_limit_layer, RateLimitLayerState};
pub use session::{csrf_layer, session_layer, SessionHandle, SessionLayerState};
#[derive(Clone, Copy, Debug)]
pub struct ReplicationPeerRequest;
use axum::extract::{Request, State};
use axum::middleware::Next;
use axum::response::Response;

View File

@@ -12,7 +12,10 @@ use myfsio_common::types::ListParams;
use myfsio_storage::fs_backend::{metadata_is_corrupted, FsStorageBackend};
use myfsio_storage::traits::StorageEngine;
use crate::services::s3_client::{build_client, check_endpoint_health, ClientOptions};
use crate::services::s3_client::{
build_client, build_health_client, check_endpoint_health, check_target_bucket_reachable,
ClientOptions,
};
use crate::stores::connections::{ConnectionStore, RemoteConnection};
pub const MODE_NEW_ONLY: &str = "new_only";
@@ -347,7 +350,10 @@ impl ReplicationManager {
return 0;
}
};
if !self.check_endpoint(&connection).await {
if !self
.check_target_bucket(&connection, &rule.target_bucket)
.await
{
tracing::warn!(
"Cannot replicate existing objects for {}: endpoint {} is unreachable",
bucket,
@@ -459,6 +465,7 @@ impl ReplicationManager {
self.failures.remove(bucket, object_key);
}
Err(err) => {
let code = sdk_error_code(&err);
let msg = format!("{:?}", err);
tracing::error!(
"Replication DELETE failed {}/{}: {}",
@@ -475,7 +482,7 @@ impl ReplicationManager {
failure_count: 1,
bucket_name: bucket.to_string(),
action: "delete".to_string(),
last_error_code: None,
last_error_code: code,
},
);
}
@@ -562,6 +569,7 @@ impl ReplicationManager {
self.failures.remove(bucket, object_key);
}
Err(err) => {
let code = upload_error_code(&err);
let msg = err.to_string();
tracing::error!("Replication failed {}/{}: {}", bucket, object_key, msg);
self.failures.add(
@@ -573,7 +581,7 @@ impl ReplicationManager {
failure_count: 1,
bucket_name: bucket.to_string(),
action: action.to_string(),
last_error_code: None,
last_error_code: code,
},
);
}
@@ -581,10 +589,15 @@ impl ReplicationManager {
}
pub async fn check_endpoint(&self, conn: &RemoteConnection) -> bool {
let client = build_client(conn, &self.client_options);
let client = build_health_client(conn, &self.client_options);
check_endpoint_health(&client).await
}
/// Probe whether `target_bucket` on the remote connection answers at all.
///
/// Uses a fast-fail client (max_attempts = 1) so health probes do not consume
/// the production retry budget; reachability semantics are defined by
/// `check_target_bucket_reachable` (any HTTP response counts as reachable).
pub async fn check_target_bucket(&self, conn: &RemoteConnection, target_bucket: &str) -> bool {
    let client = build_health_client(conn, &self.client_options);
    check_target_bucket_reachable(&client, target_bucket).await
}
pub async fn retry_failed(&self, bucket: &str, object_key: &str) -> bool {
let failure = match self.failures.get(bucket, object_key) {
Some(f) => f,
@@ -598,6 +611,15 @@ impl ReplicationManager {
Some(c) => c,
None => return false,
};
if !self.check_target_bucket(&conn, &rule.target_bucket).await {
tracing::warn!(
"Cannot retry {}/{}: endpoint {} is not reachable",
bucket,
object_key,
conn.endpoint_url
);
return false;
}
self.replicate_task(bucket, object_key, &rule, &conn, &failure.action)
.await;
true
@@ -616,6 +638,15 @@ impl ReplicationManager {
Some(c) => c,
None => return (0, failures.len()),
};
if !self.check_target_bucket(&conn, &rule.target_bucket).await {
tracing::warn!(
"Cannot retry {} failure(s) in {}: endpoint {} is not reachable",
failures.len(),
bucket,
conn.endpoint_url
);
return (0, failures.len());
}
let mut submitted = 0;
for failure in failures {
self.replicate_task(bucket, &failure.object_key, &rule, &conn, &failure.action)
@@ -675,6 +706,24 @@ fn is_no_such_bucket<E: std::fmt::Debug>(err: &E) -> bool {
text.contains("NoSuchBucket")
}
fn sdk_error_code<E, R>(err: &aws_sdk_s3::error::SdkError<E, R>) -> Option<String>
where
E: aws_sdk_s3::error::ProvideErrorMetadata,
{
if let aws_sdk_s3::error::SdkError::ServiceError(svc) = err {
if let Some(code) = svc.err().code() {
return Some(code.to_string());
}
}
None
}
/// Convenience wrapper: pull the service error code out of a PutObject
/// failure for recording in `ReplicationFailure.last_error_code`.
fn upload_error_code(
    err: &aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::put_object::PutObjectError>,
) -> Option<String> {
    sdk_error_code(err)
}
async fn upload_object(
client: &aws_sdk_s3::Client,
bucket: &str,

View File

@@ -2,11 +2,13 @@ use std::time::Duration;
use aws_config::BehaviorVersion;
use aws_credential_types::Credentials;
use aws_sdk_s3::config::{Region, SharedCredentialsProvider};
use aws_sdk_s3::config::{AppName, Region, SharedCredentialsProvider};
use aws_sdk_s3::Client;
use crate::stores::connections::RemoteConnection;
pub const REPLICATION_USER_AGENT_TAG: &str = "MyFSIO-Replication";
pub struct ClientOptions {
pub connect_timeout: Duration,
pub read_timeout: Duration,
@@ -40,17 +42,29 @@ pub fn build_client(connection: &RemoteConnection, options: &ClientOptions) -> C
let retry_config =
aws_smithy_types::retry::RetryConfig::standard().with_max_attempts(options.max_attempts);
let config = aws_sdk_s3::config::Builder::new()
let mut builder = aws_sdk_s3::config::Builder::new()
.behavior_version(BehaviorVersion::latest())
.credentials_provider(SharedCredentialsProvider::new(credentials))
.region(Region::new(connection.region.clone()))
.endpoint_url(connection.endpoint_url.clone())
.force_path_style(true)
.timeout_config(timeout_config)
.retry_config(retry_config)
.build();
.retry_config(retry_config);
Client::from_conf(config)
if let Ok(app_name) = AppName::new(REPLICATION_USER_AGENT_TAG) {
builder = builder.app_name(app_name);
}
Client::from_conf(builder.build())
}
/// Build a client tuned for health probes: the caller's connect/read timeouts
/// are kept, but the retry budget is capped at a single attempt so an
/// unreachable endpoint fails fast instead of exhausting the production
/// retry configuration.
pub fn build_health_client(connection: &RemoteConnection, options: &ClientOptions) -> Client {
    let probe_options = ClientOptions {
        max_attempts: 1,
        connect_timeout: options.connect_timeout,
        read_timeout: options.read_timeout,
    };
    build_client(connection, &probe_options)
}
pub async fn check_endpoint_health(client: &Client) -> bool {
@@ -62,3 +76,28 @@ pub async fn check_endpoint_health(client: &Client) -> bool {
}
}
}
pub async fn check_target_bucket_reachable(client: &Client, target_bucket: &str) -> bool {
match client.head_bucket().bucket(target_bucket).send().await {
Ok(_) => true,
Err(err) => match &err {
aws_sdk_s3::error::SdkError::ServiceError(_)
| aws_sdk_s3::error::SdkError::ResponseError(_) => {
tracing::debug!(
"Target-bucket reachability probe for {} got a server response; treating as reachable: {:?}",
target_bucket,
err
);
true
}
_ => {
tracing::warn!(
"Target-bucket reachability probe for {} failed at transport layer: {:?}",
target_bucket,
err
);
false
}
},
}
}

View File

@@ -147,4 +147,15 @@ impl SiteRegistry {
drop(data);
self.save();
}
/// Return true when `access_key` is the inbound replication credential of one
/// of this site's registered peers.
///
/// Empty keys never match, so an anonymous/unauthenticated principal cannot
/// accidentally be classified as a replication peer.
pub fn is_peer_inbound_access_key(&self, access_key: &str) -> bool {
    if access_key.is_empty() {
        return false;
    }
    let registry = self.data.read();
    registry
        .peers
        .iter()
        .any(|peer| peer.peer_inbound_access_key.as_deref() == Some(access_key))
}
}