Optimize bucket listing for 10K-100K objects

- Shallow listing: read each per-directory _index.json once for ETags instead
  of N serial .meta.json reads. Validate the prefix against path traversal and
  verify the normalized target stays within the bucket root.
- Recursive listing: cache full per-directory index during the walk so
  each _index.json is parsed at most once per call.
- Per-bucket listing cache with 5s TTL and per-bucket rebuild mutex.
  Invalidated on put/delete/copy/metadata/tags/multipart-complete.
  Pagination uses partition_point for O(log n) start lookup.
- UI stream endpoint now actually streams via mpsc + Body::from_stream
  instead of buffering into a Vec<String>. Cancels producer on client
  disconnect.
- UI JSON endpoint honors delimiter=/ and returns common_prefixes.
- run_blocking wrapper dispatches sync filesystem work via
  block_in_place on multi-threaded runtimes, falls back to inline on
  current-thread runtimes (unit tests).
This commit is contained in:
2026-04-22 19:55:44 +08:00
parent 217af6d1c6
commit 2767e7e79d
6 changed files with 549 additions and 224 deletions

12
Cargo.lock generated
View File

@@ -2740,6 +2740,7 @@ dependencies = [
"tempfile", "tempfile",
"tera", "tera",
"tokio", "tokio",
"tokio-stream",
"tokio-util", "tokio-util",
"tower", "tower",
"tower-http", "tower-http",
@@ -4193,6 +4194,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.18" version = "0.7.18"

View File

@@ -43,6 +43,7 @@ thiserror = "2"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
base64 = "0.22" base64 = "0.22"
tokio-util = { version = "0.7", features = ["io"] } tokio-util = { version = "0.7", features = ["io"] }
tokio-stream = "0.1"
futures = "0.3" futures = "0.3"
dashmap = "6" dashmap = "6"
crc32fast = "1" crc32fast = "1"

View File

@@ -23,6 +23,7 @@ serde_urlencoded = "0.7"
tracing = { workspace = true } tracing = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }
tokio-util = { workspace = true } tokio-util = { workspace = true }
tokio-stream = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
futures = { workspace = true } futures = { workspace = true }

View File

