perf: shallow listing, os.scandir stats, server-side search for large buckets
This commit is contained in:
@@ -692,6 +692,83 @@ class ObjectStorage:
|
||||
next_continuation_token=next_token,
|
||||
)
|
||||
|
||||
def search_objects(
|
||||
self,
|
||||
bucket_name: str,
|
||||
query: str,
|
||||
*,
|
||||
prefix: str = "",
|
||||
limit: int = 500,
|
||||
) -> Dict[str, Any]:
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
if not bucket_path.is_dir():
|
||||
raise BucketNotFoundError("Bucket does not exist")
|
||||
|
||||
if prefix:
|
||||
search_root = bucket_path / prefix.replace("/", os.sep)
|
||||
if not search_root.is_dir():
|
||||
return {"results": [], "truncated": False}
|
||||
resolved = search_root.resolve()
|
||||
if not str(resolved).startswith(str(bucket_path.resolve())):
|
||||
return {"results": [], "truncated": False}
|
||||
else:
|
||||
search_root = bucket_path
|
||||
|
||||
query_lower = query.lower()
|
||||
results: list[Dict[str, Any]] = []
|
||||
internal = self.INTERNAL_FOLDERS
|
||||
bucket_str = str(bucket_path)
|
||||
bucket_len = len(bucket_str) + 1
|
||||
meta_root = self._bucket_meta_root(bucket_name)
|
||||
scan_limit = limit * 4
|
||||
|
||||
matched = 0
|
||||
scanned = 0
|
||||
search_str = str(search_root)
|
||||
stack = [search_str]
|
||||
while stack:
|
||||
current = stack.pop()
|
||||
try:
|
||||
with os.scandir(current) as it:
|
||||
for entry in it:
|
||||
if current == bucket_str and entry.name in internal:
|
||||
continue
|
||||
if entry.is_dir(follow_symlinks=False):
|
||||
stack.append(entry.path)
|
||||
elif entry.is_file(follow_symlinks=False):
|
||||
scanned += 1
|
||||
key = entry.path[bucket_len:].replace(os.sep, "/")
|
||||
if query_lower in key.lower():
|
||||
st = entry.stat(follow_symlinks=False)
|
||||
meta_path = meta_root / (key + ".meta.json")
|
||||
last_modified = ""
|
||||
try:
|
||||
if meta_path.exists():
|
||||
md = json.loads(meta_path.read_text(encoding="utf-8"))
|
||||
last_modified = md.get("last_modified", "")
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
if not last_modified:
|
||||
last_modified = datetime.fromtimestamp(
|
||||
st.st_mtime, tz=timezone.utc
|
||||
).strftime("%Y-%m-%dT%H:%M:%S.000Z")
|
||||
results.append({
|
||||
"key": key,
|
||||
"size": st.st_size,
|
||||
"last_modified": last_modified,
|
||||
})
|
||||
matched += 1
|
||||
if matched >= scan_limit:
|
||||
break
|
||||
except PermissionError:
|
||||
continue
|
||||
if matched >= scan_limit:
|
||||
break
|
||||
|
||||
results.sort(key=lambda r: r["key"])
|
||||
truncated = len(results) > limit
|
||||
return {"results": results[:limit], "truncated": truncated}
|
||||
|
||||
def put_object(
|
||||
self,
|
||||
bucket_name: str,
|
||||
|
||||
Reference in New Issue
Block a user