MyFSIO v0.2.7 Release #19
12
app/iam.py
12
app/iam.py
@@ -309,6 +309,18 @@ class IamService:
|
||||
if not self._is_allowed(principal, normalized, action):
|
||||
raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'")
|
||||
|
||||
def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str]) -> Dict[str, bool]:
|
||||
self._maybe_reload()
|
||||
bucket_name = (bucket_name or "*").lower() if bucket_name != "*" else (bucket_name or "*")
|
||||
normalized_actions = {a: self._normalize_action(a) for a in actions}
|
||||
results: Dict[str, bool] = {}
|
||||
for original, canonical in normalized_actions.items():
|
||||
if canonical not in ALLOWED_ACTIONS:
|
||||
results[original] = False
|
||||
else:
|
||||
results[original] = self._is_allowed(principal, bucket_name, canonical)
|
||||
return results
|
||||
|
||||
def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]:
|
||||
return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")]
|
||||
|
||||
|
||||
28
app/kms.py
28
app/kms.py
@@ -160,6 +160,7 @@ class KMSManager:
|
||||
self.generate_data_key_max_bytes = generate_data_key_max_bytes
|
||||
self._keys: Dict[str, KMSKey] = {}
|
||||
self._master_key: bytes | None = None
|
||||
self._master_aesgcm: AESGCM | None = None
|
||||
self._loaded = False
|
||||
|
||||
@property
|
||||
@@ -191,6 +192,7 @@ class KMSManager:
|
||||
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
else:
|
||||
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
|
||||
self._master_aesgcm = AESGCM(self._master_key)
|
||||
return self._master_key
|
||||
|
||||
def _load_keys(self) -> None:
|
||||
@@ -231,18 +233,16 @@ class KMSManager:
|
||||
_set_secure_file_permissions(self.keys_path)
|
||||
|
||||
def _encrypt_key_material(self, key_material: bytes) -> bytes:
|
||||
"""Encrypt key material with the master key."""
|
||||
aesgcm = AESGCM(self.master_key)
|
||||
_ = self.master_key
|
||||
nonce = secrets.token_bytes(12)
|
||||
ciphertext = aesgcm.encrypt(nonce, key_material, None)
|
||||
ciphertext = self._master_aesgcm.encrypt(nonce, key_material, None)
|
||||
return nonce + ciphertext
|
||||
|
||||
def _decrypt_key_material(self, encrypted: bytes) -> bytes:
|
||||
"""Decrypt key material with the master key."""
|
||||
aesgcm = AESGCM(self.master_key)
|
||||
_ = self.master_key
|
||||
nonce = encrypted[:12]
|
||||
ciphertext = encrypted[12:]
|
||||
return aesgcm.decrypt(nonce, ciphertext, None)
|
||||
return self._master_aesgcm.decrypt(nonce, ciphertext, None)
|
||||
|
||||
def create_key(self, description: str = "", key_id: str | None = None) -> KMSKey:
|
||||
"""Create a new KMS key."""
|
||||
@@ -404,22 +404,6 @@ class KMSManager:
|
||||
plaintext, _ = self.decrypt(encrypted_key, context)
|
||||
return plaintext
|
||||
|
||||
def get_provider(self, key_id: str | None = None) -> KMSEncryptionProvider:
|
||||
"""Get an encryption provider for a specific key."""
|
||||
self._load_keys()
|
||||
|
||||
if key_id is None:
|
||||
if not self._keys:
|
||||
key = self.create_key("Default KMS Key")
|
||||
key_id = key.key_id
|
||||
else:
|
||||
key_id = next(iter(self._keys.keys()))
|
||||
|
||||
if key_id not in self._keys:
|
||||
raise EncryptionError(f"Key not found: {key_id}")
|
||||
|
||||
return KMSEncryptionProvider(self, key_id)
|
||||
|
||||
def re_encrypt(self, ciphertext: bytes, destination_key_id: str,
|
||||
source_context: Dict[str, str] | None = None,
|
||||
destination_context: Dict[str, str] | None = None) -> bytes:
|
||||
|
||||
@@ -176,11 +176,12 @@ class ReplicationFailureStore:
|
||||
self.storage_root = storage_root
|
||||
self.max_failures_per_bucket = max_failures_per_bucket
|
||||
self._lock = threading.Lock()
|
||||
self._cache: Dict[str, List[ReplicationFailure]] = {}
|
||||
|
||||
def _get_failures_path(self, bucket_name: str) -> Path:
|
||||
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "replication_failures.json"
|
||||
|
||||
def load_failures(self, bucket_name: str) -> List[ReplicationFailure]:
|
||||
def _load_from_disk(self, bucket_name: str) -> List[ReplicationFailure]:
|
||||
path = self._get_failures_path(bucket_name)
|
||||
if not path.exists():
|
||||
return []
|
||||
@@ -192,7 +193,7 @@ class ReplicationFailureStore:
|
||||
logger.error(f"Failed to load replication failures for {bucket_name}: {e}")
|
||||
return []
|
||||
|
||||
def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
|
||||
def _save_to_disk(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
|
||||
path = self._get_failures_path(bucket_name)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]}
|
||||
@@ -202,6 +203,18 @@ class ReplicationFailureStore:
|
||||
except OSError as e:
|
||||
logger.error(f"Failed to save replication failures for {bucket_name}: {e}")
|
||||
|
||||
def load_failures(self, bucket_name: str) -> List[ReplicationFailure]:
|
||||
if bucket_name in self._cache:
|
||||
return list(self._cache[bucket_name])
|
||||
failures = self._load_from_disk(bucket_name)
|
||||
self._cache[bucket_name] = failures
|
||||
return list(failures)
|
||||
|
||||
def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
|
||||
trimmed = failures[:self.max_failures_per_bucket]
|
||||
self._cache[bucket_name] = trimmed
|
||||
self._save_to_disk(bucket_name, trimmed)
|
||||
|
||||
def add_failure(self, bucket_name: str, failure: ReplicationFailure) -> None:
|
||||
with self._lock:
|
||||
failures = self.load_failures(bucket_name)
|
||||
@@ -227,6 +240,7 @@ class ReplicationFailureStore:
|
||||
|
||||
def clear_failures(self, bucket_name: str) -> None:
|
||||
with self._lock:
|
||||
self._cache.pop(bucket_name, None)
|
||||
path = self._get_failures_path(bucket_name)
|
||||
if path.exists():
|
||||
path.unlink()
|
||||
|
||||
@@ -999,7 +999,8 @@ def _apply_object_headers(
|
||||
etag: str,
|
||||
) -> None:
|
||||
if file_stat is not None:
|
||||
response.headers["Content-Length"] = str(file_stat.st_size)
|
||||
if response.status_code != 206:
|
||||
response.headers["Content-Length"] = str(file_stat.st_size)
|
||||
response.headers["Last-Modified"] = http_date(file_stat.st_mtime)
|
||||
response.headers["ETag"] = f'"{etag}"'
|
||||
response.headers["Accept-Ranges"] = "bytes"
|
||||
|
||||
@@ -11,6 +11,7 @@ import time
|
||||
import unicodedata
|
||||
import uuid
|
||||
from collections import OrderedDict
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
@@ -187,6 +188,7 @@ class ObjectStorage:
|
||||
self._object_cache_max_size = object_cache_max_size
|
||||
self._object_key_max_length_bytes = object_key_max_length_bytes
|
||||
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
|
||||
self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup")
|
||||
|
||||
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
|
||||
"""Get or create a lock for a specific bucket. Reduces global lock contention."""
|
||||
@@ -544,11 +546,14 @@ class ObjectStorage:
|
||||
return self._read_metadata(bucket_path.name, safe_key) or {}
|
||||
|
||||
def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
|
||||
"""Remove empty parent directories up to (but not including) stop_at.
|
||||
"""Remove empty parent directories in a background thread.
|
||||
|
||||
On Windows/OneDrive, directories may be locked briefly after file deletion.
|
||||
This method retries with a small delay to handle that case.
|
||||
Running this in the background avoids blocking the request thread with retries.
|
||||
"""
|
||||
self._cleanup_executor.submit(self._do_cleanup_empty_parents, path, stop_at)
|
||||
|
||||
def _do_cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
|
||||
for parent in path.parents:
|
||||
if parent == stop_at:
|
||||
break
|
||||
|
||||
77
app/ui.py
77
app/ui.py
@@ -423,57 +423,25 @@ def bucket_detail(bucket_name: str):
|
||||
},
|
||||
indent=2,
|
||||
)
|
||||
can_edit_policy = False
|
||||
if principal:
|
||||
try:
|
||||
_iam().authorize(principal, bucket_name, "policy")
|
||||
can_edit_policy = True
|
||||
except IamError:
|
||||
can_edit_policy = False
|
||||
iam = _iam()
|
||||
bucket_perms = iam.check_permissions(
|
||||
principal, bucket_name, ["policy", "lifecycle", "cors", "write", "replication"],
|
||||
) if principal else {}
|
||||
admin_perms = iam.check_permissions(
|
||||
principal, None, ["iam:list_users"],
|
||||
) if principal else {}
|
||||
|
||||
can_manage_lifecycle = False
|
||||
if principal:
|
||||
try:
|
||||
_iam().authorize(principal, bucket_name, "lifecycle")
|
||||
can_manage_lifecycle = True
|
||||
except IamError:
|
||||
can_manage_lifecycle = False
|
||||
|
||||
can_manage_cors = False
|
||||
if principal:
|
||||
try:
|
||||
_iam().authorize(principal, bucket_name, "cors")
|
||||
can_manage_cors = True
|
||||
except IamError:
|
||||
can_manage_cors = False
|
||||
can_edit_policy = bucket_perms.get("policy", False)
|
||||
can_manage_lifecycle = bucket_perms.get("lifecycle", False)
|
||||
can_manage_cors = bucket_perms.get("cors", False)
|
||||
can_manage_versioning = bucket_perms.get("write", False)
|
||||
can_manage_replication = bucket_perms.get("replication", False)
|
||||
is_replication_admin = admin_perms.get("iam:list_users", False)
|
||||
|
||||
try:
|
||||
versioning_enabled = storage.is_versioning_enabled(bucket_name)
|
||||
except StorageError:
|
||||
versioning_enabled = False
|
||||
can_manage_versioning = False
|
||||
if principal:
|
||||
try:
|
||||
_iam().authorize(principal, bucket_name, "write")
|
||||
can_manage_versioning = True
|
||||
except IamError:
|
||||
can_manage_versioning = False
|
||||
|
||||
can_manage_replication = False
|
||||
if principal:
|
||||
try:
|
||||
_iam().authorize(principal, bucket_name, "replication")
|
||||
can_manage_replication = True
|
||||
except IamError:
|
||||
can_manage_replication = False
|
||||
|
||||
is_replication_admin = False
|
||||
if principal:
|
||||
try:
|
||||
_iam().authorize(principal, None, "iam:list_users")
|
||||
is_replication_admin = True
|
||||
except IamError:
|
||||
is_replication_admin = False
|
||||
|
||||
replication_rule = _replication().get_rule(bucket_name)
|
||||
connections = _connections().list() if (is_replication_admin or replication_rule) else []
|
||||
@@ -489,12 +457,7 @@ def bucket_detail(bucket_name: str):
|
||||
|
||||
bucket_quota = storage.get_bucket_quota(bucket_name)
|
||||
bucket_stats = storage.bucket_stats(bucket_name)
|
||||
can_manage_quota = False
|
||||
try:
|
||||
_iam().authorize(principal, None, "iam:list_users")
|
||||
can_manage_quota = True
|
||||
except IamError:
|
||||
pass
|
||||
can_manage_quota = is_replication_admin
|
||||
|
||||
objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
|
||||
objects_stream_url = url_for("ui.stream_bucket_objects", bucket_name=bucket_name)
|
||||
@@ -1009,6 +972,18 @@ def bulk_download_objects(bucket_name: str):
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
max_total_bytes = current_app.config.get("BULK_DOWNLOAD_MAX_BYTES", 1024 * 1024 * 1024)
|
||||
total_size = 0
|
||||
for key in unique_keys:
|
||||
try:
|
||||
path = storage.get_object_path(bucket_name, key)
|
||||
total_size += path.stat().st_size
|
||||
except (StorageError, OSError):
|
||||
continue
|
||||
if total_size > max_total_bytes:
|
||||
limit_mb = max_total_bytes // (1024 * 1024)
|
||||
return jsonify({"error": f"Total download size exceeds {limit_mb} MB limit. Select fewer objects."}), 400
|
||||
|
||||
buffer = io.BytesIO()
|
||||
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||
for key in unique_keys:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
APP_VERSION = "0.2.6"
|
||||
APP_VERSION = "0.2.7"
|
||||
|
||||
|
||||
def get_version() -> str:
|
||||
|
||||
@@ -516,6 +516,9 @@
|
||||
};
|
||||
};
|
||||
|
||||
let lastStreamRenderTime = 0;
|
||||
const STREAM_RENDER_THROTTLE_MS = 500;
|
||||
|
||||
const flushPendingStreamObjects = () => {
|
||||
if (pendingStreamObjects.length === 0) return;
|
||||
const batch = pendingStreamObjects.splice(0, pendingStreamObjects.length);
|
||||
@@ -532,6 +535,19 @@
|
||||
loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()}${countText} loading...`;
|
||||
}
|
||||
}
|
||||
if (objectsLoadingRow && objectsLoadingRow.parentNode) {
|
||||
const loadingText = objectsLoadingRow.querySelector('p');
|
||||
if (loadingText) {
|
||||
const countText = totalObjectCount > 0 ? ` of ${totalObjectCount.toLocaleString()}` : '';
|
||||
loadingText.textContent = `Loading ${loadedObjectCount.toLocaleString()}${countText} objects...`;
|
||||
}
|
||||
}
|
||||
const now = performance.now();
|
||||
if (!streamingComplete && now - lastStreamRenderTime < STREAM_RENDER_THROTTLE_MS) {
|
||||
streamRenderScheduled = false;
|
||||
return;
|
||||
}
|
||||
lastStreamRenderTime = now;
|
||||
refreshVirtualList();
|
||||
streamRenderScheduled = false;
|
||||
};
|
||||
@@ -555,6 +571,7 @@
|
||||
memoizedVisibleItems = null;
|
||||
memoizedInputs = { objectCount: -1, prefix: null, filterTerm: null };
|
||||
pendingStreamObjects = [];
|
||||
lastStreamRenderTime = 0;
|
||||
|
||||
streamAbortController = new AbortController();
|
||||
|
||||
@@ -569,7 +586,10 @@
|
||||
throw new Error(`HTTP ${response.status}`);
|
||||
}
|
||||
|
||||
if (objectsLoadingRow) objectsLoadingRow.remove();
|
||||
if (objectsLoadingRow) {
|
||||
const loadingText = objectsLoadingRow.querySelector('p');
|
||||
if (loadingText) loadingText.textContent = 'Receiving objects...';
|
||||
}
|
||||
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
@@ -597,6 +617,10 @@
|
||||
break;
|
||||
case 'count':
|
||||
totalObjectCount = msg.total_count || 0;
|
||||
if (objectsLoadingRow) {
|
||||
const loadingText = objectsLoadingRow.querySelector('p');
|
||||
if (loadingText) loadingText.textContent = `Loading 0 of ${totalObjectCount.toLocaleString()} objects...`;
|
||||
}
|
||||
break;
|
||||
case 'object':
|
||||
pendingStreamObjects.push(processStreamObject(msg));
|
||||
@@ -630,11 +654,15 @@
|
||||
} catch (e) { }
|
||||
}
|
||||
|
||||
flushPendingStreamObjects();
|
||||
streamingComplete = true;
|
||||
flushPendingStreamObjects();
|
||||
hasMoreObjects = false;
|
||||
updateObjectCountBadge();
|
||||
|
||||
if (objectsLoadingRow && objectsLoadingRow.parentNode) {
|
||||
objectsLoadingRow.remove();
|
||||
}
|
||||
|
||||
if (loadMoreStatus) {
|
||||
loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} objects`;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user