Allow recovery of poisoned objects via PUT/DELETE/DeleteObjects while preserving object-lock; implement race-free GetObject/HeadObject ?partNumber=N with correct zero-length-part response

This commit is contained in:
2026-04-25 21:28:27 +08:00
parent 777d862a02
commit 1c9ebdeab7
3 changed files with 1525 additions and 83 deletions

View File

@@ -21,6 +21,34 @@ pub const META_KEY_CORRUPTED: &str = "__corrupted__";
pub const META_KEY_CORRUPTED_AT: &str = "__corrupted_at__";
pub const META_KEY_CORRUPTION_DETAIL: &str = "__corruption_detail__";
pub const META_KEY_QUARANTINE_PATH: &str = "__quarantine_path__";
pub const META_KEY_PART_SIZES: &str = "__part_sizes__";
/// Serialize per-part byte counts as a comma-separated decimal list,
/// e.g. `[1024, 512]` -> `"1024,512"`. An empty slice yields `""`.
/// Inverse of [`parse_part_sizes`].
pub fn encode_part_sizes(sizes: &[u64]) -> String {
    sizes
        .iter()
        .map(|size| size.to_string())
        .collect::<Vec<_>>()
        .join(",")
}
/// Parse a comma-separated decimal list back into per-part byte counts.
/// Tokens are trimmed before parsing. Returns `None` when the input is
/// empty, any token is empty/non-numeric, or no sizes were decoded.
/// Inverse of [`encode_part_sizes`].
pub fn parse_part_sizes(raw: &str) -> Option<Vec<u64>> {
    // Collecting into Option<Vec<_>> short-circuits on the first bad token.
    let sizes = raw
        .split(',')
        .map(|tok| tok.trim().parse::<u64>().ok())
        .collect::<Option<Vec<u64>>>()?;
    if sizes.is_empty() {
        None
    } else {
        Some(sizes)
    }
}
pub fn metadata_is_corrupted(meta: &HashMap<String, String>) -> bool {
meta.get(META_KEY_CORRUPTED)
@@ -2829,6 +2857,12 @@ impl crate::traits::StorageEngine for FsStorageBackend {
self.delete_metadata_sync(bucket, key)
.map_err(StorageError::Io)?;
Self::cleanup_empty_parents(&path, &bucket_path);
} else {
let stored_meta = self.read_metadata_sync(bucket, key);
if !stored_meta.is_empty() {
self.delete_metadata_sync(bucket, key)
.map_err(StorageError::Io)?;
}
}
let dm_version_id = self
.write_delete_marker_sync(bucket, key)
@@ -2842,6 +2876,17 @@ impl crate::traits::StorageEngine for FsStorageBackend {
}
if !path.exists() {
let stored_meta = self.read_metadata_sync(bucket, key);
if !stored_meta.is_empty() {
self.delete_metadata_sync(bucket, key)
.map_err(StorageError::Io)?;
self.invalidate_bucket_caches(bucket);
return Ok(DeleteOutcome {
version_id: None,
is_delete_marker: false,
existed: true,
});
}
return Ok(DeleteOutcome::default());
}
@@ -3353,47 +3398,52 @@ impl crate::traits::StorageEngine for FsStorageBackend {
// Assemble parts on a blocking thread using std::fs, large buffers,
// and a single writer flush — no per-chunk runtime crossings.
let assemble_res = tokio::task::spawn_blocking(move || -> StorageResult<(String, u64)> {
use std::io::{BufReader, BufWriter, Read, Write};
let out_raw = std::fs::File::create(&tmp_path_owned).map_err(StorageError::Io)?;
let mut out_file = BufWriter::with_capacity(chunk_size * 4, out_raw);
let mut md5_digest_concat = Vec::with_capacity(part_infos.len() * 16);
let mut total_size: u64 = 0;
let mut buf = vec![0u8; chunk_size];
let assemble_res =
tokio::task::spawn_blocking(move || -> StorageResult<(String, u64, Vec<u64>)> {
use std::io::{BufReader, BufWriter, Read, Write};
let out_raw = std::fs::File::create(&tmp_path_owned).map_err(StorageError::Io)?;
let mut out_file = BufWriter::with_capacity(chunk_size * 4, out_raw);
let mut md5_digest_concat = Vec::with_capacity(part_infos.len() * 16);
let mut total_size: u64 = 0;
let mut part_sizes: Vec<u64> = Vec::with_capacity(part_infos.len());
let mut buf = vec![0u8; chunk_size];
for part_info in &part_infos {
let part_file =
upload_dir_owned.join(format!("part-{:05}.part", part_info.part_number));
if !part_file.exists() {
return Err(StorageError::InvalidObjectKey(format!(
"Part {} not found",
part_info.part_number
)));
}
let reader = std::fs::File::open(&part_file).map_err(StorageError::Io)?;
let mut reader = BufReader::with_capacity(chunk_size, reader);
let mut part_hasher = Md5::new();
loop {
let n = reader.read(&mut buf).map_err(StorageError::Io)?;
if n == 0 {
break;
for part_info in &part_infos {
let part_file =
upload_dir_owned.join(format!("part-{:05}.part", part_info.part_number));
if !part_file.exists() {
return Err(StorageError::InvalidObjectKey(format!(
"Part {} not found",
part_info.part_number
)));
}
part_hasher.update(&buf[..n]);
out_file.write_all(&buf[..n]).map_err(StorageError::Io)?;
total_size += n as u64;
let reader = std::fs::File::open(&part_file).map_err(StorageError::Io)?;
let mut reader = BufReader::with_capacity(chunk_size, reader);
let mut part_hasher = Md5::new();
let mut part_bytes: u64 = 0;
loop {
let n = reader.read(&mut buf).map_err(StorageError::Io)?;
if n == 0 {
break;
}
part_hasher.update(&buf[..n]);
out_file.write_all(&buf[..n]).map_err(StorageError::Io)?;
total_size += n as u64;
part_bytes += n as u64;
}
md5_digest_concat.extend_from_slice(&part_hasher.finalize());
part_sizes.push(part_bytes);
}
md5_digest_concat.extend_from_slice(&part_hasher.finalize());
}
out_file.flush().map_err(StorageError::Io)?;
let mut composite_hasher = Md5::new();
composite_hasher.update(&md5_digest_concat);
let etag = format!("{:x}-{}", composite_hasher.finalize(), part_infos.len());
Ok((etag, total_size))
})
.await;
out_file.flush().map_err(StorageError::Io)?;
let mut composite_hasher = Md5::new();
composite_hasher.update(&md5_digest_concat);
let etag = format!("{:x}-{}", composite_hasher.finalize(), part_infos.len());
Ok((etag, total_size, part_sizes))
})
.await;
let (etag, total_size) = match assemble_res {
let (etag, total_size, part_sizes) = match assemble_res {
Ok(Ok(v)) => v,
Ok(Err(e)) => {
let _ = std::fs::remove_file(&tmp_path);
@@ -3408,6 +3458,9 @@ impl crate::traits::StorageEngine for FsStorageBackend {
}
};
let mut metadata = metadata;
metadata.insert(META_KEY_PART_SIZES.to_string(), encode_part_sizes(&part_sizes));
// Commit to the destination key atomically under its write lock.
// Lock acquisition happens inside run_blocking so the wait runs under
// block_in_place rather than parking the async worker.
@@ -4033,6 +4086,117 @@ mod tests {
);
}
#[test]
fn test_part_sizes_roundtrip() {
    // Encoding then parsing must reproduce the original sizes exactly,
    // and the wire format is plain comma-separated decimals.
    let sizes = vec![5_242_880, 5_242_880, 5_242_880, 12_345];
    let encoded = encode_part_sizes(&sizes);
    assert_eq!(encoded, "5242880,5242880,5242880,12345");
    assert_eq!(parse_part_sizes(&encoded).unwrap(), sizes);
    // Every malformed input must be rejected with None.
    for bad in ["", ",,,", "abc", "123,abc", " "] {
        assert!(parse_part_sizes(bad).is_none());
    }
}
#[tokio::test]
async fn test_delete_object_clears_poisoned_metadata() {
    // A poisoned object (corruption flag set, live file already gone) must
    // still be fully removable via DELETE: afterwards HEAD reports
    // ObjectNotFound and the metadata sidecar is purged.
    let (_dir, backend) = create_test_backend();
    backend.create_bucket("test-bucket").await.unwrap();

    // Write a normal object first.
    let body: AsyncReadStream = Box::pin(std::io::Cursor::new(b"will rot".to_vec()));
    backend
        .put_object("test-bucket", "rot.txt", body, None)
        .await
        .unwrap();

    // Flag it as corrupted, simulating poison detection.
    let mut poisoned = backend
        .get_object_metadata("test-bucket", "rot.txt")
        .await
        .unwrap();
    poisoned.insert(META_KEY_CORRUPTED.to_string(), "true".to_string());
    backend
        .put_object_metadata("test-bucket", "rot.txt", &poisoned)
        .await
        .unwrap();

    // Remove the live file, as quarantine would have done.
    let object_path = backend
        .get_object_path("test-bucket", "rot.txt")
        .await
        .unwrap();
    std::fs::remove_file(&object_path).unwrap();

    backend
        .delete_object("test-bucket", "rot.txt")
        .await
        .unwrap();

    match backend.head_object("test-bucket", "rot.txt").await {
        Err(StorageError::ObjectNotFound { .. }) => {}
        other => panic!(
            "after DELETE on a poisoned/quarantined object, HEAD should be ObjectNotFound, got {:?}",
            other
        ),
    }

    let remaining = backend
        .get_object_metadata("test-bucket", "rot.txt")
        .await
        .unwrap();
    assert!(
        remaining.is_empty(),
        "metadata sidecar must be cleared after DELETE on poisoned object"
    );
}
#[tokio::test]
async fn test_complete_multipart_persists_part_sizes() {
let (_dir, backend) = create_test_backend();
backend.create_bucket("mp-bucket").await.unwrap();
let upload_id = backend
.initiate_multipart("mp-bucket", "obj.bin", None)
.await
.unwrap();
let part1: AsyncReadStream = Box::pin(std::io::Cursor::new(vec![b'A'; 1024]));
backend
.upload_part("mp-bucket", &upload_id, 1, part1)
.await
.unwrap();
let part2: AsyncReadStream = Box::pin(std::io::Cursor::new(vec![b'B'; 512]));
backend
.upload_part("mp-bucket", &upload_id, 2, part2)
.await
.unwrap();
let parts = vec![
PartInfo {
part_number: 1,
etag: String::new(),
},
PartInfo {
part_number: 2,
etag: String::new(),
},
];
let obj = backend
.complete_multipart("mp-bucket", &upload_id, &parts)
.await
.unwrap();
assert_eq!(obj.size, 1536);
let stored = backend
.get_object_metadata("mp-bucket", "obj.bin")
.await
.unwrap();
let raw = stored
.get(META_KEY_PART_SIZES)
.expect("part sizes must be persisted on completion");
assert_eq!(parse_part_sizes(raw).unwrap(), vec![1024u64, 512u64]);
}
#[tokio::test]
async fn test_put_clears_poison_flag() {
let (_dir, backend) = create_test_backend();