Bypass the boto3 proxy for object streaming and read directly from the storage layer; add a streaming object iterator to eliminate O(n²) directory rescanning on large buckets; add iter_objects_shallow delegation to EncryptedObjectStorage

2026-03-20 17:35:10 +08:00
parent 14786151e5
commit aa4f9f5566
3 changed files with 136 additions and 9 deletions
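
Why the iterator matters: if each page of a shallow listing is produced by rescanning the whole directory and slicing it to honor a continuation token (the pattern the commit message says it eliminates), listing n objects in pages of k reads O(n) entries per page and O(n²/k) entries overall; a single os.scandir generator pass reads each entry exactly once. A minimal sketch of the contrast, with a hypothetical page_by_rescan helper for illustration only:

import os
from itertools import islice

def page_by_rescan(path: str, offset: int, page_size: int) -> list[str]:
    # Every call re-reads the directory from the start, so draining
    # n entries in pages of k touches O(n^2 / k) entries in total.
    with os.scandir(path) as it:
        return [entry.name for entry in islice(it, offset, offset + page_size)]

def iter_once(path: str):
    # One scandir pass: each entry is read and yielded exactly once, O(n).
    with os.scandir(path) as it:
        for entry in it:
            yield entry.name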


@@ -193,6 +193,9 @@ class EncryptedObjectStorage:
    def list_objects_shallow(self, bucket_name: str, **kwargs):
        return self.storage.list_objects_shallow(bucket_name, **kwargs)

    def iter_objects_shallow(self, bucket_name: str, **kwargs):
        return self.storage.iter_objects_shallow(bucket_name, **kwargs)

    def search_objects(self, bucket_name: str, query: str, **kwargs):
        return self.storage.search_objects(bucket_name, query, **kwargs)
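
A note on the delegation above: returning the inner generator keeps the call lazy without adding a per-item generator frame, and shallow listing touches only names and stat data, so the encrypted wrapper has nothing to decrypt. A minimal sketch of the pattern, with illustrative names:

class Inner:
    def iter_items(self):
        for i in range(3):
            yield i

class Wrapper:
    def __init__(self, inner):
        self.inner = inner

    def iter_items(self):
        # Plain `return` of the inner generator (no `yield from`): the
        # wrapper stays an ordinary function and iteration remains lazy.
        return self.inner.iter_items()

gen = Wrapper(Inner()).iter_items()   # nothing iterated yet
print(list(gen))                      # [0, 1, 2]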


@@ -714,6 +714,73 @@ class ObjectStorage:
            next_continuation_token=next_token,
        )

    def iter_objects_shallow(
        self,
        bucket_name: str,
        *,
        prefix: str = "",
        delimiter: str = "/",
    ) -> Generator[tuple[str, ObjectMeta | str], None, None]:
        bucket_path = self._bucket_path(bucket_name)
        if not bucket_path.exists():
            raise BucketNotFoundError("Bucket does not exist")
        bucket_id = bucket_path.name
        target_dir = bucket_path
        if prefix:
            safe_prefix_path = Path(prefix.rstrip("/"))
            if ".." in safe_prefix_path.parts:
                return
            target_dir = bucket_path / safe_prefix_path
            try:
                resolved = target_dir.resolve()
                bucket_resolved = bucket_path.resolve()
                if not str(resolved).startswith(str(bucket_resolved) + os.sep) and resolved != bucket_resolved:
                    return
            except (OSError, ValueError):
                return
        if not target_dir.exists() or not target_dir.is_dir():
            return
        # Load the per-bucket etag index once; it spares a metadata read per file.
        etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
        meta_cache: Dict[str, str] = {}
        if etag_index_path.exists():
            try:
                with open(etag_index_path, 'r', encoding='utf-8') as f:
                    meta_cache = json.load(f)
            except (OSError, json.JSONDecodeError):
                pass
        try:
            # Single os.scandir pass: entries are yielded as they are read,
            # so no page ever re-walks the directory.
            with os.scandir(str(target_dir)) as it:
                for entry in it:
                    name = entry.name
                    if name in self.INTERNAL_FOLDERS:
                        continue
                    if entry.is_dir(follow_symlinks=False):
                        yield ("folder", prefix + name + delimiter)
                    elif entry.is_file(follow_symlinks=False):
                        key = prefix + name
                        try:
                            st = entry.stat()
                            etag = meta_cache.get(key)
                            if etag is None:
                                safe_key = PurePosixPath(key)
                                meta = self._read_metadata(bucket_id, Path(safe_key))
                                etag = meta.get("__etag__") if meta else None
                            yield ("object", ObjectMeta(
                                key=key,
                                size=st.st_size,
                                last_modified=datetime.fromtimestamp(st.st_mtime, timezone.utc),
                                etag=etag,
                                metadata=None,
                            ))
                        except OSError:
                            pass
        except OSError:
            return

    def _shallow_via_full_scan(
        self,
        bucket_name: str,

@@ -618,20 +618,77 @@ def stream_bucket_objects(bucket_name: str):
    prefix = request.args.get("prefix") or None
    delimiter = request.args.get("delimiter") or None
    storage = _storage()
    # Read versioning state from the storage layer directly; the boto3 proxy
    # client (get_session_s3_client / get_versioning_via_s3) is no longer
    # involved in this route.
    try:
        versioning_enabled = storage.is_versioning_enabled(bucket_name)
    except StorageError:
        versioning_enabled = False
    url_templates = build_url_templates(bucket_name)
    display_tz = current_app.config.get("DISPLAY_TIMEZONE", "UTC")

    def generate():
        yield json.dumps({
            "type": "meta",
            "versioning_enabled": versioning_enabled,
            "url_templates": url_templates,
        }) + "\n"
        yield json.dumps({"type": "count", "total_count": 0}) + "\n"
        running_count = 0
        try:
            if delimiter:
                # Shallow view: stream one directory level via the new iterator.
                for item_type, item in storage.iter_objects_shallow(
                    bucket_name, prefix=prefix or "", delimiter=delimiter,
                ):
                    if item_type == "folder":
                        yield json.dumps({"type": "folder", "prefix": item}) + "\n"
                    else:
                        last_mod = item.last_modified
                        yield json.dumps({
                            "type": "object",
                            "key": item.key,
                            "size": item.size,
                            "last_modified": last_mod.isoformat(),
                            "last_modified_display": _format_datetime_display(last_mod, display_tz),
                            "last_modified_iso": _format_datetime_iso(last_mod, display_tz),
                            "etag": item.etag or "",
                        }) + "\n"
                        running_count += 1
                        if running_count % 1000 == 0:
                            yield json.dumps({"type": "count", "total_count": running_count}) + "\n"
            else:
                # Recursive view: page through the flat listing 1000 keys at a time.
                continuation_token = None
                while True:
                    result = storage.list_objects(
                        bucket_name,
                        max_keys=1000,
                        continuation_token=continuation_token,
                        prefix=prefix,
                    )
                    for obj in result.objects:
                        last_mod = obj.last_modified
                        yield json.dumps({
                            "type": "object",
                            "key": obj.key,
                            "size": obj.size,
                            "last_modified": last_mod.isoformat(),
                            "last_modified_display": _format_datetime_display(last_mod, display_tz),
                            "last_modified_iso": _format_datetime_iso(last_mod, display_tz),
                            "etag": obj.etag or "",
                        }) + "\n"
                    running_count += len(result.objects)
                    yield json.dumps({"type": "count", "total_count": running_count}) + "\n"
                    if not result.is_truncated:
                        break
                    continuation_token = result.next_continuation_token
        except StorageError as exc:
            yield json.dumps({"type": "error", "error": str(exc)}) + "\n"
            return
        yield json.dumps({"type": "count", "total_count": running_count}) + "\n"
        yield json.dumps({"type": "done"}) + "\n"

    return Response(
        generate(),
        mimetype='application/x-ndjson',
        headers={
            'Cache-Control': 'no-cache',
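
For reference, a hedged sketch of a client consuming the NDJSON stream this route produces; the URL path is hypothetical and requests is assumed as the HTTP client:

import json
import requests

def stream_listing(base_url: str, bucket: str, prefix: str = "", delimiter: str = "/"):
    url = f"{base_url}/api/buckets/{bucket}/objects/stream"  # hypothetical path
    with requests.get(
        url,
        params={"prefix": prefix, "delimiter": delimiter},
        stream=True,
        timeout=30,
    ) as resp:
        resp.raise_for_status()
        # One JSON record per line: meta, count, folder, object, error, done.
        for raw in resp.iter_lines():
            if not raw:
                continue
            record = json.loads(raw)
            if record["type"] == "object":
                yield record["key"], record["size"]
            elif record["type"] == "error":
                raise RuntimeError(record["error"])
            elif record["type"] == "done":
                return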