@@ -904,6 +904,35 @@ pub struct ListObjectsQuery {
pub prefix: Option<String>, pub prefix: Option<String>,
#[serde(default)] #[serde(default)]
pub start_after: Option<String>, pub start_after: Option<String>,
#[serde(default)]
pub delimiter: Option<String>,
}
/// Serializes one object's metadata into the JSON shape the UI object
/// browser expects: display fields plus per-object action URLs derived
/// from the bucket name and object key.
fn object_json(bucket_name: &str, o: &myfsio_common::types::ObjectMeta) -> Value {
    json!({
        "key": o.key,
        "size": o.size,
        "last_modified": o.last_modified.to_rfc3339(),
        "last_modified_iso": o.last_modified.to_rfc3339(),
        "last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(),
        // Missing values render as empty strings / STANDARD rather than null.
        "etag": o.etag.clone().unwrap_or_default(),
        "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
        "content_type": o.content_type.clone().unwrap_or_default(),
        "download_url": build_ui_object_url(bucket_name, &o.key, "download"),
        "preview_url": build_ui_object_url(bucket_name, &o.key, "preview"),
        "delete_endpoint": build_ui_object_url(bucket_name, &o.key, "delete"),
        "presign_endpoint": build_ui_object_url(bucket_name, &o.key, "presign"),
        "metadata_url": build_ui_object_url(bucket_name, &o.key, "metadata"),
        "versions_endpoint": build_ui_object_url(bucket_name, &o.key, "versions"),
        // The client substitutes the placeholder with a concrete version id.
        "restore_template": format!(
            "/ui/buckets/{}/objects/{}/restore/VERSION_ID_PLACEHOLDER",
            bucket_name,
            encode_object_key(&o.key)
        ),
        "tags_url": build_ui_object_url(bucket_name, &o.key, "tags"),
        "copy_url": build_ui_object_url(bucket_name, &o.key, "copy"),
        "move_url": build_ui_object_url(bucket_name, &o.key, "move"),
    })
} }
pub async fn list_bucket_objects( pub async fn list_bucket_objects(
@@ -917,6 +946,49 @@ pub async fn list_bucket_objects(
} }
let max_keys = q.max_keys.unwrap_or(1000).min(5000); let max_keys = q.max_keys.unwrap_or(1000).min(5000);
let versioning_enabled = state
.storage
.is_versioning_enabled(&bucket_name)
.await
.unwrap_or(false);
let stats = state.storage.bucket_stats(&bucket_name).await.ok();
let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0);
let use_shallow = q.delimiter.as_deref() == Some("/");
if use_shallow {
let params = myfsio_common::types::ShallowListParams {
prefix: q.prefix.clone().unwrap_or_default(),
delimiter: "/".to_string(),
max_keys,
continuation_token: q.continuation_token.clone(),
};
return match state
.storage
.list_objects_shallow(&bucket_name, &params)
.await
{
Ok(res) => {
let objects: Vec<Value> = res
.objects
.iter()
.map(|o| object_json(&bucket_name, o))
.collect();
Json(json!({
"versioning_enabled": versioning_enabled,
"total_count": total_count,
"is_truncated": res.is_truncated,
"next_continuation_token": res.next_continuation_token,
"url_templates": url_templates_for(&bucket_name),
"objects": objects,
"common_prefixes": res.common_prefixes,
}))
.into_response()
}
Err(e) => storage_json_error(e),
};
}
let params = ListParams { let params = ListParams {
max_keys, max_keys,
continuation_token: q.continuation_token.clone(), continuation_token: q.continuation_token.clone(),
@@ -924,46 +996,12 @@ pub async fn list_bucket_objects(
start_after: q.start_after.clone(), start_after: q.start_after.clone(),
}; };
let versioning_enabled = state
.storage
.is_versioning_enabled(&bucket_name)
.await
.unwrap_or(false);
let stats = state.storage.bucket_stats(&bucket_name).await.ok();
let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0);
match state.storage.list_objects(&bucket_name, &params).await { match state.storage.list_objects(&bucket_name, &params).await {
Ok(res) => { Ok(res) => {
let objects: Vec<Value> = res let objects: Vec<Value> = res
.objects .objects
.iter() .iter()
.map(|o| { .map(|o| object_json(&bucket_name, o))
json!({
"key": o.key,
"size": o.size,
"last_modified": o.last_modified.to_rfc3339(),
"last_modified_iso": o.last_modified.to_rfc3339(),
"last_modified_display": o.last_modified.format("%Y-%m-%d %H:%M:%S").to_string(),
"etag": o.etag.clone().unwrap_or_default(),
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
"content_type": o.content_type.clone().unwrap_or_default(),
"download_url": build_ui_object_url(&bucket_name, &o.key, "download"),
"preview_url": build_ui_object_url(&bucket_name, &o.key, "preview"),
"delete_endpoint": build_ui_object_url(&bucket_name, &o.key, "delete"),
"presign_endpoint": build_ui_object_url(&bucket_name, &o.key, "presign"),
"metadata_url": build_ui_object_url(&bucket_name, &o.key, "metadata"),
"versions_endpoint": build_ui_object_url(&bucket_name, &o.key, "versions"),
"restore_template": format!(
"/ui/buckets/{}/objects/{}/restore/VERSION_ID_PLACEHOLDER",
bucket_name,
encode_object_key(&o.key)
),
"tags_url": build_ui_object_url(&bucket_name, &o.key, "tags"),
"copy_url": build_ui_object_url(&bucket_name, &o.key, "copy"),
"move_url": build_ui_object_url(&bucket_name, &o.key, "move"),
})
})
.collect(); .collect();
Json(json!({ Json(json!({
@@ -1006,41 +1044,62 @@ pub async fn stream_bucket_objects(
let stats = state.storage.bucket_stats(&bucket_name).await.ok(); let stats = state.storage.bucket_stats(&bucket_name).await.ok();
let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0); let total_count = stats.as_ref().map(|s| s.objects).unwrap_or(0);
let mut lines: Vec<String> = Vec::new();
lines.push(
json!({
"type": "meta",
"url_templates": url_templates_for(&bucket_name),
"versioning_enabled": versioning_enabled,
})
.to_string(),
);
lines.push(json!({ "type": "count", "total_count": total_count }).to_string());
let use_delimiter = q.delimiter.as_deref() == Some("/"); let use_delimiter = q.delimiter.as_deref() == Some("/");
let prefix = q.prefix.clone().unwrap_or_default(); let prefix = q.prefix.clone().unwrap_or_default();
if use_delimiter { let (tx, rx) = tokio::sync::mpsc::channel::<Result<bytes::Bytes, std::io::Error>>(64);
let mut token: Option<String> = None;
loop { let meta_line = json!({
let params = myfsio_common::types::ShallowListParams { "type": "meta",
prefix: prefix.clone(), "url_templates": url_templates_for(&bucket_name),
delimiter: "/".to_string(), "versioning_enabled": versioning_enabled,
max_keys: UI_OBJECT_BROWSER_MAX_KEYS, })
continuation_token: token.clone(), .to_string()
}; + "\n";
match state let count_line = json!({ "type": "count", "total_count": total_count }).to_string() + "\n";
.storage
.list_objects_shallow(&bucket_name, &params) let storage = state.storage.clone();
.await let bucket = bucket_name.clone();
{
Ok(res) => { tokio::spawn(async move {
for p in &res.common_prefixes { if tx
lines.push(json!({ "type": "folder", "prefix": p }).to_string()); .send(Ok(bytes::Bytes::from(meta_line.into_bytes())))
} .await
for o in &res.objects { .is_err()
lines.push( {
json!({ return;
}
if tx
.send(Ok(bytes::Bytes::from(count_line.into_bytes())))
.await
.is_err()
{
return;
}
if use_delimiter {
let mut token: Option<String> = None;
loop {
let params = myfsio_common::types::ShallowListParams {
prefix: prefix.clone(),
delimiter: "/".to_string(),
max_keys: UI_OBJECT_BROWSER_MAX_KEYS,
continuation_token: token.clone(),
};
match storage.list_objects_shallow(&bucket, &params).await {
Ok(res) => {
for p in &res.common_prefixes {
let line = json!({ "type": "folder", "prefix": p }).to_string() + "\n";
if tx
.send(Ok(bytes::Bytes::from(line.into_bytes())))
.await
.is_err()
{
return;
}
}
for o in &res.objects {
let line = json!({
"type": "object", "type": "object",
"key": o.key, "key": o.key,
"size": o.size, "size": o.size,
@@ -1050,38 +1109,46 @@ pub async fn stream_bucket_objects(
"etag": o.etag.clone().unwrap_or_default(), "etag": o.etag.clone().unwrap_or_default(),
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
}) })
.to_string(), .to_string()
); + "\n";
if tx
.send(Ok(bytes::Bytes::from(line.into_bytes())))
.await
.is_err()
{
return;
}
}
if !res.is_truncated || res.next_continuation_token.is_none() {
break;
}
token = res.next_continuation_token;
} }
if !res.is_truncated || res.next_continuation_token.is_none() { Err(e) => {
break; let line =
json!({ "type": "error", "error": e.to_string() }).to_string() + "\n";
let _ = tx.send(Ok(bytes::Bytes::from(line.into_bytes()))).await;
return;
} }
token = res.next_continuation_token;
}
Err(e) => {
lines.push(json!({ "type": "error", "error": e.to_string() }).to_string());
break;
} }
} }
} } else {
} else { let mut token: Option<String> = None;
let mut token: Option<String> = None; loop {
loop { let params = ListParams {
let params = ListParams { max_keys: 1000,
max_keys: 1000, continuation_token: token.clone(),
continuation_token: token.clone(), prefix: if prefix.is_empty() {
prefix: if prefix.is_empty() { None
None } else {
} else { Some(prefix.clone())
Some(prefix.clone()) },
}, start_after: None,
start_after: None, };
}; match storage.list_objects(&bucket, &params).await {
match state.storage.list_objects(&bucket_name, &params).await { Ok(res) => {
Ok(res) => { for o in &res.objects {
for o in &res.objects { let line = json!({
lines.push(
json!({
"type": "object", "type": "object",
"key": o.key, "key": o.key,
"size": o.size, "size": o.size,
@@ -1091,30 +1158,48 @@ pub async fn stream_bucket_objects(
"etag": o.etag.clone().unwrap_or_default(), "etag": o.etag.clone().unwrap_or_default(),
"storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()), "storage_class": o.storage_class.clone().unwrap_or_else(|| "STANDARD".to_string()),
}) })
.to_string(), .to_string()
); + "\n";
if tx
.send(Ok(bytes::Bytes::from(line.into_bytes())))
.await
.is_err()
{
return;
}
}
if !res.is_truncated || res.next_continuation_token.is_none() {
break;
}
token = res.next_continuation_token;
} }
if !res.is_truncated || res.next_continuation_token.is_none() { Err(e) => {
break; let line =
json!({ "type": "error", "error": e.to_string() }).to_string() + "\n";
let _ = tx.send(Ok(bytes::Bytes::from(line.into_bytes()))).await;
return;
} }
token = res.next_continuation_token;
}
Err(e) => {
lines.push(json!({ "type": "error", "error": e.to_string() }).to_string());
break;
} }
} }
} }
}
lines.push(json!({ "type": "done" }).to_string()); let done_line = json!({ "type": "done" }).to_string() + "\n";
let _ = tx
.send(Ok(bytes::Bytes::from(done_line.into_bytes())))
.await;
});
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
let body = Body::from_stream(stream);
let body = lines.join("\n") + "\n";
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
headers.insert( headers.insert(
header::CONTENT_TYPE, header::CONTENT_TYPE,
"application/x-ndjson; charset=utf-8".parse().unwrap(), "application/x-ndjson; charset=utf-8".parse().unwrap(),
); );
headers.insert(header::CACHE_CONTROL, "no-cache".parse().unwrap());
headers.insert("x-accel-buffering", "no".parse().unwrap());
(StatusCode::OK, headers, body).into_response() (StatusCode::OK, headers, body).into_response()
} }

View File

@@ -227,9 +227,7 @@ async fn parse_form_any(
if is_multipart { if is_multipart {
let boundary = multer::parse_boundary(&content_type) let boundary = multer::parse_boundary(&content_type)
.map_err(|_| "Missing multipart boundary".to_string())?; .map_err(|_| "Missing multipart boundary".to_string())?;
let stream = futures::stream::once(async move { let stream = futures::stream::once(async move { Ok::<_, std::io::Error>(bytes) });
Ok::<_, std::io::Error>(bytes)
});
let mut multipart = multer::Multipart::new(stream, boundary); let mut multipart = multer::Multipart::new(stream, boundary);
let mut out = HashMap::new(); let mut out = HashMap::new();
while let Some(field) = multipart while let Some(field) = multipart
@@ -2173,10 +2171,7 @@ pub async fn create_bucket(
let wants_json = wants_json(&headers); let wants_json = wants_json(&headers);
let form = match parse_form_any(&headers, body).await { let form = match parse_form_any(&headers, body).await {
Ok(fields) => CreateBucketForm { Ok(fields) => CreateBucketForm {
bucket_name: fields bucket_name: fields.get("bucket_name").cloned().unwrap_or_default(),
.get("bucket_name")
.cloned()
.unwrap_or_default(),
csrf_token: fields.get("csrf_token").cloned().unwrap_or_default(), csrf_token: fields.get("csrf_token").cloned().unwrap_or_default(),
}, },
Err(message) => { Err(message) => {

View File

@@ -10,12 +10,92 @@ use md5::{Digest, Md5};
use parking_lot::Mutex; use parking_lot::Mutex;
use serde_json::Value; use serde_json::Value;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::{Path, PathBuf}; use std::path::{Component, Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use uuid::Uuid; use uuid::Uuid;
/// Rejects listing prefixes usable for path traversal: embedded NUL
/// bytes, a leading slash or backslash, or any `..` segment (split on
/// both separator styles). Returns `Ok(())` for safe prefixes.
fn validate_list_prefix(prefix: &str) -> StorageResult<()> {
    if prefix.contains('\0') {
        return Err(StorageError::InvalidObjectKey(
            "prefix contains null bytes".to_string(),
        ));
    }
    if prefix.starts_with('/') || prefix.starts_with('\\') {
        return Err(StorageError::InvalidObjectKey(
            "prefix cannot start with a slash".to_string(),
        ));
    }
    if prefix.split(['/', '\\']).any(|segment| segment == "..") {
        return Err(StorageError::InvalidObjectKey(
            "prefix contains parent directory references".to_string(),
        ));
    }
    Ok(())
}
/// Dispatches synchronous (potentially blocking) filesystem work from
/// async context. On a multi-threaded Tokio runtime the closure runs
/// via `block_in_place` so the executor thread is handed off instead of
/// stalled; on a current-thread runtime (unit tests) or outside any
/// runtime it runs inline, since `block_in_place` is documented to
/// panic on the current-thread flavor.
fn run_blocking<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    let multi_threaded = tokio::runtime::Handle::try_current()
        .map(|handle| handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread)
        .unwrap_or(false);
    if multi_threaded {
        tokio::task::block_in_place(f)
    } else {
        f()
    }
}
/// Returns the half-open index range `[start, end)` of entries in the
/// sorted slice `items` whose key starts with `prefix`.
///
/// `key_of` projects an item to its string key; `items` must be sorted
/// ascending by that key or the result is unspecified. An empty prefix
/// selects the whole slice. Runs in O(log n) via two binary searches.
fn slice_range_for_prefix<T, F>(items: &[T], key_of: F, prefix: &str) -> (usize, usize)
where
    F: Fn(&T) -> &str,
{
    if prefix.is_empty() {
        return (0, items.len());
    }
    // First key not lexicographically below the prefix.
    let start = items.partition_point(|item| key_of(item) < prefix);
    // Among keys >= prefix, every key that starts with the prefix sorts
    // before every key that does not, so the matching run is contiguous
    // and partition_point applies here too (was a linear scan before).
    let end = start + items[start..].partition_point(|item| key_of(item).starts_with(prefix));
    (start, end)
}
/// Lexically normalizes a path: drops `.` components and resolves `..`
/// against earlier components. Returns `None` when a `..` would climb
/// above the start of the path (a traversal attempt). Purely textual —
/// never touches the filesystem and does not resolve symlinks.
fn normalize_path(p: &Path) -> Option<PathBuf> {
    p.components().try_fold(PathBuf::new(), |mut acc, component| {
        match component {
            Component::CurDir => {}
            Component::ParentDir => {
                // Refuse to pop past the beginning: "../x" is rejected
                // rather than silently resolved.
                if !acc.pop() {
                    return None;
                }
            }
            keep => acc.push(keep.as_os_str()),
        }
        Some(acc)
    })
}
/// True when `candidate`, after lexical normalization, still lies under
/// the normalized `root`. Fails closed: if either path escapes its own
/// start via `..`, the answer is `false`.
fn path_is_within(candidate: &Path, root: &Path) -> bool {
    let (Some(cand), Some(base)) = (normalize_path(candidate), normalize_path(root)) else {
        return false;
    };
    cand.starts_with(&base)
}
// One listed object: (key, size in bytes, mtime as Unix seconds, optional ETag).
type ListCacheEntry = (String, u64, f64, Option<String>);

// Cached contents of a single directory level for delimiter listings.
#[derive(Clone, Default)]
struct ShallowCacheEntry {
    // Objects directly in this directory, sorted ascending by key.
    files: Vec<ObjectMeta>,
    // Immediate child directory prefixes, sorted; each ends with the delimiter.
    dirs: Vec<String>,
}
pub struct FsStorageBackend { pub struct FsStorageBackend {
root: PathBuf, root: PathBuf,
object_key_max_length_bytes: usize, object_key_max_length_bytes: usize,
@@ -26,6 +106,11 @@ pub struct FsStorageBackend {
meta_index_locks: DashMap<String, Arc<Mutex<()>>>, meta_index_locks: DashMap<String, Arc<Mutex<()>>>,
stats_cache: DashMap<String, (BucketStats, Instant)>, stats_cache: DashMap<String, (BucketStats, Instant)>,
stats_cache_ttl: std::time::Duration, stats_cache_ttl: std::time::Duration,
list_cache: DashMap<String, (Arc<Vec<ListCacheEntry>>, Instant)>,
shallow_cache: DashMap<(String, PathBuf, String), (Arc<ShallowCacheEntry>, Instant)>,
list_rebuild_locks: DashMap<String, Arc<Mutex<()>>>,
shallow_rebuild_locks: DashMap<(String, PathBuf, String), Arc<Mutex<()>>>,
list_cache_ttl: std::time::Duration,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -61,11 +146,22 @@ impl FsStorageBackend {
meta_index_locks: DashMap::new(), meta_index_locks: DashMap::new(),
stats_cache: DashMap::new(), stats_cache: DashMap::new(),
stats_cache_ttl: std::time::Duration::from_secs(60), stats_cache_ttl: std::time::Duration::from_secs(60),
list_cache: DashMap::new(),
shallow_cache: DashMap::new(),
list_rebuild_locks: DashMap::new(),
shallow_rebuild_locks: DashMap::new(),
list_cache_ttl: std::time::Duration::from_secs(5),
}; };
backend.ensure_system_roots(); backend.ensure_system_roots();
backend backend
} }
/// Drops every cached view of `bucket_name` after a mutation (put,
/// delete, copy, metadata/tags update, multipart complete) so the next
/// listing rebuilds from disk.
/// NOTE(review): the rebuild-lock maps are not cleared here, so lock
/// entries accumulate per bucket/dir — confirm that is intended.
fn invalidate_bucket_caches(&self, bucket_name: &str) {
    self.stats_cache.remove(bucket_name);
    self.list_cache.remove(bucket_name);
    // Shallow cache is keyed by (bucket, dir, delimiter); evict every
    // entry belonging to this bucket.
    self.shallow_cache.retain(|(b, _, _), _| b != bucket_name);
}
fn ensure_system_roots(&self) { fn ensure_system_roots(&self) {
let dirs = [ let dirs = [
self.system_root_path(), self.system_root_path(),
@@ -158,6 +254,39 @@ impl FsStorageBackend {
} }
} }
/// Path of the per-directory `_index.json` inside the bucket's metadata
/// tree. An empty or `.` relative dir maps to the metadata root itself.
fn index_file_for_dir(&self, bucket_name: &str, rel_dir: &Path) -> PathBuf {
    let meta_root = self.bucket_meta_root(bucket_name);
    let is_root = rel_dir.as_os_str().is_empty() || rel_dir == Path::new(".");
    let dir = if is_root { meta_root } else { meta_root.join(rel_dir) };
    dir.join(INDEX_FILE)
}
/// Reads the directory's `_index.json` once and extracts a
/// name -> ETag map (from each entry's `metadata.__etag__` field).
/// Any missing file, read failure, or parse failure yields an empty
/// map — listings then simply lack ETags rather than erroring.
fn load_dir_index_sync(&self, bucket_name: &str, rel_dir: &Path) -> HashMap<String, String> {
    let index_path = self.index_file_for_dir(bucket_name, rel_dir);
    if !index_path.exists() {
        return HashMap::new();
    }
    let parsed: Option<HashMap<String, Value>> = std::fs::read_to_string(&index_path)
        .ok()
        .and_then(|text| serde_json::from_str(&text).ok());
    let Some(index) = parsed else {
        return HashMap::new();
    };
    index
        .into_iter()
        .filter_map(|(name, entry)| {
            // Entries without a string __etag__ are skipped, not errors.
            let etag = entry.get("metadata")?.get("__etag__")?.as_str()?;
            Some((name, etag.to_string()))
        })
        .collect()
}
fn get_meta_index_lock(&self, index_path: &str) -> Arc<Mutex<()>> { fn get_meta_index_lock(&self, index_path: &str) -> Arc<Mutex<()>> {
self.meta_index_locks self.meta_index_locks
.entry(index_path.to_string()) .entry(index_path.to_string())
@@ -872,14 +1001,14 @@ impl FsStorageBackend {
Ok(stats) Ok(stats)
} }
fn list_objects_sync( fn build_full_listing_sync(
&self, &self,
bucket_name: &str, bucket_name: &str,
params: &ListParams, ) -> StorageResult<Arc<Vec<ListCacheEntry>>> {
) -> StorageResult<ListObjectsResult> {
let bucket_path = self.require_bucket(bucket_name)?; let bucket_path = self.require_bucket(bucket_name)?;
let mut all_keys: Vec<(String, u64, f64, Option<String>)> = Vec::new(); let mut all_keys: Vec<ListCacheEntry> = Vec::new();
let mut dir_etag_cache: HashMap<PathBuf, HashMap<String, String>> = HashMap::new();
let internal = INTERNAL_FOLDERS; let internal = INTERNAL_FOLDERS;
let bucket_str = bucket_path.to_string_lossy().to_string(); let bucket_str = bucket_path.to_string_lossy().to_string();
let bucket_prefix_len = bucket_str.len() + 1; let bucket_prefix_len = bucket_str.len() + 1;
@@ -912,28 +1041,78 @@ impl FsStorageBackend {
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs_f64()) .map(|d| d.as_secs_f64())
.unwrap_or(0.0); .unwrap_or(0.0);
all_keys.push((key, meta.len(), mtime, None));
let rel_dir = Path::new(&key)
.parent()
.map(|p| p.to_path_buf())
.unwrap_or_default();
let etags = dir_etag_cache
.entry(rel_dir.clone())
.or_insert_with(|| self.load_dir_index_sync(bucket_name, &rel_dir));
let etag = etags.get(name_str.as_ref()).cloned();
all_keys.push((key, meta.len(), mtime, etag));
} }
} }
} }
} }
all_keys.sort_by(|a, b| a.0.cmp(&b.0)); all_keys.sort_by(|a, b| a.0.cmp(&b.0));
Ok(Arc::new(all_keys))
}
if let Some(ref prefix) = params.prefix { fn get_full_listing_sync(&self, bucket_name: &str) -> StorageResult<Arc<Vec<ListCacheEntry>>> {
all_keys.retain(|k| k.0.starts_with(prefix.as_str())); if let Some(entry) = self.list_cache.get(bucket_name) {
let (cached, cached_at) = entry.value();
if cached_at.elapsed() < self.list_cache_ttl {
return Ok(cached.clone());
}
} }
let lock = self
.list_rebuild_locks
.entry(bucket_name.to_string())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone();
let _guard = lock.lock();
if let Some(entry) = self.list_cache.get(bucket_name) {
let (cached, cached_at) = entry.value();
if cached_at.elapsed() < self.list_cache_ttl {
return Ok(cached.clone());
}
}
let listing = self.build_full_listing_sync(bucket_name)?;
self.list_cache
.insert(bucket_name.to_string(), (listing.clone(), Instant::now()));
Ok(listing)
}
fn list_objects_sync(
&self,
bucket_name: &str,
params: &ListParams,
) -> StorageResult<ListObjectsResult> {
self.require_bucket(bucket_name)?;
if let Some(ref prefix) = params.prefix {
if !prefix.is_empty() {
validate_list_prefix(prefix)?;
}
}
let listing = self.get_full_listing_sync(bucket_name)?;
let (slice_start, slice_end) = match params.prefix.as_deref() {
Some(p) if !p.is_empty() => slice_range_for_prefix(&listing[..], |e| &e.0, p),
_ => (0, listing.len()),
};
let prefix_filter = &listing[slice_start..slice_end];
let start_idx = if let Some(ref token) = params.continuation_token { let start_idx = if let Some(ref token) = params.continuation_token {
all_keys prefix_filter.partition_point(|k| k.0.as_str() <= token.as_str())
.iter()
.position(|k| k.0.as_str() > token.as_str())
.unwrap_or(all_keys.len())
} else if let Some(ref start_after) = params.start_after { } else if let Some(ref start_after) = params.start_after {
all_keys prefix_filter.partition_point(|k| k.0.as_str() <= start_after.as_str())
.iter()
.position(|k| k.0.as_str() > start_after.as_str())
.unwrap_or(all_keys.len())
} else { } else {
0 0
}; };
@@ -944,10 +1123,10 @@ impl FsStorageBackend {
params.max_keys params.max_keys
}; };
let end_idx = std::cmp::min(start_idx + max_keys, all_keys.len()); let end_idx = std::cmp::min(start_idx + max_keys, prefix_filter.len());
let is_truncated = end_idx < all_keys.len(); let is_truncated = end_idx < prefix_filter.len();
let objects: Vec<ObjectMeta> = all_keys[start_idx..end_idx] let objects: Vec<ObjectMeta> = prefix_filter[start_idx..end_idx]
.iter() .iter()
.map(|(key, size, mtime, etag)| { .map(|(key, size, mtime, etag)| {
let lm = Utc let lm = Utc
@@ -955,10 +1134,7 @@ impl FsStorageBackend {
.single() .single()
.unwrap_or_else(Utc::now); .unwrap_or_else(Utc::now);
let mut obj = ObjectMeta::new(key.clone(), *size, lm); let mut obj = ObjectMeta::new(key.clone(), *size, lm);
obj.etag = etag.clone().or_else(|| { obj.etag = etag.clone();
let meta = self.read_metadata_sync(bucket_name, key);
meta.get("__etag__").cloned()
});
obj obj
}) })
.collect(); .collect();
@@ -976,37 +1152,40 @@ impl FsStorageBackend {
}) })
} }
fn list_objects_shallow_sync( fn build_shallow_sync(
&self, &self,
bucket_name: &str, bucket_name: &str,
params: &ShallowListParams, rel_dir: &Path,
) -> StorageResult<ShallowListResult> { delimiter: &str,
) -> StorageResult<Arc<ShallowCacheEntry>> {
let bucket_path = self.require_bucket(bucket_name)?; let bucket_path = self.require_bucket(bucket_name)?;
let target_dir = bucket_path.join(rel_dir);
let target_dir = if params.prefix.is_empty() { if !path_is_within(&target_dir, &bucket_path) {
bucket_path.clone() return Err(StorageError::InvalidObjectKey(
} else { "prefix escapes bucket root".to_string(),
let prefix_path = Path::new(&params.prefix); ));
let dir_part = if params.prefix.ends_with(&params.delimiter) { }
prefix_path.to_path_buf()
} else {
prefix_path.parent().unwrap_or(Path::new("")).to_path_buf()
};
bucket_path.join(dir_part)
};
if !target_dir.exists() { if !target_dir.exists() {
return Ok(ShallowListResult { return Ok(Arc::new(ShallowCacheEntry::default()));
objects: Vec::new(),
common_prefixes: Vec::new(),
is_truncated: false,
next_continuation_token: None,
});
} }
let dir_etags = self.load_dir_index_sync(bucket_name, rel_dir);
let mut files = Vec::new(); let mut files = Vec::new();
let mut dirs = Vec::new(); let mut dirs = Vec::new();
let rel_dir_prefix = if rel_dir.as_os_str().is_empty() {
String::new()
} else {
let mut s = rel_dir.to_string_lossy().replace('\\', "/");
if !s.ends_with('/') {
s.push('/');
}
s
};
let entries = std::fs::read_dir(&target_dir).map_err(StorageError::Io)?; let entries = std::fs::read_dir(&target_dir).map_err(StorageError::Io)?;
for entry in entries.flatten() { for entry in entries.flatten() {
let name = entry.file_name(); let name = entry.file_name();
@@ -1021,19 +1200,10 @@ impl FsStorageBackend {
Err(_) => continue, Err(_) => continue,
}; };
let rel = entry let rel = format!("{}{}", rel_dir_prefix, name_str);
.path()
.strip_prefix(&bucket_path)
.unwrap_or(Path::new(""))
.to_string_lossy()
.replace('\\', "/");
if !params.prefix.is_empty() && !rel.starts_with(&params.prefix) {
continue;
}
if ft.is_dir() { if ft.is_dir() {
dirs.push(format!("{}{}", rel, &params.delimiter)); dirs.push(format!("{}{}", rel, delimiter));
} else if ft.is_file() { } else if ft.is_file() {
if let Ok(meta) = entry.metadata() { if let Ok(meta) = entry.metadata() {
let mtime = meta let mtime = meta
@@ -1046,10 +1216,7 @@ impl FsStorageBackend {
.timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32) .timestamp_opt(mtime as i64, ((mtime % 1.0) * 1_000_000_000.0) as u32)
.single() .single()
.unwrap_or_else(Utc::now); .unwrap_or_else(Utc::now);
let etag = self let etag = dir_etags.get(&name_str).cloned();
.read_metadata_sync(bucket_name, &rel)
.get("__etag__")
.cloned();
let mut obj = ObjectMeta::new(rel, meta.len(), lm); let mut obj = ObjectMeta::new(rel, meta.len(), lm);
obj.etag = etag; obj.etag = etag;
files.push(obj); files.push(obj);
@@ -1059,67 +1226,133 @@ impl FsStorageBackend {
files.sort_by(|a, b| a.key.cmp(&b.key)); files.sort_by(|a, b| a.key.cmp(&b.key));
dirs.sort(); dirs.sort();
Ok(Arc::new(ShallowCacheEntry { files, dirs }))
}
let mut merged: Vec<Either> = Vec::new(); fn get_shallow_sync(
let mut fi = 0; &self,
let mut di = 0; bucket_name: &str,
while fi < files.len() && di < dirs.len() { rel_dir: &Path,
if files[fi].key < dirs[di] { delimiter: &str,
merged.push(Either::File(fi)); ) -> StorageResult<Arc<ShallowCacheEntry>> {
fi += 1; let cache_key = (
} else { bucket_name.to_string(),
merged.push(Either::Dir(di)); rel_dir.to_path_buf(),
di += 1; delimiter.to_string(),
);
if let Some(entry) = self.shallow_cache.get(&cache_key) {
let (cached, cached_at) = entry.value();
if cached_at.elapsed() < self.list_cache_ttl {
return Ok(cached.clone());
} }
} }
while fi < files.len() {
merged.push(Either::File(fi)); let lock = self
fi += 1; .shallow_rebuild_locks
} .entry(cache_key.clone())
while di < dirs.len() { .or_insert_with(|| Arc::new(Mutex::new(())))
merged.push(Either::Dir(di)); .clone();
di += 1; let _guard = lock.lock();
if let Some(entry) = self.shallow_cache.get(&cache_key) {
let (cached, cached_at) = entry.value();
if cached_at.elapsed() < self.list_cache_ttl {
return Ok(cached.clone());
}
} }
let start_idx = if let Some(ref token) = params.continuation_token { let built = self.build_shallow_sync(bucket_name, rel_dir, delimiter)?;
merged self.shallow_cache
.iter() .insert(cache_key, (built.clone(), Instant::now()));
.position(|e| match e { Ok(built)
Either::File(i) => files[*i].key.as_str() > token.as_str(), }
Either::Dir(i) => dirs[*i].as_str() > token.as_str(),
}) fn list_objects_shallow_sync(
.unwrap_or(merged.len()) &self,
bucket_name: &str,
params: &ShallowListParams,
) -> StorageResult<ShallowListResult> {
self.require_bucket(bucket_name)?;
let rel_dir: PathBuf = if params.prefix.is_empty() {
PathBuf::new()
} else { } else {
0 validate_list_prefix(&params.prefix)?;
let prefix_path = Path::new(&params.prefix);
if params.prefix.ends_with(&params.delimiter) {
prefix_path.to_path_buf()
} else {
prefix_path.parent().unwrap_or(Path::new("")).to_path_buf()
}
}; };
let cached = self.get_shallow_sync(bucket_name, &rel_dir, &params.delimiter)?;
let (file_start, file_end) =
slice_range_for_prefix(&cached.files, |o| &o.key, &params.prefix);
let (dir_start, dir_end) = slice_range_for_prefix(&cached.dirs, |s| s, &params.prefix);
let files = &cached.files[file_start..file_end];
let dirs = &cached.dirs[dir_start..dir_end];
let max_keys = if params.max_keys == 0 { let max_keys = if params.max_keys == 0 {
DEFAULT_MAX_KEYS DEFAULT_MAX_KEYS
} else { } else {
params.max_keys params.max_keys
}; };
let end_idx = std::cmp::min(start_idx + max_keys, merged.len()); let token_filter = |key: &str| -> bool {
let is_truncated = end_idx < merged.len(); params
.continuation_token
.as_deref()
.map(|t| key > t)
.unwrap_or(true)
};
let mut result_objects = Vec::new(); let file_skip = params
let mut result_prefixes = Vec::new(); .continuation_token
.as_deref()
.map(|t| files.partition_point(|o| o.key.as_str() <= t))
.unwrap_or(0);
let dir_skip = params
.continuation_token
.as_deref()
.map(|t| dirs.partition_point(|d| d.as_str() <= t))
.unwrap_or(0);
for item in &merged[start_idx..end_idx] { let mut fi = file_skip;
match item { let mut di = dir_skip;
Either::File(i) => result_objects.push(files[*i].clone()), let mut result_objects: Vec<ObjectMeta> = Vec::new();
Either::Dir(i) => result_prefixes.push(dirs[*i].clone()), let mut result_prefixes: Vec<String> = Vec::new();
let mut last_key: Option<String> = None;
let mut total = 0usize;
while total < max_keys && (fi < files.len() || di < dirs.len()) {
let take_file = match (fi < files.len(), di < dirs.len()) {
(true, true) => files[fi].key.as_str() < dirs[di].as_str(),
(true, false) => true,
(false, true) => false,
_ => break,
};
if take_file {
if token_filter(&files[fi].key) {
last_key = Some(files[fi].key.clone());
result_objects.push(files[fi].clone());
total += 1;
}
fi += 1;
} else {
if token_filter(&dirs[di]) {
last_key = Some(dirs[di].clone());
result_prefixes.push(dirs[di].clone());
total += 1;
}
di += 1;
} }
} }
let next_token = if is_truncated { let remaining = fi < files.len() || di < dirs.len();
match &merged[end_idx - 1] { let is_truncated = remaining;
Either::File(i) => Some(files[*i].key.clone()), let next_token = if is_truncated { last_key } else { None };
Either::Dir(i) => Some(dirs[*i].clone()),
}
} else {
None
};
Ok(ShallowListResult { Ok(ShallowListResult {
objects: result_objects, objects: result_objects,
@@ -1195,7 +1428,7 @@ impl FsStorageBackend {
StorageError::Io(e) StorageError::Io(e)
})?; })?;
self.stats_cache.remove(bucket_name); self.invalidate_bucket_caches(bucket_name);
let file_meta = std::fs::metadata(&destination).map_err(StorageError::Io)?; let file_meta = std::fs::metadata(&destination).map_err(StorageError::Io)?;
let mtime = file_meta let mtime = file_meta
@@ -1254,11 +1487,6 @@ impl FsStorageBackend {
} }
} }
enum Either {
File(usize),
Dir(usize),
}
impl crate::traits::StorageEngine for FsStorageBackend { impl crate::traits::StorageEngine for FsStorageBackend {
async fn list_buckets(&self) -> StorageResult<Vec<BucketMeta>> { async fn list_buckets(&self) -> StorageResult<Vec<BucketMeta>> {
let root = self.root.clone(); let root = self.root.clone();
@@ -1341,7 +1569,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
Self::remove_tree(&self.multipart_bucket_root(name)); Self::remove_tree(&self.multipart_bucket_root(name));
self.bucket_config_cache.remove(name); self.bucket_config_cache.remove(name);
self.stats_cache.remove(name); self.invalidate_bucket_caches(name);
Ok(()) Ok(())
} }
@@ -1550,6 +1778,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
.map_err(StorageError::Io)?; .map_err(StorageError::Io)?;
Self::cleanup_empty_parents(&path, &bucket_path); Self::cleanup_empty_parents(&path, &bucket_path);
self.invalidate_bucket_caches(bucket);
Ok(()) Ok(())
} }
@@ -1575,7 +1804,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
Self::safe_unlink(&manifest_path).map_err(StorageError::Io)?; Self::safe_unlink(&manifest_path).map_err(StorageError::Io)?;
let versions_root = self.bucket_versions_root(bucket); let versions_root = self.bucket_versions_root(bucket);
Self::cleanup_empty_parents(&manifest_path, &versions_root); Self::cleanup_empty_parents(&manifest_path, &versions_root);
self.stats_cache.remove(bucket); self.invalidate_bucket_caches(bucket);
Ok(()) Ok(())
} }
@@ -1621,6 +1850,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
entry.insert("metadata".to_string(), Value::Object(meta_map)); entry.insert("metadata".to_string(), Value::Object(meta_map));
self.write_index_entry_sync(bucket, key, &entry) self.write_index_entry_sync(bucket, key, &entry)
.map_err(StorageError::Io)?; .map_err(StorageError::Io)?;
self.invalidate_bucket_caches(bucket);
Ok(()) Ok(())
} }
@@ -1629,7 +1859,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
bucket: &str, bucket: &str,
params: &ListParams, params: &ListParams,
) -> StorageResult<ListObjectsResult> { ) -> StorageResult<ListObjectsResult> {
self.list_objects_sync(bucket, params) run_blocking(|| self.list_objects_sync(bucket, params))
} }
async fn list_objects_shallow( async fn list_objects_shallow(
@@ -1637,7 +1867,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
bucket: &str, bucket: &str,
params: &ShallowListParams, params: &ShallowListParams,
) -> StorageResult<ShallowListResult> { ) -> StorageResult<ShallowListResult> {
self.list_objects_shallow_sync(bucket, params) run_blocking(|| self.list_objects_shallow_sync(bucket, params))
} }
async fn initiate_multipart( async fn initiate_multipart(
@@ -2192,6 +2422,7 @@ impl crate::traits::StorageEngine for FsStorageBackend {
self.write_index_entry_sync(bucket, key, &entry) self.write_index_entry_sync(bucket, key, &entry)
.map_err(StorageError::Io)?; .map_err(StorageError::Io)?;
self.invalidate_bucket_caches(bucket);
Ok(()) Ok(())
} }