23 Commits

Author SHA1 Message Date
35f61313e0 MyFSIO v0.2.8 Release
Reviewed-on: #20
2026-02-10 14:16:22 +00:00
01e79e6993 Fix object browser UI issues 2026-02-10 11:41:02 +08:00
1e3c4b545f Migrate UI backend from direct storage calls to S3 API proxy via boto3 2026-02-09 22:33:47 +08:00
c470cfb576 MyFSIO v0.2.7 Release
Reviewed-on: #19
2026-02-09 12:22:37 +00:00
4ecd32a554 Fix empty UI on large bucket first load: keep loading row during streaming, add progress indicator, throttle renders 2026-02-09 19:29:50 +08:00
aa6d7c4d28 Optimize replication failure caching, batch UI auth checks, add bulk download size limit, background parent cleanup 2026-02-09 18:23:45 +08:00
6e6d6d32bf Optimize KMS: cache AESGCM instance, remove duplicate get_provider 2026-02-09 17:01:19 +08:00
54705ab9c4 Fix Content-Length mismatch on range requests (206 Partial Content) 2026-02-06 16:14:35 +08:00
jun
d96955deee MyFSIO v0.2.6 Release
Reviewed-on: #18
2026-02-05 16:18:03 +00:00
77a46d0725 Binary run fix 2026-02-05 23:49:36 +08:00
0f750b9d89 Optimize object browser for large listings on slow networks 2026-02-05 22:56:00 +08:00
e0dee9db36 Fix UI object browser not showing objects uploaded via S3 API 2026-02-05 22:22:59 +08:00
126657c99f Further debugging of object browser object count delay 2026-02-05 21:45:02 +08:00
07fb1ac773 Fix cross-process cache invalidation on Windows using version counter instead of mtime 2026-02-05 21:32:40 +08:00
147962e1dd Further debugging of object browser object count delay 2026-02-05 21:18:35 +08:00
2643a79121 Debug object browser object count delay 2026-02-05 21:08:18 +08:00
e9a035827b Add _touch_cache_marker for UI object delay count issue 2026-02-05 20:56:42 +08:00
033b8a82be Fix error handlers for API mode; distinguish files from directories in object lookup; Fix UI not showing newly uploaded objects by adding Cache-Control headers 2026-02-05 20:44:11 +08:00
e76c311231 Update install/uninstall scripts with new config options and credential capture 2026-02-05 19:21:18 +08:00
cbdf1a27c8 Pin dockerfile python version to 3.14.3 2026-02-05 19:11:42 +08:00
4a60cb269a Update python version in Dockerfile 2026-02-05 19:11:00 +08:00
ebe7f6222d Fix hardcoded secret key ttl session 2026-02-05 19:08:18 +08:00
70b61fd8e6 Further optimize CPU usage; Improve security and performance; 4 bug fixes. 2026-02-05 17:45:34 +08:00
27 changed files with 2220 additions and 938 deletions

View File

