diff --git a/app/__init__.py b/app/__init__.py index cd9e842..d764753 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -45,7 +45,7 @@ def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path: try: shutil.move(str(legacy_path), str(active_path)) except OSError: - # Fall back to copy + delete if move fails (e.g., cross-device) + # Fall back to copy + delete for cross-device moves shutil.copy2(legacy_path, active_path) try: legacy_path.unlink(missing_ok=True) @@ -101,25 +101,24 @@ def create_app( bucket_policies = BucketPolicyStore(Path(app.config["BUCKET_POLICY_PATH"])) secret_store = EphemeralSecretStore(default_ttl=app.config.get("SECRET_TTL_SECONDS", 300)) - # Initialize Replication components - # Store config files in the system config directory for consistency + # Initialize replication with system config directory for consistency storage_root = Path(app.config["STORAGE_ROOT"]) config_dir = storage_root / ".myfsio.sys" / "config" config_dir.mkdir(parents=True, exist_ok=True) - # Define paths with migration from legacy locations + # Migrate connection configs from legacy locations connections_path = _migrate_config_file( active_path=config_dir / "connections.json", legacy_paths=[ - storage_root / ".myfsio.sys" / "connections.json", # Previous location - storage_root / ".connections.json", # Original legacy location + storage_root / ".myfsio.sys" / "connections.json", + storage_root / ".connections.json", ], ) replication_rules_path = _migrate_config_file( active_path=config_dir / "replication_rules.json", legacy_paths=[ - storage_root / ".myfsio.sys" / "replication_rules.json", # Previous location - storage_root / ".replication_rules.json", # Original legacy location + storage_root / ".myfsio.sys" / "replication_rules.json", + storage_root / ".replication_rules.json", ], ) diff --git a/app/s3_api.py b/app/s3_api.py index 9406206..61b23f6 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -23,7 +23,7 @@ from .storage import ObjectStorage, StorageError, QuotaExceededError s3_api_bp = Blueprint("s3_api", __name__) -# ---------------------- helpers ---------------------- +# Helper functions for accessing app extensions and generating responses def _storage() -> ObjectStorage: return current_app.extensions["object_storage"] @@ -69,8 +69,7 @@ def _get_signature_key(key: str, date_stamp: str, region_name: str, service_name def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: - # Parse Authorization header - # AWS4-HMAC-SHA256 Credential=AKIA.../20230101/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-date, Signature=... + # Parse Authorization header: AWS4-HMAC-SHA256 Credential=AKIA.../20230101/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-date, Signature=... 
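# --- Illustrative sketch, not part of the diff ---
# The hunk above parses the SigV4 Authorization header with a single regex. This is a
# minimal, self-contained version of that parse; the header value below is made up.
import re

AUTH_RE = re.compile(
    r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, "
    r"SignedHeaders=([^,]+), Signature=(.+)"
)

def parse_sigv4_auth(header: str) -> dict | None:
    match = AUTH_RE.match(header)
    if not match:
        return None
    access_key, date_stamp, region, service, signed_headers, signature = match.groups()
    return {
        "access_key": access_key,
        "date_stamp": date_stamp,              # e.g. 20230101
        "region": region,                      # e.g. us-east-1
        "service": service,                    # "s3" for these requests
        "signed_headers": signed_headers.split(";"),
        "signature": signature,
    }

if __name__ == "__main__":
    example = (
        "AWS4-HMAC-SHA256 Credential=AKIAEXAMPLE/20230101/us-east-1/s3/aws4_request, "
        "SignedHeaders=host;x-amz-date, Signature=abc123"
    )
    print(parse_sigv4_auth(example))
# --- end sketch ---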
match = re.match( r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)", auth_header, @@ -79,17 +78,14 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: return None access_key, date_stamp, region, service, signed_headers_str, signature = match.groups() - - # Get secret key secret_key = _iam().get_secret_key(access_key) if not secret_key: raise IamError("Invalid access key") - # Canonical Request + # Build canonical request method = req.method canonical_uri = quote(req.path, safe="/-_.~") - # Canonical Query String query_args = [] for key, value in req.args.items(multi=True): query_args.append((key, value)) @@ -100,7 +96,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}") canonical_query_string = "&".join(canonical_query_parts) - # Canonical Headers signed_headers_list = signed_headers_str.split(";") canonical_headers_parts = [] for header in signed_headers_list: @@ -112,18 +107,13 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: canonical_headers_parts.append(f"{header.lower()}:{header_val}\n") canonical_headers = "".join(canonical_headers_parts) - # Payload Hash payload_hash = req.headers.get("X-Amz-Content-Sha256") if not payload_hash: payload_hash = hashlib.sha256(req.get_data()).hexdigest() canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}" - # String to Sign - amz_date = req.headers.get("X-Amz-Date") - if not amz_date: - amz_date = req.headers.get("Date") - + amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date") if not amz_date: raise IamError("Missing Date header") @@ -134,13 +124,13 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: now = datetime.now(timezone.utc) time_diff = abs((now - request_time).total_seconds()) - if time_diff > 900: # 15 minutes + if time_diff > 900: # AWS standard: 15-minute request validity window raise IamError("Request timestamp too old or too far in the future") required_headers = {'host', 'x-amz-date'} signed_headers_set = set(signed_headers_str.split(';')) if not required_headers.issubset(signed_headers_set): - # Some clients might sign 'date' instead of 'x-amz-date' + # Some clients use 'date' instead of 'x-amz-date' if 'date' in signed_headers_set: required_headers.remove('x-amz-date') required_headers.add('date') @@ -187,11 +177,10 @@ def _verify_sigv4_query(req: Any) -> Principal | None: if not secret_key: raise IamError("Invalid access key") - # Canonical Request + # Build canonical request method = req.method canonical_uri = quote(req.path, safe="/-_.~") - # Canonical Query String query_args = [] for key, value in req.args.items(multi=True): if key != "X-Amz-Signature": @@ -203,7 +192,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None: canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}") canonical_query_string = "&".join(canonical_query_parts) - # Canonical Headers signed_headers_list = signed_headers_str.split(";") canonical_headers_parts = [] for header in signed_headers_list: @@ -212,7 +200,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None: canonical_headers_parts.append(f"{header}:{val}\n") canonical_headers = "".join(canonical_headers_parts) - # Payload Hash payload_hash = "UNSIGNED-PAYLOAD" canonical_request = "\n".join([ @@ -224,7 +211,7 @@ def 
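# --- Illustrative sketch, not part of the diff ---
# The code above hashes the canonical request, builds the string to sign, and signs it
# with a key produced by _get_signature_key. This is the standard AWS SigV4 key-derivation
# chain that helper is expected to implement; function names here are illustrative.
import hashlib
import hmac

def _hmac(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

def derive_signing_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:
    k_date = _hmac(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac(k_date, region)
    k_service = _hmac(k_region, service)
    return _hmac(k_service, "aws4_request")

def signature_for(secret_key: str, string_to_sign: str, date_stamp: str,
                  region: str = "us-east-1", service: str = "s3") -> str:
    signing_key = derive_signing_key(secret_key, date_stamp, region, service)
    return hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

# Usage: compare signature_for(...) against the Signature field from the Authorization
# header (ideally with hmac.compare_digest to avoid timing side channels).
# --- end sketch ---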
_verify_sigv4_query(req: Any) -> Principal | None: payload_hash ]) - # String to Sign + # Build signature algorithm = "AWS4-HMAC-SHA256" credential_scope = f"{date_stamp}/{region}/{service}/aws4_request" hashed_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest() @@ -235,7 +222,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None: hashed_request ]) - # Signature signing_key = _get_signature_key(secret_key, date_stamp, region, service) calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() @@ -493,7 +479,7 @@ def _generate_presigned_url( } canonical_query = _encode_query_params(query_params) - # Determine host and scheme from config or request + # Get presigned URL host and scheme from config or request headers api_base = current_app.config.get("API_BASE_URL") if api_base: parsed = urlparse(api_base) @@ -914,7 +900,7 @@ def _object_tagging_handler(bucket_name: str, object_key: str) -> Response: if error: return error - # For tagging, we use read permission for GET, write for PUT/DELETE + # Use read permission for GET, write for PUT/DELETE action = "read" if request.method == "GET" else "write" try: _authorize_action(principal, bucket_name, action, object_key=object_key) @@ -1093,10 +1079,9 @@ def _bucket_location_handler(bucket_name: str) -> Response: if not storage.bucket_exists(bucket_name): return _error_response("NoSuchBucket", "Bucket does not exist", 404) - # Return the configured AWS_REGION + # Return bucket location (empty for us-east-1 per AWS spec) region = current_app.config.get("AWS_REGION", "us-east-1") root = Element("LocationConstraint") - # AWS returns empty for us-east-1, but we'll be explicit root.text = region if region != "us-east-1" else None return _xml_response(root) @@ -1116,13 +1101,12 @@ def _bucket_acl_handler(bucket_name: str) -> Response: return _error_response("NoSuchBucket", "Bucket does not exist", 404) if request.method == "PUT": - # We don't fully implement ACLs, but we accept the request for compatibility - # Check for canned ACL header + # Accept canned ACL headers for S3 compatibility (not fully implemented) canned_acl = request.headers.get("x-amz-acl", "private") current_app.logger.info("Bucket ACL set (canned)", extra={"bucket": bucket_name, "acl": canned_acl}) return Response(status=200) - # GET - Return a basic ACL document showing full control for owner + # Return basic ACL document showing owner's full control root = Element("AccessControlPolicy") owner = SubElement(root, "Owner") SubElement(owner, "ID").text = principal.access_key if principal else "anonymous" @@ -1188,10 +1172,10 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response: is_truncated = True break - # Current version + # Add current version to response version = SubElement(root, "Version") SubElement(version, "Key").text = obj.key - SubElement(version, "VersionId").text = "null" # Current version ID + SubElement(version, "VersionId").text = "null" SubElement(version, "IsLatest").text = "true" SubElement(version, "LastModified").text = obj.last_modified.strftime("%Y-%m-%dT%H:%M:%S.000Z") SubElement(version, "ETag").text = f'"{obj.etag}"' @@ -1296,7 +1280,7 @@ def _render_lifecycle_config(config: list) -> Element: SubElement(rule_el, "Status").text = rule.get("Status", "Enabled") - # Expiration + # Add expiration rule if present if "Expiration" in rule: exp = rule["Expiration"] exp_el = SubElement(rule_el, "Expiration") @@ -1307,14 +1291,14 @@ def _render_lifecycle_config(config: list) -> Element: if 
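# --- Illustrative sketch, not part of the diff ---
# The presigned-URL hunk above picks the host and scheme from API_BASE_URL when it is
# configured, otherwise from the incoming request. This sketch shows that fallback in
# isolation; the function name and parameters are assumptions for illustration only.
from urllib.parse import urlparse

def resolve_endpoint(api_base_url: str | None, request_host: str, request_scheme: str) -> tuple[str, str]:
    if api_base_url:
        parsed = urlparse(api_base_url)
        if parsed.netloc:
            # Configured endpoint wins; keep the request scheme if the URL omits one.
            return parsed.scheme or request_scheme, parsed.netloc
    return request_scheme, request_host

# resolve_endpoint("https://s3.example.com", "localhost:5000", "http")
# -> ("https", "s3.example.com")
# --- end sketch ---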
exp.get("ExpiredObjectDeleteMarker"): SubElement(exp_el, "ExpiredObjectDeleteMarker").text = "true" - # NoncurrentVersionExpiration + # Add noncurrent version expiration if present if "NoncurrentVersionExpiration" in rule: nve = rule["NoncurrentVersionExpiration"] nve_el = SubElement(rule_el, "NoncurrentVersionExpiration") if "NoncurrentDays" in nve: SubElement(nve_el, "NoncurrentDays").text = str(nve["NoncurrentDays"]) - # AbortIncompleteMultipartUpload + # Add incomplete multipart upload cleanup if present if "AbortIncompleteMultipartUpload" in rule: aimu = rule["AbortIncompleteMultipartUpload"] aimu_el = SubElement(rule_el, "AbortIncompleteMultipartUpload") @@ -1338,29 +1322,29 @@ def _parse_lifecycle_config(payload: bytes) -> list: for rule_el in root.findall("{*}Rule") or root.findall("Rule"): rule: dict = {} - # ID + # Extract rule ID id_el = rule_el.find("{*}ID") or rule_el.find("ID") if id_el is not None and id_el.text: rule["ID"] = id_el.text.strip() - # Filter/Prefix + # Extract filter prefix filter_el = rule_el.find("{*}Filter") or rule_el.find("Filter") if filter_el is not None: prefix_el = filter_el.find("{*}Prefix") or filter_el.find("Prefix") if prefix_el is not None and prefix_el.text: rule["Prefix"] = prefix_el.text - # Legacy Prefix (outside Filter) + # Fall back to legacy Prefix element (outside Filter) if "Prefix" not in rule: prefix_el = rule_el.find("{*}Prefix") or rule_el.find("Prefix") if prefix_el is not None: rule["Prefix"] = prefix_el.text or "" - # Status + # Extract status status_el = rule_el.find("{*}Status") or rule_el.find("Status") rule["Status"] = (status_el.text or "Enabled").strip() if status_el is not None else "Enabled" - # Expiration + # Parse expiration rule exp_el = rule_el.find("{*}Expiration") or rule_el.find("Expiration") if exp_el is not None: expiration: dict = {} @@ -1564,7 +1548,7 @@ def _bulk_delete_handler(bucket_name: str) -> Response: return _xml_response(result, status=200) -# ---------------------- routes ---------------------- +# Route handlers for S3 API endpoints @s3_api_bp.get("/") @limiter.limit("60 per minute") def list_buckets() -> Response: @@ -1642,7 +1626,7 @@ def bucket_handler(bucket_name: str) -> Response: current_app.logger.info("Bucket deleted", extra={"bucket": bucket_name}) return Response(status=204) - # GET - list objects (supports both ListObjects and ListObjectsV2) + # Handle GET - list objects (supports both ListObjects and ListObjectsV2) principal, error = _require_principal() try: _authorize_action(principal, bucket_name, "list") @@ -1650,18 +1634,13 @@ def bucket_handler(bucket_name: str) -> Response: if error: return error return _error_response("AccessDenied", str(exc), 403) - try: - objects = storage.list_objects_all(bucket_name) - except StorageError as exc: - return _error_response("NoSuchBucket", str(exc), 404) - # Check if this is ListObjectsV2 (list-type=2) list_type = request.args.get("list-type") prefix = request.args.get("prefix", "") delimiter = request.args.get("delimiter", "") max_keys = min(int(request.args.get("max-keys", current_app.config["UI_PAGE_SIZE"])), 1000) - # Pagination markers + # Use appropriate markers for pagination depending on API version marker = request.args.get("marker", "") # ListObjects v1 continuation_token = request.args.get("continuation-token", "") # ListObjectsV2 start_after = request.args.get("start-after", "") # ListObjectsV2 @@ -1681,11 +1660,18 @@ def bucket_handler(bucket_name: str) -> Response: else: effective_start = marker - if prefix: - objects = [obj for obj in 
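# --- Illustrative sketch, not part of the diff ---
# The lifecycle parser above uses ElementTree's "{*}" namespace wildcard (Python 3.8+)
# so it accepts payloads with or without the S3 namespace. A minimal stand-alone version
# of that pattern, assuming a simple Rule/Status/Expiration layout; the XML is sample data.
from xml.etree.ElementTree import fromstring

SAMPLE = """<LifecycleConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Rule>
    <ID>expire-logs</ID>
    <Filter><Prefix>logs/</Prefix></Filter>
    <Status>Enabled</Status>
    <Expiration><Days>30</Days></Expiration>
  </Rule>
</LifecycleConfiguration>"""

def parse_rules(payload: str) -> list[dict]:
    root = fromstring(payload)
    rules = []
    for rule_el in root.findall("{*}Rule"):
        rule: dict = {}
        id_el = rule_el.find("{*}ID")
        if id_el is not None and id_el.text:
            rule["ID"] = id_el.text.strip()
        status_el = rule_el.find("{*}Status")
        rule["Status"] = (status_el.text or "Enabled").strip() if status_el is not None else "Enabled"
        exp_el = rule_el.find("{*}Expiration")
        if exp_el is not None:
            days_el = exp_el.find("{*}Days")
            if days_el is not None and days_el.text:
                rule["Expiration"] = {"Days": int(days_el.text)}
        rules.append(rule)
    return rules

print(parse_rules(SAMPLE))  # [{'ID': 'expire-logs', 'Status': 'Enabled', 'Expiration': {'Days': 30}}]
# --- end sketch ---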
objects if obj.key.startswith(prefix)] - - if effective_start: - objects = [obj for obj in objects if obj.key > effective_start] + # Fetch with buffer for delimiter processing; delimiter requires extra objects to compute prefixes + fetch_keys = max_keys * 10 if delimiter else max_keys + try: + list_result = storage.list_objects( + bucket_name, + max_keys=fetch_keys, + continuation_token=effective_start or None, + prefix=prefix or None, + ) + objects = list_result.objects + except StorageError as exc: + return _error_response("NoSuchBucket", str(exc), 404) common_prefixes: list[str] = [] filtered_objects: list = [] @@ -1694,7 +1680,7 @@ def bucket_handler(bucket_name: str) -> Response: for obj in objects: key_after_prefix = obj.key[len(prefix):] if prefix else obj.key if delimiter in key_after_prefix: - # This is a "folder" - extract the common prefix + # Extract common prefix (folder-like structure) common_prefix = prefix + key_after_prefix.split(delimiter)[0] + delimiter if common_prefix not in seen_prefixes: seen_prefixes.add(common_prefix) @@ -1705,7 +1691,7 @@ def bucket_handler(bucket_name: str) -> Response: common_prefixes = sorted(common_prefixes) total_items = len(objects) + len(common_prefixes) - is_truncated = total_items > max_keys + is_truncated = total_items > max_keys or list_result.is_truncated if len(objects) >= max_keys: objects = objects[:max_keys] @@ -1845,9 +1831,8 @@ def object_handler(bucket_name: str, object_key: str): response = Response(status=200) response.headers["ETag"] = f'"{meta.etag}"' - # Trigger replication if not a replication request - user_agent = request.headers.get("User-Agent", "") - if "S3ReplicationAgent" not in user_agent: + # Trigger replication for non-replication requests + if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""): _replication_manager().trigger_replication(bucket_name, object_key, action="write") return response @@ -1866,17 +1851,15 @@ def object_handler(bucket_name: str, object_key: str): metadata = storage.get_object_metadata(bucket_name, object_key) mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream" - # Check if object is encrypted and needs decryption + # Decrypt encrypted objects is_encrypted = "x-amz-server-side-encryption" in metadata if request.method == "GET": if is_encrypted and hasattr(storage, 'get_object_data'): - # Use encrypted storage to decrypt try: data, clean_metadata = storage.get_object_data(bucket_name, object_key) response = Response(data, mimetype=mimetype) logged_bytes = len(data) - # Use decrypted size for Content-Length response.headers["Content-Length"] = len(data) etag = hashlib.md5(data).hexdigest() except StorageError as exc: diff --git a/app/storage.py b/app/storage.py index 4875519..b8fcefd 100644 --- a/app/storage.py +++ b/app/storage.py @@ -128,11 +128,14 @@ class ObjectStorage: BUCKET_VERSIONS_DIR = "versions" MULTIPART_MANIFEST = "manifest.json" BUCKET_CONFIG_FILE = ".bucket.json" + KEY_INDEX_CACHE_TTL = 30 # seconds - longer TTL for better browsing performance def __init__(self, root: Path) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) self._ensure_system_roots() + # In-memory object metadata cache: bucket_id -> (dict[key -> ObjectMeta], timestamp) + self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {} def list_buckets(self) -> List[BucketMeta]: buckets: List[BucketMeta] = [] @@ -274,32 +277,26 @@ class ObjectStorage: raise StorageError("Bucket does not exist") bucket_id = bucket_path.name - # Collect all 
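# --- Illustrative sketch, not part of the diff ---
# The listing hunk above folds keys that share a delimiter segment into CommonPrefixes
# (folder-like entries) and keeps the rest as Contents. The same grouping, reduced to a
# pure function over key strings; the sample keys are made up.
def split_listing(keys: list[str], prefix: str = "", delimiter: str = "/") -> tuple[list[str], list[str]]:
    contents: list[str] = []
    common_prefixes: list[str] = []
    seen: set[str] = set()
    for key in keys:
        if prefix and not key.startswith(prefix):
            continue
        remainder = key[len(prefix):]
        if delimiter and delimiter in remainder:
            # Everything up to (and including) the first delimiter becomes one "folder".
            common = prefix + remainder.split(delimiter)[0] + delimiter
            if common not in seen:
                seen.add(common)
                common_prefixes.append(common)
        else:
            contents.append(key)
    return contents, sorted(common_prefixes)

# split_listing(["a.txt", "logs/2024/app.log", "logs/2025/app.log"], delimiter="/")
# -> (["a.txt"], ["logs/"])
# --- end sketch ---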
matching object keys first (lightweight - just paths) - all_keys: List[str] = [] - for path in bucket_path.rglob("*"): - if path.is_file(): - rel = path.relative_to(bucket_path) - if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS: - continue - key = str(rel.as_posix()) - if prefix and not key.startswith(prefix): - continue - all_keys.append(key) + # Use cached object metadata for fast listing + object_cache = self._get_object_cache(bucket_id, bucket_path) + + # Get sorted keys + all_keys = sorted(object_cache.keys()) + + # Apply prefix filter if specified + if prefix: + all_keys = [k for k in all_keys if k.startswith(prefix)] - all_keys.sort() total_count = len(all_keys) # Handle continuation token (the key to start after) start_index = 0 if continuation_token: try: - # continuation_token is the last key from previous page - for i, key in enumerate(all_keys): - if key > continuation_token: - start_index = i - break - else: - # Token is past all keys + # Binary search for efficiency on large lists + import bisect + start_index = bisect.bisect_right(all_keys, continuation_token) + if start_index >= total_count: return ListObjectsResult( objects=[], is_truncated=False, @@ -314,27 +311,12 @@ class ObjectStorage: keys_slice = all_keys[start_index:end_index] is_truncated = end_index < total_count - # Now load full metadata only for the objects we're returning + # Build result from cached metadata (no file I/O!) objects: List[ObjectMeta] = [] for key in keys_slice: - safe_key = self._sanitize_object_key(key) - path = bucket_path / safe_key - if not path.exists(): - continue # Object may have been deleted - try: - stat = path.stat() - metadata = self._read_metadata(bucket_id, safe_key) - objects.append( - ObjectMeta( - key=key, - size=stat.st_size, - last_modified=datetime.fromtimestamp(stat.st_mtime), - etag=self._compute_etag(path), - metadata=metadata or None, - ) - ) - except OSError: - continue # File may have been deleted during iteration + obj = object_cache.get(key) + if obj: + objects.append(obj) next_token = keys_slice[-1] if is_truncated and keys_slice else None @@ -416,18 +398,21 @@ class ObjectStorage: pass stat = destination.stat() - if metadata: - self._write_metadata(bucket_id, safe_key, metadata) - else: - self._delete_metadata(bucket_id, safe_key) + etag = checksum.hexdigest() + + # Always store internal metadata (etag, size) alongside user metadata + internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)} + combined_meta = {**internal_meta, **(metadata or {})} + self._write_metadata(bucket_id, safe_key, combined_meta) self._invalidate_bucket_stats_cache(bucket_id) + self._invalidate_object_cache(bucket_id) return ObjectMeta( key=safe_key.as_posix(), size=stat.st_size, last_modified=datetime.fromtimestamp(stat.st_mtime), - etag=checksum.hexdigest(), + etag=etag, metadata=metadata, ) @@ -479,6 +464,7 @@ class ObjectStorage: self._delete_metadata(bucket_id, rel) self._invalidate_bucket_stats_cache(bucket_id) + self._invalidate_object_cache(bucket_id) self._cleanup_empty_parents(path, bucket_path) def purge_object(self, bucket_name: str, object_key: str) -> None: @@ -501,6 +487,7 @@ class ObjectStorage: # Invalidate bucket stats cache self._invalidate_bucket_stats_cache(bucket_id) + self._invalidate_object_cache(bucket_id) self._cleanup_empty_parents(target, bucket_path) def is_versioning_enabled(self, bucket_name: str) -> bool: @@ -1163,6 +1150,187 @@ class ObjectStorage: def _legacy_multipart_dir(self, bucket_name: str, upload_id: str) -> Path: return 
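# --- Illustrative sketch, not part of the diff ---
# The hunk above replaces a linear scan with bisect_right to find where a continuation
# token (the last key of the previous page) falls in the sorted key list. A minimal
# version of that pagination step; key names are illustrative.
import bisect

def page_after(all_keys: list[str], continuation_token: str | None, max_keys: int) -> tuple[list[str], str | None]:
    start = bisect.bisect_right(all_keys, continuation_token) if continuation_token else 0
    page = all_keys[start:start + max_keys]
    truncated = start + max_keys < len(all_keys)
    next_token = page[-1] if truncated and page else None
    return page, next_token

keys = sorted(f"obj-{i:04d}" for i in range(25))
first, token = page_after(keys, None, 10)    # obj-0000..obj-0009, token "obj-0009"
second, token = page_after(keys, token, 10)  # obj-0010..obj-0019
# --- end sketch ---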
self._legacy_multipart_bucket_root(bucket_name) / upload_id + def _fast_list_keys(self, bucket_path: Path) -> List[str]: + """Fast directory walk using os.scandir instead of pathlib.rglob. + + This is significantly faster for large directories (10K+ files). + Returns just the keys (for backward compatibility). + """ + return list(self._build_object_cache(bucket_path).keys()) + + def _build_object_cache(self, bucket_path: Path) -> Dict[str, ObjectMeta]: + """Build a complete object metadata cache for a bucket. + + Uses os.scandir for fast directory walking and a persistent etag index. + """ + from concurrent.futures import ThreadPoolExecutor + + bucket_id = bucket_path.name + objects: Dict[str, ObjectMeta] = {} + bucket_str = str(bucket_path) + bucket_len = len(bucket_str) + 1 # +1 for the separator + + # Try to load persisted etag index first (single file read vs thousands) + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" + meta_cache: Dict[str, str] = {} + index_mtime: float = 0 + + if etag_index_path.exists(): + try: + index_mtime = etag_index_path.stat().st_mtime + with open(etag_index_path, 'r', encoding='utf-8') as f: + meta_cache = json.load(f) + except (OSError, json.JSONDecodeError): + meta_cache = {} + + # Check if we need to rebuild the index + meta_root = self._bucket_meta_root(bucket_id) + needs_rebuild = False + + if meta_root.exists() and index_mtime > 0: + # Quick check: if any meta file is newer than index, rebuild + def check_newer(dir_path: str) -> bool: + try: + with os.scandir(dir_path) as it: + for entry in it: + if entry.is_dir(follow_symlinks=False): + if check_newer(entry.path): + return True + elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'): + if entry.stat().st_mtime > index_mtime: + return True + except OSError: + pass + return False + needs_rebuild = check_newer(str(meta_root)) + elif not meta_cache: + needs_rebuild = True + + if needs_rebuild and meta_root.exists(): + meta_str = str(meta_root) + meta_len = len(meta_str) + 1 + meta_files: list[tuple[str, str]] = [] + + # Collect all metadata file paths + def collect_meta_files(dir_path: str) -> None: + try: + with os.scandir(dir_path) as it: + for entry in it: + if entry.is_dir(follow_symlinks=False): + collect_meta_files(entry.path) + elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'): + rel = entry.path[meta_len:] + key = rel[:-10].replace(os.sep, '/') + meta_files.append((key, entry.path)) + except OSError: + pass + + collect_meta_files(meta_str) + + # Parallel read of metadata files - only extract __etag__ + def read_meta_file(item: tuple[str, str]) -> tuple[str, str | None]: + key, path = item + try: + with open(path, 'rb') as f: + content = f.read() + etag_marker = b'"__etag__"' + idx = content.find(etag_marker) + if idx != -1: + start = content.find(b'"', idx + len(etag_marker) + 1) + if start != -1: + end = content.find(b'"', start + 1) + if end != -1: + return key, content[start+1:end].decode('utf-8') + return key, None + except (OSError, UnicodeDecodeError): + return key, None + + if meta_files: + meta_cache = {} + with ThreadPoolExecutor(max_workers=min(64, len(meta_files))) as executor: + for key, etag in executor.map(read_meta_file, meta_files): + if etag: + meta_cache[key] = etag + + # Persist the index for next time + try: + etag_index_path.parent.mkdir(parents=True, exist_ok=True) + with open(etag_index_path, 'w', encoding='utf-8') as f: + json.dump(meta_cache, f) + except OSError: + pass + + # Now scan 
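# --- Illustrative sketch, not part of the diff ---
# The cache builder above fans metadata reads out over a thread pool to recover each
# object's stored "__etag__". This sketch shows the same pattern, but uses json.load per
# file instead of the byte-scan fast path in the diff; the directory layout is assumed.
import json
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def load_etag_index(meta_root: Path) -> dict[str, str]:
    # Map each *.meta.json file back to its object key (path relative to meta_root).
    meta_files: list[tuple[str, Path]] = []
    for path in meta_root.rglob("*.meta.json"):
        rel = path.relative_to(meta_root).as_posix()
        meta_files.append((rel[: -len(".meta.json")], path))

    def read_one(item: tuple[str, Path]) -> tuple[str, str | None]:
        key, meta_path = item
        try:
            with open(meta_path, "r", encoding="utf-8") as fh:
                return key, json.load(fh).get("__etag__")
        except (OSError, json.JSONDecodeError):
            return key, None

    index: dict[str, str] = {}
    if meta_files:
        with ThreadPoolExecutor(max_workers=min(64, len(meta_files))) as executor:
            for key, etag in executor.map(read_one, meta_files):
                if etag:
                    index[key] = etag
    return index
# --- end sketch ---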
objects and use cached etags + def scan_dir(dir_path: str) -> None: + try: + with os.scandir(dir_path) as it: + for entry in it: + if entry.is_dir(follow_symlinks=False): + # Skip internal folders + rel_start = entry.path[bucket_len:].split(os.sep)[0] if len(entry.path) > bucket_len else entry.name + if rel_start in self.INTERNAL_FOLDERS: + continue + scan_dir(entry.path) + elif entry.is_file(follow_symlinks=False): + # Get relative path and convert to POSIX + rel = entry.path[bucket_len:] + # Check if in internal folder + first_part = rel.split(os.sep)[0] if os.sep in rel else rel + if first_part in self.INTERNAL_FOLDERS: + continue + + key = rel.replace(os.sep, '/') + try: + # Use entry.stat() which is cached from scandir + stat = entry.stat() + + # Get etag from cache (now just a string, not dict) + etag = meta_cache.get(key) + + # Use placeholder for legacy objects without stored etag + if not etag: + etag = f'"{stat.st_size}-{int(stat.st_mtime)}"' + + objects[key] = ObjectMeta( + key=key, + size=stat.st_size, + last_modified=datetime.fromtimestamp(stat.st_mtime), + etag=etag, + metadata=None, # Don't include user metadata in listing + ) + except OSError: + pass + except OSError: + pass + + scan_dir(bucket_str) + return objects + + def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]: + """Get cached object metadata for a bucket, refreshing if stale.""" + now = time.time() + cached = self._object_cache.get(bucket_id) + + if cached: + objects, timestamp = cached + if now - timestamp < self.KEY_INDEX_CACHE_TTL: + return objects + + # Rebuild cache + objects = self._build_object_cache(bucket_path) + self._object_cache[bucket_id] = (objects, now) + return objects + + def _invalidate_object_cache(self, bucket_id: str) -> None: + """Invalidate the object cache and etag index for a bucket.""" + self._object_cache.pop(bucket_id, None) + # Also invalidate persisted etag index + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" + try: + etag_index_path.unlink(missing_ok=True) + except OSError: + pass + def _ensure_system_roots(self) -> None: for path in ( self._system_root_path(), diff --git a/app/ui.py b/app/ui.py index 2d2936f..ce926f7 100644 --- a/app/ui.py +++ b/app/ui.py @@ -423,7 +423,7 @@ def list_bucket_objects(bucket_name: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - max_keys = min(int(request.args.get("max_keys", 100)), 1000) + max_keys = min(int(request.args.get("max_keys", 1000)), 10000) continuation_token = request.args.get("continuation_token") or None prefix = request.args.get("prefix") or None @@ -738,41 +738,30 @@ def bulk_download_objects(bucket_name: str): unique_keys = list(dict.fromkeys(cleaned)) storage = _storage() - # Check permissions for all keys first (or at least bucket read) - # We'll check bucket read once, then object read for each if needed? - # _authorize_ui checks bucket level if object_key is None, but we need to check each object if fine-grained policies exist. - # For simplicity/performance, we check bucket list/read. + # Verify permission to read bucket contents try: _authorize_ui(principal, bucket_name, "read") except IamError as exc: return jsonify({"error": str(exc)}), 403 - # Create ZIP + # Create ZIP archive of selected objects buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf: for key in unique_keys: try: - # Verify individual object permission if needed? 
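# --- Illustrative sketch, not part of the diff ---
# _get_object_cache above is a plain TTL cache: return the cached listing if it is
# younger than KEY_INDEX_CACHE_TTL, otherwise rebuild and restamp it. The same idea as a
# small reusable class; the class name and 30-second default are assumptions.
import time

class TTLCache:
    def __init__(self, ttl_seconds: float = 30.0) -> None:
        self.ttl = ttl_seconds
        self._entries: dict = {}  # key -> (value, timestamp)

    def get(self, key, build):
        now = time.time()
        cached = self._entries.get(key)
        if cached and now - cached[1] < self.ttl:
            return cached[0]
        value = build()                   # rebuild on miss or expiry
        self._entries[key] = (value, now)
        return value

    def invalidate(self, key) -> None:
        self._entries.pop(key, None)      # mirrors _invalidate_object_cache

cache = TTLCache(ttl_seconds=30.0)
listing = cache.get("bucket-id", lambda: {"a.txt": "etag-1"})
cache.invalidate("bucket-id")             # call after any write or delete to the bucket
# --- end sketch ---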
- # _authorize_ui(principal, bucket_name, "read", object_key=key) - # This might be slow for many objects. Assuming bucket read is enough for now or we accept the overhead. - # Let's skip individual check for bulk speed, assuming bucket read implies object read unless denied. - # But strictly we should check. Let's check. _authorize_ui(principal, bucket_name, "read", object_key=key) - # Check if object is encrypted metadata = storage.get_object_metadata(bucket_name, key) is_encrypted = "x-amz-server-side-encryption" in metadata if is_encrypted and hasattr(storage, 'get_object_data'): - # Decrypt and add to zip data, _ = storage.get_object_data(bucket_name, key) zf.writestr(key, data) else: - # Add unencrypted file directly path = storage.get_object_path(bucket_name, key) zf.write(path, arcname=key) except (StorageError, IamError): - # Skip files we can't read or don't exist + # Skip objects that can't be accessed continue buffer.seek(0) @@ -1077,7 +1066,6 @@ def update_bucket_encryption(bucket_name: str): action = request.form.get("action", "enable") if action == "disable": - # Disable encryption try: _storage().set_bucket_encryption(bucket_name, None) flash("Default encryption disabled", "info") @@ -1085,16 +1073,14 @@ def update_bucket_encryption(bucket_name: str): flash(_friendly_error_message(exc), "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) - # Enable or update encryption algorithm = request.form.get("algorithm", "AES256") kms_key_id = request.form.get("kms_key_id", "").strip() or None - # Validate algorithm if algorithm not in ("AES256", "aws:kms"): flash("Invalid encryption algorithm", "danger") return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties")) - # Build encryption config following AWS format + # Build encryption configuration in AWS S3 format encryption_config: dict[str, Any] = { "Rules": [ { diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html index 107fd5e..d0bc2f9 100644 --- a/templates/bucket_detail.html +++ b/templates/bucket_detail.html @@ -172,15 +172,15 @@
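# --- Illustrative sketch, not part of the diff ---
# The bulk-download handler above streams selected objects into an in-memory ZIP with
# io.BytesIO and ZIP_DEFLATED, skipping anything it cannot read. The same assembly step
# in isolation; the object map and download name are illustrative.
import io
import zipfile

def build_zip(objects: dict[str, bytes]) -> io.BytesIO:
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        for key, data in objects.items():
            try:
                zf.writestr(key, data)    # decrypted objects are written from memory
            except (OSError, ValueError):
                continue                  # skip objects that can't be added
    buffer.seek(0)
    return buffer

archive = build_zip({"docs/readme.txt": b"hello", "logs/app.log": b"line 1\n"})
# e.g. return send_file(archive, mimetype="application/zip",
#                       as_attachment=True, download_name="objects.zip")
# --- end sketch ---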
This folder contains no objects${hasMoreObjects ? ' yet. Loading more...' : '.'}
+