diff --git a/app/__init__.py b/app/__init__.py
index d764753..3147a1c 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -45,7 +45,6 @@ def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path:
         try:
             shutil.move(str(legacy_path), str(active_path))
         except OSError:
-            # Fall back to copy + delete for cross-device moves
             shutil.copy2(legacy_path, active_path)
             try:
                 legacy_path.unlink(missing_ok=True)
@@ -101,12 +100,10 @@ def create_app(
     bucket_policies = BucketPolicyStore(Path(app.config["BUCKET_POLICY_PATH"]))
     secret_store = EphemeralSecretStore(default_ttl=app.config.get("SECRET_TTL_SECONDS", 300))
 
-    # Initialize replication with system config directory for consistency
     storage_root = Path(app.config["STORAGE_ROOT"])
     config_dir = storage_root / ".myfsio.sys" / "config"
     config_dir.mkdir(parents=True, exist_ok=True)
 
-    # Migrate connection configs from legacy locations
     connections_path = _migrate_config_file(
         active_path=config_dir / "connections.json",
         legacy_paths=[
@@ -125,7 +122,6 @@ def create_app(
     connections = ConnectionStore(connections_path)
     replication = ReplicationManager(storage, connections, replication_rules_path)
 
-    # Initialize encryption and KMS
     encryption_config = {
         "encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False),
         "encryption_master_key_path": app.config.get("ENCRYPTION_MASTER_KEY_PATH"),
@@ -140,7 +136,6 @@ def create_app(
     kms_manager = KMSManager(kms_keys_path, kms_master_key_path)
     encryption_manager.set_kms_provider(kms_manager)
 
-    # Wrap storage with encryption layer if encryption is enabled
     if app.config.get("ENCRYPTION_ENABLED", False):
         from .encrypted_storage import EncryptedObjectStorage
         storage = EncryptedObjectStorage(storage, encryption_manager)
@@ -243,7 +238,7 @@ def _configure_cors(app: Flask) -> None:
 class _RequestContextFilter(logging.Filter):
     """Inject request-specific attributes into log records."""
 
-    def filter(self, record: logging.LogRecord) -> bool:  # pragma: no cover - simple boilerplate
+    def filter(self, record: logging.LogRecord) -> bool:
         if has_request_context():
             record.request_id = getattr(g, "request_id", "-")
             record.path = request.path
diff --git a/app/bucket_policies.py b/app/bucket_policies.py
index 73a28b7..d576030 100644
--- a/app/bucket_policies.py
+++ b/app/bucket_policies.py
@@ -188,7 +188,6 @@ class BucketPolicyStore:
         except FileNotFoundError:
             return None
 
-    # ------------------------------------------------------------------
     def evaluate(
         self,
         access_key: Optional[str],
@@ -229,7 +228,6 @@ class BucketPolicyStore:
         self._policies.pop(bucket, None)
         self._persist()
 
-    # ------------------------------------------------------------------
     def _load(self) -> None:
         try:
             content = self.policy_path.read_text(encoding='utf-8')
diff --git a/app/config.py b/app/config.py
index b5b2a92..2489397 100644
--- a/app/config.py
+++ b/app/config.py
@@ -153,9 +152,8 @@ class AppConfig:
         cors_allow_headers = _csv(str(_get("CORS_ALLOW_HEADERS", "*")), ["*"])
         cors_expose_headers = _csv(str(_get("CORS_EXPOSE_HEADERS", "*")), ["*"])
         session_lifetime_days = int(_get("SESSION_LIFETIME_DAYS", 30))
-        bucket_stats_cache_ttl = int(_get("BUCKET_STATS_CACHE_TTL", 60))  # Default 60 seconds
+        bucket_stats_cache_ttl = int(_get("BUCKET_STATS_CACHE_TTL", 60))
 
-        # Encryption settings
         encryption_enabled = str(_get("ENCRYPTION_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
         encryption_keys_dir = storage_root / ".myfsio.sys" / "keys"
"master.key")).resolve() @@ -206,7 +205,6 @@ class AppConfig: """ issues = [] - # Check if storage_root is writable try: test_file = self.storage_root / ".write_test" test_file.touch() @@ -214,24 +212,20 @@ class AppConfig: except (OSError, PermissionError) as e: issues.append(f"CRITICAL: STORAGE_ROOT '{self.storage_root}' is not writable: {e}") - # Check if storage_root looks like a temp directory storage_str = str(self.storage_root).lower() if "/tmp" in storage_str or "\\temp" in storage_str or "appdata\\local\\temp" in storage_str: issues.append(f"WARNING: STORAGE_ROOT '{self.storage_root}' appears to be a temporary directory. Data may be lost on reboot!") - # Check if IAM config path is under storage_root try: self.iam_config_path.relative_to(self.storage_root) except ValueError: issues.append(f"WARNING: IAM_CONFIG '{self.iam_config_path}' is outside STORAGE_ROOT '{self.storage_root}'. Consider setting IAM_CONFIG explicitly or ensuring paths are aligned.") - # Check if bucket policy path is under storage_root try: self.bucket_policy_path.relative_to(self.storage_root) except ValueError: issues.append(f"WARNING: BUCKET_POLICY_PATH '{self.bucket_policy_path}' is outside STORAGE_ROOT '{self.storage_root}'. Consider setting BUCKET_POLICY_PATH explicitly.") - # Check if log path is writable try: self.log_path.parent.mkdir(parents=True, exist_ok=True) test_log = self.log_path.parent / ".write_test" @@ -240,26 +234,22 @@ class AppConfig: except (OSError, PermissionError) as e: issues.append(f"WARNING: Log directory '{self.log_path.parent}' is not writable: {e}") - # Check log path location log_str = str(self.log_path).lower() if "/tmp" in log_str or "\\temp" in log_str or "appdata\\local\\temp" in log_str: issues.append(f"WARNING: LOG_DIR '{self.log_path.parent}' appears to be a temporary directory. Logs may be lost on reboot!") - # Check if encryption keys path is under storage_root (when encryption is enabled) if self.encryption_enabled: try: self.encryption_master_key_path.relative_to(self.storage_root) except ValueError: issues.append(f"WARNING: ENCRYPTION_MASTER_KEY_PATH '{self.encryption_master_key_path}' is outside STORAGE_ROOT. Ensure proper backup procedures.") - # Check if KMS keys path is under storage_root (when KMS is enabled) if self.kms_enabled: try: self.kms_keys_path.relative_to(self.storage_root) except ValueError: issues.append(f"WARNING: KMS_KEYS_PATH '{self.kms_keys_path}' is outside STORAGE_ROOT. Ensure proper backup procedures.") - # Warn about production settings if self.secret_key == "dev-secret-key": issues.append("WARNING: Using default SECRET_KEY. 
             issues.append("WARNING: Using default SECRET_KEY. Set SECRET_KEY environment variable for production.")
diff --git a/app/iam.py b/app/iam.py
index d8c3c8d..519ae74 100644
--- a/app/iam.py
+++ b/app/iam.py
@@ -125,7 +125,6 @@ class IamService:
         except OSError:
             pass
 
-    # ---------------------- authz helpers ----------------------
     def authenticate(self, access_key: str, secret_key: str) -> Principal:
         self._maybe_reload()
         access_key = (access_key or "").strip()
@@ -218,7 +217,6 @@ class IamService:
                 return True
         return False
 
-    # ---------------------- management helpers ----------------------
     def list_users(self) -> List[Dict[str, Any]]:
         listing: List[Dict[str, Any]] = []
         for access_key, record in self._users.items():
@@ -291,7 +289,6 @@ class IamService:
         self._save()
         self._load()
 
-    # ---------------------- config helpers ----------------------
     def _load(self) -> None:
         try:
             self._last_load_time = self.config_path.stat().st_mtime
@@ -337,7 +334,6 @@ class IamService:
         except (OSError, PermissionError) as e:
             raise IamError(f"Cannot save IAM config: {e}")
 
-    # ---------------------- insight helpers ----------------------
     def config_summary(self) -> Dict[str, Any]:
         return {
             "path": str(self.config_path),
diff --git a/app/kms_api.py b/app/kms_api.py
index 551d262..2ff9e35 100644
--- a/app/kms_api.py
+++ b/app/kms_api.py
@@ -33,9 +33,6 @@ def _encryption():
 def _error_response(code: str, message: str, status: int) -> tuple[Dict[str, Any], int]:
     return {"__type": code, "message": message}, status
 
-
-# ---------------------- Key Management ----------------------
-
 @kms_api_bp.route("/keys", methods=["GET", "POST"])
 @limiter.limit("30 per minute")
 def list_or_create_keys():
@@ -65,7 +62,6 @@ def list_or_create_keys():
         except EncryptionError as exc:
             return _error_response("KMSInternalException", str(exc), 400)
 
-    # GET - List keys
     keys = kms.list_keys()
     return jsonify({
         "Keys": [{"KeyId": k.key_id, "KeyArn": k.arn} for k in keys],
@@ -96,7 +92,6 @@ def get_or_delete_key(key_id: str):
         except EncryptionError as exc:
             return _error_response("NotFoundException", str(exc), 404)
 
-    # GET
     key = kms.get_key(key_id)
     if not key:
         return _error_response("NotFoundException", f"Key not found: {key_id}", 404)
@@ -149,9 +144,6 @@ def disable_key(key_id: str):
     except EncryptionError as exc:
         return _error_response("NotFoundException", str(exc), 404)
 
-
-# ---------------------- Encryption Operations ----------------------
-
 @kms_api_bp.route("/encrypt", methods=["POST"])
 @limiter.limit("60 per minute")
 def encrypt_data():
@@ -251,7 +243,6 @@ def generate_data_key():
     try:
         plaintext_key, encrypted_key = kms.generate_data_key(key_id, context)
 
-        # Trim key if AES_128 requested
         if key_spec == "AES_128":
             plaintext_key = plaintext_key[:16]
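The AES_128 branch above simply keeps the first 16 bytes of the 256-bit data key. For context, a minimal envelope-encryption sketch built on the same generate_data_key()/decrypt() helpers could look like the following; the AES-GCM step via the cryptography package is an illustrative assumption, not something this diff adds:

import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

def envelope_encrypt(kms, key_id: str, payload: bytes, context: dict) -> dict:
    # Fresh data key: plaintext copy for local use, wrapped copy to store alongside the data.
    plaintext_key, encrypted_key = kms.generate_data_key(key_id, context)
    nonce = os.urandom(12)
    ciphertext = AESGCM(plaintext_key).encrypt(nonce, payload, None)
    return {"ciphertext": ciphertext, "nonce": nonce, "encrypted_key": encrypted_key}

def envelope_decrypt(kms, blob: dict, context: dict) -> bytes:
    # kms.decrypt() returns the plaintext key plus the id of the key that wrapped it.
    plaintext_key, _source_key_id = kms.decrypt(blob["encrypted_key"], context)
    return AESGCM(plaintext_key).decrypt(blob["nonce"], blob["ciphertext"], None)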
@@ -322,10 +313,7 @@ def re_encrypt():
             return _error_response("ValidationException", "CiphertextBlob must be base64 encoded", 400)
 
     try:
-        # First decrypt, get source key id
         plaintext, source_key_id = kms.decrypt(ciphertext, source_context)
-
-        # Re-encrypt with destination key
         new_ciphertext = kms.encrypt(destination_key_id, plaintext, destination_context)
 
         return jsonify({
@@ -365,9 +353,6 @@ def generate_random():
     except EncryptionError as exc:
         return _error_response("ValidationException", str(exc), 400)
 
-
-# ---------------------- Client-Side Encryption Helpers ----------------------
-
 @kms_api_bp.route("/client/generate-key", methods=["POST"])
 @limiter.limit("30 per minute")
 def generate_client_key():
@@ -427,9 +412,6 @@ def client_decrypt():
     except Exception as exc:
         return _error_response("DecryptionError", str(exc), 400)
 
-
-# ---------------------- Encryption Materials for S3 Client-Side Encryption ----------------------
-
 @kms_api_bp.route("/materials/<key_id>", methods=["POST"])
 @limiter.limit("60 per minute")
 def get_encryption_materials(key_id: str):
diff --git a/app/replication.py b/app/replication.py
index 9361ca2..4a1df9a 100644
--- a/app/replication.py
+++ b/app/replication.py
@@ -22,6 +22,8 @@ from .storage import ObjectStorage, StorageError
 logger = logging.getLogger(__name__)
 
 REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"
+REPLICATION_CONNECT_TIMEOUT = 5  # seconds to wait for connection
+REPLICATION_READ_TIMEOUT = 30  # seconds to wait for response
 REPLICATION_MODE_NEW_ONLY = "new_only"
 REPLICATION_MODE_ALL = "all"
@@ -121,6 +123,34 @@ class ReplicationManager:
         with open(self.rules_path, "w") as f:
             json.dump(data, f, indent=2)
 
+    def check_endpoint_health(self, connection: RemoteConnection) -> bool:
+        """Check if a remote endpoint is reachable and responsive.
+
+        Returns True if endpoint is healthy, False otherwise.
+        Uses short timeouts to prevent blocking.
+        """
+        try:
+            config = Config(
+                user_agent_extra=REPLICATION_USER_AGENT,
+                connect_timeout=REPLICATION_CONNECT_TIMEOUT,
+                read_timeout=REPLICATION_READ_TIMEOUT,
+                retries={'max_attempts': 1}  # Don't retry for health checks
+            )
+            s3 = boto3.client(
+                "s3",
+                endpoint_url=connection.endpoint_url,
+                aws_access_key_id=connection.access_key,
+                aws_secret_access_key=connection.secret_key,
+                region_name=connection.region,
+                config=config,
+            )
+            # Simple list_buckets call to verify connectivity
+            s3.list_buckets()
+            return True
+        except Exception as e:
+            logger.warning(f"Endpoint health check failed for {connection.name} ({connection.endpoint_url}): {e}")
+            return False
+
     def get_rule(self, bucket_name: str) -> Optional[ReplicationRule]:
         return self._rules.get(bucket_name)
 
@@ -218,6 +248,11 @@ class ReplicationManager:
             logger.warning(f"Cannot replicate existing objects: Connection {rule.target_connection_id} not found")
             return
 
+        # Check endpoint health before starting bulk replication
+        if not self.check_endpoint_health(connection):
+            logger.warning(f"Cannot replicate existing objects: Endpoint {connection.name} ({connection.endpoint_url}) is not reachable")
+            return
+
         try:
             objects = self.storage.list_objects_all(bucket_name)
             logger.info(f"Starting replication of {len(objects)} existing objects from {bucket_name}")
@@ -255,6 +290,11 @@ class ReplicationManager:
             logger.warning(f"Replication skipped for {bucket_name}/{object_key}: Connection {rule.target_connection_id} not found")
             return
 
+        # Check endpoint health before attempting replication to prevent hangs
+        if not self.check_endpoint_health(connection):
+            logger.warning(f"Replication skipped for {bucket_name}/{object_key}: Endpoint {connection.name} ({connection.endpoint_url}) is not reachable")
+            return
+
         self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)
 
     def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
@@ -271,13 +311,20 @@
         file_size = 0
 
         try:
-            config = Config(user_agent_extra=REPLICATION_USER_AGENT)
+            config = Config(
+                user_agent_extra=REPLICATION_USER_AGENT,
+                connect_timeout=REPLICATION_CONNECT_TIMEOUT,
+                read_timeout=REPLICATION_READ_TIMEOUT,
+                retries={'max_attempts': 2},  # Limited retries to prevent long hangs
+                signature_version='s3v4',  # Force signature v4 for compatibility
+                s3={'addressing_style': 'path'}  # Use path-style addressing for compatibility
+            )
             s3 = boto3.client(
                 "s3",
                 endpoint_url=conn.endpoint_url,
                 aws_access_key_id=conn.access_key,
                 aws_secret_access_key=conn.secret_key,
-                region_name=conn.region,
+                region_name=conn.region or 'us-east-1',  # Default region if not set
                 config=config,
             )
 
@@ -309,16 +356,31 @@
             logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
 
             def do_put_object() -> None:
-                """Helper to upload object."""
-                with path.open("rb") as f:
-                    s3.put_object(
-                        Bucket=rule.target_bucket,
-                        Key=object_key,
-                        Body=f,
-                        ContentLength=file_size,
-                        ContentType=content_type or "application/octet-stream",
-                        Metadata=metadata or {}
-                    )
+                """Helper to upload object.
+
+                Reads the file content into memory first to avoid signature calculation
+                issues with certain binary file types (like GIFs) when streaming.
+                Do NOT set ContentLength explicitly - boto3 calculates it from the bytes
+                and setting it manually can cause SignatureDoesNotMatch errors.
+                """
+                file_content = path.read_bytes()
+                put_kwargs = {
+                    "Bucket": rule.target_bucket,
+                    "Key": object_key,
+                    "Body": file_content,
+                }
+                if content_type:
+                    put_kwargs["ContentType"] = content_type
+                if metadata:
+                    put_kwargs["Metadata"] = metadata
+
+                # Debug logging for signature issues
+                logger.debug(f"PUT request details: bucket={rule.target_bucket}, key={repr(object_key)}, "
+                             f"content_type={content_type}, body_len={len(file_content)}, "
+                             f"endpoint={conn.endpoint_url}")
+                logger.debug(f"Key bytes: {object_key.encode('utf-8')}")
+
+                s3.put_object(**put_kwargs)
 
             try:
                 do_put_object()
@@ -358,6 +420,14 @@
 
         except (ClientError, OSError, ValueError) as e:
             logger.error(f"Replication failed for {bucket_name}/{object_key}: {e}")
+            # Log additional debug info for signature errors
+            if isinstance(e, ClientError):
+                error_code = e.response.get('Error', {}).get('Code', '')
+                if 'Signature' in str(e) or 'Signature' in error_code:
+                    logger.error(f"Signature debug - Key repr: {repr(object_key)}, "
+                                 f"Endpoint: {conn.endpoint_url}, "
+                                 f"Region: {conn.region}, "
+                                 f"Target bucket: {rule.target_bucket}")
         except Exception:
             logger.exception(f"Unexpected error during replication for {bucket_name}/{object_key}")
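A hypothetical caller, sketched here for illustration, shows how the new health check and short timeouts are meant to be combined before queueing work. It uses only methods that appear in this diff (get_rule, get, check_endpoint_health, trigger_replication); the manager/connections variable names stand for the ReplicationManager and ConnectionStore instances wired up in create_app() and are assumptions of the example:

def replicate_if_reachable(manager, connections, bucket_name: str, object_key: str) -> bool:
    # Skip buckets without an enabled replication rule.
    rule = manager.get_rule(bucket_name)
    if not rule or not rule.enabled:
        return False
    connection = connections.get(rule.target_connection_id)
    if connection is None:
        return False
    # check_endpoint_health() uses short connect/read timeouts, so this probe cannot hang.
    if not manager.check_endpoint_health(connection):
        return False
    manager.trigger_replication(bucket_name, object_key, action="write")
    return True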
diff --git a/app/s3_api.py b/app/s3_api.py
index 61b23f6..ae031dd 100644
--- a/app/s3_api.py
+++ b/app/s3_api.py
@@ -8,7 +8,7 @@ import re
 import uuid
 from datetime import datetime, timedelta, timezone
 from typing import Any, Dict
-from urllib.parse import quote, urlencode, urlparse
+from urllib.parse import quote, urlencode, urlparse, unquote
 from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError
 
 from flask import Blueprint, Response, current_app, jsonify, request, g
@@ -22,8 +22,6 @@ from .storage import ObjectStorage, StorageError, QuotaExceededError
 
 s3_api_bp = Blueprint("s3_api", __name__)
 
-
-# Helper functions for accessing app extensions and generating responses
 def _storage() -> ObjectStorage:
     return current_app.extensions["object_storage"]
 
@@ -68,8 +66,26 @@ def _get_signature_key(key: str, date_stamp: str, region_name: str, service_name
     return k_signing
 
 
+def _get_canonical_uri(req: Any) -> str:
+    """Get the canonical URI for SigV4 signature verification.
+
+    AWS SigV4 requires the canonical URI to be URL-encoded exactly as the client
+    sent it. Flask/Werkzeug automatically URL-decodes request.path, so we need
+    to get the raw path from the environ.
+
+    The canonical URI should have each path segment URL-encoded (with '/' preserved),
+    and the encoding should match what the client used when signing.
+    """
+    raw_uri = req.environ.get('RAW_URI') or req.environ.get('REQUEST_URI')
+
+    if raw_uri:
+        path = raw_uri.split('?')[0]
+        return path
+
+    return quote(req.path, safe="/-_.~")
+
+
 def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
-    # Parse Authorization header: AWS4-HMAC-SHA256 Credential=AKIA.../20230101/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-date, Signature=...
     match = re.match(
         r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)",
         auth_header,
@@ -82,9 +98,8 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
     if not secret_key:
         raise IamError("Invalid access key")
 
-    # Build canonical request
     method = req.method
-    canonical_uri = quote(req.path, safe="/-_.~")
+    canonical_uri = _get_canonical_uri(req)
 
     query_args = []
     for key, value in req.args.items(multi=True):
@@ -124,13 +139,12 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
     now = datetime.now(timezone.utc)
     time_diff = abs((now - request_time).total_seconds())
 
-    if time_diff > 900:  # AWS standard: 15-minute request validity window
+    if time_diff > 900:
         raise IamError("Request timestamp too old or too far in the future")
 
     required_headers = {'host', 'x-amz-date'}
     signed_headers_set = set(signed_headers_str.split(';'))
     if not required_headers.issubset(signed_headers_set):
-        # Some clients use 'date' instead of 'x-amz-date'
         if 'date' in signed_headers_set:
             required_headers.remove('x-amz-date')
             required_headers.add('date')
@@ -177,9 +191,8 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
     if not secret_key:
         raise IamError("Invalid access key")
 
-    # Build canonical request
     method = req.method
-    canonical_uri = quote(req.path, safe="/-_.~")
+    canonical_uri = _get_canonical_uri(req)
 
     query_args = []
     for key, value in req.args.items(multi=True):
@@ -211,7 +224,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
         payload_hash
     ])
 
-    # Build signature
     algorithm = "AWS4-HMAC-SHA256"
     credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
     hashed_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
@@ -479,7 +491,6 @@ def _generate_presigned_url(
     }
     canonical_query = _encode_query_params(query_params)
 
-    # Get presigned URL host and scheme from config or request headers
     api_base = current_app.config.get("API_BASE_URL")
     if api_base:
         parsed = urlparse(api_base)
@@ -839,7 +850,6 @@ def _bucket_versioning_handler(bucket_name: str) -> Response:
         current_app.logger.info("Bucket versioning updated", extra={"bucket": bucket_name, "status": status})
         return Response(status=200)
 
-    # GET
     try:
         enabled = storage.is_versioning_enabled(bucket_name)
     except StorageError as exc:
@@ -875,7 +885,7 @@ def _bucket_tagging_handler(bucket_name: str) -> Response:
             return _error_response("NoSuchBucket", str(exc), 404)
         current_app.logger.info("Bucket tags deleted", extra={"bucket": bucket_name})
         return Response(status=204)
-    # PUT
+
     payload = request.get_data(cache=False) or b""
     try:
         tags = _parse_tagging_document(payload)
@@ -900,7 +910,6 @@ def _object_tagging_handler(bucket_name: str, object_key: str) -> Response:
     if error:
         return error
 
-    # Use read permission for GET, write for PUT/DELETE
     action = "read" if request.method == "GET" else "write"
     try:
         _authorize_action(principal, bucket_name, action, object_key=object_key)
     except IamError as exc:
         return _error_response("AccessDenied", str(exc), 403)
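The canonical-URI change above matters because signer and verifier must hash byte-identical canonical requests. A rough sketch of the client side of SigV4 header signing (placeholder names, empty query string; not code from this repository) shows where the still-percent-encoded path enters the computation:

import hashlib
import hmac
from datetime import datetime, timezone
from urllib.parse import quote

def sign_request(method, path, headers, payload, access_key, secret_key,
                 region="us-east-1", service="s3"):
    amz_date = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    date_stamp = amz_date[:8]
    payload_hash = hashlib.sha256(payload).hexdigest()
    headers = {**headers, "x-amz-date": amz_date, "x-amz-content-sha256": payload_hash}
    names = sorted(headers, key=str.lower)
    signed_headers = ";".join(n.lower() for n in names)
    canonical_headers = "".join(f"{n.lower()}:{headers[n].strip()}\n" for n in names)
    canonical_uri = quote(path, safe="/-_.~")  # must match the bytes sent on the wire
    canonical_request = "\n".join([method, canonical_uri, "", canonical_headers,
                                   signed_headers, payload_hash])
    scope = f"{date_stamp}/{region}/{service}/aws4_request"
    string_to_sign = "\n".join(["AWS4-HMAC-SHA256", amz_date, scope,
                                hashlib.sha256(canonical_request.encode()).hexdigest()])
    key = f"AWS4{secret_key}".encode()
    for part in (date_stamp, region, service, "aws4_request"):
        key = hmac.new(key, part.encode(), hashlib.sha256).digest()
    signature = hmac.new(key, string_to_sign.encode(), hashlib.sha256).hexdigest()
    return (f"AWS4-HMAC-SHA256 Credential={access_key}/{scope}, "
            f"SignedHeaders={signed_headers}, Signature={signature}")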
@@ -1079,7 +1088,6 @@ def _bucket_location_handler(bucket_name: str) -> Response:
     if not storage.bucket_exists(bucket_name):
         return _error_response("NoSuchBucket", "Bucket does not exist", 404)
 
-    # Return bucket location (empty for us-east-1 per AWS spec)
     region = current_app.config.get("AWS_REGION", "us-east-1")
     root = Element("LocationConstraint")
     root.text = region if region != "us-east-1" else None
@@ -1106,7 +1114,6 @@ def _bucket_acl_handler(bucket_name: str) -> Response:
         current_app.logger.info("Bucket ACL set (canned)", extra={"bucket": bucket_name, "acl": canned_acl})
         return Response(status=200)
 
-    # Return basic ACL document showing owner's full control
     root = Element("AccessControlPolicy")
     owner = SubElement(root, "Owner")
     SubElement(owner, "ID").text = principal.access_key if principal else "anonymous"
@@ -1154,7 +1161,6 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response:
     if key_marker:
         objects = [obj for obj in objects if obj.key > key_marker]
 
-    # Build XML response
     root = Element("ListVersionsResult", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
     SubElement(root, "Name").text = bucket_name
     SubElement(root, "Prefix").text = prefix
@@ -1172,7 +1178,6 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response:
             is_truncated = True
             break
 
-        # Add current version to response
         version = SubElement(root, "Version")
         SubElement(version, "Key").text = obj.key
         SubElement(version, "VersionId").text = "null"
@@ -1189,7 +1194,6 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response:
         version_count += 1
         next_key_marker = obj.key
 
-        # Get historical versions
         try:
             versions = storage.list_object_versions(bucket_name, obj.key)
             for v in versions:
@@ -1273,14 +1277,12 @@ def _render_lifecycle_config(config: list) -> Element:
         rule_el = SubElement(root, "Rule")
         SubElement(rule_el, "ID").text = rule.get("ID", "")
 
-        # Filter
         filter_el = SubElement(rule_el, "Filter")
         if rule.get("Prefix"):
             SubElement(filter_el, "Prefix").text = rule.get("Prefix", "")
 
         SubElement(rule_el, "Status").text = rule.get("Status", "Enabled")
 
-        # Add expiration rule if present
         if "Expiration" in rule:
             exp = rule["Expiration"]
             exp_el = SubElement(rule_el, "Expiration")
@@ -1291,14 +1293,12 @@ def _render_lifecycle_config(config: list) -> Element:
             if exp.get("ExpiredObjectDeleteMarker"):
                 SubElement(exp_el, "ExpiredObjectDeleteMarker").text = "true"
 
-        # Add noncurrent version expiration if present
         if "NoncurrentVersionExpiration" in rule:
             nve = rule["NoncurrentVersionExpiration"]
             nve_el = SubElement(rule_el, "NoncurrentVersionExpiration")
             if "NoncurrentDays" in nve:
                 SubElement(nve_el, "NoncurrentDays").text = str(nve["NoncurrentDays"])
 
-        # Add incomplete multipart upload cleanup if present
         if "AbortIncompleteMultipartUpload" in rule:
             aimu = rule["AbortIncompleteMultipartUpload"]
             aimu_el = SubElement(rule_el, "AbortIncompleteMultipartUpload")
@@ -1322,29 +1322,24 @@ def _parse_lifecycle_config(payload: bytes) -> list:
     for rule_el in root.findall("{*}Rule") or root.findall("Rule"):
         rule: dict = {}
 
-        # Extract rule ID
         id_el = rule_el.find("{*}ID") or rule_el.find("ID")
         if id_el is not None and id_el.text:
             rule["ID"] = id_el.text.strip()
 
-        # Extract filter prefix
         filter_el = rule_el.find("{*}Filter") or rule_el.find("Filter")
         if filter_el is not None:
             prefix_el = filter_el.find("{*}Prefix") or filter_el.find("Prefix")
             if prefix_el is not None and prefix_el.text:
                 rule["Prefix"] = prefix_el.text
 
-        # Fall back to legacy Prefix element (outside Filter)
         if "Prefix" not in rule:
             prefix_el = rule_el.find("{*}Prefix") or rule_el.find("Prefix")
             if prefix_el is not None:
                 rule["Prefix"] = prefix_el.text or ""
 
-        # Extract status
         status_el = rule_el.find("{*}Status") or rule_el.find("Status")
         rule["Status"] = (status_el.text or "Enabled").strip() if status_el is not None else "Enabled"
 
-        # Parse expiration rule
         exp_el = rule_el.find("{*}Expiration") or rule_el.find("Expiration")
         if exp_el is not None:
             expiration: dict = {}
@@ -1360,7 +1355,6 @@ def _parse_lifecycle_config(payload: bytes) -> list:
             if expiration:
                 rule["Expiration"] = expiration
 
-        # NoncurrentVersionExpiration
         nve_el = rule_el.find("{*}NoncurrentVersionExpiration") or rule_el.find("NoncurrentVersionExpiration")
         if nve_el is not None:
             nve: dict = {}
@@ -1370,7 +1364,6 @@ def _parse_lifecycle_config(payload: bytes) -> list:
             if nve:
                 rule["NoncurrentVersionExpiration"] = nve
 
-        # AbortIncompleteMultipartUpload
         aimu_el = rule_el.find("{*}AbortIncompleteMultipartUpload") or rule_el.find("AbortIncompleteMultipartUpload")
         if aimu_el is not None:
             aimu: dict = {}
@@ -1408,7 +1401,6 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
         if not quota:
             return _error_response("NoSuchQuotaConfiguration", "No quota configuration found", 404)
 
-        # Return as JSON for simplicity (not a standard S3 API)
         stats = storage.bucket_stats(bucket_name)
         return jsonify({
             "quota": quota,
@@ -1437,7 +1429,6 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
         if max_size_bytes is None and max_objects is None:
             return _error_response("InvalidArgument", "At least one of max_size_bytes or max_objects is required", 400)
 
-        # Validate types
         if max_size_bytes is not None:
             try:
                 max_size_bytes = int(max_size_bytes)
@@ -1548,7 +1539,6 @@ def _bulk_delete_handler(bucket_name: str) -> Response:
     return _xml_response(result, status=200)
 
 
-# Route handlers for S3 API endpoints
 @s3_api_bp.get("/")
 @limiter.limit("60 per minute")
 def list_buckets() -> Response:
@@ -1626,7 +1616,6 @@ def bucket_handler(bucket_name: str) -> Response:
         current_app.logger.info("Bucket deleted", extra={"bucket": bucket_name})
         return Response(status=204)
 
-    # Handle GET - list objects (supports both ListObjects and ListObjectsV2)
     principal, error = _require_principal()
     try:
         _authorize_action(principal, bucket_name, "list")
@@ -1640,7 +1629,6 @@ def bucket_handler(bucket_name: str) -> Response:
     delimiter = request.args.get("delimiter", "")
     max_keys = min(int(request.args.get("max-keys", current_app.config["UI_PAGE_SIZE"])), 1000)
 
-    # Use appropriate markers for pagination depending on API version
     marker = request.args.get("marker", "")  # ListObjects v1
     continuation_token = request.args.get("continuation-token", "")  # ListObjectsV2
     start_after = request.args.get("start-after", "")  # ListObjectsV2
@@ -1660,7 +1648,6 @@ def bucket_handler(bucket_name: str) -> Response:
     else:
         effective_start = marker
 
-    # Fetch with buffer for delimiter processing; delimiter requires extra objects to compute prefixes
     fetch_keys = max_keys * 10 if delimiter else max_keys
     try:
         list_result = storage.list_objects(
@@ -1680,7 +1667,6 @@ def bucket_handler(bucket_name: str) -> Response:
     for obj in objects:
         key_after_prefix = obj.key[len(prefix):] if prefix else obj.key
         if delimiter in key_after_prefix:
-            # Extract common prefix (folder-like structure)
             common_prefix = prefix + key_after_prefix.split(delimiter)[0] + delimiter
             if common_prefix not in seen_prefixes:
                 seen_prefixes.add(common_prefix)
@@ -1778,7 +1764,6 @@ def object_handler(bucket_name: str, object_key: str):
     if "tagging" in request.args:
         return _object_tagging_handler(bucket_name, object_key)
 
-    # Multipart Uploads
     if request.method == "POST":
         if "uploads" in request.args:
             return _initiate_multipart_upload(bucket_name, object_key)
@@ -1831,7 +1816,6 @@ def object_handler(bucket_name: str, object_key: str):
         response = Response(status=200)
         response.headers["ETag"] = f'"{meta.etag}"'
 
-        # Trigger replication for non-replication requests
         if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""):
             _replication_manager().trigger_replication(bucket_name, object_key, action="write")
 
@@ -1851,7 +1835,6 @@ def object_handler(bucket_name: str, object_key: str):
         metadata = storage.get_object_metadata(bucket_name, object_key)
         mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream"
 
-        # Decrypt encrypted objects
         is_encrypted = "x-amz-server-side-encryption" in metadata
 
         if request.method == "GET":
@@ -1865,15 +1848,12 @@ def object_handler(bucket_name: str, object_key: str):
                 except StorageError as exc:
                     return _error_response("InternalError", str(exc), 500)
             else:
-                # Stream unencrypted file directly
                 stat = path.stat()
                 response = Response(_stream_file(path), mimetype=mimetype, direct_passthrough=True)
                 logged_bytes = stat.st_size
                 etag = storage._compute_etag(path)
         else:
-            # HEAD request
             if is_encrypted and hasattr(storage, 'get_object_data'):
-                # For encrypted objects, we need to report decrypted size
                 try:
                     data, _ = storage.get_object_data(bucket_name, object_key)
                     response = Response(status=200)
@@ -1902,7 +1882,6 @@ def object_handler(bucket_name: str, object_key: str):
         storage.delete_object(bucket_name, object_key)
         current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key})
 
-        # Trigger replication if not a replication request
         user_agent = request.headers.get("User-Agent", "")
         if "S3ReplicationAgent" not in user_agent:
             _replication_manager().trigger_replication(bucket_name, object_key, action="delete")
@@ -2183,7 +2162,6 @@ class AwsChunkedDecoder:
             self.chunk_remaining -= len(chunk)
 
             if self.chunk_remaining == 0:
-                # Read CRLF after chunk data
                 crlf = self.stream.read(2)
                 if crlf != b"\r\n":
                     raise IOError("Malformed chunk: missing CRLF")
@@ -2202,7 +2180,6 @@ class AwsChunkedDecoder:
         try:
             line_str = line.decode("ascii").strip()
 
-            # Handle chunk-signature extension if present (e.g. "1000;chunk-signature=...")
             if ";" in line_str:
                 line_str = line_str.split(";")[0]
             chunk_size = int(line_str, 16)
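For reference, a toy round-trip over assumed example data of the aws-chunked framing this decoder parses — a hex size plus optional ;chunk-signature= extension, CRLF, the chunk bytes, CRLF, terminated by a zero-length chunk:

import io

body = (b"5;chunk-signature=deadbeef\r\n"
        b"hello\r\n"
        b"0;chunk-signature=deadbeef\r\n"
        b"\r\n")

def decode_aws_chunked(stream):
    payload = b""
    while True:
        header = stream.readline().decode("ascii").strip()
        size = int(header.split(";")[0], 16)  # drop the chunk-signature extension
        if size == 0:
            break
        payload += stream.read(size)
        stream.read(2)  # consume the CRLF that follows the chunk data
    return payload

assert decode_aws_chunked(io.BytesIO(body)) == b"hello"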
if "Bucket does not exist" in str(exc): return _error_response("NoSuchBucket", str(exc), 404) @@ -2368,7 +2344,6 @@ def _abort_multipart_upload(bucket_name: str, object_key: str) -> Response: @s3_api_bp.before_request def resolve_principal(): g.principal = None - # Try SigV4 try: if ("Authorization" in request.headers and request.headers["Authorization"].startswith("AWS4-HMAC-SHA256")) or \ (request.args.get("X-Amz-Algorithm") == "AWS4-HMAC-SHA256"): @@ -2377,7 +2352,6 @@ def resolve_principal(): except Exception: pass - # Try simple auth headers (internal/testing) access_key = request.headers.get("X-Access-Key") secret_key = request.headers.get("X-Secret-Key") if access_key and secret_key: diff --git a/app/ui.py b/app/ui.py index ce926f7..4baf5c3 100644 --- a/app/ui.py +++ b/app/ui.py @@ -189,7 +189,7 @@ def inject_nav_state() -> dict[str, Any]: return { "principal": principal, "can_manage_iam": can_manage, - "can_view_metrics": can_manage, # Only admins can view metrics + "can_view_metrics": can_manage, "csrf_token": generate_csrf, } @@ -294,7 +294,6 @@ def bucket_detail(bucket_name: str): storage = _storage() try: _authorize_ui(principal, bucket_name, "list") - # Don't load objects here - UI fetches them asynchronously via /buckets//objects if not storage.bucket_exists(bucket_name): raise StorageError("Bucket does not exist") except (StorageError, IamError) as exc: @@ -343,7 +342,6 @@ def bucket_detail(bucket_name: str): except IamError: can_manage_versioning = False - # Check replication permission can_manage_replication = False if principal: try: @@ -352,7 +350,6 @@ def bucket_detail(bucket_name: str): except IamError: can_manage_replication = False - # Check if user is admin (can configure replication settings, not just toggle) is_replication_admin = False if principal: try: @@ -361,12 +358,9 @@ def bucket_detail(bucket_name: str): except IamError: is_replication_admin = False - # Replication info - don't compute sync status here (it's slow), let JS fetch it async replication_rule = _replication().get_rule(bucket_name) - # Load connections for admin, or for non-admin if there's an existing rule (to show target name) connections = _connections().list() if (is_replication_admin or replication_rule) else [] - # Encryption settings encryption_config = storage.get_bucket_encryption(bucket_name) kms_manager = _kms() kms_keys = kms_manager.list_keys() if kms_manager else [] @@ -374,7 +368,6 @@ def bucket_detail(bucket_name: str): encryption_enabled = current_app.config.get("ENCRYPTION_ENABLED", False) can_manage_encryption = can_manage_versioning # Same as other bucket properties - # Quota settings (admin only) bucket_quota = storage.get_bucket_quota(bucket_name) bucket_stats = storage.bucket_stats(bucket_name) can_manage_quota = False @@ -384,7 +377,6 @@ def bucket_detail(bucket_name: str): except IamError: pass - # Pass the objects API endpoint URL for async loading objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name) return render_template( @@ -1256,7 +1248,6 @@ def delete_iam_user(access_key: str): return redirect(url_for("ui.iam_dashboard")) if access_key == principal.access_key: - # Self-deletion try: _iam().delete_user(access_key) session.pop("credentials", None) @@ -1338,6 +1329,9 @@ def create_connection(): @ui_bp.post("/connections/test") def test_connection(): + from botocore.config import Config as BotoConfig + from botocore.exceptions import ConnectTimeoutError, EndpointConnectionError, ReadTimeoutError + principal = _current_principal() try: 
diff --git a/app/ui.py b/app/ui.py
index ce926f7..4baf5c3 100644
--- a/app/ui.py
+++ b/app/ui.py
@@ -189,7 +189,7 @@ def inject_nav_state() -> dict[str, Any]:
     return {
         "principal": principal,
         "can_manage_iam": can_manage,
-        "can_view_metrics": can_manage,  # Only admins can view metrics
+        "can_view_metrics": can_manage,
         "csrf_token": generate_csrf,
     }
 
@@ -294,7 +294,6 @@ def bucket_detail(bucket_name: str):
     storage = _storage()
     try:
         _authorize_ui(principal, bucket_name, "list")
-        # Don't load objects here - UI fetches them asynchronously via /buckets/<bucket_name>/objects
         if not storage.bucket_exists(bucket_name):
             raise StorageError("Bucket does not exist")
     except (StorageError, IamError) as exc:
@@ -343,7 +342,6 @@ def bucket_detail(bucket_name: str):
         except IamError:
             can_manage_versioning = False
 
-    # Check replication permission
     can_manage_replication = False
     if principal:
         try:
@@ -352,7 +350,6 @@ def bucket_detail(bucket_name: str):
         except IamError:
             can_manage_replication = False
 
-    # Check if user is admin (can configure replication settings, not just toggle)
     is_replication_admin = False
     if principal:
         try:
@@ -361,12 +358,9 @@ def bucket_detail(bucket_name: str):
         except IamError:
             is_replication_admin = False
 
-    # Replication info - don't compute sync status here (it's slow), let JS fetch it async
     replication_rule = _replication().get_rule(bucket_name)
-    # Load connections for admin, or for non-admin if there's an existing rule (to show target name)
     connections = _connections().list() if (is_replication_admin or replication_rule) else []
 
-    # Encryption settings
     encryption_config = storage.get_bucket_encryption(bucket_name)
     kms_manager = _kms()
     kms_keys = kms_manager.list_keys() if kms_manager else []
@@ -374,7 +368,6 @@ def bucket_detail(bucket_name: str):
     encryption_enabled = current_app.config.get("ENCRYPTION_ENABLED", False)
     can_manage_encryption = can_manage_versioning  # Same as other bucket properties
 
-    # Quota settings (admin only)
     bucket_quota = storage.get_bucket_quota(bucket_name)
     bucket_stats = storage.bucket_stats(bucket_name)
     can_manage_quota = False
@@ -384,7 +377,6 @@ def bucket_detail(bucket_name: str):
         except IamError:
             pass
 
-    # Pass the objects API endpoint URL for async loading
     objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
 
     return render_template(
@@ -1256,7 +1248,6 @@ def delete_iam_user(access_key: str):
         return redirect(url_for("ui.iam_dashboard"))
 
     if access_key == principal.access_key:
-        # Self-deletion
        try:
            _iam().delete_user(access_key)
            session.pop("credentials", None)
@@ -1338,6 +1329,9 @@ def create_connection():
 
 @ui_bp.post("/connections/test")
 def test_connection():
+    from botocore.config import Config as BotoConfig
+    from botocore.exceptions import ConnectTimeoutError, EndpointConnectionError, ReadTimeoutError
+
     principal = _current_principal()
     try:
         _iam().authorize(principal, None, "iam:list_users")
@@ -1354,18 +1348,32 @@ def test_connection():
         return jsonify({"status": "error", "message": "Missing credentials"}), 400
 
     try:
+        config = BotoConfig(
+            connect_timeout=5,
+            read_timeout=10,
+            retries={'max_attempts': 1}
+        )
         s3 = boto3.client(
             "s3",
             endpoint_url=endpoint,
             aws_access_key_id=access_key,
             aws_secret_access_key=secret_key,
             region_name=region,
+            config=config,
         )
-        # Try to list buckets to verify credentials and endpoint
+
         s3.list_buckets()
         return jsonify({"status": "ok", "message": "Connection successful"})
+    except (ConnectTimeoutError, ReadTimeoutError):
+        return jsonify({"status": "error", "message": f"Connection timed out - endpoint may be down or unreachable: {endpoint}"}), 400
+    except EndpointConnectionError:
+        return jsonify({"status": "error", "message": f"Could not connect to endpoint: {endpoint}"}), 400
+    except ClientError as e:
+        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
+        error_msg = e.response.get('Error', {}).get('Message', str(e))
+        return jsonify({"status": "error", "message": f"Connection failed ({error_code}): {error_msg}"}), 400
     except Exception as e:
-        return jsonify({"status": "error", "message": str(e)}), 400
+        return jsonify({"status": "error", "message": f"Connection failed: {str(e)}"}), 400
 
 
 @ui_bp.post("/connections/<connection_id>/update")
@@ -1426,7 +1434,6 @@ def update_bucket_replication(bucket_name: str):
         flash(str(exc), "danger")
         return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication"))
 
-    # Check if user is admin (required for create/delete operations)
     is_admin = False
     try:
         _iam().authorize(principal, None, "iam:list_users")
@@ -1437,14 +1444,12 @@ def update_bucket_replication(bucket_name: str):
     action = request.form.get("action")
 
     if action == "delete":
-        # Admin only - remove configuration entirely
         if not is_admin:
             flash("Only administrators can remove replication configuration", "danger")
             return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication"))
         _replication().delete_rule(bucket_name)
         flash("Replication configuration removed", "info")
     elif action == "pause":
-        # Users can pause - just set enabled=False
         rule = _replication().get_rule(bucket_name)
         if rule:
             rule.enabled = False
@@ -1453,7 +1458,6 @@ def update_bucket_replication(bucket_name: str):
         else:
             flash("No replication configuration to pause", "warning")
     elif action == "resume":
-        # Users can resume - just set enabled=True
         rule = _replication().get_rule(bucket_name)
         if rule:
             rule.enabled = True
@@ -1462,7 +1466,6 @@ def update_bucket_replication(bucket_name: str):
         else:
             flash("No replication configuration to resume", "warning")
     elif action == "create":
-        # Admin only - create new configuration
         if not is_admin:
             flash("Only administrators can configure replication settings", "danger")
             return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="replication"))
@@ -1487,7 +1490,6 @@ def update_bucket_replication(bucket_name: str):
         )
         _replication().set_rule(rule)
 
-        # If mode is "all", trigger replication of existing objects
         if replication_mode == REPLICATION_MODE_ALL:
             _replication().replicate_existing_objects(bucket_name)
             flash("Replication configured. Existing objects are being replicated in the background.", "success")
@@ -1512,10 +1514,31 @@ def get_replication_status(bucket_name: str):
     if not rule:
         return jsonify({"error": "No replication rule"}), 404
 
-    # This is the slow operation - compute sync status by comparing buckets
-    stats = _replication().get_sync_status(bucket_name)
+    connection = _connections().get(rule.target_connection_id)
+    endpoint_healthy = False
+    endpoint_error = None
+    if connection:
+        endpoint_healthy = _replication().check_endpoint_health(connection)
+        if not endpoint_healthy:
+            endpoint_error = f"Cannot reach endpoint: {connection.endpoint_url}"
+    else:
+        endpoint_error = "Target connection not found"
+
+    stats = None
+    if endpoint_healthy:
+        stats = _replication().get_sync_status(bucket_name)
+
     if not stats:
-        return jsonify({"error": "Failed to compute status"}), 500
+        return jsonify({
+            "objects_synced": 0,
+            "objects_pending": 0,
+            "objects_orphaned": 0,
+            "bytes_synced": 0,
+            "last_sync_at": rule.stats.last_sync_at if rule.stats else None,
+            "last_sync_key": rule.stats.last_sync_key if rule.stats else None,
+            "endpoint_healthy": endpoint_healthy,
+            "endpoint_error": endpoint_error,
+        })
 
     return jsonify({
         "objects_synced": stats.objects_synced,
@@ -1524,6 +1547,28 @@ def get_replication_status(bucket_name: str):
         "bytes_synced": stats.bytes_synced,
         "last_sync_at": stats.last_sync_at,
         "last_sync_key": stats.last_sync_key,
+        "endpoint_healthy": endpoint_healthy,
+        "endpoint_error": endpoint_error,
+    })
+
+
+@ui_bp.get("/connections/<connection_id>/health")
+def check_connection_health(connection_id: str):
+    """Check if a connection endpoint is reachable."""
+    principal = _current_principal()
+    try:
+        _iam().authorize(principal, None, "iam:list_users")
+    except IamError:
+        return jsonify({"error": "Access denied"}), 403
+
+    conn = _connections().get(connection_id)
+    if not conn:
+        return jsonify({"healthy": False, "error": "Connection not found"}), 404
+
+    healthy = _replication().check_endpoint_health(conn)
+    return jsonify({
+        "healthy": healthy,
+        "error": None if healthy else f"Cannot reach endpoint: {conn.endpoint_url}"
     })
 
 
@@ -1544,7 +1589,6 @@ def connections_dashboard():
 def metrics_dashboard():
     principal = _current_principal()
 
-    # Metrics are restricted to admin users
     try:
         _iam().authorize(principal, None, "iam:list_users")
     except IamError:
@@ -1568,16 +1612,13 @@ def metrics_dashboard():
     total_bytes_used = 0
     total_versions = 0
 
-    # Note: Uses cached stats from storage layer to improve performance
     cache_ttl = current_app.config.get("BUCKET_STATS_CACHE_TTL", 60)
     for bucket in buckets:
         stats = storage.bucket_stats(bucket.name, cache_ttl=cache_ttl)
-        # Use totals which include archived versions
         total_objects += stats.get("total_objects", stats.get("objects", 0))
         total_bytes_used += stats.get("total_bytes", stats.get("bytes", 0))
         total_versions += stats.get("version_count", 0)
 
-    # Calculate system uptime
     boot_time = psutil.boot_time()
     uptime_seconds = time.time() - boot_time
     uptime_days = int(uptime_seconds / 86400)
diff --git a/static/css/main.css b/static/css/main.css
index 77b74b3..4de7e7a 100644
--- a/static/css/main.css
+++ b/static/css/main.css
@@ -66,8 +66,28 @@ html {
   color: var(--myfsio-muted) !important;
 }
 
-.table-responsive { border-radius: 0.5rem; overflow: hidden; }
+.table-responsive {
+  border-radius: 0.5rem;
+  overflow-x: auto;
+  -webkit-overflow-scrolling: touch;
+}
 
 .message-stack { position: sticky; top: 1rem; z-index: 100; }
+
+/* Mobile-friendly table improvements */
+.table-responsive table {
+  min-width: 600px;
+}
+
+.table-responsive table th,
+.table-responsive table td {
+  white-space: nowrap;
+}
+
+/* Allow text wrapping for description columns */
+.table-responsive table td.text-wrap {
+  white-space: normal;
+  min-width: 200px;
+}
 
 code { font-size: 0.85rem; }
 
 code {
@@ -1553,6 +1573,41 @@ pre code {
     position: relative !important;
     top: 0 !important;
   }
+
+  /* Ensure tables are scrollable on mobile */
+  .card-body .table-responsive {
+    margin: -1rem;
+    padding: 0;
+    width: calc(100% + 2rem);
+  }
+
+  .card-body .table-responsive table {
+    margin-bottom: 0;
+  }
+
+  /* IAM users table mobile adjustments */
+  .table th,
+  .table td {
+    padding: 0.5rem 0.75rem;
+  }
+
+  /* Better touch scrolling indicator */
+  .table-responsive::after {
+    content: '';
+    position: absolute;
+    top: 0;
+    right: 0;
+    bottom: 0;
+    width: 20px;
+    background: linear-gradient(to left, var(--myfsio-card-bg), transparent);
+    pointer-events: none;
+    opacity: 0;
+    transition: opacity 0.3s;
+  }
+
+  .table-responsive:not(:hover)::after {
+    opacity: 0.8;
+  }
 }
 
 *:focus-visible {
diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html
index d0bc2f9..33c84cd 100644
--- a/templates/bucket_detail.html
+++ b/templates/bucket_detail.html
@@ -954,7 +954,7 @@
 {% if replication_rule and replication_rule.enabled %}
@@ -1090,11 +1103,11 @@
 {% endif %}
 Enabled
@@ -1109,7 +1122,7 @@
 Refresh