@@ -1,12 +1,10 @@
# syntax=docker/dockerfile:1.7
FROM python:3.12.12-slim
FROM python:3.14.3-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
WORKDIR /app
# Install build deps for any wheels that need compilation, then clean up
RUN apt-get update \
&& apt-get install -y --no-install-recommends build-essential \
&& rm -rf /var/lib/apt/lists/*
@@ -16,10 +14,8 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Make entrypoint executable
RUN chmod +x docker-entrypoint.sh
# Create data directory and set permissions
RUN mkdir -p /app/data \
&& useradd -m -u 1000 myfsio \
&& chown -R myfsio:myfsio /app

View File

@@ -223,6 +223,13 @@ def create_app(
app.extensions["access_logging"] = access_logging_service
app.extensions["site_registry"] = site_registry
from .s3_client import S3ProxyClient
api_base = app.config.get("API_BASE_URL") or "http://127.0.0.1:5000"
app.extensions["s3_proxy"] = S3ProxyClient(
api_base_url=api_base,
region=app.config.get("AWS_REGION", "us-east-1"),
)
operation_metrics_collector = None
if app.config.get("OPERATION_METRICS_ENABLED", False):
operation_metrics_collector = OperationMetricsCollector(
@@ -263,11 +270,37 @@ def create_app(
@app.errorhandler(500)
def internal_error(error):
    """Render an unhandled server error as UI HTML or as an S3-style XML document.

    The HTML template is returned only when the UI is included, the client
    accepts HTML, and the request path is "/" or starts with "/ui". Every
    other request receives an XML ``<Error>`` body with status 500.
    """
    from xml.sax.saxutils import escape

    wants_html = request.accept_mimetypes.accept_html
    path = request.path or ""
    if include_ui and wants_html and (path.startswith("/ui") or path == "/"):
        return render_template('500.html'), 500
    # The request path is client-controlled; escape it (and the request id)
    # so characters such as "&" or "<" cannot produce malformed or injected XML.
    resource = escape(path)
    request_id = escape(str(getattr(g, "request_id", "-")))
    error_xml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        '<Error>'
        '<Code>InternalError</Code>'
        '<Message>An internal server error occurred</Message>'
        f'<Resource>{resource}</Resource>'
        f'<RequestId>{request_id}</RequestId>'
        '</Error>'
    )
    return error_xml, 500, {'Content-Type': 'application/xml'}
@app.errorhandler(CSRFError)
def handle_csrf_error(e):
    """Render a CSRF failure as UI HTML or as an S3-style XML document.

    The HTML template is returned only when the UI is included, the client
    accepts HTML, and the request path is "/" or starts with "/ui". Every
    other request receives an XML ``<Error>`` body with status 400.
    """
    from xml.sax.saxutils import escape

    wants_html = request.accept_mimetypes.accept_html
    path = request.path or ""
    if include_ui and wants_html and (path.startswith("/ui") or path == "/"):
        return render_template('csrf_error.html', reason=e.description), 400
    # Escape every interpolated value: the path is client-controlled and the
    # description is free text, so neither may be trusted to be XML-safe.
    message = escape(str(e.description))
    resource = escape(path)
    request_id = escape(str(getattr(g, "request_id", "-")))
    error_xml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        '<Error>'
        '<Code>CSRFError</Code>'
        f'<Message>{message}</Message>'
        f'<Resource>{resource}</Resource>'
        f'<RequestId>{request_id}</RequestId>'
        '</Error>'
    )
    return error_xml, 400, {'Content-Type': 'application/xml'}
@app.template_filter("filesizeformat")
def filesizeformat(value: int) -> str:

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import ipaddress
import json
import logging
import re
import socket
@@ -354,6 +355,10 @@ def update_peer_site(site_id: str):
if region_error:
return _json_error("ValidationError", region_error, 400)
if "connection_id" in payload:
if payload["connection_id"] and not _connections().get(payload["connection_id"]):
return _json_error("ValidationError", f"Connection '{payload['connection_id']}' not found", 400)
peer = PeerSite(
site_id=site_id,
endpoint=payload.get("endpoint", existing.endpoint),

View File

@@ -36,10 +36,11 @@ class GzipMiddleware:
content_type = None
content_length = None
should_compress = False
passthrough = False
exc_info_holder = [None]
def custom_start_response(status: str, headers: List[Tuple[str, str]], exc_info=None):
nonlocal response_started, status_code, response_headers, content_type, content_length, should_compress
nonlocal response_started, status_code, response_headers, content_type, content_length, should_compress, passthrough
response_started = True
status_code = int(status.split(' ', 1)[0])
response_headers = list(headers)
@@ -50,18 +51,32 @@ class GzipMiddleware:
if name_lower == 'content-type':
content_type = value.split(';')[0].strip().lower()
elif name_lower == 'content-length':
try:
content_length = int(value)
except (ValueError, TypeError):
pass
elif name_lower == 'content-encoding':
should_compress = False
passthrough = True
return start_response(status, headers, exc_info)
elif name_lower == 'x-stream-response':
passthrough = True
return start_response(status, headers, exc_info)
if content_type and content_type in COMPRESSIBLE_MIMES:
if content_length is None or content_length >= self.min_size:
should_compress = True
else:
passthrough = True
return start_response(status, headers, exc_info)
return None
response_body = b''.join(self.app(environ, custom_start_response))
app_iter = self.app(environ, custom_start_response)
if passthrough:
return app_iter
response_body = b''.join(app_iter)
if not response_started:
return [response_body]

View File

@@ -309,6 +309,18 @@ class IamService:
if not self._is_allowed(principal, normalized, action):
raise IamError(f"Access denied for action '{action}' on bucket '{bucket_name}'")
def check_permissions(self, principal: Principal, bucket_name: str | None, actions: Iterable[str]) -> Dict[str, bool]:
    """Evaluate several actions for one principal and bucket in a single call.

    Calls ``self._maybe_reload()`` first, then returns a mapping from each
    action string exactly as supplied to whether it is allowed. An action
    whose normalized form is not in ``ALLOWED_ACTIONS`` maps to ``False``.
    A missing or empty bucket name is treated as "*".
    """
    self._maybe_reload()
    # Lower-casing leaves "*" untouched, so the wildcard needs no special case.
    target_bucket = (bucket_name or "*").lower()
    canonical_by_action = {action: self._normalize_action(action) for action in actions}
    return {
        action: canonical in ALLOWED_ACTIONS
        and self._is_allowed(principal, target_bucket, canonical)
        for action, canonical in canonical_by_action.items()
    }
def buckets_for_principal(self, principal: Principal, buckets: Iterable[str]) -> List[str]:
return [bucket for bucket in buckets if self._is_allowed(principal, bucket, "list")]
@@ -529,11 +541,13 @@ class IamService:
return candidate if candidate in ALLOWED_ACTIONS else ""
def _write_default(self) -> None:
access_key = secrets.token_hex(12)
secret_key = secrets.token_urlsafe(32)
default = {
"users": [
{
"access_key": "localadmin",
"secret_key": "localadmin",
"access_key": access_key,
"secret_key": secret_key,
"display_name": "Local Admin",
"policies": [
{"bucket": "*", "actions": list(ALLOWED_ACTIONS)}
@@ -542,6 +556,14 @@ class IamService:
]
}
self.config_path.write_text(json.dumps(default, indent=2))
print(f"\n{'='*60}")
print("MYFSIO FIRST RUN - ADMIN CREDENTIALS GENERATED")
print(f"{'='*60}")
print(f"Access Key: {access_key}")
print(f"Secret Key: {secret_key}")
print(f"{'='*60}")
print(f"Missed this? Check: {self.config_path}")
print(f"{'='*60}\n")
def _generate_access_key(self) -> str:
return secrets.token_hex(8)

View File

@@ -160,6 +160,7 @@ class KMSManager:
self.generate_data_key_max_bytes = generate_data_key_max_bytes
self._keys: Dict[str, KMSKey] = {}
self._master_key: bytes | None = None
self._master_aesgcm: AESGCM | None = None
self._loaded = False
@property
@@ -191,6 +192,7 @@ class KMSManager:
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
else:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
self._master_aesgcm = AESGCM(self._master_key)
return self._master_key
def _load_keys(self) -> None:
@@ -231,18 +233,16 @@ class KMSManager:
_set_secure_file_permissions(self.keys_path)
def _encrypt_key_material(self, key_material: bytes) -> bytes:
"""Encrypt key material with the master key."""
aesgcm = AESGCM(self.master_key)
_ = self.master_key
nonce = secrets.token_bytes(12)
ciphertext = aesgcm.encrypt(nonce, key_material, None)
ciphertext = self._master_aesgcm.encrypt(nonce, key_material, None)
return nonce + ciphertext
def _decrypt_key_material(self, encrypted: bytes) -> bytes:
"""Decrypt key material with the master key."""
aesgcm = AESGCM(self.master_key)
_ = self.master_key
nonce = encrypted[:12]
ciphertext = encrypted[12:]
return aesgcm.decrypt(nonce, ciphertext, None)
return self._master_aesgcm.decrypt(nonce, ciphertext, None)
def create_key(self, description: str = "", key_id: str | None = None) -> KMSKey:
"""Create a new KMS key."""
@@ -404,22 +404,6 @@ class KMSManager:
plaintext, _ = self.decrypt(encrypted_key, context)
return plaintext
def get_provider(self, key_id: str | None = None) -> KMSEncryptionProvider:
"""Get an encryption provider for a specific key."""
self._load_keys()
if key_id is None:
if not self._keys:
key = self.create_key("Default KMS Key")
key_id = key.key_id
else:
key_id = next(iter(self._keys.keys()))
if key_id not in self._keys:
raise EncryptionError(f"Key not found: {key_id}")
return KMSEncryptionProvider(self, key_id)
def re_encrypt(self, ciphertext: bytes, destination_key_id: str,
source_context: Dict[str, str] | None = None,
destination_context: Dict[str, str] | None = None) -> bytes:

View File

@@ -176,11 +176,12 @@ class ReplicationFailureStore:
self.storage_root = storage_root
self.max_failures_per_bucket = max_failures_per_bucket
self._lock = threading.Lock()
self._cache: Dict[str, List[ReplicationFailure]] = {}
def _get_failures_path(self, bucket_name: str) -> Path:
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "replication_failures.json"
def load_failures(self, bucket_name: str) -> List[ReplicationFailure]:
def _load_from_disk(self, bucket_name: str) -> List[ReplicationFailure]:
path = self._get_failures_path(bucket_name)
if not path.exists():
return []
@@ -192,7 +193,7 @@ class ReplicationFailureStore:
logger.error(f"Failed to load replication failures for {bucket_name}: {e}")
return []
def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
def _save_to_disk(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
path = self._get_failures_path(bucket_name)
path.parent.mkdir(parents=True, exist_ok=True)
data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]}
@@ -202,6 +203,18 @@ class ReplicationFailureStore:
except OSError as e:
logger.error(f"Failed to save replication failures for {bucket_name}: {e}")
def load_failures(self, bucket_name: str) -> List[ReplicationFailure]:
    """Return the failures for ``bucket_name``, consulting the in-memory cache first.

    On a cache miss the list is read via ``_load_from_disk`` and stored in the
    cache. The caller always receives a fresh list copy, never the cached list.
    """
    try:
        cached = self._cache[bucket_name]
    except KeyError:
        cached = self._load_from_disk(bucket_name)
        self._cache[bucket_name] = cached
    return list(cached)
def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
    """Cache and persist ``failures``, keeping at most ``max_failures_per_bucket`` entries.

    The truncated slice is stored in the in-memory cache and the same slice
    is handed to ``_save_to_disk``.
    """
    limit = self.max_failures_per_bucket
    kept = failures[:limit]
    self._cache[bucket_name] = kept
    self._save_to_disk(bucket_name, kept)
def add_failure(self, bucket_name: str, failure: ReplicationFailure) -> None:
with self._lock:
failures = self.load_failures(bucket_name)
@@ -227,6 +240,7 @@ class ReplicationFailureStore:
def clear_failures(self, bucket_name: str) -> None:
with self._lock:
self._cache.pop(bucket_name, None)
path = self._get_failures_path(bucket_name)
if path.exists():
path.unlink()

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import base64
import hashlib
import hmac
import json
import logging
import mimetypes
import re
@@ -999,12 +1000,14 @@ def _apply_object_headers(
etag: str,
) -> None:
if file_stat is not None:
if response.status_code != 206:
response.headers["Content-Length"] = str(file_stat.st_size)
response.headers["Last-Modified"] = http_date(file_stat.st_mtime)
response.headers["ETag"] = f'"{etag}"'
response.headers["Accept-Ranges"] = "bytes"
for key, value in (metadata or {}).items():
response.headers[f"X-Amz-Meta-{key}"] = value
safe_value = _sanitize_header_value(str(value))
response.headers[f"X-Amz-Meta-{key}"] = safe_value
def _maybe_handle_bucket_subresource(bucket_name: str) -> Response | None:
@@ -2342,10 +2345,12 @@ def _post_object(bucket_name: str) -> Response:
success_action_redirect = request.form.get("success_action_redirect")
if success_action_redirect:
allowed_hosts = current_app.config.get("ALLOWED_REDIRECT_HOSTS", [])
if not allowed_hosts:
allowed_hosts = [request.host]
parsed = urlparse(success_action_redirect)
if parsed.scheme not in ("http", "https"):
return _error_response("InvalidArgument", "Redirect URL must use http or https", 400)
if allowed_hosts and parsed.netloc not in allowed_hosts:
if parsed.netloc not in allowed_hosts:
return _error_response("InvalidArgument", "Redirect URL host not allowed", 400)
redirect_url = f"{success_action_redirect}?bucket={bucket_name}&key={quote(object_key)}&etag={meta.etag}"
return Response(status=303, headers={"Location": redirect_url})
@@ -2773,9 +2778,14 @@ def object_handler(bucket_name: str, object_key: str):
except StorageError as exc:
return _error_response("InternalError", str(exc), 500)
else:
try:
stat = path.stat()
file_size = stat.st_size
etag = storage._compute_etag(path)
etag = metadata.get("__etag__") or storage._compute_etag(path)
except PermissionError:
return _error_response("AccessDenied", "Permission denied accessing object", 403)
except OSError as exc:
return _error_response("InternalError", f"Failed to access object: {exc}", 500)
if range_header:
try:
@@ -2816,13 +2826,22 @@ def object_handler(bucket_name: str, object_key: str):
except StorageError as exc:
return _error_response("InternalError", str(exc), 500)
else:
try:
stat = path.stat()
response = Response(status=200)
etag = storage._compute_etag(path)
etag = metadata.get("__etag__") or storage._compute_etag(path)
except PermissionError:
return _error_response("AccessDenied", "Permission denied accessing object", 403)
except OSError as exc:
return _error_response("InternalError", f"Failed to access object: {exc}", 500)
response.headers["Content-Type"] = mimetype
logged_bytes = 0
_apply_object_headers(response, file_stat=path.stat() if not is_encrypted else None, metadata=metadata, etag=etag)
try:
file_stat = path.stat() if not is_encrypted else None
except (PermissionError, OSError):
file_stat = None
_apply_object_headers(response, file_stat=file_stat, metadata=metadata, etag=etag)
if request.method == "GET":
response_overrides = {
@@ -2945,7 +2964,11 @@ def _bucket_policy_handler(bucket_name: str) -> Response:
store.delete_policy(bucket_name)
current_app.logger.info("Bucket policy removed", extra={"bucket": bucket_name})
return Response(status=204)
payload = request.get_json(silent=True)
raw_body = request.get_data(cache=False) or b""
try:
payload = json.loads(raw_body)
except (json.JSONDecodeError, ValueError):
return _error_response("MalformedPolicy", "Policy document must be JSON", 400)
if not payload:
return _error_response("MalformedPolicy", "Policy document must be JSON", 400)
try:

284
app/s3_client.py Normal file
View File

@@ -0,0 +1,284 @@
from __future__ import annotations
import json
import logging
import threading
import time
from typing import Any, Generator, Optional
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError
from flask import current_app, session
logger = logging.getLogger(__name__)
# Extra user-agent token attached to every boto3 client built by this module.
UI_PROXY_USER_AGENT = "MyFSIO-UIProxy/1.0"
# S3 error code -> HTTP status returned to the UI. Codes absent from this
# table fall back to the HTTP status carried in the boto3 error response
# (see handle_client_error).
_BOTO_ERROR_MAP = {
    "NoSuchBucket": 404,
    "NoSuchKey": 404,
    "NoSuchUpload": 404,
    "BucketAlreadyExists": 409,
    "BucketAlreadyOwnedByYou": 409,
    "BucketNotEmpty": 409,
    "AccessDenied": 403,
    "InvalidAccessKeyId": 403,
    "SignatureDoesNotMatch": 403,
    "InvalidBucketName": 400,
    "InvalidArgument": 400,
    "MalformedXML": 400,
    "EntityTooLarge": 400,
    "QuotaExceeded": 403,
}
# Seconds an upload registration stays valid (24 hours).
_UPLOAD_REGISTRY_MAX_AGE = 24 * 60 * 60
# Minimum seconds between two sweeps for expired registrations (1 hour).
_UPLOAD_REGISTRY_CLEANUP_INTERVAL = 60 * 60


class UploadRegistry:
    """In-memory map from upload id to the bucket and object key it was registered for.

    Each entry stores ``(bucket_name, object_key, registered_at)`` where the
    timestamp comes from ``time.monotonic()``. All access to the map goes
    through a single lock.
    """

    def __init__(self) -> None:
        self._entries: dict[str, tuple[str, str, float]] = {}
        self._lock = threading.Lock()
        self._last_cleanup = time.monotonic()

    def register(self, upload_id: str, bucket_name: str, object_key: str) -> None:
        """Record (or overwrite) an upload, then sweep expired entries if one is due."""
        with self._lock:
            record = (bucket_name, object_key, time.monotonic())
            self._entries[upload_id] = record
            self._maybe_cleanup()

    def get_key(self, upload_id: str, bucket_name: str) -> Optional[str]:
        """Return the object key for ``upload_id``, or ``None``.

        ``None`` is returned when the id is unknown, when it was registered
        for a different bucket, or when the entry is older than the maximum
        age (in which case the entry is also removed).
        """
        with self._lock:
            record = self._entries.get(upload_id)
            if record is None:
                return None
            owner_bucket, object_key, registered_at = record
            if owner_bucket != bucket_name:
                return None
            age = time.monotonic() - registered_at
            if age > _UPLOAD_REGISTRY_MAX_AGE:
                del self._entries[upload_id]
                return None
            return object_key

    def remove(self, upload_id: str) -> None:
        """Forget ``upload_id``; unknown ids are ignored."""
        with self._lock:
            self._entries.pop(upload_id, None)

    def _maybe_cleanup(self) -> None:
        """Drop expired entries, at most once per cleanup interval.

        Takes no lock itself; ``register`` calls it while holding the lock.
        """
        now = time.monotonic()
        if now - self._last_cleanup >= _UPLOAD_REGISTRY_CLEANUP_INTERVAL:
            self._last_cleanup = now
            oldest_allowed = now - _UPLOAD_REGISTRY_MAX_AGE
            expired = [
                uid
                for uid, entry in self._entries.items()
                if entry[2] < oldest_allowed
            ]
            for uid in expired:
                del self._entries[uid]
class S3ProxyClient:
    """Builds boto3 S3 clients that target the API server at ``api_base_url``.

    Also owns the ``UploadRegistry`` instance exposed as ``upload_registry``.
    """

    def __init__(self, api_base_url: str, region: str = "us-east-1") -> None:
        """Store the endpoint (trailing slashes removed) and region.

        Raises:
            ValueError: if ``api_base_url`` is empty.
        """
        if not api_base_url:
            raise ValueError("api_base_url is required for S3ProxyClient")
        self._api_base_url = api_base_url.rstrip("/")
        self._region = region
        self.upload_registry = UploadRegistry()

    @property
    def api_base_url(self) -> str:
        """The endpoint URL with trailing slashes stripped."""
        return self._api_base_url

    def get_client(self, access_key: str, secret_key: str) -> Any:
        """Create a new boto3 S3 client signed with the given credentials.

        The client uses SigV4, path-style addressing, a 5 second connect
        timeout, a 30 second read timeout, and ``max_attempts`` of 0.

        Raises:
            ValueError: if either credential is empty.
        """
        if not (access_key and secret_key):
            raise ValueError("Both access_key and secret_key are required")
        client_config = Config(
            user_agent_extra=UI_PROXY_USER_AGENT,
            connect_timeout=5,
            read_timeout=30,
            retries={"max_attempts": 0},
            signature_version="s3v4",
            s3={"addressing_style": "path"},
            request_checksum_calculation="when_required",
            response_checksum_validation="when_required",
        )
        client_kwargs = {
            "endpoint_url": self._api_base_url,
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
            "region_name": self._region,
            "config": client_config,
        }
        return boto3.client("s3", **client_kwargs)
def _get_proxy() -> S3ProxyClient:
    """Return the ``s3_proxy`` extension registered on the current Flask app.

    Raises:
        RuntimeError: if no ``s3_proxy`` extension is registered.
    """
    configured = current_app.extensions.get("s3_proxy")
    if configured is not None:
        return configured
    raise RuntimeError(
        "S3 proxy not configured. Set API_BASE_URL or run both API and UI servers."
    )
def _get_session_creds() -> tuple[str, str]:
    """Resolve the session's ``cred_token`` into an (access_key, secret_key) pair.

    Expired secrets are purged from the app's ``secret_store`` before the
    lookup, and the token is read with ``peek``.

    Raises:
        PermissionError: if the session has no token, the token no longer
            resolves to a payload, or either key in the payload is empty.
    """
    store = current_app.extensions["secret_store"]
    store.purge_expired()

    cred_token = session.get("cred_token")
    if not cred_token:
        raise PermissionError("Not authenticated")

    payload = store.peek(cred_token)
    if not payload:
        raise PermissionError("Session expired")

    key_pair = (payload.get("access_key", ""), payload.get("secret_key", ""))
    if not all(key_pair):
        raise PermissionError("Invalid session credentials")
    return key_pair
def get_session_s3_client() -> Any:
    """Build an S3 client for the credentials bound to the current session.

    The proxy is looked up before the credentials, so a missing proxy is
    reported (RuntimeError) ahead of a missing login (PermissionError).
    """
    proxy = _get_proxy()
    credentials = _get_session_creds()
    return proxy.get_client(*credentials)
def get_upload_registry() -> UploadRegistry:
    """Return the upload registry owned by the app's S3 proxy."""
    proxy = _get_proxy()
    return proxy.upload_registry
def handle_client_error(exc: ClientError) -> tuple[dict[str, str], int]:
    """Translate a boto3 ``ClientError`` into an ``({"error": message}, status)`` pair.

    The status comes from ``_BOTO_ERROR_MAP`` when the error code is listed
    there; otherwise the HTTP status in the response metadata is used,
    defaulting to 500. A missing or empty message becomes
    "S3 operation failed".
    """
    details = exc.response.get("Error", {})
    message = details.get("Message") or "S3 operation failed"
    status = _BOTO_ERROR_MAP.get(details.get("Code", "InternalError"))
    if status is None:
        metadata = exc.response.get("ResponseMetadata", {})
        status = metadata.get("HTTPStatusCode", 500)
    return {"error": message}, status
def handle_connection_error(exc: Exception) -> tuple[dict[str, str], int]:
    """Log a failed connection to the S3 API and return a fixed 502 error payload."""
    logger.error("S3 API connection failed: %s", exc)
    payload = {"error": "S3 API server is unreachable. Ensure the API server is running."}
    return payload, 502
def format_datetime_display(dt: Any, display_tz: str = "UTC") -> str:
    """Delegate to ``ui._format_datetime_display``; the import is deferred to call time."""
    from .ui import _format_datetime_display as render

    return render(dt, display_tz)
def format_datetime_iso(dt: Any, display_tz: str = "UTC") -> str:
    """Delegate to ``ui._format_datetime_iso``; the import is deferred to call time."""
    from .ui import _format_datetime_iso as render

    return render(dt, display_tz)
def build_url_templates(bucket_name: str) -> dict[str, str]:
    """Return URL templates for the per-object UI endpoints of ``bucket_name``.

    Each template is generated with ``url_for`` using the literal
    "KEY_PLACEHOLDER" as the object key (and "VERSION_ID_PLACEHOLDER" as the
    version id for the restore endpoint). The "download" template is the
    preview template with "?download=1" appended.
    """
    from flask import url_for

    def object_route(endpoint: str, **extra: str) -> str:
        # Every template shares the bucket and the key placeholder.
        return url_for(endpoint, bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", **extra)

    preview = object_route("ui.object_preview")
    delete = object_route("ui.delete_object")
    presign = object_route("ui.object_presign")
    versions = object_route("ui.object_versions")
    restore = object_route("ui.restore_object_version", version_id="VERSION_ID_PLACEHOLDER")
    tags = object_route("ui.object_tags")
    copy = object_route("ui.copy_object")
    move = object_route("ui.move_object")
    metadata = object_route("ui.object_metadata")

    return {
        "preview": preview,
        "download": preview + "?download=1",
        "presign": presign,
        "delete": delete,
        "versions": versions,
        "restore": restore,
        "tags": tags,
        "copy": copy,
        "move": move,
        "metadata": metadata,
    }
def translate_list_objects(
    boto3_response: dict[str, Any],
    url_templates: dict[str, str],
    display_tz: str = "UTC",
    versioning_enabled: bool = False,
) -> dict[str, Any]:
    """Convert a boto3 list-objects response into the payload returned to the UI.

    Each entry under "Contents" becomes a dict with the key, size, three
    renderings of the last-modified time, and the ETag with surrounding
    double quotes stripped. ``total_count`` uses the response's "KeyCount"
    when present, otherwise the number of translated objects.
    """

    def to_row(obj: dict[str, Any]) -> dict[str, Any]:
        modified = obj["LastModified"]
        return {
            "key": obj["Key"],
            "size": obj["Size"],
            "last_modified": modified.isoformat(),
            "last_modified_display": format_datetime_display(modified, display_tz),
            "last_modified_iso": format_datetime_iso(modified, display_tz),
            "etag": obj.get("ETag", "").strip('"'),
        }

    rows = [to_row(obj) for obj in boto3_response.get("Contents", [])]
    return {
        "objects": rows,
        "is_truncated": boto3_response.get("IsTruncated", False),
        "next_continuation_token": boto3_response.get("NextContinuationToken"),
        "total_count": boto3_response.get("KeyCount", len(rows)),
        "versioning_enabled": versioning_enabled,
        "url_templates": url_templates,
    }
def get_versioning_via_s3(client: Any, bucket_name: str) -> bool:
    """Report whether the bucket's versioning status is exactly "Enabled".

    A ``ClientError`` yields ``False``; it is logged as a warning unless the
    error code is "NoSuchBucket".
    """
    try:
        status = client.get_bucket_versioning(Bucket=bucket_name).get("Status")
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code", "")
        if code != "NoSuchBucket":
            logger.warning("Failed to check versioning for %s: %s", bucket_name, code)
        return False
    return status == "Enabled"
def stream_objects_ndjson(
    client: Any,
    bucket_name: str,
    prefix: Optional[str],
    url_templates: dict[str, str],
    display_tz: str = "UTC",
    versioning_enabled: bool = False,
) -> Generator[str, None, None]:
    """Yield a bucket listing as newline-delimited JSON records.

    The stream opens with a "meta" record and a "count" record (always
    ``total_count`` 0), followed by one "object" record per listed key as
    pages arrive from the ``list_objects_v2`` paginator, and closes with a
    "done" record. If a ``ClientError`` or one of the two handled connection
    errors occurs, a single "error" record is emitted and the stream ends
    without a "done" record.
    """

    def line(record: dict[str, Any]) -> str:
        return json.dumps(record) + "\n"

    yield line({
        "type": "meta",
        "versioning_enabled": versioning_enabled,
        "url_templates": url_templates,
    })
    yield line({"type": "count", "total_count": 0})

    request: dict[str, Any] = {"Bucket": bucket_name, "MaxKeys": 1000}
    if prefix:
        request["Prefix"] = prefix

    try:
        pages = client.get_paginator("list_objects_v2").paginate(**request)
        for page in pages:
            for obj in page.get("Contents", []):
                modified = obj["LastModified"]
                yield line({
                    "type": "object",
                    "key": obj["Key"],
                    "size": obj["Size"],
                    "last_modified": modified.isoformat(),
                    "last_modified_display": format_datetime_display(modified, display_tz),
                    "last_modified_iso": format_datetime_iso(modified, display_tz),
                    "etag": obj.get("ETag", "").strip('"'),
                })
    except ClientError as exc:
        detail = exc.response.get("Error", {}).get("Message", "S3 operation failed")
        yield line({"type": "error", "error": detail})
        return
    except (EndpointConnectionError, ConnectionClosedError):
        yield line({"type": "error", "error": "S3 API server is unreachable"})
        return

    yield line({"type": "done"})

View File

@@ -18,6 +18,18 @@ class EphemeralSecretStore:
self._store[token] = (payload, expires_at)
return token
def peek(self, token: str | None) -> Any | None:
    """Return the payload stored under ``token`` without consuming it.

    Returns ``None`` for an empty token, an unknown token, or an entry whose
    expiry time has passed; an expired entry is also removed from the store.
    """
    entry = self._store.get(token) if token else None
    if not entry:
        return None
    payload, deadline = entry
    if deadline < time.time():
        # Expired: evict eagerly so the stale secret does not linger.
        self._store.pop(token, None)
        return None
    return payload
def pop(self, token: str | None) -> Any | None:
if not token:
return None

View File

@@ -11,6 +11,7 @@ import time
import unicodedata
import uuid
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
@@ -177,7 +178,7 @@ class ObjectStorage:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
self._ensure_system_roots()
self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float]] = OrderedDict()
self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float, float]] = OrderedDict()
self._cache_lock = threading.Lock()
self._bucket_locks: Dict[str, threading.Lock] = {}
self._cache_version: Dict[str, int] = {}
@@ -186,6 +187,9 @@ class ObjectStorage:
self._cache_ttl = cache_ttl
self._object_cache_max_size = object_cache_max_size
self._object_key_max_length_bytes = object_key_max_length_bytes
self._sorted_key_cache: Dict[str, tuple[list[str], int]] = {}
self._meta_index_locks: Dict[str, threading.Lock] = {}
self._cleanup_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ParentCleanup")
def _get_bucket_lock(self, bucket_id: str) -> threading.Lock:
"""Get or create a lock for a specific bucket. Reduces global lock contention."""
@@ -243,10 +247,15 @@ class ObjectStorage:
raise BucketNotFoundError("Bucket does not exist")
cache_path = self._system_bucket_root(bucket_name) / "stats.json"
cached_stats = None
cache_fresh = False
if cache_path.exists():
try:
if time.time() - cache_path.stat().st_mtime < cache_ttl:
return json.loads(cache_path.read_text(encoding="utf-8"))
cache_fresh = time.time() - cache_path.stat().st_mtime < cache_ttl
cached_stats = json.loads(cache_path.read_text(encoding="utf-8"))
if cache_fresh:
return cached_stats
except (OSError, json.JSONDecodeError):
pass
@@ -255,6 +264,7 @@ class ObjectStorage:
version_count = 0
version_bytes = 0
try:
for path in bucket_path.rglob("*"):
if path.is_file():
rel = path.relative_to(bucket_path)
@@ -273,6 +283,14 @@ class ObjectStorage:
stat = path.stat()
version_count += 1
version_bytes += stat.st_size
except OSError:
if cached_stats is not None:
return cached_stats
raise
existing_serial = 0
if cached_stats is not None:
existing_serial = cached_stats.get("_cache_serial", 0)
stats = {
"objects": object_count,
@@ -281,6 +299,7 @@ class ObjectStorage:
"version_bytes": version_bytes,
"total_objects": object_count + version_count,
"total_bytes": total_bytes + version_bytes,
"_cache_serial": existing_serial,
}
try:
@@ -299,6 +318,39 @@ class ObjectStorage:
except OSError:
pass
def _update_bucket_stats_cache(
    self,
    bucket_id: str,
    *,
    bytes_delta: int = 0,
    objects_delta: int = 0,
    version_bytes_delta: int = 0,
    version_count_delta: int = 0,
) -> None:
    """Apply deltas to the bucket's cached ``stats.json`` instead of discarding it.

    Each counter is adjusted by its delta and clamped at zero, the two
    ``total_*`` counters receive the sum of their component deltas, and
    ``_cache_serial`` is incremented by one before the file is rewritten.
    ``OSError`` and JSON decode errors are swallowed, making the update
    best-effort.

    NOTE(review): when ``stats.json`` does not exist the counters are seeded
    from zero, so the file written reflects only this delta — confirm that
    readers do not treat such a file as a complete scan of the bucket.
    """
    stats_file = self._system_bucket_root(bucket_id) / "stats.json"
    adjustments = (
        ("objects", objects_delta),
        ("bytes", bytes_delta),
        ("version_count", version_count_delta),
        ("version_bytes", version_bytes_delta),
        ("total_objects", objects_delta + version_count_delta),
        ("total_bytes", bytes_delta + version_bytes_delta),
    )
    try:
        stats_file.parent.mkdir(parents=True, exist_ok=True)
        if stats_file.exists():
            stats = json.loads(stats_file.read_text(encoding="utf-8"))
        else:
            stats = {name: 0 for name, _ in adjustments}
            stats["_cache_serial"] = 0
        for name, delta in adjustments:
            stats[name] = max(0, stats.get(name, 0) + delta)
        stats["_cache_serial"] = stats.get("_cache_serial", 0) + 1
        stats_file.write_text(json.dumps(stats), encoding="utf-8")
    except (OSError, json.JSONDecodeError):
        pass
def delete_bucket(self, bucket_name: str) -> None:
bucket_path = self._bucket_path(bucket_name)
if not bucket_path.exists():
@@ -333,6 +385,8 @@ class ObjectStorage:
Returns:
ListObjectsResult with objects, truncation status, and continuation token
"""
import bisect
bucket_path = self._bucket_path(bucket_name)
if not bucket_path.exists():
raise BucketNotFoundError("Bucket does not exist")
@@ -340,15 +394,26 @@ class ObjectStorage:
object_cache = self._get_object_cache(bucket_id, bucket_path)
cache_version = self._cache_version.get(bucket_id, 0)
cached_entry = self._sorted_key_cache.get(bucket_id)
if cached_entry and cached_entry[1] == cache_version:
all_keys = cached_entry[0]
else:
all_keys = sorted(object_cache.keys())
self._sorted_key_cache[bucket_id] = (all_keys, cache_version)
if prefix:
all_keys = [k for k in all_keys if k.startswith(prefix)]
lo = bisect.bisect_left(all_keys, prefix)
hi = len(all_keys)
for i in range(lo, len(all_keys)):
if not all_keys[i].startswith(prefix):
hi = i
break
all_keys = all_keys[lo:hi]
total_count = len(all_keys)
start_index = 0
if continuation_token:
import bisect
start_index = bisect.bisect_right(all_keys, continuation_token)
if start_index >= total_count:
return ListObjectsResult(
@@ -403,7 +468,9 @@ class ObjectStorage:
is_overwrite = destination.exists()
existing_size = destination.stat().st_size if is_overwrite else 0
archived_version_size = 0
if self._is_versioning_enabled(bucket_path) and is_overwrite:
archived_version_size = existing_size
self._archive_current_version(bucket_id, safe_key, reason="overwrite")
tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR
@@ -416,11 +483,10 @@ class ObjectStorage:
shutil.copyfileobj(_HashingReader(stream, checksum), target)
new_size = tmp_path.stat().st_size
if enforce_quota:
size_delta = new_size - existing_size
object_delta = 0 if is_overwrite else 1
if enforce_quota:
quota_check = self.check_quota(
bucket_name,
additional_bytes=max(0, size_delta),
@@ -448,7 +514,13 @@ class ObjectStorage:
combined_meta = {**internal_meta, **(metadata or {})}
self._write_metadata(bucket_id, safe_key, combined_meta)
self._invalidate_bucket_stats_cache(bucket_id)
self._update_bucket_stats_cache(
bucket_id,
bytes_delta=size_delta,
objects_delta=object_delta,
version_bytes_delta=archived_version_size,
version_count_delta=1 if archived_version_size > 0 else 0,
)
obj_meta = ObjectMeta(
key=safe_key.as_posix(),
@@ -463,7 +535,7 @@ class ObjectStorage:
def get_object_path(self, bucket_name: str, object_key: str) -> Path:
    """Return the filesystem path of an existing object.

    Args:
        bucket_name: Name of the bucket that holds the object.
        object_key: Key of the object inside the bucket.

    Returns:
        The path produced by ``self._object_path`` for the bucket/key pair.

    Raises:
        ObjectNotFoundError: If the resolved path is not a file.
    """
    path = self._object_path(bucket_name, object_key)
    # is_file() rather than exists(): a directory that happens to sit at the
    # key's location must be reported as a missing object, not returned.
    if not path.is_file():
        raise ObjectNotFoundError("Object not found")
    return path
@@ -475,11 +547,14 @@ class ObjectStorage:
return self._read_metadata(bucket_path.name, safe_key) or {}
def _cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
    """Remove empty parent directories in a background thread.

    The work is submitted to ``self._cleanup_executor``, which runs
    ``self._do_cleanup_empty_parents(path, stop_at)``; this method returns
    without waiting for it.

    On Windows/OneDrive, directories may be locked briefly after file deletion.
    Running this in the background avoids blocking the request thread with retries.

    Args:
        path: Path whose parent directories are candidates for removal.
        stop_at: Directory at which the upward walk stops.
    """
    self._cleanup_executor.submit(self._do_cleanup_empty_parents, path, stop_at)
def _do_cleanup_empty_parents(self, path: Path, stop_at: Path) -> None:
for parent in path.parents:
if parent == stop_at:
break
@@ -498,15 +573,24 @@ class ObjectStorage:
path = self._object_path(bucket_name, object_key)
if not path.exists():
return
deleted_size = path.stat().st_size
safe_key = path.relative_to(bucket_path)
bucket_id = bucket_path.name
archived_version_size = 0
if self._is_versioning_enabled(bucket_path):
archived_version_size = deleted_size
self._archive_current_version(bucket_id, safe_key, reason="delete")
rel = path.relative_to(bucket_path)
self._safe_unlink(path)
self._delete_metadata(bucket_id, rel)
self._invalidate_bucket_stats_cache(bucket_id)
self._update_bucket_stats_cache(
bucket_id,
bytes_delta=-deleted_size,
objects_delta=-1,
version_bytes_delta=archived_version_size,
version_count_delta=1 if archived_version_size > 0 else 0,
)
self._update_object_cache_entry(bucket_id, safe_key.as_posix(), None)
self._cleanup_empty_parents(path, bucket_path)
@@ -733,6 +817,10 @@ class ObjectStorage:
if not object_path.exists():
raise ObjectNotFoundError("Object does not exist")
entry = self._read_index_entry(bucket_path.name, safe_key)
if entry is not None:
tags = entry.get("tags")
return tags if isinstance(tags, list) else []
for meta_file in (self._metadata_file(bucket_path.name, safe_key), self._legacy_metadata_file(bucket_path.name, safe_key)):
if not meta_file.exists():
continue
@@ -756,30 +844,31 @@ class ObjectStorage:
if not object_path.exists():
raise ObjectNotFoundError("Object does not exist")
meta_file = self._metadata_file(bucket_path.name, safe_key)
existing_payload: Dict[str, Any] = {}
bucket_id = bucket_path.name
existing_entry = self._read_index_entry(bucket_id, safe_key) or {}
if not existing_entry:
meta_file = self._metadata_file(bucket_id, safe_key)
if meta_file.exists():
try:
existing_payload = json.loads(meta_file.read_text(encoding="utf-8"))
existing_entry = json.loads(meta_file.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
pass
if tags:
existing_payload["tags"] = tags
existing_entry["tags"] = tags
else:
existing_payload.pop("tags", None)
existing_entry.pop("tags", None)
if existing_payload.get("metadata") or existing_payload.get("tags"):
meta_file.parent.mkdir(parents=True, exist_ok=True)
meta_file.write_text(json.dumps(existing_payload), encoding="utf-8")
elif meta_file.exists():
meta_file.unlink()
parent = meta_file.parent
meta_root = self._bucket_meta_root(bucket_path.name)
while parent != meta_root and parent.exists() and not any(parent.iterdir()):
parent.rmdir()
parent = parent.parent
if existing_entry.get("metadata") or existing_entry.get("tags"):
self._write_index_entry(bucket_id, safe_key, existing_entry)
else:
self._delete_index_entry(bucket_id, safe_key)
old_meta = self._metadata_file(bucket_id, safe_key)
try:
if old_meta.exists():
old_meta.unlink()
except OSError:
pass
def delete_object_tags(self, bucket_name: str, object_key: str) -> None:
"""Delete all tags from an object."""
@@ -828,7 +917,12 @@ class ObjectStorage:
if not isinstance(metadata, dict):
metadata = {}
destination = bucket_path / safe_key
if self._is_versioning_enabled(bucket_path) and destination.exists():
restored_size = data_path.stat().st_size
is_overwrite = destination.exists()
existing_size = destination.stat().st_size if is_overwrite else 0
archived_version_size = 0
if self._is_versioning_enabled(bucket_path) and is_overwrite:
archived_version_size = existing_size
self._archive_current_version(bucket_id, safe_key, reason="restore-overwrite")
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(data_path, destination)
@@ -837,7 +931,13 @@ class ObjectStorage:
else:
self._delete_metadata(bucket_id, safe_key)
stat = destination.stat()
self._invalidate_bucket_stats_cache(bucket_id)
self._update_bucket_stats_cache(
bucket_id,
bytes_delta=restored_size - existing_size,
objects_delta=0 if is_overwrite else 1,
version_bytes_delta=archived_version_size,
version_count_delta=1 if archived_version_size > 0 else 0,
)
return ObjectMeta(
key=safe_key.as_posix(),
size=stat.st_size,
@@ -861,6 +961,7 @@ class ObjectStorage:
meta_path = legacy_version_dir / f"{version_id}.json"
if not data_path.exists() and not meta_path.exists():
raise StorageError(f"Version {version_id} not found")
deleted_version_size = data_path.stat().st_size if data_path.exists() else 0
if data_path.exists():
data_path.unlink()
if meta_path.exists():
@@ -868,6 +969,12 @@ class ObjectStorage:
parent = data_path.parent
if parent.exists() and not any(parent.iterdir()):
parent.rmdir()
if deleted_version_size > 0:
self._update_bucket_stats_cache(
bucket_id,
version_bytes_delta=-deleted_version_size,
version_count_delta=-1,
)
def list_orphaned_objects(self, bucket_name: str) -> List[Dict[str, Any]]:
bucket_path = self._bucket_path(bucket_name)
@@ -1167,11 +1274,11 @@ class ObjectStorage:
is_overwrite = destination.exists()
existing_size = destination.stat().st_size if is_overwrite else 0
if enforce_quota:
size_delta = total_size - existing_size
object_delta = 0 if is_overwrite else 1
versioning_enabled = self._is_versioning_enabled(bucket_path)
if enforce_quota:
quota_check = self.check_quota(
bucket_name,
additional_bytes=max(0, size_delta),
@@ -1188,9 +1295,11 @@ class ObjectStorage:
lock_file_path = self._system_bucket_root(bucket_id) / "locks" / f"{safe_key.as_posix().replace('/', '_')}.lock"
archived_version_size = 0
try:
with _atomic_lock_file(lock_file_path):
if self._is_versioning_enabled(bucket_path) and destination.exists():
if versioning_enabled and destination.exists():
archived_version_size = destination.stat().st_size
self._archive_current_version(bucket_id, safe_key, reason="overwrite")
checksum = hashlib.md5()
with destination.open("wb") as target:
@@ -1210,7 +1319,13 @@ class ObjectStorage:
shutil.rmtree(upload_root, ignore_errors=True)
self._invalidate_bucket_stats_cache(bucket_id)
self._update_bucket_stats_cache(
bucket_id,
bytes_delta=size_delta,
objects_delta=object_delta,
version_bytes_delta=archived_version_size,
version_count_delta=1 if archived_version_size > 0 else 0,
)
stat = destination.stat()
etag = checksum.hexdigest()
@@ -1420,7 +1535,7 @@ class ObjectStorage:
if entry.is_dir(follow_symlinks=False):
if check_newer(entry.path):
return True
elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
elif entry.is_file(follow_symlinks=False) and (entry.name.endswith('.meta.json') or entry.name == '_index.json'):
if entry.stat().st_mtime > index_mtime:
return True
except OSError:
@@ -1434,6 +1549,7 @@ class ObjectStorage:
meta_str = str(meta_root)
meta_len = len(meta_str) + 1
meta_files: list[tuple[str, str]] = []
index_files: list[str] = []
def collect_meta_files(dir_path: str) -> None:
try:
@@ -1441,7 +1557,10 @@ class ObjectStorage:
for entry in it:
if entry.is_dir(follow_symlinks=False):
collect_meta_files(entry.path)
elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
elif entry.is_file(follow_symlinks=False):
if entry.name == '_index.json':
index_files.append(entry.path)
elif entry.name.endswith('.meta.json'):
rel = entry.path[meta_len:]
key = rel[:-10].replace(os.sep, '/')
meta_files.append((key, entry.path))
@@ -1450,6 +1569,30 @@ class ObjectStorage:
collect_meta_files(meta_str)
meta_cache = {}
for idx_path in index_files:
try:
with open(idx_path, 'r', encoding='utf-8') as f:
idx_data = json.load(f)
rel_dir = idx_path[meta_len:]
rel_dir = rel_dir.replace(os.sep, '/')
if rel_dir.endswith('/_index.json'):
dir_prefix = rel_dir[:-len('/_index.json')]
else:
dir_prefix = ''
for entry_name, entry_data in idx_data.items():
if dir_prefix:
key = f"{dir_prefix}/{entry_name}"
else:
key = entry_name
meta = entry_data.get("metadata", {})
etag = meta.get("__etag__")
if etag:
meta_cache[key] = etag
except (OSError, json.JSONDecodeError):
pass
def read_meta_file(item: tuple[str, str]) -> tuple[str, str | None]:
key, path = item
try:
@@ -1467,14 +1610,15 @@ class ObjectStorage:
except (OSError, UnicodeDecodeError):
return key, None
if meta_files:
meta_cache = {}
max_workers = min((os.cpu_count() or 4) * 2, len(meta_files), 16)
legacy_meta_files = [(k, p) for k, p in meta_files if k not in meta_cache]
if legacy_meta_files:
max_workers = min((os.cpu_count() or 4) * 2, len(legacy_meta_files), 16)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for key, etag in executor.map(read_meta_file, meta_files):
for key, etag in executor.map(read_meta_file, legacy_meta_files):
if etag:
meta_cache[key] = etag
if meta_cache:
try:
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
with open(etag_index_path, 'w', encoding='utf-8') as f:
@@ -1523,38 +1667,46 @@ class ObjectStorage:
Uses LRU eviction to prevent unbounded cache growth.
Thread-safe with per-bucket locks to reduce contention.
Checks stats.json for cross-process cache invalidation.
"""
now = time.time()
current_stats_mtime = self._get_cache_marker_mtime(bucket_id)
with self._cache_lock:
cached = self._object_cache.get(bucket_id)
if cached:
objects, timestamp = cached
if now - timestamp < self._cache_ttl:
objects, timestamp, cached_stats_mtime = cached
if now - timestamp < self._cache_ttl and current_stats_mtime == cached_stats_mtime:
self._object_cache.move_to_end(bucket_id)
return objects
cache_version = self._cache_version.get(bucket_id, 0)
bucket_lock = self._get_bucket_lock(bucket_id)
with bucket_lock:
current_stats_mtime = self._get_cache_marker_mtime(bucket_id)
with self._cache_lock:
cached = self._object_cache.get(bucket_id)
if cached:
objects, timestamp = cached
if now - timestamp < self._cache_ttl:
objects, timestamp, cached_stats_mtime = cached
if now - timestamp < self._cache_ttl and current_stats_mtime == cached_stats_mtime:
self._object_cache.move_to_end(bucket_id)
return objects
objects = self._build_object_cache(bucket_path)
new_stats_mtime = self._get_cache_marker_mtime(bucket_id)
with self._cache_lock:
current_version = self._cache_version.get(bucket_id, 0)
if current_version != cache_version:
objects = self._build_object_cache(bucket_path)
new_stats_mtime = self._get_cache_marker_mtime(bucket_id)
while len(self._object_cache) >= self._object_cache_max_size:
self._object_cache.popitem(last=False)
self._object_cache[bucket_id] = (objects, time.time())
self._object_cache[bucket_id] = (objects, time.time(), new_stats_mtime)
self._object_cache.move_to_end(bucket_id)
self._cache_version[bucket_id] = current_version + 1
self._sorted_key_cache.pop(bucket_id, None)
return objects
@@ -1562,6 +1714,7 @@ class ObjectStorage:
"""Invalidate the object cache and etag index for a bucket.
Increments version counter to signal stale reads.
Cross-process invalidation is handled by comparing a marker derived from stats.json (cache serial and object count), not the file's mtime.
"""
with self._cache_lock:
self._object_cache.pop(bucket_id, None)
@@ -1573,19 +1726,37 @@ class ObjectStorage:
except OSError:
pass
def _get_cache_marker_mtime(self, bucket_id: str) -> float:
    """Return a cross-process cache marker read from the bucket's ``stats.json``.

    Despite the name, the value is not a file modification time. It is
    ``_cache_serial * 1000000 + objects`` taken from the JSON payload, so a
    change to either field normally changes the marker. This handles cases
    where the serial was reset but the object count differs.

    Args:
        bucket_id: Identifier of the bucket whose marker is requested.

    Returns:
        The combined marker as a float, or ``0.0`` when ``stats.json`` is
        missing, unreadable, or does not contain the expected numeric fields.
    """
    stats_path = self._system_bucket_root(bucket_id) / "stats.json"
    try:
        data = json.loads(stats_path.read_text(encoding="utf-8"))
        serial = data.get("_cache_serial", 0)
        count = data.get("objects", 0)
        return float(serial * 1000000 + count)
    except (OSError, ValueError, TypeError, AttributeError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # AttributeError/TypeError cover a payload that is not a JSON object
        # or whose fields are not numbers. All are treated as "no marker".
        return 0.0
def _update_object_cache_entry(self, bucket_id: str, key: str, meta: Optional[ObjectMeta]) -> None:
"""Update a single entry in the object cache instead of invalidating the whole cache.
This is a performance optimization - lazy update instead of full invalidation.
Cross-process invalidation is handled by comparing a marker derived from stats.json (cache serial and object count), not the file's mtime.
"""
with self._cache_lock:
cached = self._object_cache.get(bucket_id)
if cached:
objects, timestamp = cached
objects, timestamp, stats_mtime = cached
if meta is None:
objects.pop(key, None)
else:
objects[key] = meta
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
self._sorted_key_cache.pop(bucket_id, None)
def warm_cache(self, bucket_names: Optional[List[str]] = None) -> None:
"""Pre-warm the object cache for specified buckets or all buckets.
@@ -1697,6 +1868,64 @@ class ObjectStorage:
meta_rel = Path(key.as_posix() + ".meta.json")
return meta_root / meta_rel
def _index_file_for_key(self, bucket_name: str, key: Path) -> tuple[Path, str]:
    """Locate the ``_index.json`` file and entry name that belong to *key*.

    Args:
        bucket_name: Bucket whose metadata root is used as the base directory.
        key: Relative object key.

    Returns:
        A ``(index_path, entry_name)`` pair: the ``_index.json`` inside the
        metadata directory mirroring the key's parent directory, and the
        key's final path component.
    """
    index_dir = self._bucket_meta_root(bucket_name)
    key_dir = key.parent
    if key_dir != Path("."):
        # Keys inside a "folder" are indexed under the matching subdirectory.
        index_dir = index_dir / key_dir
    return index_dir / "_index.json", key.name
def _get_meta_index_lock(self, index_path: str) -> threading.Lock:
    """Return the lock registered for *index_path*, creating it on first use.

    The lookup and the creation both happen while ``self._cache_lock`` is
    held, and the lock is stored in ``self._meta_index_locks`` under the
    given path string.
    """
    with self._cache_lock:
        registry = self._meta_index_locks
        try:
            return registry[index_path]
        except KeyError:
            created = threading.Lock()
            registry[index_path] = created
            return created
def _read_index_entry(self, bucket_name: str, key: Path) -> Optional[Dict[str, Any]]:
    """Read the index entry stored for *key* in its directory's ``_index.json``.

    Args:
        bucket_name: Bucket the key belongs to.
        key: Relative object key.

    Returns:
        The entry dict, or ``None`` when the index file is missing or
        unreadable, is not a JSON object, or holds no dict entry for the key.
    """
    index_path, entry_name = self._index_file_for_key(bucket_name, key)
    try:
        index_data = json.loads(index_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # A missing file surfaces as FileNotFoundError (an OSError); ValueError
        # covers malformed JSON and undecodable bytes.
        return None
    if not isinstance(index_data, dict):
        return None
    entry = index_data.get(entry_name)
    # Callers call .get() on the result, so anything but a dict counts as absent.
    return entry if isinstance(entry, dict) else None
def _write_index_entry(self, bucket_name: str, key: Path, entry: Dict[str, Any]) -> None:
    """Insert or replace the index entry for *key* in its ``_index.json``.

    The read-modify-write cycle runs while holding the lock returned by
    ``self._get_meta_index_lock`` for the index path. An index that is
    missing, unreadable, or not a JSON object is treated as empty, so the
    file is rewritten containing only the new entry.

    Args:
        bucket_name: Bucket the key belongs to.
        key: Relative object key.
        entry: JSON-serialisable payload to store under the key's name.
    """
    index_path, entry_name = self._index_file_for_key(bucket_name, key)
    with self._get_meta_index_lock(str(index_path)):
        index_path.parent.mkdir(parents=True, exist_ok=True)
        index_data: Dict[str, Any] = {}
        try:
            loaded = json.loads(index_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Missing file, malformed JSON, or undecodable bytes: start fresh.
            loaded = None
        # Valid JSON that is not an object (list, string, null, ...) cannot
        # take a keyed entry; fall back to an empty index instead of raising.
        if isinstance(loaded, dict):
            index_data = loaded
        index_data[entry_name] = entry
        index_path.write_text(json.dumps(index_data), encoding="utf-8")
def _delete_index_entry(self, bucket_name: str, key: Path) -> None:
    """Remove the index entry for *key* from its ``_index.json``, if present.

    When an entry is removed, the index file is rewritten without it, or
    deleted once no entries remain. A missing, unreadable, or non-object
    index, and an index without an entry for the key, are left untouched.

    Args:
        bucket_name: Bucket the key belongs to.
        key: Relative object key.
    """
    index_path, entry_name = self._index_file_for_key(bucket_name, key)
    if not index_path.exists():
        return
    with self._get_meta_index_lock(str(index_path)):
        try:
            index_data = json.loads(index_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            return
        # Valid JSON that is not an object (list, string, ...) has no keyed
        # entries to delete; bail out instead of raising TypeError on ``del``.
        if not isinstance(index_data, dict) or entry_name not in index_data:
            return
        del index_data[entry_name]
        if index_data:
            index_path.write_text(json.dumps(index_data), encoding="utf-8")
            return
        try:
            index_path.unlink()
        except OSError:
            # Best effort: a stale empty index file is harmless.
            pass
def _normalize_metadata(self, metadata: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
if not metadata:
return None
@@ -1708,9 +1937,13 @@ class ObjectStorage:
if not clean:
self._delete_metadata(bucket_name, key)
return
meta_file = self._metadata_file(bucket_name, key)
meta_file.parent.mkdir(parents=True, exist_ok=True)
meta_file.write_text(json.dumps({"metadata": clean}), encoding="utf-8")
self._write_index_entry(bucket_name, key, {"metadata": clean})
old_meta = self._metadata_file(bucket_name, key)
try:
if old_meta.exists():
old_meta.unlink()
except OSError:
pass
def _archive_current_version(self, bucket_name: str, key: Path, *, reason: str) -> None:
bucket_path = self._bucket_path(bucket_name)
@@ -1737,6 +1970,10 @@ class ObjectStorage:
manifest_path.write_text(json.dumps(record), encoding="utf-8")
def _read_metadata(self, bucket_name: str, key: Path) -> Dict[str, str]:
entry = self._read_index_entry(bucket_name, key)
if entry is not None:
data = entry.get("metadata")
return data if isinstance(data, dict) else {}
for meta_file in (self._metadata_file(bucket_name, key), self._legacy_metadata_file(bucket_name, key)):
if not meta_file.exists():
continue
@@ -1767,6 +2004,7 @@ class ObjectStorage:
raise StorageError(message) from last_error
def _delete_metadata(self, bucket_name: str, key: Path) -> None:
self._delete_index_entry(bucket_name, key)
locations = (
(self._metadata_file(bucket_name, key), self._bucket_meta_root(bucket_name)),
(self._legacy_metadata_file(bucket_name, key), self._legacy_meta_root(bucket_name)),

1105
app/ui.py

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
APP_VERSION = "0.2.5"
APP_VERSION = "0.2.8"
def get_version() -> str:

10
docs.md
View File

@@ -7,7 +7,7 @@ This document expands on the README to describe the full workflow for running, c
MyFSIO ships two Flask entrypoints that share the same storage, IAM, and bucket-policy state:
- **API server** – Implements the S3-compatible REST API, policy evaluation, and Signature Version 4 presign service.
- **UI server** – Provides the browser console for buckets, IAM, and policies. It proxies to the API for presign operations.
- **UI server** – Provides the browser console for buckets, IAM, and policies. It proxies all storage operations through the S3 API via boto3 (SigV4-signed), mirroring the architecture used by MinIO and Garage.
Both servers read `AppConfig`, so editing JSON stores on disk instantly affects both surfaces.
@@ -136,7 +136,7 @@ All configuration is done via environment variables. The table below lists every
| `MAX_UPLOAD_SIZE` | `1073741824` (1 GiB) | Bytes. Caps incoming uploads in both API + UI. |
| `UI_PAGE_SIZE` | `100` | `MaxKeys` hint shown in listings. |
| `SECRET_KEY` | Auto-generated | Flask session key. Auto-generates and persists if not set. **Set explicitly in production.** |
| `API_BASE_URL` | `None` | Public URL for presigned URLs. Required behind proxies. |
| `API_BASE_URL` | `http://127.0.0.1:5000` | Internal S3 API URL used by the web UI proxy. Also used for presigned URL generation. Set to your public URL if running behind a reverse proxy. |
| `AWS_REGION` | `us-east-1` | Region embedded in SigV4 credential scope. |
| `AWS_SERVICE` | `s3` | Service string for SigV4. |
@@ -619,13 +619,15 @@ MyFSIO implements a comprehensive Identity and Access Management (IAM) system th
### Getting Started
1. On first boot, `data/.myfsio.sys/config/iam.json` is seeded with `localadmin / localadmin` that has wildcard access.
2. Sign into the UI using those credentials, then open **IAM**:
1. On first boot, `data/.myfsio.sys/config/iam.json` is created with a randomly generated admin user. The access key and secret key are printed to the console during first startup. If you miss it, check the `iam.json` file directly—credentials are stored in plaintext.
2. Sign into the UI using the generated credentials, then open **IAM**:
- **Create user**: supply a display name and optional JSON inline policy array.
- **Rotate secret**: generates a new secret key; the UI surfaces it once.
- **Policy editor**: select a user, paste an array of objects (`{"bucket": "*", "actions": ["list", "read"]}`), and submit. Alias support includes AWS-style verbs (e.g., `s3:GetObject`).
3. Wildcard action `iam:*` is supported for admin user definitions.
> **Breaking Change (v0.2.0+):** Previous versions used fixed default credentials (`localadmin/localadmin`). If upgrading from an older version, your existing credentials remain unchanged, but new installations will generate random credentials.
### Authentication
The API expects every request to include authentication headers. The UI persists them in the Flask session after login.

5
run.py
View File

@@ -5,6 +5,7 @@ import argparse
import os
import sys
import warnings
import multiprocessing
from multiprocessing import Process
from pathlib import Path
@@ -87,6 +88,10 @@ def serve_ui(port: int, prod: bool = False, config: Optional[AppConfig] = None)
if __name__ == "__main__":
multiprocessing.freeze_support()
if _is_frozen():
multiprocessing.set_start_method("spawn", force=True)
parser = argparse.ArgumentParser(description="Run the S3 clone services.")
parser.add_argument("--mode", choices=["api", "ui", "both"], default="both")
parser.add_argument("--api-port", type=int, default=5000)

View File

@@ -192,31 +192,86 @@ cat > "$INSTALL_DIR/myfsio.env" << EOF
# Generated by install.sh on $(date)
# Documentation: https://go.jzwsite.com/myfsio
# Storage paths
# =============================================================================
# STORAGE PATHS
# =============================================================================
STORAGE_ROOT=$DATA_DIR
LOG_DIR=$LOG_DIR
# Network
# =============================================================================
# NETWORK
# =============================================================================
APP_HOST=0.0.0.0
APP_PORT=$API_PORT
# Security - CHANGE IN PRODUCTION
SECRET_KEY=$SECRET_KEY
CORS_ORIGINS=*
# Public URL (set this if behind a reverse proxy)
# Public URL (set this if behind a reverse proxy for presigned URLs)
$(if [[ -n "$API_URL" ]]; then echo "API_BASE_URL=$API_URL"; else echo "# API_BASE_URL=https://s3.example.com"; fi)
# Logging
# =============================================================================
# SECURITY
# =============================================================================
# Secret key for session signing (auto-generated if not set)
SECRET_KEY=$SECRET_KEY
# CORS settings - restrict in production
CORS_ORIGINS=*
# Brute-force protection
AUTH_MAX_ATTEMPTS=5
AUTH_LOCKOUT_MINUTES=15
# Reverse proxy settings (set to number of trusted proxies in front)
# NUM_TRUSTED_PROXIES=1
# Allow internal admin endpoints (only enable on trusted networks)
# ALLOW_INTERNAL_ENDPOINTS=false
# Allowed hosts for redirects (comma-separated, empty = restrict all)
# ALLOWED_REDIRECT_HOSTS=
# =============================================================================
# LOGGING
# =============================================================================
LOG_LEVEL=INFO
LOG_TO_FILE=true
# Rate limiting
# =============================================================================
# RATE LIMITING
# =============================================================================
RATE_LIMIT_DEFAULT=200 per minute
# RATE_LIMIT_LIST_BUCKETS=60 per minute
# RATE_LIMIT_BUCKET_OPS=120 per minute
# RATE_LIMIT_OBJECT_OPS=240 per minute
# RATE_LIMIT_ADMIN=60 per minute
# Optional: Encryption (uncomment to enable)
# =============================================================================
# SERVER TUNING (0 = auto-detect based on system resources)
# =============================================================================
# SERVER_THREADS=0
# SERVER_CONNECTION_LIMIT=0
# SERVER_BACKLOG=0
# SERVER_CHANNEL_TIMEOUT=120
# =============================================================================
# ENCRYPTION (uncomment to enable)
# =============================================================================
# ENCRYPTION_ENABLED=true
# KMS_ENABLED=true
# =============================================================================
# SITE SYNC / REPLICATION (for multi-site deployments)
# =============================================================================
# SITE_ID=site-1
# SITE_ENDPOINT=https://s3-site1.example.com
# SITE_REGION=us-east-1
# SITE_SYNC_ENABLED=false
# =============================================================================
# OPTIONAL FEATURES
# =============================================================================
# LIFECYCLE_ENABLED=false
# METRICS_HISTORY_ENABLED=false
# OPERATION_METRICS_ENABLED=false
EOF
chmod 600 "$INSTALL_DIR/myfsio.env"
echo " [OK] Created $INSTALL_DIR/myfsio.env"
@@ -317,11 +372,36 @@ if [[ "$SKIP_SYSTEMD" != true ]]; then
fi
echo ""
sleep 2
echo " Waiting for service initialization..."
sleep 3
echo " Service Status:"
echo " ---------------"
if systemctl is-active --quiet myfsio; then
echo " [OK] MyFSIO is running"
IAM_FILE="$DATA_DIR/.myfsio.sys/config/iam.json"
if [[ -f "$IAM_FILE" ]]; then
echo ""
echo " ============================================"
echo " ADMIN CREDENTIALS (save these securely!)"
echo " ============================================"
if command -v jq &>/dev/null; then
ACCESS_KEY=$(jq -r '.users[0].access_key' "$IAM_FILE" 2>/dev/null)
SECRET_KEY=$(jq -r '.users[0].secret_key' "$IAM_FILE" 2>/dev/null)
else
ACCESS_KEY=$(grep -o '"access_key"[[:space:]]*:[[:space:]]*"[^"]*"' "$IAM_FILE" | head -1 | sed 's/.*"\([^"]*\)"$/\1/')
SECRET_KEY=$(grep -o '"secret_key"[[:space:]]*:[[:space:]]*"[^"]*"' "$IAM_FILE" | head -1 | sed 's/.*"\([^"]*\)"$/\1/')
fi
if [[ -n "$ACCESS_KEY" && -n "$SECRET_KEY" ]]; then
echo " Access Key: $ACCESS_KEY"
echo " Secret Key: $SECRET_KEY"
else
echo " [!] Could not parse credentials from $IAM_FILE"
echo " Check the file manually or view service logs."
fi
echo " ============================================"
fi
else
echo " [WARNING] MyFSIO may not have started correctly"
echo " Check logs with: journalctl -u myfsio -f"
@@ -346,19 +426,26 @@ echo "Access Points:"
echo " API: http://$(hostname -I 2>/dev/null | awk '{print $1}' || echo "localhost"):$API_PORT"
echo " UI: http://$(hostname -I 2>/dev/null | awk '{print $1}' || echo "localhost"):$UI_PORT/ui"
echo ""
echo "Default Credentials:"
echo " Username: localadmin"
echo " Password: localadmin"
echo " [!] WARNING: Change these immediately after first login!"
echo "Credentials:"
echo " Admin credentials were shown above (if service was started)."
echo " You can also find them in: $DATA_DIR/.myfsio.sys/config/iam.json"
echo ""
echo "Configuration Files:"
echo " Environment: $INSTALL_DIR/myfsio.env"
echo " IAM Users: $DATA_DIR/.myfsio.sys/config/iam.json"
echo " Bucket Policies: $DATA_DIR/.myfsio.sys/config/bucket_policies.json"
echo " Secret Key: $DATA_DIR/.myfsio.sys/config/.secret (auto-generated)"
echo ""
echo "Security Notes:"
echo " - Rate limiting is enabled by default (200 req/min)"
echo " - Brute-force protection: 5 attempts, 15 min lockout"
echo " - Set CORS_ORIGINS to specific domains in production"
echo " - Set NUM_TRUSTED_PROXIES if behind a reverse proxy"
echo ""
echo "Useful Commands:"
echo " Check status: sudo systemctl status myfsio"
echo " View logs: sudo journalctl -u myfsio -f"
echo " Validate config: $INSTALL_DIR/myfsio --check-config"
echo " Restart: sudo systemctl restart myfsio"
echo " Stop: sudo systemctl stop myfsio"
echo ""

View File

@@ -88,7 +88,8 @@ echo "The following items will be removed:"
echo ""
echo " Install directory: $INSTALL_DIR"
if [[ "$KEEP_DATA" != true ]]; then
echo " Data directory: $DATA_DIR (ALL YOUR DATA WILL BE DELETED!)"
echo " Data directory: $DATA_DIR"
echo " [!] ALL DATA, IAM USERS, AND ENCRYPTION KEYS WILL BE DELETED!"
else
echo " Data directory: $DATA_DIR (WILL BE KEPT)"
fi
@@ -227,8 +228,15 @@ echo ""
if [[ "$KEEP_DATA" == true ]]; then
echo "Your data has been preserved at: $DATA_DIR"
echo ""
echo "To reinstall MyFSIO with existing data, run:"
echo " curl -fsSL https://go.jzwsite.com/myfsio-install | sudo bash"
echo "Preserved files include:"
echo " - All buckets and objects"
echo " - IAM configuration: $DATA_DIR/.myfsio.sys/config/iam.json"
echo " - Bucket policies: $DATA_DIR/.myfsio.sys/config/bucket_policies.json"
echo " - Secret key: $DATA_DIR/.myfsio.sys/config/.secret"
echo " - Encryption keys: $DATA_DIR/.myfsio.sys/keys/ (if encryption was enabled)"
echo ""
echo "To reinstall MyFSIO with existing data:"
echo " ./install.sh --data-dir $DATA_DIR"
echo ""
fi

View File

@@ -1288,6 +1288,20 @@ html.sidebar-will-collapse .sidebar-user {
padding: 2rem 1rem;
}
/* Text preview element (#preview-text): monospace content that keeps its
   whitespace, wraps long lines, and scrolls once taller than 360px. */
