diff --git a/app/__init__.py b/app/__init__.py index cd9e842..4d7d68f 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -45,7 +45,6 @@ def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path: try: shutil.move(str(legacy_path), str(active_path)) except OSError: - # Fall back to copy + delete if move fails (e.g., cross-device) shutil.copy2(legacy_path, active_path) try: legacy_path.unlink(missing_ok=True) @@ -101,32 +100,28 @@ def create_app( bucket_policies = BucketPolicyStore(Path(app.config["BUCKET_POLICY_PATH"])) secret_store = EphemeralSecretStore(default_ttl=app.config.get("SECRET_TTL_SECONDS", 300)) - # Initialize Replication components - # Store config files in the system config directory for consistency storage_root = Path(app.config["STORAGE_ROOT"]) config_dir = storage_root / ".myfsio.sys" / "config" config_dir.mkdir(parents=True, exist_ok=True) - # Define paths with migration from legacy locations connections_path = _migrate_config_file( active_path=config_dir / "connections.json", legacy_paths=[ - storage_root / ".myfsio.sys" / "connections.json", # Previous location - storage_root / ".connections.json", # Original legacy location + storage_root / ".myfsio.sys" / "connections.json", + storage_root / ".connections.json", ], ) replication_rules_path = _migrate_config_file( active_path=config_dir / "replication_rules.json", legacy_paths=[ - storage_root / ".myfsio.sys" / "replication_rules.json", # Previous location - storage_root / ".replication_rules.json", # Original legacy location + storage_root / ".myfsio.sys" / "replication_rules.json", + storage_root / ".replication_rules.json", ], ) connections = ConnectionStore(connections_path) replication = ReplicationManager(storage, connections, replication_rules_path) - # Initialize encryption and KMS encryption_config = { "encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False), "encryption_master_key_path": app.config.get("ENCRYPTION_MASTER_KEY_PATH"), @@ -141,7 +136,6 @@ def create_app( kms_manager = KMSManager(kms_keys_path, kms_master_key_path) encryption_manager.set_kms_provider(kms_manager) - # Wrap storage with encryption layer if encryption is enabled if app.config.get("ENCRYPTION_ENABLED", False): from .encrypted_storage import EncryptedObjectStorage storage = EncryptedObjectStorage(storage, encryption_manager) @@ -177,13 +171,22 @@ def create_app( @app.template_filter("timestamp_to_datetime") def timestamp_to_datetime(value: float) -> str: - """Format Unix timestamp as human-readable datetime.""" - from datetime import datetime + """Format Unix timestamp as human-readable datetime in configured timezone.""" + from datetime import datetime, timezone as dt_timezone + from zoneinfo import ZoneInfo if not value: return "Never" try: - dt = datetime.fromtimestamp(value) - return dt.strftime("%Y-%m-%d %H:%M:%S") + dt_utc = datetime.fromtimestamp(value, dt_timezone.utc) + display_tz = app.config.get("DISPLAY_TIMEZONE", "UTC") + if display_tz and display_tz != "UTC": + try: + tz = ZoneInfo(display_tz) + dt_local = dt_utc.astimezone(tz) + return dt_local.strftime("%Y-%m-%d %H:%M:%S") + except (KeyError, ValueError): + pass + return dt_utc.strftime("%Y-%m-%d %H:%M:%S UTC") except (ValueError, OSError): return "Unknown" @@ -244,7 +247,7 @@ def _configure_cors(app: Flask) -> None: class _RequestContextFilter(logging.Filter): """Inject request-specific attributes into log records.""" - def filter(self, record: logging.LogRecord) -> bool: # pragma: no cover - simple boilerplate + def filter(self, 
record: logging.LogRecord) -> bool: if has_request_context(): record.request_id = getattr(g, "request_id", "-") record.path = request.path diff --git a/app/bucket_policies.py b/app/bucket_policies.py index 73a28b7..d576030 100644 --- a/app/bucket_policies.py +++ b/app/bucket_policies.py @@ -188,7 +188,6 @@ class BucketPolicyStore: except FileNotFoundError: return None - # ------------------------------------------------------------------ def evaluate( self, access_key: Optional[str], @@ -229,7 +228,6 @@ class BucketPolicyStore: self._policies.pop(bucket, None) self._persist() - # ------------------------------------------------------------------ def _load(self) -> None: try: content = self.policy_path.read_text(encoding='utf-8') diff --git a/app/config.py b/app/config.py index b5b2a92..206cce6 100644 --- a/app/config.py +++ b/app/config.py @@ -73,6 +73,7 @@ class AppConfig: kms_enabled: bool kms_keys_path: Path default_encryption_algorithm: str + display_timezone: str @classmethod def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig": @@ -153,15 +154,15 @@ class AppConfig: cors_allow_headers = _csv(str(_get("CORS_ALLOW_HEADERS", "*")), ["*"]) cors_expose_headers = _csv(str(_get("CORS_EXPOSE_HEADERS", "*")), ["*"]) session_lifetime_days = int(_get("SESSION_LIFETIME_DAYS", 30)) - bucket_stats_cache_ttl = int(_get("BUCKET_STATS_CACHE_TTL", 60)) # Default 60 seconds + bucket_stats_cache_ttl = int(_get("BUCKET_STATS_CACHE_TTL", 60)) - # Encryption settings encryption_enabled = str(_get("ENCRYPTION_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} encryption_keys_dir = storage_root / ".myfsio.sys" / "keys" encryption_master_key_path = Path(_get("ENCRYPTION_MASTER_KEY_PATH", encryption_keys_dir / "master.key")).resolve() kms_enabled = str(_get("KMS_ENABLED", "0")).lower() in {"1", "true", "yes", "on"} kms_keys_path = Path(_get("KMS_KEYS_PATH", encryption_keys_dir / "kms_keys.json")).resolve() default_encryption_algorithm = str(_get("DEFAULT_ENCRYPTION_ALGORITHM", "AES256")) + display_timezone = str(_get("DISPLAY_TIMEZONE", "UTC")) return cls(storage_root=storage_root, max_upload_size=max_upload_size, @@ -196,7 +197,8 @@ class AppConfig: encryption_master_key_path=encryption_master_key_path, kms_enabled=kms_enabled, kms_keys_path=kms_keys_path, - default_encryption_algorithm=default_encryption_algorithm) + default_encryption_algorithm=default_encryption_algorithm, + display_timezone=display_timezone) def validate_and_report(self) -> list[str]: """Validate configuration and return a list of warnings/issues. @@ -206,7 +208,6 @@ class AppConfig: """ issues = [] - # Check if storage_root is writable try: test_file = self.storage_root / ".write_test" test_file.touch() @@ -214,24 +215,20 @@ class AppConfig: except (OSError, PermissionError) as e: issues.append(f"CRITICAL: STORAGE_ROOT '{self.storage_root}' is not writable: {e}") - # Check if storage_root looks like a temp directory storage_str = str(self.storage_root).lower() if "/tmp" in storage_str or "\\temp" in storage_str or "appdata\\local\\temp" in storage_str: issues.append(f"WARNING: STORAGE_ROOT '{self.storage_root}' appears to be a temporary directory. Data may be lost on reboot!") - # Check if IAM config path is under storage_root try: self.iam_config_path.relative_to(self.storage_root) except ValueError: issues.append(f"WARNING: IAM_CONFIG '{self.iam_config_path}' is outside STORAGE_ROOT '{self.storage_root}'. 
Consider setting IAM_CONFIG explicitly or ensuring paths are aligned.") - # Check if bucket policy path is under storage_root try: self.bucket_policy_path.relative_to(self.storage_root) except ValueError: issues.append(f"WARNING: BUCKET_POLICY_PATH '{self.bucket_policy_path}' is outside STORAGE_ROOT '{self.storage_root}'. Consider setting BUCKET_POLICY_PATH explicitly.") - # Check if log path is writable try: self.log_path.parent.mkdir(parents=True, exist_ok=True) test_log = self.log_path.parent / ".write_test" @@ -240,26 +237,22 @@ class AppConfig: except (OSError, PermissionError) as e: issues.append(f"WARNING: Log directory '{self.log_path.parent}' is not writable: {e}") - # Check log path location log_str = str(self.log_path).lower() if "/tmp" in log_str or "\\temp" in log_str or "appdata\\local\\temp" in log_str: issues.append(f"WARNING: LOG_DIR '{self.log_path.parent}' appears to be a temporary directory. Logs may be lost on reboot!") - # Check if encryption keys path is under storage_root (when encryption is enabled) if self.encryption_enabled: try: self.encryption_master_key_path.relative_to(self.storage_root) except ValueError: issues.append(f"WARNING: ENCRYPTION_MASTER_KEY_PATH '{self.encryption_master_key_path}' is outside STORAGE_ROOT. Ensure proper backup procedures.") - # Check if KMS keys path is under storage_root (when KMS is enabled) if self.kms_enabled: try: self.kms_keys_path.relative_to(self.storage_root) except ValueError: issues.append(f"WARNING: KMS_KEYS_PATH '{self.kms_keys_path}' is outside STORAGE_ROOT. Ensure proper backup procedures.") - # Warn about production settings if self.secret_key == "dev-secret-key": issues.append("WARNING: Using default SECRET_KEY. Set SECRET_KEY environment variable for production.") @@ -330,4 +323,5 @@ class AppConfig: "KMS_ENABLED": self.kms_enabled, "KMS_KEYS_PATH": str(self.kms_keys_path), "DEFAULT_ENCRYPTION_ALGORITHM": self.default_encryption_algorithm, + "DISPLAY_TIMEZONE": self.display_timezone, } diff --git a/app/iam.py b/app/iam.py index d8c3c8d..465aca1 100644 --- a/app/iam.py +++ b/app/iam.py @@ -6,7 +6,7 @@ import math import secrets from collections import deque from dataclasses import dataclass -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Deque, Dict, Iterable, List, Optional, Sequence, Set @@ -125,7 +125,6 @@ class IamService: except OSError: pass - # ---------------------- authz helpers ---------------------- def authenticate(self, access_key: str, secret_key: str) -> Principal: self._maybe_reload() access_key = (access_key or "").strip() @@ -149,7 +148,7 @@ class IamService: return attempts = self._failed_attempts.setdefault(access_key, deque()) self._prune_attempts(attempts) - attempts.append(datetime.now()) + attempts.append(datetime.now(timezone.utc)) def _clear_failed_attempts(self, access_key: str) -> None: if not access_key: @@ -157,7 +156,7 @@ class IamService: self._failed_attempts.pop(access_key, None) def _prune_attempts(self, attempts: Deque[datetime]) -> None: - cutoff = datetime.now() - self.auth_lockout_window + cutoff = datetime.now(timezone.utc) - self.auth_lockout_window while attempts and attempts[0] < cutoff: attempts.popleft() @@ -178,7 +177,7 @@ class IamService: if len(attempts) < self.auth_max_attempts: return 0 oldest = attempts[0] - elapsed = (datetime.now() - oldest).total_seconds() + elapsed = (datetime.now(timezone.utc) - oldest).total_seconds() return int(max(0, 
self.auth_lockout_window.total_seconds() - elapsed)) def principal_for_key(self, access_key: str) -> Principal: @@ -218,7 +217,6 @@ class IamService: return True return False - # ---------------------- management helpers ---------------------- def list_users(self) -> List[Dict[str, Any]]: listing: List[Dict[str, Any]] = [] for access_key, record in self._users.items(): @@ -291,7 +289,6 @@ class IamService: self._save() self._load() - # ---------------------- config helpers ---------------------- def _load(self) -> None: try: self._last_load_time = self.config_path.stat().st_mtime @@ -337,7 +334,6 @@ class IamService: except (OSError, PermissionError) as e: raise IamError(f"Cannot save IAM config: {e}") - # ---------------------- insight helpers ---------------------- def config_summary(self) -> Dict[str, Any]: return { "path": str(self.config_path), diff --git a/app/kms_api.py b/app/kms_api.py index 551d262..2ff9e35 100644 --- a/app/kms_api.py +++ b/app/kms_api.py @@ -33,9 +33,6 @@ def _encryption(): def _error_response(code: str, message: str, status: int) -> tuple[Dict[str, Any], int]: return {"__type": code, "message": message}, status - -# ---------------------- Key Management ---------------------- - @kms_api_bp.route("/keys", methods=["GET", "POST"]) @limiter.limit("30 per minute") def list_or_create_keys(): @@ -65,7 +62,6 @@ def list_or_create_keys(): except EncryptionError as exc: return _error_response("KMSInternalException", str(exc), 400) - # GET - List keys keys = kms.list_keys() return jsonify({ "Keys": [{"KeyId": k.key_id, "KeyArn": k.arn} for k in keys], @@ -96,7 +92,6 @@ def get_or_delete_key(key_id: str): except EncryptionError as exc: return _error_response("NotFoundException", str(exc), 404) - # GET key = kms.get_key(key_id) if not key: return _error_response("NotFoundException", f"Key not found: {key_id}", 404) @@ -149,9 +144,6 @@ def disable_key(key_id: str): except EncryptionError as exc: return _error_response("NotFoundException", str(exc), 404) - -# ---------------------- Encryption Operations ---------------------- - @kms_api_bp.route("/encrypt", methods=["POST"]) @limiter.limit("60 per minute") def encrypt_data(): @@ -251,7 +243,6 @@ def generate_data_key(): try: plaintext_key, encrypted_key = kms.generate_data_key(key_id, context) - # Trim key if AES_128 requested if key_spec == "AES_128": plaintext_key = plaintext_key[:16] @@ -322,10 +313,7 @@ def re_encrypt(): return _error_response("ValidationException", "CiphertextBlob must be base64 encoded", 400) try: - # First decrypt, get source key id plaintext, source_key_id = kms.decrypt(ciphertext, source_context) - - # Re-encrypt with destination key new_ciphertext = kms.encrypt(destination_key_id, plaintext, destination_context) return jsonify({ @@ -365,9 +353,6 @@ def generate_random(): except EncryptionError as exc: return _error_response("ValidationException", str(exc), 400) - -# ---------------------- Client-Side Encryption Helpers ---------------------- - @kms_api_bp.route("/client/generate-key", methods=["POST"]) @limiter.limit("30 per minute") def generate_client_key(): @@ -427,9 +412,6 @@ def client_decrypt(): except Exception as exc: return _error_response("DecryptionError", str(exc), 400) - -# ---------------------- Encryption Materials for S3 Client-Side Encryption ---------------------- - @kms_api_bp.route("/materials/", methods=["POST"]) @limiter.limit("60 per minute") def get_encryption_materials(key_id: str): diff --git a/app/replication.py b/app/replication.py index a48296a..5a4ad8a 100644 --- 
a/app/replication.py +++ b/app/replication.py @@ -22,6 +22,8 @@ from .storage import ObjectStorage, StorageError logger = logging.getLogger(__name__) REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0" +REPLICATION_CONNECT_TIMEOUT = 5 +REPLICATION_READ_TIMEOUT = 30 REPLICATION_MODE_NEW_ONLY = "new_only" REPLICATION_MODE_ALL = "all" @@ -30,10 +32,10 @@ REPLICATION_MODE_ALL = "all" @dataclass class ReplicationStats: """Statistics for replication operations - computed dynamically.""" - objects_synced: int = 0 # Objects that exist in both source and destination - objects_pending: int = 0 # Objects in source but not in destination - objects_orphaned: int = 0 # Objects in destination but not in source (will be deleted) - bytes_synced: int = 0 # Total bytes synced to destination + objects_synced: int = 0 + objects_pending: int = 0 + objects_orphaned: int = 0 + bytes_synced: int = 0 last_sync_at: Optional[float] = None last_sync_key: Optional[str] = None @@ -83,7 +85,6 @@ class ReplicationRule: @classmethod def from_dict(cls, data: dict) -> "ReplicationRule": stats_data = data.pop("stats", {}) - # Handle old rules without mode/created_at if "mode" not in data: data["mode"] = REPLICATION_MODE_NEW_ONLY if "created_at" not in data: @@ -121,6 +122,33 @@ class ReplicationManager: with open(self.rules_path, "w") as f: json.dump(data, f, indent=2) + def check_endpoint_health(self, connection: RemoteConnection) -> bool: + """Check if a remote endpoint is reachable and responsive. + + Returns True if endpoint is healthy, False otherwise. + Uses short timeouts to prevent blocking. + """ + try: + config = Config( + user_agent_extra=REPLICATION_USER_AGENT, + connect_timeout=REPLICATION_CONNECT_TIMEOUT, + read_timeout=REPLICATION_READ_TIMEOUT, + retries={'max_attempts': 1} + ) + s3 = boto3.client( + "s3", + endpoint_url=connection.endpoint_url, + aws_access_key_id=connection.access_key, + aws_secret_access_key=connection.secret_key, + region_name=connection.region, + config=config, + ) + s3.list_buckets() + return True + except Exception as e: + logger.warning(f"Endpoint health check failed for {connection.name} ({connection.endpoint_url}): {e}") + return False + def get_rule(self, bucket_name: str) -> Optional[ReplicationRule]: return self._rules.get(bucket_name) @@ -151,14 +179,12 @@ class ReplicationManager: connection = self.connections.get(rule.target_connection_id) if not connection: - return rule.stats # Return cached stats if connection unavailable + return rule.stats try: - # Get source objects source_objects = self.storage.list_objects_all(bucket_name) source_keys = {obj.key: obj.size for obj in source_objects} - # Get destination objects s3 = boto3.client( "s3", endpoint_url=connection.endpoint_url, @@ -178,24 +204,18 @@ class ReplicationManager: bytes_synced += obj.get('Size', 0) except ClientError as e: if e.response['Error']['Code'] == 'NoSuchBucket': - # Destination bucket doesn't exist yet dest_keys = set() else: raise - # Compute stats - synced = source_keys.keys() & dest_keys # Objects in both - orphaned = dest_keys - source_keys.keys() # In dest but not source + synced = source_keys.keys() & dest_keys + orphaned = dest_keys - source_keys.keys() - # For "new_only" mode, we can't determine pending since we don't know - # which objects existed before replication was enabled. Only "all" mode - # should show pending (objects that should be replicated but aren't yet). 
if rule.mode == REPLICATION_MODE_ALL: - pending = source_keys.keys() - dest_keys # In source but not dest + pending = source_keys.keys() - dest_keys else: - pending = set() # New-only mode: don't show pre-existing as pending + pending = set() - # Update cached stats with computed values rule.stats.objects_synced = len(synced) rule.stats.objects_pending = len(pending) rule.stats.objects_orphaned = len(orphaned) @@ -205,7 +225,7 @@ class ReplicationManager: except (ClientError, StorageError) as e: logger.error(f"Failed to compute sync status for {bucket_name}: {e}") - return rule.stats # Return cached stats on error + return rule.stats def replicate_existing_objects(self, bucket_name: str) -> None: """Trigger replication for all existing objects in a bucket.""" @@ -218,6 +238,10 @@ class ReplicationManager: logger.warning(f"Cannot replicate existing objects: Connection {rule.target_connection_id} not found") return + if not self.check_endpoint_health(connection): + logger.warning(f"Cannot replicate existing objects: Endpoint {connection.name} ({connection.endpoint_url}) is not reachable") + return + try: objects = self.storage.list_objects_all(bucket_name) logger.info(f"Starting replication of {len(objects)} existing objects from {bucket_name}") @@ -255,6 +279,10 @@ class ReplicationManager: logger.warning(f"Replication skipped for {bucket_name}/{object_key}: Connection {rule.target_connection_id} not found") return + if not self.check_endpoint_health(connection): + logger.warning(f"Replication skipped for {bucket_name}/{object_key}: Endpoint {connection.name} ({connection.endpoint_url}) is not reachable") + return + self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action) def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None: @@ -271,13 +299,26 @@ class ReplicationManager: file_size = 0 try: - config = Config(user_agent_extra=REPLICATION_USER_AGENT) + config = Config( + user_agent_extra=REPLICATION_USER_AGENT, + connect_timeout=REPLICATION_CONNECT_TIMEOUT, + read_timeout=REPLICATION_READ_TIMEOUT, + retries={'max_attempts': 2}, + signature_version='s3v4', + s3={ + 'addressing_style': 'path', + }, + # Disable SDK automatic checksums - they cause SignatureDoesNotMatch errors + # with S3-compatible servers that don't support CRC32 checksum headers + request_checksum_calculation='when_required', + response_checksum_validation='when_required', + ) s3 = boto3.client( "s3", endpoint_url=conn.endpoint_url, aws_access_key_id=conn.access_key, aws_secret_access_key=conn.secret_key, - region_name=conn.region, + region_name=conn.region or 'us-east-1', config=config, ) @@ -296,54 +337,59 @@ class ReplicationManager: logger.error(f"Source object not found: {bucket_name}/{object_key}") return - metadata = self.storage.get_object_metadata(bucket_name, object_key) - - extra_args = {} - if metadata: - extra_args["Metadata"] = metadata + # Don't replicate metadata - destination server will generate its own + # __etag__ and __size__. Replicating them causes signature mismatches when they have None/empty values. - # Guess content type to prevent corruption/wrong handling content_type, _ = mimetypes.guess_type(path) file_size = path.stat().st_size logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}") + def do_put_object() -> None: + """Helper to upload object. 
+ + Reads the file content into memory first to avoid signature calculation + issues with certain binary file types (like GIFs) when streaming. + Do NOT set ContentLength explicitly - boto3 calculates it from the bytes + and setting it manually can cause SignatureDoesNotMatch errors. + """ + file_content = path.read_bytes() + put_kwargs = { + "Bucket": rule.target_bucket, + "Key": object_key, + "Body": file_content, + } + if content_type: + put_kwargs["ContentType"] = content_type + s3.put_object(**put_kwargs) + try: - with path.open("rb") as f: - s3.put_object( - Bucket=rule.target_bucket, - Key=object_key, - Body=f, - ContentLength=file_size, - ContentType=content_type or "application/octet-stream", - Metadata=metadata or {} - ) + do_put_object() except (ClientError, S3UploadFailedError) as e: - is_no_bucket = False + error_code = None if isinstance(e, ClientError): - if e.response['Error']['Code'] == 'NoSuchBucket': - is_no_bucket = True + error_code = e.response['Error']['Code'] elif isinstance(e, S3UploadFailedError): if "NoSuchBucket" in str(e): - is_no_bucket = True + error_code = 'NoSuchBucket' - if is_no_bucket: + if error_code == 'NoSuchBucket': logger.info(f"Target bucket {rule.target_bucket} not found. Attempting to create it.") + bucket_ready = False try: s3.create_bucket(Bucket=rule.target_bucket) - # Retry upload - with path.open("rb") as f: - s3.put_object( - Bucket=rule.target_bucket, - Key=object_key, - Body=f, - ContentLength=file_size, - ContentType=content_type or "application/octet-stream", - Metadata=metadata or {} - ) - except Exception as create_err: - logger.error(f"Failed to create target bucket {rule.target_bucket}: {create_err}") - raise e # Raise original error + bucket_ready = True + logger.info(f"Created target bucket {rule.target_bucket}") + except ClientError as bucket_err: + if bucket_err.response['Error']['Code'] in ('BucketAlreadyExists', 'BucketAlreadyOwnedByYou'): + logger.debug(f"Bucket {rule.target_bucket} already exists (created by another thread)") + bucket_ready = True + else: + logger.error(f"Failed to create target bucket {rule.target_bucket}: {bucket_err}") + raise e + + if bucket_ready: + do_put_object() else: raise e @@ -354,3 +400,4 @@ class ReplicationManager: logger.error(f"Replication failed for {bucket_name}/{object_key}: {e}") except Exception: logger.exception(f"Unexpected error during replication for {bucket_name}/{object_key}") + diff --git a/app/s3_api.py b/app/s3_api.py index 9406206..d0d32d9 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -8,7 +8,7 @@ import re import uuid from datetime import datetime, timedelta, timezone from typing import Any, Dict -from urllib.parse import quote, urlencode, urlparse +from urllib.parse import quote, urlencode, urlparse, unquote from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError from flask import Blueprint, Response, current_app, jsonify, request, g @@ -22,8 +22,6 @@ from .storage import ObjectStorage, StorageError, QuotaExceededError s3_api_bp = Blueprint("s3_api", __name__) - -# ---------------------- helpers ---------------------- def _storage() -> ObjectStorage: return current_app.extensions["object_storage"] @@ -68,9 +66,26 @@ def _get_signature_key(key: str, date_stamp: str, region_name: str, service_name return k_signing +def _get_canonical_uri(req: Any) -> str: + """Get the canonical URI for SigV4 signature verification. + + AWS SigV4 requires the canonical URI to be URL-encoded exactly as the client + sent it. 
Flask/Werkzeug automatically URL-decodes request.path, so we need + to get the raw path from the environ. + + The canonical URI should have each path segment URL-encoded (with '/' preserved), + and the encoding should match what the client used when signing. + """ + raw_uri = req.environ.get('RAW_URI') or req.environ.get('REQUEST_URI') + + if raw_uri: + path = raw_uri.split('?')[0] + return path + + return quote(req.path, safe="/-_.~") + + def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: - # Parse Authorization header - # AWS4-HMAC-SHA256 Credential=AKIA.../20230101/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-date, Signature=... match = re.match( r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)", auth_header, @@ -79,17 +94,13 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: return None access_key, date_stamp, region, service, signed_headers_str, signature = match.groups() - - # Get secret key secret_key = _iam().get_secret_key(access_key) if not secret_key: raise IamError("Invalid access key") - # Canonical Request method = req.method - canonical_uri = quote(req.path, safe="/-_.~") + canonical_uri = _get_canonical_uri(req) - # Canonical Query String query_args = [] for key, value in req.args.items(multi=True): query_args.append((key, value)) @@ -100,7 +111,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}") canonical_query_string = "&".join(canonical_query_parts) - # Canonical Headers signed_headers_list = signed_headers_str.split(";") canonical_headers_parts = [] for header in signed_headers_list: @@ -112,18 +122,22 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: canonical_headers_parts.append(f"{header.lower()}:{header_val}\n") canonical_headers = "".join(canonical_headers_parts) - # Payload Hash payload_hash = req.headers.get("X-Amz-Content-Sha256") if not payload_hash: payload_hash = hashlib.sha256(req.get_data()).hexdigest() canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}" - # String to Sign - amz_date = req.headers.get("X-Amz-Date") - if not amz_date: - amz_date = req.headers.get("Date") - + # Debug logging for signature issues + import logging + logger = logging.getLogger(__name__) + logger.debug(f"SigV4 Debug - Method: {method}, URI: {canonical_uri}") + logger.debug(f"SigV4 Debug - Payload hash from header: {req.headers.get('X-Amz-Content-Sha256')}") + logger.debug(f"SigV4 Debug - Signed headers: {signed_headers_str}") + logger.debug(f"SigV4 Debug - Content-Type: {req.headers.get('Content-Type')}") + logger.debug(f"SigV4 Debug - Content-Length: {req.headers.get('Content-Length')}") + + amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date") if not amz_date: raise IamError("Missing Date header") @@ -134,13 +148,12 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: now = datetime.now(timezone.utc) time_diff = abs((now - request_time).total_seconds()) - if time_diff > 900: # 15 minutes + if time_diff > 900: raise IamError("Request timestamp too old or too far in the future") required_headers = {'host', 'x-amz-date'} signed_headers_set = set(signed_headers_str.split(';')) if not required_headers.issubset(signed_headers_set): - # Some clients might sign 'date' instead of 'x-amz-date' if 'date' in 
signed_headers_set: required_headers.remove('x-amz-date') required_headers.add('date') @@ -154,6 +167,24 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() if not hmac.compare_digest(calculated_signature, signature): + # Debug logging for signature mismatch + import logging + logger = logging.getLogger(__name__) + logger.error(f"Signature mismatch for {req.path}") + logger.error(f" Content-Type: {req.headers.get('Content-Type')}") + logger.error(f" Content-Length: {req.headers.get('Content-Length')}") + logger.error(f" X-Amz-Content-Sha256: {req.headers.get('X-Amz-Content-Sha256')}") + logger.error(f" Canonical URI: {canonical_uri}") + logger.error(f" Signed headers: {signed_headers_str}") + # Log each signed header's value + for h in signed_headers_list: + logger.error(f" Header '{h}': {repr(req.headers.get(h))}") + logger.error(f" Expected sig: {signature[:16]}...") + logger.error(f" Calculated sig: {calculated_signature[:16]}...") + # Log first part of canonical request to compare + logger.error(f" Canonical request hash: {hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()[:16]}...") + # Log the full canonical request for debugging + logger.error(f" Canonical request:\n{canonical_request[:500]}...") raise IamError("SignatureDoesNotMatch") return _iam().get_principal(access_key) @@ -187,11 +218,9 @@ def _verify_sigv4_query(req: Any) -> Principal | None: if not secret_key: raise IamError("Invalid access key") - # Canonical Request method = req.method - canonical_uri = quote(req.path, safe="/-_.~") + canonical_uri = _get_canonical_uri(req) - # Canonical Query String query_args = [] for key, value in req.args.items(multi=True): if key != "X-Amz-Signature": @@ -203,7 +232,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None: canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}") canonical_query_string = "&".join(canonical_query_parts) - # Canonical Headers signed_headers_list = signed_headers_str.split(";") canonical_headers_parts = [] for header in signed_headers_list: @@ -212,7 +240,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None: canonical_headers_parts.append(f"{header}:{val}\n") canonical_headers = "".join(canonical_headers_parts) - # Payload Hash payload_hash = "UNSIGNED-PAYLOAD" canonical_request = "\n".join([ @@ -224,7 +251,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None: payload_hash ]) - # String to Sign algorithm = "AWS4-HMAC-SHA256" credential_scope = f"{date_stamp}/{region}/{service}/aws4_request" hashed_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest() @@ -235,7 +261,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None: hashed_request ]) - # Signature signing_key = _get_signature_key(secret_key, date_stamp, region, service) calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() @@ -493,7 +518,6 @@ def _generate_presigned_url( } canonical_query = _encode_query_params(query_params) - # Determine host and scheme from config or request api_base = current_app.config.get("API_BASE_URL") if api_base: parsed = urlparse(api_base) @@ -853,7 +877,6 @@ def _bucket_versioning_handler(bucket_name: str) -> Response: current_app.logger.info("Bucket versioning updated", extra={"bucket": bucket_name, "status": status}) return Response(status=200) - # GET try: enabled = storage.is_versioning_enabled(bucket_name) except 
StorageError as exc: @@ -889,7 +912,7 @@ def _bucket_tagging_handler(bucket_name: str) -> Response: return _error_response("NoSuchBucket", str(exc), 404) current_app.logger.info("Bucket tags deleted", extra={"bucket": bucket_name}) return Response(status=204) - # PUT + payload = request.get_data(cache=False) or b"" try: tags = _parse_tagging_document(payload) @@ -914,7 +937,6 @@ def _object_tagging_handler(bucket_name: str, object_key: str) -> Response: if error: return error - # For tagging, we use read permission for GET, write for PUT/DELETE action = "read" if request.method == "GET" else "write" try: _authorize_action(principal, bucket_name, action, object_key=object_key) @@ -1093,10 +1115,8 @@ def _bucket_location_handler(bucket_name: str) -> Response: if not storage.bucket_exists(bucket_name): return _error_response("NoSuchBucket", "Bucket does not exist", 404) - # Return the configured AWS_REGION region = current_app.config.get("AWS_REGION", "us-east-1") root = Element("LocationConstraint") - # AWS returns empty for us-east-1, but we'll be explicit root.text = region if region != "us-east-1" else None return _xml_response(root) @@ -1116,13 +1136,11 @@ def _bucket_acl_handler(bucket_name: str) -> Response: return _error_response("NoSuchBucket", "Bucket does not exist", 404) if request.method == "PUT": - # We don't fully implement ACLs, but we accept the request for compatibility - # Check for canned ACL header + # Accept canned ACL headers for S3 compatibility (not fully implemented) canned_acl = request.headers.get("x-amz-acl", "private") current_app.logger.info("Bucket ACL set (canned)", extra={"bucket": bucket_name, "acl": canned_acl}) return Response(status=200) - # GET - Return a basic ACL document showing full control for owner root = Element("AccessControlPolicy") owner = SubElement(root, "Owner") SubElement(owner, "ID").text = principal.access_key if principal else "anonymous" @@ -1170,7 +1188,6 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response: if key_marker: objects = [obj for obj in objects if obj.key > key_marker] - # Build XML response root = Element("ListVersionsResult", xmlns="http://s3.amazonaws.com/doc/2006-03-01/") SubElement(root, "Name").text = bucket_name SubElement(root, "Prefix").text = prefix @@ -1188,10 +1205,9 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response: is_truncated = True break - # Current version version = SubElement(root, "Version") SubElement(version, "Key").text = obj.key - SubElement(version, "VersionId").text = "null" # Current version ID + SubElement(version, "VersionId").text = "null" SubElement(version, "IsLatest").text = "true" SubElement(version, "LastModified").text = obj.last_modified.strftime("%Y-%m-%dT%H:%M:%S.000Z") SubElement(version, "ETag").text = f'"{obj.etag}"' @@ -1205,7 +1221,6 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response: version_count += 1 next_key_marker = obj.key - # Get historical versions try: versions = storage.list_object_versions(bucket_name, obj.key) for v in versions: @@ -1289,14 +1304,12 @@ def _render_lifecycle_config(config: list) -> Element: rule_el = SubElement(root, "Rule") SubElement(rule_el, "ID").text = rule.get("ID", "") - # Filter filter_el = SubElement(rule_el, "Filter") if rule.get("Prefix"): SubElement(filter_el, "Prefix").text = rule.get("Prefix", "") SubElement(rule_el, "Status").text = rule.get("Status", "Enabled") - # Expiration if "Expiration" in rule: exp = rule["Expiration"] exp_el = SubElement(rule_el, "Expiration") @@ -1307,14 +1320,12 @@ 
def _render_lifecycle_config(config: list) -> Element: if exp.get("ExpiredObjectDeleteMarker"): SubElement(exp_el, "ExpiredObjectDeleteMarker").text = "true" - # NoncurrentVersionExpiration if "NoncurrentVersionExpiration" in rule: nve = rule["NoncurrentVersionExpiration"] nve_el = SubElement(rule_el, "NoncurrentVersionExpiration") if "NoncurrentDays" in nve: SubElement(nve_el, "NoncurrentDays").text = str(nve["NoncurrentDays"]) - # AbortIncompleteMultipartUpload if "AbortIncompleteMultipartUpload" in rule: aimu = rule["AbortIncompleteMultipartUpload"] aimu_el = SubElement(rule_el, "AbortIncompleteMultipartUpload") @@ -1338,29 +1349,24 @@ def _parse_lifecycle_config(payload: bytes) -> list: for rule_el in root.findall("{*}Rule") or root.findall("Rule"): rule: dict = {} - # ID id_el = rule_el.find("{*}ID") or rule_el.find("ID") if id_el is not None and id_el.text: rule["ID"] = id_el.text.strip() - # Filter/Prefix filter_el = rule_el.find("{*}Filter") or rule_el.find("Filter") if filter_el is not None: prefix_el = filter_el.find("{*}Prefix") or filter_el.find("Prefix") if prefix_el is not None and prefix_el.text: rule["Prefix"] = prefix_el.text - # Legacy Prefix (outside Filter) if "Prefix" not in rule: prefix_el = rule_el.find("{*}Prefix") or rule_el.find("Prefix") if prefix_el is not None: rule["Prefix"] = prefix_el.text or "" - # Status status_el = rule_el.find("{*}Status") or rule_el.find("Status") rule["Status"] = (status_el.text or "Enabled").strip() if status_el is not None else "Enabled" - # Expiration exp_el = rule_el.find("{*}Expiration") or rule_el.find("Expiration") if exp_el is not None: expiration: dict = {} @@ -1376,7 +1382,6 @@ def _parse_lifecycle_config(payload: bytes) -> list: if expiration: rule["Expiration"] = expiration - # NoncurrentVersionExpiration nve_el = rule_el.find("{*}NoncurrentVersionExpiration") or rule_el.find("NoncurrentVersionExpiration") if nve_el is not None: nve: dict = {} @@ -1386,7 +1391,6 @@ def _parse_lifecycle_config(payload: bytes) -> list: if nve: rule["NoncurrentVersionExpiration"] = nve - # AbortIncompleteMultipartUpload aimu_el = rule_el.find("{*}AbortIncompleteMultipartUpload") or rule_el.find("AbortIncompleteMultipartUpload") if aimu_el is not None: aimu: dict = {} @@ -1424,7 +1428,6 @@ def _bucket_quota_handler(bucket_name: str) -> Response: if not quota: return _error_response("NoSuchQuotaConfiguration", "No quota configuration found", 404) - # Return as JSON for simplicity (not a standard S3 API) stats = storage.bucket_stats(bucket_name) return jsonify({ "quota": quota, @@ -1453,7 +1456,6 @@ def _bucket_quota_handler(bucket_name: str) -> Response: if max_size_bytes is None and max_objects is None: return _error_response("InvalidArgument", "At least one of max_size_bytes or max_objects is required", 400) - # Validate types if max_size_bytes is not None: try: max_size_bytes = int(max_size_bytes) @@ -1564,7 +1566,6 @@ def _bulk_delete_handler(bucket_name: str) -> Response: return _xml_response(result, status=200) -# ---------------------- routes ---------------------- @s3_api_bp.get("/") @limiter.limit("60 per minute") def list_buckets() -> Response: @@ -1642,7 +1643,6 @@ def bucket_handler(bucket_name: str) -> Response: current_app.logger.info("Bucket deleted", extra={"bucket": bucket_name}) return Response(status=204) - # GET - list objects (supports both ListObjects and ListObjectsV2) principal, error = _require_principal() try: _authorize_action(principal, bucket_name, "list") @@ -1650,18 +1650,12 @@ def bucket_handler(bucket_name: str) 
-> Response: if error: return error return _error_response("AccessDenied", str(exc), 403) - try: - objects = storage.list_objects_all(bucket_name) - except StorageError as exc: - return _error_response("NoSuchBucket", str(exc), 404) - # Check if this is ListObjectsV2 (list-type=2) list_type = request.args.get("list-type") prefix = request.args.get("prefix", "") delimiter = request.args.get("delimiter", "") max_keys = min(int(request.args.get("max-keys", current_app.config["UI_PAGE_SIZE"])), 1000) - # Pagination markers marker = request.args.get("marker", "") # ListObjects v1 continuation_token = request.args.get("continuation-token", "") # ListObjectsV2 start_after = request.args.get("start-after", "") # ListObjectsV2 @@ -1681,11 +1675,17 @@ def bucket_handler(bucket_name: str) -> Response: else: effective_start = marker - if prefix: - objects = [obj for obj in objects if obj.key.startswith(prefix)] - - if effective_start: - objects = [obj for obj in objects if obj.key > effective_start] + fetch_keys = max_keys * 10 if delimiter else max_keys + try: + list_result = storage.list_objects( + bucket_name, + max_keys=fetch_keys, + continuation_token=effective_start or None, + prefix=prefix or None, + ) + objects = list_result.objects + except StorageError as exc: + return _error_response("NoSuchBucket", str(exc), 404) common_prefixes: list[str] = [] filtered_objects: list = [] @@ -1694,7 +1694,6 @@ def bucket_handler(bucket_name: str) -> Response: for obj in objects: key_after_prefix = obj.key[len(prefix):] if prefix else obj.key if delimiter in key_after_prefix: - # This is a "folder" - extract the common prefix common_prefix = prefix + key_after_prefix.split(delimiter)[0] + delimiter if common_prefix not in seen_prefixes: seen_prefixes.add(common_prefix) @@ -1705,7 +1704,7 @@ def bucket_handler(bucket_name: str) -> Response: common_prefixes = sorted(common_prefixes) total_items = len(objects) + len(common_prefixes) - is_truncated = total_items > max_keys + is_truncated = total_items > max_keys or list_result.is_truncated if len(objects) >= max_keys: objects = objects[:max_keys] @@ -1792,7 +1791,6 @@ def object_handler(bucket_name: str, object_key: str): if "tagging" in request.args: return _object_tagging_handler(bucket_name, object_key) - # Multipart Uploads if request.method == "POST": if "uploads" in request.args: return _initiate_multipart_upload(bucket_name, object_key) @@ -1845,9 +1843,7 @@ def object_handler(bucket_name: str, object_key: str): response = Response(status=200) response.headers["ETag"] = f'"{meta.etag}"' - # Trigger replication if not a replication request - user_agent = request.headers.get("User-Agent", "") - if "S3ReplicationAgent" not in user_agent: + if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""): _replication_manager().trigger_replication(bucket_name, object_key, action="write") return response @@ -1866,31 +1862,25 @@ def object_handler(bucket_name: str, object_key: str): metadata = storage.get_object_metadata(bucket_name, object_key) mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream" - # Check if object is encrypted and needs decryption is_encrypted = "x-amz-server-side-encryption" in metadata if request.method == "GET": if is_encrypted and hasattr(storage, 'get_object_data'): - # Use encrypted storage to decrypt try: data, clean_metadata = storage.get_object_data(bucket_name, object_key) response = Response(data, mimetype=mimetype) logged_bytes = len(data) - # Use decrypted size for Content-Length 
response.headers["Content-Length"] = len(data) etag = hashlib.md5(data).hexdigest() except StorageError as exc: return _error_response("InternalError", str(exc), 500) else: - # Stream unencrypted file directly stat = path.stat() response = Response(_stream_file(path), mimetype=mimetype, direct_passthrough=True) logged_bytes = stat.st_size etag = storage._compute_etag(path) else: - # HEAD request if is_encrypted and hasattr(storage, 'get_object_data'): - # For encrypted objects, we need to report decrypted size try: data, _ = storage.get_object_data(bucket_name, object_key) response = Response(status=200) @@ -1919,7 +1909,6 @@ def object_handler(bucket_name: str, object_key: str): storage.delete_object(bucket_name, object_key) current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key}) - # Trigger replication if not a replication request user_agent = request.headers.get("User-Agent", "") if "S3ReplicationAgent" not in user_agent: _replication_manager().trigger_replication(bucket_name, object_key, action="delete") @@ -2200,7 +2189,6 @@ class AwsChunkedDecoder: self.chunk_remaining -= len(chunk) if self.chunk_remaining == 0: - # Read CRLF after chunk data crlf = self.stream.read(2) if crlf != b"\r\n": raise IOError("Malformed chunk: missing CRLF") @@ -2219,7 +2207,6 @@ class AwsChunkedDecoder: try: line_str = line.decode("ascii").strip() - # Handle chunk-signature extension if present (e.g. "1000;chunk-signature=...") if ";" in line_str: line_str = line_str.split(";")[0] chunk_size = int(line_str, 16) @@ -2375,7 +2362,6 @@ def _abort_multipart_upload(bucket_name: str, object_key: str) -> Response: try: _storage().abort_multipart_upload(bucket_name, upload_id) except StorageError as exc: - # Abort is idempotent, but if bucket missing... 
if "Bucket does not exist" in str(exc): return _error_response("NoSuchBucket", str(exc), 404) @@ -2385,7 +2371,6 @@ def _abort_multipart_upload(bucket_name: str, object_key: str) -> Response: @s3_api_bp.before_request def resolve_principal(): g.principal = None - # Try SigV4 try: if ("Authorization" in request.headers and request.headers["Authorization"].startswith("AWS4-HMAC-SHA256")) or \ (request.args.get("X-Amz-Algorithm") == "AWS4-HMAC-SHA256"): @@ -2394,7 +2379,6 @@ def resolve_principal(): except Exception: pass - # Try simple auth headers (internal/testing) access_key = request.headers.get("X-Access-Key") secret_key = request.headers.get("X-Secret-Key") if access_key and secret_key: diff --git a/app/storage.py b/app/storage.py index 4875519..c710847 100644 --- a/app/storage.py +++ b/app/storage.py @@ -128,11 +128,13 @@ class ObjectStorage: BUCKET_VERSIONS_DIR = "versions" MULTIPART_MANIFEST = "manifest.json" BUCKET_CONFIG_FILE = ".bucket.json" + KEY_INDEX_CACHE_TTL = 30 def __init__(self, root: Path) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) self._ensure_system_roots() + self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {} def list_buckets(self) -> List[BucketMeta]: buckets: List[BucketMeta] = [] @@ -142,7 +144,7 @@ class ObjectStorage: buckets.append( BucketMeta( name=bucket.name, - created_at=datetime.fromtimestamp(stat.st_ctime), + created_at=datetime.fromtimestamp(stat.st_ctime, timezone.utc), ) ) return buckets @@ -189,8 +191,7 @@ class ObjectStorage: total_bytes = 0 version_count = 0 version_bytes = 0 - - # Count current objects in the bucket folder + for path in bucket_path.rglob("*"): if path.is_file(): rel = path.relative_to(bucket_path) @@ -201,8 +202,7 @@ class ObjectStorage: stat = path.stat() object_count += 1 total_bytes += stat.st_size - - # Count archived versions in the system folder + versions_root = self._bucket_versions_root(bucket_name) if versions_root.exists(): for path in versions_root.rglob("*.bin"): @@ -216,8 +216,8 @@ class ObjectStorage: "bytes": total_bytes, "version_count": version_count, "version_bytes": version_bytes, - "total_objects": object_count + version_count, # All objects including versions - "total_bytes": total_bytes + version_bytes, # All storage including versions + "total_objects": object_count + version_count, + "total_bytes": total_bytes + version_bytes, } try: @@ -274,32 +274,20 @@ class ObjectStorage: raise StorageError("Bucket does not exist") bucket_id = bucket_path.name - # Collect all matching object keys first (lightweight - just paths) - all_keys: List[str] = [] - for path in bucket_path.rglob("*"): - if path.is_file(): - rel = path.relative_to(bucket_path) - if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS: - continue - key = str(rel.as_posix()) - if prefix and not key.startswith(prefix): - continue - all_keys.append(key) + object_cache = self._get_object_cache(bucket_id, bucket_path) + + all_keys = sorted(object_cache.keys()) + + if prefix: + all_keys = [k for k in all_keys if k.startswith(prefix)] - all_keys.sort() total_count = len(all_keys) - - # Handle continuation token (the key to start after) start_index = 0 if continuation_token: try: - # continuation_token is the last key from previous page - for i, key in enumerate(all_keys): - if key > continuation_token: - start_index = i - break - else: - # Token is past all keys + import bisect + start_index = bisect.bisect_right(all_keys, continuation_token) + if start_index >= total_count: return ListObjectsResult( 
objects=[], is_truncated=False, @@ -307,34 +295,17 @@ class ObjectStorage: total_count=total_count, ) except Exception: - pass # Invalid token, start from beginning + pass - # Get the slice we need end_index = start_index + max_keys keys_slice = all_keys[start_index:end_index] is_truncated = end_index < total_count - - # Now load full metadata only for the objects we're returning + objects: List[ObjectMeta] = [] for key in keys_slice: - safe_key = self._sanitize_object_key(key) - path = bucket_path / safe_key - if not path.exists(): - continue # Object may have been deleted - try: - stat = path.stat() - metadata = self._read_metadata(bucket_id, safe_key) - objects.append( - ObjectMeta( - key=key, - size=stat.st_size, - last_modified=datetime.fromtimestamp(stat.st_mtime), - etag=self._compute_etag(path), - metadata=metadata or None, - ) - ) - except OSError: - continue # File may have been deleted during iteration + obj = object_cache.get(key) + if obj: + objects.append(obj) next_token = keys_slice[-1] if is_truncated and keys_slice else None @@ -368,14 +339,12 @@ class ObjectStorage: destination = bucket_path / safe_key destination.parent.mkdir(parents=True, exist_ok=True) - # Check if this is an overwrite (won't add to object count) is_overwrite = destination.exists() existing_size = destination.stat().st_size if is_overwrite else 0 if self._is_versioning_enabled(bucket_path) and is_overwrite: self._archive_current_version(bucket_id, safe_key, reason="overwrite") - # Write to temp file first to get actual size tmp_dir = self._system_root_path() / self.SYSTEM_TMP_DIR tmp_dir.mkdir(parents=True, exist_ok=True) tmp_path = tmp_dir / f"{uuid.uuid4().hex}.tmp" @@ -387,9 +356,7 @@ class ObjectStorage: new_size = tmp_path.stat().st_size - # Check quota before finalizing if enforce_quota: - # Calculate net change (new size minus size being replaced) size_delta = new_size - existing_size object_delta = 0 if is_overwrite else 1 @@ -405,29 +372,29 @@ class ObjectStorage: quota_check["usage"], ) - # Move to final destination shutil.move(str(tmp_path), str(destination)) finally: - # Clean up temp file if it still exists try: tmp_path.unlink(missing_ok=True) except OSError: pass stat = destination.stat() - if metadata: - self._write_metadata(bucket_id, safe_key, metadata) - else: - self._delete_metadata(bucket_id, safe_key) + etag = checksum.hexdigest() + + internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)} + combined_meta = {**internal_meta, **(metadata or {})} + self._write_metadata(bucket_id, safe_key, combined_meta) self._invalidate_bucket_stats_cache(bucket_id) + self._invalidate_object_cache(bucket_id) return ObjectMeta( key=safe_key.as_posix(), size=stat.st_size, - last_modified=datetime.fromtimestamp(stat.st_mtime), - etag=checksum.hexdigest(), + last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc), + etag=etag, metadata=metadata, ) @@ -453,16 +420,14 @@ class ObjectStorage: for parent in path.parents: if parent == stop_at: break - # Retry a few times with small delays for Windows/OneDrive for attempt in range(3): try: if parent.exists() and not any(parent.iterdir()): parent.rmdir() - break # Success, move to next parent + break except OSError: if attempt < 2: - time.sleep(0.1) # Brief delay before retry - # Final attempt failed - continue to next parent + time.sleep(0.1) break def delete_object(self, bucket_name: str, object_key: str) -> None: @@ -479,6 +444,7 @@ class ObjectStorage: self._delete_metadata(bucket_id, rel) self._invalidate_bucket_stats_cache(bucket_id) 
+        self._invalidate_object_cache(bucket_id)
         self._cleanup_empty_parents(path, bucket_path)
 
     def purge_object(self, bucket_name: str, object_key: str) -> None:
@@ -499,8 +465,8 @@ class ObjectStorage:
         if legacy_version_dir.exists():
             shutil.rmtree(legacy_version_dir, ignore_errors=True)
 
-        # Invalidate bucket stats cache
         self._invalidate_bucket_stats_cache(bucket_id)
+        self._invalidate_object_cache(bucket_id)
         self._cleanup_empty_parents(target, bucket_path)
 
     def is_versioning_enabled(self, bucket_name: str) -> bool:
@@ -612,7 +578,6 @@ class ObjectStorage:
         bucket_path = self._require_bucket_path(bucket_name)
 
         if max_bytes is None and max_objects is None:
-            # Remove quota entirely
            self._set_bucket_config_entry(bucket_path.name, "quota", None)
            return
 
@@ -654,9 +619,7 @@ class ObjectStorage:
                 "message": None,
             }
 
-        # Get current stats (uses cache when available)
         stats = self.bucket_stats(bucket_name)
-        # Use totals which include versions for quota enforcement
         current_bytes = stats.get("total_bytes", stats.get("bytes", 0))
         current_objects = stats.get("total_objects", stats.get("objects", 0))
 
@@ -817,7 +780,7 @@ class ObjectStorage:
         return ObjectMeta(
             key=safe_key.as_posix(),
             size=stat.st_size,
-            last_modified=datetime.fromtimestamp(stat.st_mtime),
+            last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
             etag=self._compute_etag(destination),
             metadata=metadata or None,
         )
@@ -920,14 +883,12 @@ class ObjectStorage:
             raise StorageError("part_number must be >= 1")
 
         bucket_path = self._bucket_path(bucket_name)
-        # Get the upload root directory
         upload_root = self._multipart_dir(bucket_path.name, upload_id)
         if not upload_root.exists():
             upload_root = self._legacy_multipart_dir(bucket_path.name, upload_id)
         if not upload_root.exists():
             raise StorageError("Multipart upload not found")
 
-        # Write the part data first (can happen concurrently)
         checksum = hashlib.md5()
         part_filename = f"part-{part_number:05d}.part"
         part_path = upload_root / part_filename
@@ -939,13 +900,11 @@ class ObjectStorage:
             "filename": part_filename,
         }
 
-        # Update manifest with file locking to prevent race conditions
         manifest_path = upload_root / self.MULTIPART_MANIFEST
         lock_path = upload_root / ".manifest.lock"
 
         with lock_path.open("w") as lock_file:
             with _file_lock(lock_file):
-                # Re-read manifest under lock to get latest state
                 try:
                     manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
                 except (OSError, json.JSONDecodeError) as exc:
@@ -999,11 +958,9 @@ class ObjectStorage:
         safe_key = self._sanitize_object_key(manifest["object_key"])
         destination = bucket_path / safe_key
 
-        # Check if this is an overwrite
         is_overwrite = destination.exists()
         existing_size = destination.stat().st_size if is_overwrite else 0
 
-        # Check quota before writing
         if enforce_quota:
             size_delta = total_size - existing_size
             object_delta = 0 if is_overwrite else 1
@@ -1065,7 +1022,7 @@ class ObjectStorage:
         return ObjectMeta(
             key=safe_key.as_posix(),
             size=stat.st_size,
-            last_modified=datetime.fromtimestamp(stat.st_mtime),
+            last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
             etag=checksum.hexdigest(),
             metadata=metadata,
         )
@@ -1163,6 +1120,172 @@ class ObjectStorage:
     def _legacy_multipart_dir(self, bucket_name: str, upload_id: str) -> Path:
         return self._legacy_multipart_bucket_root(bucket_name) / upload_id
 
+    def _fast_list_keys(self, bucket_path: Path) -> List[str]:
+        """Fast directory walk using os.scandir instead of pathlib.rglob.
+
+        This is significantly faster for large directories (10K+ files).
+        Returns just the keys (for backward compatibility).
+        """
+        return list(self._build_object_cache(bucket_path).keys())
+
+    def _build_object_cache(self, bucket_path: Path) -> Dict[str, ObjectMeta]:
+        """Build a complete object metadata cache for a bucket.
+
+        Uses os.scandir for fast directory walking and a persistent etag index.
+        """
+        from concurrent.futures import ThreadPoolExecutor
+
+        bucket_id = bucket_path.name
+        objects: Dict[str, ObjectMeta] = {}
+        bucket_str = str(bucket_path)
+        bucket_len = len(bucket_str) + 1
+
+        etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
+        meta_cache: Dict[str, str] = {}
+        index_mtime: float = 0
+
+        if etag_index_path.exists():
+            try:
+                index_mtime = etag_index_path.stat().st_mtime
+                with open(etag_index_path, 'r', encoding='utf-8') as f:
+                    meta_cache = json.load(f)
+            except (OSError, json.JSONDecodeError):
+                meta_cache = {}
+
+        meta_root = self._bucket_meta_root(bucket_id)
+        needs_rebuild = False
+
+        if meta_root.exists() and index_mtime > 0:
+            def check_newer(dir_path: str) -> bool:
+                try:
+                    with os.scandir(dir_path) as it:
+                        for entry in it:
+                            if entry.is_dir(follow_symlinks=False):
+                                if check_newer(entry.path):
+                                    return True
+                            elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
+                                if entry.stat().st_mtime > index_mtime:
+                                    return True
+                except OSError:
+                    pass
+                return False
+            needs_rebuild = check_newer(str(meta_root))
+        elif not meta_cache:
+            needs_rebuild = True
+
+        if needs_rebuild and meta_root.exists():
+            meta_str = str(meta_root)
+            meta_len = len(meta_str) + 1
+            meta_files: list[tuple[str, str]] = []
+
+            def collect_meta_files(dir_path: str) -> None:
+                try:
+                    with os.scandir(dir_path) as it:
+                        for entry in it:
+                            if entry.is_dir(follow_symlinks=False):
+                                collect_meta_files(entry.path)
+                            elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
+                                rel = entry.path[meta_len:]
+                                key = rel[:-10].replace(os.sep, '/')
+                                meta_files.append((key, entry.path))
+                except OSError:
+                    pass
+
+            collect_meta_files(meta_str)
+
+            def read_meta_file(item: tuple[str, str]) -> tuple[str, str | None]:
+                key, path = item
+                try:
+                    with open(path, 'rb') as f:
+                        content = f.read()
+                    etag_marker = b'"__etag__"'
+                    idx = content.find(etag_marker)
+                    if idx != -1:
+                        start = content.find(b'"', idx + len(etag_marker) + 1)
+                        if start != -1:
+                            end = content.find(b'"', start + 1)
+                            if end != -1:
+                                return key, content[start+1:end].decode('utf-8')
+                    return key, None
+                except (OSError, UnicodeDecodeError):
+                    return key, None
+
+            if meta_files:
+                meta_cache = {}
+                with ThreadPoolExecutor(max_workers=min(64, len(meta_files))) as executor:
+                    for key, etag in executor.map(read_meta_file, meta_files):
+                        if etag:
+                            meta_cache[key] = etag
+
+                try:
+                    etag_index_path.parent.mkdir(parents=True, exist_ok=True)
+                    with open(etag_index_path, 'w', encoding='utf-8') as f:
+                        json.dump(meta_cache, f)
+                except OSError:
+                    pass
+
+        def scan_dir(dir_path: str) -> None:
+            try:
+                with os.scandir(dir_path) as it:
+                    for entry in it:
+                        if entry.is_dir(follow_symlinks=False):
+                            rel_start = entry.path[bucket_len:].split(os.sep)[0] if len(entry.path) > bucket_len else entry.name
+                            if rel_start in self.INTERNAL_FOLDERS:
+                                continue
+                            scan_dir(entry.path)
+                        elif entry.is_file(follow_symlinks=False):
+                            rel = entry.path[bucket_len:]
+                            first_part = rel.split(os.sep)[0] if os.sep in rel else rel
+                            if first_part in self.INTERNAL_FOLDERS:
+                                continue
+
+                            key = rel.replace(os.sep, '/')
+                            try:
+                                stat = entry.stat()
+
+                                etag = meta_cache.get(key)
+
+                                if not etag:
+                                    etag = f'"{stat.st_size}-{int(stat.st_mtime)}"'
+
+                                objects[key] = ObjectMeta(
+                                    key=key,
+                                    size=stat.st_size,
+                                    last_modified=datetime.fromtimestamp(stat.st_mtime, timezone.utc),
+                                    etag=etag,
+                                    metadata=None,
+                                )
+                            except OSError:
+                                pass
+            except OSError:
+                pass
+
+        scan_dir(bucket_str)
+        return objects
+
+    def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]:
+        """Get cached object metadata for a bucket, refreshing if stale."""
+        now = time.time()
+        cached = self._object_cache.get(bucket_id)
+
+        if cached:
+            objects, timestamp = cached
+            if now - timestamp < self.KEY_INDEX_CACHE_TTL:
+                return objects
+
+        objects = self._build_object_cache(bucket_path)
+        self._object_cache[bucket_id] = (objects, now)
+        return objects
+
+    def _invalidate_object_cache(self, bucket_id: str) -> None:
+        """Invalidate the object cache and etag index for a bucket."""
+        self._object_cache.pop(bucket_id, None)
+        etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
+        try:
+            etag_index_path.unlink(missing_ok=True)
+        except OSError:
+            pass
+
     def _ensure_system_roots(self) -> None:
         for path in (
             self._system_root_path(),
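
Aside (not part of the patch): _build_object_cache above replaces a pathlib.rglob walk with a recursive os.scandir pass, which is where the "significantly faster" claim in its docstring comes from. The minimal standalone sketch below contrasts the two walk styles; it is illustrative only, and SKIP_DIRS is a hypothetical stand-in for the ObjectStorage.INTERNAL_FOLDERS constant that this patch does not show.

import os
from pathlib import Path
from typing import Iterator

SKIP_DIRS = {".myfsio.sys"}  # hypothetical stand-in for ObjectStorage.INTERNAL_FOLDERS


def walk_scandir(root: str) -> Iterator[os.DirEntry]:
    """Iterative os.scandir walk that skips top-level internal folders."""
    stack = [root]
    while stack:
        current = stack.pop()
        try:
            with os.scandir(current) as it:
                for entry in it:
                    if entry.is_dir(follow_symlinks=False):
                        if current == root and entry.name in SKIP_DIRS:
                            continue
                        stack.append(entry.path)
                    elif entry.is_file(follow_symlinks=False):
                        yield entry
        except OSError:
            continue


def walk_rglob(root: str) -> Iterator[Path]:
    """pathlib equivalent; builds a Path per entry, which is slower on large trees."""
    root_path = Path(root)
    for path in root_path.rglob("*"):
        if path.relative_to(root_path).parts[0] in SKIP_DIRS:
            continue
        if path.is_file():
            yield path

The scandir variant reuses each directory entry's cached type information instead of constructing Path objects and issuing additional stat calls per entry, which is the same trade-off the method above relies on.
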
diff --git a/app/ui.py b/app/ui.py
index 2d2936f..4baf5c3 100644
--- a/app/ui.py
+++ b/app/ui.py
@@ -189,7 +189,7 @@ def inject_nav_state() -> dict[str, Any]:
     return {
         "principal": principal,
         "can_manage_iam": can_manage,
-        "can_view_metrics": can_manage,  # Only admins can view metrics
+        "can_view_metrics": can_manage,
         "csrf_token": generate_csrf,
     }
 
@@ -294,7 +294,6 @@ def bucket_detail(bucket_name: str):
     storage = _storage()
     try:
         _authorize_ui(principal, bucket_name, "list")
-        # Don't load objects here - UI fetches them asynchronously via /buckets/<bucket_name>/objects
         if not storage.bucket_exists(bucket_name):
             raise StorageError("Bucket does not exist")
     except (StorageError, IamError) as exc:
@@ -343,7 +342,6 @@
     except IamError:
         can_manage_versioning = False
 
-    # Check replication permission
     can_manage_replication = False
     if principal:
         try:
@@ -352,7 +350,6 @@
         except IamError:
             can_manage_replication = False
 
-    # Check if user is admin (can configure replication settings, not just toggle)
     is_replication_admin = False
     if principal:
         try:
@@ -361,12 +358,9 @@
         except IamError:
             is_replication_admin = False
 
-    # Replication info - don't compute sync status here (it's slow), let JS fetch it async
     replication_rule = _replication().get_rule(bucket_name)
-    # Load connections for admin, or for non-admin if there's an existing rule (to show target name)
     connections = _connections().list() if (is_replication_admin or replication_rule) else []
 
-    # Encryption settings
     encryption_config = storage.get_bucket_encryption(bucket_name)
     kms_manager = _kms()
     kms_keys = kms_manager.list_keys() if kms_manager else []
@@ -374,7 +368,6 @@
     encryption_enabled = current_app.config.get("ENCRYPTION_ENABLED", False)
     can_manage_encryption = can_manage_versioning  # Same as other bucket properties
 
-    # Quota settings (admin only)
     bucket_quota = storage.get_bucket_quota(bucket_name)
     bucket_stats = storage.bucket_stats(bucket_name)
     can_manage_quota = False
@@ -384,7 +377,6 @@
         except IamError:
             pass
 
-    # Pass the objects API endpoint URL for async loading
     objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
 
     return render_template(
@@ -423,7 +415,7 @@ def list_bucket_objects(bucket_name: str):
     except IamError as exc:
         return jsonify({"error": str(exc)}), 403
 
-    max_keys = min(int(request.args.get("max_keys", 100)), 1000)
+    max_keys = min(int(request.args.get("max_keys", 1000)), 10000)
     continuation_token = request.args.get("continuation_token") or None
     prefix = request.args.get("prefix") or None
 
@@ -738,41 +730,30 @@ def bulk_download_objects(bucket_name: str):
     unique_keys = list(dict.fromkeys(cleaned))
     storage = _storage()
 
-    # Check permissions for all keys first (or at least bucket read)
-    # We'll check bucket read once, then object read for each if needed?
-    # _authorize_ui checks bucket level if object_key is None, but we need to check each object if fine-grained policies exist.
-    # For simplicity/performance, we check bucket list/read.
+    # Verify permission to read bucket contents
     try:
         _authorize_ui(principal, bucket_name, "read")
     except IamError as exc:
         return jsonify({"error": str(exc)}), 403
 
-    # Create ZIP
+    # Create ZIP archive of selected objects
     buffer = io.BytesIO()
     with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
         for key in unique_keys:
             try:
-                # Verify individual object permission if needed?
-                # _authorize_ui(principal, bucket_name, "read", object_key=key)
-                # This might be slow for many objects. Assuming bucket read is enough for now or we accept the overhead.
-                # Let's skip individual check for bulk speed, assuming bucket read implies object read unless denied.
-                # But strictly we should check. Let's check.
                 _authorize_ui(principal, bucket_name, "read", object_key=key)
 
-                # Check if object is encrypted
                 metadata = storage.get_object_metadata(bucket_name, key)
                 is_encrypted = "x-amz-server-side-encryption" in metadata
 
                 if is_encrypted and hasattr(storage, 'get_object_data'):
-                    # Decrypt and add to zip
                     data, _ = storage.get_object_data(bucket_name, key)
                     zf.writestr(key, data)
                 else:
-                    # Add unencrypted file directly
                     path = storage.get_object_path(bucket_name, key)
                     zf.write(path, arcname=key)
             except (StorageError, IamError):
-                # Skip files we can't read or don't exist
+                # Skip objects that can't be accessed
                 continue
 
     buffer.seek(0)
@@ -1077,7 +1058,6 @@ def update_bucket_encryption(bucket_name: str):
     action = request.form.get("action", "enable")
 
     if action == "disable":
-        # Disable encryption
         try:
             _storage().set_bucket_encryption(bucket_name, None)
             flash("Default encryption disabled", "info")
@@ -1085,16 +1065,14 @@
             flash(_friendly_error_message(exc), "danger")
         return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
 
-    # Enable or update encryption
     algorithm = request.form.get("algorithm", "AES256")
     kms_key_id = request.form.get("kms_key_id", "").strip() or None
 
-    # Validate algorithm
     if algorithm not in ("AES256", "aws:kms"):
         flash("Invalid encryption algorithm", "danger")
         return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
 
-    # Build encryption config following AWS format
+    # Build encryption configuration in AWS S3 format
     encryption_config: dict[str, Any] = {
         "Rules": [
             {
@@ -1270,7 +1248,6 @@ def delete_iam_user(access_key: str):
         return redirect(url_for("ui.iam_dashboard"))
 
     if access_key == principal.access_key:
-        # Self-deletion
         try:
             _iam().delete_user(access_key)
             session.pop("credentials", None)
@@ -1352,6 +1329,9 @@ def create_connection():
 
 @ui_bp.post("/connections/test")
 def test_connection():
+    from botocore.config import Config as BotoConfig
+    from botocore.exceptions import ConnectTimeoutError, EndpointConnectionError, ReadTimeoutError
+
     principal = _current_principal()
     try:
         _iam().authorize(principal, None, "iam:list_users")
@@ -1368,18 +1348,32 @@ def test_connection():
         return jsonify({"status": "error", "message": "Missing credentials"}), 400
 
     try:
+        config = BotoConfig(
+            connect_timeout=5,
+            read_timeout=10,
+            retries={'max_attempts': 1}
+        )
         s3 = boto3.client(
             "s3",
             endpoint_url=endpoint,
             aws_access_key_id=access_key,
             aws_secret_access_key=secret_key,
             region_name=region,
+            config=config,
         )
-        # Try to list buckets to verify credentials and endpoint
+
         s3.list_buckets()
         return jsonify({"status": "ok", "message": "Connection successful"})
+    except (ConnectTimeoutError, ReadTimeoutError):
+        return jsonify({"status": "error", "message": f"Connection timed out - endpoint may be down or unreachable: {endpoint}"}), 400
+    except EndpointConnectionError:
+        return jsonify({"status": "error", "message": f"Could not connect to endpoint: {endpoint}"}), 400
+    except ClientError as e:
+        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
+        error_msg = e.response.get('Error', {}).get('Message', str(e))
+        return jsonify({"status": "error", "message": f"Connection failed ({error_code}): {error_msg}"}), 400
     except Exception as e:
-        return jsonify({"status": "error", "message": str(e)}), 400
+        return jsonify({"status": "error", "message": f"Connection failed: {str(e)}"}), 400
 
 
 @ui_bp.post("/connections/<connection_id>/update")
@@ -1440,7 +1434,6 @@ def update_bucket_replication(bucket_name: str):
         flash(str(exc), "danger")
         return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication"))
 
-    # Check if user is admin (required for create/delete operations)
     is_admin = False
     try:
         _iam().authorize(principal, None, "iam:list_users")
@@ -1451,14 +1444,12 @@ def update_bucket_replication(bucket_name: str):
     action = request.form.get("action")
 
     if action == "delete":
-        # Admin only - remove configuration entirely
         if not is_admin:
             flash("Only administrators can remove replication configuration", "danger")
             return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication"))
         _replication().delete_rule(bucket_name)
         flash("Replication configuration removed", "info")
     elif action == "pause":
-        # Users can pause - just set enabled=False
         rule = _replication().get_rule(bucket_name)
         if rule:
             rule.enabled = False
@@ -1467,7 +1458,6 @@ def update_bucket_replication(bucket_name: str):
         else:
             flash("No replication configuration to pause", "warning")
     elif action == "resume":
-        # Users can resume - just set enabled=True
         rule = _replication().get_rule(bucket_name)
         if rule:
             rule.enabled = True
@@ -1476,7 +1466,6 @@ def update_bucket_replication(bucket_name: str):
         else:
             flash("No replication configuration to resume", "warning")
     elif action == "create":
-        # Admin only - create new configuration
         if not is_admin:
             flash("Only administrators can configure replication settings", "danger")
             return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication"))
@@ -1501,7 +1490,6 @@ def update_bucket_replication(bucket_name: str):
         )
         _replication().set_rule(rule)
 
-        # If mode is "all", trigger replication of existing objects
         if replication_mode == REPLICATION_MODE_ALL:
             _replication().replicate_existing_objects(bucket_name)
             flash("Replication configured. Existing objects are being replicated in the background.", "success")
@@ -1526,10 +1514,31 @@ def get_replication_status(bucket_name: str):
     if not rule:
         return jsonify({"error": "No replication rule"}), 404
 
-    # This is the slow operation - compute sync status by comparing buckets
-    stats = _replication().get_sync_status(bucket_name)
+    connection = _connections().get(rule.target_connection_id)
+    endpoint_healthy = False
+    endpoint_error = None
+    if connection:
+        endpoint_healthy = _replication().check_endpoint_health(connection)
+        if not endpoint_healthy:
+            endpoint_error = f"Cannot reach endpoint: {connection.endpoint_url}"
+    else:
+        endpoint_error = "Target connection not found"
+
+    stats = None
+    if endpoint_healthy:
+        stats = _replication().get_sync_status(bucket_name)
+
     if not stats:
-        return jsonify({"error": "Failed to compute status"}), 500
+        return jsonify({
+            "objects_synced": 0,
+            "objects_pending": 0,
+            "objects_orphaned": 0,
+            "bytes_synced": 0,
+            "last_sync_at": rule.stats.last_sync_at if rule.stats else None,
+            "last_sync_key": rule.stats.last_sync_key if rule.stats else None,
+            "endpoint_healthy": endpoint_healthy,
+            "endpoint_error": endpoint_error,
+        })
 
     return jsonify({
         "objects_synced": stats.objects_synced,
@@ -1538,6 +1547,28 @@
         "bytes_synced": stats.bytes_synced,
         "last_sync_at": stats.last_sync_at,
         "last_sync_key": stats.last_sync_key,
+        "endpoint_healthy": endpoint_healthy,
+        "endpoint_error": endpoint_error,
+    })
+
+
+@ui_bp.get("/connections/<connection_id>/health")
+def check_connection_health(connection_id: str):
+    """Check if a connection endpoint is reachable."""
+    principal = _current_principal()
+    try:
+        _iam().authorize(principal, None, "iam:list_users")
+    except IamError:
+        return jsonify({"error": "Access denied"}), 403
+
+    conn = _connections().get(connection_id)
+    if not conn:
+        return jsonify({"healthy": False, "error": "Connection not found"}), 404
+
+    healthy = _replication().check_endpoint_health(conn)
+    return jsonify({
+        "healthy": healthy,
+        "error": None if healthy else f"Cannot reach endpoint: {conn.endpoint_url}"
     })
 
 
@@ -1558,7 +1589,6 @@ def connections_dashboard():
 
 def metrics_dashboard():
     principal = _current_principal()
-    # Metrics are restricted to admin users
     try:
         _iam().authorize(principal, None, "iam:list_users")
     except IamError:
@@ -1582,16 +1612,13 @@
     total_bytes_used = 0
     total_versions = 0
 
-    # Note: Uses cached stats from storage layer to improve performance
     cache_ttl = current_app.config.get("BUCKET_STATS_CACHE_TTL", 60)
     for bucket in buckets:
         stats = storage.bucket_stats(bucket.name, cache_ttl=cache_ttl)
-        # Use totals which include archived versions
         total_objects += stats.get("total_objects", stats.get("objects", 0))
         total_bytes_used += stats.get("total_bytes", stats.get("bytes", 0))
         total_versions += stats.get("version_count", 0)
 
-    # Calculate system uptime
     boot_time = psutil.boot_time()
     uptime_seconds = time.time() - boot_time
     uptime_days = int(uptime_seconds / 86400)
diff --git a/app/version.py b/app/version.py
index bd1e371..ea75040 100644
--- a/app/version.py
+++ b/app/version.py
@@ -1,7 +1,7 @@
 """Central location for the application version string."""
 from __future__ import annotations
 
-APP_VERSION = "0.1.7"
+APP_VERSION = "0.1.8"
 
 
 def get_version() -> str:
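
Aside (not part of the patch): the new /connections/<connection_id>/health route above returns a small JSON body of the form {"healthy": <bool>, "error": <string or null>}. The sketch below shows one way an external script might call it; it is a hypothetical usage example that assumes the UI blueprint is mounted under /ui, as the template fetch calls suggest, and that the session already carries authenticated admin credentials.

import requests


def connection_is_healthy(base_url: str, connection_id: str, session: requests.Session) -> bool:
    """Poll the health route; treat any non-200 response as unhealthy."""
    resp = session.get(f"{base_url}/ui/connections/{connection_id}/health", timeout=20)
    if resp.status_code != 200:
        return False
    payload = resp.json()  # expected shape: {"healthy": bool, "error": str | None}
    return bool(payload.get("healthy"))
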
diff --git a/static/css/main.css b/static/css/main.css
index a75838a..22c399c 100644
--- a/static/css/main.css
+++ b/static/css/main.css
@@ -66,8 +66,28 @@ html {
   color: var(--myfsio-muted) !important;
 }
-.table-responsive { border-radius: 0.5rem; overflow: hidden; }
+.table-responsive {
+  border-radius: 0.5rem;
+  overflow-x: auto;
+  -webkit-overflow-scrolling: touch;
+}
 
 .message-stack { position: sticky; top: 1rem; z-index: 100; }
+
+/* Mobile-friendly table improvements */
+.table-responsive table {
+  min-width: 600px;
+}
+
+.table-responsive table th,
+.table-responsive table td {
+  white-space: nowrap;
+}
+
+/* Allow text wrapping for description columns */
+.table-responsive table td.text-wrap {
+  white-space: normal;
+  min-width: 200px;
+}
 
 code { font-size: 0.85rem; }
 code {
@@ -389,8 +409,22 @@
 .bucket-table th:last-child { white-space: nowrap; }
 
 .object-key {
-  word-break: break-word;
-  max-width: 32rem;
+  max-width: 0;
+  width: 100%;
+  overflow: hidden;
+  text-overflow: ellipsis;
+}
+
+.object-key .fw-medium {
+  white-space: nowrap;
+  overflow: hidden;
+  text-overflow: ellipsis;
+}
+
+.object-key .text-muted {
+  white-space: nowrap;
+  overflow: hidden;
+  text-overflow: ellipsis;
 }
 
 .preview-card { top: 1rem; }
@@ -517,6 +551,22 @@
   overflow-y: auto;
 }
 
+.objects-table-container thead {
+  position: sticky;
+  top: 0;
+  z-index: 10;
+}
+
+.objects-table-container thead th {
+  background-color: #f8f9fa;
+  border-bottom: 1px solid var(--myfsio-card-border);
+  box-shadow: 0 1px 2px rgba(0, 0, 0, 0.05);
+}
+
+[data-theme='dark'] .objects-table-container thead th {
+  background-color: #1e293b;
+}
+
 .btn-group form { display: inline; }
 
 .font-monospace { font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace; }
@@ -1537,6 +1587,41 @@ pre code {
     position: relative !important;
     top: 0 !important;
   }
+
+  /* Ensure tables are scrollable on mobile */
+  .card-body .table-responsive {
+    margin: -1rem;
+    padding: 0;
+    width: calc(100% + 2rem);
+  }
+
+  .card-body .table-responsive table {
+    margin-bottom: 0;
+  }
+
+  /* IAM users table mobile adjustments */
+  .table th,
+  .table td {
+    padding: 0.5rem 0.75rem;
+  }
+
+  /* Better touch scrolling indicator */
+  .table-responsive::after {
+    content: '';
+    position: absolute;
+    top: 0;
+    right: 0;
+    bottom: 0;
+    width: 20px;
+    background: linear-gradient(to left, var(--myfsio-card-bg), transparent);
+    pointer-events: none;
+    opacity: 0;
+    transition: opacity 0.3s;
+  }
+
+  .table-responsive:not(:hover)::after {
+    opacity: 0.8;
+  }
 }
 
 *:focus-visible {
diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html
index 41d22ba..daa8f45 100644
--- a/templates/bucket_detail.html
+++ b/templates/bucket_detail.html
@@ -86,7 +86,9 @@ Upload - +
+ +
+
- +
- - + + @@ -158,18 +168,19 @@ +
- Load + Batch - per page + objects
@@ -322,12 +333,13 @@
-
'; + } + if (!bottomSpacer) { + bottomSpacer = document.createElement('tr'); + bottomSpacer.id = 'virtual-bottom-spacer'; + bottomSpacer.innerHTML = ''; + } + }; + + // Compute which items should be visible based on current view + const computeVisibleItems = () => { + const items = []; + const folders = new Set(); + + allObjects.forEach(obj => { + if (!obj.key.startsWith(currentPrefix)) return; + + const remainder = obj.key.slice(currentPrefix.length); + const slashIndex = remainder.indexOf('/'); + + if (slashIndex === -1) { + // File in current folder - filter on the displayed filename (remainder) + if (!currentFilterTerm || remainder.toLowerCase().includes(currentFilterTerm)) { + items.push({ type: 'file', data: obj, displayKey: remainder }); + } + } else { + // Folder + const folderName = remainder.slice(0, slashIndex); + const folderPath = currentPrefix + folderName + '/'; + if (!folders.has(folderPath)) { + folders.add(folderPath); + // Filter on the displayed folder name only + if (!currentFilterTerm || folderName.toLowerCase().includes(currentFilterTerm)) { + items.push({ type: 'folder', path: folderPath, displayKey: folderName }); + } + } + } + }); + + // Sort: folders first, then files + items.sort((a, b) => { + if (a.type === 'folder' && b.type === 'file') return -1; + if (a.type === 'file' && b.type === 'folder') return 1; + const aKey = a.type === 'folder' ? a.path : a.data.key; + const bKey = b.type === 'folder' ? b.path : b.data.key; + return aKey.localeCompare(bKey); + }); + + return items; + }; + + // Render only the visible rows based on scroll position + const renderVirtualRows = () => { + if (!objectsTableBody || !scrollContainer) return; + + const containerHeight = scrollContainer.clientHeight; + const scrollTop = scrollContainer.scrollTop; + + // Calculate visible range + const startIndex = Math.max(0, Math.floor(scrollTop / ROW_HEIGHT) - BUFFER_ROWS); + const endIndex = Math.min(visibleItems.length, Math.ceil((scrollTop + containerHeight) / ROW_HEIGHT) + BUFFER_ROWS); + + // Skip if range hasn't changed significantly + if (startIndex === renderedRange.start && endIndex === renderedRange.end) return; + + renderedRange = { start: startIndex, end: endIndex }; + + // Clear and rebuild + objectsTableBody.innerHTML = ''; + + // Add top spacer + initVirtualScrollElements(); + topSpacer.querySelector('td').style.height = `${startIndex * ROW_HEIGHT}px`; + objectsTableBody.appendChild(topSpacer); + + // Render visible rows + for (let i = startIndex; i < endIndex; i++) { + const item = visibleItems[i]; + if (!item) continue; + + let row; + if (item.type === 'folder') { + row = createFolderRow(item.path, item.displayKey); + } else { + row = createObjectRow(item.data, item.displayKey); + } + row.dataset.virtualIndex = i; + objectsTableBody.appendChild(row); + } + + // Add bottom spacer + const remainingRows = visibleItems.length - endIndex; + bottomSpacer.querySelector('td').style.height = `${remainingRows * ROW_HEIGHT}px`; + objectsTableBody.appendChild(bottomSpacer); + + // Re-attach handlers to new rows + attachRowHandlers(); + }; + + // Debounced scroll handler for virtual scrolling + let scrollTimeout = null; + const handleVirtualScroll = () => { + if (scrollTimeout) cancelAnimationFrame(scrollTimeout); + scrollTimeout = requestAnimationFrame(renderVirtualRows); + }; + + // Refresh the virtual list (after data changes or navigation) + const refreshVirtualList = () => { + visibleItems = computeVisibleItems(); + renderedRange = { start: -1, end: -1 }; // Force re-render + + if 
(visibleItems.length === 0) { + if (allObjects.length === 0 && !hasMoreObjects) { + showEmptyState(); + } else { + // Empty folder + objectsTableBody.innerHTML = ` + + + + `; + } + } else { + renderVirtualRows(); + } + + updateFolderViewStatus(); + }; + + // Update status bar + const updateFolderViewStatus = () => { + const folderViewStatusEl = document.getElementById('folder-view-status'); + if (!folderViewStatusEl) return; + + if (currentPrefix) { + const folderCount = visibleItems.filter(i => i.type === 'folder').length; + const fileCount = visibleItems.filter(i => i.type === 'file').length; + folderViewStatusEl.innerHTML = `${folderCount} folder${folderCount !== 1 ? 's' : ''}, ${fileCount} file${fileCount !== 1 ? 's' : ''} in this view`; + folderViewStatusEl.classList.remove('d-none'); + } else { + folderViewStatusEl.classList.add('d-none'); + } + }; + + // ============== DATA LOADING ============== + const loadObjects = async (append = false) => { if (isLoadingObjects) return; isLoadingObjects = true; if (!append) { - if (objectsLoadingRow) objectsLoadingRow.style.display = ''; nextContinuationToken = null; loadedObjectCount = 0; @@ -1973,35 +2219,18 @@ totalObjectCount = data.total_count || 0; nextContinuationToken = data.next_continuation_token; - if (!append) { - - if (objectsLoadingRow) objectsLoadingRow.remove(); - - if (data.objects.length === 0) { - showEmptyState(); - updateObjectCountBadge(); - isLoadingObjects = false; - return; - } - - objectsTableBody.innerHTML = ''; + if (!append && objectsLoadingRow) { + objectsLoadingRow.remove(); } + // Store lightweight object metadata (no DOM elements!) data.objects.forEach(obj => { - const row = createObjectRow(obj); - objectsTableBody.appendChild(row); loadedObjectCount++; - - // Apply current filter to newly loaded objects - if (currentFilterTerm) { - const keyLower = obj.key.toLowerCase(); - row.style.display = keyLower.includes(currentFilterTerm) ? 
'' : 'none'; - } - allObjects.push({ key: obj.key, size: obj.size, lastModified: obj.last_modified, + lastModifiedDisplay: obj.last_modified_display, etag: obj.etag, previewUrl: obj.preview_url, downloadUrl: obj.download_url, @@ -2009,34 +2238,28 @@ deleteEndpoint: obj.delete_endpoint, metadata: JSON.stringify(obj.metadata || {}), versionsEndpoint: obj.versions_endpoint, - restoreTemplate: obj.restore_template, - element: row + restoreTemplate: obj.restore_template }); }); updateObjectCountBadge(); - - // Track if there are more objects to load hasMoreObjects = data.is_truncated; if (loadMoreStatus) { if (data.is_truncated) { - loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} of ${totalObjectCount.toLocaleString()} objects loaded`; + loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} of ${totalObjectCount.toLocaleString()} loaded`; } else { - loadMoreStatus.textContent = `All ${loadedObjectCount.toLocaleString()} objects loaded`; + loadMoreStatus.textContent = `${loadedObjectCount.toLocaleString()} objects`; } } - // Update Load More button visibility if (typeof updateLoadMoreButton === 'function') { updateLoadMoreButton(); } - if (typeof initFolderNavigation === 'function') { - initFolderNavigation(); - } - - attachRowHandlers(); + // Refresh virtual scroll view + refreshVirtualList(); + renderBreadcrumb(currentPrefix); } catch (error) { console.error('Failed to load objects:', error); @@ -2047,7 +2270,6 @@ } } finally { isLoadingObjects = false; - // Hide loading spinner if (loadMoreSpinner) { loadMoreSpinner.classList.add('d-none'); } @@ -2055,16 +2277,15 @@ }; const attachRowHandlers = () => { + // Attach handlers to object rows const objectRows = document.querySelectorAll('[data-object-row]'); objectRows.forEach(row => { - if (row.dataset.handlersAttached) return; row.dataset.handlersAttached = 'true'; const deleteBtn = row.querySelector('[data-delete-object]'); deleteBtn?.addEventListener('click', (e) => { e.stopPropagation(); - const deleteModalEl = document.getElementById('deleteObjectModal'); const deleteModal = deleteModalEl ? 
bootstrap.Modal.getOrCreateInstance(deleteModalEl) : null; const deleteObjectForm = document.getElementById('deleteObjectForm'); @@ -2081,17 +2302,63 @@ selectCheckbox?.addEventListener('change', () => { toggleRowSelection(row, selectCheckbox.checked); }); + + // Restore selection state + if (selectedRows.has(row.dataset.key)) { + selectCheckbox.checked = true; + row.classList.add('table-active'); + } + }); + + // Attach handlers to folder rows + const folderRows = document.querySelectorAll('.folder-row'); + folderRows.forEach(row => { + if (row.dataset.handlersAttached) return; + row.dataset.handlersAttached = 'true'; + + const folderPath = row.dataset.folderPath; + + const checkbox = row.querySelector('[data-folder-select]'); + checkbox?.addEventListener('change', (e) => { + e.stopPropagation(); + // Select all objects in this folder + const folderObjects = allObjects.filter(obj => obj.key.startsWith(folderPath)); + folderObjects.forEach(obj => { + if (checkbox.checked) { + selectedRows.set(obj.key, obj); + } else { + selectedRows.delete(obj.key); + } + }); + updateBulkDeleteState(); + }); + + const folderBtn = row.querySelector('button'); + folderBtn?.addEventListener('click', (e) => { + e.stopPropagation(); + navigateToFolder(folderPath); + }); + + row.addEventListener('click', (e) => { + if (e.target.closest('[data-folder-select]') || e.target.closest('button')) return; + navigateToFolder(folderPath); + }); }); updateBulkDeleteState(); }; - // Infinite scroll: use IntersectionObserver to auto-load more objects + // Scroll container reference (needed for virtual scrolling) const scrollSentinel = document.getElementById('scroll-sentinel'); const scrollContainer = document.querySelector('.objects-table-container'); const loadMoreBtn = document.getElementById('load-more-btn'); - // Load More button click handler (fallback for mobile) + // Virtual scroll: listen to scroll events + if (scrollContainer) { + scrollContainer.addEventListener('scroll', handleVirtualScroll, { passive: true }); + } + + // Load More button click handler (fallback) loadMoreBtn?.addEventListener('click', () => { if (hasMoreObjects && !isLoadingObjects) { loadObjects(true); @@ -2105,8 +2372,8 @@ } } + // Auto-load more when near bottom (for loading all data) if (scrollSentinel && scrollContainer) { - // Observer for scrolling within the container (desktop) const containerObserver = new IntersectionObserver((entries) => { entries.forEach(entry => { if (entry.isIntersecting && hasMoreObjects && !isLoadingObjects) { @@ -2115,12 +2382,11 @@ }); }, { root: scrollContainer, - rootMargin: '100px', + rootMargin: '500px', // Load more earlier for smoother experience threshold: 0 }); containerObserver.observe(scrollSentinel); - // Observer for page scrolling (mobile - when container is not scrollable) const viewportObserver = new IntersectionObserver((entries) => { entries.forEach(entry => { if (entry.isIntersecting && hasMoreObjects && !isLoadingObjects) { @@ -2128,14 +2394,14 @@ } }); }, { - root: null, // viewport - rootMargin: '200px', + root: null, + rootMargin: '500px', threshold: 0 }); viewportObserver.observe(scrollSentinel); } - // Page size selector + // Page size selector (now controls batch size) const pageSizeSelect = document.getElementById('page-size-select'); pageSizeSelect?.addEventListener('change', (e) => { pageSize = parseInt(e.target.value, 10); @@ -2147,7 +2413,6 @@ const folderBreadcrumb = document.getElementById('folder-breadcrumb'); const objectsTableBody = document.querySelector('#objects-table 
tbody'); - let currentPrefix = ''; if (objectsTableBody) { objectsTableBody.addEventListener('click', (e) => { @@ -2189,7 +2454,8 @@ }; const countObjectsInFolder = (folderPrefix) => { - return allObjects.filter(obj => obj.key.startsWith(folderPrefix)).length; + const count = allObjects.filter(obj => obj.key.startsWith(folderPrefix)).length; + return { count, mayHaveMore: hasMoreObjects }; }; const renderBreadcrumb = (prefix) => { @@ -2263,9 +2529,10 @@ return allObjects.filter(obj => obj.key.startsWith(folderPrefix)); }; - const createFolderRow = (folderPath) => { - const folderName = folderPath.slice(currentPrefix.length).replace(/\/$/, ''); - const objectCount = countObjectsInFolder(folderPath); + const createFolderRow = (folderPath, displayName = null) => { + const folderName = displayName || folderPath.slice(currentPrefix.length).replace(/\/$/, ''); + const { count: objectCount, mayHaveMore } = countObjectsInFolder(folderPath); + const countDisplay = mayHaveMore ? `${objectCount}+` : objectCount; const tr = document.createElement('tr'); tr.className = 'folder-row'; @@ -2283,7 +2550,7 @@ ${escapeHtml(folderName)}/ -
${objectCount} object${objectCount !== 1 ? 's' : ''}
+
${countDisplay} object${objectCount !== 1 ? 's' : ''}
`; - - const checkbox = tr.querySelector('[data-folder-select]'); - checkbox?.addEventListener('change', (e) => { - e.stopPropagation(); - const folderObjects = getObjectsInFolder(folderPath); - folderObjects.forEach(obj => { - const objCheckbox = obj.element.querySelector('[data-object-select]'); - if (objCheckbox) { - objCheckbox.checked = checkbox.checked; - } - toggleRowSelection(obj.element, checkbox.checked); - }); - }); - - tr.addEventListener('click', (e) => { - if (e.target.closest('[data-folder-select]') || e.target.closest('button')) return; - navigateToFolder(folderPath); - }); return tr; }; + // Instant client-side folder navigation (no server round-trip!) const navigateToFolder = (prefix) => { currentPrefix = prefix; + + // Scroll to top when navigating + if (scrollContainer) scrollContainer.scrollTop = 0; + + // Instant re-render from already-loaded data + refreshVirtualList(); renderBreadcrumb(prefix); - renderObjectsView(); selectedRows.clear(); @@ -2530,12 +2785,10 @@ bulkDeleteConfirm.disabled = selectedCount === 0 || bulkDeleting; } if (selectAllCheckbox) { - - const visibleRowsRaw = hasFolders() - ? allObjects.filter(obj => obj.key.startsWith(currentPrefix) && !obj.key.slice(currentPrefix.length).includes('/')).map(obj => obj.element) - : Array.from(document.querySelectorAll('[data-object-row]')); - const total = visibleRowsRaw.filter(r => r.style.display !== 'none').length; - const visibleSelectedCount = visibleRowsRaw.filter(r => r.style.display !== 'none' && selectedRows.has(r.dataset.key)).length; + // With virtual scrolling, count files in current folder from visibleItems + const filesInView = visibleItems.filter(item => item.type === 'file'); + const total = filesInView.length; + const visibleSelectedCount = filesInView.filter(item => selectedRows.has(item.data.key)).length; selectAllCheckbox.disabled = total === 0; selectAllCheckbox.checked = visibleSelectedCount > 0 && visibleSelectedCount === total && total > 0; selectAllCheckbox.indeterminate = visibleSelectedCount > 0 && visibleSelectedCount < total; @@ -3138,44 +3391,41 @@ function initFolderNavigation() { if (hasFolders()) { - renderBreadcrumb(''); + renderBreadcrumb(currentPrefix); renderObjectsView(); } + if (typeof updateFolderViewStatus === 'function') { + updateFolderViewStatus(); + } + if (typeof updateFilterWarning === 'function') { + updateFilterWarning(); + } } bulkDeleteButton?.addEventListener('click', () => openBulkDeleteModal()); bulkDeleteConfirm?.addEventListener('click', () => performBulkDelete()); + const filterWarning = document.getElementById('filter-warning'); + const filterWarningText = document.getElementById('filter-warning-text'); + const folderViewStatus = document.getElementById('folder-view-status'); + + const updateFilterWarning = () => { + if (!filterWarning) return; + const isFiltering = currentFilterTerm.length > 0; + if (isFiltering && hasMoreObjects) { + filterWarning.classList.remove('d-none'); + } else { + filterWarning.classList.add('d-none'); + } + }; + document.getElementById('object-search')?.addEventListener('input', (event) => { currentFilterTerm = event.target.value.toLowerCase(); + updateFilterWarning(); - if (hasFolders()) { - const { folders, files } = getFoldersAtPrefix(currentPrefix); - const tbody = objectsTableBody; - - tbody.innerHTML = ''; - - folders.forEach(folderPath => { - const folderName = folderPath.slice(currentPrefix.length).replace(/\/$/, '').toLowerCase(); - if (folderName.includes(currentFilterTerm)) { - 
tbody.appendChild(createFolderRow(folderPath)); - } - }); - - files.forEach(obj => { - const keyName = obj.key.slice(currentPrefix.length).toLowerCase(); - if (keyName.includes(currentFilterTerm)) { - tbody.appendChild(obj.element); - obj.element.style.display = ''; - } - }); - } else { - // Filter all loaded objects (including newly loaded ones) - document.querySelectorAll('[data-object-row]').forEach((row) => { - const key = row.dataset.key.toLowerCase(); - row.style.display = key.includes(currentFilterTerm) ? '' : 'none'; - }); - } + // Use the virtual scrolling system for filtering - it properly handles + // both folder view and flat view, and works with large object counts + refreshVirtualList(); }); refreshVersionsButton?.addEventListener('click', () => { @@ -3616,6 +3866,12 @@ const lastSyncEl = document.getElementById('replication-last-sync'); const lastSyncTimeEl = document.querySelector('[data-stat="last-sync-time"]'); const lastSyncKeyEl = document.querySelector('[data-stat="last-sync-key"]'); + const endpointWarning = document.getElementById('replication-endpoint-warning'); + const endpointErrorEl = document.getElementById('replication-endpoint-error'); + const statusAlert = document.getElementById('replication-status-alert'); + const statusBadge = document.getElementById('replication-status-badge'); + const statusText = document.getElementById('replication-status-text'); + const pauseForm = document.getElementById('pause-replication-form'); const loadReplicationStats = async () => { try { @@ -3623,6 +3879,48 @@ if (!resp.ok) throw new Error('Failed to fetch stats'); const data = await resp.json(); + // Handle endpoint health status + if (data.endpoint_healthy === false) { + // Show warning and hide success alert + if (endpointWarning) { + endpointWarning.classList.remove('d-none'); + if (endpointErrorEl && data.endpoint_error) { + endpointErrorEl.textContent = data.endpoint_error + '. 
Replication is paused until the endpoint is available.'; + } + } + if (statusAlert) statusAlert.classList.add('d-none'); + + // Update status badge to show "Paused" with warning styling + if (statusBadge) { + statusBadge.className = 'badge bg-warning-subtle text-warning px-3 py-2'; + statusBadge.innerHTML = ` + + + + Paused (Endpoint Unavailable)`; + } + + // Hide the pause button since replication is effectively already paused + if (pauseForm) pauseForm.classList.add('d-none'); + } else { + // Hide warning and show success alert + if (endpointWarning) endpointWarning.classList.add('d-none'); + if (statusAlert) statusAlert.classList.remove('d-none'); + + // Restore status badge to show "Enabled" + if (statusBadge) { + statusBadge.className = 'badge bg-success-subtle text-success px-3 py-2'; + statusBadge.innerHTML = ` + + + + Enabled`; + } + + // Show the pause button + if (pauseForm) pauseForm.classList.remove('d-none'); + } + if (syncedEl) syncedEl.textContent = data.objects_synced; if (pendingEl) { pendingEl.textContent = data.objects_pending; @@ -3685,5 +3983,106 @@ } } }); + + // Bucket name validation for replication setup + const targetBucketInput = document.getElementById('target_bucket'); + const targetBucketFeedback = document.getElementById('target_bucket_feedback'); + + const validateBucketName = (name) => { + if (!name) return { valid: false, error: 'Bucket name is required' }; + if (name.length < 3) return { valid: false, error: 'Bucket name must be at least 3 characters' }; + if (name.length > 63) return { valid: false, error: 'Bucket name must be 63 characters or less' }; + if (!/^[a-z0-9]/.test(name)) return { valid: false, error: 'Bucket name must start with a lowercase letter or number' }; + if (!/[a-z0-9]$/.test(name)) return { valid: false, error: 'Bucket name must end with a lowercase letter or number' }; + if (/[A-Z]/.test(name)) return { valid: false, error: 'Bucket name must not contain uppercase letters' }; + if (/_/.test(name)) return { valid: false, error: 'Bucket name must not contain underscores' }; + if (/\.\.|--/.test(name)) return { valid: false, error: 'Bucket name must not contain consecutive periods or hyphens' }; + if (/^\d+\.\d+\.\d+\.\d+$/.test(name)) return { valid: false, error: 'Bucket name must not be formatted as an IP address' }; + if (!/^[a-z0-9][a-z0-9.-]*[a-z0-9]$/.test(name) && name.length > 2) return { valid: false, error: 'Bucket name contains invalid characters. Use only lowercase letters, numbers, hyphens, and periods.' 
}; + return { valid: true, error: null }; + }; + + const updateBucketNameValidation = () => { + if (!targetBucketInput || !targetBucketFeedback) return; + const name = targetBucketInput.value.trim(); + if (!name) { + targetBucketInput.classList.remove('is-valid', 'is-invalid'); + targetBucketFeedback.textContent = ''; + return; + } + const result = validateBucketName(name); + targetBucketInput.classList.toggle('is-valid', result.valid); + targetBucketInput.classList.toggle('is-invalid', !result.valid); + targetBucketFeedback.textContent = result.error || ''; + }; + + targetBucketInput?.addEventListener('input', updateBucketNameValidation); + targetBucketInput?.addEventListener('blur', updateBucketNameValidation); + + // Prevent form submission if bucket name is invalid + const replicationForm = targetBucketInput?.closest('form'); + replicationForm?.addEventListener('submit', (e) => { + const name = targetBucketInput.value.trim(); + const result = validateBucketName(name); + if (!result.valid) { + e.preventDefault(); + updateBucketNameValidation(); + targetBucketInput.focus(); + return false; + } + }); + + // Policy JSON validation and formatting + const formatPolicyBtn = document.getElementById('formatPolicyBtn'); + const policyValidationStatus = document.getElementById('policyValidationStatus'); + const policyValidBadge = document.getElementById('policyValidBadge'); + const policyInvalidBadge = document.getElementById('policyInvalidBadge'); + const policyErrorDetail = document.getElementById('policyErrorDetail'); + + const validatePolicyJson = () => { + if (!policyTextarea || !policyValidationStatus) return; + const value = policyTextarea.value.trim(); + if (!value) { + policyValidationStatus.classList.add('d-none'); + policyErrorDetail?.classList.add('d-none'); + return; + } + policyValidationStatus.classList.remove('d-none'); + try { + JSON.parse(value); + policyValidBadge?.classList.remove('d-none'); + policyInvalidBadge?.classList.add('d-none'); + policyErrorDetail?.classList.add('d-none'); + } catch (err) { + policyValidBadge?.classList.add('d-none'); + policyInvalidBadge?.classList.remove('d-none'); + if (policyErrorDetail) { + policyErrorDetail.textContent = err.message; + policyErrorDetail.classList.remove('d-none'); + } + } + }; + + policyTextarea?.addEventListener('input', validatePolicyJson); + policyTextarea?.addEventListener('blur', validatePolicyJson); + + formatPolicyBtn?.addEventListener('click', () => { + if (!policyTextarea) return; + const value = policyTextarea.value.trim(); + if (!value) return; + try { + const parsed = JSON.parse(value); + policyTextarea.value = JSON.stringify(parsed, null, 2); + validatePolicyJson(); + } catch (err) { + // Show error in validation + validatePolicyJson(); + } + }); + + // Initialize policy validation on page load + if (policyTextarea && policyPreset?.value === 'custom') { + validatePolicyJson(); + } {% endblock %} diff --git a/templates/connections.html b/templates/connections.html index 943af75..816fd9e 100644 --- a/templates/connections.html +++ b/templates/connections.html @@ -104,6 +104,7 @@
KeySizeActionsSizeActions
-
${escapeHtml(obj.key)}
-
Modified ${escapeHtml(obj.last_modified_display)}
+
${escapeHtml(keyToShow)}
+
Modified ${escapeHtml(lastModDisplay)}
${formatBytes(obj.size)} @@ -1845,7 +1923,7 @@
+
+
+ + + +
+
Empty folder
+

This folder contains no objects${hasMoreObjects ? ' yet. Loading more...' : '.'}

+
+
@@ -2296,32 +2563,20 @@
+ @@ -113,7 +114,12 @@ {% for conn in connections %} - + +
Status Name Endpoint Region
+ + + +
@@ -301,7 +307,11 @@ const formData = new FormData(form); const data = Object.fromEntries(formData.entries()); - resultDiv.innerHTML = '
Testing...
'; + resultDiv.innerHTML = '
Testing connection...
'; + + // Use AbortController to timeout client-side after 20 seconds + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 20000); try { const response = await fetch("{{ url_for('ui.test_connection') }}", { @@ -310,17 +320,44 @@ "Content-Type": "application/json", "X-CSRFToken": "{{ csrf_token() }}" }, - body: JSON.stringify(data) + body: JSON.stringify(data), + signal: controller.signal }); + clearTimeout(timeoutId); const result = await response.json(); if (response.ok) { - resultDiv.innerHTML = `
${result.message}
`; + resultDiv.innerHTML = `
+ + + + ${result.message} +
`; } else { - resultDiv.innerHTML = `
${result.message}
`; + resultDiv.innerHTML = `
+ + + + ${result.message} +
`; } } catch (error) { - resultDiv.innerHTML = `
Connection failed
`; + clearTimeout(timeoutId); + if (error.name === 'AbortError') { + resultDiv.innerHTML = `
+ + + + Connection test timed out - endpoint may be unreachable +
`; + } else { + resultDiv.innerHTML = `
+ + + + Connection failed: Network error +
`; + } } } @@ -358,5 +395,54 @@ const form = document.getElementById('deleteConnectionForm'); form.action = "{{ url_for('ui.delete_connection', connection_id='CONN_ID') }}".replace('CONN_ID', id); }); + + // Check connection health for each connection in the table + // Uses staggered requests to avoid overwhelming the server + async function checkConnectionHealth(connectionId, statusEl) { + try { + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 15000); + + const response = await fetch(`/ui/connections/${connectionId}/health`, { + signal: controller.signal + }); + clearTimeout(timeoutId); + + const data = await response.json(); + if (data.healthy) { + statusEl.innerHTML = ` + + + `; + statusEl.setAttribute('data-status', 'healthy'); + statusEl.setAttribute('title', 'Connected'); + } else { + statusEl.innerHTML = ` + + + `; + statusEl.setAttribute('data-status', 'unhealthy'); + statusEl.setAttribute('title', data.error || 'Unreachable'); + } + } catch (error) { + statusEl.innerHTML = ` + + + `; + statusEl.setAttribute('data-status', 'unknown'); + statusEl.setAttribute('title', 'Could not check status'); + } + } + + // Stagger health checks to avoid all requests at once + const connectionRows = document.querySelectorAll('tr[data-connection-id]'); + connectionRows.forEach((row, index) => { + const connectionId = row.getAttribute('data-connection-id'); + const statusEl = row.querySelector('.connection-status'); + if (statusEl) { + // Stagger requests by 200ms each + setTimeout(() => checkConnectionHealth(connectionId, statusEl), index * 200); + } + }); {% endblock %} diff --git a/templates/docs.html b/templates/docs.html index 4840d82..fd09517 100644 --- a/templates/docs.html +++ b/templates/docs.html @@ -47,9 +47,9 @@ python run.py --mode ui - - - + + +
VariableDefaultDescriptionVariableDefaultDescription