Fix DeleteObject(VersionId='null') to permanently delete null version instead of creating a delete-marker
This commit is contained in:
@@ -197,6 +197,46 @@ async fn ensure_object_lock_allows_write(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn ensure_archived_null_lock_allows_overwrite(
|
||||
state: &AppState,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
headers: Option<&HeaderMap>,
|
||||
) -> Result<(), Response> {
|
||||
let status = match state.storage.get_versioning_status(bucket).await {
|
||||
Ok(status) => status,
|
||||
Err(myfsio_storage::error::StorageError::BucketNotFound(_)) => return Ok(()),
|
||||
Err(err) => return Err(storage_err_response(err)),
|
||||
};
|
||||
if !matches!(status, myfsio_common::types::VersioningStatus::Suspended) {
|
||||
return Ok(());
|
||||
}
|
||||
let metadata = match state
|
||||
.storage
|
||||
.get_archived_null_version_metadata(bucket, key)
|
||||
.await
|
||||
{
|
||||
Ok(Some(metadata)) => metadata,
|
||||
Ok(None) => return Ok(()),
|
||||
Err(err) => return Err(storage_err_response(err)),
|
||||
};
|
||||
let bypass_governance = headers
|
||||
.and_then(|headers| {
|
||||
headers
|
||||
.get("x-amz-bypass-governance-retention")
|
||||
.and_then(|value| value.to_str().ok())
|
||||
})
|
||||
.map(|value| value.eq_ignore_ascii_case("true"))
|
||||
.unwrap_or(false);
|
||||
if let Err(message) = object_lock::can_delete_object(&metadata, bypass_governance) {
|
||||
return Err(s3_error_response(S3Error::new(
|
||||
S3ErrorCode::AccessDenied,
|
||||
message,
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn ensure_object_version_lock_allows_delete(
|
||||
state: &AppState,
|
||||
bucket: &str,
|
||||
@@ -709,7 +749,8 @@ pub async fn post_bucket(
|
||||
|
||||
if let Some(ct) = headers.get("content-type").and_then(|v| v.to_str().ok()) {
|
||||
if ct.to_ascii_lowercase().starts_with("multipart/form-data") {
|
||||
return post_object_form_handler(&state, &bucket, ct, body).await;
|
||||
let ct = ct.to_string();
|
||||
return post_object_form_handler(&state, &bucket, &ct, &headers, body).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1110,10 +1151,6 @@ fn apply_user_metadata(headers: &mut HeaderMap, metadata: &HashMap<String, Strin
|
||||
}
|
||||
}
|
||||
|
||||
fn is_null_version(version_id: Option<&str>) -> bool {
|
||||
version_id.is_none_or(|value| value == "null")
|
||||
}
|
||||
|
||||
fn bad_digest_response(message: impl Into<String>) -> Response {
|
||||
s3_error_response(S3Error::new(S3ErrorCode::BadDigest, message))
|
||||
}
|
||||
@@ -1387,6 +1424,11 @@ pub async fn put_object(
|
||||
{
|
||||
return response;
|
||||
}
|
||||
if let Err(response) =
|
||||
ensure_archived_null_lock_allows_overwrite(&state, &bucket, &key, Some(&headers)).await
|
||||
{
|
||||
return response;
|
||||
}
|
||||
if let Some(response) = evaluate_put_preconditions(&state, &bucket, &key, &headers).await {
|
||||
return response;
|
||||
}
|
||||
@@ -1604,10 +1646,7 @@ pub async fn get_object(
|
||||
return list_parts_handler(&state, &bucket, &key, upload_id).await;
|
||||
}
|
||||
|
||||
let version_id = query
|
||||
.version_id
|
||||
.as_deref()
|
||||
.filter(|value| !is_null_version(Some(*value)));
|
||||
let version_id = query.version_id.as_deref();
|
||||
|
||||
let range_header = headers
|
||||
.get("range")
|
||||
@@ -1805,7 +1844,7 @@ pub async fn post_object(
|
||||
}
|
||||
|
||||
if let Some(ref upload_id) = query.upload_id {
|
||||
return complete_multipart_handler(&state, &bucket, &key, upload_id, body).await;
|
||||
return complete_multipart_handler(&state, &bucket, &key, upload_id, &headers, body).await;
|
||||
}
|
||||
|
||||
if query.select.is_some() {
|
||||
@@ -1832,11 +1871,7 @@ pub async fn delete_object(
|
||||
return abort_multipart_handler(&state, &bucket, upload_id).await;
|
||||
}
|
||||
|
||||
if let Some(version_id) = query
|
||||
.version_id
|
||||
.as_deref()
|
||||
.filter(|value| !is_null_version(Some(*value)))
|
||||
{
|
||||
if let Some(version_id) = query.version_id.as_deref() {
|
||||
if let Err(response) =
|
||||
ensure_object_version_lock_allows_delete(&state, &bucket, &key, version_id, &headers)
|
||||
.await
|
||||
@@ -1897,10 +1932,7 @@ pub async fn head_object(
|
||||
Query(query): Query<ObjectQuery>,
|
||||
headers: HeaderMap,
|
||||
) -> Response {
|
||||
let version_id = query
|
||||
.version_id
|
||||
.as_deref()
|
||||
.filter(|value| !is_null_version(Some(*value)));
|
||||
let version_id = query.version_id.as_deref();
|
||||
let result = match version_id {
|
||||
Some(version_id) => {
|
||||
state
|
||||
@@ -2145,29 +2177,24 @@ async fn upload_part_copy_handler(
|
||||
range_header: Option<&str>,
|
||||
headers: &HeaderMap,
|
||||
) -> Response {
|
||||
let source = copy_source.strip_prefix('/').unwrap_or(copy_source);
|
||||
let source = match percent_encoding::percent_decode_str(source).decode_utf8() {
|
||||
Ok(s) => s.into_owned(),
|
||||
Err(_) => {
|
||||
return s3_error_response(S3Error::new(
|
||||
myfsio_common::error::S3ErrorCode::InvalidArgument,
|
||||
"Invalid x-amz-copy-source encoding",
|
||||
));
|
||||
}
|
||||
};
|
||||
let (src_bucket, src_key) = match source.split_once('/') {
|
||||
Some((b, k)) => (b.to_string(), k.to_string()),
|
||||
None => {
|
||||
return s3_error_response(S3Error::new(
|
||||
myfsio_common::error::S3ErrorCode::InvalidArgument,
|
||||
"Invalid x-amz-copy-source",
|
||||
));
|
||||
}
|
||||
let (src_bucket, src_key, src_version_id) = match parse_copy_source(copy_source) {
|
||||
Ok(parts) => parts,
|
||||
Err(response) => return response,
|
||||
};
|
||||
|
||||
let source_meta = match state.storage.head_object(&src_bucket, &src_key).await {
|
||||
let source_meta = match src_version_id.as_deref() {
|
||||
Some(version_id) => match state
|
||||
.storage
|
||||
.head_object_version(&src_bucket, &src_key, version_id)
|
||||
.await
|
||||
{
|
||||
Ok(m) => m,
|
||||
Err(e) => return storage_err_response(e),
|
||||
},
|
||||
None => match state.storage.head_object(&src_bucket, &src_key).await {
|
||||
Ok(m) => m,
|
||||
Err(e) => return storage_err_response(e),
|
||||
},
|
||||
};
|
||||
if let Some(resp) = evaluate_copy_preconditions(headers, &source_meta) {
|
||||
return resp;
|
||||
@@ -2194,6 +2221,7 @@ async fn upload_part_copy_handler(
|
||||
part_number,
|
||||
&src_bucket,
|
||||
&src_key,
|
||||
src_version_id.as_deref(),
|
||||
range,
|
||||
)
|
||||
.await
|
||||
@@ -2224,8 +2252,15 @@ async fn complete_multipart_handler(
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
upload_id: &str,
|
||||
headers: &HeaderMap,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
if let Err(response) =
|
||||
ensure_archived_null_lock_allows_overwrite(state, bucket, key, Some(headers)).await
|
||||
{
|
||||
return response;
|
||||
}
|
||||
|
||||
let body_bytes = match http_body_util::BodyExt::collect(body).await {
|
||||
Ok(collected) => collected.to_bytes(),
|
||||
Err(_) => {
|
||||
@@ -2463,6 +2498,11 @@ async fn copy_object_handler(
|
||||
{
|
||||
return response;
|
||||
}
|
||||
if let Err(response) =
|
||||
ensure_archived_null_lock_allows_overwrite(state, dst_bucket, dst_key, Some(headers)).await
|
||||
{
|
||||
return response;
|
||||
}
|
||||
|
||||
let (src_bucket, src_key, src_version_id) = match parse_copy_source(copy_source) {
|
||||
Ok(parts) => parts,
|
||||
@@ -2470,7 +2510,7 @@ async fn copy_object_handler(
|
||||
};
|
||||
|
||||
let source_meta = match src_version_id.as_deref() {
|
||||
Some(version_id) if version_id != "null" => match state
|
||||
Some(version_id) => match state
|
||||
.storage
|
||||
.head_object_version(&src_bucket, &src_key, version_id)
|
||||
.await
|
||||
@@ -2478,7 +2518,7 @@ async fn copy_object_handler(
|
||||
Ok(m) => m,
|
||||
Err(e) => return storage_err_response(e),
|
||||
},
|
||||
_ => match state.storage.head_object(&src_bucket, &src_key).await {
|
||||
None => match state.storage.head_object(&src_bucket, &src_key).await {
|
||||
Ok(m) => m,
|
||||
Err(e) => return storage_err_response(e),
|
||||
},
|
||||
@@ -2511,7 +2551,7 @@ async fn copy_object_handler(
|
||||
}
|
||||
|
||||
let source_metadata_existing = match src_version_id.as_deref() {
|
||||
Some(version_id) if version_id != "null" => {
|
||||
Some(version_id) => {
|
||||
match state
|
||||
.storage
|
||||
.get_object_version_metadata(&src_bucket, &src_key, version_id)
|
||||
@@ -2521,7 +2561,7 @@ async fn copy_object_handler(
|
||||
Err(e) => return storage_err_response(e),
|
||||
}
|
||||
}
|
||||
_ => match state
|
||||
None => match state
|
||||
.storage
|
||||
.get_object_metadata(&src_bucket, &src_key)
|
||||
.await
|
||||
@@ -2569,7 +2609,7 @@ async fn copy_object_handler(
|
||||
};
|
||||
|
||||
let (_meta, reader) = match src_version_id.as_deref() {
|
||||
Some(version_id) if version_id != "null" => {
|
||||
Some(version_id) => {
|
||||
match state
|
||||
.storage
|
||||
.get_object_version(&src_bucket, &src_key, version_id)
|
||||
@@ -2579,7 +2619,7 @@ async fn copy_object_handler(
|
||||
Err(e) => return storage_err_response(e),
|
||||
}
|
||||
}
|
||||
_ => match state.storage.get_object(&src_bucket, &src_key).await {
|
||||
None => match state.storage.get_object(&src_bucket, &src_key).await {
|
||||
Ok(result) => result,
|
||||
Err(e) => return storage_err_response(e),
|
||||
},
|
||||
@@ -2685,17 +2725,20 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R
|
||||
.map_err(|m| (S3ErrorCode::AccessDenied.as_str().to_string(), m))
|
||||
};
|
||||
let lock_check: Result<(), (String, String)> = match obj.version_id.as_deref() {
|
||||
Some(version_id) if version_id != "null" => {
|
||||
Some(version_id) => {
|
||||
match state
|
||||
.storage
|
||||
.get_object_version_metadata(&bucket, &obj.key, version_id)
|
||||
.await
|
||||
{
|
||||
Ok(metadata) => run_can_delete(&metadata),
|
||||
Err(myfsio_storage::error::StorageError::VersionNotFound {
|
||||
..
|
||||
}) => Ok(()),
|
||||
Err(err) => Err(to_err(err)),
|
||||
}
|
||||
}
|
||||
_ => match state.storage.head_object(&bucket, &obj.key).await {
|
||||
None => match state.storage.head_object(&bucket, &obj.key).await {
|
||||
Ok(_)
|
||||
| Err(myfsio_storage::error::StorageError::ObjectCorrupted { .. }) => {
|
||||
match state.storage.get_object_metadata(&bucket, &obj.key).await {
|
||||
@@ -2713,13 +2756,13 @@ async fn delete_objects_handler(state: &AppState, bucket: &str, body: Body) -> R
|
||||
Err(e) => Err(e),
|
||||
Ok(()) => {
|
||||
let outcome = match obj.version_id.as_deref() {
|
||||
Some(version_id) if version_id != "null" => {
|
||||
Some(version_id) => {
|
||||
state
|
||||
.storage
|
||||
.delete_object_version(&bucket, &obj.key, version_id)
|
||||
.await
|
||||
}
|
||||
_ => state.storage.delete_object(&bucket, &obj.key).await,
|
||||
None => state.storage.delete_object(&bucket, &obj.key).await,
|
||||
};
|
||||
outcome.map_err(|e| {
|
||||
let s3err = S3Error::from(e);
|
||||
@@ -2783,10 +2826,7 @@ async fn range_get_handler_inner(
|
||||
headers: &HeaderMap,
|
||||
parts_count: Option<u32>,
|
||||
) -> Response {
|
||||
let version_id = query
|
||||
.version_id
|
||||
.as_deref()
|
||||
.filter(|value| !is_null_version(Some(*value)));
|
||||
let version_id = query.version_id.as_deref();
|
||||
|
||||
let tmp_dir = state.config.storage_root.join(".myfsio.sys").join("tmp");
|
||||
let _ = tokio::fs::create_dir_all(&tmp_dir).await;
|
||||
@@ -3318,6 +3358,7 @@ async fn post_object_form_handler(
|
||||
state: &AppState,
|
||||
bucket: &str,
|
||||
content_type: &str,
|
||||
headers: &HeaderMap,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
use base64::engine::general_purpose::STANDARD as B64;
|
||||
@@ -3560,6 +3601,12 @@ async fn post_object_form_handler(
|
||||
guessed_content_type(&object_key, content_type_value.as_deref()),
|
||||
);
|
||||
|
||||
if let Err(response) =
|
||||
ensure_archived_null_lock_allows_overwrite(state, bucket, &object_key, Some(headers)).await
|
||||
{
|
||||
return response;
|
||||
}
|
||||
|
||||
let cursor = std::io::Cursor::new(file_data.to_vec());
|
||||
let boxed: myfsio_storage::traits::AsyncReadStream = Box::pin(cursor);
|
||||
|
||||
|
||||
@@ -2170,6 +2170,7 @@ pub async fn complete_multipart_upload(
|
||||
State(state): State<AppState>,
|
||||
Extension(_session): Extension<SessionHandle>,
|
||||
Path((bucket_name, upload_id)): Path<(String, String)>,
|
||||
headers: HeaderMap,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
let payload: CompleteMultipartPayload = match parse_json_body(body).await {
|
||||
@@ -2190,6 +2191,22 @@ pub async fn complete_multipart_upload(
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let upload_key = match state.storage.list_multipart_uploads(&bucket_name).await {
|
||||
Ok(uploads) => uploads
|
||||
.into_iter()
|
||||
.find(|u| u.upload_id == upload_id)
|
||||
.map(|u| u.key),
|
||||
Err(err) => return storage_json_error(err),
|
||||
};
|
||||
if let Some(ref key) = upload_key {
|
||||
if let Err(response) =
|
||||
super::ensure_archived_null_lock_allows_overwrite(&state, &bucket_name, key, Some(&headers))
|
||||
.await
|
||||
{
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
match state
|
||||
.storage
|
||||
.complete_multipart(&bucket_name, &upload_id, &parts)
|
||||
@@ -2654,7 +2671,13 @@ struct CopyMovePayload {
|
||||
dest_key: String,
|
||||
}
|
||||
|
||||
async fn copy_object_json(state: &AppState, bucket: &str, key: &str, body: Body) -> Response {
|
||||
async fn copy_object_json(
|
||||
state: &AppState,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
headers: &HeaderMap,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
let payload: CopyMovePayload = match parse_json_body(body).await {
|
||||
Ok(payload) => payload,
|
||||
Err(response) => return response,
|
||||
@@ -2668,6 +2691,17 @@ async fn copy_object_json(state: &AppState, bucket: &str, key: &str, body: Body)
|
||||
);
|
||||
}
|
||||
|
||||
if let Err(response) = super::ensure_archived_null_lock_allows_overwrite(
|
||||
state,
|
||||
dest_bucket,
|
||||
dest_key,
|
||||
Some(headers),
|
||||
)
|
||||
.await
|
||||
{
|
||||
return response;
|
||||
}
|
||||
|
||||
match state
|
||||
.storage
|
||||
.copy_object(bucket, key, dest_bucket, dest_key)
|
||||
@@ -2687,7 +2721,13 @@ async fn copy_object_json(state: &AppState, bucket: &str, key: &str, body: Body)
|
||||
}
|
||||
}
|
||||
|
||||
async fn move_object_json(state: &AppState, bucket: &str, key: &str, body: Body) -> Response {
|
||||
async fn move_object_json(
|
||||
state: &AppState,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
headers: &HeaderMap,
|
||||
body: Body,
|
||||
) -> Response {
|
||||
let payload: CopyMovePayload = match parse_json_body(body).await {
|
||||
Ok(payload) => payload,
|
||||
Err(response) => return response,
|
||||
@@ -2707,6 +2747,17 @@ async fn move_object_json(state: &AppState, bucket: &str, key: &str, body: Body)
|
||||
);
|
||||
}
|
||||
|
||||
if let Err(response) = super::ensure_archived_null_lock_allows_overwrite(
|
||||
state,
|
||||
dest_bucket,
|
||||
dest_key,
|
||||
Some(headers),
|
||||
)
|
||||
.await
|
||||
{
|
||||
return response;
|
||||
}
|
||||
|
||||
match state.storage.copy_object(bucket, key, dest_bucket, dest_key).await {
|
||||
Ok(_) => match state.storage.delete_object(bucket, key).await {
|
||||
Ok(_) => {
|
||||
@@ -2969,8 +3020,12 @@ pub async fn object_post_dispatch(
|
||||
object_presign_json(&state, &session, &bucket_name, &key, body).await
|
||||
}
|
||||
ObjectPostAction::Tags => update_object_tags(&state, &bucket_name, &key, body).await,
|
||||
ObjectPostAction::Copy => copy_object_json(&state, &bucket_name, &key, body).await,
|
||||
ObjectPostAction::Move => move_object_json(&state, &bucket_name, &key, body).await,
|
||||
ObjectPostAction::Copy => {
|
||||
copy_object_json(&state, &bucket_name, &key, &headers, body).await
|
||||
}
|
||||
ObjectPostAction::Move => {
|
||||
move_object_json(&state, &bucket_name, &key, &headers, body).await
|
||||
}
|
||||
ObjectPostAction::Restore(version_id) => {
|
||||
restore_object_version_json(&state, &bucket_name, &key, &version_id).await
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1012,11 +1012,17 @@ impl FsStorageBackend {
|
||||
|
||||
let now = Utc::now();
|
||||
let metadata = self.read_metadata_sync(bucket_name, key);
|
||||
let version_id = metadata
|
||||
let raw_vid = metadata
|
||||
.get("__version_id__")
|
||||
.cloned()
|
||||
.filter(|v| !v.is_empty() && !v.contains('/') && !v.contains('\\') && !v.contains(".."))
|
||||
.unwrap_or_else(Self::new_version_id_sync);
|
||||
.map(String::as_str)
|
||||
.unwrap_or("");
|
||||
let version_id = if raw_vid.is_empty() {
|
||||
"null".to_string()
|
||||
} else if raw_vid.contains('/') || raw_vid.contains('\\') || raw_vid.contains("..") {
|
||||
Self::new_version_id_sync()
|
||||
} else {
|
||||
raw_vid.to_string()
|
||||
};
|
||||
|
||||
let data_path = version_dir.join(format!("{}.bin", version_id));
|
||||
std::fs::copy(&source, &data_path)?;
|
||||
@@ -1204,6 +1210,24 @@ impl FsStorageBackend {
|
||||
)
|
||||
}
|
||||
|
||||
fn purge_archived_null_version_sync(
|
||||
&self,
|
||||
bucket_name: &str,
|
||||
key: &str,
|
||||
) -> std::io::Result<()> {
|
||||
let (manifest_path, data_path) = self.version_record_paths(bucket_name, key, "null");
|
||||
if manifest_path.is_file() {
|
||||
Self::safe_unlink(&manifest_path)?;
|
||||
}
|
||||
if data_path.is_file() {
|
||||
Self::safe_unlink(&data_path)?;
|
||||
}
|
||||
let versions_root = self.bucket_versions_root(bucket_name);
|
||||
Self::cleanup_empty_parents(&manifest_path, &versions_root);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
fn validate_version_id(bucket_name: &str, key: &str, version_id: &str) -> StorageResult<()> {
|
||||
if version_id.is_empty()
|
||||
|| version_id.contains('/')
|
||||
@@ -1271,10 +1295,16 @@ impl FsStorageBackend {
|
||||
return None;
|
||||
}
|
||||
let metadata = self.read_metadata_sync(bucket_name, key);
|
||||
let live_version = metadata.get("__version_id__")?.clone();
|
||||
if live_version != version_id {
|
||||
let stored_version = metadata.get("__version_id__").map(String::as_str);
|
||||
let matches = if version_id == "null" {
|
||||
stored_version.is_none_or(|v| v.is_empty() || v == "null")
|
||||
} else {
|
||||
stored_version == Some(version_id)
|
||||
};
|
||||
if !matches {
|
||||
return None;
|
||||
}
|
||||
let live_version = stored_version.unwrap_or("null").to_string();
|
||||
let file_meta = std::fs::metadata(&live_path).ok()?;
|
||||
let mtime = file_meta
|
||||
.modified()
|
||||
@@ -1979,6 +2009,10 @@ impl FsStorageBackend {
|
||||
VersioningStatus::Disabled => {}
|
||||
}
|
||||
}
|
||||
if matches!(versioning_status, VersioningStatus::Suspended) {
|
||||
self.purge_archived_null_version_sync(bucket_name, key)
|
||||
.map_err(StorageError::Io)?;
|
||||
}
|
||||
|
||||
std::fs::rename(tmp_path, &destination).map_err(|e| {
|
||||
let _ = std::fs::remove_file(tmp_path);
|
||||
@@ -2828,6 +2862,23 @@ impl crate::traits::StorageEngine for FsStorageBackend {
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_archived_null_version_metadata(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
) -> StorageResult<Option<HashMap<String, String>>> {
|
||||
run_blocking(|| {
|
||||
let _guard = self.get_object_lock(bucket, key).read();
|
||||
let (manifest_path, _) = self.version_record_paths(bucket, key, "null");
|
||||
if !manifest_path.is_file() {
|
||||
return Ok(None);
|
||||
}
|
||||
let content = std::fs::read_to_string(&manifest_path).map_err(StorageError::Io)?;
|
||||
let record: Value = serde_json::from_str(&content).map_err(StorageError::Json)?;
|
||||
Ok(Some(Self::version_metadata_from_record(&record)))
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<DeleteOutcome> {
|
||||
run_blocking(|| {
|
||||
let _guard = self.get_object_lock(bucket, key).write();
|
||||
@@ -2919,7 +2970,13 @@ impl crate::traits::StorageEngine for FsStorageBackend {
|
||||
let live_path = self.object_live_path(bucket, key);
|
||||
if live_path.is_file() {
|
||||
let metadata = self.read_metadata_sync(bucket, key);
|
||||
if metadata.get("__version_id__").map(String::as_str) == Some(version_id) {
|
||||
let stored_version = metadata.get("__version_id__").map(String::as_str);
|
||||
let live_matches = if version_id == "null" {
|
||||
stored_version.is_none_or(|v| v.is_empty() || v == "null")
|
||||
} else {
|
||||
stored_version == Some(version_id)
|
||||
};
|
||||
if live_matches {
|
||||
Self::safe_unlink(&live_path).map_err(StorageError::Io)?;
|
||||
self.delete_metadata_sync(bucket, key)
|
||||
.map_err(StorageError::Io)?;
|
||||
@@ -3234,6 +3291,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
|
||||
part_number: u32,
|
||||
src_bucket: &str,
|
||||
src_key: &str,
|
||||
src_version_id: Option<&str>,
|
||||
range: Option<(u64, u64)>,
|
||||
) -> StorageResult<(String, DateTime<Utc>)> {
|
||||
let upload_dir = self.multipart_bucket_root(bucket).join(upload_id);
|
||||
@@ -3245,17 +3303,29 @@ impl crate::traits::StorageEngine for FsStorageBackend {
|
||||
let part_file = upload_dir.join(format!("part-{:05}.part", part_number));
|
||||
let tmp_file = upload_dir.join(format!("part-{:05}.part.tmp", part_number));
|
||||
let chunk_size = self.stream_chunk_size;
|
||||
let src_version_id = src_version_id.map(str::to_string);
|
||||
|
||||
// Everything that must be consistent with the copied bytes — path
|
||||
// check, size/mtime, range validation, open+seek+read — happens under
|
||||
// one held read guard. If a concurrent PUT renames the source
|
||||
// between our metadata read and our file open, we'd otherwise record
|
||||
// the old size/last_modified in the manifest but copy bytes from the
|
||||
// new version.
|
||||
let copy_res = run_blocking(|| -> StorageResult<(String, u64, DateTime<Utc>)> {
|
||||
let _guard = self.get_object_lock(src_bucket, src_key).read();
|
||||
|
||||
let src_path = self.object_path(src_bucket, src_key)?;
|
||||
let src_path = match src_version_id.as_deref() {
|
||||
Some(version_id) => {
|
||||
let (record, data_path) =
|
||||
self.read_version_record_sync(src_bucket, src_key, version_id)?;
|
||||
if record
|
||||
.get("is_delete_marker")
|
||||
.and_then(Value::as_bool)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return Err(StorageError::MethodNotAllowed(
|
||||
"The specified method is not allowed against a delete marker"
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
data_path
|
||||
}
|
||||
None => self.object_path(src_bucket, src_key)?,
|
||||
};
|
||||
if !src_path.is_file() {
|
||||
return Err(StorageError::ObjectNotFound {
|
||||
bucket: src_bucket.to_string(),
|
||||
@@ -4749,7 +4819,7 @@ mod tests {
|
||||
Err(_) => continue,
|
||||
};
|
||||
let res = b
|
||||
.upload_part_copy("mp-bkt", &upload_id, 1, "mp-bkt", "src", None)
|
||||
.upload_part_copy("mp-bkt", &upload_id, 1, "mp-bkt", "src", None, None)
|
||||
.await;
|
||||
if let Ok((etag, _lm)) = res {
|
||||
// The part etag is the MD5 of the copied bytes; it
|
||||
|
||||
@@ -107,6 +107,12 @@ pub trait StorageEngine: Send + Sync {
|
||||
version_id: &str,
|
||||
) -> StorageResult<HashMap<String, String>>;
|
||||
|
||||
async fn get_archived_null_version_metadata(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
) -> StorageResult<Option<HashMap<String, String>>>;
|
||||
|
||||
async fn delete_object(&self, bucket: &str, key: &str) -> StorageResult<DeleteOutcome>;
|
||||
|
||||
async fn delete_object_version(
|
||||
@@ -171,6 +177,7 @@ pub trait StorageEngine: Send + Sync {
|
||||
part_number: u32,
|
||||
src_bucket: &str,
|
||||
src_key: &str,
|
||||
src_version_id: Option<&str>,
|
||||
range: Option<(u64, u64)>,
|
||||
) -> StorageResult<(String, chrono::DateTime<chrono::Utc>)>;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user