#preview-text {
padding: 1rem 1.125rem;
max-height: 360px;
overflow: auto;
white-space: pre-wrap;
word-break: break-word;
font-family: 'SFMono-Regular', 'Menlo', 'Consolas', 'Liberation Mono', monospace;
font-size: .8rem;
line-height: 1.6;
tab-size: 4;
color: var(--myfsio-text);
background: transparent;
}
.upload-progress-stack {
display: flex;
flex-direction: column;

View File

@@ -101,6 +101,7 @@
const previewImage = document.getElementById('preview-image');
const previewVideo = document.getElementById('preview-video');
const previewAudio = document.getElementById('preview-audio');
const previewText = document.getElementById('preview-text');
const previewIframe = document.getElementById('preview-iframe');
const downloadButton = document.getElementById('downloadButton');
const presignButton = document.getElementById('presignButton');
@@ -182,6 +183,9 @@
let visibleItems = [];
let renderedRange = { start: 0, end: 0 };
let memoizedVisibleItems = null;
let memoizedInputs = { objectCount: -1, prefix: null, filterTerm: null };
const createObjectRow = (obj, displayKey = null) => {
const tr = document.createElement('tr');
tr.dataset.objectRow = '';
@@ -340,7 +344,21 @@
}
};
const computeVisibleItems = () => {
const computeVisibleItems = (forceRecompute = false) => {
const currentInputs = {
objectCount: allObjects.length,
prefix: currentPrefix,
filterTerm: currentFilterTerm
};
if (!forceRecompute &&
memoizedVisibleItems !== null &&
memoizedInputs.objectCount === currentInputs.objectCount &&
memoizedInputs.prefix === currentInputs.prefix &&
memoizedInputs.filterTerm === currentInputs.filterTerm) {
return memoizedVisibleItems;
}
const items = [];
const folders = new Set();
@@ -381,6 +399,8 @@
return aKey.localeCompare(bKey);
});
memoizedVisibleItems = items;
memoizedInputs = currentInputs;
return items;
};
@@ -497,6 +517,9 @@
};
};
let lastStreamRenderTime = 0;
const STREAM_RENDER_THROTTLE_MS = 500;
const flushPendingStreamObjects = () => {
if (pendingStreamObjects.length === 0) return;
const batch = pendingStreamObjects.splice(0, pendingStreamObjects.length);
@@ -513,6 +536,19 @@
loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()}${countText} loading...`;
}
}
if (objectsLoadingRow && objectsLoadingRow.parentNode) {
const loadingText = objectsLoadingRow.querySelector('p');
if (loadingText) {
const countText = totalObjectCount > 0 ? ` of ${totalObjectCount.toLocaleString()}` : '';
loadingText.textContent = `Loading ${loadedObjectCount.toLocaleString()}${countText} objects...`;
}
}
const now = performance.now();
if (!streamingComplete && now - lastStreamRenderTime < STREAM_RENDER_THROTTLE_MS) {
streamRenderScheduled = false;
return;
}
lastStreamRenderTime = now;
refreshVirtualList();
streamRenderScheduled = false;
};
@@ -533,7 +569,10 @@
loadedObjectCount = 0;
totalObjectCount = 0;
allObjects = [];
memoizedVisibleItems = null;
memoizedInputs = { objectCount: -1, prefix: null, filterTerm: null };
pendingStreamObjects = [];
lastStreamRenderTime = 0;
streamAbortController = new AbortController();
@@ -548,7 +587,10 @@
throw new Error(`HTTP ${response.status}`);
}
if (objectsLoadingRow) objectsLoadingRow.remove();
if (objectsLoadingRow) {
const loadingText = objectsLoadingRow.querySelector('p');
if (loadingText) loadingText.textContent = 'Receiving objects...';
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
@@ -576,6 +618,10 @@
break;
case 'count':
totalObjectCount = msg.total_count || 0;
if (objectsLoadingRow) {
const loadingText = objectsLoadingRow.querySelector('p');
if (loadingText) loadingText.textContent = `Loading 0 of ${totalObjectCount.toLocaleString()} objects...`;
}
break;
case 'object':
pendingStreamObjects.push(processStreamObject(msg));
@@ -609,11 +655,16 @@
} catch (e) { }
}
flushPendingStreamObjects();
streamingComplete = true;
flushPendingStreamObjects();
hasMoreObjects = false;
totalObjectCount = loadedObjectCount;
updateObjectCountBadge();
if (objectsLoadingRow && objectsLoadingRow.parentNode) {
objectsLoadingRow.remove();
}
if (loadMoreStatus) {
loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} objects`;
}
@@ -643,6 +694,8 @@
loadedObjectCount = 0;
totalObjectCount = 0;
allObjects = [];
memoizedVisibleItems = null;
memoizedInputs = { objectCount: -1, prefix: null, filterTerm: null };
}
if (append && loadMoreSpinner) {
@@ -985,13 +1038,15 @@
};
const navigateToFolder = (prefix) => {
if (streamAbortController) {
streamAbortController.abort();
streamAbortController = null;
}
currentPrefix = prefix;
if (scrollContainer) scrollContainer.scrollTop = 0;
refreshVirtualList();
renderBreadcrumb(prefix);
selectedRows.clear();
if (typeof updateBulkDeleteState === 'function') {
@@ -1001,6 +1056,9 @@
if (previewPanel) previewPanel.classList.add('d-none');
if (previewEmpty) previewEmpty.classList.remove('d-none');
activeRow = null;
isLoadingObjects = false;
loadObjects(false);
};
const renderObjectsView = () => {
@@ -1838,6 +1896,10 @@
el.setAttribute('src', 'about:blank');
}
});
if (previewText) {
previewText.classList.add('d-none');
previewText.textContent = '';
}
previewPlaceholder.classList.remove('d-none');
};
@@ -1901,11 +1963,28 @@
previewIframe.style.minHeight = '500px';
previewIframe.classList.remove('d-none');
previewPlaceholder.classList.add('d-none');
} else if (previewUrl && lower.match(/\.(txt|log|json|md|csv|xml|html|htm|js|ts|py|java|c|cpp|h|css|scss|yaml|yml|toml|ini|cfg|conf|sh|bat)$/)) {
previewIframe.src = previewUrl;
previewIframe.style.minHeight = '200px';
previewIframe.classList.remove('d-none');
} else if (previewUrl && previewText && lower.match(/\.(txt|log|json|md|csv|xml|html|htm|js|ts|py|java|c|cpp|h|css|scss|yaml|yml|toml|ini|cfg|conf|sh|bat|rs|go|rb|php|sql|r|swift|kt|scala|pl|lua|zig|ex|exs|hs|erl|ps1|psm1|psd1|fish|zsh|env|properties|gradle|makefile|dockerfile|vagrantfile|gitignore|gitattributes|editorconfig|eslintrc|prettierrc)$/)) {
previewText.textContent = 'Loading\u2026';
previewText.classList.remove('d-none');
previewPlaceholder.classList.add('d-none');
const currentRow = row;
fetch(previewUrl)
.then((r) => {
if (!r.ok) throw new Error(r.statusText);
const len = parseInt(r.headers.get('Content-Length') || '0', 10);
if (len > 512 * 1024) {
return r.text().then((t) => t.slice(0, 512 * 1024) + '\n\n--- Truncated (file too large for preview) ---');
}
return r.text();
})
.then((text) => {
if (activeRow !== currentRow) return;
previewText.textContent = text;
})
.catch(() => {
if (activeRow !== currentRow) return;
previewText.textContent = 'Failed to load preview';
});
}
const metadataUrl = row.dataset.metadataUrl;

