Fix object browser UI issues
This commit is contained in:
@@ -36,11 +36,11 @@ class GzipMiddleware:
|
||||
content_type = None
|
||||
content_length = None
|
||||
should_compress = False
|
||||
is_streaming = False
|
||||
passthrough = False
|
||||
exc_info_holder = [None]
|
||||
|
||||
def custom_start_response(status: str, headers: List[Tuple[str, str]], exc_info=None):
|
||||
nonlocal response_started, status_code, response_headers, content_type, content_length, should_compress, is_streaming
|
||||
nonlocal response_started, status_code, response_headers, content_type, content_length, should_compress, passthrough
|
||||
response_started = True
|
||||
status_code = int(status.split(' ', 1)[0])
|
||||
response_headers = list(headers)
|
||||
@@ -51,23 +51,29 @@ class GzipMiddleware:
|
||||
if name_lower == 'content-type':
|
||||
content_type = value.split(';')[0].strip().lower()
|
||||
elif name_lower == 'content-length':
|
||||
content_length = int(value)
|
||||
try:
|
||||
content_length = int(value)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
elif name_lower == 'content-encoding':
|
||||
should_compress = False
|
||||
passthrough = True
|
||||
return start_response(status, headers, exc_info)
|
||||
elif name_lower == 'x-stream-response':
|
||||
is_streaming = True
|
||||
passthrough = True
|
||||
return start_response(status, headers, exc_info)
|
||||
|
||||
if content_type and content_type in COMPRESSIBLE_MIMES:
|
||||
if content_length is None or content_length >= self.min_size:
|
||||
should_compress = True
|
||||
else:
|
||||
passthrough = True
|
||||
return start_response(status, headers, exc_info)
|
||||
|
||||
return None
|
||||
|
||||
app_iter = self.app(environ, custom_start_response)
|
||||
|
||||
if is_streaming:
|
||||
if passthrough:
|
||||
return app_iter
|
||||
|
||||
response_body = b''.join(app_iter)
|
||||
|
||||
@@ -2781,7 +2781,7 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
try:
|
||||
stat = path.stat()
|
||||
file_size = stat.st_size
|
||||
etag = storage._compute_etag(path)
|
||||
etag = metadata.get("__etag__") or storage._compute_etag(path)
|
||||
except PermissionError:
|
||||
return _error_response("AccessDenied", "Permission denied accessing object", 403)
|
||||
except OSError as exc:
|
||||
@@ -2829,7 +2829,7 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
try:
|
||||
stat = path.stat()
|
||||
response = Response(status=200)
|
||||
etag = storage._compute_etag(path)
|
||||
etag = metadata.get("__etag__") or storage._compute_etag(path)
|
||||
except PermissionError:
|
||||
return _error_response("AccessDenied", "Permission denied accessing object", 403)
|
||||
except OSError as exc:
|
||||
|
||||
180
app/storage.py
180
app/storage.py
@@ -188,6 +188,7 @@ class ObjectStorage:
|
||||
self._object_cache_max_size = object_cache_max_size
|
||||
self._object_key_max_length_bytes = object_key_max_length_bytes
|
||||
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
|
||||
self._meta_index_locks: Dict[str, threading.Lock] = {}
|
||||
self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup")
|
||||
|
||||
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
|
||||
@@ -816,6 +817,10 @@ class ObjectStorage:
|
||||
if not object_path.exists():
|
||||
raise ObjectNotFoundError("Object does not exist")
|
||||
|
||||
entry = self._read_index_entry(bucket_path.name, safe_key)
|
||||
if entry is not None:
|
||||
tags = entry.get("tags")
|
||||
return tags if isinstance(tags, list) else []
|
||||
for meta_file in (self._metadata_file(bucket_path.name, safe_key), self._legacy_metadata_file(bucket_path.name, safe_key)):
|
||||
if not meta_file.exists():
|
||||
continue
|
||||
@@ -839,30 +844,31 @@ class ObjectStorage:
|
||||
if not object_path.exists():
|
||||
raise ObjectNotFoundError("Object does not exist")
|
||||
|
||||
meta_file = self._metadata_file(bucket_path.name, safe_key)
|
||||
|
||||
existing_payload: Dict[str, Any] = {}
|
||||
if meta_file.exists():
|
||||
try:
|
||||
existing_payload = json.loads(meta_file.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
bucket_id = bucket_path.name
|
||||
existing_entry = self._read_index_entry(bucket_id, safe_key) or {}
|
||||
if not existing_entry:
|
||||
meta_file = self._metadata_file(bucket_id, safe_key)
|
||||
if meta_file.exists():
|
||||
try:
|
||||
existing_entry = json.loads(meta_file.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
if tags:
|
||||
existing_payload["tags"] = tags
|
||||
existing_entry["tags"] = tags
|
||||
else:
|
||||
existing_payload.pop("tags", None)
|
||||
|
||||
if existing_payload.get("metadata") or existing_payload.get("tags"):
|
||||
meta_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
meta_file.write_text(json.dumps(existing_payload), encoding="utf-8")
|
||||
elif meta_file.exists():
|
||||
meta_file.unlink()
|
||||
parent = meta_file.parent
|
||||
meta_root = self._bucket_meta_root(bucket_path.name)
|
||||
while parent != meta_root and parent.exists() and not any(parent.iterdir()):
|
||||
parent.rmdir()
|
||||
parent = parent.parent
|
||||
existing_entry.pop("tags", None)
|
||||
|
||||
if existing_entry.get("metadata") or existing_entry.get("tags"):
|
||||
self._write_index_entry(bucket_id, safe_key, existing_entry)
|
||||
else:
|
||||
self._delete_index_entry(bucket_id, safe_key)
|
||||
old_meta = self._metadata_file(bucket_id, safe_key)
|
||||
try:
|
||||
if old_meta.exists():
|
||||
old_meta.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def delete_object_tags(self, bucket_name: str, object_key: str) -> None:
|
||||
"""Delete all tags from an object."""
|
||||
@@ -1529,7 +1535,7 @@ class ObjectStorage:
|
||||
if entry.is_dir(follow_symlinks=False):
|
||||
if check_newer(entry.path):
|
||||
return True
|
||||
elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
|
||||
elif entry.is_file(follow_symlinks=False) and (entry.name.endswith('.meta.json') or entry.name == '_index.json'):
|
||||
if entry.stat().st_mtime > index_mtime:
|
||||
return True
|
||||
except OSError:
|
||||
@@ -1543,22 +1549,50 @@ class ObjectStorage:
|
||||
meta_str = str(meta_root)
|
||||
meta_len = len(meta_str) + 1
|
||||
meta_files: list[tuple[str, str]] = []
|
||||
|
||||
index_files: list[str] = []
|
||||
|
||||
def collect_meta_files(dir_path: str) -> None:
|
||||
try:
|
||||
with os.scandir(dir_path) as it:
|
||||
for entry in it:
|
||||
if entry.is_dir(follow_symlinks=False):
|
||||
collect_meta_files(entry.path)
|
||||
elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
|
||||
rel = entry.path[meta_len:]
|
||||
key = rel[:-10].replace(os.sep, '/')
|
||||
meta_files.append((key, entry.path))
|
||||
elif entry.is_file(follow_symlinks=False):
|
||||
if entry.name == '_index.json':
|
||||
index_files.append(entry.path)
|
||||
elif entry.name.endswith('.meta.json'):
|
||||
rel = entry.path[meta_len:]
|
||||
key = rel[:-10].replace(os.sep, '/')
|
||||
meta_files.append((key, entry.path))
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
collect_meta_files(meta_str)
|
||||
|
||||
|
||||
meta_cache = {}
|
||||
|
||||
for idx_path in index_files:
|
||||
try:
|
||||
with open(idx_path, 'r', encoding='utf-8') as f:
|
||||
idx_data = json.load(f)
|
||||
rel_dir = idx_path[meta_len:]
|
||||
rel_dir = rel_dir.replace(os.sep, '/')
|
||||
if rel_dir.endswith('/_index.json'):
|
||||
dir_prefix = rel_dir[:-len('/_index.json')]
|
||||
else:
|
||||
dir_prefix = ''
|
||||
for entry_name, entry_data in idx_data.items():
|
||||
if dir_prefix:
|
||||
key = f"{dir_prefix}/{entry_name}"
|
||||
else:
|
||||
key = entry_name
|
||||
meta = entry_data.get("metadata", {})
|
||||
etag = meta.get("__etag__")
|
||||
if etag:
|
||||
meta_cache[key] = etag
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
def read_meta_file(item: tuple[str, str]) -> tuple[str, str | None]:
|
||||
key, path = item
|
||||
try:
|
||||
@@ -1575,15 +1609,16 @@ class ObjectStorage:
|
||||
return key, None
|
||||
except (OSError, UnicodeDecodeError):
|
||||
return key, None
|
||||
|
||||
if meta_files:
|
||||
meta_cache = {}
|
||||
max_workers = min((os.cpu_count() or 4) * 2, len(meta_files), 16)
|
||||
|
||||
legacy_meta_files = [(k, p) for k, p in meta_files if k not in meta_cache]
|
||||
if legacy_meta_files:
|
||||
max_workers = min((os.cpu_count() or 4) * 2, len(legacy_meta_files), 16)
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
for key, etag in executor.map(read_meta_file, meta_files):
|
||||
for key, etag in executor.map(read_meta_file, legacy_meta_files):
|
||||
if etag:
|
||||
meta_cache[key] = etag
|
||||
|
||||
|
||||
if meta_cache:
|
||||
try:
|
||||
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(etag_index_path, 'w', encoding='utf-8') as f:
|
||||
@@ -1833,6 +1868,64 @@ class ObjectStorage:
|
||||
meta_rel = Path(key.as_posix() + ".meta.json")
|
||||
return meta_root / meta_rel
|
||||
|
||||
def _index_file_for_key(self, bucket_name: str, key: Path) -> tuple[Path, str]:
|
||||
meta_root = self._bucket_meta_root(bucket_name)
|
||||
parent = key.parent
|
||||
entry_name = key.name
|
||||
if parent == Path("."):
|
||||
return meta_root / "_index.json", entry_name
|
||||
return meta_root / parent / "_index.json", entry_name
|
||||
|
||||
def _get_meta_index_lock(self, index_path: str) -> threading.Lock:
|
||||
with self._cache_lock:
|
||||
if index_path not in self._meta_index_locks:
|
||||
self._meta_index_locks[index_path] = threading.Lock()
|
||||
return self._meta_index_locks[index_path]
|
||||
|
||||
def _read_index_entry(self, bucket_name: str, key: Path) -> Optional[Dict[str, Any]]:
|
||||
index_path, entry_name = self._index_file_for_key(bucket_name, key)
|
||||
if not index_path.exists():
|
||||
return None
|
||||
try:
|
||||
index_data = json.loads(index_path.read_text(encoding="utf-8"))
|
||||
return index_data.get(entry_name)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
return None
|
||||
|
||||
def _write_index_entry(self, bucket_name: str, key: Path, entry: Dict[str, Any]) -> None:
|
||||
index_path, entry_name = self._index_file_for_key(bucket_name, key)
|
||||
lock = self._get_meta_index_lock(str(index_path))
|
||||
with lock:
|
||||
index_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
index_data: Dict[str, Any] = {}
|
||||
if index_path.exists():
|
||||
try:
|
||||
index_data = json.loads(index_path.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
index_data[entry_name] = entry
|
||||
index_path.write_text(json.dumps(index_data), encoding="utf-8")
|
||||
|
||||
def _delete_index_entry(self, bucket_name: str, key: Path) -> None:
|
||||
index_path, entry_name = self._index_file_for_key(bucket_name, key)
|
||||
if not index_path.exists():
|
||||
return
|
||||
lock = self._get_meta_index_lock(str(index_path))
|
||||
with lock:
|
||||
try:
|
||||
index_data = json.loads(index_path.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError):
|
||||
return
|
||||
if entry_name in index_data:
|
||||
del index_data[entry_name]
|
||||
if index_data:
|
||||
index_path.write_text(json.dumps(index_data), encoding="utf-8")
|
||||
else:
|
||||
try:
|
||||
index_path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _normalize_metadata(self, metadata: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
|
||||
if not metadata:
|
||||
return None
|
||||
@@ -1844,9 +1937,13 @@ class ObjectStorage:
|
||||
if not clean:
|
||||
self._delete_metadata(bucket_name, key)
|
||||
return
|
||||
meta_file = self._metadata_file(bucket_name, key)
|
||||
meta_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
meta_file.write_text(json.dumps({"metadata": clean}), encoding="utf-8")
|
||||
self._write_index_entry(bucket_name, key, {"metadata": clean})
|
||||
old_meta = self._metadata_file(bucket_name, key)
|
||||
try:
|
||||
if old_meta.exists():
|
||||
old_meta.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _archive_current_version(self, bucket_name: str, key: Path, *, reason: str) -> None:
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
@@ -1873,6 +1970,10 @@ class ObjectStorage:
|
||||
manifest_path.write_text(json.dumps(record), encoding="utf-8")
|
||||
|
||||
def _read_metadata(self, bucket_name: str, key: Path) -> Dict[str, str]:
|
||||
entry = self._read_index_entry(bucket_name, key)
|
||||
if entry is not None:
|
||||
data = entry.get("metadata")
|
||||
return data if isinstance(data, dict) else {}
|
||||
for meta_file in (self._metadata_file(bucket_name, key), self._legacy_metadata_file(bucket_name, key)):
|
||||
if not meta_file.exists():
|
||||
continue
|
||||
@@ -1903,6 +2004,7 @@ class ObjectStorage:
|
||||
raise StorageError(message) from last_error
|
||||
|
||||
def _delete_metadata(self, bucket_name: str, key: Path) -> None:
|
||||
self._delete_index_entry(bucket_name, key)
|
||||
locations = (
|
||||
(self._metadata_file(bucket_name, key), self._bucket_meta_root(bucket_name)),
|
||||
(self._legacy_metadata_file(bucket_name, key), self._legacy_meta_root(bucket_name)),
|
||||
|
||||
@@ -1162,7 +1162,9 @@ def object_preview(bucket_name: str, object_key: str) -> Response:
|
||||
"text/html", "text/xml", "application/xhtml+xml",
|
||||
"application/xml", "image/svg+xml",
|
||||
}
|
||||
force_download = content_type.split(";")[0].strip().lower() in _DANGEROUS_TYPES
|
||||
base_ct = content_type.split(";")[0].strip().lower()
|
||||
if not download and base_ct in _DANGEROUS_TYPES:
|
||||
content_type = "text/plain; charset=utf-8"
|
||||
|
||||
def generate():
|
||||
try:
|
||||
@@ -1181,7 +1183,7 @@ def object_preview(bucket_name: str, object_key: str) -> Response:
|
||||
headers["Content-Length"] = str(content_length)
|
||||
if content_range:
|
||||
headers["Content-Range"] = content_range
|
||||
disposition = "attachment" if download or force_download else "inline"
|
||||
disposition = "attachment" if download else "inline"
|
||||
if ascii_safe:
|
||||
headers["Content-Disposition"] = f'{disposition}; filename="{safe_filename}"'
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user