Fix S3 versioning/delete markers, path-safety leaks, and error-code conformance; parallelize DeleteObjects; restore per-op rate limits

2026-04-23 20:23:11 +08:00
parent 7ef3820f6e
commit bd405cc2fe
19 changed files with 893 additions and 147 deletions

View File

@@ -17,10 +17,18 @@ pub enum StorageError {
key: String,
version_id: String,
},
#[error("Object is a delete marker: {bucket}/{key}")]
DeleteMarker {
bucket: String,
key: String,
version_id: String,
},
#[error("Invalid bucket name: {0}")]
InvalidBucketName(String),
#[error("Invalid object key: {0}")]
InvalidObjectKey(String),
#[error("Method not allowed: {0}")]
MethodNotAllowed(String),
#[error("Upload not found: {0}")]
UploadNotFound(String),
#[error("Quota exceeded: {0}")]
@@ -58,10 +66,17 @@ impl From<StorageError> for S3Error {
version_id,
} => S3Error::from_code(S3ErrorCode::NoSuchVersion)
.with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)),
StorageError::DeleteMarker {
bucket,
key,
version_id,
} => S3Error::from_code(S3ErrorCode::MethodNotAllowed)
.with_resource(format!("/{}/{}?versionId={}", bucket, key, version_id)),
StorageError::InvalidBucketName(msg) => {
S3Error::new(S3ErrorCode::InvalidBucketName, msg)
}
StorageError::InvalidObjectKey(msg) => S3Error::new(S3ErrorCode::InvalidKey, msg),
StorageError::MethodNotAllowed(msg) => S3Error::new(S3ErrorCode::MethodNotAllowed, msg),
StorageError::UploadNotFound(id) => S3Error::new(
S3ErrorCode::NoSuchUpload,
format!("Upload {} not found", id),

View File

@@ -213,7 +213,14 @@ impl FsStorageBackend {
fn object_path(&self, bucket_name: &str, object_key: &str) -> StorageResult<PathBuf> {
self.validate_key(object_key)?;
Ok(self.bucket_path(bucket_name).join(object_key))
if object_key.ends_with('/') {
Ok(self
.bucket_path(bucket_name)
.join(object_key)
.join(DIR_MARKER_FILE))
} else {
Ok(self.bucket_path(bucket_name).join(object_key))
}
}
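
The trailing-slash branch keeps explicitly created "directory objects" from resolving to the directory path itself, so they can never collide with real children. Assuming DIR_MARKER_FILE is the ".__myfsio_dirobj__" name reserved in validate_object_key later in this commit, the mapping is roughly:

    // "logs/2026/"        -> <bucket_root>/logs/2026/<DIR_MARKER_FILE>
    // "logs/2026/app.log" -> <bucket_root>/logs/2026/app.log
    // A directory-object key and its child keys therefore map to distinct files.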
fn validate_key(&self, object_key: &str) -> StorageResult<()> {
@@ -239,6 +246,16 @@ impl FsStorageBackend {
fn index_file_for_key(&self, bucket_name: &str, key: &str) -> (PathBuf, String) {
let meta_root = self.bucket_meta_root(bucket_name);
if key.ends_with('/') {
let trimmed = key.trim_end_matches('/');
if trimmed.is_empty() {
return (meta_root.join(INDEX_FILE), DIR_MARKER_FILE.to_string());
}
return (
meta_root.join(trimmed).join(INDEX_FILE),
DIR_MARKER_FILE.to_string(),
);
}
let key_path = Path::new(key);
let entry_name = key_path
.file_name()
@@ -330,6 +347,55 @@ impl FsStorageBackend {
self.bucket_versions_root(bucket_name).join(key)
}
fn delete_markers_root(&self, bucket_name: &str) -> PathBuf {
self.system_bucket_root(bucket_name).join("delete_markers")
}
fn delete_marker_path(&self, bucket_name: &str, key: &str) -> PathBuf {
self.delete_markers_root(bucket_name)
.join(format!("{}.json", key))
}
fn read_delete_marker_sync(
&self,
bucket_name: &str,
key: &str,
) -> Option<(String, chrono::DateTime<Utc>)> {
let path = self.delete_marker_path(bucket_name, key);
if !path.is_file() {
return None;
}
let content = std::fs::read_to_string(&path).ok()?;
let record: Value = serde_json::from_str(&content).ok()?;
let version_id = record
.get("version_id")
.and_then(Value::as_str)?
.to_string();
let last_modified = record
.get("last_modified")
.and_then(Value::as_str)
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|d| d.with_timezone(&Utc))
.unwrap_or_else(Utc::now);
Some((version_id, last_modified))
}
fn clear_delete_marker_sync(&self, bucket_name: &str, key: &str) {
let path = self.delete_marker_path(bucket_name, key);
if path.exists() {
let _ = std::fs::remove_file(&path);
}
}
fn new_version_id_sync() -> String {
let now = Utc::now();
format!(
"{}-{}",
now.format("%Y%m%dT%H%M%S%6fZ"),
&Uuid::new_v4().to_string()[..8]
)
}
fn legacy_meta_root(&self, bucket_name: &str) -> PathBuf {
self.bucket_path(bucket_name).join(".meta")
}
@@ -737,22 +803,23 @@ impl FsStorageBackend {
bucket_name: &str,
key: &str,
reason: &str,
) -> std::io::Result<u64> {
) -> std::io::Result<(u64, Option<String>)> {
let bucket_path = self.bucket_path(bucket_name);
let source = bucket_path.join(key);
if !source.exists() {
return Ok(0);
return Ok((0, None));
}
let version_dir = self.version_dir(bucket_name, key);
std::fs::create_dir_all(&version_dir)?;
let now = Utc::now();
let version_id = format!(
"{}-{}",
now.format("%Y%m%dT%H%M%S%6fZ"),
&Uuid::new_v4().to_string()[..8]
);
let metadata = self.read_metadata_sync(bucket_name, key);
let version_id = metadata
.get("__version_id__")
.cloned()
.filter(|v| !v.is_empty() && !v.contains('/') && !v.contains('\\') && !v.contains(".."))
.unwrap_or_else(Self::new_version_id_sync);
let data_path = version_dir.join(format!("{}.bin", version_id));
std::fs::copy(&source, &data_path)?;
@@ -760,7 +827,6 @@ impl FsStorageBackend {
let source_meta = source.metadata()?;
let source_size = source_meta.len();
let metadata = self.read_metadata_sync(bucket_name, key);
let etag = Self::compute_etag_sync(&source).unwrap_or_default();
let record = serde_json::json!({
@@ -776,7 +842,43 @@ impl FsStorageBackend {
let manifest_path = version_dir.join(format!("{}.json", version_id));
Self::atomic_write_json_sync(&manifest_path, &record, true)?;
Ok(source_size)
Ok((source_size, Some(version_id)))
}
fn write_delete_marker_sync(
&self,
bucket_name: &str,
key: &str,
) -> std::io::Result<String> {
let version_dir = self.version_dir(bucket_name, key);
std::fs::create_dir_all(&version_dir)?;
let now = Utc::now();
let version_id = Self::new_version_id_sync();
let record = serde_json::json!({
"version_id": version_id,
"key": key,
"size": 0,
"archived_at": now.to_rfc3339(),
"etag": "",
"metadata": HashMap::<String, String>::new(),
"reason": "delete-marker",
"is_delete_marker": true,
});
let manifest_path = version_dir.join(format!("{}.json", version_id));
Self::atomic_write_json_sync(&manifest_path, &record, true)?;
let marker_path = self.delete_marker_path(bucket_name, key);
if let Some(parent) = marker_path.parent() {
std::fs::create_dir_all(parent)?;
}
let marker_record = serde_json::json!({
"version_id": version_id,
"last_modified": now.to_rfc3339(),
});
Self::atomic_write_json_sync(&marker_path, &marker_record, true)?;
Ok(version_id)
}
fn version_record_paths(
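
Both archive_current_version_sync and write_delete_marker_sync drop a <version_id>.json manifest in the key's version directory; a delete marker is distinguished by having no .bin payload, an empty etag, and is_delete_marker set. Roughly (field values illustrative, some manifest fields elided):

    // Archived version: <versions_root>/<key>/<vid>.json + <vid>.bin
    // { "version_id": "<vid>", "key": "logs/app.log", "size": 8192,
    //   "archived_at": "...", "etag": "9a0364b9...", "reason": "delete", ... }
    //
    // Delete marker: manifest only, no data file
    // { "version_id": "<vid>", "key": "logs/app.log", "size": 0,
    //   "archived_at": "...", "etag": "", "reason": "delete-marker", "is_delete_marker": true }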
@@ -869,6 +971,15 @@ impl FsStorageBackend {
.map(ToOwned::to_owned)
.or_else(|| metadata.get("__etag__").cloned());
let version_id = record
.get("version_id")
.and_then(Value::as_str)
.map(|s| s.to_string());
let is_delete_marker = record
.get("is_delete_marker")
.and_then(Value::as_bool)
.unwrap_or(false);
let mut obj = ObjectMeta::new(key.to_string(), size, last_modified);
obj.etag = etag;
obj.content_type = metadata.get("__content_type__").cloned();
@@ -880,6 +991,8 @@ impl FsStorageBackend {
.into_iter()
.filter(|(k, _)| !k.starts_with("__"))
.collect();
obj.version_id = version_id;
obj.is_delete_marker = is_delete_marker;
Ok(obj)
}
@@ -905,6 +1018,10 @@ impl FsStorageBackend {
.get("etag")
.and_then(Value::as_str)
.map(|s| s.to_string());
let is_delete_marker = record
.get("is_delete_marker")
.and_then(Value::as_bool)
.unwrap_or(false);
VersionInfo {
version_id,
@@ -913,7 +1030,7 @@ impl FsStorageBackend {
last_modified: archived_at,
etag,
is_latest: false,
is_delete_marker: false,
is_delete_marker,
}
}
@@ -1033,7 +1150,14 @@ impl FsStorageBackend {
stack.push(entry.path().to_string_lossy().to_string());
} else if ft.is_file() {
let full_path = entry.path().to_string_lossy().to_string();
let key = full_path[bucket_prefix_len..].replace('\\', "/");
let mut key = full_path[bucket_prefix_len..].replace('\\', "/");
let is_dir_marker = name_str.as_ref() == DIR_MARKER_FILE;
if is_dir_marker {
key = key
.strip_suffix(DIR_MARKER_FILE)
.unwrap_or(&key)
.to_string();
}
if let Ok(meta) = entry.metadata() {
let mtime = meta
.modified()
@@ -1049,7 +1173,11 @@ impl FsStorageBackend {
let etags = dir_etag_cache
.entry(rel_dir.clone())
.or_insert_with(|| self.load_dir_index_sync(bucket_name, &rel_dir));
let etag = etags.get(name_str.as_ref()).cloned();
let etag = if is_dir_marker {
None
} else {
etags.get(name_str.as_ref()).cloned()
};
all_keys.push((key, meta.len(), mtime, etag));
}
@@ -1200,11 +1328,36 @@ impl FsStorageBackend {
Err(_) => continue,
};
let rel = format!("{}{}", rel_dir_prefix, name_str);
if ft.is_dir() {
dirs.push(format!("{}{}", rel, delimiter));
let subdir_path = entry.path();
let marker_path = subdir_path.join(DIR_MARKER_FILE);
if marker_path.is_file() {
if let Ok(meta) = std::fs::metadata(&marker_path) {
let mtime = meta
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
let lm = Utc
.timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32)
.single()
.unwrap_or_else(Utc::now);
let mut obj = ObjectMeta::new(
format!("{}{}/", rel_dir_prefix, name_str),
meta.len(),
lm,
);
obj.etag = None;
files.push(obj);
}
}
dirs.push(format!("{}{}{}", rel_dir_prefix, name_str, delimiter));
} else if ft.is_file() {
if name_str == DIR_MARKER_FILE {
continue;
}
let rel = format!("{}{}", rel_dir_prefix, name_str);
if let Ok(meta) = entry.metadata() {
let mtime = meta
.modified()
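
With this change, a prefix that holds a directory marker shows up in a delimiter listing both as an object entry for the "name/" key (ETag omitted) and as a common prefix, while the marker file itself is never listed under its real filename. Roughly, for a bucket holding an explicit "logs/" object plus "logs/app.log":

    // Delimiter "/" listing at the bucket root:
    //   Contents:       "logs/"   (directory object, no ETag)
    //   CommonPrefixes: "logs/"
    //   ("logs/app.log" appears when listing with prefix "logs/")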
@@ -1438,10 +1591,19 @@ impl FsStorageBackend {
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
let new_version_id = if versioning_enabled {
Some(Self::new_version_id_sync())
} else {
None
};
let mut internal_meta = HashMap::new();
internal_meta.insert("__etag__".to_string(), etag.clone());
internal_meta.insert("__size__".to_string(), new_size.to_string());
internal_meta.insert("__last_modified__".to_string(), mtime.to_string());
if let Some(ref vid) = new_version_id {
internal_meta.insert("__version_id__".to_string(), vid.clone());
}
if let Some(ref user_meta) = metadata {
for (k, v) in user_meta {
@@ -1452,6 +1614,10 @@ impl FsStorageBackend {
self.write_metadata_sync(bucket_name, key, &internal_meta)
.map_err(StorageError::Io)?;
if versioning_enabled {
self.clear_delete_marker_sync(bucket_name, key);
}
let lm = Utc
.timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32)
.single()
@@ -1460,6 +1626,7 @@ impl FsStorageBackend {
let mut obj = ObjectMeta::new(key.to_string(), new_size, lm);
obj.etag = Some(etag);
obj.metadata = metadata.unwrap_or_default();
obj.version_id = new_version_id;
Ok(obj)
}
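
Stamping __version_id__ into the internal metadata at write time is what lets archive_current_version_sync (earlier in this diff) reuse that id when the object is later overwritten or deleted, so the id returned at PUT time stays addressable. The intended round trip, sketched:

    // 1. PUT key           -> id V1 minted, stored as __version_id__, returned to the caller
    // 2. PUT key again     -> current file archived under V1 (read back from __version_id__),
    //                         new id V2 stored
    // 3. GET ?versionId=V1 -> still resolves to the archived copy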
@@ -1597,27 +1764,28 @@ impl crate::traits::StorageEngine for FsStorageBackend {
.map_err(StorageError::Io)?;
let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4()));
let mut file = tokio::fs::File::create(&tmp_path)
let file = tokio::fs::File::create(&tmp_path)
.await
.map_err(StorageError::Io)?;
let mut writer = tokio::io::BufWriter::with_capacity(256 * 1024, file);
let mut hasher = Md5::new();
let mut total_size: u64 = 0;
let mut buf = [0u8; 65536];
let mut buf = vec![0u8; 256 * 1024];
loop {
let n = stream.read(&mut buf).await.map_err(StorageError::Io)?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
tokio::io::AsyncWriteExt::write_all(&mut file, &buf[..n])
tokio::io::AsyncWriteExt::write_all(&mut writer, &buf[..n])
.await
.map_err(StorageError::Io)?;
total_size += n as u64;
}
tokio::io::AsyncWriteExt::flush(&mut file)
tokio::io::AsyncWriteExt::flush(&mut writer)
.await
.map_err(StorageError::Io)?;
drop(file);
drop(writer);
let etag = format!("{:x}", hasher.finalize());
self.finalize_put_sync(bucket, key, &tmp_path, etag, total_size, metadata)
@@ -1628,8 +1796,18 @@ impl crate::traits::StorageEngine for FsStorageBackend {
bucket: &str,
key: &str,
) -> StorageResult<(ObjectMeta, AsyncReadStream)> {
self.require_bucket(bucket)?;
let path = self.object_path(bucket, key)?;
if !path.is_file() {
if self.read_bucket_config_sync(bucket).versioning_enabled {
if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) {
return Err(StorageError::DeleteMarker {
bucket: bucket.to_string(),
key: key.to_string(),
version_id: dm_version_id,
});
}
}
return Err(StorageError::ObjectNotFound {
bucket: bucket.to_string(),
key: key.to_string(),
@@ -1656,6 +1834,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
.get("__storage_class__")
.cloned()
.or_else(|| Some("STANDARD".to_string()));
obj.version_id = stored_meta.get("__version_id__").cloned();
obj.metadata = stored_meta
.into_iter()
.filter(|(k, _)| !k.starts_with("__"))
@@ -1669,8 +1848,18 @@ impl crate::traits::StorageEngine for FsStorageBackend {
}
async fn get_object_path(&self, bucket: &str, key: &str) -> StorageResult<PathBuf> {
self.require_bucket(bucket)?;
let path = self.object_path(bucket, key)?;
if !path.is_file() {
if self.read_bucket_config_sync(bucket).versioning_enabled {
if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) {
return Err(StorageError::DeleteMarker {
bucket: bucket.to_string(),
key: key.to_string(),
version_id: dm_version_id,
});
}
}
return Err(StorageError::ObjectNotFound {
bucket: bucket.to_string(),
key: key.to_string(),
@@ -1680,8 +1869,18 @@ impl crate::traits::StorageEngine for FsStorageBackend {
}
async fn head_object(&self, bucket: &str, key: &str) -> StorageResult<ObjectMeta> {
self.require_bucket(bucket)?;
let path = self.object_path(bucket, key)?;
if !path.is_file() {
if self.read_bucket_config_sync(bucket).versioning_enabled {
if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) {
return Err(StorageError::DeleteMarker {
bucket: bucket.to_string(),
key: key.to_string(),
version_id: dm_version_id,
});
}
}
return Err(StorageError::ObjectNotFound {
bucket: bucket.to_string(),
key: key.to_string(),
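
All three read paths (get_object, get_object_path, head_object) now report the delete-marker case distinctly instead of collapsing it into not-found, which lets an HTTP handler attach the usual S3 headers. A hedged sketch of caller-side handling (the header emission is S3 convention, not code shown in this diff):

    // match engine.get_object(bucket, key).await {
    //     Err(StorageError::DeleteMarker { version_id, .. }) => {
    //         // respond 405 MethodNotAllowed with
    //         //   x-amz-delete-marker: true
    //         //   x-amz-version-id: <version_id>
    //     }
    //     ...
    // }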
@@ -1708,6 +1907,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
.get("__storage_class__")
.cloned()
.or_else(|| Some("STANDARD".to_string()));
obj.version_id = stored_meta.get("__version_id__").cloned();
obj.metadata = stored_meta
.into_iter()
.filter(|(k, _)| !k.starts_with("__"))
@@ -1760,17 +1960,33 @@ impl crate::traits::StorageEngine for FsStorageBackend {
Ok(Self::version_metadata_from_record(&record))
}
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()> {
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<DeleteOutcome> {
let bucket_path = self.require_bucket(bucket)?;
let path = self.object_path(bucket, key)?;
if !path.exists() {
return Ok(());
let versioning_enabled = self.read_bucket_config_sync(bucket).versioning_enabled;
if versioning_enabled {
if path.exists() {
self.archive_current_version_sync(bucket, key, "delete")
.map_err(StorageError::Io)?;
Self::safe_unlink(&path).map_err(StorageError::Io)?;
self.delete_metadata_sync(bucket, key)
.map_err(StorageError::Io)?;
Self::cleanup_empty_parents(&path, &bucket_path);
}
let dm_version_id = self
.write_delete_marker_sync(bucket, key)
.map_err(StorageError::Io)?;
self.invalidate_bucket_caches(bucket);
return Ok(DeleteOutcome {
version_id: Some(dm_version_id),
is_delete_marker: true,
existed: true,
});
}
let versioning_enabled = self.read_bucket_config_sync(bucket).versioning_enabled;
if versioning_enabled {
self.archive_current_version_sync(bucket, key, "delete")
.map_err(StorageError::Io)?;
if !path.exists() {
return Ok(DeleteOutcome::default());
}
Self::safe_unlink(&path).map_err(StorageError::Io)?;
@@ -1779,7 +1995,11 @@ impl crate::traits::StorageEngine for FsStorageBackend {
Self::cleanup_empty_parents(&path, &bucket_path);
self.invalidate_bucket_caches(bucket);
Ok(())
Ok(DeleteOutcome {
version_id: None,
is_delete_marker: false,
existed: true,
})
}
async fn delete_object_version(
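
DeleteOutcome itself is defined elsewhere in this commit; a plausible shape, inferred from the construction sites and the DeleteOutcome::default() call above (field names match the diff, the derives are an assumption):

    #[derive(Debug, Default, Clone)]
    pub struct DeleteOutcome {
        /// Version id minted or removed by the delete (the delete-marker id when versioning is on).
        pub version_id: Option<String>,
        /// True when the operation created or removed a delete marker.
        pub is_delete_marker: bool,
        /// False when the key did not exist and nothing was deleted.
        pub existed: bool,
    }

This is what allows the DeleteObject/DeleteObjects handlers to emit x-amz-delete-marker and x-amz-version-id per deleted key.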
@@ -1787,7 +2007,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
bucket: &str,
key: &str,
version_id: &str,
) -> StorageResult<()> {
) -> StorageResult<DeleteOutcome> {
self.require_bucket(bucket)?;
self.validate_key(key)?;
Self::validate_version_id(bucket, key, version_id)?;
@@ -1800,12 +2020,35 @@ impl crate::traits::StorageEngine for FsStorageBackend {
});
}
let is_delete_marker = if manifest_path.is_file() {
std::fs::read_to_string(&manifest_path)
.ok()
.and_then(|content| serde_json::from_str::<Value>(&content).ok())
.and_then(|record| record.get("is_delete_marker").and_then(Value::as_bool))
.unwrap_or(false)
} else {
false
};
Self::safe_unlink(&data_path).map_err(StorageError::Io)?;
Self::safe_unlink(&manifest_path).map_err(StorageError::Io)?;
let versions_root = self.bucket_versions_root(bucket);
Self::cleanup_empty_parents(&manifest_path, &versions_root);
if is_delete_marker {
if let Some((dm_version_id, _)) = self.read_delete_marker_sync(bucket, key) {
if dm_version_id == version_id {
self.clear_delete_marker_sync(bucket, key);
}
}
}
self.invalidate_bucket_caches(bucket);
Ok(())
Ok(DeleteOutcome {
version_id: Some(version_id.to_string()),
is_delete_marker,
existed: true,
})
}
async fn copy_object(
@@ -2033,12 +2276,13 @@ impl crate::traits::StorageEngine for FsStorageBackend {
.map_err(StorageError::Io)?;
}
let mut dst = tokio::fs::File::create(&tmp_file)
let dst_file = tokio::fs::File::create(&tmp_file)
.await
.map_err(StorageError::Io)?;
let mut dst = tokio::io::BufWriter::with_capacity(256 * 1024, dst_file);
let mut hasher = Md5::new();
let mut remaining = length;
let mut buf = vec![0u8; 65536];
let mut buf = vec![0u8; 256 * 1024];
while remaining > 0 {
let to_read = std::cmp::min(remaining as usize, buf.len());
let n = src
@@ -2122,12 +2366,14 @@ impl crate::traits::StorageEngine for FsStorageBackend {
.map_err(StorageError::Io)?;
let tmp_path = tmp_dir.join(format!("{}.tmp", Uuid::new_v4()));
let mut out_file = tokio::fs::File::create(&tmp_path)
let out_raw = tokio::fs::File::create(&tmp_path)
.await
.map_err(StorageError::Io)?;
let mut out_file = tokio::io::BufWriter::with_capacity(256 * 1024, out_raw);
let mut md5_digest_concat = Vec::new();
let mut total_size: u64 = 0;
let part_count = parts.len();
let mut buf = vec![0u8; 256 * 1024];
for part_info in parts {
let part_file = upload_dir.join(format!("part-{:05}.part", part_info.part_number));
@@ -2138,11 +2384,11 @@ impl crate::traits::StorageEngine for FsStorageBackend {
part_info.part_number
)));
}
let mut part_reader = tokio::fs::File::open(&part_file)
let part_reader = tokio::fs::File::open(&part_file)
.await
.map_err(StorageError::Io)?;
let mut part_reader = tokio::io::BufReader::with_capacity(256 * 1024, part_reader);
let mut part_hasher = Md5::new();
let mut buf = [0u8; 65536];
loop {
let n = part_reader.read(&mut buf).await.map_err(StorageError::Io)?;
if n == 0 {

View File

@@ -62,14 +62,14 @@ pub trait StorageEngine: Send + Sync {
version_id: &str,
) -> StorageResult<HashMap<String, String>>;
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<()>;
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<DeleteOutcome>;
async fn delete_object_version(
&self,
bucket: &str,
key: &str,
version_id: &str,
) -> StorageResult<()>;
) -> StorageResult<DeleteOutcome>;
async fn copy_object(
&self,

View File

@@ -60,6 +60,12 @@ pub fn validate_object_key(
return Some("Object key contains invalid segments".to_string());
}
if part.len() > 255 {
return Some(
"Object key contains a path segment that exceeds 255 bytes".to_string(),
);
}
if part.chars().any(|c| (c as u32) < 32) {
return Some("Object key contains control characters".to_string());
}
@@ -98,6 +104,12 @@ pub fn validate_object_key(
}
}
for part in &non_empty_parts {
if *part == ".__myfsio_dirobj__" || part.starts_with("_index.json") {
return Some("Object key segment uses a reserved internal name".to_string());
}
}
None
}
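
For reference, some keys this validator rejects, including the newly reserved internal names (informal, since the function's full signature is truncated in this hunk):

    // "a/.__myfsio_dirobj__"        -> reserved internal name (directory-marker file)
    // "a/_index.json", "a/_index.json.bak"
    //                               -> reserved internal name (per-directory index prefix)
    // any 256-byte path segment     -> segment exceeds 255 bytes
    // "bad\u{7}key"                 -> control character below 0x20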