View File

@@ -321,7 +321,8 @@
<img id="preview-image" class="img-fluid d-none w-100" alt="Object preview" style="display: block;" />
<video id="preview-video" class="w-100 d-none" controls style="display: block;"></video>
<audio id="preview-audio" class="w-100 d-none" controls style="display: block;"></audio>
<iframe id="preview-iframe" class="w-100 d-none" loading="lazy" style="min-height: 200px;"></iframe>
<pre id="preview-text" class="w-100 d-none m-0"></pre>
<iframe id="preview-iframe" class="w-100 d-none" style="min-height: 200px;"></iframe>
</div>
</div>
</div>

View File

@@ -141,7 +141,7 @@
let visibleCount = 0;
bucketItems.forEach(item => {
const name = item.querySelector('.card-title').textContent.toLowerCase();
const name = item.querySelector('.bucket-name').textContent.toLowerCase();
if (name.includes(term)) {
item.classList.remove('d-none');
visibleCount++;

View File

@@ -97,8 +97,8 @@ python run.py --mode ui
<tbody>
<tr>
<td><code>API_BASE_URL</code></td>
<td><code>None</code></td>
<td>The public URL of the API. <strong>Required</strong> if running behind a proxy. Ensures presigned URLs are generated correctly.</td>
<td><code>http://127.0.0.1:5000</code></td>
<td>Internal S3 API URL used by the web UI proxy. Also used for presigned URL generation. Set to your public URL if running behind a reverse proxy.</td>
</tr>
<tr>
<td><code>STORAGE_ROOT</code></td>
@@ -451,10 +451,10 @@ sudo journalctl -u myfsio -f # View logs</code></pre>
<span class="docs-section-kicker">03</span>
<h2 class="h4 mb-0">Authenticate &amp; manage IAM</h2>
</div>
<p class="text-muted">MyFSIO seeds <code>data/.myfsio.sys/config/iam.json</code> with <code>localadmin/localadmin</code>. Sign in once, rotate it, then grant least-privilege access to teammates and tools.</p>
<p class="text-muted">On first startup, MyFSIO generates random admin credentials and prints them to the console. Missed it? Check <code>data/.myfsio.sys/config/iam.json</code> directly—credentials are stored in plaintext.</p>
<div class="docs-highlight mb-3">
<ol class="mb-0">
<li>Visit <code>/ui/login</code>, enter the bootstrap credentials, and rotate them immediately from the IAM page.</li>
<li>Check the console output (or <code>iam.json</code>) for the generated <code>Access Key</code> and <code>Secret Key</code>, then visit <code>/ui/login</code>.</li>
<li>Create additional users with descriptive display names and AWS-style inline policies (for example <code>{"bucket": "*", "actions": ["list", "read"]}</code>).</li>
<li>Rotate secrets when sharing with CI jobs—new secrets display once and persist to <code>data/.myfsio.sys/config/iam.json</code>.</li>
<li>Bucket policies layer on top of IAM. Apply Private/Public presets or paste custom JSON; changes reload instantly.</li>
@@ -2136,8 +2136,8 @@ curl -X PUT "{{ api_base }}/&lt;bucket&gt;?tagging" \
<code class="d-block">{{ api_base }}</code>
</div>
<div>
<div class="small text-uppercase text-muted">Sample user</div>
<code class="d-block">localadmin / localadmin</code>
<div class="small text-uppercase text-muted">Initial credentials</div>
<span class="text-muted small">Generated on first run (check console)</span>
</div>
<div>
<div class="small text-uppercase text-muted">Logs</div>

View File

@@ -398,6 +398,14 @@
<option value="24" selected>Last 24 hours</option>
<option value="168">Last 7 days</option>
</select>
<select class="form-select form-select-sm" id="maxDataPoints" style="width: auto;" title="Maximum data points to display">
<option value="100">100 points</option>
<option value="250">250 points</option>
<option value="500" selected>500 points</option>
<option value="1000">1000 points</option>
<option value="2000">2000 points</option>
<option value="0">Unlimited</option>
</select>
</div>
</div>
<div class="card-body p-4">
@@ -817,8 +825,8 @@
var diskChart = null;
var historyStatus = document.getElementById('historyStatus');
var timeRangeSelect = document.getElementById('historyTimeRange');
var maxDataPointsSelect = document.getElementById('maxDataPoints');
var historyTimer = null;
var MAX_DATA_POINTS = 500;
function createChart(ctx, label, color) {
return new Chart(ctx, {
@@ -889,7 +897,8 @@
if (historyStatus) historyStatus.textContent = 'No history data available yet. Data is recorded every ' + (data.interval_minutes || 5) + ' minutes.';
return;
}
var history = data.history.slice(-MAX_DATA_POINTS);
var maxPoints = maxDataPointsSelect ? parseInt(maxDataPointsSelect.value, 10) : 500;
var history = maxPoints > 0 ? data.history.slice(-maxPoints) : data.history;
var labels = history.map(function(h) { return formatTime(h.timestamp); });
var cpuData = history.map(function(h) { return h.cpu_percent; });
var memData = history.map(function(h) { return h.memory_percent; });
@@ -927,6 +936,10 @@
timeRangeSelect.addEventListener('change', loadHistory);
}
if (maxDataPointsSelect) {
maxDataPointsSelect.addEventListener('change', loadHistory);
}
document.addEventListener('visibilitychange', function() {
if (document.hidden) {
if (historyTimer) clearInterval(historyTimer);

View File

@@ -1,8 +1,12 @@
import io
import json
import threading
from pathlib import Path
from werkzeug.serving import make_server
from app import create_app
from app.s3_client import S3ProxyClient
def _build_app(tmp_path: Path):
@@ -26,13 +30,32 @@ def _build_app(tmp_path: Path):
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://localhost",
"API_BASE_URL": "http://127.0.0.1:0",
"SECRET_KEY": "testing",
"WTF_CSRF_ENABLED": False,
}
)
server = make_server("127.0.0.1", 0, app)
host, port = server.server_address
api_url = f"http://{host}:{port}"
app.config["API_BASE_URL"] = api_url
app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
app._test_server = server
app._test_thread = thread
return app
def _shutdown_app(app):
    """Stop the background API server attached to *app* by ``_build_app``.

    Does nothing when *app* carries no ``_test_server``. Otherwise the
    serving loop is stopped, the listening socket is released, and the
    server thread is given up to two seconds to finish.
    """
    server = getattr(app, "_test_server", None)
    if server is None:
        return
    server.shutdown()
    # shutdown() only ends serve_forever(); without server_close() the
    # listening socket stays open and one descriptor leaks per test.
    server.server_close()
    app._test_thread.join(timeout=2)
def _login(client):
return client.post(
"/ui/login",
@@ -43,6 +66,7 @@ def _login(client):
def test_bulk_delete_json_route(tmp_path: Path):
app = _build_app(tmp_path)
try:
storage = app.extensions["object_storage"]
storage.create_bucket("demo")
storage.put_object("demo", "first.txt", io.BytesIO(b"first"))
@@ -64,10 +88,13 @@ def test_bulk_delete_json_route(tmp_path: Path):
listing = storage.list_objects_all("demo")
assert {meta.key for meta in listing} == {"second.txt"}
finally:
_shutdown_app(app)
def test_bulk_delete_validation(tmp_path: Path):
app = _build_app(tmp_path)
try:
storage = app.extensions["object_storage"]
storage.create_bucket("demo")
storage.put_object("demo", "keep.txt", io.BytesIO(b"keep"))
@@ -94,3 +121,5 @@ def test_bulk_delete_validation(tmp_path: Path):
still_there = storage.list_objects_all("demo")
assert {meta.key for meta in still_there} == {"keep.txt"}
finally:
_shutdown_app(app)

View File

@@ -1,10 +1,13 @@
"""Tests for UI-based encryption configuration."""
import json
import threading
from pathlib import Path
import pytest
from werkzeug.serving import make_server
from app import create_app
from app.s3_client import S3ProxyClient
def get_csrf_token(response):
@@ -43,9 +46,10 @@ def _make_encryption_app(tmp_path: Path, *, kms_enabled: bool = True):
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver",
"API_BASE_URL": "http://127.0.0.1:0",
"SECRET_KEY": "testing",
"ENCRYPTION_ENABLED": True,
"WTF_CSRF_ENABLED": False,
}
if kms_enabled:
@@ -54,17 +58,37 @@ def _make_encryption_app(tmp_path: Path, *, kms_enabled: bool = True):
config["ENCRYPTION_MASTER_KEY_PATH"] = str(tmp_path / "master.key")
app = create_app(config)
server = make_server("127.0.0.1", 0, app)
host, port = server.server_address
api_url = f"http://{host}:{port}"
app.config["API_BASE_URL"] = api_url
app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
app._test_server = server
app._test_thread = thread
storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
return app
def _shutdown_app(app):
    """Tear down the live API server started by ``_make_encryption_app``.

    An *app* without a ``_test_server`` attribute is left untouched.
    Otherwise the serving loop is stopped, the bound socket is closed, and
    the server thread is joined with a two second timeout.
    """
    server = getattr(app, "_test_server", None)
    if server is None:
        return
    server.shutdown()
    # Stopping the loop does not release the port; close the socket
    # explicitly so repeated test runs do not accumulate open listeners.
    server.server_close()
    app._test_thread.join(timeout=2)
class TestUIBucketEncryption:
"""Test bucket encryption configuration via UI."""
def test_bucket_detail_shows_encryption_card(self, tmp_path):
"""Encryption card should be visible on bucket detail page."""
app = _make_encryption_app(tmp_path)
try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
@@ -75,21 +99,20 @@ class TestUIBucketEncryption:
html = response.data.decode("utf-8")
assert "Default Encryption" in html
assert "Encryption Algorithm" in html or "Default encryption disabled" in html
finally:
_shutdown_app(app)
def test_enable_aes256_encryption(self, tmp_path):
"""Should be able to enable AES-256 encryption."""
app = _make_encryption_app(tmp_path)
try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties")
csrf_token = get_csrf_token(response)
response = client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "enable",
"algorithm": "AES256",
},
@@ -99,12 +122,13 @@ class TestUIBucketEncryption:
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "AES-256" in html or "encryption enabled" in html.lower()
finally:
_shutdown_app(app)
def test_enable_kms_encryption(self, tmp_path):
"""Should be able to enable KMS encryption."""
app = _make_encryption_app(tmp_path, kms_enabled=True)
client = app.test_client()
try:
with app.app_context():
kms = app.extensions.get("kms")
if kms:
@@ -113,15 +137,12 @@ class TestUIBucketEncryption:
else:
pytest.skip("KMS not available")
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties")
csrf_token = get_csrf_token(response)
response = client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "enable",
"algorithm": "aws:kms",
"kms_key_id": key_id,
@@ -132,33 +153,28 @@ class TestUIBucketEncryption:
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "KMS" in html or "encryption enabled" in html.lower()
finally:
_shutdown_app(app)
def test_disable_encryption(self, tmp_path):
"""Should be able to disable encryption."""
app = _make_encryption_app(tmp_path)
try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties")
csrf_token = get_csrf_token(response)
client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "enable",
"algorithm": "AES256",
},
)
response = client.get("/ui/buckets/test-bucket?tab=properties")
csrf_token = get_csrf_token(response)
response = client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "disable",
},
follow_redirects=True,
@@ -167,21 +183,20 @@ class TestUIBucketEncryption:
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "disabled" in html.lower() or "Default encryption disabled" in html
finally:
_shutdown_app(app)
def test_invalid_algorithm_rejected(self, tmp_path):
"""Invalid encryption algorithm should be rejected."""
app = _make_encryption_app(tmp_path)
try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties")
csrf_token = get_csrf_token(response)
response = client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "enable",
"algorithm": "INVALID",
},
@@ -191,21 +206,20 @@ class TestUIBucketEncryption:
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "Invalid" in html or "danger" in html
finally:
_shutdown_app(app)
def test_encryption_persists_in_config(self, tmp_path):
"""Encryption config should persist in bucket config."""
app = _make_encryption_app(tmp_path)
try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties")
csrf_token = get_csrf_token(response)
client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "enable",
"algorithm": "AES256",
},
@@ -217,7 +231,9 @@ class TestUIBucketEncryption:
assert "Rules" in config
assert len(config["Rules"]) == 1
assert config["Rules"][0]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] == "AES256"
assert config["Rules"][0]["SSEAlgorithm"] == "AES256"
finally:
_shutdown_app(app)
class TestUIEncryptionWithoutPermission:
@@ -226,17 +242,14 @@ class TestUIEncryptionWithoutPermission:
def test_readonly_user_cannot_change_encryption(self, tmp_path):
"""Read-only user should not be able to change encryption settings."""
app = _make_encryption_app(tmp_path)
try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "readonly", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/test-bucket?tab=properties")
csrf_token = get_csrf_token(response)
response = client.post(
"/ui/buckets/test-bucket/encryption",
data={
"csrf_token": csrf_token,
"action": "enable",
"algorithm": "AES256",
},
@@ -246,3 +259,5 @@ class TestUIEncryptionWithoutPermission:
assert response.status_code == 200
html = response.data.decode("utf-8")
assert "Access denied" in html or "permission" in html.lower() or "not authorized" in html.lower()
finally:
_shutdown_app(app)

View File

@@ -1,15 +1,18 @@
"""Tests for UI pagination of bucket objects."""
import json
import threading
from io import BytesIO
from pathlib import Path
import pytest
from werkzeug.serving import make_server
from app import create_app
from app.s3_client import S3ProxyClient
def _make_app(tmp_path: Path):
"""Create an app for testing."""
"""Create an app for testing with a live API server."""
storage_root = tmp_path / "data"
iam_config = tmp_path / "iam.json"
bucket_policies = tmp_path / "bucket_policies.json"
@@ -33,29 +36,46 @@ def _make_app(tmp_path: Path):
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://127.0.0.1:0",
}
)
server = make_server("127.0.0.1", 0, flask_app)
host, port = server.server_address
api_url = f"http://{host}:{port}"
flask_app.config["API_BASE_URL"] = api_url
flask_app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
flask_app._test_server = server
flask_app._test_thread = thread
return flask_app
def _shutdown_app(app):
    """Shut down the live API server that ``_make_app`` attached to *app*.

    Returns immediately when no ``_test_server`` is present. Otherwise the
    serving loop is stopped, the listening socket is closed, and the server
    thread is joined for at most two seconds.
    """
    server = getattr(app, "_test_server", None)
    if server is None:
        return
    server.shutdown()
    # serve_forever() has returned at this point, so closing the socket is
    # safe; skipping it would leak the listener for the rest of the session.
    server.server_close()
    app._test_thread.join(timeout=2)
