Fix integrity auto-heal data-loss bug, return 422 ObjectCorrupted, lock heal swap, verify multipart peer body
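The data-loss fix: the heal pass could previously quarantine or clobber a live object that a concurrent PUT had just rewritten. The quarantine rename and the final atomic swap now run under the per-object write lock and re-check whether a fresh live file appeared, skipping the heal (`HealStatus::Skipped`) when a racing write won. Separately, multipart objects used to bypass body verification after a peer download, because a composite ETag is not a whole-body MD5; the peer fetcher now downloads part by part and recomputes the composite ETag.

For reference, a minimal sketch of the composite-ETag convention that `fetch_multipart_for_heal` checks against: the MD5 of the concatenated per-part MD5 digests, with `-N` appended for N parts. This assumes the `md-5` crate (imported as `md5`, as in the diff); the part payloads are illustrative.

```rust
use md5::{Digest, Md5};

/// Composite ETag: hex MD5 of the concatenated per-part digests, plus "-N".
fn composite_etag(parts: &[&[u8]]) -> String {
    let mut composite = Md5::new();
    for part in parts {
        // Feed the 16-byte digest of each part, not the raw part bytes.
        composite.update(Md5::digest(part));
    }
    format!("{:x}-{}", composite.finalize(), parts.len())
}

fn main() {
    let parts: [&[u8]; 2] = [b"first part bytes", b"second part bytes"];
    println!("{}", composite_etag(&parts)); // 32 hex chars, then "-2"
}
```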

2026-04-25 19:29:54 +08:00
parent 660c328a84
commit 777d862a02
19 changed files with 634 additions and 365 deletions

View File

@@ -69,7 +69,7 @@ impl S3ErrorCode {
Self::NoSuchUpload => 404,
Self::NoSuchVersion => 404,
Self::NoSuchTagSet => 404,
Self::ObjectCorrupted => 500,
Self::ObjectCorrupted => 422,
Self::PreconditionFailed => 412,
Self::NotModified => 304,
Self::QuotaExceeded => 403,

View File

@@ -152,7 +152,10 @@ impl VersioningStatus {
}
pub fn is_active(self) -> bool {
matches!(self, VersioningStatus::Enabled | VersioningStatus::Suspended)
matches!(
self,
VersioningStatus::Enabled | VersioningStatus::Suspended
)
}
}

View File

@@ -248,12 +248,9 @@ impl ServerConfig {
parse_rate_limit_env("RATE_LIMIT_DEFAULT", RateLimitSetting::new(5000, 60));
let ratelimit_list_buckets =
parse_rate_limit_env("RATE_LIMIT_LIST_BUCKETS", ratelimit_default);
let ratelimit_bucket_ops =
parse_rate_limit_env("RATE_LIMIT_BUCKET_OPS", ratelimit_default);
let ratelimit_object_ops =
parse_rate_limit_env("RATE_LIMIT_OBJECT_OPS", ratelimit_default);
let ratelimit_head_ops =
parse_rate_limit_env("RATE_LIMIT_HEAD_OPS", ratelimit_default);
let ratelimit_bucket_ops = parse_rate_limit_env("RATE_LIMIT_BUCKET_OPS", ratelimit_default);
let ratelimit_object_ops = parse_rate_limit_env("RATE_LIMIT_OBJECT_OPS", ratelimit_default);
let ratelimit_head_ops = parse_rate_limit_env("RATE_LIMIT_HEAD_OPS", ratelimit_default);
let ratelimit_admin =
parse_rate_limit_env("RATE_LIMIT_ADMIN", RateLimitSetting::new(60, 60));
let ratelimit_storage_uri =

View File

@@ -1059,7 +1059,16 @@ pub async fn delete_logging(state: &AppState, bucket: &str) -> Response {
fn s3_error_response(code: S3ErrorCode, message: &str, status: StatusCode) -> Response {
let err = S3Error::new(code, message.to_string());
(status, [("content-type", "application/xml")], err.to_xml()).into_response()
let code_str = code.as_str();
(
status,
[
("content-type", "application/xml"),
("x-amz-error-code", code_str),
],
err.to_xml(),
)
.into_response()
}
pub async fn list_object_versions(

View File

@@ -66,11 +66,20 @@ fn s3_error_response(err: S3Error) -> Response {
} else {
err.resource.clone()
};
let code_str = err.code.as_str();
let body = err
.with_resource(resource)
.with_request_id(uuid::Uuid::new_v4().simple().to_string())
.to_xml();
(status, [("content-type", "application/xml")], body).into_response()
(
status,
[
("content-type", "application/xml"),
("x-amz-error-code", code_str),
],
body,
)
.into_response()
}
fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response {
@@ -91,14 +100,17 @@ fn storage_err_response(err: myfsio_storage::error::StorageError) -> Response {
let s3_err = S3Error::from_code(S3ErrorCode::NoSuchKey)
.with_resource(format!("/{}/{}", bucket, key))
.with_request_id(uuid::Uuid::new_v4().simple().to_string());
let status = StatusCode::from_u16(s3_err.http_status())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let status =
StatusCode::from_u16(s3_err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let mut resp_headers = HeaderMap::new();
resp_headers.insert("x-amz-delete-marker", "true".parse().unwrap());
if let Ok(vid) = version_id.parse() {
resp_headers.insert("x-amz-version-id", vid);
}
resp_headers.insert("content-type", "application/xml".parse().unwrap());
if let Ok(code_hdr) = s3_err.code.as_str().parse() {
resp_headers.insert("x-amz-error-code", code_hdr);
}
return (status, resp_headers, s3_err.to_xml()).into_response();
}
s3_error_response(S3Error::from(err))
@@ -118,8 +130,8 @@ fn io_error_to_s3_response(err: &std::io::Error) -> Option<Response> {
|| lower.contains("is a directory")
|| lower.contains("file exists")
|| lower.contains("directory not empty");
let hit_name_too_long = matches!(err.kind(), ErrorKind::InvalidFilename)
|| lower.contains("file name too long");
let hit_name_too_long =
matches!(err.kind(), ErrorKind::InvalidFilename) || lower.contains("file name too long");
if !hit_collision && !hit_name_too_long {
return None;
}
@@ -1118,9 +1130,7 @@ fn has_upload_checksum(headers: &HeaderMap) -> bool {
}
fn persist_additional_checksums(headers: &HeaderMap, metadata: &mut HashMap<String, String>) {
for algo in [
"sha256", "sha1", "crc32", "crc32c", "crc64nvme",
] {
for algo in ["sha256", "sha1", "crc32", "crc32c", "crc64nvme"] {
let header_name = format!("x-amz-checksum-{}", algo);
if let Some(value) = headers.get(&header_name).and_then(|v| v.to_str().ok()) {
let trimmed = value.trim();
@@ -1141,9 +1151,7 @@ fn persist_additional_checksums(headers: &HeaderMap, metadata: &mut HashMap<Stri
}
fn apply_stored_checksum_headers(resp_headers: &mut HeaderMap, metadata: &HashMap<String, String>) {
for algo in [
"sha256", "sha1", "crc32", "crc32c", "crc64nvme",
] {
for algo in ["sha256", "sha1", "crc32", "crc32c", "crc64nvme"] {
if let Some(value) = metadata.get(&format!("__checksum_{}__", algo)) {
if let Ok(parsed) = value.parse() {
resp_headers.insert(
@@ -1644,64 +1652,61 @@ pub async fn get_object(
return resp;
}
let enc_info = myfsio_crypto::encryption::EncryptionMetadata::from_metadata(
&snap_meta.internal_metadata,
);
let enc_info =
myfsio_crypto::encryption::EncryptionMetadata::from_metadata(&snap_meta.internal_metadata);
let (file, file_size, enc_header): (tokio::fs::File, u64, Option<&str>) = match (
enc_info.as_ref(),
state.encryption.as_ref(),
) {
(Some(enc_info), Some(enc_svc)) => {
let dec_tmp = tmp_dir.join(format!("dec-{}", uuid::Uuid::new_v4()));
let customer_key = extract_sse_c_key(&headers);
let decrypt_res = enc_svc
.decrypt_object(&snap_link, &dec_tmp, enc_info, customer_key.as_deref())
.await;
// Hardlink served its purpose; the decrypted plaintext is in
// dec_tmp now.
let _ = tokio::fs::remove_file(&snap_link).await;
if let Err(e) = decrypt_res {
let _ = tokio::fs::remove_file(&dec_tmp).await;
let (file, file_size, enc_header): (tokio::fs::File, u64, Option<&str>) =
match (enc_info.as_ref(), state.encryption.as_ref()) {
(Some(enc_info), Some(enc_svc)) => {
let dec_tmp = tmp_dir.join(format!("dec-{}", uuid::Uuid::new_v4()));
let customer_key = extract_sse_c_key(&headers);
let decrypt_res = enc_svc
.decrypt_object(&snap_link, &dec_tmp, enc_info, customer_key.as_deref())
.await;
// Hardlink served its purpose; the decrypted plaintext is in
// dec_tmp now.
let _ = tokio::fs::remove_file(&snap_link).await;
if let Err(e) = decrypt_res {
let _ = tokio::fs::remove_file(&dec_tmp).await;
return s3_error_response(S3Error::new(
myfsio_common::error::S3ErrorCode::InternalError,
format!("Decryption failed: {}", e),
));
}
let file = match open_self_deleting(dec_tmp.clone()).await {
Ok(f) => f,
Err(e) => {
let _ = tokio::fs::remove_file(&dec_tmp).await;
return storage_err_response(myfsio_storage::error::StorageError::Io(e));
}
};
let file_size = file.metadata().await.map(|m| m.len()).unwrap_or(0);
(file, file_size, Some(enc_info.algorithm.as_str()))
}
(Some(_), None) => {
// Snapshot is encrypted but the server has no encryption
// service configured to decrypt it. Serving ciphertext as
// plaintext would be actively wrong; refuse explicitly.
let _ = tokio::fs::remove_file(&snap_link).await;
return s3_error_response(S3Error::new(
myfsio_common::error::S3ErrorCode::InternalError,
format!("Decryption failed: {}", e),
"Object is encrypted but encryption service is disabled".to_string(),
));
}
let file = match open_self_deleting(dec_tmp.clone()).await {
Ok(f) => f,
Err(e) => {
let _ = tokio::fs::remove_file(&dec_tmp).await;
return storage_err_response(myfsio_storage::error::StorageError::Io(e));
}
};
let file_size = file.metadata().await.map(|m| m.len()).unwrap_or(0);
(file, file_size, Some(enc_info.algorithm.as_str()))
}
(Some(_), None) => {
// Snapshot is encrypted but the server has no encryption
// service configured to decrypt it. Serving ciphertext as
// plaintext would be actively wrong; refuse explicitly.
let _ = tokio::fs::remove_file(&snap_link).await;
return s3_error_response(S3Error::new(
myfsio_common::error::S3ErrorCode::InternalError,
"Object is encrypted but encryption service is disabled".to_string(),
));
}
(None, _) => {
// Raw path: stream directly from the hardlink, which becomes
// self-deleting on open (kernel keeps the inode alive via our
// fd).
let file = match open_self_deleting(snap_link.clone()).await {
Ok(f) => f,
Err(e) => {
let _ = tokio::fs::remove_file(&snap_link).await;
return storage_err_response(myfsio_storage::error::StorageError::Io(e));
}
};
(file, snap_meta.size, None)
}
};
(None, _) => {
// Raw path: stream directly from the hardlink, which becomes
// self-deleting on open (kernel keeps the inode alive via our
// fd).
let file = match open_self_deleting(snap_link.clone()).await {
Ok(f) => f,
Err(e) => {
let _ = tokio::fs::remove_file(&snap_link).await;
return storage_err_response(myfsio_storage::error::StorageError::Io(e));
}
};
(file, snap_meta.size, None)
}
};
let stream = ReaderStream::with_capacity(file, stream_cap);
let body = Body::from_stream(stream);
@@ -2470,86 +2475,72 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R
use futures::stream::{self, StreamExt};
let results: Vec<(String, Option<String>, Result<myfsio_common::types::DeleteOutcome, (String, String)>)> =
stream::iter(parsed.objects.iter().cloned())
.map(|obj| {
let state = state.clone();
let bucket = bucket.to_string();
async move {
let key = obj.key.clone();
let requested_vid = obj.version_id.clone();
let lock_check: Result<(), (String, String)> = match obj.version_id.as_deref() {
Some(version_id) if version_id != "null" => match state
.storage
.get_object_version_metadata(&bucket, &obj.key, version_id)
.await
{
Ok(metadata) => object_lock::can_delete_object(&metadata, false)
.map_err(|m| {
(S3ErrorCode::AccessDenied.as_str().to_string(), m)
}),
Err(err) => {
let s3err = S3Error::from(err);
Err((s3err.code.as_str().to_string(), s3err.message))
}
},
_ => match state.storage.head_object(&bucket, &obj.key).await {
Ok(_) => {
match state
.storage
.get_object_metadata(&bucket, &obj.key)
.await
{
Ok(metadata) => object_lock::can_delete_object(&metadata, false)
.map_err(|m| {
(
S3ErrorCode::AccessDenied.as_str().to_string(),
m,
)
}),
Err(err) => {
let s3err = S3Error::from(err);
Err((s3err.code.as_str().to_string(), s3err.message))
}
}
}
Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => {
Ok(())
}
Err(myfsio_storage::error::StorageError::DeleteMarker { .. }) => {
Ok(())
}
Err(err) => {
let s3err = S3Error::from(err);
Err((s3err.code.as_str().to_string(), s3err.message))
}
},
};
let result = match lock_check {
Err(e) => Err(e),
Ok(()) => {
let outcome = match obj.version_id.as_deref() {
Some(version_id) if version_id != "null" => {
state
.storage
.delete_object_version(&bucket, &obj.key, version_id)
.await
}
_ => state.storage.delete_object(&bucket, &obj.key).await,
};
outcome.map_err(|e| {
let s3err = S3Error::from(e);
(s3err.code.as_str().to_string(), s3err.message)
})
let results: Vec<(
String,
Option<String>,
Result<myfsio_common::types::DeleteOutcome, (String, String)>,
)> = stream::iter(parsed.objects.iter().cloned())
.map(|obj| {
let state = state.clone();
let bucket = bucket.to_string();
async move {
let key = obj.key.clone();
let requested_vid = obj.version_id.clone();
let lock_check: Result<(), (String, String)> = match obj.version_id.as_deref() {
Some(version_id) if version_id != "null" => match state
.storage
.get_object_version_metadata(&bucket, &obj.key, version_id)
.await
{
Ok(metadata) => object_lock::can_delete_object(&metadata, false)
.map_err(|m| (S3ErrorCode::AccessDenied.as_str().to_string(), m)),
Err(err) => {
let s3err = S3Error::from(err);
Err((s3err.code.as_str().to_string(), s3err.message))
}
};
(key, requested_vid, result)
}
})
.buffer_unordered(32)
.collect()
.await;
},
_ => match state.storage.head_object(&bucket, &obj.key).await {
Ok(_) => match state.storage.get_object_metadata(&bucket, &obj.key).await {
Ok(metadata) => object_lock::can_delete_object(&metadata, false)
.map_err(|m| (S3ErrorCode::AccessDenied.as_str().to_string(), m)),
Err(err) => {
let s3err = S3Error::from(err);
Err((s3err.code.as_str().to_string(), s3err.message))
}
},
Err(myfsio_storage::error::StorageError::ObjectNotFound { .. }) => Ok(()),
Err(myfsio_storage::error::StorageError::DeleteMarker { .. }) => Ok(()),
Err(err) => {
let s3err = S3Error::from(err);
Err((s3err.code.as_str().to_string(), s3err.message))
}
},
};
let result = match lock_check {
Err(e) => Err(e),
Ok(()) => {
let outcome = match obj.version_id.as_deref() {
Some(version_id) if version_id != "null" => {
state
.storage
.delete_object_version(&bucket, &obj.key, version_id)
.await
}
_ => state.storage.delete_object(&bucket, &obj.key).await,
};
outcome.map_err(|e| {
let s3err = S3Error::from(e);
(s3err.code.as_str().to_string(), s3err.message)
})
}
};
(key, requested_vid, result)
}
})
.buffer_unordered(32)
.collect()
.await;
let mut deleted: Vec<myfsio_xml::response::DeletedEntry> = Vec::new();
let mut errors: Vec<(String, String, String)> = Vec::new();
@@ -2628,8 +2619,8 @@ async fn range_get_handler(
match (enc_info.as_ref(), state.encryption.as_ref()) {
(Some(enc_info), Some(enc_svc)) => {
let customer_key = extract_sse_c_key(headers);
let has_fast_path = enc_info.chunk_size.is_some()
&& enc_info.plaintext_size.is_some();
let has_fast_path =
enc_info.chunk_size.is_some() && enc_info.plaintext_size.is_some();
if has_fast_path {
let plaintext_size = enc_info.plaintext_size.unwrap();

View File

@@ -511,11 +511,20 @@ fn s3_error_response(err: S3Error) -> Response {
} else {
err.resource.clone()
};
let code_str = err.code.as_str();
let body = err
.with_resource(resource)
.with_request_id(uuid::Uuid::new_v4().simple().to_string())
.to_xml();
(status, [("content-type", "application/xml")], body).into_response()
(
status,
[
("content-type", "application/xml"),
("x-amz-error-code", code_str),
],
body,
)
.into_response()
}
fn build_stats_xml(bytes_scanned: usize, bytes_returned: usize) -> String {

View File

@@ -129,10 +129,10 @@ fn storage_status(err: &StorageError) -> StatusCode {
| StorageError::QuotaExceeded(_) => StatusCode::BAD_REQUEST,
StorageError::BucketAlreadyExists(_) => StatusCode::CONFLICT,
StorageError::BucketNotEmpty(_) => StatusCode::CONFLICT,
StorageError::Io(_)
| StorageError::Json(_)
| StorageError::Internal(_)
| StorageError::ObjectCorrupted { .. } => StatusCode::INTERNAL_SERVER_ERROR,
StorageError::ObjectCorrupted { .. } => StatusCode::UNPROCESSABLE_ENTITY,
StorageError::Io(_) | StorageError::Json(_) | StorageError::Internal(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
}
}

View File

@@ -432,8 +432,10 @@ pub async fn bucket_detail(
.get_versioning_status(&bucket_name)
.await
.unwrap_or(myfsio_common::types::VersioningStatus::Disabled);
let versioning_enabled =
matches!(versioning_status_enum, myfsio_common::types::VersioningStatus::Enabled);
let versioning_enabled = matches!(
versioning_status_enum,
myfsio_common::types::VersioningStatus::Enabled
);
let versioning_suspended = matches!(
versioning_status_enum,
myfsio_common::types::VersioningStatus::Suspended

View File

@@ -324,7 +324,9 @@ pub fn create_ui_router(state: state::AppState) -> Router {
axum::http::header::CACHE_CONTROL,
axum::http::HeaderValue::from_static("no-cache"),
))
.service(tower_http::services::ServeDir::new(&state.config.static_dir));
.service(tower_http::services::ServeDir::new(
&state.config.static_dir,
));
protected
.merge(public)

View File

@@ -1449,9 +1449,18 @@ fn error_response(err: S3Error, resource: &str) -> Response {
let status =
StatusCode::from_u16(err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let request_id = uuid::Uuid::new_v4().simple().to_string();
let code_str = err.code.as_str();
let body = err
.with_resource(resource.to_string())
.with_request_id(request_id)
.to_xml();
(status, [("content-type", "application/xml")], body).into_response()
(
status,
[
("content-type", "application/xml"),
("x-amz-error-code", code_str),
],
body,
)
.into_response()
}

View File

@@ -76,10 +76,7 @@ fn find_matching_rule<'a>(
request_headers: &[&str],
) -> Option<&'a CorsRule> {
rules.iter().find(|rule| {
let origin_match = rule
.allowed_origins
.iter()
.any(|p| match_origin(p, origin));
let origin_match = rule.allowed_origins.iter().any(|p| match_origin(p, origin));
if !origin_match {
return false;
}
@@ -104,9 +101,7 @@ fn find_matching_rule_for_actual<'a>(
method: &str,
) -> Option<&'a CorsRule> {
rules.iter().find(|rule| {
rule.allowed_origins
.iter()
.any(|p| match_origin(p, origin))
rule.allowed_origins.iter().any(|p| match_origin(p, origin))
&& rule
.allowed_methods
.iter()

View File

@@ -182,9 +182,7 @@ fn too_many_requests(retry_after: u64, resource: &str) -> Response {
)
.into_response();
if let Ok(value) = request_id.parse() {
response
.headers_mut()
.insert("x-amz-request-id", value);
response.headers_mut().insert("x-amz-request-id", value);
}
response
}

View File

@@ -276,10 +276,7 @@ impl GcService {
if !ts_path.is_dir() {
continue;
}
let modified = ts_entry
.metadata()
.ok()
.and_then(|m| m.modified().ok());
let modified = ts_entry.metadata().ok().and_then(|m| m.modified().ok());
let Some(modified) = modified else {
continue;
};

View File

@@ -391,15 +391,13 @@ async fn heal_corrupted(
}
}
if live_path.exists() {
if let Err(e) = std::fs::rename(&live_path, &quarantine_full) {
tracing::error!(
"Heal {}/{}: quarantine rename failed: {}",
bucket,
key,
e
);
return HealStatus::Failed;
{
let _guard = storage.lock_object_write(bucket, key);
if live_path.exists() {
if let Err(e) = std::fs::rename(&live_path, &quarantine_full) {
tracing::error!("Heal {}/{}: quarantine rename failed: {}", bucket, key, e);
return HealStatus::Failed;
}
}
}
@@ -421,14 +419,30 @@ async fn heal_corrupted(
.await
{
HealOutcome::Healed { peer_etag, bytes } => {
if let Err(e) = atomic_swap(&temp_path, &live_path) {
let swap_result = {
let _guard = storage.lock_object_write(bucket, key);
if live_path.exists() {
let _ = std::fs::remove_file(&temp_path);
tracing::info!(
"Heal {}/{}: concurrent PUT raced; preserving fresh write",
bucket,
key
);
return HealStatus::Skipped;
}
atomic_swap(&temp_path, &live_path)
};
if let Err(e) = swap_result {
tracing::error!(
"Heal {}/{}: atomic swap failed: {} (restoring from quarantine)",
bucket,
key,
e
);
let _ = std::fs::rename(&quarantine_full, &live_path);
let _guard = storage.lock_object_write(bucket, key);
if !live_path.exists() {
let _ = std::fs::rename(&quarantine_full, &live_path);
}
let _ = std::fs::remove_file(&temp_path);
return HealStatus::Failed;
}
@@ -444,8 +458,7 @@ async fn heal_corrupted(
}
HealOutcome::PeerMismatch { stored, peer } => {
let msg = format!("peer etag {} != stored {}", peer, stored);
let _ =
poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await;
let _ = poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await;
tracing::warn!("Heal {}/{}: peer mismatch ({}), poisoned", bucket, key, msg);
return HealStatus::PeerMismatch;
}
@@ -460,14 +473,15 @@ async fn heal_corrupted(
"etag mismatch (stored={}, actual={}) — peer unavailable: {}",
stored_etag, actual_etag, error
);
let _ =
poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await;
let _ = poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await;
return HealStatus::PeerUnavailable;
}
HealOutcome::VerifyFailed { expected, actual } => {
let msg = format!("peer download verify failed: expected={} actual={}", expected, actual);
let _ =
poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await;
let msg = format!(
"peer download verify failed: expected={} actual={}",
expected, actual
);
let _ = poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await;
tracing::warn!("Heal {}/{}: {}", bucket, key, msg);
return HealStatus::VerifyFailed;
}
@@ -476,8 +490,7 @@ async fn heal_corrupted(
"etag mismatch (stored={}, actual={}); no peer configured",
stored_etag, actual_etag
);
let _ =
poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await;
let _ = poison_metadata(storage, bucket, key, &msg, &quarantine_rel_str).await;
return HealStatus::Poisoned;
}
}
@@ -512,12 +525,22 @@ async fn heal_stale_version(storage_root: &Path, bucket: &str, key: &str) -> Hea
.join(key);
if let Some(parent) = dst.parent() {
if let Err(e) = std::fs::create_dir_all(parent) {
tracing::error!("Stale-version quarantine mkdir failed {}/{}: {}", bucket, key, e);
tracing::error!(
"Stale-version quarantine mkdir failed {}/{}: {}",
bucket,
key,
e
);
return HealStatus::Failed;
}
}
if let Err(e) = std::fs::rename(&src, &dst) {
tracing::error!("Stale-version quarantine rename failed {}/{}: {}", bucket, key, e);
tracing::error!(
"Stale-version quarantine rename failed {}/{}: {}",
bucket,
key,
e
);
return HealStatus::Failed;
}
tracing::info!("Quarantined stale version {}/{}", bucket, key);
@@ -577,11 +600,7 @@ async fn heal_etag_cache(
}
}
async fn heal_phantom_metadata(
storage: &FsStorageBackend,
bucket: &str,
key: &str,
) -> HealStatus {
async fn heal_phantom_metadata(storage: &FsStorageBackend, bucket: &str, key: &str) -> HealStatus {
match storage.delete_object_metadata_entry(bucket, key).await {
Ok(_) => {
tracing::info!("Dropped phantom metadata for {}/{}", bucket, key);
@@ -1062,6 +1081,9 @@ fn check_stale_versions(
}
state.objects_scanned += 1;
if !bin_stems.contains_key(stem) {
if manifest_is_delete_marker(path) {
continue;
}
state.stale_versions += 1;
let key = path
.strip_prefix(&versions_root)
@@ -1080,6 +1102,19 @@ fn check_stale_versions(
}
}
fn manifest_is_delete_marker(path: &Path) -> bool {
let Ok(content) = std::fs::read_to_string(path) else {
return false;
};
let Ok(value) = serde_json::from_str::<Value>(&content) else {
return false;
};
value
.get("is_delete_marker")
.and_then(Value::as_bool)
.unwrap_or(false)
}
fn check_etag_cache(
state: &mut ScanState,
storage_root: &Path,
@@ -1265,7 +1300,58 @@ mod tests {
.unwrap();
let state = scan_all_buckets(root, 10_000);
assert_eq!(state.corrupted_objects, 0, "poisoned entries must not re-flag");
assert_eq!(
state.corrupted_objects, 0,
"poisoned entries must not re-flag"
);
}
#[test]
fn delete_marker_manifests_are_not_flagged_stale() {
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let bucket = "vbucket";
fs::create_dir_all(root.join(bucket)).unwrap();
let versions_dir = root
.join(SYSTEM_ROOT)
.join(SYSTEM_BUCKETS_DIR)
.join(bucket)
.join(BUCKET_VERSIONS_DIR)
.join("v.txt");
fs::create_dir_all(&versions_dir).unwrap();
let dm = json!({
"version_id": "dm-vid-1",
"key": "v.txt",
"size": 0,
"etag": "",
"is_delete_marker": true,
});
fs::write(
versions_dir.join("dm-vid-1.json"),
serde_json::to_string(&dm).unwrap(),
)
.unwrap();
let truly_stale = json!({
"version_id": "broken-vid-2",
"key": "v.txt",
"size": 12,
"etag": "abc",
"is_delete_marker": false,
});
fs::write(
versions_dir.join("broken-vid-2.json"),
serde_json::to_string(&truly_stale).unwrap(),
)
.unwrap();
let state = scan_all_buckets(root, 10_000);
assert_eq!(
state.stale_versions, 1,
"delete-marker manifest must not be flagged; only the data-bearing orphan should count"
);
}
#[test]
@@ -1332,10 +1418,7 @@ mod tests {
write_index(
&meta_root,
&[(
"multi.bin",
"deadbeefdeadbeefdeadbeefdeadbeef-3",
)],
&[("multi.bin", "deadbeefdeadbeefdeadbeefdeadbeef-3")],
);
let state = scan_all_buckets(root, 10_000);
@@ -1343,6 +1426,10 @@ mod tests {
state.corrupted_objects, 0,
"multipart-style ETags must not be checked against whole-body MD5"
);
assert!(state.errors.is_empty(), "unexpected errors: {:?}", state.errors);
assert!(
state.errors.is_empty(),
"unexpected errors: {:?}",
state.errors
);
}
}

View File

@@ -158,6 +158,12 @@ impl PeerFetcher {
};
}
if is_multipart_etag(expected_etag) {
return self
.fetch_multipart_for_heal(&client, &target_bucket, key, expected_etag, dest_path)
.await;
}
let resp = match client
.get_object()
.bucket(&target_bucket)
@@ -225,7 +231,7 @@ impl PeerFetcher {
drop(file);
let actual = format!("{:x}", hasher.finalize());
if !is_multipart_etag(expected_etag) && actual != expected_etag {
if actual != expected_etag {
let _ = tokio::fs::remove_file(dest_path).await;
return HealOutcome::VerifyFailed {
expected: expected_etag.to_string(),
@@ -238,6 +244,129 @@ impl PeerFetcher {
bytes: total,
}
}
async fn fetch_multipart_for_heal(
&self,
client: &Client,
target_bucket: &str,
key: &str,
expected_etag: &str,
dest_path: &Path,
) -> HealOutcome {
let part_count = match expected_etag
.split_once('-')
.and_then(|(_, n)| n.parse::<u32>().ok())
{
Some(n) if n >= 1 => n,
_ => {
return HealOutcome::VerifyFailed {
expected: expected_etag.to_string(),
actual: format!("unparseable multipart suffix in {}", expected_etag),
};
}
};
if let Some(parent) = dest_path.parent() {
if let Err(e) = tokio::fs::create_dir_all(parent).await {
return HealOutcome::PeerUnavailable {
error: format!("mkdir parent: {}", e),
};
}
}
let mut file = match tokio::fs::File::create(dest_path).await {
Ok(f) => f,
Err(e) => {
return HealOutcome::PeerUnavailable {
error: format!("create temp: {}", e),
};
}
};
let mut composite = Md5::new();
let mut total: u64 = 0;
let mut buf = vec![0u8; 64 * 1024];
for part_no in 1..=part_count {
let part_no_i32 = part_no as i32;
let resp = match client
.get_object()
.bucket(target_bucket)
.key(key)
.part_number(part_no_i32)
.send()
.await
{
Ok(r) => r,
Err(err) => {
drop(file);
let _ = tokio::fs::remove_file(dest_path).await;
return HealOutcome::PeerUnavailable {
error: format!("GetObject part {}: {:?}", part_no, err),
};
}
};
let mut reader = resp.body.into_async_read();
let mut part_hasher = Md5::new();
let mut part_bytes: u64 = 0;
loop {
let n = match reader.read(&mut buf).await {
Ok(n) => n,
Err(e) => {
drop(file);
let _ = tokio::fs::remove_file(dest_path).await;
return HealOutcome::PeerUnavailable {
error: format!("read part {}: {}", part_no, e),
};
}
};
if n == 0 {
break;
}
part_hasher.update(&buf[..n]);
if let Err(e) = file.write_all(&buf[..n]).await {
drop(file);
let _ = tokio::fs::remove_file(dest_path).await;
return HealOutcome::PeerUnavailable {
error: format!("write part {}: {}", part_no, e),
};
}
part_bytes += n as u64;
}
if part_bytes == 0 {
drop(file);
let _ = tokio::fs::remove_file(dest_path).await;
return HealOutcome::VerifyFailed {
expected: expected_etag.to_string(),
actual: format!("part {} returned zero bytes", part_no),
};
}
composite.update(part_hasher.finalize().as_slice());
total += part_bytes;
}
if let Err(e) = file.flush().await {
return HealOutcome::PeerUnavailable {
error: format!("flush temp: {}", e),
};
}
drop(file);
let composite_etag = format!("{:x}-{}", composite.finalize(), part_count);
if composite_etag != expected_etag {
let _ = tokio::fs::remove_file(dest_path).await;
return HealOutcome::VerifyFailed {
expected: expected_etag.to_string(),
actual: composite_etag,
};
}
HealOutcome::Healed {
peer_etag: expected_etag.to_string(),
bytes: total,
}
}
}
#[cfg(test)]

View File

@@ -2647,7 +2647,11 @@ async fn test_consecutive_slashes_in_key_round_trip() {
let (app, _tmp) = test_app();
app.clone()
.oneshot(signed_request(Method::PUT, "/slashes-bucket", Body::empty()))
.oneshot(signed_request(
Method::PUT,
"/slashes-bucket",
Body::empty(),
))
.await
.unwrap();
@@ -2750,7 +2754,11 @@ async fn test_delete_live_version_restores_previous_to_live_slot() {
let (app, _tmp) = test_app();
app.clone()
.oneshot(signed_request(Method::PUT, "/restore-bucket", Body::empty()))
.oneshot(signed_request(
Method::PUT,
"/restore-bucket",
Body::empty(),
))
.await
.unwrap();
app.clone()
@@ -2952,11 +2960,7 @@ async fn test_versioned_get_on_delete_marker_returns_method_not_allowed() {
.unwrap();
app.clone()
.oneshot(signed_request(
Method::PUT,
"/dm-bucket/k",
Body::from("x"),
))
.oneshot(signed_request(Method::PUT, "/dm-bucket/k", Body::from("x")))
.await
.unwrap();
@@ -4948,7 +4952,9 @@ async fn test_kms_encrypt_decrypt() {
}
fn deterministic_payload(len: usize) -> Vec<u8> {
(0..len).map(|i| ((i * 2654435761usize) >> 16) as u8).collect()
(0..len)
.map(|i| ((i * 2654435761usize) >> 16) as u8)
.collect()
}
async fn put_sse_s3(
@@ -5000,7 +5006,12 @@ async fn range_get(
}
async fn body_bytes(resp: axum::http::Response<Body>) -> Vec<u8> {
resp.into_body().collect().await.unwrap().to_bytes().to_vec()
resp.into_body()
.collect()
.await
.unwrap()
.to_bytes()
.to_vec()
}
#[tokio::test]

View File

@@ -55,7 +55,11 @@ fn fs_encode_key(key: &str) -> String {
let trailing = key.ends_with('/');
let body = if trailing { &key[..key.len() - 1] } else { key };
if body.is_empty() {
return if trailing { "/".to_string() } else { String::new() };
return if trailing {
"/".to_string()
} else {
String::new()
};
}
let encoded: Vec<String> = body
.split('/')
@@ -463,6 +467,14 @@ impl FsStorageBackend {
&self.object_lock_stripes[idx]
}
pub fn lock_object_write(
&self,
bucket: &str,
key: &str,
) -> parking_lot::RwLockWriteGuard<'_, ()> {
self.get_object_lock(bucket, key).write()
}
fn prune_meta_read_cache(&self) {
if self.object_cache_max_size == 0 {
self.meta_read_cache.clear();
@@ -772,11 +784,7 @@ impl FsStorageBackend {
Ok(())
}
pub async fn delete_object_metadata_entry(
&self,
bucket: &str,
key: &str,
) -> StorageResult<()> {
pub async fn delete_object_metadata_entry(&self, bucket: &str, key: &str) -> StorageResult<()> {
run_blocking(|| {
let _guard = self.get_object_lock(bucket, key).write();
self.delete_metadata_sync(bucket, key)
@@ -1123,11 +1131,7 @@ impl FsStorageBackend {
Ok(Some(version_id))
}
fn write_delete_marker_sync(
&self,
bucket_name: &str,
key: &str,
) -> std::io::Result<String> {
fn write_delete_marker_sync(&self, bucket_name: &str, key: &str) -> std::io::Result<String> {
let version_dir = self.version_dir(bucket_name, key);
std::fs::create_dir_all(&version_dir)?;
let now = Utc::now();
@@ -1197,7 +1201,9 @@ impl FsStorageBackend {
self.validate_key(key)?;
Self::validate_version_id(bucket_name, key, version_id)?;
if let Some(record_and_path) = self.try_live_version_record_sync(bucket_name, key, version_id) {
if let Some(record_and_path) =
self.try_live_version_record_sync(bucket_name, key, version_id)
{
return Ok(record_and_path);
}
@@ -1523,9 +1529,7 @@ impl FsStorageBackend {
let (etag, version_id) = if is_dir_marker {
(None, None)
} else {
idx.get(name_str.as_ref())
.cloned()
.unwrap_or((None, None))
idx.get(name_str.as_ref()).cloned().unwrap_or((None, None))
};
let key = fs_decode_key(&fs_rel);
@@ -2190,7 +2194,11 @@ impl crate::traits::StorageEngine for FsStorageBackend {
detail: metadata_corruption_detail(&stored_meta),
});
}
if self.read_bucket_config_sync(bucket).versioning_status().is_active() {
if self
.read_bucket_config_sync(bucket)
.versioning_status()
.is_active()
{
if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) {
return Err(StorageError::DeleteMarker {
bucket: bucket.to_string(),
@@ -2270,7 +2278,11 @@ impl crate::traits::StorageEngine for FsStorageBackend {
detail: metadata_corruption_detail(&stored_meta),
});
}
if self.read_bucket_config_sync(bucket).versioning_status().is_active() {
if self
.read_bucket_config_sync(bucket)
.versioning_status()
.is_active()
{
if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) {
return Err(StorageError::DeleteMarker {
bucket: bucket.to_string(),
@@ -2292,7 +2304,8 @@ impl crate::traits::StorageEngine for FsStorageBackend {
return Err(StorageError::InvalidRange);
}
if start > 0 {
file.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?;
file.seek(SeekFrom::Start(start))
.map_err(StorageError::Io)?;
}
let mtime = meta
@@ -2360,7 +2373,11 @@ impl crate::traits::StorageEngine for FsStorageBackend {
detail: metadata_corruption_detail(&stored_meta),
});
}
if self.read_bucket_config_sync(bucket).versioning_status().is_active() {
if self
.read_bucket_config_sync(bucket)
.versioning_status()
.is_active()
{
if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) {
return Err(StorageError::DeleteMarker {
bucket: bucket.to_string(),
@@ -2460,7 +2477,11 @@ impl crate::traits::StorageEngine for FsStorageBackend {
detail: metadata_corruption_detail(&stored_meta),
});
}
if self.read_bucket_config_sync(bucket).versioning_status().is_active() {
if self
.read_bucket_config_sync(bucket)
.versioning_status()
.is_active()
{
if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) {
return Err(StorageError::DeleteMarker {
bucket: bucket.to_string(),
@@ -2595,7 +2616,11 @@ impl crate::traits::StorageEngine for FsStorageBackend {
detail: metadata_corruption_detail(&stored_meta),
});
}
if self.read_bucket_config_sync(bucket).versioning_status().is_active() {
if self
.read_bucket_config_sync(bucket)
.versioning_status()
.is_active()
{
if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) {
return Err(StorageError::DeleteMarker {
bucket: bucket.to_string(),
@@ -2701,7 +2726,8 @@ impl crate::traits::StorageEngine for FsStorageBackend {
return Err(StorageError::InvalidRange);
}
if start > 0 {
file.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?;
file.seek(SeekFrom::Start(start))
.map_err(StorageError::Io)?;
}
let obj = self.object_meta_from_version_record(key, &record, &data_path)?;
Ok((obj, file))
@@ -2930,45 +2956,47 @@ impl crate::traits::StorageEngine for FsStorageBackend {
// guard is released at the end of this block before we take the dst
// write guard, so even when src == dst (same stripe) there's no
// upgrade deadlock.
let copy_res = run_blocking(|| -> StorageResult<(String, u64, HashMap<String, String>)> {
let _src_guard = self.get_object_lock(src_bucket, src_key).read();
let src_path = self.object_path(src_bucket, src_key)?;
if !src_path.is_file() {
return Err(StorageError::ObjectNotFound {
bucket: src_bucket.to_string(),
key: src_key.to_string(),
});
}
use std::io::{BufReader, BufWriter, Read, Write};
let src_file = std::fs::File::open(&src_path).map_err(StorageError::Io)?;
let mut reader = BufReader::with_capacity(chunk_size, src_file);
let tmp_file = std::fs::File::create(&tmp_path).map_err(StorageError::Io)?;
let mut writer = BufWriter::with_capacity(chunk_size * 4, tmp_file);
let mut hasher = Md5::new();
let mut buf = vec![0u8; chunk_size];
let mut total: u64 = 0;
loop {
let n = reader.read(&mut buf).map_err(StorageError::Io)?;
if n == 0 {
break;
let copy_res = run_blocking(
|| -> StorageResult<(String, u64, HashMap<String, String>)> {
let _src_guard = self.get_object_lock(src_bucket, src_key).read();
let src_path = self.object_path(src_bucket, src_key)?;
if !src_path.is_file() {
return Err(StorageError::ObjectNotFound {
bucket: src_bucket.to_string(),
key: src_key.to_string(),
});
}
hasher.update(&buf[..n]);
writer.write_all(&buf[..n]).map_err(StorageError::Io)?;
total += n as u64;
}
writer.flush().map_err(StorageError::Io)?;
let src_metadata = self.read_metadata_sync(src_bucket, src_key);
if metadata_is_corrupted(&src_metadata) {
return Err(StorageError::ObjectCorrupted {
bucket: src_bucket.to_string(),
key: src_key.to_string(),
detail: metadata_corruption_detail(&src_metadata),
});
}
Ok((format!("{:x}", hasher.finalize()), total, src_metadata))
});
use std::io::{BufReader, BufWriter, Read, Write};
let src_file = std::fs::File::open(&src_path).map_err(StorageError::Io)?;
let mut reader = BufReader::with_capacity(chunk_size, src_file);
let tmp_file = std::fs::File::create(&tmp_path).map_err(StorageError::Io)?;
let mut writer = BufWriter::with_capacity(chunk_size * 4, tmp_file);
let mut hasher = Md5::new();
let mut buf = vec![0u8; chunk_size];
let mut total: u64 = 0;
loop {
let n = reader.read(&mut buf).map_err(StorageError::Io)?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
writer.write_all(&buf[..n]).map_err(StorageError::Io)?;
total += n as u64;
}
writer.flush().map_err(StorageError::Io)?;
let src_metadata = self.read_metadata_sync(src_bucket, src_key);
if metadata_is_corrupted(&src_metadata) {
return Err(StorageError::ObjectCorrupted {
bucket: src_bucket.to_string(),
key: src_key.to_string(),
detail: metadata_corruption_detail(&src_metadata),
});
}
Ok((format!("{:x}", hasher.finalize()), total, src_metadata))
},
);
let (etag, new_size, src_metadata) = match copy_res {
Ok(v) => v,
@@ -3179,79 +3207,77 @@ impl crate::traits::StorageEngine for FsStorageBackend {
// between our metadata read and our file open, we'd otherwise record
// the old size/last_modified in the manifest but copy bytes from the
// new version.
let copy_res = run_blocking(
|| -> StorageResult<(String, u64, DateTime<Utc>)> {
let _guard = self.get_object_lock(src_bucket, src_key).read();
let copy_res = run_blocking(|| -> StorageResult<(String, u64, DateTime<Utc>)> {
let _guard = self.get_object_lock(src_bucket, src_key).read();
let src_path = self.object_path(src_bucket, src_key)?;
if !src_path.is_file() {
return Err(StorageError::ObjectNotFound {
bucket: src_bucket.to_string(),
key: src_key.to_string(),
});
}
let src_path = self.object_path(src_bucket, src_key)?;
if !src_path.is_file() {
return Err(StorageError::ObjectNotFound {
bucket: src_bucket.to_string(),
key: src_key.to_string(),
});
}
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
// Open first so subsequent metadata/seek/read are all
// anchored to the same inode, even if a later rename swaps
// the path after we release the guard.
let mut src = std::fs::File::open(&src_path).map_err(StorageError::Io)?;
let src_meta = src.metadata().map_err(StorageError::Io)?;
let src_size = src_meta.len();
let src_mtime = src_meta
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
let last_modified = Utc
.timestamp_opt(
src_mtime as i64,
((src_mtime % 1.0) * 1_000_000_000.0) as u32,
)
.single()
.unwrap_or_else(Utc::now);
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
// Open first so subsequent metadata/seek/read are all
// anchored to the same inode, even if a later rename swaps
// the path after we release the guard.
let mut src = std::fs::File::open(&src_path).map_err(StorageError::Io)?;
let src_meta = src.metadata().map_err(StorageError::Io)?;
let src_size = src_meta.len();
let src_mtime = src_meta
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
let last_modified = Utc
.timestamp_opt(
src_mtime as i64,
((src_mtime % 1.0) * 1_000_000_000.0) as u32,
)
.single()
.unwrap_or_else(Utc::now);
let (start, end) = match range {
Some((s, e)) => {
if s >= src_size || e >= src_size || s > e {
return Err(StorageError::InvalidRange);
}
(s, e)
let (start, end) = match range {
Some((s, e)) => {
if s >= src_size || e >= src_size || s > e {
return Err(StorageError::InvalidRange);
}
None => {
if src_size == 0 {
(0u64, 0u64)
} else {
(0u64, src_size - 1)
}
(s, e)
}
None => {
if src_size == 0 {
(0u64, 0u64)
} else {
(0u64, src_size - 1)
}
};
let length = if src_size == 0 { 0 } else { end - start + 1 };
}
};
let length = if src_size == 0 { 0 } else { end - start + 1 };
if start > 0 {
src.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?;
if start > 0 {
src.seek(SeekFrom::Start(start)).map_err(StorageError::Io)?;
}
let mut src = std::io::BufReader::with_capacity(chunk_size, src);
let dst = std::fs::File::create(&tmp_file).map_err(StorageError::Io)?;
let mut dst = BufWriter::with_capacity(chunk_size * 4, dst);
let mut hasher = Md5::new();
let mut remaining = length;
let mut buf = vec![0u8; chunk_size];
while remaining > 0 {
let to_read = std::cmp::min(remaining as usize, buf.len());
let n = src.read(&mut buf[..to_read]).map_err(StorageError::Io)?;
if n == 0 {
break;
}
let mut src = std::io::BufReader::with_capacity(chunk_size, src);
let dst = std::fs::File::create(&tmp_file).map_err(StorageError::Io)?;
let mut dst = BufWriter::with_capacity(chunk_size * 4, dst);
let mut hasher = Md5::new();
let mut remaining = length;
let mut buf = vec![0u8; chunk_size];
while remaining > 0 {
let to_read = std::cmp::min(remaining as usize, buf.len());
let n = src.read(&mut buf[..to_read]).map_err(StorageError::Io)?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
dst.write_all(&buf[..n]).map_err(StorageError::Io)?;
remaining -= n as u64;
}
dst.flush().map_err(StorageError::Io)?;
Ok((format!("{:x}", hasher.finalize()), length, last_modified))
},
);
hasher.update(&buf[..n]);
dst.write_all(&buf[..n]).map_err(StorageError::Io)?;
remaining -= n as u64;
}
dst.flush().map_err(StorageError::Io)?;
Ok((format!("{:x}", hasher.finalize()), length, last_modified))
});
let (etag, length, last_modified) = match copy_res {
Ok(v) => v,
@@ -3336,8 +3362,8 @@ impl crate::traits::StorageEngine for FsStorageBackend {
let mut buf = vec![0u8; chunk_size];
for part_info in &part_infos {
let part_file = upload_dir_owned
.join(format!("part-{:05}.part", part_info.part_number));
let part_file =
upload_dir_owned.join(format!("part-{:05}.part", part_info.part_number));
if !part_file.exists() {
return Err(StorageError::InvalidObjectKey(format!(
"Part {} not found",
@@ -4260,9 +4286,11 @@ mod tests {
std::fs::create_dir_all(&tmp_dir).unwrap();
// Seed with known content.
let data: AsyncReadStream =
Box::pin(std::io::Cursor::new(vec![b'a'; 4096]));
backend.put_object("link-bkt", "hot", data, None).await.unwrap();
let data: AsyncReadStream = Box::pin(std::io::Cursor::new(vec![b'a'; 4096]));
backend
.put_object("link-bkt", "hot", data, None)
.await
.unwrap();
let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false));
let mut handles = Vec::new();
@@ -4343,8 +4371,7 @@ mod tests {
let backend = StdArc::new(backend);
backend.create_bucket("snap-bkt").await.unwrap();
let data: AsyncReadStream =
Box::pin(std::io::Cursor::new(vec![b'a'; 1024]));
let data: AsyncReadStream = Box::pin(std::io::Cursor::new(vec![b'a'; 1024]));
backend
.put_object("snap-bkt", "sz", data, None)
.await
@@ -4424,7 +4451,10 @@ mod tests {
const SIZE: u64 = 256 * 1024;
let seed = vec![b'a'; SIZE as usize];
let data: AsyncReadStream = Box::pin(std::io::Cursor::new(seed));
backend.put_object("range-bkt", "hot", data, None).await.unwrap();
backend
.put_object("range-bkt", "hot", data, None)
.await
.unwrap();
let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false));
let mut handles = Vec::new();
@@ -4455,8 +4485,9 @@ mod tests {
while !stop.load(Ordering::Relaxed) {
let start = 1000u64;
let len = 4000u64;
if let Ok((meta, mut stream)) =
b.get_object_range("range-bkt", "hot", start, Some(len)).await
if let Ok((meta, mut stream)) = b
.get_object_range("range-bkt", "hot", start, Some(len))
.await
{
let mut buf = Vec::with_capacity(len as usize);
if stream.read_to_end(&mut buf).await.is_ok() && !buf.is_empty() {
@@ -4466,10 +4497,8 @@ mod tests {
// that byte at full object size.
let fill = buf[0];
let all_match = buf.iter().all(|b| *b == fill);
let expected_etag = format!(
"{:x}",
Md5::digest(&vec![fill; SIZE as usize])
);
let expected_etag =
format!("{:x}", Md5::digest(&vec![fill; SIZE as usize]));
let etag_ok = meta.etag.as_deref() == Some(expected_etag.as_str());
reads.fetch_add(1, Ordering::Relaxed);
if !(all_match && etag_ok) {
@@ -4556,9 +4585,7 @@ mod tests {
Err(_) => continue,
};
let res = b
.upload_part_copy(
"mp-bkt", &upload_id, 1, "mp-bkt", "src", None,
)
.upload_part_copy("mp-bkt", &upload_id, 1, "mp-bkt", "src", None)
.await;
if let Ok((etag, _lm)) = res {
// The part etag is the MD5 of the copied bytes; it
@@ -4583,7 +4610,11 @@ mod tests {
let o = ops.load(Ordering::Relaxed);
let x = bad.load(Ordering::Relaxed);
assert!(o >= 4, "expected at least a few upload_part_copy ops, got {}", o);
assert!(
o >= 4,
"expected at least a few upload_part_copy ops, got {}",
o
);
assert_eq!(
x, 0,
"observed {} upload_part_copy results with etag unrelated to source content (out of {})",

View File

@@ -47,7 +47,6 @@ pub fn validate_object_key(
normalized.split('/').collect()
};
for part in &parts {
if part.is_empty() {
continue;

View File

@@ -336,7 +336,7 @@ When `INTEGRITY_AUTO_HEAL=true` (and `INTEGRITY_DRY_RUN=false`), each scan ends
1. **Pull from peer.** If a replication rule for the bucket points at a healthy remote whose `HEAD` returns the same ETag the local index has, the body is streamed to a temp file, MD5-verified against the stored ETag, and atomically swapped into the live path. The poison flags are cleared on success.
2. **Poison the entry.** If there is no replication target, the peer disagrees on the ETag, the peer is unreachable, or the downloaded body fails verification, the index entry is mutated to add `__corrupted__: "true"`, `__corrupted_at__`, `__corruption_detail__`, and `__quarantine_path__`. The data file stays in quarantine for `INTEGRITY_QUARANTINE_RETENTION_DAYS`.
Subsequent reads (`GET`, `HEAD`, `CopyObject` source) on a poisoned key return `500 ObjectCorrupted` instead of serving rotted bytes; replication push skips poisoned keys; subsequent integrity scans skip poisoned keys instead of re-flagging them. Overwriting the key with a fresh `PUT` clears the poison.
Subsequent reads (`GET`, `HEAD`, `CopyObject` source) on a poisoned key return `422 ObjectCorrupted` instead of serving rotted bytes; the response includes an `x-amz-error-code: ObjectCorrupted` header so HEAD callers (which receive no body) can still detect the condition. Replication push skips poisoned keys; subsequent integrity scans skip poisoned keys instead of re-flagging them. Overwriting the key with a fresh `PUT` clears the poison.
`stale_version`, `etag_cache_inconsistency`, and `phantom_metadata` issues are healed locally (move-to-quarantine, rebuild cache, drop entry); `orphaned_object` is only reported, never healed.
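Since HEAD responses carry no XML body, clients must key off the status code and the new header. A minimal detection sketch, assuming the `reqwest` and `tokio` crates, a server at `http://localhost:9000`, and a request that needs no SigV4 signing (anonymous or pre-signed); the bucket and key names are illustrative:

```rust
use reqwest::StatusCode;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .head("http://localhost:9000/my-bucket/my-key")
        .send()
        .await?;
    // 422 plus x-amz-error-code: ObjectCorrupted marks a poisoned key.
    let poisoned = resp.status() == StatusCode::UNPROCESSABLE_ENTITY
        && resp
            .headers()
            .get("x-amz-error-code")
            .map_or(false, |v| v.as_bytes() == b"ObjectCorrupted");
    println!("poisoned: {}", poisoned);
    Ok(())
}
```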