Optimize replication failure caching, batch UI auth checks, add bulk download size limit, background parent cleanup
This commit is contained in:
12
app/iam.py
12
app/iam.py
@@ -309,6 +309,18 @@ class IamService:
|
|||||||
if not self._is_allowed(principal, normalized, action):
|
if not self._is_allowed(principal, normalized, action):
|
||||||
raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'")
|
raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'")
|
||||||
|
|
||||||
|
def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str]) -> Dict[str, bool]:
|
||||||
|
self._maybe_reload()
|
||||||
|
bucket_name = (bucket_name or "*").lower() if bucket_name != "*" else (bucket_name or "*")
|
||||||
|
normalized_actions = {a: self._normalize_action(a) for a in actions}
|
||||||
|
results: Dict[str, bool] = {}
|
||||||
|
for original, canonical in normalized_actions.items():
|
||||||
|
if canonical not in ALLOWED_ACTIONS:
|
||||||
|
results[original] = False
|
||||||
|
else:
|
||||||
|
results[original] = self._is_allowed(principal, bucket_name, canonical)
|
||||||
|
return results
|
||||||
|
|
||||||
def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]:
|
def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]:
|
||||||
return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")]
|
return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")]
|
||||||
|
|
||||||
|
|||||||
@@ -176,11 +176,12 @@ class ReplicationFailureStore:
|
|||||||
self.storage_root = storage_root
|
self.storage_root = storage_root
|
||||||
self.max_failures_per_bucket = max_failures_per_bucket
|
self.max_failures_per_bucket = max_failures_per_bucket
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
|
self._cache: Dict[str, List[ReplicationFailure]] = {}
|
||||||
|
|
||||||
def _get_failures_path(self, bucket_name: str) -> Path:
|
def _get_failures_path(self, bucket_name: str) -> Path:
|
||||||
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "replication_failures.json"
|
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "replication_failures.json"
|
||||||
|
|
||||||
def load_failures(self, bucket_name: str) -> List[ReplicationFailure]:
|
def _load_from_disk(self, bucket_name: str) -> List[ReplicationFailure]:
|
||||||
path = self._get_failures_path(bucket_name)
|
path = self._get_failures_path(bucket_name)
|
||||||
if not path.exists():
|
if not path.exists():
|
||||||
return []
|
return []
|
||||||
@@ -192,7 +193,7 @@ class ReplicationFailureStore:
|
|||||||
logger.error(f"Failed to load replication failures for {bucket_name}: {e}")
|
logger.error(f"Failed to load replication failures for {bucket_name}: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
|
def _save_to_disk(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
|
||||||
path = self._get_failures_path(bucket_name)
|
path = self._get_failures_path(bucket_name)
|
||||||
path.parent.mkdir(parents=True, exist_ok=True)
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]}
|
data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]}
|
||||||
@@ -202,6 +203,18 @@ class ReplicationFailureStore:
|
|||||||
except OSError as e:
|
except OSError as e:
|
||||||
logger.error(f"Failed to save replication failures for {bucket_name}: {e}")
|
logger.error(f"Failed to save replication failures for {bucket_name}: {e}")
|
||||||
|
|
||||||
|
def load_failures(self, bucket_name: str) -> List[ReplicationFailure]:
|
||||||
|
if bucket_name in self._cache:
|
||||||
|
return list(self._cache[bucket_name])
|
||||||
|
failures = self._load_from_disk(bucket_name)
|
||||||
|
self._cache[bucket_name] = failures
|
||||||
|
return list(failures)
|
||||||
|
|
||||||
|
def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
|
||||||
|
trimmed = failures[:self.max_failures_per_bucket]
|
||||||
|
self._cache[bucket_name] = trimmed
|
||||||
|
self._save_to_disk(bucket_name, trimmed)
|
||||||
|
|
||||||
def add_failure(self, bucket_name: str, failure: ReplicationFailure) -> None:
|
def add_failure(self, bucket_name: str, failure: ReplicationFailure) -> None:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
failures = self.load_failures(bucket_name)
|
failures = self.load_failures(bucket_name)
|
||||||
@@ -227,6 +240,7 @@ class ReplicationFailureStore:
|
|||||||
|
|
||||||
def clear_failures(self, bucket_name: str) -> None:
|
def clear_failures(self, bucket_name: str) -> None:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
|
self._cache.pop(bucket_name, None)
|
||||||
path = self._get_failures_path(bucket_name)
|
path = self._get_failures_path(bucket_name)
|
||||||
if path.exists():
|
if path.exists():
|
||||||
path.unlink()
|
path.unlink()
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import time
|
|||||||
import unicodedata
|
import unicodedata
|
||||||
import uuid
|
import uuid
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
@@ -187,6 +188,7 @@ class ObjectStorage:
|
|||||||
self._object_cache_max_size = object_cache_max_size
|
self._object_cache_max_size = object_cache_max_size
|
||||||
self._object_key_max_length_bytes = object_key_max_length_bytes
|
self._object_key_max_length_bytes = object_key_max_length_bytes
|
||||||
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
|
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
|
||||||
|
self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup")
|
||||||
|
|
||||||
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
|
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
|
||||||
"""Get or create a lock for a specific bucket. Reduces global lock contention."""
|
"""Get or create a lock for a specific bucket. Reduces global lock contention."""
|
||||||
@@ -544,11 +546,14 @@ class ObjectStorage:
|
|||||||
return self._read_metadata(bucket_path.name, safe_key) or {}
|
return self._read_metadata(bucket_path.name, safe_key) or {}
|
||||||
|
|
||||||
def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
|
def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
|
||||||
"""Remove empty parent directories up to (but not including) stop_at.
|
"""Remove empty parent directories in a background thread.
|
||||||
|
|
||||||
On Windows/OneDrive, directories may be locked briefly after file deletion.
|
On Windows/OneDrive, directories may be locked briefly after file deletion.
|
||||||
This method retries with a small delay to handle that case.
|
Running this in the background avoids blocking the request thread with retries.
|
||||||
"""
|
"""
|
||||||
|
self._cleanup_executor.submit(self._do_cleanup_empty_parents, path, stop_at)
|
||||||
|
|
||||||
|
def _do_cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
|
||||||
for parent in path.parents:
|
for parent in path.parents:
|
||||||
if parent == stop_at:
|
if parent == stop_at:
|
||||||
break
|
break
|
||||||
@@ -556,7 +561,7 @@ class ObjectStorage:
|
|||||||
try:
|
try:
|
||||||
if parent.exists() and not any(parent.iterdir()):
|
if parent.exists() and not any(parent.iterdir()):
|
||||||
parent.rmdir()
|
parent.rmdir()
|
||||||
break
|
break
|
||||||
except OSError:
|
except OSError:
|
||||||
if attempt < 2:
|
if attempt < 2:
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|||||||
83
app/ui.py
83
app/ui.py
@@ -423,57 +423,25 @@ def bucket_detail(bucket_name: str):
|
|||||||
},
|
},
|
||||||
indent=2,
|
indent=2,
|
||||||
)
|
)
|
||||||
can_edit_policy = False
|
iam = _iam()
|
||||||
if principal:
|
bucket_perms = iam.check_permissions(
|
||||||
try:
|
principal, bucket_name, ["policy", "lifecycle", "cors", "write", "replication"],
|
||||||
_iam().authorize(principal, bucket_name, "policy")
|
) if principal else {}
|
||||||
can_edit_policy = True
|
admin_perms = iam.check_permissions(
|
||||||
except IamError:
|
principal, None, ["iam:list_users"],
|
||||||
can_edit_policy = False
|
) if principal else {}
|
||||||
|
|
||||||
can_manage_lifecycle = False
|
can_edit_policy = bucket_perms.get("policy", False)
|
||||||
if principal:
|
can_manage_lifecycle = bucket_perms.get("lifecycle", False)
|
||||||
try:
|
can_manage_cors = bucket_perms.get("cors", False)
|
||||||
_iam().authorize(principal, bucket_name, "lifecycle")
|
can_manage_versioning = bucket_perms.get("write", False)
|
||||||
can_manage_lifecycle = True
|
can_manage_replication = bucket_perms.get("replication", False)
|
||||||
except IamError:
|
is_replication_admin = admin_perms.get("iam:list_users", False)
|
||||||
can_manage_lifecycle = False
|
|
||||||
|
|
||||||
can_manage_cors = False
|
|
||||||
if principal:
|
|
||||||
try:
|
|
||||||
_iam().authorize(principal, bucket_name, "cors")
|
|
||||||
can_manage_cors = True
|
|
||||||
except IamError:
|
|
||||||
can_manage_cors = False
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
versioning_enabled = storage.is_versioning_enabled(bucket_name)
|
versioning_enabled = storage.is_versioning_enabled(bucket_name)
|
||||||
except StorageError:
|
except StorageError:
|
||||||
versioning_enabled = False
|
versioning_enabled = False
|
||||||
can_manage_versioning = False
|
|
||||||
if principal:
|
|
||||||
try:
|
|
||||||
_iam().authorize(principal, bucket_name, "write")
|
|
||||||
can_manage_versioning = True
|
|
||||||
except IamError:
|
|
||||||
can_manage_versioning = False
|
|
||||||
|
|
||||||
can_manage_replication = False
|
|
||||||
if principal:
|
|
||||||
try:
|
|
||||||
_iam().authorize(principal, bucket_name, "replication")
|
|
||||||
can_manage_replication = True
|
|
||||||
except IamError:
|
|
||||||
can_manage_replication = False
|
|
||||||
|
|
||||||
is_replication_admin = False
|
|
||||||
if principal:
|
|
||||||
try:
|
|
||||||
_iam().authorize(principal, None, "iam:list_users")
|
|
||||||
is_replication_admin = True
|
|
||||||
except IamError:
|
|
||||||
is_replication_admin = False
|
|
||||||
|
|
||||||
replication_rule = _replication().get_rule(bucket_name)
|
replication_rule = _replication().get_rule(bucket_name)
|
||||||
connections = _connections().list() if (is_replication_admin or replication_rule) else []
|
connections = _connections().list() if (is_replication_admin or replication_rule) else []
|
||||||
@@ -489,12 +457,7 @@ def bucket_detail(bucket_name: str):
|
|||||||
|
|
||||||
bucket_quota = storage.get_bucket_quota(bucket_name)
|
bucket_quota = storage.get_bucket_quota(bucket_name)
|
||||||
bucket_stats = storage.bucket_stats(bucket_name)
|
bucket_stats = storage.bucket_stats(bucket_name)
|
||||||
can_manage_quota = False
|
can_manage_quota = is_replication_admin
|
||||||
try:
|
|
||||||
_iam().authorize(principal, None, "iam:list_users")
|
|
||||||
can_manage_quota = True
|
|
||||||
except IamError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
|
objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
|
||||||
objects_stream_url = url_for("ui.stream_bucket_objects", bucket_name=bucket_name)
|
objects_stream_url = url_for("ui.stream_bucket_objects", bucket_name=bucket_name)
|
||||||
@@ -1003,21 +966,33 @@ def bulk_download_objects(bucket_name: str):
|
|||||||
|
|
||||||
unique_keys = list(dict.fromkeys(cleaned))
|
unique_keys = list(dict.fromkeys(cleaned))
|
||||||
storage = _storage()
|
storage = _storage()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
_authorize_ui(principal, bucket_name, "read")
|
_authorize_ui(principal, bucket_name, "read")
|
||||||
except IamError as exc:
|
except IamError as exc:
|
||||||
return jsonify({"error": str(exc)}), 403
|
return jsonify({"error": str(exc)}), 403
|
||||||
|
|
||||||
|
max_total_bytes = current_app.config.get("BULK_DOWNLOAD_MAX_BYTES", 1024 * 1024 * 1024)
|
||||||
|
total_size = 0
|
||||||
|
for key in unique_keys:
|
||||||
|
try:
|
||||||
|
path = storage.get_object_path(bucket_name, key)
|
||||||
|
total_size += path.stat().st_size
|
||||||
|
except (StorageError, OSError):
|
||||||
|
continue
|
||||||
|
if total_size > max_total_bytes:
|
||||||
|
limit_mb = max_total_bytes // (1024 * 1024)
|
||||||
|
return jsonify({"error": f"Total download size exceeds {limit_mb} MB limit. Select fewer objects."}), 400
|
||||||
|
|
||||||
buffer = io.BytesIO()
|
buffer = io.BytesIO()
|
||||||
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
|
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||||
for key in unique_keys:
|
for key in unique_keys:
|
||||||
try:
|
try:
|
||||||
_authorize_ui(principal, bucket_name, "read", object_key=key)
|
_authorize_ui(principal, bucket_name, "read", object_key=key)
|
||||||
|
|
||||||
metadata = storage.get_object_metadata(bucket_name, key)
|
metadata = storage.get_object_metadata(bucket_name, key)
|
||||||
is_encrypted = "x-amz-server-side-encryption" in metadata
|
is_encrypted = "x-amz-server-side-encryption" in metadata
|
||||||
|
|
||||||
if is_encrypted and hasattr(storage, 'get_object_data'):
|
if is_encrypted and hasattr(storage, 'get_object_data'):
|
||||||
data, _ = storage.get_object_data(bucket_name, key)
|
data, _ = storage.get_object_data(bucket_name, key)
|
||||||
zf.writestr(key, data)
|
zf.writestr(key, data)
|
||||||
|
|||||||
Reference in New Issue
Block a user