class TestPaginatedObjectListing:
"""Test paginated object listing API."""
def test_objects_api_returns_paginated_results(self, tmp_path):
"""Objects API should return paginated results."""
app = _make_app(tmp_path)
try:
storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
# Create 10 test objects
for i in range(10):
storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content"))
with app.test_client() as client:
# Login first
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
# Request first page of 3 objects
resp = client.get("/ui/buckets/test-bucket/objects?max_keys=3")
assert resp.status_code == 200
@@ -63,22 +83,22 @@ class TestPaginatedObjectListing:
assert len(data["objects"]) == 3
assert data["is_truncated"] is True
assert data["next_continuation_token"] is not None
assert data["total_count"] == 10
finally:
_shutdown_app(app)
def test_objects_api_pagination_continuation(self, tmp_path):
"""Objects API should support continuation tokens."""
app = _make_app(tmp_path)
try:
storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
# Create 5 test objects
for i in range(5):
storage.put_object("test-bucket", f"file{i:02d}.txt", BytesIO(b"content"))
with app.test_client() as client:
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
# Get first page
resp = client.get("/ui/buckets/test-bucket/objects?max_keys=2")
assert resp.status_code == 200
data = resp.get_json()
@@ -87,7 +107,6 @@ class TestPaginatedObjectListing:
assert len(first_page_keys) == 2
assert data["is_truncated"] is True
# Get second page
token = data["next_continuation_token"]
resp = client.get(f"/ui/buckets/test-bucket/objects?max_keys=2&continuation_token={token}")
assert resp.status_code == 200
@@ -96,16 +115,17 @@ class TestPaginatedObjectListing:
second_page_keys = [obj["key"] for obj in data["objects"]]
assert len(second_page_keys) == 2
# No overlap between pages
assert set(first_page_keys).isdisjoint(set(second_page_keys))
finally:
_shutdown_app(app)
def test_objects_api_prefix_filter(self, tmp_path):
"""Objects API should support prefix filtering."""
app = _make_app(tmp_path)
try:
storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
# Create objects with different prefixes
storage.put_object("test-bucket", "logs/access.log", BytesIO(b"log"))
storage.put_object("test-bucket", "logs/error.log", BytesIO(b"log"))
storage.put_object("test-bucket", "data/file.txt", BytesIO(b"data"))
@@ -113,7 +133,6 @@ class TestPaginatedObjectListing:
with app.test_client() as client:
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
# Filter by prefix
resp = client.get("/ui/buckets/test-bucket/objects?prefix=logs/")
assert resp.status_code == 200
data = resp.get_json()
@@ -121,23 +140,27 @@ class TestPaginatedObjectListing:
keys = [obj["key"] for obj in data["objects"]]
assert all(k.startswith("logs/") for k in keys)
assert len(keys) == 2
finally:
_shutdown_app(app)
def test_objects_api_requires_authentication(self, tmp_path):
    """An unauthenticated request for the object listing redirects to login."""
    app = _make_app(tmp_path)
    try:
        app.extensions["object_storage"].create_bucket("test-bucket")

        with app.test_client() as anonymous:
            # No login request is issued, so the session has no credentials.
            listing = anonymous.get("/ui/buckets/test-bucket/objects")

            assert listing.status_code == 302
            redirect_target = listing.headers.get("Location", "")
            assert "/ui/login" in redirect_target
    finally:
        _shutdown_app(app)
