From 2767e7e79da4a6a1778811947bc5f5affb8a5d52 Mon Sep 17 00:00:00 2001
From: kqjy
Date: Wed, 22 Apr 2026 19:55:44 +0800
Subject: [PATCH] Optimize bucket listing for 10K-100K objects

- Shallow listing: read per-directory _index.json once for ETags instead of
  N serial .meta.json reads. Validate prefix for path traversal and verify
  normalized target stays within bucket root.
- Recursive listing: cache full per-directory index during the walk so each
  _index.json is parsed at most once per call.
- Per-bucket listing cache with 5s TTL and per-bucket rebuild mutex.
  Invalidated on put/delete/copy/metadata/tags/multipart-complete.
  Pagination uses partition_point for O(log n) start lookup.
- UI stream endpoint now actually streams via mpsc + Body::from_stream
  instead of buffering into a Vec. Cancels producer on client disconnect.
- UI JSON endpoint honors delimiter=/ and returns common_prefixes.
- run_blocking wrapper dispatches sync filesystem work via block_in_place
  on multi-threaded runtimes, falls back to inline on current-thread
  runtimes (unit tests).
--- Cargo.lock | 12 + Cargo.toml | 1 + crates/myfsio-server/Cargo.toml | 1 + crates/myfsio-server/src/handlers/ui_api.rs | 299 +++++++----- crates/myfsio-server/src/handlers/ui_pages.rs | 9 +- crates/myfsio-storage/src/fs_backend.rs | 451 +++++++++++++----- 6 files changed, 549 insertions(+), 224 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d57d448..682f84e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2740,6 +2740,7 @@ dependencies = [ "tempfile", "tera", "tokio", + "tokio-stream", "tokio-util", "tower", "tower-http", @@ -4193,6 +4194,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" diff --git a/Cargo.toml b/Cargo.toml index e5360ec..e025cc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ thiserror = "2" chrono = { version = "0.4", features = ["serde"] } base64 = "0.22" tokio-util = { version = "0.7", features = ["io"] } +tokio-stream = "0.1" futures = "0.3" dashmap = "6" crc32fast = "1" diff --git a/crates/myfsio-server/Cargo.toml b/crates/myfsio-server/Cargo.toml index 40ca42e..b1e305a 100644 --- a/crates/myfsio-server/Cargo.toml +++ b/crates/myfsio-server/Cargo.toml @@ -23,6 +23,7 @@ serde_urlencoded = "0.7" tracing = { workspace = true } tracing-subscriber = { workspace = true } tokio-util = { workspace = true } +tokio-stream = { workspace = true } chrono = { workspace = true } uuid = { workspace = true } futures = { workspace = true } diff --git a/crates/myfsio-server/src/handlers/ui_api.rs b/crates/myfsio-server/src/handlers/ui_api.rs index b4b002b..1d778be 100644 --- a/crates/myfsio-server/src/handlers/ui_api.rs +++ b/crates/myfsio-server/src/handlers/ui_api.rs @@ -904,6 +904,35 @@ pub struct ListObjectsQuery { pub prefix: Option, 
#[serde(default)] pub start_after: Option, + #[serde(default)] + pub delimiter: Option, +} + +fn object_json(bucket_name: &str, o: &myfsio_common::types::ObjectMeta) -> Value { + json!({ + "key": o.key, + "size": o.size, + "last_modified": o.last_modified.to_rfc3339(), + "last_modified_iso": o.last_modified.to_rfc3339(), + "last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(), + "etag": o.etag.clone().unwrap_or_default(), + "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), + "content_type": o.content_type.clone().unwrap_or_default(), + "download_url": build_ui_object_url(bucket_name, &o.key, "download"), + "preview_url": build_ui_object_url(bucket_name, &o.key, "preview"), + "delete_endpoint": build_ui_object_url(bucket_name, &o.key, "delete"), + "presign_endpoint": build_ui_object_url(bucket_name, &o.key, "presign"), + "metadata_url": build_ui_object_url(bucket_name, &o.key, "metadata"), + "versions_endpoint": build_ui_object_url(bucket_name, &o.key, "versions"), + "restore_template": format!( + "/ui/buckets/{}/objects/{}/restore/VERSION_ID_PLACEHOLDER", + bucket_name, + encode_object_key(&o.key) + ), + "tags_url": build_ui_object_url(bucket_name, &o.key, "tags"), + "copy_url": build_ui_object_url(bucket_name, &o.key, "copy"), + "move_url": build_ui_object_url(bucket_name, &o.key, "move"), + }) } pub async fn list_bucket_objects( @@ -917,6 +946,49 @@ pub async fn list_bucket_objects( } let max_keys = q.max_keys.unwrap_or(1000).min(5000); + let versioning_enabled = state + .storage + .is_versioning_enabled(&bucket_name) + .await + .unwrap_or(false); + let stats = state.storage.bucket_stats(&bucket_name).await.ok(); + let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0); + + let use_shallow = q.delimiter.as_deref() == Some("/"); + + if use_shallow { + let params = myfsio_common::types::ShallowListParams { + prefix: q.prefix.clone().unwrap_or_default(), + delimiter: "/".to_string(), + max_keys, 
+ continuation_token: q.continuation_token.clone(), + }; + return match state + .storage + .list_objects_shallow(&bucket_name, ¶ms) + .await + { + Ok(res) => { + let objects: Vec = res + .objects + .iter() + .map(|o| object_json(&bucket_name, o)) + .collect(); + Json(json!({ + "versioning_enabled": versioning_enabled, + "total_count": total_count, + "is_truncated": res.is_truncated, + "next_continuation_token": res.next_continuation_token, + "url_templates": url_templates_for(&bucket_name), + "objects": objects, + "common_prefixes": res.common_prefixes, + })) + .into_response() + } + Err(e) => storage_json_error(e), + }; + } + let params = ListParams { max_keys, continuation_token: q.continuation_token.clone(), @@ -924,46 +996,12 @@ pub async fn list_bucket_objects( start_after: q.start_after.clone(), }; - let versioning_enabled = state - .storage - .is_versioning_enabled(&bucket_name) - .await - .unwrap_or(false); - - let stats = state.storage.bucket_stats(&bucket_name).await.ok(); - let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0); - match state.storage.list_objects(&bucket_name, ¶ms).await { Ok(res) => { let objects: Vec = res .objects .iter() - .map(|o| { - json!({ - "key": o.key, - "size": o.size, - "last_modified": o.last_modified.to_rfc3339(), - "last_modified_iso": o.last_modified.to_rfc3339(), - "last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(), - "etag": o.etag.clone().unwrap_or_default(), - "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), - "content_type": o.content_type.clone().unwrap_or_default(), - "download_url": build_ui_object_url(&bucket_name, &o.key, "download"), - "preview_url": build_ui_object_url(&bucket_name, &o.key, "preview"), - "delete_endpoint": build_ui_object_url(&bucket_name, &o.key, "delete"), - "presign_endpoint": build_ui_object_url(&bucket_name, &o.key, "presign"), - "metadata_url": build_ui_object_url(&bucket_name, &o.key, "metadata"), - 
"versions_endpoint": build_ui_object_url(&bucket_name, &o.key, "versions"), - "restore_template": format!( - "/ui/buckets/{}/objects/{}/restore/VERSION_ID_PLACEHOLDER", - bucket_name, - encode_object_key(&o.key) - ), - "tags_url": build_ui_object_url(&bucket_name, &o.key, "tags"), - "copy_url": build_ui_object_url(&bucket_name, &o.key, "copy"), - "move_url": build_ui_object_url(&bucket_name, &o.key, "move"), - }) - }) + .map(|o| object_json(&bucket_name, o)) .collect(); Json(json!({ @@ -1006,41 +1044,62 @@ pub async fn stream_bucket_objects( let stats = state.storage.bucket_stats(&bucket_name).await.ok(); let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0); - let mut lines: Vec = Vec::new(); - lines.push( - json!({ - "type": "meta", - "url_templates": url_templates_for(&bucket_name), - "versioning_enabled": versioning_enabled, - }) - .to_string(), - ); - lines.push(json!({ "type": "count", "total_count": total_count }).to_string()); - let use_delimiter = q.delimiter.as_deref() == Some("/"); let prefix = q.prefix.clone().unwrap_or_default(); - if use_delimiter { - let mut token: Option = None; - loop { - let params = myfsio_common::types::ShallowListParams { - prefix: prefix.clone(), - delimiter: "/".to_string(), - max_keys: UI_OBJECT_BROWSER_MAX_KEYS, - continuation_token: token.clone(), - }; - match state - .storage - .list_objects_shallow(&bucket_name, ¶ms) - .await - { - Ok(res) => { - for p in &res.common_prefixes { - lines.push(json!({ "type": "folder", "prefix": p }).to_string()); - } - for o in &res.objects { - lines.push( - json!({ + let (tx, rx) = tokio::sync::mpsc::channel::>(64); + + let meta_line = json!({ + "type": "meta", + "url_templates": url_templates_for(&bucket_name), + "versioning_enabled": versioning_enabled, + }) + .to_string() + + "\n"; + let count_line = json!({ "type": "count", "total_count": total_count }).to_string() + "\n"; + + let storage = state.storage.clone(); + let bucket = bucket_name.clone(); + + tokio::spawn(async 
move { + if tx + .send(Ok(bytes::Bytes::from(meta_line.into_bytes()))) + .await + .is_err() + { + return; + } + if tx + .send(Ok(bytes::Bytes::from(count_line.into_bytes()))) + .await + .is_err() + { + return; + } + + if use_delimiter { + let mut token: Option = None; + loop { + let params = myfsio_common::types::ShallowListParams { + prefix: prefix.clone(), + delimiter: "/".to_string(), + max_keys: UI_OBJECT_BROWSER_MAX_KEYS, + continuation_token: token.clone(), + }; + match storage.list_objects_shallow(&bucket, ¶ms).await { + Ok(res) => { + for p in &res.common_prefixes { + let line = json!({ "type": "folder", "prefix": p }).to_string() + "\n"; + if tx + .send(Ok(bytes::Bytes::from(line.into_bytes()))) + .await + .is_err() + { + return; + } + } + for o in &res.objects { + let line = json!({ "type": "object", "key": o.key, "size": o.size, @@ -1050,38 +1109,46 @@ pub async fn stream_bucket_objects( "etag": o.etag.clone().unwrap_or_default(), "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), }) - .to_string(), - ); + .to_string() + + "\n"; + if tx + .send(Ok(bytes::Bytes::from(line.into_bytes()))) + .await + .is_err() + { + return; + } + } + if !res.is_truncated || res.next_continuation_token.is_none() { + break; + } + token = res.next_continuation_token; } - if !res.is_truncated || res.next_continuation_token.is_none() { - break; + Err(e) => { + let line = + json!({ "type": "error", "error": e.to_string() }).to_string() + "\n"; + let _ = tx.send(Ok(bytes::Bytes::from(line.into_bytes()))).await; + return; } - token = res.next_continuation_token; - } - Err(e) => { - lines.push(json!({ "type": "error", "error": e.to_string() }).to_string()); - break; } } - } - } else { - let mut token: Option = None; - loop { - let params = ListParams { - max_keys: 1000, - continuation_token: token.clone(), - prefix: if prefix.is_empty() { - None - } else { - Some(prefix.clone()) - }, - start_after: None, - }; - match 
state.storage.list_objects(&bucket_name, ¶ms).await { - Ok(res) => { - for o in &res.objects { - lines.push( - json!({ + } else { + let mut token: Option = None; + loop { + let params = ListParams { + max_keys: 1000, + continuation_token: token.clone(), + prefix: if prefix.is_empty() { + None + } else { + Some(prefix.clone()) + }, + start_after: None, + }; + match storage.list_objects(&bucket, ¶ms).await { + Ok(res) => { + for o in &res.objects { + let line = json!({ "type": "object", "key": o.key, "size": o.size, @@ -1091,30 +1158,48 @@ pub async fn stream_bucket_objects( "etag": o.etag.clone().unwrap_or_default(), "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), }) - .to_string(), - ); + .to_string() + + "\n"; + if tx + .send(Ok(bytes::Bytes::from(line.into_bytes()))) + .await + .is_err() + { + return; + } + } + if !res.is_truncated || res.next_continuation_token.is_none() { + break; + } + token = res.next_continuation_token; } - if !res.is_truncated || res.next_continuation_token.is_none() { - break; + Err(e) => { + let line = + json!({ "type": "error", "error": e.to_string() }).to_string() + "\n"; + let _ = tx.send(Ok(bytes::Bytes::from(line.into_bytes()))).await; + return; } - token = res.next_continuation_token; - } - Err(e) => { - lines.push(json!({ "type": "error", "error": e.to_string() }).to_string()); - break; } } } - } - lines.push(json!({ "type": "done" }).to_string()); + let done_line = json!({ "type": "done" }).to_string() + "\n"; + let _ = tx + .send(Ok(bytes::Bytes::from(done_line.into_bytes()))) + .await; + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let body = Body::from_stream(stream); - let body = lines.join("\n") + "\n"; let mut headers = HeaderMap::new(); headers.insert( header::CONTENT_TYPE, "application/x-ndjson; charset=utf-8".parse().unwrap(), ); + headers.insert(header::CACHE_CONTROL, "no-cache".parse().unwrap()); + headers.insert("x-accel-buffering", "no".parse().unwrap()); 
+ (StatusCode::OK, headers, body).into_response() } diff --git a/crates/myfsio-server/src/handlers/ui_pages.rs b/crates/myfsio-server/src/handlers/ui_pages.rs index ec86ded..8e98192 100644 --- a/crates/myfsio-server/src/handlers/ui_pages.rs +++ b/crates/myfsio-server/src/handlers/ui_pages.rs @@ -227,9 +227,7 @@ async fn parse_form_any( if is_multipart { let boundary = multer::parse_boundary(&content_type) .map_err(|_| "Missing multipart boundary".to_string())?; - let stream = futures::stream::once(async move { - Ok::<_, std::io::Error>(bytes) - }); + let stream = futures::stream::once(async move { Ok::<_, std::io::Error>(bytes) }); let mut multipart = multer::Multipart::new(stream, boundary); let mut out = HashMap::new(); while let Some(field) = multipart @@ -2173,10 +2171,7 @@ pub async fn create_bucket( let wants_json = wants_json(&headers); let form = match parse_form_any(&headers, body).await { Ok(fields) => CreateBucketForm { - bucket_name: fields - .get("bucket_name") - .cloned() - .unwrap_or_default(), + bucket_name: fields.get("bucket_name").cloned().unwrap_or_default(), csrf_token: fields.get("csrf_token").cloned().unwrap_or_default(), }, Err(message) => { diff --git a/crates/myfsio-storage/src/fs_backend.rs b/crates/myfsio-storage/src/fs_backend.rs index 40a052b..0365beb 100644 --- a/crates/myfsio-storage/src/fs_backend.rs +++ b/crates/myfsio-storage/src/fs_backend.rs @@ -10,12 +10,92 @@ use md5::{Digest, Md5}; use parking_lot::Mutex; use serde_json::Value; use std::collections::HashMap; -use std::path::{Path, PathBuf}; +use std::path::{Component, Path, PathBuf}; use std::sync::Arc; use std::time::Instant; use tokio::io::AsyncReadExt; use uuid::Uuid; +fn validate_list_prefix(prefix: &str) -> StorageResult<()> { + if prefix.contains('\0') { + return Err(StorageError::InvalidObjectKey( + "prefix contains null bytes".to_string(), + )); + } + if prefix.starts_with('/') || prefix.starts_with('\\') { + return Err(StorageError::InvalidObjectKey( + "prefix cannot 
start with a slash".to_string(), + )); + } + for part in prefix.split(['/', '\\']) { + if part == ".." { + return Err(StorageError::InvalidObjectKey( + "prefix contains parent directory references".to_string(), + )); + } + } + Ok(()) +} + +fn run_blocking(f: F) -> R +where + F: FnOnce() -> R, +{ + match tokio::runtime::Handle::try_current() { + Ok(handle) if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread => { + tokio::task::block_in_place(f) + } + _ => f(), + } +} + +fn slice_range_for_prefix(items: &[T], key_of: F, prefix: &str) -> (usize, usize) +where + F: Fn(&T) -> &str, +{ + if prefix.is_empty() { + return (0, items.len()); + } + let start = items.partition_point(|item| key_of(item) < prefix); + let end_from_start = items[start..] + .iter() + .position(|item| !key_of(item).starts_with(prefix)) + .map(|p| start + p) + .unwrap_or(items.len()); + (start, end_from_start) +} + +fn normalize_path(p: &Path) -> Option { + let mut out = PathBuf::new(); + for comp in p.components() { + match comp { + Component::ParentDir => { + if !out.pop() { + return None; + } + } + Component::CurDir => {} + other => out.push(other.as_os_str()), + } + } + Some(out) +} + +fn path_is_within(candidate: &Path, root: &Path) -> bool { + match (normalize_path(candidate), normalize_path(root)) { + (Some(c), Some(r)) => c.starts_with(&r), + _ => false, + } +} + +type ListCacheEntry = (String, u64, f64, Option); + +#[derive(Clone, Default)] +struct ShallowCacheEntry { + files: Vec, + dirs: Vec, +} + pub struct FsStorageBackend { root: PathBuf, object_key_max_length_bytes: usize, @@ -26,6 +106,11 @@ pub struct FsStorageBackend { meta_index_locks: DashMap>>, stats_cache: DashMap, stats_cache_ttl: std::time::Duration, + list_cache: DashMap>, Instant)>, + shallow_cache: DashMap<(String, PathBuf, String), (Arc, Instant)>, + list_rebuild_locks: DashMap>>, + shallow_rebuild_locks: DashMap<(String, PathBuf, String), Arc>>, + list_cache_ttl: std::time::Duration, } #[derive(Debug, 
Clone)] @@ -61,11 +146,22 @@ impl FsStorageBackend { meta_index_locks: DashMap::new(), stats_cache: DashMap::new(), stats_cache_ttl: std::time::Duration::from_secs(60), + list_cache: DashMap::new(), + shallow_cache: DashMap::new(), + list_rebuild_locks: DashMap::new(), + shallow_rebuild_locks: DashMap::new(), + list_cache_ttl: std::time::Duration::from_secs(5), }; backend.ensure_system_roots(); backend } + fn invalidate_bucket_caches(&self, bucket_name: &str) { + self.stats_cache.remove(bucket_name); + self.list_cache.remove(bucket_name); + self.shallow_cache.retain(|(b, _, _), _| b != bucket_name); + } + fn ensure_system_roots(&self) { let dirs = [ self.system_root_path(), @@ -158,6 +254,39 @@ impl FsStorageBackend { } } + fn index_file_for_dir(&self, bucket_name: &str, rel_dir: &Path) -> PathBuf { + let meta_root = self.bucket_meta_root(bucket_name); + if rel_dir.as_os_str().is_empty() || rel_dir == Path::new(".") { + meta_root.join(INDEX_FILE) + } else { + meta_root.join(rel_dir).join(INDEX_FILE) + } + } + + fn load_dir_index_sync(&self, bucket_name: &str, rel_dir: &Path) -> HashMap { + let index_path = self.index_file_for_dir(bucket_name, rel_dir); + if !index_path.exists() { + return HashMap::new(); + } + let Ok(text) = std::fs::read_to_string(&index_path) else { + return HashMap::new(); + }; + let Ok(index) = serde_json::from_str::>(&text) else { + return HashMap::new(); + }; + let mut out = HashMap::with_capacity(index.len()); + for (name, entry) in index { + if let Some(etag) = entry + .get("metadata") + .and_then(|m| m.get("__etag__")) + .and_then(|v| v.as_str()) + { + out.insert(name, etag.to_string()); + } + } + out + } + fn get_meta_index_lock(&self, index_path: &str) -> Arc> { self.meta_index_locks .entry(index_path.to_string()) @@ -872,14 +1001,14 @@ impl FsStorageBackend { Ok(stats) } - fn list_objects_sync( + fn build_full_listing_sync( &self, bucket_name: &str, - params: &ListParams, - ) -> StorageResult { + ) -> StorageResult>> { let bucket_path = 
self.require_bucket(bucket_name)?; - let mut all_keys: Vec<(String, u64, f64, Option)> = Vec::new(); + let mut all_keys: Vec = Vec::new(); + let mut dir_etag_cache: HashMap> = HashMap::new(); let internal = INTERNAL_FOLDERS; let bucket_str = bucket_path.to_string_lossy().to_string(); let bucket_prefix_len = bucket_str.len() + 1; @@ -912,28 +1041,78 @@ impl FsStorageBackend { .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) .map(|d| d.as_secs_f64()) .unwrap_or(0.0); - all_keys.push((key, meta.len(), mtime, None)); + + let rel_dir = Path::new(&key) + .parent() + .map(|p| p.to_path_buf()) + .unwrap_or_default(); + let etags = dir_etag_cache + .entry(rel_dir.clone()) + .or_insert_with(|| self.load_dir_index_sync(bucket_name, &rel_dir)); + let etag = etags.get(name_str.as_ref()).cloned(); + + all_keys.push((key, meta.len(), mtime, etag)); } } } } all_keys.sort_by(|a, b| a.0.cmp(&b.0)); + Ok(Arc::new(all_keys)) + } - if let Some(ref prefix) = params.prefix { - all_keys.retain(|k| k.0.starts_with(prefix.as_str())); + fn get_full_listing_sync(&self, bucket_name: &str) -> StorageResult>> { + if let Some(entry) = self.list_cache.get(bucket_name) { + let (cached, cached_at) = entry.value(); + if cached_at.elapsed() < self.list_cache_ttl { + return Ok(cached.clone()); + } } + let lock = self + .list_rebuild_locks + .entry(bucket_name.to_string()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone(); + let _guard = lock.lock(); + + if let Some(entry) = self.list_cache.get(bucket_name) { + let (cached, cached_at) = entry.value(); + if cached_at.elapsed() < self.list_cache_ttl { + return Ok(cached.clone()); + } + } + + let listing = self.build_full_listing_sync(bucket_name)?; + self.list_cache + .insert(bucket_name.to_string(), (listing.clone(), Instant::now())); + Ok(listing) + } + + fn list_objects_sync( + &self, + bucket_name: &str, + params: &ListParams, + ) -> StorageResult { + self.require_bucket(bucket_name)?; + if let Some(ref prefix) = params.prefix { + 
if !prefix.is_empty() { + validate_list_prefix(prefix)?; + } + } + + let listing = self.get_full_listing_sync(bucket_name)?; + + let (slice_start, slice_end) = match params.prefix.as_deref() { + Some(p) if !p.is_empty() => slice_range_for_prefix(&listing[..], |e| &e.0, p), + _ => (0, listing.len()), + }; + let prefix_filter = &listing[slice_start..slice_end]; + let start_idx = if let Some(ref token) = params.continuation_token { - all_keys - .iter() - .position(|k| k.0.as_str() > token.as_str()) - .unwrap_or(all_keys.len()) + prefix_filter.partition_point(|k| k.0.as_str() <= token.as_str()) } else if let Some(ref start_after) = params.start_after { - all_keys - .iter() - .position(|k| k.0.as_str() > start_after.as_str()) - .unwrap_or(all_keys.len()) + prefix_filter.partition_point(|k| k.0.as_str() <= start_after.as_str()) } else { 0 }; @@ -944,10 +1123,10 @@ impl FsStorageBackend { params.max_keys }; - let end_idx = std::cmp::min(start_idx + max_keys, all_keys.len()); - let is_truncated = end_idx < all_keys.len(); + let end_idx = std::cmp::min(start_idx + max_keys, prefix_filter.len()); + let is_truncated = end_idx < prefix_filter.len(); - let objects: Vec = all_keys[start_idx..end_idx] + let objects: Vec = prefix_filter[start_idx..end_idx] .iter() .map(|(key, size, mtime, etag)| { let lm = Utc @@ -955,10 +1134,7 @@ impl FsStorageBackend { .single() .unwrap_or_else(Utc::now); let mut obj = ObjectMeta::new(key.clone(), *size, lm); - obj.etag = etag.clone().or_else(|| { - let meta = self.read_metadata_sync(bucket_name, key); - meta.get("__etag__").cloned() - }); + obj.etag = etag.clone(); obj }) .collect(); @@ -976,37 +1152,40 @@ impl FsStorageBackend { }) } - fn list_objects_shallow_sync( + fn build_shallow_sync( &self, bucket_name: &str, - params: &ShallowListParams, - ) -> StorageResult { + rel_dir: &Path, + delimiter: &str, + ) -> StorageResult> { let bucket_path = self.require_bucket(bucket_name)?; + let target_dir = bucket_path.join(rel_dir); - let target_dir = 
if params.prefix.is_empty() { - bucket_path.clone() - } else { - let prefix_path = Path::new(¶ms.prefix); - let dir_part = if params.prefix.ends_with(¶ms.delimiter) { - prefix_path.to_path_buf() - } else { - prefix_path.parent().unwrap_or(Path::new("")).to_path_buf() - }; - bucket_path.join(dir_part) - }; + if !path_is_within(&target_dir, &bucket_path) { + return Err(StorageError::InvalidObjectKey( + "prefix escapes bucket root".to_string(), + )); + } if !target_dir.exists() { - return Ok(ShallowListResult { - objects: Vec::new(), - common_prefixes: Vec::new(), - is_truncated: false, - next_continuation_token: None, - }); + return Ok(Arc::new(ShallowCacheEntry::default())); } + let dir_etags = self.load_dir_index_sync(bucket_name, rel_dir); + let mut files = Vec::new(); let mut dirs = Vec::new(); + let rel_dir_prefix = if rel_dir.as_os_str().is_empty() { + String::new() + } else { + let mut s = rel_dir.to_string_lossy().replace('\\', "/"); + if !s.ends_with('/') { + s.push('/'); + } + s + }; + let entries = std::fs::read_dir(&target_dir).map_err(StorageError::Io)?; for entry in entries.flatten() { let name = entry.file_name(); @@ -1021,19 +1200,10 @@ impl FsStorageBackend { Err(_) => continue, }; - let rel = entry - .path() - .strip_prefix(&bucket_path) - .unwrap_or(Path::new("")) - .to_string_lossy() - .replace('\\', "/"); - - if !params.prefix.is_empty() && !rel.starts_with(¶ms.prefix) { - continue; - } + let rel = format!("{}{}", rel_dir_prefix, name_str); if ft.is_dir() { - dirs.push(format!("{}{}", rel, ¶ms.delimiter)); + dirs.push(format!("{}{}", rel, delimiter)); } else if ft.is_file() { if let Ok(meta) = entry.metadata() { let mtime = meta @@ -1046,10 +1216,7 @@ impl FsStorageBackend { .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) .single() .unwrap_or_else(Utc::now); - let etag = self - .read_metadata_sync(bucket_name, &rel) - .get("__etag__") - .cloned(); + let etag = dir_etags.get(&name_str).cloned(); let mut obj = 
ObjectMeta::new(rel, meta.len(), lm); obj.etag = etag; files.push(obj); @@ -1059,67 +1226,133 @@ impl FsStorageBackend { files.sort_by(|a, b| a.key.cmp(&b.key)); dirs.sort(); + Ok(Arc::new(ShallowCacheEntry { files, dirs })) + } - let mut merged: Vec = Vec::new(); - let mut fi = 0; - let mut di = 0; - while fi < files.len() && di < dirs.len() { - if files[fi].key < dirs[di] { - merged.push(Either::File(fi)); - fi += 1; - } else { - merged.push(Either::Dir(di)); - di += 1; + fn get_shallow_sync( + &self, + bucket_name: &str, + rel_dir: &Path, + delimiter: &str, + ) -> StorageResult> { + let cache_key = ( + bucket_name.to_string(), + rel_dir.to_path_buf(), + delimiter.to_string(), + ); + if let Some(entry) = self.shallow_cache.get(&cache_key) { + let (cached, cached_at) = entry.value(); + if cached_at.elapsed() < self.list_cache_ttl { + return Ok(cached.clone()); } } - while fi < files.len() { - merged.push(Either::File(fi)); - fi += 1; - } - while di < dirs.len() { - merged.push(Either::Dir(di)); - di += 1; + + let lock = self + .shallow_rebuild_locks + .entry(cache_key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone(); + let _guard = lock.lock(); + + if let Some(entry) = self.shallow_cache.get(&cache_key) { + let (cached, cached_at) = entry.value(); + if cached_at.elapsed() < self.list_cache_ttl { + return Ok(cached.clone()); + } } - let start_idx = if let Some(ref token) = params.continuation_token { - merged - .iter() - .position(|e| match e { - Either::File(i) => files[*i].key.as_str() > token.as_str(), - Either::Dir(i) => dirs[*i].as_str() > token.as_str(), - }) - .unwrap_or(merged.len()) + let built = self.build_shallow_sync(bucket_name, rel_dir, delimiter)?; + self.shallow_cache + .insert(cache_key, (built.clone(), Instant::now())); + Ok(built) + } + + fn list_objects_shallow_sync( + &self, + bucket_name: &str, + params: &ShallowListParams, + ) -> StorageResult { + self.require_bucket(bucket_name)?; + + let rel_dir: PathBuf = if 
params.prefix.is_empty() { + PathBuf::new() } else { - 0 + validate_list_prefix(¶ms.prefix)?; + let prefix_path = Path::new(¶ms.prefix); + if params.prefix.ends_with(¶ms.delimiter) { + prefix_path.to_path_buf() + } else { + prefix_path.parent().unwrap_or(Path::new("")).to_path_buf() + } }; + let cached = self.get_shallow_sync(bucket_name, &rel_dir, ¶ms.delimiter)?; + + let (file_start, file_end) = + slice_range_for_prefix(&cached.files, |o| &o.key, ¶ms.prefix); + let (dir_start, dir_end) = slice_range_for_prefix(&cached.dirs, |s| s, ¶ms.prefix); + let files = &cached.files[file_start..file_end]; + let dirs = &cached.dirs[dir_start..dir_end]; + let max_keys = if params.max_keys == 0 { DEFAULT_MAX_KEYS } else { params.max_keys }; - let end_idx = std::cmp::min(start_idx + max_keys, merged.len()); - let is_truncated = end_idx < merged.len(); + let token_filter = |key: &str| -> bool { + params + .continuation_token + .as_deref() + .map(|t| key > t) + .unwrap_or(true) + }; - let mut result_objects = Vec::new(); - let mut result_prefixes = Vec::new(); + let file_skip = params + .continuation_token + .as_deref() + .map(|t| files.partition_point(|o| o.key.as_str() <= t)) + .unwrap_or(0); + let dir_skip = params + .continuation_token + .as_deref() + .map(|t| dirs.partition_point(|d| d.as_str() <= t)) + .unwrap_or(0); - for item in &merged[start_idx..end_idx] { - match item { - Either::File(i) => result_objects.push(files[*i].clone()), - Either::Dir(i) => result_prefixes.push(dirs[*i].clone()), + let mut fi = file_skip; + let mut di = dir_skip; + let mut result_objects: Vec = Vec::new(); + let mut result_prefixes: Vec = Vec::new(); + let mut last_key: Option = None; + let mut total = 0usize; + + while total < max_keys && (fi < files.len() || di < dirs.len()) { + let take_file = match (fi < files.len(), di < dirs.len()) { + (true, true) => files[fi].key.as_str() < dirs[di].as_str(), + (true, false) => true, + (false, true) => false, + _ => break, + }; + if take_file { + if 
token_filter(&files[fi].key) { + last_key = Some(files[fi].key.clone()); + result_objects.push(files[fi].clone()); + total += 1; + } + fi += 1; + } else { + if token_filter(&dirs[di]) { + last_key = Some(dirs[di].clone()); + result_prefixes.push(dirs[di].clone()); + total += 1; + } + di += 1; } } - let next_token = if is_truncated { - match &merged[end_idx - 1] { - Either::File(i) => Some(files[*i].key.clone()), - Either::Dir(i) => Some(dirs[*i].clone()), - } - } else { - None - }; + let remaining = fi < files.len() || di < dirs.len(); + let is_truncated = remaining; + let next_token = if is_truncated { last_key } else { None }; Ok(ShallowListResult { objects: result_objects, @@ -1195,7 +1428,7 @@ impl FsStorageBackend { StorageError::Io(e) })?; - self.stats_cache.remove(bucket_name); + self.invalidate_bucket_caches(bucket_name); let file_meta = std::fs::metadata(&destination).map_err(StorageError::Io)?; let mtime = file_meta @@ -1254,11 +1487,6 @@ impl FsStorageBackend { } } -enum Either { - File(usize), - Dir(usize), -} - impl crate::traits::StorageEngine for FsStorageBackend { async fn list_buckets(&self) -> StorageResult> { let root = self.root.clone(); @@ -1341,7 +1569,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { Self::remove_tree(&self.multipart_bucket_root(name)); self.bucket_config_cache.remove(name); - self.stats_cache.remove(name); + self.invalidate_bucket_caches(name); Ok(()) } @@ -1550,6 +1778,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { .map_err(StorageError::Io)?; Self::cleanup_empty_parents(&path, &bucket_path); + self.invalidate_bucket_caches(bucket); Ok(()) } @@ -1575,7 +1804,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { Self::safe_unlink(&manifest_path).map_err(StorageError::Io)?; let versions_root = self.bucket_versions_root(bucket); Self::cleanup_empty_parents(&manifest_path, &versions_root); - self.stats_cache.remove(bucket); + self.invalidate_bucket_caches(bucket); Ok(()) } @@ -1621,6 +1850,7 
@@ impl crate::traits::StorageEngine for FsStorageBackend { entry.insert("metadata".to_string(), Value::Object(meta_map)); self.write_index_entry_sync(bucket, key, &entry) .map_err(StorageError::Io)?; + self.invalidate_bucket_caches(bucket); Ok(()) } @@ -1629,7 +1859,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { bucket: &str, params: &ListParams, ) -> StorageResult { - self.list_objects_sync(bucket, params) + run_blocking(|| self.list_objects_sync(bucket, params)) } async fn list_objects_shallow( @@ -1637,7 +1867,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { bucket: &str, params: &ShallowListParams, ) -> StorageResult { - self.list_objects_shallow_sync(bucket, params) + run_blocking(|| self.list_objects_shallow_sync(bucket, params)) } async fn initiate_multipart( @@ -2192,6 +2422,7 @@ impl crate::traits::StorageEngine for FsStorageBackend { self.write_index_entry_sync(bucket, key, &entry) .map_err(StorageError::Io)?; + self.invalidate_bucket_caches(bucket); Ok(()) }