20 Commits

SHA1 Message Date
a2745ff2ee Merge pull request 'MyFSIO v0.1.8 Release' (#9) from next into main
Reviewed-on: #9
2025-12-23 06:01:32 +00:00
9165e365e6 Comment cleanup 2025-12-23 13:57:13 +08:00
01e26754e8 Add option to display custom timezone; Fix timezone inconsistencies 2025-12-23 13:48:02 +08:00
b592fa9fdb Fixed replication issue - clean up debug 2025-12-23 13:37:51 +08:00
cd9734b398 Debug replication corruption issue - Fix attempt 2025-12-23 13:24:05 +08:00
90893cac27 Debug replication corruption issue - check if it's boto3 issue 2025-12-23 12:02:26 +08:00
6e659902bd Add header debugging for replication issue 2025-12-23 11:55:47 +08:00
39a707ecbc Add additional debugging for replication issue 2025-12-23 11:49:51 +08:00
4199f8e6c7 Add debugging for replication issue 2025-12-23 11:43:29 +08:00
adc6770273 Improve object browser search filter; Test: Fix replication GIF issue 2025-12-23 11:31:32 +08:00
f5451c162b Improve object storage performance via caching 2025-12-22 17:03:33 +08:00
aab9ef696a Fix race condition in replication 2025-12-22 14:14:04 +08:00
28cb656d94 Merge pull request 'MyFSIO v0.1.7 Release' (#8) from next into main
Reviewed-on: #8
2025-12-22 03:10:35 +00:00
3c44152fc6 Merge pull request 'MyFSIO v0.1.6 Release' (#7) from next into main
Reviewed-on: #7
2025-12-21 06:30:21 +00:00
397515edce Merge pull request 'MyFSIO v0.1.5 Release' (#6) from next into main
Reviewed-on: #6
2025-12-13 15:41:03 +00:00
980fced7e4 Merge pull request 'MyFSIO v0.1.4 Release' (#5) from next into main
Reviewed-on: #5
2025-12-13 08:22:43 +00:00
bae5009ec4 Merge pull request 'Release v0.1.3' (#4) from next into main
Reviewed-on: #4
2025-12-03 04:14:57 +00:00
233780617f Merge pull request 'Release V0.1.2' (#3) from next into main
Reviewed-on: #3
2025-11-26 04:59:15 +00:00
fd8fb21517 Merge pull request 'Prepare for binary release' (#2) from next into main
Reviewed-on: #2
2025-11-22 12:33:38 +00:00
c6cbe822e1 Merge pull request 'Release v0.1.1' (#1) from next into main
Reviewed-on: #1
2025-11-22 12:31:27 +00:00
13 changed files with 984 additions and 541 deletions

View File

@@ -45,7 +45,6 @@ def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path:
try:
shutil.move(str(legacy_path), str(active_path))
except OSError:
# Fall back to copy + delete if move fails (e.g., cross-device)
shutil.copy2(legacy_path, active_path)
try:
legacy_path.unlink(missing_ok=True)
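A minimal standalone sketch of the migration pattern this helper implements: shutil.move raises OSError (for example EXDEV) when source and destination sit on different filesystems, so copy2 plus unlink is the fallback. Names below are illustrative, not the module's API.

import shutil
from pathlib import Path
from typing import List, Optional

def migrate_first_found(active: Path, candidates: List[Path]) -> Optional[Path]:
    """Move the first existing legacy file to the active path."""
    if active.exists():
        return active
    for legacy in candidates:
        if not legacy.exists():
            continue
        active.parent.mkdir(parents=True, exist_ok=True)
        try:
            shutil.move(str(legacy), str(active))
        except OSError:
            # Cross-device move: copy, then best-effort delete of the original.
            shutil.copy2(legacy, active)
            legacy.unlink(missing_ok=True)
        return active
    return None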
@@ -101,32 +100,28 @@ def create_app(
bucket_policies = BucketPolicyStore(Path(app.config["BUCKET_POLICY_PATH"]))
secret_store = EphemeralSecretStore(default_ttl=app.config.get("SECRET_TTL_SECONDS", 300))
# Initialize Replication components
# Store config files in the system config directory for consistency
storage_root = Path(app.config["STORAGE_ROOT"])
config_dir = storage_root / ".myfsio.sys" / "config"
config_dir.mkdir(parents=True, exist_ok=True)
# Define paths with migration from legacy locations
connections_path = _migrate_config_file(
active_path=config_dir / "connections.json",
legacy_paths=[
storage_root / ".myfsio.sys" / "connections.json", # Previous location
storage_root / ".connections.json", # Original legacy location
storage_root / ".myfsio.sys" / "connections.json",
storage_root / ".connections.json",
],
)
replication_rules_path = _migrate_config_file(
active_path=config_dir / "replication_rules.json",
legacy_paths=[
storage_root / ".myfsio.sys" / "replication_rules.json", # Previous location
storage_root / ".replication_rules.json", # Original legacy location
storage_root / ".myfsio.sys" / "replication_rules.json",
storage_root / ".replication_rules.json",
],
)
connections = ConnectionStore(connections_path)
replication = ReplicationManager(storage, connections, replication_rules_path)
# Initialize encryption and KMS
encryption_config = {
"encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False),
"encryption_master_key_path": app.config.get("ENCRYPTION_MASTER_KEY_PATH"),
@@ -141,7 +136,6 @@ def create_app(
kms_manager = KMSManager(kms_keys_path, kms_master_key_path)
encryption_manager.set_kms_provider(kms_manager)
# Wrap storage with encryption layer if encryption is enabled
if app.config.get("ENCRYPTION_ENABLED", False):
from .encrypted_storage import EncryptedObjectStorage
storage = EncryptedObjectStorage(storage, encryption_manager)
@@ -177,13 +171,22 @@ def create_app(
@app.template_filter("timestamp_to_datetime")
def timestamp_to_datetime(value: float) -> str:
"""Format Unix timestamp as human-readable datetime."""
from datetime import datetime
"""Format Unix timestamp as human-readable datetime in configured timezone."""
from datetime import datetime, timezone as dt_timezone
from zoneinfo import ZoneInfo
if not value:
return "Never"
try:
dt = datetime.fromtimestamp(value)
return dt.strftime("%Y-%m-%d %H:%M:%S")
dt_utc = datetime.fromtimestamp(value, dt_timezone.utc)
display_tz = app.config.get("DISPLAY_TIMEZONE", "UTC")
if display_tz and display_tz != "UTC":
try:
tz = ZoneInfo(display_tz)
dt_local = dt_utc.astimezone(tz)
return dt_local.strftime("%Y-%m-%d %H:%M:%S")
except (KeyError, ValueError):
pass
return dt_utc.strftime("%Y-%m-%d %H:%M:%S UTC")
except (ValueError, OSError):
return "Unknown"
@@ -244,7 +247,7 @@ def _configure_cors(app: Flask) -> None:
class _RequestContextFilter(logging.Filter):
"""Inject request-specific attributes into log records."""
def filter(self, record: logging.LogRecord) -> bool: # pragma: no cover - simple boilerplate
def filter(self, record: logging.LogRecord) -> bool:
if has_request_context():
record.request_id = getattr(g, "request_id", "-")
record.path = request.path

View File

@@ -188,7 +188,6 @@ class BucketPolicyStore:
except FileNotFoundError:
return None
# ------------------------------------------------------------------
def evaluate(
self,
access_key: Optional[str],
@@ -229,7 +228,6 @@ class BucketPolicyStore:
self._policies.pop(bucket, None)
self._persist()
# ------------------------------------------------------------------
def _load(self) -> None:
try:
content = self.policy_path.read_text(encoding='utf-8')

View File

@@ -73,6 +73,7 @@ class AppConfig:
kms_enabled: bool
kms_keys_path: Path
default_encryption_algorithm: str
display_timezone: str
@classmethod
def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
@@ -153,15 +154,15 @@ class AppConfig:
cors_allow_headers = _csv(str(_get("CORS_ALLOW_HEADERS", "*")), ["*"])
cors_expose_headers = _csv(str(_get("CORS_EXPOSE_HEADERS", "*")), ["*"])
session_lifetime_days = int(_get("SESSION_LIFETIME_DAYS", 30))
bucket_stats_cache_ttl = int(_get("BUCKET_STATS_CACHE_TTL", 60)) # Default 60 seconds
bucket_stats_cache_ttl = int(_get("BUCKET_STATS_CACHE_TTL", 60))
# Encryption settings
encryption_enabled = str(_get("ENCRYPTION_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
encryption_keys_dir = storage_root / ".myfsio.sys" / "keys"
encryption_master_key_path = Path(_get("ENCRYPTION_MASTER_KEY_PATH", encryption_keys_dir / "master.key")).resolve()
kms_enabled = str(_get("KMS_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
kms_keys_path = Path(_get("KMS_KEYS_PATH", encryption_keys_dir / "kms_keys.json")).resolve()
default_encryption_algorithm = str(_get("DEFAULT_ENCRYPTION_ALGORITHM", "AES256"))
display_timezone = str(_get("DISPLAY_TIMEZONE", "UTC"))
return cls(storage_root=storage_root,
max_upload_size=max_upload_size,
@@ -196,7 +197,8 @@ class AppConfig:
encryption_master_key_path=encryption_master_key_path,
kms_enabled=kms_enabled,
kms_keys_path=kms_keys_path,
default_encryption_algorithm=default_encryption_algorithm)
default_encryption_algorithm=default_encryption_algorithm,
display_timezone=display_timezone)
def validate_and_report(self) -> list[str]:
"""Validate configuration and return a list of warnings/issues.
@@ -206,7 +208,6 @@ class AppConfig:
"""
issues = []
# Check if storage_root is writable
try:
test_file = self.storage_root / ".write_test"
test_file.touch()
@@ -214,24 +215,20 @@ class AppConfig:
except (OSError, PermissionError) as e:
issues.append(f"CRITICAL: STORAGE_ROOT '{self.storage_root}' is not writable: {e}")
# Check if storage_root looks like a temp directory
storage_str = str(self.storage_root).lower()
if "/tmp" in storage_str or "\\temp" in storage_str or "appdata\\local\\temp" in storage_str:
issues.append(f"WARNING: STORAGE_ROOT '{self.storage_root}' appears to be a temporary directory. Data may be lost on reboot!")
# Check if IAM config path is under storage_root
try:
self.iam_config_path.relative_to(self.storage_root)
except ValueError:
issues.append(f"WARNING: IAM_CONFIG '{self.iam_config_path}' is outside STORAGE_ROOT '{self.storage_root}'. Consider setting IAM_CONFIG explicitly or ensuring paths are aligned.")
# Check if bucket policy path is under storage_root
try:
self.bucket_policy_path.relative_to(self.storage_root)
except ValueError:
issues.append(f"WARNING: BUCKET_POLICY_PATH '{self.bucket_policy_path}' is outside STORAGE_ROOT '{self.storage_root}'. Consider setting BUCKET_POLICY_PATH explicitly.")
# Check if log path is writable
try:
self.log_path.parent.mkdir(parents=True, exist_ok=True)
test_log = self.log_path.parent / ".write_test"
@@ -240,26 +237,22 @@ class AppConfig:
except (OSError, PermissionError) as e:
issues.append(f"WARNING: Log directory '{self.log_path.parent}' is not writable: {e}")
# Check log path location
log_str = str(self.log_path).lower()
if "/tmp" in log_str or "\\temp" in log_str or "appdata\\local\\temp" in log_str:
issues.append(f"WARNING: LOG_DIR '{self.log_path.parent}' appears to be a temporary directory. Logs may be lost on reboot!")
# Check if encryption keys path is under storage_root (when encryption is enabled)
if self.encryption_enabled:
try:
self.encryption_master_key_path.relative_to(self.storage_root)
except ValueError:
issues.append(f"WARNING: ENCRYPTION_MASTER_KEY_PATH '{self.encryption_master_key_path}' is outside STORAGE_ROOT. Ensure proper backup procedures.")
# Check if KMS keys path is under storage_root (when KMS is enabled)
if self.kms_enabled:
try:
self.kms_keys_path.relative_to(self.storage_root)
except ValueError:
issues.append(f"WARNING: KMS_KEYS_PATH '{self.kms_keys_path}' is outside STORAGE_ROOT. Ensure proper backup procedures.")
# Warn about production settings
if self.secret_key == "dev-secret-key":
issues.append("WARNING: Using default SECRET_KEY. Set SECRET_KEY environment variable for production.")
@@ -330,4 +323,5 @@ class AppConfig:
"KMS_ENABLED": self.kms_enabled,
"KMS_KEYS_PATH": str(self.kms_keys_path),
"DEFAULT_ENCRYPTION_ALGORITHM": self.default_encryption_algorithm,
"DISPLAY_TIMEZONE": self.display_timezone,
}

View File

@@ -6,7 +6,7 @@ import math
import secrets
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set
@@ -125,7 +125,6 @@ class IamService:
except OSError:
pass
# ---------------------- authz helpers ----------------------
def authenticate(self, access_key: str, secret_key: str) -> Principal:
self._maybe_reload()
access_key = (access_key or "").strip()
@@ -149,7 +148,7 @@ class IamService:
return
attempts = self._failed_attempts.setdefault(access_key, deque())
self._prune_attempts(attempts)
attempts.append(datetime.now())
attempts.append(datetime.now(timezone.utc))
def _clear_failed_attempts(self, access_key: str) -> None:
if not access_key:
@@ -157,7 +156,7 @@ class IamService:
self._failed_attempts.pop(access_key, None)
def _prune_attempts(self, attempts: Deque[datetime]) -> None:
cutoff = datetime.now() - self.auth_lockout_window
cutoff = datetime.now(timezone.utc) - self.auth_lockout_window
while attempts and attempts[0] < cutoff:
attempts.popleft()
@@ -178,7 +177,7 @@ class IamService:
if len(attempts) < self.auth_max_attempts:
return 0
oldest = attempts[0]
elapsed = (datetime.now() - oldest).total_seconds()
elapsed = (datetime.now(timezone.utc) - oldest).total_seconds()
return int(max(0, self.auth_lockout_window.total_seconds() - elapsed))
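The move to timezone-aware datetimes here is not cosmetic: Python refuses to compare naive and aware values, so mixing datetime.now() with aware timestamps in the attempts deque would raise at prune time. A minimal illustration:

from datetime import datetime, timezone

naive = datetime.now()               # no tzinfo attached
aware = datetime.now(timezone.utc)   # tzinfo=timezone.utc

try:
    naive < aware
except TypeError as exc:
    print(exc)  # can't compare offset-naive and offset-aware datetimes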
def principal_for_key(self, access_key: str) -> Principal:
@@ -218,7 +217,6 @@ class IamService:
return True
return False
# ---------------------- management helpers ----------------------
def list_users(self) -> List[Dict[str, Any]]:
listing: List[Dict[str, Any]] = []
for access_key, record in self._users.items():
@@ -291,7 +289,6 @@ class IamService:
self._save()
self._load()
# ---------------------- config helpers ----------------------
def _load(self) -> None:
try:
self._last_load_time = self.config_path.stat().st_mtime
@@ -337,7 +334,6 @@ class IamService:
except (OSError, PermissionError) as e:
raise IamError(f"Cannot save IAM config: {e}")
# ---------------------- insight helpers ----------------------
def config_summary(self) -> Dict[str, Any]:
return {
"path": str(self.config_path),

View File

@@ -33,9 +33,6 @@ def _encryption():
def _error_response(code: str, message: str, status: int) -> tuple[Dict[str, Any], int]:
return {"__type": code, "message": message}, status
# ---------------------- Key Management ----------------------
@kms_api_bp.route("/keys", methods=["GET", "POST"])
@limiter.limit("30 per minute")
def list_or_create_keys():
@@ -65,7 +62,6 @@ def list_or_create_keys():
except EncryptionError as exc:
return _error_response("KMSInternalException", str(exc), 400)
# GET - List keys
keys = kms.list_keys()
return jsonify({
"Keys": [{"KeyId": k.key_id, "KeyArn": k.arn} for k in keys],
@@ -96,7 +92,6 @@ def get_or_delete_key(key_id: str):
except EncryptionError as exc:
return _error_response("NotFoundException", str(exc), 404)
# GET
key = kms.get_key(key_id)
if not key:
return _error_response("NotFoundException", f"Key not found: {key_id}", 404)
@@ -149,9 +144,6 @@ def disable_key(key_id: str):
except EncryptionError as exc:
return _error_response("NotFoundException", str(exc), 404)
# ---------------------- Encryption Operations ----------------------
@kms_api_bp.route("/encrypt", methods=["POST"])
@limiter.limit("60 per minute")
def encrypt_data():
@@ -251,7 +243,6 @@ def generate_data_key():
try:
plaintext_key, encrypted_key = kms.generate_data_key(key_id, context)
# Trim key if AES_128 requested
if key_spec == "AES_128":
plaintext_key = plaintext_key[:16]
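The trim is valid because an AES-128 key is simply 16 bytes of key material; a minimal sketch of the idea (the key_spec handling below is illustrative):

import secrets

plaintext_key = secrets.token_bytes(32)  # data key generated at 256 bits
key_spec = "AES_128"
if key_spec == "AES_128":
    plaintext_key = plaintext_key[:16]   # keep the first 128 bits
assert len(plaintext_key) == 16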
@@ -322,10 +313,7 @@ def re_encrypt():
return _error_response("ValidationException", "CiphertextBlob must be base64 encoded", 400)
try:
# First decrypt, get source key id
plaintext, source_key_id = kms.decrypt(ciphertext, source_context)
# Re-encrypt with destination key
new_ciphertext = kms.encrypt(destination_key_id, plaintext, destination_context)
return jsonify({
@@ -365,9 +353,6 @@ def generate_random():
except EncryptionError as exc:
return _error_response("ValidationException", str(exc), 400)
# ---------------------- Client-Side Encryption Helpers ----------------------
@kms_api_bp.route("/client/generate-key", methods=["POST"])
@limiter.limit("30 per minute")
def generate_client_key():
@@ -427,9 +412,6 @@ def client_decrypt():
except Exception as exc:
return _error_response("DecryptionError", str(exc), 400)
# ---------------------- Encryption Materials for S3 Client-Side Encryption ----------------------
@kms_api_bp.route("/materials/<key_id>", methods=["POST"])
@limiter.limit("60 per minute")
def get_encryption_materials(key_id: str):

View File

@@ -22,6 +22,8 @@ from .storage import ObjectStorage, StorageError
logger = logging.getLogger(__name__)
REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"
REPLICATION_CONNECT_TIMEOUT = 5
REPLICATION_READ_TIMEOUT = 30
REPLICATION_MODE_NEW_ONLY = "new_only"
REPLICATION_MODE_ALL = "all"
@@ -30,10 +32,10 @@ REPLICATION_MODE_ALL = "all"
@dataclass
class ReplicationStats:
"""Statistics for replication operations - computed dynamically."""
objects_synced: int = 0 # Objects that exist in both source and destination
objects_pending: int = 0 # Objects in source but not in destination
objects_orphaned: int = 0 # Objects in destination but not in source (will be deleted)
bytes_synced: int = 0 # Total bytes synced to destination
objects_synced: int = 0
objects_pending: int = 0
objects_orphaned: int = 0
bytes_synced: int = 0
last_sync_at: Optional[float] = None
last_sync_key: Optional[str] = None
@@ -83,7 +85,6 @@ class ReplicationRule:
@classmethod
def from_dict(cls, data: dict) -> "ReplicationRule":
stats_data = data.pop("stats", {})
# Handle old rules without mode/created_at
if "mode" not in data:
data["mode"] = REPLICATION_MODE_NEW_ONLY
if "created_at" not in data:
@@ -121,6 +122,33 @@ class ReplicationManager:
with open(self.rules_path, "w") as f:
json.dump(data, f, indent=2)
def check_endpoint_health(self, connection: RemoteConnection) -> bool:
"""Check if a remote endpoint is reachable and responsive.
Returns True if endpoint is healthy, False otherwise.
Uses short timeouts to prevent blocking.
"""
try:
config = Config(
user_agent_extra=REPLICATION_USER_AGENT,
connect_timeout=REPLICATION_CONNECT_TIMEOUT,
read_timeout=REPLICATION_READ_TIMEOUT,
retries={'max_attempts': 1}
)
s3 = boto3.client(
"s3",
endpoint_url=connection.endpoint_url,
aws_access_key_id=connection.access_key,
aws_secret_access_key=connection.secret_key,
region_name=connection.region,
config=config,
)
s3.list_buckets()
return True
except Exception as e:
logger.warning(f"Endpoint health check failed for {connection.name} ({connection.endpoint_url}): {e}")
return False
def get_rule(self, bucket_name: str) -> Optional[ReplicationRule]:
return self._rules.get(bucket_name)
@@ -151,14 +179,12 @@ class ReplicationManager:
connection = self.connections.get(rule.target_connection_id)
if not connection:
return rule.stats # Return cached stats if connection unavailable
return rule.stats
try:
# Get source objects
source_objects = self.storage.list_objects_all(bucket_name)
source_keys = {obj.key: obj.size for obj in source_objects}
# Get destination objects
s3 = boto3.client(
"s3",
endpoint_url=connection.endpoint_url,
@@ -178,24 +204,18 @@ class ReplicationManager:
bytes_synced += obj.get('Size', 0)
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchBucket':
# Destination bucket doesn't exist yet
dest_keys = set()
else:
raise
# Compute stats
synced = source_keys.keys() & dest_keys # Objects in both
orphaned = dest_keys - source_keys.keys() # In dest but not source
synced = source_keys.keys() & dest_keys
orphaned = dest_keys - source_keys.keys()
# For "new_only" mode, we can't determine pending since we don't know
# which objects existed before replication was enabled. Only "all" mode
# should show pending (objects that should be replicated but aren't yet).
if rule.mode == REPLICATION_MODE_ALL:
pending = source_keys.keys() - dest_keys # In source but not dest
pending = source_keys.keys() - dest_keys
else:
pending = set() # New-only mode: don't show pre-existing as pending
pending = set()
# Update cached stats with computed values
rule.stats.objects_synced = len(synced)
rule.stats.objects_pending = len(pending)
rule.stats.objects_orphaned = len(orphaned)
@@ -205,7 +225,7 @@ class ReplicationManager:
except (ClientError, StorageError) as e:
logger.error(f"Failed to compute sync status for {bucket_name}: {e}")
return rule.stats # Return cached stats on error
return rule.stats
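The synced/orphaned/pending split is plain set arithmetic over key names; a minimal sketch:

source = {"a.txt": 10, "b.txt": 20}   # source keys -> sizes
dest = {"b.txt", "c.txt"}             # keys present on the destination

synced = source.keys() & dest         # {'b.txt'}: present on both sides
orphaned = dest - source.keys()       # {'c.txt'}: destination-only, to be deleted
pending = source.keys() - dest        # {'a.txt'}: counted only in "all" mode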
def replicate_existing_objects(self, bucket_name: str) -> None:
"""Trigger replication for all existing objects in a bucket."""
@@ -218,6 +238,10 @@ class ReplicationManager:
logger.warning(f"Cannot replicate existing objects: Connection {rule.target_connection_id} not found")
return
if not self.check_endpoint_health(connection):
logger.warning(f"Cannot replicate existing objects: Endpoint {connection.name} ({connection.endpoint_url}) is not reachable")
return
try:
objects = self.storage.list_objects_all(bucket_name)
logger.info(f"Starting replication of {len(objects)} existing objects from {bucket_name}")
@@ -255,6 +279,10 @@ class ReplicationManager:
logger.warning(f"Replication skipped for {bucket_name}/{object_key}: Connection {rule.target_connection_id} not found")
return
if not self.check_endpoint_health(connection):
logger.warning(f"Replication skipped for {bucket_name}/{object_key}: Endpoint {connection.name} ({connection.endpoint_url}) is not reachable")
return
self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)
def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
@@ -271,13 +299,26 @@ class ReplicationManager:
file_size = 0
try:
config = Config(user_agent_extra=REPLICATION_USER_AGENT)
config = Config(
user_agent_extra=REPLICATION_USER_AGENT,
connect_timeout=REPLICATION_CONNECT_TIMEOUT,
read_timeout=REPLICATION_READ_TIMEOUT,
retries={'max_attempts': 2},
signature_version='s3v4',
s3={
'addressing_style': 'path',
},
# Disable SDK automatic checksums - they cause SignatureDoesNotMatch errors
# with S3-compatible servers that don't support CRC32 checksum headers
request_checksum_calculation='when_required',
response_checksum_validation='when_required',
)
s3 = boto3.client(
"s3",
endpoint_url=conn.endpoint_url,
aws_access_key_id=conn.access_key,
aws_secret_access_key=conn.secret_key,
region_name=conn.region,
region_name=conn.region or 'us-east-1',
config=config,
)
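For reference, request_checksum_calculation and response_checksum_validation are the botocore (1.36+) data-integrity settings; "when_required" restores the older behavior of sending checksums only when an operation demands them. A minimal client sketch under that assumption (the endpoint is illustrative):

import boto3
from botocore.config import Config

# Assumes botocore >= 1.36, where flexible checksums default to "when_supported".
config = Config(
    request_checksum_calculation="when_required",
    response_checksum_validation="when_required",
    signature_version="s3v4",
    s3={"addressing_style": "path"},
)
s3 = boto3.client("s3", endpoint_url="http://127.0.0.1:9000", config=config)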
@@ -296,54 +337,59 @@ class ReplicationManager:
logger.error(f"Source object not found: {bucket_name}/{object_key}")
return
metadata = self.storage.get_object_metadata(bucket_name, object_key)
# Don't replicate metadata - destination server will generate its own
# __etag__ and __size__. Replicating them causes signature mismatches when they have None/empty values.
extra_args = {}
if metadata:
extra_args["Metadata"] = metadata
# Guess the content type so the destination doesn't mishandle the object
content_type, _ = mimetypes.guess_type(path)
file_size = path.stat().st_size
logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
def do_put_object() -> None:
"""Helper to upload object.
Reads the file content into memory first to avoid signature calculation
issues with certain binary file types (like GIFs) when streaming.
Do NOT set ContentLength explicitly - boto3 calculates it from the bytes
and setting it manually can cause SignatureDoesNotMatch errors.
"""
file_content = path.read_bytes()
put_kwargs = {
"Bucket": rule.target_bucket,
"Key": object_key,
"Body": file_content,
}
if content_type:
put_kwargs["ContentType"] = content_type
s3.put_object(**put_kwargs)
try:
with path.open("rb") as f:
s3.put_object(
Bucket=rule.target_bucket,
Key=object_key,
Body=f,
ContentLength=file_size,
ContentType=content_type or "application/octet-stream",
Metadata=metadata or {}
)
do_put_object()
except (ClientError, S3UploadFailedError) as e:
is_no_bucket = False
error_code = None
if isinstance(e, ClientError):
if e.response['Error']['Code'] == 'NoSuchBucket':
is_no_bucket = True
error_code = e.response['Error']['Code']
elif isinstance(e, S3UploadFailedError):
if "NoSuchBucket" in str(e):
is_no_bucket = True
error_code = 'NoSuchBucket'
if is_no_bucket:
if error_code == 'NoSuchBucket':
logger.info(f"Target bucket {rule.target_bucket} not found. Attempting to create it.")
bucket_ready = False
try:
s3.create_bucket(Bucket=rule.target_bucket)
# Retry upload
with path.open("rb") as f:
s3.put_object(
Bucket=rule.target_bucket,
Key=object_key,
Body=f,
ContentLength=file_size,
ContentType=content_type or "application/octet-stream",
Metadata=metadata or {}
)
except Exception as create_err:
logger.error(f"Failed to create target bucket {rule.target_bucket}: {create_err}")
raise e # Raise original error
bucket_ready = True
logger.info(f"Created target bucket {rule.target_bucket}")
except ClientError as bucket_err:
if bucket_err.response['Error']['Code'] in ('BucketAlreadyExists', 'BucketAlreadyOwnedByYou'):
logger.debug(f"Bucket {rule.target_bucket} already exists (created by another thread)")
bucket_ready = True
else:
logger.error(f"Failed to create target bucket {rule.target_bucket}: {bucket_err}")
raise e
if bucket_ready:
do_put_object()
else:
raise e
@@ -354,3 +400,4 @@ class ReplicationManager:
logger.error(f"Replication failed for {bucket_name}/{object_key}: {e}")
except Exception:
logger.exception(f"Unexpected error during replication for {bucket_name}/{object_key}")

View File

@@ -8,7 +8,7 @@ import re
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict
from urllib.parse import quote, urlencode, urlparse
from urllib.parse import quote, urlencode, urlparse, unquote
from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError
from flask import Blueprint, Response, current_app, jsonify, request, g
@@ -22,8 +22,6 @@ from .storage import ObjectStorage, StorageError, QuotaExceededError
s3_api_bp = Blueprint("s3_api", __name__)
# ---------------------- helpers ----------------------
def _storage() -> ObjectStorage:
return current_app.extensions["object_storage"]
@@ -68,9 +66,26 @@ def _get_signature_key(key: str, date_stamp: str, region_name: str, service_name
return k_signing
def _get_canonical_uri(req: Any) -> str:
"""Get the canonical URI for SigV4 signature verification.
AWS SigV4 requires the canonical URI to be URL-encoded exactly as the client
sent it. Flask/Werkzeug automatically URL-decodes request.path, so we need
to get the raw path from the environ.
The canonical URI should have each path segment URL-encoded (with '/' preserved),
and the encoding should match what the client used when signing.
"""
raw_uri = req.environ.get('RAW_URI') or req.environ.get('REQUEST_URI')
if raw_uri:
path = raw_uri.split('?')[0]
return path
return quote(req.path, safe="/-_.~")
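The raw-versus-decoded distinction is easy to see with urllib alone; RAW_URI and REQUEST_URI are server-provided environ keys (gunicorn sets RAW_URI), not part of PEP 3333, hence the quote() fallback:

from urllib.parse import quote, unquote

raw = "/bucket/a%20file.gif"       # exactly what the client signed
decoded = unquote(raw)             # '/bucket/a file.gif', what request.path yields

# Re-encoding usually round-trips:
assert quote(decoded, safe="/-_.~") == raw
# But not always: '%2F' inside a key decodes to '/' and afterwards cannot be told
# apart from a real path separator, so the raw request line is preferred.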
def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
# Parse Authorization header
# AWS4-HMAC-SHA256 Credential=AKIA.../20230101/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-date, Signature=...
match = re.match(
r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)",
auth_header,
@@ -79,17 +94,13 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
return None
access_key, date_stamp, region, service, signed_headers_str, signature = match.groups()
# Get secret key
secret_key = _iam().get_secret_key(access_key)
if not secret_key:
raise IamError("Invalid access key")
# Canonical Request
method = req.method
canonical_uri = quote(req.path, safe="/-_.~")
canonical_uri = _get_canonical_uri(req)
# Canonical Query String
query_args = []
for key, value in req.args.items(multi=True):
query_args.append((key, value))
@@ -100,7 +111,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}")
canonical_query_string = "&".join(canonical_query_parts)
# Canonical Headers
signed_headers_list = signed_headers_str.split(";")
canonical_headers_parts = []
for header in signed_headers_list:
@@ -112,18 +122,22 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
canonical_headers_parts.append(f"{header.lower()}:{header_val}\n")
canonical_headers = "".join(canonical_headers_parts)
# Payload Hash
payload_hash = req.headers.get("X-Amz-Content-Sha256")
if not payload_hash:
payload_hash = hashlib.sha256(req.get_data()).hexdigest()
canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
# String to Sign
amz_date = req.headers.get("X-Amz-Date")
if not amz_date:
amz_date = req.headers.get("Date")
# Debug logging for signature issues
import logging
logger = logging.getLogger(__name__)
logger.debug(f"SigV4 Debug - Method: {method}, URI: {canonical_uri}")
logger.debug(f"SigV4 Debug - Payload hash from header: {req.headers.get('X-Amz-Content-Sha256')}")
logger.debug(f"SigV4 Debug - Signed headers: {signed_headers_str}")
logger.debug(f"SigV4 Debug - Content-Type: {req.headers.get('Content-Type')}")
logger.debug(f"SigV4 Debug - Content-Length: {req.headers.get('Content-Length')}")
amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date")
if not amz_date:
raise IamError("Missing Date header")
@@ -134,13 +148,12 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
now = datetime.now(timezone.utc)
time_diff = abs((now - request_time).total_seconds())
if time_diff > 900: # 15 minutes
if time_diff > 900:
raise IamError("Request timestamp too old or too far in the future")
required_headers = {'host', 'x-amz-date'}
signed_headers_set = set(signed_headers_str.split(';'))
if not required_headers.issubset(signed_headers_set):
# Some clients might sign 'date' instead of 'x-amz-date'
if 'date' in signed_headers_set:
required_headers.remove('x-amz-date')
required_headers.add('date')
@@ -154,6 +167,24 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
if not hmac.compare_digest(calculated_signature, signature):
# Debug logging for signature mismatch
import logging
logger = logging.getLogger(__name__)
logger.error(f"Signature mismatch for {req.path}")
logger.error(f" Content-Type: {req.headers.get('Content-Type')}")
logger.error(f" Content-Length: {req.headers.get('Content-Length')}")
logger.error(f" X-Amz-Content-Sha256: {req.headers.get('X-Amz-Content-Sha256')}")
logger.error(f" Canonical URI: {canonical_uri}")
logger.error(f" Signed headers: {signed_headers_str}")
# Log each signed header's value
for h in signed_headers_list:
logger.error(f" Header '{h}': {repr(req.headers.get(h))}")
logger.error(f" Expected sig: {signature[:16]}...")
logger.error(f" Calculated sig: {calculated_signature[:16]}...")
# Log first part of canonical request to compare
logger.error(f" Canonical request hash: {hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()[:16]}...")
# Log the full canonical request for debugging
logger.error(f" Canonical request:\n{canonical_request[:500]}...")
raise IamError("SignatureDoesNotMatch")
return _iam().get_principal(access_key)
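When chasing a SignatureDoesNotMatch, it helps to recompute the signature outside the server. A compact, self-contained SigV4 sketch (host, secret, and path are illustrative):

import hashlib
import hmac
from datetime import datetime, timezone

def _hmac(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

secret_key, region, service = "secret", "us-east-1", "s3"
now = datetime.now(timezone.utc)
amz_date, date_stamp = now.strftime("%Y%m%dT%H%M%SZ"), now.strftime("%Y%m%d")

payload_hash = hashlib.sha256(b"").hexdigest()
canonical_request = "\n".join([
    "GET", "/bucket/key.txt", "",                     # method, URI, query string
    f"host:127.0.0.1:5000\nx-amz-date:{amz_date}\n",  # canonical headers
    "host;x-amz-date", payload_hash,                  # signed headers, payload hash
])
scope = f"{date_stamp}/{region}/{service}/aws4_request"
string_to_sign = "\n".join([
    "AWS4-HMAC-SHA256", amz_date, scope,
    hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
])
signing_key = _hmac(_hmac(_hmac(_hmac(("AWS4" + secret_key).encode("utf-8"),
                                      date_stamp), region), service), "aws4_request")
signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
print(signature)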
@@ -187,11 +218,9 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
if not secret_key:
raise IamError("Invalid access key")
# Canonical Request
method = req.method
canonical_uri = quote(req.path, safe="/-_.~")
canonical_uri = _get_canonical_uri(req)
# Canonical Query String
query_args = []
for key, value in req.args.items(multi=True):
if key != "X-Amz-Signature":
@@ -203,7 +232,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}")
canonical_query_string = "&".join(canonical_query_parts)
# Canonical Headers
signed_headers_list = signed_headers_str.split(";")
canonical_headers_parts = []
for header in signed_headers_list:
@@ -212,7 +240,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
canonical_headers_parts.append(f"{header}:{val}\n")
canonical_headers = "".join(canonical_headers_parts)
# Payload Hash
payload_hash = "UNSIGNED-PAYLOAD"
canonical_request = "\n".join([
@@ -224,7 +251,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
payload_hash
])
# String to Sign
algorithm = "AWS4-HMAC-SHA256"
credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
hashed_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
@@ -235,7 +261,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
hashed_request
])
# Signature
signing_key = _get_signature_key(secret_key, date_stamp, region, service)
calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
@@ -493,7 +518,6 @@ def _generate_presigned_url(
}
canonical_query = _encode_query_params(query_params)
# Determine host and scheme from config or request
api_base = current_app.config.get("API_BASE_URL")
if api_base:
parsed = urlparse(api_base)
@@ -853,7 +877,6 @@ def _bucket_versioning_handler(bucket_name: str) -> Response:
current_app.logger.info("Bucket versioning updated", extra={"bucket": bucket_name, "status": status})
return Response(status=200)
# GET
try:
enabled = storage.is_versioning_enabled(bucket_name)
except StorageError as exc:
@@ -889,7 +912,7 @@ def _bucket_tagging_handler(bucket_name: str) -> Response:
return _error_response("NoSuchBucket", str(exc), 404)
current_app.logger.info("Bucket tags deleted", extra={"bucket": bucket_name})
return Response(status=204)
# PUT
payload = request.get_data(cache=False) or b""
try:
tags = _parse_tagging_document(payload)
@@ -914,7 +937,6 @@ def _object_tagging_handler(bucket_name: str, object_key: str) -> Response:
if error:
return error
# For tagging, we use read permission for GET, write for PUT/DELETE
action = "read" if request.method == "GET" else "write"
try:
_authorize_action(principal, bucket_name, action, object_key=object_key)
@@ -1093,10 +1115,8 @@ def _bucket_location_handler(bucket_name: str) -> Response:
if not storage.bucket_exists(bucket_name):
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
# Return the configured AWS_REGION
region = current_app.config.get("AWS_REGION", "us-east-1")
root = Element("LocationConstraint")
# AWS returns an empty LocationConstraint for us-east-1; mirror that behavior
root.text = region if region != "us-east-1" else None
return _xml_response(root)
@@ -1116,13 +1136,11 @@ def _bucket_acl_handler(bucket_name: str) -> Response:
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
if request.method == "PUT":
# We don't fully implement ACLs, but we accept the request for compatibility
# Check for canned ACL header
# Accept canned ACL headers for S3 compatibility (not fully implemented)
canned_acl = request.headers.get("x-amz-acl", "private")
current_app.logger.info("Bucket ACL set (canned)", extra={"bucket": bucket_name, "acl": canned_acl})
return Response(status=200)
# GET - Return a basic ACL document showing full control for owner
root = Element("AccessControlPolicy")
owner = SubElement(root, "Owner")
SubElement(owner, "ID").text = principal.access_key if principal else "anonymous"
@@ -1170,7 +1188,6 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response:
if key_marker:
objects = [obj for obj in objects if obj.key > key_marker]
# Build XML response
root = Element("ListVersionsResult", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
SubElement(root, "Name").text = bucket_name
SubElement(root, "Prefix").text = prefix
@@ -1188,10 +1205,9 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response:
is_truncated = True
break
# Current version
version = SubElement(root, "Version")
SubElement(version, "Key").text = obj.key
SubElement(version, "VersionId").text = "null" # Current version ID
SubElement(version, "VersionId").text = "null"
SubElement(version, "IsLatest").text = "true"
SubElement(version, "LastModified").text = obj.last_modified.strftime("%Y-%m-%dT%H:%M:%S.000Z")
SubElement(version, "ETag").text = f'"{obj.etag}"'
@@ -1205,7 +1221,6 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response:
version_count += 1
next_key_marker = obj.key
# Get historical versions
try:
versions = storage.list_object_versions(bucket_name, obj.key)
for v in versions:
@@ -1289,14 +1304,12 @@ def _render_lifecycle_config(config: list) -> Element:
rule_el = SubElement(root, "Rule")
SubElement(rule_el, "ID").text = rule.get("ID", "")
# Filter
filter_el = SubElement(rule_el, "Filter")
if rule.get("Prefix"):
SubElement(filter_el, "Prefix").text = rule.get("Prefix", "")
SubElement(rule_el, "Status").text = rule.get("Status", "Enabled")
# Expiration
if "Expiration" in rule:
exp = rule["Expiration"]
exp_el = SubElement(rule_el, "Expiration")
@@ -1307,14 +1320,12 @@ def _render_lifecycle_config(config: list) -> Element:
if exp.get("ExpiredObjectDeleteMarker"):
SubElement(exp_el, "ExpiredObjectDeleteMarker").text = "true"
# NoncurrentVersionExpiration
if "NoncurrentVersionExpiration" in rule:
nve = rule["NoncurrentVersionExpiration"]
nve_el = SubElement(rule_el, "NoncurrentVersionExpiration")
if "NoncurrentDays" in nve:
SubElement(nve_el, "NoncurrentDays").text = str(nve["NoncurrentDays"])
# AbortIncompleteMultipartUpload
if "AbortIncompleteMultipartUpload" in rule:
aimu = rule["AbortIncompleteMultipartUpload"]
aimu_el = SubElement(rule_el, "AbortIncompleteMultipartUpload")
@@ -1338,29 +1349,24 @@ def _parse_lifecycle_config(payload: bytes) -> list:
for rule_el in root.findall("{*}Rule") or root.findall("Rule"):
rule: dict = {}
# ID
id_el = rule_el.find("{*}ID") or rule_el.find("ID")
if id_el is not None and id_el.text:
rule["ID"] = id_el.text.strip()
# Filter/Prefix
filter_el = rule_el.find("{*}Filter") or rule_el.find("Filter")
if filter_el is not None:
prefix_el = filter_el.find("{*}Prefix") or filter_el.find("Prefix")
if prefix_el is not None and prefix_el.text:
rule["Prefix"] = prefix_el.text
# Legacy Prefix (outside Filter)
if "Prefix" not in rule:
prefix_el = rule_el.find("{*}Prefix") or rule_el.find("Prefix")
if prefix_el is not None:
rule["Prefix"] = prefix_el.text or ""
# Status
status_el = rule_el.find("{*}Status") or rule_el.find("Status")
rule["Status"] = (status_el.text or "Enabled").strip() if status_el is not None else "Enabled"
# Expiration
exp_el = rule_el.find("{*}Expiration") or rule_el.find("Expiration")
if exp_el is not None:
expiration: dict = {}
@@ -1376,7 +1382,6 @@ def _parse_lifecycle_config(payload: bytes) -> list:
if expiration:
rule["Expiration"] = expiration
# NoncurrentVersionExpiration
nve_el = rule_el.find("{*}NoncurrentVersionExpiration") or rule_el.find("NoncurrentVersionExpiration")
if nve_el is not None:
nve: dict = {}
@@ -1386,7 +1391,6 @@ def _parse_lifecycle_config(payload: bytes) -> list:
if nve:
rule["NoncurrentVersionExpiration"] = nve
# AbortIncompleteMultipartUpload
aimu_el = rule_el.find("{*}AbortIncompleteMultipartUpload") or rule_el.find("AbortIncompleteMultipartUpload")
if aimu_el is not None:
aimu: dict = {}
@@ -1424,7 +1428,6 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
if not quota:
return _error_response("NoSuchQuotaConfiguration", "No quota configuration found", 404)
# Return as JSON for simplicity (not a standard S3 API)
stats = storage.bucket_stats(bucket_name)
return jsonify({
"quota": quota,
@@ -1453,7 +1456,6 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
if max_size_bytes is None and max_objects is None:
return _error_response("InvalidArgument", "At least one of max_size_bytes or max_objects is required", 400)
# Validate types
if max_size_bytes is not None:
try:
max_size_bytes = int(max_size_bytes)
@@ -1564,7 +1566,6 @@ def _bulk_delete_handler(bucket_name: str) -> Response:
return _xml_response(result, status=200)
# ---------------------- routes ----------------------
@s3_api_bp.get("/")
@limiter.limit("60 per minute")
def list_buckets() -> Response:
@@ -1642,7 +1643,6 @@ def bucket_handler(bucket_name: str) -> Response:
current_app.logger.info("Bucket deleted", extra={"bucket": bucket_name})
return Response(status=204)
# GET - list objects (supports both ListObjects and ListObjectsV2)
principal, error = _require_principal()
try:
_authorize_action(principal, bucket_name, "list")
@@ -1650,18 +1650,12 @@ def bucket_handler(bucket_name: str) -> Response:
if error:
return error
return _error_response("AccessDenied", str(exc), 403)
try:
objects = storage.list_objects_all(bucket_name)
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
# Check if this is ListObjectsV2 (list-type=2)
list_type = request.args.get("list-type")
prefix = request.args.get("prefix", "")
delimiter = request.args.get("delimiter", "")
max_keys = min(int(request.args.get("max-keys", current_app.config["UI_PAGE_SIZE"])), 1000)
# Pagination markers
marker = request.args.get("marker", "") # ListObjects v1
continuation_token = request.args.get("continuation-token", "") # ListObjectsV2
start_after = request.args.get("start-after", "") # ListObjectsV2
@@ -1681,11 +1675,17 @@ def bucket_handler(bucket_name: str) -> Response:
else:
effective_start = marker
if prefix:
objects = [obj for obj in objects if obj.key.startswith(prefix)]
if effective_start:
objects = [obj for obj in objects if obj.key > effective_start]
fetch_keys = max_keys * 10 if delimiter else max_keys
try:
list_result = storage.list_objects(
bucket_name,
max_keys=fetch_keys,
continuation_token=effective_start or None,
prefix=prefix or None,
)
objects = list_result.objects
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
common_prefixes: list[str] = []
filtered_objects: list = []
@@ -1694,7 +1694,6 @@ def bucket_handler(bucket_name: str) -> Response:
for obj in objects:
key_after_prefix = obj.key[len(prefix):] if prefix else obj.key
if delimiter in key_after_prefix:
# This is a "folder" - extract the common prefix
common_prefix = prefix + key_after_prefix.split(delimiter)[0] + delimiter
if common_prefix not in seen_prefixes:
seen_prefixes.add(common_prefix)
@@ -1705,7 +1704,7 @@ def bucket_handler(bucket_name: str) -> Response:
common_prefixes = sorted(common_prefixes)
total_items = len(objects) + len(common_prefixes)
is_truncated = total_items > max_keys
is_truncated = total_items > max_keys or list_result.is_truncated
if len(objects) >= max_keys:
objects = objects[:max_keys]
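From a client, this pagination is exercised through the standard ListObjectsV2 continuation protocol; a usage sketch (endpoint and credentials are illustrative):

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:5000",
    aws_access_key_id="AKIAEXAMPLE",
    aws_secret_access_key="secret",
)
paginator = s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket="my-bucket", Prefix="logs/", Delimiter="/",
                           PaginationConfig={"PageSize": 100})
for page in pages:
    for cp in page.get("CommonPrefixes", []):
        print("folder:", cp["Prefix"])
    for obj in page.get("Contents", []):
        print(obj["Key"], obj["Size"])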
@@ -1792,7 +1791,6 @@ def object_handler(bucket_name: str, object_key: str):
if "tagging" in request.args:
return _object_tagging_handler(bucket_name, object_key)
# Multipart Uploads
if request.method == "POST":
if "uploads" in request.args:
return _initiate_multipart_upload(bucket_name, object_key)
@@ -1845,9 +1843,7 @@ def object_handler(bucket_name: str, object_key: str):
response = Response(status=200)
response.headers["ETag"] = f'"{meta.etag}"'
# Trigger replication if not a replication request
user_agent = request.headers.get("User-Agent", "")
if "S3ReplicationAgent" not in user_agent:
if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""):
_replication_manager().trigger_replication(bucket_name, object_key, action="write")
return response
@@ -1866,31 +1862,25 @@ def object_handler(bucket_name: str, object_key: str):
metadata = storage.get_object_metadata(bucket_name, object_key)
mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream"
# Check if object is encrypted and needs decryption
is_encrypted = "x-amz-server-side-encryption" in metadata
if request.method == "GET":
if is_encrypted and hasattr(storage, 'get_object_data'):
# Use encrypted storage to decrypt
try:
data, clean_metadata = storage.get_object_data(bucket_name, object_key)
response = Response(data, mimetype=mimetype)
logged_bytes = len(data)
# Use decrypted size for Content-Length
response.headers["Content-Length"] = len(data)
etag = hashlib.md5(data).hexdigest()
except StorageError as exc:
return _error_response("InternalError", str(exc), 500)
else:
# Stream unencrypted file directly
stat = path.stat()
response = Response(_stream_file(path), mimetype=mimetype, direct_passthrough=True)
logged_bytes = stat.st_size
etag = storage._compute_etag(path)
else:
# HEAD request
if is_encrypted and hasattr(storage, 'get_object_data'):
# For encrypted objects, we need to report decrypted size
try:
data, _ = storage.get_object_data(bucket_name, object_key)
response = Response(status=200)
@@ -1919,7 +1909,6 @@ def object_handler(bucket_name: str, object_key: str):
storage.delete_object(bucket_name, object_key)
current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key})
# Trigger replication if not a replication request
user_agent = request.headers.get("User-Agent", "")
if "S3ReplicationAgent" not in user_agent:
_replication_manager().trigger_replication(bucket_name, object_key, action="delete")
@@ -2200,7 +2189,6 @@ class AwsChunkedDecoder:
self.chunk_remaining -= len(chunk)
if self.chunk_remaining == 0:
# Read CRLF after chunk data
crlf = self.stream.read(2)
if crlf != b"\r\n":
raise IOError("Malformed chunk: missing CRLF")
@@ -2219,7 +2207,6 @@ class AwsChunkedDecoder:
try:
line_str = line.decode("ascii").strip()
# Handle chunk-signature extension if present (e.g. "1000;chunk-signature=...")
if ";" in line_str:
line_str = line_str.split(";")[0]
chunk_size = int(line_str, 16)
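For orientation, the aws-chunked framing this decoder parses looks like the following: each chunk is a hex size with an optional chunk-signature extension, then CRLF, data, CRLF, terminated by a zero-length chunk (signatures shortened for readability):

body = (
    b"400;chunk-signature=0123abcd\r\n"  # 0x400 = 1024 data bytes follow
    + b"x" * 1024 + b"\r\n"
    + b"0;chunk-signature=4567ef89\r\n"  # zero-length chunk ends the stream
    + b"\r\n"
)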
@@ -2375,7 +2362,6 @@ def _abort_multipart_upload(bucket_name: str, object_key: str) -> Response:
try:
_storage().abort_multipart_upload(bucket_name, upload_id)
except StorageError as exc:
# Abort is idempotent, but a missing bucket still surfaces as NoSuchBucket
if "Bucket does not exist" in str(exc):
return _error_response("NoSuchBucket", str(exc), 404)
@@ -2385,7 +2371,6 @@ def _abort_multipart_upload(bucket_name: str, object_key: str) -> Response:
@s3_api_bp.before_request
def resolve_principal():
g.principal = None
# Try SigV4
try:
if ("Authorization" in request.headers and request.headers["Authorization"].startswith("AWS4-HMAC-SHA256")) or \
(request.args.get("X-Amz-Algorithm") == "AWS4-HMAC-SHA256"):
@@ -2394,7 +2379,6 @@ def resolve_principal():
except Exception:
pass
# Try simple auth headers (internal/testing)
access_key = request.headers.get("X-Access-Key")
secret_key = request.headers.get("X-Secret-Key")
if access_key and secret_key:

View File

@@ -128,11 +128,13 @@ class ObjectStorage:
BUCKET_VERSIONS_DIR = "versions"
MULTIPART_MANIFEST = "manifest.json"
BUCKET_CONFIG_FILE = ".bucket.json"
KEY_INDEX_CACHE_TTL = 30
def __init__(self, root: Path) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
self._ensure_system_roots()
self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {}
def list_buckets(self) -> List[BucketMeta]:
buckets: List[BucketMeta] = []
@@ -142,7 +144,7 @@ class ObjectStorage:
buckets.append(
BucketMeta(
name=bucket.name,
created_at=datetime.fromtimestamp(stat.st_ctime),
created_at=datetime.fromtimestamp(stat.st_ctime, timezone.utc),
)
)
return buckets
@@ -190,7 +192,6 @@ class ObjectStorage:
version_count = 0
version_bytes = 0
# Count current objects in the bucket folder
for path in bucket_path.rglob("*"):
if path.is_file():
rel = path.relative_to(bucket_path)
@@ -202,7 +203,6 @@ class ObjectStorage:
object_count += 1
total_bytes += stat.st_size
# Count archived versions in the system folder
versions_root = self._bucket_versions_root(bucket_name)
if versions_root.exists():
for path in versions_root.rglob("*.bin"):
@@ -216,8 +216,8 @@ class ObjectStorage:
"bytes": total_bytes,
"version_count": version_count,
"version_bytes": version_bytes,
"total_objects": object_count + version_count, # All objects including versions
"total_bytes": total_bytes + version_bytes, # All storage including versions
"total_objects": object_count + version_count,
"total_bytes": total_bytes + version_bytes,
}
try:
@@ -274,32 +274,20 @@ class ObjectStorage:
raise StorageError("Bucket does not exist")
bucket_id = bucket_path.name
# Collect all matching object keys first (lightweight - just paths)
all_keys: List[str] = []
for path in bucket_path.rglob("*"):
if path.is_file():
rel = path.relative_to(bucket_path)
if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS:
continue
key = str(rel.as_posix())
if prefix and not key.startswith(prefix):
continue
all_keys.append(key)
object_cache = self._get_object_cache(bucket_id, bucket_path)
all_keys = sorted(object_cache.keys())
if prefix:
all_keys = [k for k in all_keys if k.startswith(prefix)]
all_keys.sort()
total_count = len(all_keys)
# Handle continuation token (the key to start after)
start_index = 0
if continuation_token:
try:
# continuation_token is the last key from previous page
for i, key in enumerate(all_keys):
if key > continuation_token:
start_index = i
break
else:
# Token is past all keys
import bisect
start_index = bisect.bisect_right(all_keys, continuation_token)
if start_index >= total_count:
return ListObjectsResult(
objects=[],
is_truncated=False,
@@ -307,34 +295,17 @@ class ObjectStorage:
total_count=total_count,
)
except Exception:
pass # Invalid token, start from beginning
pass
# Get the slice we need
end_index = start_index + max_keys
keys_slice = all_keys[start_index:end_index]
is_truncated = end_index < total_count
# Now load full metadata only for the objects we're returning
objects: List[ObjectMeta] = []
for key in keys_slice:
safe_key = self._sanitize_object_key(key)
path = bucket_path / safe_key
if not path.exists():
continue # Object may have been deleted
try:
stat = path.stat()
metadata = self._read_metadata(bucket_id, safe_key)
objects.append(
ObjectMeta(
key=key,
size=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime),
etag=self._compute_etag(path),
metadata=metadata or None,
)
)
except OSError:
continue # File may have been deleted during iteration
obj = object_cache.get(key)
if obj:
objects.append(obj)
next_token = keys_slice[-1] if is_truncated and keys_slice else None
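bisect_right finds the resume point in O(log n) because the key list is sorted; the token is the last key of the previous page, and the slice starts strictly after it:

import bisect

all_keys = ["a.txt", "b.txt", "c.txt", "d.txt"]  # sorted object keys
token = "b.txt"                                  # last key of the previous page

start = bisect.bisect_right(all_keys, token)     # 2: first key strictly after token
page = all_keys[start:start + 2]                 # ['c.txt', 'd.txt']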
@@ -368,14 +339,12 @@ class ObjectStorage:
destination = bucket_path / safe_key
destination.parent.mkdir(parents=True, exist_ok=True)
# Check if this is an overwrite (won't add to object count)
is_overwrite = destination.exists()
existing_size = destination.stat().st_size if is_overwrite else 0
if self._is_versioning_enabled(bucket_path) and is_overwrite:
self._archive_current_version(bucket_id, safe_key, reason="overwrite")
# Write to temp file first to get actual size
tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR
tmp_dir.mkdir(parents=True, exist_ok=True)
tmp_path = tmp_dir / f"{uuid.uuid4().hex}.tmp"
@@ -387,9 +356,7 @@ class ObjectStorage:
new_size = tmp_path.stat().st_size
# Check quota before finalizing
if enforce_quota:
# Calculate net change (new size minus size being replaced)
size_delta = new_size - existing_size
object_delta = 0 if is_overwrite else 1
@@ -405,29 +372,29 @@ class ObjectStorage:
quota_check["usage"],
)
# Move to final destination
shutil.move(str(tmp_path), str(destination))
finally:
# Clean up temp file if it still exists
try:
tmp_path.unlink(missing_ok=True)
except OSError:
pass
stat = destination.stat()
if metadata:
self._write_metadata(bucket_id, safe_key, metadata)
else:
self._delete_metadata(bucket_id, safe_key)
etag = checksum.hexdigest()
internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)}
combined_meta = {**internal_meta, **(metadata or {})}
self._write_metadata(bucket_id, safe_key, combined_meta)
self._invalidate_bucket_stats_cache(bucket_id)
self._invalidate_object_cache(bucket_id)
return ObjectMeta(
key=safe_key.as_posix(),
size=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime),
etag=checksum.hexdigest(),
last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
etag=etag,
metadata=metadata,
)
@@ -453,16 +420,14 @@ class ObjectStorage:
for parent in path.parents:
if parent == stop_at:
break
# Retry a few times with small delays for Windows/OneDrive
for attempt in range(3):
try:
if parent.exists() and not any(parent.iterdir()):
parent.rmdir()
break # Success, move to next parent
break
except OSError:
if attempt < 2:
time.sleep(0.1) # Brief delay before retry
# Final attempt failed - continue to next parent
time.sleep(0.1)
break
def delete_object(self, bucket_name: str, object_key: str) -> None:
@@ -479,6 +444,7 @@ class ObjectStorage:
self._delete_metadata(bucket_id, rel)
self._invalidate_bucket_stats_cache(bucket_id)
self._invalidate_object_cache(bucket_id)
self._cleanup_empty_parents(path, bucket_path)
def purge_object(self, bucket_name: str, object_key: str) -> None:
@@ -499,8 +465,8 @@ class ObjectStorage:
if legacy_version_dir.exists():
shutil.rmtree(legacy_version_dir, ignore_errors=True)
# Invalidate bucket stats cache
self._invalidate_bucket_stats_cache(bucket_id)
self._invalidate_object_cache(bucket_id)
self._cleanup_empty_parents(target, bucket_path)
def is_versioning_enabled(self, bucket_name: str) -> bool:
@@ -612,7 +578,6 @@ class ObjectStorage:
bucket_path = self._require_bucket_path(bucket_name)
if max_bytes is None and max_objects is None:
# Remove quota entirely
self._set_bucket_config_entry(bucket_path.name, "quota", None)
return
@@ -654,9 +619,7 @@ class ObjectStorage:
"message": None,
}
# Get current stats (uses cache when available)
stats = self.bucket_stats(bucket_name)
# Use totals which include versions for quota enforcement
current_bytes = stats.get("total_bytes", stats.get("bytes", 0))
current_objects = stats.get("total_objects", stats.get("objects", 0))
@@ -817,7 +780,7 @@ class ObjectStorage:
return ObjectMeta(
key=safe_key.as_posix(),
size=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime),
last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
etag=self._compute_etag(destination),
metadata=metadata or None,
)
@@ -920,14 +883,12 @@ class ObjectStorage:
raise StorageError("part_number must be >= 1")
bucket_path = self._bucket_path(bucket_name)
# Get the upload root directory
upload_root = self._multipart_dir(bucket_path.name, upload_id)
if not upload_root.exists():
upload_root = self._legacy_multipart_dir(bucket_path.name, upload_id)
if not upload_root.exists():
raise StorageError("Multipart upload not found")
# Write the part data first (can happen concurrently)
checksum = hashlib.md5()
part_filename = f"part-{part_number:05d}.part"
part_path = upload_root / part_filename
@@ -939,13 +900,11 @@ class ObjectStorage:
"filename": part_filename,
}
# Update manifest with file locking to prevent race conditions
manifest_path = upload_root / self.MULTIPART_MANIFEST
lock_path = upload_root / ".manifest.lock"
with lock_path.open("w") as lock_file:
with _file_lock(lock_file):
# Re-read manifest under lock to get latest state
try:
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
@@ -999,11 +958,9 @@ class ObjectStorage:
safe_key = self._sanitize_object_key(manifest["object_key"])
destination = bucket_path / safe_key
# Check if this is an overwrite
is_overwrite = destination.exists()
existing_size = destination.stat().st_size if is_overwrite else 0
# Check quota before writing
if enforce_quota:
size_delta = total_size - existing_size
object_delta = 0 if is_overwrite else 1
@@ -1065,7 +1022,7 @@ class ObjectStorage:
return ObjectMeta(
key=safe_key.as_posix(),
size=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime),
last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
etag=checksum.hexdigest(),
metadata=metadata,
)
@@ -1163,6 +1120,172 @@ class ObjectStorage:
def _legacy_multipart_dir(self, bucket_name: str, upload_id: str) -> Path:
return self._legacy_multipart_bucket_root(bucket_name) / upload_id
def _fast_list_keys(self, bucket_path: Path) -> List[str]:
"""Fast directory walk using os.scandir instead of pathlib.rglob.
This is significantly faster for large directories (10K+ files).
Returns just the keys (for backward compatibility).
"""
return list(self._build_object_cache(bucket_path).keys())
def _build_object_cache(self, bucket_path: Path) -> Dict[str, ObjectMeta]:
"""Build a complete object metadata cache for a bucket.
Uses os.scandir for fast directory walking and a persistent etag index.
"""
from concurrent.futures import ThreadPoolExecutor
bucket_id = bucket_path.name
objects: Dict[str, ObjectMeta] = {}
bucket_str = str(bucket_path)
bucket_len = len(bucket_str) + 1
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
meta_cache: Dict[str, str] = {}
index_mtime: float = 0
if etag_index_path.exists():
try:
index_mtime = etag_index_path.stat().st_mtime
with open(etag_index_path, 'r', encoding='utf-8') as f:
meta_cache = json.load(f)
except (OSError, json.JSONDecodeError):
meta_cache = {}
meta_root = self._bucket_meta_root(bucket_id)
needs_rebuild = False
if meta_root.exists() and index_mtime > 0:
def check_newer(dir_path: str) -> bool:
try:
with os.scandir(dir_path) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
if check_newer(entry.path):
return True
elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
if entry.stat().st_mtime > index_mtime:
return True
except OSError:
pass
return False
needs_rebuild = check_newer(str(meta_root))
elif not meta_cache:
needs_rebuild = True
if needs_rebuild and meta_root.exists():
meta_str = str(meta_root)
meta_len = len(meta_str) + 1
meta_files: list[tuple[str, str]] = []
def collect_meta_files(dir_path: str) -> None:
try:
with os.scandir(dir_path) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
collect_meta_files(entry.path)
elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
rel = entry.path[meta_len:]
key = rel[:-10].replace(os.sep, '/')
meta_files.append((key, entry.path))
except OSError:
pass
collect_meta_files(meta_str)
def read_meta_file(item: tuple[str, str]) -> tuple[str, str | None]:
key, path = item
try:
with open(path, 'rb') as f:
content = f.read()
etag_marker = b'"__etag__"'
idx = content.find(etag_marker)
if idx != -1:
start = content.find(b'"', idx + len(etag_marker) + 1)
if start != -1:
end = content.find(b'"', start + 1)
if end != -1:
return key, content[start+1:end].decode('utf-8')
return key, None
except (OSError, UnicodeDecodeError):
return key, None
if meta_files:
meta_cache = {}
with ThreadPoolExecutor(max_workers=min(64, len(meta_files))) as executor:
for key, etag in executor.map(read_meta_file, meta_files):
if etag:
meta_cache[key] = etag
try:
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
with open(etag_index_path, 'w', encoding='utf-8') as f:
json.dump(meta_cache, f)
except OSError:
pass
def scan_dir(dir_path: str) -> None:
try:
with os.scandir(dir_path) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
rel_start = entry.path[bucket_len:].split(os.sep)[0] if len(entry.path) > bucket_len else entry.name
if rel_start in self.INTERNAL_FOLDERS:
continue
scan_dir(entry.path)
elif entry.is_file(follow_symlinks=False):
rel = entry.path[bucket_len:]
first_part = rel.split(os.sep)[0] if os.sep in rel else rel
if first_part in self.INTERNAL_FOLDERS:
continue
key = rel.replace(os.sep, '/')
try:
stat = entry.stat()
etag = meta_cache.get(key)
if not etag:
etag = f'"{stat.st_size}-{int(stat.st_mtime)}"'
objects[key] = ObjectMeta(
key=key,
size=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
etag=etag,
metadata=None,
)
except OSError:
pass
except OSError:
pass
scan_dir(bucket_str)
return objects
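For reference, the persistent index read and written above is a flat JSON map from object key to the quoted ETag pulled from each object's .meta.json; objects with no stored ETag get a synthesized "size-mtime" tag at scan time instead (that fallback is not persisted). Illustrative contents, with all values made up:

# etag_index.json (illustrative):
# {
#   "photos/cat.jpg": "\"9e107d9d372bb6826bd81d3542a419d6\"",
#   "logs/app.log": "\"4096-1766467200\""
# }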
def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]:
"""Get cached object metadata for a bucket, refreshing if stale."""
now = time.time()
cached = self._object_cache.get(bucket_id)
if cached:
objects, timestamp = cached
if now - timestamp < self.KEY_INDEX_CACHE_TTL:
return objects
objects = self._build_object_cache(bucket_path)
self._object_cache[bucket_id] = (objects, now)
return objects
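_get_object_cache is a plain per-bucket TTL cache. A standalone sketch of the same pattern (illustrative names, not the project's API):

import time
from typing import Callable, Dict, Generic, Tuple, TypeVar

T = TypeVar("T")

class TTLCache(Generic[T]):
    """Sketch of the get-or-rebuild-with-TTL pattern used above."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._entries: Dict[str, Tuple[T, float]] = {}

    def get_or_build(self, key: str, build: Callable[[], T]) -> T:
        now = time.time()
        hit = self._entries.get(key)
        if hit is not None and now - hit[1] < self._ttl:
            return hit[0]  # entry is still fresh
        value = build()
        self._entries[key] = (value, now)
        return value

    def invalidate(self, key: str) -> None:
        self._entries.pop(key, None)

Here cache.get_or_build(bucket_id, lambda: self._build_object_cache(bucket_path)) would mirror _get_object_cache, and cache.invalidate(bucket_id) mirrors _invalidate_object_cache minus the on-disk index removal.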
def _invalidate_object_cache(self, bucket_id: str) -> None:
"""Invalidate the object cache and etag index for a bucket."""
self._object_cache.pop(bucket_id, None)
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
try:
etag_index_path.unlink(missing_ok=True)
except OSError:
pass
def _ensure_system_roots(self) -> None:
for path in (
self._system_root_path(),

app/ui.py

@@ -189,7 +189,7 @@ def inject_nav_state() -> dict[str, Any]:
return {
"principal": principal,
"can_manage_iam": can_manage,
"can_view_metrics": can_manage, # Only admins can view metrics
"can_view_metrics": can_manage,
"csrf_token": generate_csrf,
}
@@ -294,7 +294,6 @@ def bucket_detail(bucket_name: str):
storage = _storage()
try:
_authorize_ui(principal, bucket_name, "list")
# Don't load objects here - UI fetches them asynchronously via /buckets/<name>/objects
if not storage.bucket_exists(bucket_name):
raise StorageError("Bucket does not exist")
except (StorageError, IamError) as exc:
@@ -343,7 +342,6 @@ def bucket_detail(bucket_name: str):
except IamError:
can_manage_versioning = False
# Check replication permission
can_manage_replication = False
if principal:
try:
@@ -352,7 +350,6 @@ def bucket_detail(bucket_name: str):
except IamError:
can_manage_replication = False
# Check if user is admin (can configure replication settings, not just toggle)
is_replication_admin = False
if principal:
try:
@@ -361,12 +358,9 @@ def bucket_detail(bucket_name: str):
except IamError:
is_replication_admin = False
# Replication info - don't compute sync status here (it's slow), let JS fetch it async
replication_rule = _replication().get_rule(bucket_name)
# Load connections for admin, or for non-admin if there's an existing rule (to show target name)
connections = _connections().list() if (is_replication_admin or replication_rule) else []
# Encryption settings
encryption_config = storage.get_bucket_encryption(bucket_name)
kms_manager = _kms()
kms_keys = kms_manager.list_keys() if kms_manager else []
@@ -374,7 +368,6 @@ def bucket_detail(bucket_name: str):
encryption_enabled = current_app.config.get("ENCRYPTION_ENABLED", False)
can_manage_encryption = can_manage_versioning # Same as other bucket properties
# Quota settings (admin only)
bucket_quota = storage.get_bucket_quota(bucket_name)
bucket_stats = storage.bucket_stats(bucket_name)
can_manage_quota = False
@@ -384,7 +377,6 @@ def bucket_detail(bucket_name: str):
except IamError:
pass
# Pass the objects API endpoint URL for async loading
objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
return render_template(
@@ -423,7 +415,7 @@ def list_bucket_objects(bucket_name: str):
except IamError as exc:
return jsonify({"error": str(exc)}), 403
max_keys = min(int(request.args.get("max_keys", 100)), 1000)
max_keys = min(int(request.args.get("max_keys", 1000)), 10000)
continuation_token = request.args.get("continuation_token") or None
prefix = request.args.get("prefix") or None
@@ -738,41 +730,30 @@ def bulk_download_objects(bucket_name: str):
unique_keys = list(dict.fromkeys(cleaned))
storage = _storage()
# Check permissions for all keys first (or at least bucket read)
# We'll check bucket read once, then object read for each if needed?
# _authorize_ui checks bucket level if object_key is None, but we need to check each object if fine-grained policies exist.
# For simplicity/performance, we check bucket list/read.
# Verify permission to read bucket contents
try:
_authorize_ui(principal, bucket_name, "read")
except IamError as exc:
return jsonify({"error": str(exc)}), 403
# Create ZIP
# Create ZIP archive of selected objects
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for key in unique_keys:
try:
# Verify individual object permission if needed?
# _authorize_ui(principal, bucket_name, "read", object_key=key)
# This might be slow for many objects. Assuming bucket read is enough for now or we accept the overhead.
# Let's skip individual check for bulk speed, assuming bucket read implies object read unless denied.
# But strictly we should check. Let's check.
_authorize_ui(principal, bucket_name, "read", object_key=key)
# Check if object is encrypted
metadata = storage.get_object_metadata(bucket_name, key)
is_encrypted = "x-amz-server-side-encryption" in metadata
if is_encrypted and hasattr(storage, 'get_object_data'):
# Decrypt and add to zip
data, _ = storage.get_object_data(bucket_name, key)
zf.writestr(key, data)
else:
# Add unencrypted file directly
path = storage.get_object_path(bucket_name, key)
zf.write(path, arcname=key)
except (StorageError, IamError):
# Skip files we can't read or don't exist
# Skip objects that can't be accessed
continue
buffer.seek(0)
@@ -1077,7 +1058,6 @@ def update_bucket_encryption(bucket_name: str):
action = request.form.get("action", "enable")
if action == "disable":
# Disable encryption
try:
_storage().set_bucket_encryption(bucket_name, None)
flash("Default encryption disabled", "info")
@@ -1085,16 +1065,14 @@ def update_bucket_encryption(bucket_name: str):
flash(_friendly_error_message(exc), "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
# Enable or update encryption
algorithm = request.form.get("algorithm", "AES256")
kms_key_id = request.form.get("kms_key_id", "").strip() or None
# Validate algorithm
if algorithm not in ("AES256", "aws:kms"):
flash("Invalid encryption algorithm", "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
# Build encryption config following AWS format
# Build encryption configuration in AWS S3 format
encryption_config: dict[str, Any] = {
"Rules": [
{
@@ -1270,7 +1248,6 @@ def delete_iam_user(access_key: str):
return redirect(url_for("ui.iam_dashboard"))
if access_key == principal.access_key:
# Self-deletion
try:
_iam().delete_user(access_key)
session.pop("credentials", None)
@@ -1352,6 +1329,9 @@ def create_connection():
@ui_bp.post("/connections/test")
def test_connection():
from botocore.config import Config as BotoConfig
from botocore.exceptions import ConnectTimeoutError, EndpointConnectionError, ReadTimeoutError
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:list_users")
@@ -1368,18 +1348,32 @@ def test_connection():
return jsonify({"status": "error", "message": "Missing credentials"}), 400
try:
config = BotoConfig(
connect_timeout=5,
read_timeout=10,
retries={'max_attempts': 1}
)
s3 = boto3.client(
"s3",
endpoint_url=endpoint,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region,
config=config,
)
# Try to list buckets to verify credentials and endpoint
s3.list_buckets()
return jsonify({"status": "ok", "message": "Connection successful"})
except (ConnectTimeoutError, ReadTimeoutError):
return jsonify({"status": "error", "message": f"Connection timed out - endpoint may be down or unreachable: {endpoint}"}), 400
except EndpointConnectionError:
return jsonify({"status": "error", "message": f"Could not connect to endpoint: {endpoint}"}), 400
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
error_msg = e.response.get('Error', {}).get('Message', str(e))
return jsonify({"status": "error", "message": f"Connection failed ({error_code}): {error_msg}"}), 400
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 400
return jsonify({"status": "error", "message": f"Connection failed: {str(e)}"}), 400
@ui_bp.post("/connections/<connection_id>/update")
@@ -1440,7 +1434,6 @@ def update_bucket_replication(bucket_name: str):
flash(str(exc), "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication"))
# Check if user is admin (required for create/delete operations)
is_admin = False
try:
_iam().authorize(principal, None, "iam:list_users")
@@ -1451,14 +1444,12 @@ def update_bucket_replication(bucket_name: str):
action = request.form.get("action")
if action == "delete":
# Admin only - remove configuration entirely
if not is_admin:
flash("Only administrators can remove replication configuration", "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication"))
_replication().delete_rule(bucket_name)
flash("Replication configuration removed", "info")
elif action == "pause":
# Users can pause - just set enabled=False
rule = _replication().get_rule(bucket_name)
if rule:
rule.enabled = False
@@ -1467,7 +1458,6 @@ def update_bucket_replication(bucket_name: str):
else:
flash("No replication configuration to pause", "warning")
elif action == "resume":
# Users can resume - just set enabled=True
rule = _replication().get_rule(bucket_name)
if rule:
rule.enabled = True
@@ -1476,7 +1466,6 @@ def update_bucket_replication(bucket_name: str):
else:
flash("No replication configuration to resume", "warning")
elif action == "create":
# Admin only - create new configuration
if not is_admin:
flash("Only administrators can configure replication settings", "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication"))
@@ -1501,7 +1490,6 @@ def update_bucket_replication(bucket_name: str):
)
_replication().set_rule(rule)
# If mode is "all", trigger replication of existing objects
if replication_mode == REPLICATION_MODE_ALL:
_replication().replicate_existing_objects(bucket_name)
flash("Replication configured. Existing objects are being replicated in the background.", "success")
@@ -1526,10 +1514,31 @@ def get_replication_status(bucket_name: str):
if not rule:
return jsonify({"error": "No replication rule"}), 404
# This is the slow operation - compute sync status by comparing buckets
stats = _replication().get_sync_status(bucket_name)
connection = _connections().get(rule.target_connection_id)
endpoint_healthy = False
endpoint_error = None
if connection:
endpoint_healthy = _replication().check_endpoint_health(connection)
if not endpoint_healthy:
endpoint_error = f"Cannot reach endpoint: {connection.endpoint_url}"
else:
endpoint_error = "Target connection not found"
stats = None
if endpoint_healthy:
stats = _replication().get_sync_status(bucket_name)
if not stats:
return jsonify({"error": "Failed to compute status"}), 500
return jsonify({
"objects_synced": 0,
"objects_pending": 0,
"objects_orphaned": 0,
"bytes_synced": 0,
"last_sync_at": rule.stats.last_sync_at if rule.stats else None,
"last_sync_key": rule.stats.last_sync_key if rule.stats else None,
"endpoint_healthy": endpoint_healthy,
"endpoint_error": endpoint_error,
})
return jsonify({
"objects_synced": stats.objects_synced,
@@ -1538,6 +1547,28 @@ def get_replication_status(bucket_name: str):
"bytes_synced": stats.bytes_synced,
"last_sync_at": stats.last_sync_at,
"last_sync_key": stats.last_sync_key,
"endpoint_healthy": endpoint_healthy,
"endpoint_error": endpoint_error,
})
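For reference, when the target endpoint is reachable the JSON body returned above looks roughly like this (every value invented):

# {
#   "objects_synced": 1204,
#   "objects_pending": 3,
#   "objects_orphaned": 0,
#   "bytes_synced": 73400320,
#   "last_sync_at": "2025-12-23T05:12:44+00:00",
#   "last_sync_key": "photos/cat.jpg",
#   "endpoint_healthy": true,
#   "endpoint_error": null
# }

When the endpoint is down, the counters come back as zeros with endpoint_healthy false and a populated endpoint_error.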
@ui_bp.get("/connections/<connection_id>/health")
def check_connection_health(connection_id: str):
"""Check if a connection endpoint is reachable."""
principal = _current_principal()
try:
_iam().authorize(principal, None, "iam:list_users")
except IamError:
return jsonify({"error": "Access denied"}), 403
conn = _connections().get(connection_id)
if not conn:
return jsonify({"healthy": False, "error": "Connection not found"}), 404
healthy = _replication().check_endpoint_health(conn)
return jsonify({
"healthy": healthy,
"error": None if healthy else f"Cannot reach endpoint: {conn.endpoint_url}"
})
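A quick manual probe of this route (sketch only: host and port, the connection id, and an already-authenticated admin session are all assumptions):

import requests

session = requests.Session()  # assumed to already carry an admin session cookie
resp = session.get(
    "http://127.0.0.1:5000/ui/connections/conn-123/health",  # placeholder host/id
    timeout=20,
)
print(resp.status_code, resp.json())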
@@ -1558,7 +1589,6 @@ def connections_dashboard():
def metrics_dashboard():
principal = _current_principal()
# Metrics are restricted to admin users
try:
_iam().authorize(principal, None, "iam:list_users")
except IamError:
@@ -1582,16 +1612,13 @@ def metrics_dashboard():
total_bytes_used = 0
total_versions = 0
# Note: Uses cached stats from storage layer to improve performance
cache_ttl = current_app.config.get("BUCKET_STATS_CACHE_TTL", 60)
for bucket in buckets:
stats = storage.bucket_stats(bucket.name, cache_ttl=cache_ttl)
# Use totals which include archived versions
total_objects += stats.get("total_objects", stats.get("objects", 0))
total_bytes_used += stats.get("total_bytes", stats.get("bytes", 0))
total_versions += stats.get("version_count", 0)
# Calculate system uptime
boot_time = psutil.boot_time()
uptime_seconds = time.time() - boot_time
uptime_days = int(uptime_seconds / 86400)


@@ -66,8 +66,28 @@ html {
color: var(--myfsio-muted) !important;
}
.table-responsive { border-radius: 0.5rem; overflow: hidden; }
.table-responsive {
border-radius: 0.5rem;
overflow-x: auto;
-webkit-overflow-scrolling: touch;
}
.message-stack { position: sticky; top: 1rem; z-index: 100; }
/* Mobile-friendly table improvements */
.table-responsive table {
min-width: 600px;
}
.table-responsive table th,
.table-responsive table td {
white-space: nowrap;
}
/* Allow text wrapping for description columns */
.table-responsive table td.text-wrap {
white-space: normal;
min-width: 200px;
}
code { font-size: 0.85rem; }
code {
@@ -389,8 +409,22 @@ code {
.bucket-table th:last-child { white-space: nowrap; }
.object-key {
word-break: break-word;
max-width: 32rem;
max-width: 0;
width: 100%;
overflow: hidden;
text-overflow: ellipsis;
}
.object-key .fw-medium {
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.object-key .text-muted {
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.preview-card { top: 1rem; }
@@ -1553,6 +1587,41 @@ pre code {
position: relative !important;
top: 0 !important;
}
/* Ensure tables are scrollable on mobile */
.card-body .table-responsive {
margin: -1rem;
padding: 0;
width: calc(100% + 2rem);
}
.card-body .table-responsive table {
margin-bottom: 0;
}
/* IAM users table mobile adjustments */
.table th,
.table td {
padding: 0.5rem 0.75rem;
}
/* Better touch scrolling indicator */
.table-responsive::after {
content: '';
position: absolute;
top: 0;
right: 0;
bottom: 0;
width: 20px;
background: linear-gradient(to left, var(--myfsio-card-bg), transparent);
pointer-events: none;
opacity: 0;
transition: opacity 0.3s;
}
.table-responsive:not(:hover)::after {
opacity: 0.8;
}
}
*:focus-visible {


@@ -134,15 +134,15 @@
data-bulk-delete-endpoint="{{ url_for('ui.bulk_delete_objects', bucket_name=bucket_name) }}"
data-bulk-download-endpoint="{{ url_for('ui.bulk_download_objects', bucket_name=bucket_name) }}"
>
<table class="table table-hover align-middle mb-0" id="objects-table">
<table class="table table-hover align-middle mb-0" id="objects-table" style="table-layout: fixed;">
<thead class="table-light">
<tr>
<th scope="col" class="text-center" style="width: 3rem;">
<input class="form-check-input" type="checkbox" data-select-all aria-label="Select all objects" />
</th>
<th scope="col">Key</th>
<th scope="col" class="text-end">Size</th>
<th scope="col" class="text-end">Actions</th>
<th scope="col" class="text-end" style="width: 6rem;">Size</th>
<th scope="col" class="text-end" style="width: 5.5rem;">Actions</th>
</tr>
</thead>
<tbody>
@@ -172,15 +172,15 @@
<button id="load-more-btn" class="btn btn-link btn-sm p-0 d-none" style="font-size: 0.75rem;">Load more</button>
</div>
<div class="d-flex align-items-center gap-1">
<span class="text-muted">Load</span>
<span class="text-muted">Batch</span>
<select id="page-size-select" class="form-select form-select-sm py-0" style="width: auto; font-size: 0.75rem;">
<option value="50">50</option>
<option value="100" selected>100</option>
<option value="150">150</option>
<option value="200">200</option>
<option value="250">250</option>
<option value="1000">1K</option>
<option value="5000" selected>5K</option>
<option value="10000">10K</option>
<option value="25000">25K</option>
<option value="50000">50K</option>
</select>
<span class="text-muted">per page</span>
<span class="text-muted">objects</span>
</div>
</div>
</div>
@@ -954,7 +954,7 @@
</div>
<div class="card-body">
{% if replication_rule and replication_rule.enabled %}
<div class="alert alert-success d-flex align-items-center mb-4" role="alert">
<div id="replication-status-alert" class="alert alert-success d-flex align-items-center mb-4" role="alert">
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="currentColor" class="bi bi-check-circle-fill flex-shrink-0 me-2" viewBox="0 0 16 16">
<path d="M16 8A8 8 0 1 1 0 8a8 8 0 0 1 16 0zm-3.97-3.03a.75.75 0 0 0-1.08.022L7.477 9.417 5.384 7.323a.75.75 0 0 0-1.06 1.06L6.97 11.03a.75.75 0 0 0 1.079-.02l3.992-4.99a.75.75 0 0 0-.01-1.05z"/>
</svg>
@@ -968,6 +968,19 @@
</div>
</div>
<!-- Warning alert for unreachable endpoint (shown by JS if endpoint is down) -->
<div id="replication-endpoint-warning" class="alert alert-danger d-none mb-4" role="alert">
<div class="d-flex align-items-start">
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="currentColor" class="flex-shrink-0 me-2" viewBox="0 0 16 16">
<path d="M8.982 1.566a1.13 1.13 0 0 0-1.96 0L.165 13.233c-.457.778.091 1.767.98 1.767h13.713c.889 0 1.438-.99.98-1.767L8.982 1.566zM8 5c.535 0 .954.462.9.995l-.35 3.507a.552.552 0 0 1-1.1 0L7.1 5.995A.905.905 0 0 1 8 5zm.002 6a1 1 0 1 1 0 2 1 1 0 0 1 0-2z"/>
</svg>
<div>
<strong>Replication Endpoint Unreachable</strong>
<p class="mb-0 small" id="replication-endpoint-error">The target endpoint is not responding. Replication is paused until the endpoint is available.</p>
</div>
</div>
</div>
<div class="row g-3 mb-4" id="replication-stats-cards" data-status-endpoint="{{ url_for('ui.get_replication_status', bucket_name=bucket_name) }}">
<div class="col-6 col-lg">
<div class="card bg-body-tertiary border-0 h-100">
@@ -1090,11 +1103,11 @@
{% endif %}
</div>
<div class="col-auto">
<span class="badge bg-success-subtle text-success px-3 py-2">
<span id="replication-status-badge" class="badge bg-success-subtle text-success px-3 py-2">
<svg xmlns="http://www.w3.org/2000/svg" width="12" height="12" fill="currentColor" class="me-1" viewBox="0 0 16 16">
<path d="M16 8A8 8 0 1 1 0 8a8 8 0 0 1 16 0zm-3.97-3.03a.75.75 0 0 0-1.08.022L7.477 9.417 5.384 7.323a.75.75 0 0 0-1.06 1.06L6.97 11.03a.75.75 0 0 0 1.079-.02l3.992-4.99a.75.75 0 0 0-.01-1.05z"/>
</svg>
Enabled
<span id="replication-status-text">Enabled</span>
</span>
</div>
</div>
@@ -1109,7 +1122,7 @@
</svg>
Refresh
</a>
<form method="POST" action="{{ url_for('ui.update_bucket_replication', bucket_name=bucket_name) }}" class="d-inline">
<form id="pause-replication-form" method="POST" action="{{ url_for('ui.update_bucket_replication', bucket_name=bucket_name) }}" class="d-inline">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
<input type="hidden" name="action" value="pause">
<button type="submit" class="btn btn-outline-warning">
@@ -1866,30 +1879,42 @@
let isLoadingObjects = false;
let hasMoreObjects = false;
let currentFilterTerm = '';
let pageSize = 100;
let pageSize = 5000; // Load large batches for virtual scrolling
let currentPrefix = ''; // Current folder prefix for navigation
let allObjects = []; // All loaded object metadata (lightweight)
const createObjectRow = (obj) => {
// Virtual scrolling state
const ROW_HEIGHT = 53; // Height of each table row in pixels
const BUFFER_ROWS = 10; // Extra rows to render above/below viewport
let visibleItems = []; // Current items to display (filtered by folder/search)
let renderedRange = { start: 0, end: 0 }; // Currently rendered row indices
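// Worked example of the window math: in a 600px-tall container,
// ceil(600 / 53) = 12 rows are on screen; with BUFFER_ROWS of 10 above and
// below, roughly 12 + 2 * 10 = 32 <tr> elements exist in the DOM at once,
// whether the bucket holds 100 keys or 100,000.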
// Create a row element from object data (for virtual scrolling)
const createObjectRow = (obj, displayKey = null) => {
const tr = document.createElement('tr');
tr.dataset.objectRow = '';
tr.dataset.key = obj.key;
tr.dataset.size = obj.size;
tr.dataset.lastModified = obj.last_modified;
tr.dataset.lastModified = obj.lastModified || obj.last_modified;
tr.dataset.etag = obj.etag;
tr.dataset.previewUrl = obj.preview_url;
tr.dataset.downloadUrl = obj.download_url;
tr.dataset.presignEndpoint = obj.presign_endpoint;
tr.dataset.deleteEndpoint = obj.delete_endpoint;
tr.dataset.metadata = JSON.stringify(obj.metadata || {});
tr.dataset.versionsEndpoint = obj.versions_endpoint;
tr.dataset.restoreTemplate = obj.restore_template;
tr.dataset.previewUrl = obj.previewUrl || obj.preview_url;
tr.dataset.downloadUrl = obj.downloadUrl || obj.download_url;
tr.dataset.presignEndpoint = obj.presignEndpoint || obj.presign_endpoint;
tr.dataset.deleteEndpoint = obj.deleteEndpoint || obj.delete_endpoint;
tr.dataset.metadata = typeof obj.metadata === 'string' ? obj.metadata : JSON.stringify(obj.metadata || {});
tr.dataset.versionsEndpoint = obj.versionsEndpoint || obj.versions_endpoint;
tr.dataset.restoreTemplate = obj.restoreTemplate || obj.restore_template;
const keyToShow = displayKey || obj.key;
const lastModDisplay = obj.lastModifiedDisplay || obj.last_modified_display || new Date(obj.lastModified || obj.last_modified).toLocaleDateString();
tr.innerHTML = `
<td class="text-center align-middle">
<input class="form-check-input" type="checkbox" data-object-select aria-label="Select ${escapeHtml(obj.key)}" />
</td>
<td class="object-key text-break" title="${escapeHtml(obj.key)}">
<div class="fw-medium">${escapeHtml(obj.key)}</div>
<div class="text-muted small">Modified ${escapeHtml(obj.last_modified_display)}</div>
<div class="fw-medium">${escapeHtml(keyToShow)}</div>
<div class="text-muted small">Modified ${escapeHtml(lastModDisplay)}</div>
</td>
<td class="text-end text-nowrap">
<span class="text-muted small">${formatBytes(obj.size)}</span>
@@ -1898,7 +1923,7 @@
<div class="btn-group btn-group-sm" role="group">
<a
class="btn btn-outline-primary btn-icon"
href="${escapeHtml(obj.download_url)}"
href="${escapeHtml(obj.downloadUrl || obj.download_url)}"
target="_blank"
title="Download"
aria-label="Download"
@@ -1987,12 +2012,180 @@
}
};
// ============== VIRTUAL SCROLLING SYSTEM ==============
// Spacer elements for virtual scroll height
let topSpacer = null;
let bottomSpacer = null;
const initVirtualScrollElements = () => {
if (!objectsTableBody) return;
// Create spacer rows if they don't exist
if (!topSpacer) {
topSpacer = document.createElement('tr');
topSpacer.id = 'virtual-top-spacer';
topSpacer.innerHTML = '<td colspan="4" style="padding: 0; border: none;"></td>';
}
if (!bottomSpacer) {
bottomSpacer = document.createElement('tr');
bottomSpacer.id = 'virtual-bottom-spacer';
bottomSpacer.innerHTML = '<td colspan="4" style="padding: 0; border: none;"></td>';
}
};
// Compute which items should be visible based on current view
const computeVisibleItems = () => {
const items = [];
const folders = new Set();
allObjects.forEach(obj => {
if (!obj.key.startsWith(currentPrefix)) return;
const remainder = obj.key.slice(currentPrefix.length);
const slashIndex = remainder.indexOf('/');
if (slashIndex === -1) {
// File in current folder - filter on the displayed filename (remainder)
if (!currentFilterTerm || remainder.toLowerCase().includes(currentFilterTerm)) {
items.push({ type: 'file', data: obj, displayKey: remainder });
}
} else {
// Folder
const folderName = remainder.slice(0, slashIndex);
const folderPath = currentPrefix + folderName + '/';
if (!folders.has(folderPath)) {
folders.add(folderPath);
// Filter on the displayed folder name only
if (!currentFilterTerm || folderName.toLowerCase().includes(currentFilterTerm)) {
items.push({ type: 'folder', path: folderPath, displayKey: folderName });
}
}
}
});
// Sort: folders first, then files
items.sort((a, b) => {
if (a.type === 'folder' && b.type === 'file') return -1;
if (a.type === 'file' && b.type === 'folder') return 1;
const aKey = a.type === 'folder' ? a.path : a.data.key;
const bKey = b.type === 'folder' ? b.path : b.data.key;
return aKey.localeCompare(bKey);
});
return items;
};
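// Example: with loaded keys ["a/b.txt", "a/c/d.txt", "e.txt"] and
// currentPrefix "a/", visibleItems becomes the folder "c/" (derived from
// "a/c/d.txt") followed by the file "b.txt"; "e.txt" falls outside the
// prefix, and folders always sort ahead of files.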
// Render only the visible rows based on scroll position
const renderVirtualRows = () => {
if (!objectsTableBody || !scrollContainer) return;
const containerHeight = scrollContainer.clientHeight;
const scrollTop = scrollContainer.scrollTop;
// Calculate visible range
const startIndex = Math.max(0, Math.floor(scrollTop / ROW_HEIGHT) - BUFFER_ROWS);
const endIndex = Math.min(visibleItems.length, Math.ceil((scrollTop + containerHeight) / ROW_HEIGHT) + BUFFER_ROWS);
// Skip if range hasn't changed significantly
if (startIndex === renderedRange.start && endIndex === renderedRange.end) return;
renderedRange = { start: startIndex, end: endIndex };
// Clear and rebuild
objectsTableBody.innerHTML = '';
// Add top spacer
initVirtualScrollElements();
topSpacer.querySelector('td').style.height = `${startIndex * ROW_HEIGHT}px`;
objectsTableBody.appendChild(topSpacer);
// Render visible rows
for (let i = startIndex; i < endIndex; i++) {
const item = visibleItems[i];
if (!item) continue;
let row;
if (item.type === 'folder') {
row = createFolderRow(item.path, item.displayKey);
} else {
row = createObjectRow(item.data, item.displayKey);
}
row.dataset.virtualIndex = i;
objectsTableBody.appendChild(row);
}
// Add bottom spacer
const remainingRows = visibleItems.length - endIndex;
bottomSpacer.querySelector('td').style.height = `${remainingRows * ROW_HEIGHT}px`;
objectsTableBody.appendChild(bottomSpacer);
// Re-attach handlers to new rows
attachRowHandlers();
};
// Debounced scroll handler for virtual scrolling
let scrollTimeout = null;
const handleVirtualScroll = () => {
if (scrollTimeout) cancelAnimationFrame(scrollTimeout);
scrollTimeout = requestAnimationFrame(renderVirtualRows);
};
// Refresh the virtual list (after data changes or navigation)
const refreshVirtualList = () => {
visibleItems = computeVisibleItems();
renderedRange = { start: -1, end: -1 }; // Force re-render
if (visibleItems.length === 0) {
if (allObjects.length === 0 && !hasMoreObjects) {
showEmptyState();
} else {
// Empty folder
objectsTableBody.innerHTML = `
<tr>
<td colspan="4" class="py-5">
<div class="empty-state">
<div class="empty-state-icon mx-auto" style="width: 64px; height: 64px;">
<svg xmlns="http://www.w3.org/2000/svg" width="28" height="28" fill="currentColor" viewBox="0 0 16 16">
<path d="M9.828 3h3.982a2 2 0 0 1 1.992 2.181l-.637 7A2 2 0 0 1 13.174 14H2.825a2 2 0 0 1-1.991-1.819l-.637-7a1.99 1.99 0 0 1 .342-1.31L.5 3a2 2 0 0 1 2-2h3.672a2 2 0 0 1 1.414.586l.828.828A2 2 0 0 0 9.828 3zm-8.322.12C1.72 3.042 1.95 3 2.19 3h5.396l-.707-.707A1 1 0 0 0 6.172 2H2.5a1 1 0 0 0-1 .981l.006.139z"/>
</svg>
</div>
<h6 class="mb-2">Empty folder</h6>
<p class="text-muted small mb-0">This folder contains no objects${hasMoreObjects ? ' yet. Loading more...' : '.'}</p>
</div>
</td>
</tr>
`;
}
} else {
renderVirtualRows();
}
updateFolderViewStatus();
};
// Update status bar
const updateFolderViewStatus = () => {
const folderViewStatusEl = document.getElementById('folder-view-status');
if (!folderViewStatusEl) return;
if (currentPrefix) {
const folderCount = visibleItems.filter(i => i.type === 'folder').length;
const fileCount = visibleItems.filter(i => i.type === 'file').length;
folderViewStatusEl.innerHTML = `<span class="text-muted">${folderCount} folder${folderCount !== 1 ? 's' : ''}, ${fileCount} file${fileCount !== 1 ? 's' : ''} in this view</span>`;
folderViewStatusEl.classList.remove('d-none');
} else {
folderViewStatusEl.classList.add('d-none');
}
};
// ============== DATA LOADING ==============
const loadObjects = async (append = false) => {
if (isLoadingObjects) return;
isLoadingObjects = true;
if (!append) {
if (objectsLoadingRow) objectsLoadingRow.style.display = '';
nextContinuationToken = null;
loadedObjectCount = 0;
@@ -2026,35 +2219,18 @@
totalObjectCount = data.total_count || 0;
nextContinuationToken = data.next_continuation_token;
if (!append) {
if (objectsLoadingRow) objectsLoadingRow.remove();
if (data.objects.length === 0) {
showEmptyState();
updateObjectCountBadge();
isLoadingObjects = false;
return;
}
objectsTableBody.innerHTML = '';
if (!append && objectsLoadingRow) {
objectsLoadingRow.remove();
}
// Store lightweight object metadata (no DOM elements!)
data.objects.forEach(obj => {
const row = createObjectRow(obj);
objectsTableBody.appendChild(row);
loadedObjectCount++;
// Apply current filter to newly loaded objects
if (currentFilterTerm) {
const keyLower = obj.key.toLowerCase();
row.style.display = keyLower.includes(currentFilterTerm) ? '' : 'none';
}
allObjects.push({
key: obj.key,
size: obj.size,
lastModified: obj.last_modified,
lastModifiedDisplay: obj.last_modified_display,
etag: obj.etag,
previewUrl: obj.preview_url,
downloadUrl: obj.download_url,
@@ -2062,86 +2238,28 @@
deleteEndpoint: obj.delete_endpoint,
metadata: JSON.stringify(obj.metadata || {}),
versionsEndpoint: obj.versions_endpoint,
restoreTemplate: obj.restore_template,
element: row
restoreTemplate: obj.restore_template
});
});
updateObjectCountBadge();
// Track if there are more objects to load
hasMoreObjects = data.is_truncated;
if (loadMoreStatus) {
if (data.is_truncated) {
loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} of ${totalObjectCount.toLocaleString()} objects loaded`;
loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} of ${totalObjectCount.toLocaleString()} loaded`;
} else {
loadMoreStatus.textContent = `All ${loadedObjectCount.toLocaleString()} objects loaded`;
loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} objects`;
}
}
// Update Load More button visibility
if (typeof updateLoadMoreButton === 'function') {
updateLoadMoreButton();
}
// Track the count of items in current folder before re-rendering
let prevFolderItemCount = 0;
if (currentPrefix && append) {
const prevState = getFoldersAtPrefix(currentPrefix);
prevFolderItemCount = prevState.folders.length + prevState.files.length;
}
if (typeof initFolderNavigation === 'function') {
initFolderNavigation();
}
attachRowHandlers();
// If we're in a nested folder and loaded more objects, scroll to show newly loaded content
if (currentPrefix && append) {
const newState = getFoldersAtPrefix(currentPrefix);
const newFolderItemCount = newState.folders.length + newState.files.length;
const addedCount = newFolderItemCount - prevFolderItemCount;
if (addedCount > 0) {
// Show a brief notification about newly loaded items in the current folder
const folderViewStatusEl = document.getElementById('folder-view-status');
if (folderViewStatusEl) {
folderViewStatusEl.innerHTML = `<span class="text-success fw-medium">+${addedCount} new item${addedCount !== 1 ? 's' : ''} loaded in this folder</span>`;
folderViewStatusEl.classList.remove('d-none');
// Reset to normal status after 3 seconds
setTimeout(() => {
if (typeof updateFolderViewStatus === 'function') {
updateFolderViewStatus();
}
}, 3000);
}
// Scroll to show the first newly added item
const allRows = objectsTableBody.querySelectorAll('tr:not([style*="display: none"])');
if (allRows.length > prevFolderItemCount) {
const firstNewRow = allRows[prevFolderItemCount];
if (firstNewRow) {
firstNewRow.scrollIntoView({ behavior: 'smooth', block: 'center' });
// Briefly highlight the new rows
for (let i = prevFolderItemCount; i < allRows.length; i++) {
allRows[i].classList.add('table-info');
setTimeout(() => {
allRows[i].classList.remove('table-info');
}, 2000);
}
}
}
} else if (hasMoreObjects) {
// Objects were loaded but none were in the current folder - show a hint
const folderViewStatusEl = document.getElementById('folder-view-status');
if (folderViewStatusEl) {
folderViewStatusEl.innerHTML = `<span class="text-muted">Loaded more objects (not in this folder). <button type="button" class="btn btn-link btn-sm p-0" onclick="navigateToFolder('')">Go to root</button> to see all.</span>`;
folderViewStatusEl.classList.remove('d-none');
}
}
}
// Refresh virtual scroll view
refreshVirtualList();
renderBreadcrumb(currentPrefix);
} catch (error) {
console.error('Failed to load objects:', error);
@@ -2152,7 +2270,6 @@
}
} finally {
isLoadingObjects = false;
// Hide loading spinner
if (loadMoreSpinner) {
loadMoreSpinner.classList.add('d-none');
}
@@ -2160,16 +2277,15 @@
};
const attachRowHandlers = () => {
// Attach handlers to object rows
const objectRows = document.querySelectorAll('[data-object-row]');
objectRows.forEach(row => {
if (row.dataset.handlersAttached) return;
row.dataset.handlersAttached = 'true';
const deleteBtn = row.querySelector('[data-delete-object]');
deleteBtn?.addEventListener('click', (e) => {
e.stopPropagation();
const deleteModalEl = document.getElementById('deleteObjectModal');
const deleteModal = deleteModalEl ? bootstrap.Modal.getOrCreateInstance(deleteModalEl) : null;
const deleteObjectForm = document.getElementById('deleteObjectForm');
@@ -2186,17 +2302,63 @@
selectCheckbox?.addEventListener('change', () => {
toggleRowSelection(row, selectCheckbox.checked);
});
// Restore selection state
if (selectedRows.has(row.dataset.key)) {
selectCheckbox.checked = true;
row.classList.add('table-active');
}
});
// Attach handlers to folder rows
const folderRows = document.querySelectorAll('.folder-row');
folderRows.forEach(row => {
if (row.dataset.handlersAttached) return;
row.dataset.handlersAttached = 'true';
const folderPath = row.dataset.folderPath;
const checkbox = row.querySelector('[data-folder-select]');
checkbox?.addEventListener('change', (e) => {
e.stopPropagation();
// Select all objects in this folder
const folderObjects = allObjects.filter(obj => obj.key.startsWith(folderPath));
folderObjects.forEach(obj => {
if (checkbox.checked) {
selectedRows.set(obj.key, obj);
} else {
selectedRows.delete(obj.key);
}
});
updateBulkDeleteState();
});
const folderBtn = row.querySelector('button');
folderBtn?.addEventListener('click', (e) => {
e.stopPropagation();
navigateToFolder(folderPath);
});
row.addEventListener('click', (e) => {
if (e.target.closest('[data-folder-select]') || e.target.closest('button')) return;
navigateToFolder(folderPath);
});
});
updateBulkDeleteState();
};
// Infinite scroll: use IntersectionObserver to auto-load more objects
// Scroll container reference (needed for virtual scrolling)
const scrollSentinel = document.getElementById('scroll-sentinel');
const scrollContainer = document.querySelector('.objects-table-container');
const loadMoreBtn = document.getElementById('load-more-btn');
// Load More button click handler (fallback for mobile)
// Virtual scroll: listen to scroll events
if (scrollContainer) {
scrollContainer.addEventListener('scroll', handleVirtualScroll, { passive: true });
}
// Load More button click handler (fallback)
loadMoreBtn?.addEventListener('click', () => {
if (hasMoreObjects && !isLoadingObjects) {
loadObjects(true);
@@ -2210,8 +2372,8 @@
}
}
// Auto-load more when near bottom (for loading all data)
if (scrollSentinel && scrollContainer) {
// Observer for scrolling within the container (desktop)
const containerObserver = new IntersectionObserver((entries) => {
entries.forEach(entry => {
if (entry.isIntersecting && hasMoreObjects && !isLoadingObjects) {
@@ -2220,12 +2382,11 @@
});
}, {
root: scrollContainer,
rootMargin: '100px',
rootMargin: '500px', // Load more earlier for smoother experience
threshold: 0
});
containerObserver.observe(scrollSentinel);
// Observer for page scrolling (mobile - when container is not scrollable)
const viewportObserver = new IntersectionObserver((entries) => {
entries.forEach(entry => {
if (entry.isIntersecting && hasMoreObjects && !isLoadingObjects) {
@@ -2233,14 +2394,14 @@
}
});
}, {
root: null, // viewport
rootMargin: '200px',
root: null,
rootMargin: '500px',
threshold: 0
});
viewportObserver.observe(scrollSentinel);
}
// Page size selector
// Page size selector (now controls batch size)
const pageSizeSelect = document.getElementById('page-size-select');
pageSizeSelect?.addEventListener('change', (e) => {
pageSize = parseInt(e.target.value, 10);
@@ -2252,7 +2413,6 @@
const folderBreadcrumb = document.getElementById('folder-breadcrumb');
const objectsTableBody = document.querySelector('#objects-table tbody');
let currentPrefix = '';
if (objectsTableBody) {
objectsTableBody.addEventListener('click', (e) => {
@@ -2369,8 +2529,8 @@
return allObjects.filter(obj => obj.key.startsWith(folderPrefix));
};
const createFolderRow = (folderPath) => {
const folderName = folderPath.slice(currentPrefix.length).replace(/\/$/, '');
const createFolderRow = (folderPath, displayName = null) => {
const folderName = displayName || folderPath.slice(currentPrefix.length).replace(/\/$/, '');
const { count: objectCount, mayHaveMore } = countObjectsInFolder(folderPath);
const countDisplay = mayHaveMore ? `${objectCount}+` : objectCount;
@@ -2404,37 +2564,19 @@
</td>
`;
const checkbox = tr.querySelector('[data-folder-select]');
checkbox?.addEventListener('change', (e) => {
e.stopPropagation();
const folderObjects = getObjectsInFolder(folderPath);
folderObjects.forEach(obj => {
const objCheckbox = obj.element.querySelector('[data-object-select]');
if (objCheckbox) {
objCheckbox.checked = checkbox.checked;
}
toggleRowSelection(obj.element, checkbox.checked);
});
});
const folderBtn = tr.querySelector('button');
folderBtn?.addEventListener('click', (e) => {
e.stopPropagation();
navigateToFolder(folderPath);
});
tr.addEventListener('click', (e) => {
if (e.target.closest('[data-folder-select]') || e.target.closest('button')) return;
navigateToFolder(folderPath);
});
return tr;
};
// Instant client-side folder navigation (no server round-trip!)
const navigateToFolder = (prefix) => {
currentPrefix = prefix;
// Scroll to top when navigating
if (scrollContainer) scrollContainer.scrollTop = 0;
// Instant re-render from already-loaded data
refreshVirtualList();
renderBreadcrumb(prefix);
renderObjectsView();
selectedRows.clear();
@@ -2442,14 +2584,6 @@
updateBulkDeleteState();
}
if (typeof updateFolderViewStatus === 'function') {
updateFolderViewStatus();
}
if (typeof updateFilterWarning === 'function') {
updateFilterWarning();
}
if (previewPanel) previewPanel.classList.add('d-none');
if (previewEmpty) previewEmpty.classList.remove('d-none');
activeRow = null;
@@ -2651,12 +2785,10 @@
bulkDeleteConfirm.disabled = selectedCount === 0 || bulkDeleting;
}
if (selectAllCheckbox) {
const visibleRowsRaw = hasFolders()
? allObjects.filter(obj => obj.key.startsWith(currentPrefix) && !obj.key.slice(currentPrefix.length).includes('/')).map(obj => obj.element)
: Array.from(document.querySelectorAll('[data-object-row]'));
const total = visibleRowsRaw.filter(r => r.style.display !== 'none').length;
const visibleSelectedCount = visibleRowsRaw.filter(r => r.style.display !== 'none' && selectedRows.has(r.dataset.key)).length;
// With virtual scrolling, count files in current folder from visibleItems
const filesInView = visibleItems.filter(item => item.type === 'file');
const total = filesInView.length;
const visibleSelectedCount = filesInView.filter(item => selectedRows.has(item.data.key)).length;
selectAllCheckbox.disabled = total === 0;
selectAllCheckbox.checked = visibleSelectedCount > 0 && visibleSelectedCount === total && total > 0;
selectAllCheckbox.indeterminate = visibleSelectedCount > 0 && visibleSelectedCount < total;
@@ -3287,59 +3419,13 @@
}
};
const updateFolderViewStatus = () => {
if (!folderViewStatus || !loadMoreStatus) return;
if (currentPrefix) {
const { folders, files } = getFoldersAtPrefix(currentPrefix);
const visibleCount = folders.length + files.length;
const folderObjectCount = allObjects.filter(obj => obj.key.startsWith(currentPrefix)).length;
const folderMayHaveMore = hasMoreObjects && folderObjectCount > 0;
if (folderMayHaveMore) {
folderViewStatus.textContent = `Showing ${visibleCount} items in folder • more may be available`;
folderViewStatus.classList.remove('d-none');
loadMoreStatus.classList.add('d-none');
} else {
folderViewStatus.textContent = `${visibleCount} items in folder`;
folderViewStatus.classList.remove('d-none');
loadMoreStatus.classList.add('d-none');
}
} else {
folderViewStatus.classList.add('d-none');
loadMoreStatus.classList.remove('d-none');
}
};
document.getElementById('object-search')?.addEventListener('input', (event) => {
currentFilterTerm = event.target.value.toLowerCase();
updateFilterWarning();
if (hasFolders()) {
const { folders, files } = getFoldersAtPrefix(currentPrefix);
const tbody = objectsTableBody;
tbody.innerHTML = '';
folders.forEach(folderPath => {
const folderName = folderPath.slice(currentPrefix.length).replace(/\/$/, '').toLowerCase();
if (folderName.includes(currentFilterTerm)) {
tbody.appendChild(createFolderRow(folderPath));
}
});
files.forEach(obj => {
const keyName = obj.key.slice(currentPrefix.length).toLowerCase();
if (keyName.includes(currentFilterTerm)) {
tbody.appendChild(obj.element);
obj.element.style.display = '';
}
});
} else {
// Filter all loaded objects (including newly loaded ones)
document.querySelectorAll('[data-object-row]').forEach((row) => {
const key = row.dataset.key.toLowerCase();
row.style.display = key.includes(currentFilterTerm) ? '' : 'none';
});
}
// Use the virtual scrolling system for filtering - it properly handles
// both folder view and flat view, and works with large object counts
refreshVirtualList();
});
refreshVersionsButton?.addEventListener('click', () => {
@@ -3780,6 +3866,12 @@
const lastSyncEl = document.getElementById('replication-last-sync');
const lastSyncTimeEl = document.querySelector('[data-stat="last-sync-time"]');
const lastSyncKeyEl = document.querySelector('[data-stat="last-sync-key"]');
const endpointWarning = document.getElementById('replication-endpoint-warning');
const endpointErrorEl = document.getElementById('replication-endpoint-error');
const statusAlert = document.getElementById('replication-status-alert');
const statusBadge = document.getElementById('replication-status-badge');
const statusText = document.getElementById('replication-status-text');
const pauseForm = document.getElementById('pause-replication-form');
const loadReplicationStats = async () => {
try {
@@ -3787,6 +3879,48 @@
if (!resp.ok) throw new Error('Failed to fetch stats');
const data = await resp.json();
// Handle endpoint health status
if (data.endpoint_healthy === false) {
// Show warning and hide success alert
if (endpointWarning) {
endpointWarning.classList.remove('d-none');
if (endpointErrorEl && data.endpoint_error) {
endpointErrorEl.textContent = data.endpoint_error + '. Replication is paused until the endpoint is available.';
}
}
if (statusAlert) statusAlert.classList.add('d-none');
// Update status badge to show "Paused" with warning styling
if (statusBadge) {
statusBadge.className = 'badge bg-warning-subtle text-warning px-3 py-2';
statusBadge.innerHTML = `
<svg xmlns="http://www.w3.org/2000/svg" width="12" height="12" fill="currentColor" class="me-1" viewBox="0 0 16 16">
<path d="M5.5 3.5A1.5 1.5 0 0 1 7 5v6a1.5 1.5 0 0 1-3 0V5a1.5 1.5 0 0 1 1.5-1.5zm5 0A1.5 1.5 0 0 1 12 5v6a1.5 1.5 0 0 1-3 0V5a1.5 1.5 0 0 1 1.5-1.5z"/>
</svg>
<span>Paused (Endpoint Unavailable)</span>`;
}
// Hide the pause button since replication is effectively already paused
if (pauseForm) pauseForm.classList.add('d-none');
} else {
// Hide warning and show success alert
if (endpointWarning) endpointWarning.classList.add('d-none');
if (statusAlert) statusAlert.classList.remove('d-none');
// Restore status badge to show "Enabled"
if (statusBadge) {
statusBadge.className = 'badge bg-success-subtle text-success px-3 py-2';
statusBadge.innerHTML = `
<svg xmlns="http://www.w3.org/2000/svg" width="12" height="12" fill="currentColor" class="me-1" viewBox="0 0 16 16">
<path d="M16 8A8 8 0 1 1 0 8a8 8 0 0 1 16 0zm-3.97-3.03a.75.75 0 0 0-1.08.022L7.477 9.417 5.384 7.323a.75.75 0 0 0-1.06 1.06L6.97 11.03a.75.75 0 0 0 1.079-.02l3.992-4.99a.75.75 0 0 0-.01-1.05z"/>
</svg>
<span>Enabled</span>`;
}
// Show the pause button
if (pauseForm) pauseForm.classList.remove('d-none');
}
if (syncedEl) syncedEl.textContent = data.objects_synced;
if (pendingEl) {
pendingEl.textContent = data.objects_pending;


@@ -104,6 +104,7 @@
<table class="table table-hover align-middle mb-0">
<thead class="table-light">
<tr>
<th scope="col" style="width: 50px;">Status</th>
<th scope="col">Name</th>
<th scope="col">Endpoint</th>
<th scope="col">Region</th>
@@ -113,7 +114,12 @@
</thead>
<tbody>
{% for conn in connections %}
<tr>
<tr data-connection-id="{{ conn.id }}">
<td class="text-center">
<span class="connection-status" data-status="checking" title="Checking...">
<span class="spinner-border spinner-border-sm text-muted" role="status" style="width: 12px; height: 12px;"></span>
</span>
</td>
<td>
<div class="d-flex align-items-center gap-2">
<div class="connection-icon">
@@ -301,7 +307,11 @@
const formData = new FormData(form);
const data = Object.fromEntries(formData.entries());
resultDiv.innerHTML = '<div class="text-info"><span class="spinner-border spinner-border-sm" role="status" aria-hidden="true"></span> Testing...</div>';
resultDiv.innerHTML = '<div class="text-info"><span class="spinner-border spinner-border-sm" role="status" aria-hidden="true"></span> Testing connection...</div>';
// Use AbortController to time out client-side after 20 seconds
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 20000);
try {
const response = await fetch("{{ url_for('ui.test_connection') }}", {
@@ -310,17 +320,44 @@
"Content-Type": "application/json",
"X-CSRFToken": "{{ csrf_token() }}"
},
body: JSON.stringify(data)
body: JSON.stringify(data),
signal: controller.signal
});
clearTimeout(timeoutId);
const result = await response.json();
if (response.ok) {
resultDiv.innerHTML = `<div class="text-success"><i class="bi bi-check-circle"></i> ${result.message}</div>`;
resultDiv.innerHTML = `<div class="text-success">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="me-1" viewBox="0 0 16 16">
<path d="M16 8A8 8 0 1 1 0 8a8 8 0 0 1 16 0zm-3.97-3.03a.75.75 0 0 0-1.08.022L7.477 9.417 5.384 7.323a.75.75 0 0 0-1.06 1.06L6.97 11.03a.75.75 0 0 0 1.079-.02l3.992-4.99a.75.75 0 0 0-.01-1.05z"/>
</svg>
${result.message}
</div>`;
} else {
resultDiv.innerHTML = `<div class="text-danger"><i class="bi bi-exclamation-circle"></i> ${result.message}</div>`;
resultDiv.innerHTML = `<div class="text-danger">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="me-1" viewBox="0 0 16 16">
<path d="M16 8A8 8 0 1 1 0 8a8 8 0 0 1 16 0zM5.354 4.646a.5.5 0 1 0-.708.708L7.293 8l-2.647 2.646a.5.5 0 0 0 .708.708L8 8.707l2.646 2.647a.5.5 0 0 0 .708-.708L8.707 8l2.647-2.646a.5.5 0 0 0-.708-.708L8 7.293 5.354 4.646z"/>
</svg>
${result.message}
</div>`;
}
} catch (error) {
resultDiv.innerHTML = `<div class="text-danger"><i class="bi bi-exclamation-circle"></i> Connection failed</div>`;
clearTimeout(timeoutId);
if (error.name === 'AbortError') {
resultDiv.innerHTML = `<div class="text-danger">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="me-1" viewBox="0 0 16 16">
<path d="M16 8A8 8 0 1 1 0 8a8 8 0 0 1 16 0zM5.354 4.646a.5.5 0 1 0-.708.708L7.293 8l-2.647 2.646a.5.5 0 0 0 .708.708L8 8.707l2.646 2.647a.5.5 0 0 0 .708-.708L8.707 8l2.647-2.646a.5.5 0 0 0-.708-.708L8 7.293 5.354 4.646z"/>
</svg>
Connection test timed out - endpoint may be unreachable
</div>`;
} else {
resultDiv.innerHTML = `<div class="text-danger">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="me-1" viewBox="0 0 16 16">
<path d="M16 8A8 8 0 1 1 0 8a8 8 0 0 1 16 0zM5.354 4.646a.5.5 0 1 0-.708.708L7.293 8l-2.647 2.646a.5.5 0 0 0 .708.708L8 8.707l2.646 2.647a.5.5 0 0 0 .708-.708L8.707 8l2.647-2.646a.5.5 0 0 0-.708-.708L8 7.293 5.354 4.646z"/>
</svg>
Connection failed: Network error
</div>`;
}
}
}
@@ -358,5 +395,54 @@
const form = document.getElementById('deleteConnectionForm');
form.action = "{{ url_for('ui.delete_connection', connection_id='CONN_ID') }}".replace('CONN_ID', id);
});
// Check connection health for each connection in the table
// Uses staggered requests to avoid overwhelming the server
async function checkConnectionHealth(connectionId, statusEl) {
try {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 15000);
const response = await fetch(`/ui/connections/${connectionId}/health`, {
signal: controller.signal
});
clearTimeout(timeoutId);
const data = await response.json();
if (data.healthy) {
statusEl.innerHTML = `
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="text-success" viewBox="0 0 16 16">
<path d="M16 8A8 8 0 1 1 0 8a8 8 0 0 1 16 0zm-3.97-3.03a.75.75 0 0 0-1.08.022L7.477 9.417 5.384 7.323a.75.75 0 0 0-1.06 1.06L6.97 11.03a.75.75 0 0 0 1.079-.02l3.992-4.99a.75.75 0 0 0-.01-1.05z"/>
</svg>`;
statusEl.setAttribute('data-status', 'healthy');
statusEl.setAttribute('title', 'Connected');
} else {
statusEl.innerHTML = `
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="text-danger" viewBox="0 0 16 16">
<path d="M16 8A8 8 0 1 1 0 8a8 8 0 0 1 16 0zM5.354 4.646a.5.5 0 1 0-.708.708L7.293 8l-2.647 2.646a.5.5 0 0 0 .708.708L8 8.707l2.646 2.647a.5.5 0 0 0 .708-.708L8.707 8l2.647-2.646a.5.5 0 0 0-.708-.708L8 7.293 5.354 4.646z"/>
</svg>`;
statusEl.setAttribute('data-status', 'unhealthy');
statusEl.setAttribute('title', data.error || 'Unreachable');
}
} catch (error) {
statusEl.innerHTML = `
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="text-warning" viewBox="0 0 16 16">
<path d="M8.982 1.566a1.13 1.13 0 0 0-1.96 0L.165 13.233c-.457.778.091 1.767.98 1.767h13.713c.889 0 1.438-.99.98-1.767L8.982 1.566zM8 5c.535 0 .954.462.9.995l-.35 3.507a.552.552 0 0 1-1.1 0L7.1 5.995A.905.905 0 0 1 8 5zm.002 6a1 1 0 1 1 0 2 1 1 0 0 1 0-2z"/>
</svg>`;
statusEl.setAttribute('data-status', 'unknown');
statusEl.setAttribute('title', 'Could not check status');
}
}
// Stagger health checks to avoid all requests at once
const connectionRows = document.querySelectorAll('tr[data-connection-id]');
connectionRows.forEach((row, index) => {
const connectionId = row.getAttribute('data-connection-id');
const statusEl = row.querySelector('.connection-status');
if (statusEl) {
// Stagger requests by 200ms each
setTimeout(() => checkConnectionHealth(connectionId, statusEl), index * 200);
}
});
</script>
{% endblock %}


@@ -47,9 +47,9 @@ python run.py --mode ui
<table class="table table-sm table-bordered small mb-0">
<thead class="table-light">
<tr>
<th>Variable</th>
<th>Default</th>
<th>Description</th>
<th style="min-width: 180px;">Variable</th>
<th style="min-width: 120px;">Default</th>
<th class="text-wrap" style="min-width: 250px;">Description</th>
</tr>
</thead>
<tbody>