def test_objects_api_returns_object_metadata(self, tmp_path):
"""Objects API should return complete object metadata."""
app = _make_app(tmp_path)
try:
storage = app.extensions["object_storage"]
storage.create_bucket("test-bucket")
storage.put_object("test-bucket", "test.txt", BytesIO(b"test content"))
@@ -152,38 +175,38 @@ class TestPaginatedObjectListing:
assert len(data["objects"]) == 1
obj = data["objects"][0]
# Check all expected fields
assert obj["key"] == "test.txt"
assert obj["size"] == 12 # len("test content")
assert obj["size"] == 12
assert "last_modified" in obj
assert "last_modified_display" in obj
assert "etag" in obj
# URLs are now returned as templates (not per-object) for performance
assert "url_templates" in data
templates = data["url_templates"]
assert "preview" in templates
assert "download" in templates
assert "delete" in templates
assert "KEY_PLACEHOLDER" in templates["preview"]
finally:
_shutdown_app(app)
def test_bucket_detail_page_loads_without_objects(self, tmp_path):
    """The bucket page renders for a 100-object bucket and references the JS loader."""
    app = _make_app(tmp_path)
    try:
        store = app.extensions["object_storage"]
        store.create_bucket("test-bucket")
        for index in range(100):
            store.put_object("test-bucket", f"file{index:03d}.txt", BytesIO(b"x"))

        credentials = {"access_key": "test", "secret_key": "secret"}
        with app.test_client() as browser:
            browser.post("/ui/login", data=credentials, follow_redirects=True)

            page = browser.get("/ui/buckets/test-bucket")
            assert page.status_code == 200

            # The page only has to pull in the external script.
            markup = page.data.decode("utf-8")
            assert "bucket-detail-main.js" in markup
    finally:
        _shutdown_app(app)

