MyFSIO v0.2.7 Release

Reviewed-on: #19
This commit was merged in pull request #19.
This commit is contained in:
2026-02-09 12:22:37 +00:00
8 changed files with 106 additions and 87 deletions

View File

@@ -309,6 +309,18 @@ class IamService:
if not self._is_allowed(principal, normalized, action): if not self._is_allowed(principal, normalized, action):
raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'") raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'")
def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str]) -> Dict[str, bool]:
self._maybe_reload()
bucket_name = (bucket_name or "*").lower() if bucket_name != "*" else (bucket_name or "*")
normalized_actions = {a: self._normalize_action(a) for a in actions}
results: Dict[str, bool] = {}
for original, canonical in normalized_actions.items():
if canonical not in ALLOWED_ACTIONS:
results[original] = False
else:
results[original] = self._is_allowed(principal, bucket_name, canonical)
return results
def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]: def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]:
return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")] return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")]

View File

@@ -160,6 +160,7 @@ class KMSManager:
self.generate_data_key_max_bytes = generate_data_key_max_bytes self.generate_data_key_max_bytes = generate_data_key_max_bytes
self._keys: Dict[str, KMSKey] = {} self._keys: Dict[str, KMSKey] = {}
self._master_key: bytes | None = None self._master_key: bytes | None = None
self._master_aesgcm: AESGCM | None = None
self._loaded = False self._loaded = False
@property @property
@@ -191,6 +192,7 @@ class KMSManager:
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1) msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
else: else:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
self._master_aesgcm = AESGCM(self._master_key)
return self._master_key return self._master_key
def _load_keys(self) -> None: def _load_keys(self) -> None:
@@ -231,18 +233,16 @@ class KMSManager:
_set_secure_file_permissions(self.keys_path) _set_secure_file_permissions(self.keys_path)
def _encrypt_key_material(self, key_material: bytes) -> bytes: def _encrypt_key_material(self, key_material: bytes) -> bytes:
"""Encrypt key material with the master key.""" _ = self.master_key
aesgcm = AESGCM(self.master_key)
nonce = secrets.token_bytes(12) nonce = secrets.token_bytes(12)
ciphertext = aesgcm.encrypt(nonce, key_material, None) ciphertext = self._master_aesgcm.encrypt(nonce, key_material, None)
return nonce + ciphertext return nonce + ciphertext
def _decrypt_key_material(self, encrypted: bytes) -> bytes: def _decrypt_key_material(self, encrypted: bytes) -> bytes:
"""Decrypt key material with the master key.""" _ = self.master_key
aesgcm = AESGCM(self.master_key)
nonce = encrypted[:12] nonce = encrypted[:12]
ciphertext = encrypted[12:] ciphertext = encrypted[12:]
return aesgcm.decrypt(nonce, ciphertext, None) return self._master_aesgcm.decrypt(nonce, ciphertext, None)
def create_key(self, description: str = "", key_id: str | None = None) -> KMSKey: def create_key(self, description: str = "", key_id: str | None = None) -> KMSKey:
"""Create a new KMS key.""" """Create a new KMS key."""
@@ -404,22 +404,6 @@ class KMSManager:
plaintext, _ = self.decrypt(encrypted_key, context) plaintext, _ = self.decrypt(encrypted_key, context)
return plaintext return plaintext
def get_provider(self, key_id: str | None = None) -> KMSEncryptionProvider:
"""Get an encryption provider for a specific key."""
self._load_keys()
if key_id is None:
if not self._keys:
key = self.create_key("Default KMS Key")
key_id = key.key_id
else:
key_id = next(iter(self._keys.keys()))
if key_id not in self._keys:
raise EncryptionError(f"Key not found: {key_id}")
return KMSEncryptionProvider(self, key_id)
def re_encrypt(self, ciphertext: bytes, destination_key_id: str, def re_encrypt(self, ciphertext: bytes, destination_key_id: str,
source_context: Dict[str, str] | None = None, source_context: Dict[str, str] | None = None,
destination_context: Dict[str, str] | None = None) -> bytes: destination_context: Dict[str, str] | None = None) -> bytes:

View File

@@ -176,11 +176,12 @@ class ReplicationFailureStore:
self.storage_root = storage_root self.storage_root = storage_root
self.max_failures_per_bucket = max_failures_per_bucket self.max_failures_per_bucket = max_failures_per_bucket
self._lock = threading.Lock() self._lock = threading.Lock()
self._cache: Dict[str, List[ReplicationFailure]] = {}
def _get_failures_path(self, bucket_name: str) -> Path: def _get_failures_path(self, bucket_name: str) -> Path:
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "replication_failures.json" return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "replication_failures.json"
def load_failures(self, bucket_name: str) -> List[ReplicationFailure]: def _load_from_disk(self, bucket_name: str) -> List[ReplicationFailure]:
path = self._get_failures_path(bucket_name) path = self._get_failures_path(bucket_name)
if not path.exists(): if not path.exists():
return [] return []
@@ -192,7 +193,7 @@ class ReplicationFailureStore:
logger.error(f"Failed to load replication failures for {bucket_name}: {e}") logger.error(f"Failed to load replication failures for {bucket_name}: {e}")
return [] return []
def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None: def _save_to_disk(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
path = self._get_failures_path(bucket_name) path = self._get_failures_path(bucket_name)
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]} data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]}
@@ -202,6 +203,18 @@ class ReplicationFailureStore:
except OSError as e: except OSError as e:
logger.error(f"Failed to save replication failures for {bucket_name}: {e}") logger.error(f"Failed to save replication failures for {bucket_name}: {e}")
def load_failures(self, bucket_name: str) -> List[ReplicationFailure]:
if bucket_name in self._cache:
return list(self._cache[bucket_name])
failures = self._load_from_disk(bucket_name)
self._cache[bucket_name] = failures
return list(failures)
def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
trimmed = failures[:self.max_failures_per_bucket]
self._cache[bucket_name] = trimmed
self._save_to_disk(bucket_name, trimmed)
def add_failure(self, bucket_name: str, failure: ReplicationFailure) -> None: def add_failure(self, bucket_name: str, failure: ReplicationFailure) -> None:
with self._lock: with self._lock:
failures = self.load_failures(bucket_name) failures = self.load_failures(bucket_name)
@@ -227,6 +240,7 @@ class ReplicationFailureStore:
def clear_failures(self, bucket_name: str) -> None: def clear_failures(self, bucket_name: str) -> None:
with self._lock: with self._lock:
self._cache.pop(bucket_name, None)
path = self._get_failures_path(bucket_name) path = self._get_failures_path(bucket_name)
if path.exists(): if path.exists():
path.unlink() path.unlink()

View File

@@ -999,7 +999,8 @@ def _apply_object_headers(
etag: str, etag: str,
) -> None: ) -> None:
if file_stat is not None: if file_stat is not None:
response.headers["Content-Length"] = str(file_stat.st_size) if response.status_code != 206:
response.headers["Content-Length"] = str(file_stat.st_size)
response.headers["Last-Modified"] = http_date(file_stat.st_mtime) response.headers["Last-Modified"] = http_date(file_stat.st_mtime)
response.headers["ETag"] = f'"{etag}"' response.headers["ETag"] = f'"{etag}"'
response.headers["Accept-Ranges"] = "bytes" response.headers["Accept-Ranges"] = "bytes"

View File

@@ -11,6 +11,7 @@ import time
import unicodedata import unicodedata
import uuid import uuid
from collections import OrderedDict from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -187,6 +188,7 @@ class ObjectStorage:
self._object_cache_max_size = object_cache_max_size self._object_cache_max_size = object_cache_max_size
self._object_key_max_length_bytes = object_key_max_length_bytes self._object_key_max_length_bytes = object_key_max_length_bytes
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {} self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup")
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock: def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
"""Get or create a lock for a specific bucket. Reduces global lock contention.""" """Get or create a lock for a specific bucket. Reduces global lock contention."""
@@ -544,11 +546,14 @@ class ObjectStorage:
return self._read_metadata(bucket_path.name, safe_key) or {} return self._read_metadata(bucket_path.name, safe_key) or {}
def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None: def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
"""Remove empty parent directories up to (but not including) stop_at. """Remove empty parent directories in a background thread.
On Windows/OneDrive, directories may be locked briefly after file deletion. On Windows/OneDrive, directories may be locked briefly after file deletion.
This method retries with a small delay to handle that case. Running this in the background avoids blocking the request thread with retries.
""" """
self._cleanup_executor.submit(self._do_cleanup_empty_parents, path, stop_at)
def _do_cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
for parent in path.parents: for parent in path.parents:
if parent == stop_at: if parent == stop_at:
break break

View File

@@ -423,57 +423,25 @@ def bucket_detail(bucket_name: str):
}, },
indent=2, indent=2,
) )
can_edit_policy = False iam = _iam()
if principal: bucket_perms = iam.check_permissions(
try: principal, bucket_name, ["policy", "lifecycle", "cors", "write", "replication"],
_iam().authorize(principal, bucket_name, "policy") ) if principal else {}
can_edit_policy = True admin_perms = iam.check_permissions(
except IamError: principal, None, ["iam:list_users"],
can_edit_policy = False ) if principal else {}
can_manage_lifecycle = False can_edit_policy = bucket_perms.get("policy", False)
if principal: can_manage_lifecycle = bucket_perms.get("lifecycle", False)
try: can_manage_cors = bucket_perms.get("cors", False)
_iam().authorize(principal, bucket_name, "lifecycle") can_manage_versioning = bucket_perms.get("write", False)
can_manage_lifecycle = True can_manage_replication = bucket_perms.get("replication", False)
except IamError: is_replication_admin = admin_perms.get("iam:list_users", False)
can_manage_lifecycle = False
can_manage_cors = False
if principal:
try:
_iam().authorize(principal, bucket_name, "cors")
can_manage_cors = True
except IamError:
can_manage_cors = False
try: try:
versioning_enabled = storage.is_versioning_enabled(bucket_name) versioning_enabled = storage.is_versioning_enabled(bucket_name)
except StorageError: except StorageError:
versioning_enabled = False versioning_enabled = False
can_manage_versioning = False
if principal:
try:
_iam().authorize(principal, bucket_name, "write")
can_manage_versioning = True
except IamError:
can_manage_versioning = False
can_manage_replication = False
if principal:
try:
_iam().authorize(principal, bucket_name, "replication")
can_manage_replication = True
except IamError:
can_manage_replication = False
is_replication_admin = False
if principal:
try:
_iam().authorize(principal, None, "iam:list_users")
is_replication_admin = True
except IamError:
is_replication_admin = False
replication_rule = _replication().get_rule(bucket_name) replication_rule = _replication().get_rule(bucket_name)
connections = _connections().list() if (is_replication_admin or replication_rule) else [] connections = _connections().list() if (is_replication_admin or replication_rule) else []
@@ -489,12 +457,7 @@ def bucket_detail(bucket_name: str):
bucket_quota = storage.get_bucket_quota(bucket_name) bucket_quota = storage.get_bucket_quota(bucket_name)
bucket_stats = storage.bucket_stats(bucket_name) bucket_stats = storage.bucket_stats(bucket_name)
can_manage_quota = False can_manage_quota = is_replication_admin
try:
_iam().authorize(principal, None, "iam:list_users")
can_manage_quota = True
except IamError:
pass
objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name) objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
objects_stream_url = url_for("ui.stream_bucket_objects", bucket_name=bucket_name) objects_stream_url = url_for("ui.stream_bucket_objects", bucket_name=bucket_name)
@@ -1009,6 +972,18 @@ def bulk_download_objects(bucket_name: str):
except IamError as exc: except IamError as exc:
return jsonify({"error": str(exc)}), 403 return jsonify({"error": str(exc)}), 403
max_total_bytes = current_app.config.get("BULK_DOWNLOAD_MAX_BYTES", 1024 * 1024 * 1024)
total_size = 0
for key in unique_keys:
try:
path = storage.get_object_path(bucket_name, key)
total_size += path.stat().st_size
except (StorageError, OSError):
continue
if total_size > max_total_bytes:
limit_mb = max_total_bytes // (1024 * 1024)
return jsonify({"error": f"Total download size exceeds {limit_mb} MB limit. Select fewer objects."}), 400
buffer = io.BytesIO() buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf: with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for key in unique_keys: for key in unique_keys:

View File

@@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
APP_VERSION = "0.2.6" APP_VERSION = "0.2.7"
def get_version() -> str: def get_version() -> str:

View File

@@ -516,6 +516,9 @@
}; };
}; };
let lastStreamRenderTime = 0;
const STREAM_RENDER_THROTTLE_MS = 500;
const flushPendingStreamObjects = () => { const flushPendingStreamObjects = () => {
if (pendingStreamObjects.length === 0) return; if (pendingStreamObjects.length === 0) return;
const batch = pendingStreamObjects.splice(0, pendingStreamObjects.length); const batch = pendingStreamObjects.splice(0, pendingStreamObjects.length);
@@ -532,6 +535,19 @@
loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()}${countText} loading...`; loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()}${countText} loading...`;
} }
} }
if (objectsLoadingRow && objectsLoadingRow.parentNode) {
const loadingText = objectsLoadingRow.querySelector('p');
if (loadingText) {
const countText = totalObjectCount > 0 ? ` of ${totalObjectCount.toLocaleString()}` : '';
loadingText.textContent = `Loading ${loadedObjectCount.toLocaleString()}${countText} objects...`;
}
}
const now = performance.now();
if (!streamingComplete && now - lastStreamRenderTime < STREAM_RENDER_THROTTLE_MS) {
streamRenderScheduled = false;
return;
}
lastStreamRenderTime = now;
refreshVirtualList(); refreshVirtualList();
streamRenderScheduled = false; streamRenderScheduled = false;
}; };
@@ -555,6 +571,7 @@
memoizedVisibleItems = null; memoizedVisibleItems = null;
memoizedInputs = { objectCount: -1, prefix: null, filterTerm: null }; memoizedInputs = { objectCount: -1, prefix: null, filterTerm: null };
pendingStreamObjects = []; pendingStreamObjects = [];
lastStreamRenderTime = 0;
streamAbortController = new AbortController(); streamAbortController = new AbortController();
@@ -569,7 +586,10 @@
throw new Error(`HTTP ${response.status}`); throw new Error(`HTTP ${response.status}`);
} }
if (objectsLoadingRow) objectsLoadingRow.remove(); if (objectsLoadingRow) {
const loadingText = objectsLoadingRow.querySelector('p');
if (loadingText) loadingText.textContent = 'Receiving objects...';
}
const reader = response.body.getReader(); const reader = response.body.getReader();
const decoder = new TextDecoder(); const decoder = new TextDecoder();
@@ -597,6 +617,10 @@
break; break;
case 'count': case 'count':
totalObjectCount = msg.total_count || 0; totalObjectCount = msg.total_count || 0;
if (objectsLoadingRow) {
const loadingText = objectsLoadingRow.querySelector('p');
if (loadingText) loadingText.textContent = `Loading 0 of ${totalObjectCount.toLocaleString()} objects...`;
}
break; break;
case 'object': case 'object':
pendingStreamObjects.push(processStreamObject(msg)); pendingStreamObjects.push(processStreamObject(msg));
@@ -630,11 +654,15 @@
} catch (e) { } } catch (e) { }
} }
flushPendingStreamObjects();
streamingComplete = true; streamingComplete = true;
flushPendingStreamObjects();
hasMoreObjects = false; hasMoreObjects = false;
updateObjectCountBadge(); updateObjectCountBadge();
if (objectsLoadingRow && objectsLoadingRow.parentNode) {
objectsLoadingRow.remove();
}
if (loadMoreStatus) { if (loadMoreStatus) {
loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} objects`; loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} objects`;
} }