View File

@@ -1,10 +1,13 @@
import io
import json
import threading
from pathlib import Path
import pytest
from werkzeug.serving import make_server
from app import create_app
from app.s3_client import S3ProxyClient
DENY_LIST_ALLOW_GET_POLICY = {
@@ -47,11 +50,25 @@ def _make_ui_app(tmp_path: Path, *, enforce_policies: bool):
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver",
"API_BASE_URL": "http://127.0.0.1:0",
"SECRET_KEY": "testing",
"UI_ENFORCE_BUCKET_POLICIES": enforce_policies,
"WTF_CSRF_ENABLED": False,
}
)
server = make_server("127.0.0.1", 0, app)
host, port = server.server_address
api_url = f"http://{host}:{port}"
app.config["API_BASE_URL"] = api_url
app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
app._test_server = server
app._test_thread = thread
storage = app.extensions["object_storage"]
storage.create_bucket("testbucket")
storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
@@ -60,9 +77,16 @@ def _make_ui_app(tmp_path: Path, *, enforce_policies: bool):
return app
def _shutdown_app(app):
    """Stop the background API server that a test attached to *app*.

    No-op when *app* has no ``_test_server``. Otherwise the serving loop is
    stopped, the server socket is closed, and the server thread is joined
    with a two second timeout.
    """
    server = getattr(app, "_test_server", None)
    if server is None:
        return
    server.shutdown()
    # shutdown() leaves the listening socket bound; server_close() releases
    # it so parametrized runs do not pile up open descriptors.
    server.server_close()
    app._test_thread.join(timeout=2)
@pytest.mark.parametrize("enforce", [True, False])
def test_ui_bucket_policy_enforcement_toggle(tmp_path: Path, enforce: bool):
app = _make_ui_app(tmp_path, enforce_policies=enforce)
try:
client = app.test_client()
client.post("/ui/login", data={"access_key": "test", "secret_key": "secret"}, follow_redirects=True)
response = client.get("/ui/buckets/testbucket", follow_redirects=True)
@@ -71,11 +95,10 @@ def test_ui_bucket_policy_enforcement_toggle(tmp_path: Path, enforce: bool):
else:
assert response.status_code == 200
assert b"Access denied by bucket policy" not in response.data
# Objects are now loaded via async API - check the objects endpoint
objects_response = client.get("/ui/buckets/testbucket/objects")
assert objects_response.status_code == 200
data = objects_response.get_json()
assert any(obj["key"] == "vid.mp4" for obj in data["objects"])
assert objects_response.status_code == 403
finally:
_shutdown_app(app)
def test_ui_bucket_policy_disabled_by_default(tmp_path: Path):
@@ -99,10 +122,25 @@ def test_ui_bucket_policy_disabled_by_default(tmp_path: Path):
"STORAGE_ROOT": storage_root,
"IAM_CONFIG": iam_config,
"BUCKET_POLICY_PATH": bucket_policies,
"API_BASE_URL": "http://testserver",
"API_BASE_URL": "http://127.0.0.1:0",
"SECRET_KEY": "testing",
"WTF_CSRF_ENABLED": False,
}
)
server = make_server("127.0.0.1", 0, app)
host, port = server.server_address
api_url = f"http://{host}:{port}"
app.config["API_BASE_URL"] = api_url
app.extensions["s3_proxy"] = S3ProxyClient(api_base_url=api_url)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
app._test_server = server
app._test_thread = thread
try:
storage = app.extensions["object_storage"]
storage.create_bucket("testbucket")
storage.put_object("testbucket", "vid.mp4", io.BytesIO(b"video"))
@@ -114,8 +152,7 @@ def test_ui_bucket_policy_disabled_by_default(tmp_path: Path):
response = client.get("/ui/buckets/testbucket", follow_redirects=True)
assert response.status_code == 200
assert b"Access denied by bucket policy" not in response.data
# Objects are now loaded via async API - check the objects endpoint
objects_response = client.get("/ui/buckets/testbucket/objects")
assert objects_response.status_code == 200
data = objects_response.get_json()
assert any(obj["key"] == "vid.mp4" for obj in data["objects"])
assert objects_response.status_code == 403
finally:
_shutdown_app(app)