Improve object storage performance via caching

This commit is contained in:
2025-12-22 17:03:33 +08:00
parent aab9ef696a
commit f5451c162b
5 changed files with 542 additions and 311 deletions

View File

@@ -45,7 +45,7 @@ def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path:
try:
shutil.move(str(legacy_path), str(active_path))
except OSError:
# Fall back to copy + delete if move fails (e.g., cross-device)
# Fall back to copy + delete for cross-device moves
shutil.copy2(legacy_path, active_path)
try:
legacy_path.unlink(missing_ok=True)
@@ -101,25 +101,24 @@ def create_app(
bucket_policies = BucketPolicyStore(Path(app.config["BUCKET_POLICY_PATH"]))
secret_store = EphemeralSecretStore(default_ttl=app.config.get("SECRET_TTL_SECONDS", 300))
# Initialize Replication components
# Store config files in the system config directory for consistency
# Initialize replication with system config directory for consistency
storage_root = Path(app.config["STORAGE_ROOT"])
config_dir = storage_root / ".myfsio.sys" / "config"
config_dir.mkdir(parents=True, exist_ok=True)
# Define paths with migration from legacy locations
# Migrate connection configs from legacy locations
connections_path = _migrate_config_file(
active_path=config_dir / "connections.json",
legacy_paths=[
storage_root / ".myfsio.sys" / "connections.json", # Previous location
storage_root / ".connections.json", # Original legacy location
storage_root / ".myfsio.sys" / "connections.json",
storage_root / ".connections.json",
],
)
replication_rules_path = _migrate_config_file(
active_path=config_dir / "replication_rules.json",
legacy_paths=[
storage_root / ".myfsio.sys" / "replication_rules.json", # Previous location
storage_root / ".replication_rules.json", # Original legacy location
storage_root / ".myfsio.sys" / "replication_rules.json",
storage_root / ".replication_rules.json",
],
)

View File

@@ -23,7 +23,7 @@ from .storage import ObjectStorage, StorageError, QuotaExceededError
s3_api_bp = Blueprint("s3_api", __name__)
# ---------------------- helpers ----------------------
# Helper functions for accessing app extensions and generating responses
def _storage() -> ObjectStorage:
return current_app.extensions["object_storage"]
@@ -69,8 +69,7 @@ def _get_signature_key(key: str, date_stamp: str, region_name: str, service_name
def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
# Parse Authorization header
# AWS4-HMAC-SHA256 Credential=AKIA.../20230101/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-date, Signature=...
# Parse Authorization header: AWS4-HMAC-SHA256 Credential=AKIA.../20230101/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-date, Signature=...
match = re.match(
r"AWS4-HMAC-SHA256 Credential=([^/]+)/([^/]+)/([^/]+)/([^/]+)/aws4_request, SignedHeaders=([^,]+), Signature=(.+)",
auth_header,
@@ -79,17 +78,14 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
return None
access_key, date_stamp, region, service, signed_headers_str, signature = match.groups()
# Get secret key
secret_key = _iam().get_secret_key(access_key)
if not secret_key:
raise IamError("Invalid access key")
# Canonical Request
# Build canonical request
method = req.method
canonical_uri = quote(req.path, safe="/-_.~")
# Canonical Query String
query_args = []
for key, value in req.args.items(multi=True):
query_args.append((key, value))
@@ -100,7 +96,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}")
canonical_query_string = "&".join(canonical_query_parts)
# Canonical Headers
signed_headers_list = signed_headers_str.split(";")
canonical_headers_parts = []
for header in signed_headers_list:
@@ -112,18 +107,13 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
canonical_headers_parts.append(f"{header.lower()}:{header_val}\n")
canonical_headers = "".join(canonical_headers_parts)
# Payload Hash
payload_hash = req.headers.get("X-Amz-Content-Sha256")
if not payload_hash:
payload_hash = hashlib.sha256(req.get_data()).hexdigest()
canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
# String to Sign
amz_date = req.headers.get("X-Amz-Date")
if not amz_date:
amz_date = req.headers.get("Date")
amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date")
if not amz_date:
raise IamError("Missing Date header")
@@ -134,13 +124,13 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
now = datetime.now(timezone.utc)
time_diff = abs((now - request_time).total_seconds())
if time_diff > 900: # 15 minutes
if time_diff > 900: # AWS standard: 15-minute request validity window
raise IamError("Request timestamp too old or too far in the future")
required_headers = {'host', 'x-amz-date'}
signed_headers_set = set(signed_headers_str.split(';'))
if not required_headers.issubset(signed_headers_set):
# Some clients might sign 'date' instead of 'x-amz-date'
# Some clients use 'date' instead of 'x-amz-date'
if 'date' in signed_headers_set:
required_headers.remove('x-amz-date')
required_headers.add('date')
@@ -187,11 +177,10 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
if not secret_key:
raise IamError("Invalid access key")
# Canonical Request
# Build canonical request
method = req.method
canonical_uri = quote(req.path, safe="/-_.~")
# Canonical Query String
query_args = []
for key, value in req.args.items(multi=True):
if key != "X-Amz-Signature":
@@ -203,7 +192,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
canonical_query_parts.append(f"{quote(k, safe='-_.~')}={quote(v, safe='-_.~')}")
canonical_query_string = "&".join(canonical_query_parts)
# Canonical Headers
signed_headers_list = signed_headers_str.split(";")
canonical_headers_parts = []
for header in signed_headers_list:
@@ -212,7 +200,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
canonical_headers_parts.append(f"{header}:{val}\n")
canonical_headers = "".join(canonical_headers_parts)
# Payload Hash
payload_hash = "UNSIGNED-PAYLOAD"
canonical_request = "\n".join([
@@ -224,7 +211,7 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
payload_hash
])
# String to Sign
# Build signature
algorithm = "AWS4-HMAC-SHA256"
credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"
hashed_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
@@ -235,7 +222,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
hashed_request
])
# Signature
signing_key = _get_signature_key(secret_key, date_stamp, region, service)
calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
@@ -493,7 +479,7 @@ def _generate_presigned_url(
}
canonical_query = _encode_query_params(query_params)
# Determine host and scheme from config or request
# Get presigned URL host and scheme from config or request headers
api_base = current_app.config.get("API_BASE_URL")
if api_base:
parsed = urlparse(api_base)
@@ -914,7 +900,7 @@ def _object_tagging_handler(bucket_name: str, object_key: str) -> Response:
if error:
return error
# For tagging, we use read permission for GET, write for PUT/DELETE
# Use read permission for GET, write for PUT/DELETE
action = "read" if request.method == "GET" else "write"
try:
_authorize_action(principal, bucket_name, action, object_key=object_key)
@@ -1093,10 +1079,9 @@ def _bucket_location_handler(bucket_name: str) -> Response:
if not storage.bucket_exists(bucket_name):
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
# Return the configured AWS_REGION
# Return bucket location (empty for us-east-1 per AWS spec)
region = current_app.config.get("AWS_REGION", "us-east-1")
root = Element("LocationConstraint")
# AWS returns empty for us-east-1, but we'll be explicit
root.text = region if region != "us-east-1" else None
return _xml_response(root)
@@ -1116,13 +1101,12 @@ def _bucket_acl_handler(bucket_name: str) -> Response:
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
if request.method == "PUT":
# We don't fully implement ACLs, but we accept the request for compatibility
# Check for canned ACL header
# Accept canned ACL headers for S3 compatibility (not fully implemented)
canned_acl = request.headers.get("x-amz-acl", "private")
current_app.logger.info("Bucket ACL set (canned)", extra={"bucket": bucket_name, "acl": canned_acl})
return Response(status=200)
# GET - Return a basic ACL document showing full control for owner
# Return basic ACL document showing owner's full control
root = Element("AccessControlPolicy")
owner = SubElement(root, "Owner")
SubElement(owner, "ID").text = principal.access_key if principal else "anonymous"
@@ -1188,10 +1172,10 @@ def _bucket_list_versions_handler(bucket_name: str) -> Response:
is_truncated = True
break
# Current version
# Add current version to response
version = SubElement(root, "Version")
SubElement(version, "Key").text = obj.key
SubElement(version, "VersionId").text = "null" # Current version ID
SubElement(version, "VersionId").text = "null"
SubElement(version, "IsLatest").text = "true"
SubElement(version, "LastModified").text = obj.last_modified.strftime("%Y-%m-%dT%H:%M:%S.000Z")
SubElement(version, "ETag").text = f'"{obj.etag}"'
@@ -1296,7 +1280,7 @@ def _render_lifecycle_config(config: list) -> Element:
SubElement(rule_el, "Status").text = rule.get("Status", "Enabled")
# Expiration
# Add expiration rule if present
if "Expiration" in rule:
exp = rule["Expiration"]
exp_el = SubElement(rule_el, "Expiration")
@@ -1307,14 +1291,14 @@ def _render_lifecycle_config(config: list) -> Element:
if exp.get("ExpiredObjectDeleteMarker"):
SubElement(exp_el, "ExpiredObjectDeleteMarker").text = "true"
# NoncurrentVersionExpiration
# Add noncurrent version expiration if present
if "NoncurrentVersionExpiration" in rule:
nve = rule["NoncurrentVersionExpiration"]
nve_el = SubElement(rule_el, "NoncurrentVersionExpiration")
if "NoncurrentDays" in nve:
SubElement(nve_el, "NoncurrentDays").text = str(nve["NoncurrentDays"])
# AbortIncompleteMultipartUpload
# Add incomplete multipart upload cleanup if present
if "AbortIncompleteMultipartUpload" in rule:
aimu = rule["AbortIncompleteMultipartUpload"]
aimu_el = SubElement(rule_el, "AbortIncompleteMultipartUpload")
@@ -1338,29 +1322,29 @@ def _parse_lifecycle_config(payload: bytes) -> list:
for rule_el in root.findall("{*}Rule") or root.findall("Rule"):
rule: dict = {}
# ID
# Extract rule ID
id_el = rule_el.find("{*}ID") or rule_el.find("ID")
if id_el is not None and id_el.text:
rule["ID"] = id_el.text.strip()
# Filter/Prefix
# Extract filter prefix
filter_el = rule_el.find("{*}Filter") or rule_el.find("Filter")
if filter_el is not None:
prefix_el = filter_el.find("{*}Prefix") or filter_el.find("Prefix")
if prefix_el is not None and prefix_el.text:
rule["Prefix"] = prefix_el.text
# Legacy Prefix (outside Filter)
# Fall back to legacy Prefix element (outside Filter)
if "Prefix" not in rule:
prefix_el = rule_el.find("{*}Prefix") or rule_el.find("Prefix")
if prefix_el is not None:
rule["Prefix"] = prefix_el.text or ""
# Status
# Extract status
status_el = rule_el.find("{*}Status") or rule_el.find("Status")
rule["Status"] = (status_el.text or "Enabled").strip() if status_el is not None else "Enabled"
# Expiration
# Parse expiration rule
exp_el = rule_el.find("{*}Expiration") or rule_el.find("Expiration")
if exp_el is not None:
expiration: dict = {}
@@ -1564,7 +1548,7 @@ def _bulk_delete_handler(bucket_name: str) -> Response:
return _xml_response(result, status=200)
# ---------------------- routes ----------------------
# Route handlers for S3 API endpoints
@s3_api_bp.get("/")
@limiter.limit("60 per minute")
def list_buckets() -> Response:
@@ -1642,7 +1626,7 @@ def bucket_handler(bucket_name: str) -> Response:
current_app.logger.info("Bucket deleted", extra={"bucket": bucket_name})
return Response(status=204)
# GET - list objects (supports both ListObjects and ListObjectsV2)
# Handle GET - list objects (supports both ListObjects and ListObjectsV2)
principal, error = _require_principal()
try:
_authorize_action(principal, bucket_name, "list")
@@ -1650,18 +1634,13 @@ def bucket_handler(bucket_name: str) -> Response:
if error:
return error
return _error_response("AccessDenied", str(exc), 403)
try:
objects = storage.list_objects_all(bucket_name)
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
# Check if this is ListObjectsV2 (list-type=2)
list_type = request.args.get("list-type")
prefix = request.args.get("prefix", "")
delimiter = request.args.get("delimiter", "")
max_keys = min(int(request.args.get("max-keys", current_app.config["UI_PAGE_SIZE"])), 1000)
# Pagination markers
# Use appropriate markers for pagination depending on API version
marker = request.args.get("marker", "") # ListObjects v1
continuation_token = request.args.get("continuation-token", "") # ListObjectsV2
start_after = request.args.get("start-after", "") # ListObjectsV2
@@ -1681,11 +1660,18 @@ def bucket_handler(bucket_name: str) -> Response:
else:
effective_start = marker
if prefix:
objects = [obj for obj in objects if obj.key.startswith(prefix)]
if effective_start:
objects = [obj for obj in objects if obj.key > effective_start]
# Fetch with buffer for delimiter processing; delimiter requires extra objects to compute prefixes
fetch_keys = max_keys * 10 if delimiter else max_keys
try:
list_result = storage.list_objects(
bucket_name,
max_keys=fetch_keys,
continuation_token=effective_start or None,
prefix=prefix or None,
)
objects = list_result.objects
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
common_prefixes: list[str] = []
filtered_objects: list = []
@@ -1694,7 +1680,7 @@ def bucket_handler(bucket_name: str) -> Response:
for obj in objects:
key_after_prefix = obj.key[len(prefix):] if prefix else obj.key
if delimiter in key_after_prefix:
# This is a "folder" - extract the common prefix
# Extract common prefix (folder-like structure)
common_prefix = prefix + key_after_prefix.split(delimiter)[0] + delimiter
if common_prefix not in seen_prefixes:
seen_prefixes.add(common_prefix)
@@ -1705,7 +1691,7 @@ def bucket_handler(bucket_name: str) -> Response:
common_prefixes = sorted(common_prefixes)
total_items = len(objects) + len(common_prefixes)
is_truncated = total_items > max_keys
is_truncated = total_items > max_keys or list_result.is_truncated
if len(objects) >= max_keys:
objects = objects[:max_keys]
@@ -1845,9 +1831,8 @@ def object_handler(bucket_name: str, object_key: str):
response = Response(status=200)
response.headers["ETag"] = f'"{meta.etag}"'
# Trigger replication if not a replication request
user_agent = request.headers.get("User-Agent", "")
if "S3ReplicationAgent" not in user_agent:
# Trigger replication for non-replication requests
if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""):
_replication_manager().trigger_replication(bucket_name, object_key, action="write")
return response
@@ -1866,17 +1851,15 @@ def object_handler(bucket_name: str, object_key: str):
metadata = storage.get_object_metadata(bucket_name, object_key)
mimetype = mimetypes.guess_type(object_key)[0] or "application/octet-stream"
# Check if object is encrypted and needs decryption
# Decrypt encrypted objects
is_encrypted = "x-amz-server-side-encryption" in metadata
if request.method == "GET":
if is_encrypted and hasattr(storage, 'get_object_data'):
# Use encrypted storage to decrypt
try:
data, clean_metadata = storage.get_object_data(bucket_name, object_key)
response = Response(data, mimetype=mimetype)
logged_bytes = len(data)
# Use decrypted size for Content-Length
response.headers["Content-Length"] = len(data)
etag = hashlib.md5(data).hexdigest()
except StorageError as exc:

View File

@@ -128,11 +128,14 @@ class ObjectStorage:
BUCKET_VERSIONS_DIR = "versions"
MULTIPART_MANIFEST = "manifest.json"
BUCKET_CONFIG_FILE = ".bucket.json"
KEY_INDEX_CACHE_TTL = 30 # seconds - longer TTL for better browsing performance
def __init__(self, root: Path) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
self._ensure_system_roots()
# In-memory object metadata cache: bucket_id -> (dict[key -> ObjectMeta], timestamp)
self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {}
def list_buckets(self) -> List[BucketMeta]:
buckets: List[BucketMeta] = []
@@ -274,32 +277,26 @@ class ObjectStorage:
raise StorageError("Bucket does not exist")
bucket_id = bucket_path.name
# Collect all matching object keys first (lightweight - just paths)
all_keys: List[str] = []
for path in bucket_path.rglob("*"):
if path.is_file():
rel = path.relative_to(bucket_path)
if rel.parts and rel.parts[0] in self.INTERNAL_FOLDERS:
continue
key = str(rel.as_posix())
if prefix and not key.startswith(prefix):
continue
all_keys.append(key)
# Use cached object metadata for fast listing
object_cache = self._get_object_cache(bucket_id, bucket_path)
# Get sorted keys
all_keys = sorted(object_cache.keys())
# Apply prefix filter if specified
if prefix:
all_keys = [k for k in all_keys if k.startswith(prefix)]
all_keys.sort()
total_count = len(all_keys)
# Handle continuation token (the key to start after)
start_index = 0
if continuation_token:
try:
# continuation_token is the last key from previous page
for i, key in enumerate(all_keys):
if key > continuation_token:
start_index = i
break
else:
# Token is past all keys
# Binary search for efficiency on large lists
import bisect
start_index = bisect.bisect_right(all_keys, continuation_token)
if start_index >= total_count:
return ListObjectsResult(
objects=[],
is_truncated=False,
@@ -314,27 +311,12 @@ class ObjectStorage:
keys_slice = all_keys[start_index:end_index]
is_truncated = end_index < total_count
# Now load full metadata only for the objects we're returning
# Build result from cached metadata (no file I/O!)
objects: List[ObjectMeta] = []
for key in keys_slice:
safe_key = self._sanitize_object_key(key)
path = bucket_path / safe_key
if not path.exists():
continue # Object may have been deleted
try:
stat = path.stat()
metadata = self._read_metadata(bucket_id, safe_key)
objects.append(
ObjectMeta(
key=key,
size=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime),
etag=self._compute_etag(path),
metadata=metadata or None,
)
)
except OSError:
continue # File may have been deleted during iteration
obj = object_cache.get(key)
if obj:
objects.append(obj)
next_token = keys_slice[-1] if is_truncated and keys_slice else None
@@ -416,18 +398,21 @@ class ObjectStorage:
pass
stat = destination.stat()
if metadata:
self._write_metadata(bucket_id, safe_key, metadata)
else:
self._delete_metadata(bucket_id, safe_key)
etag = checksum.hexdigest()
# Always store internal metadata (etag, size) alongside user metadata
internal_meta = {"__etag__": etag, "__size__": str(stat.st_size)}
combined_meta = {**internal_meta, **(metadata or {})}
self._write_metadata(bucket_id, safe_key, combined_meta)
self._invalidate_bucket_stats_cache(bucket_id)
self._invalidate_object_cache(bucket_id)
return ObjectMeta(
key=safe_key.as_posix(),
size=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime),
etag=checksum.hexdigest(),
etag=etag,
metadata=metadata,
)
@@ -479,6 +464,7 @@ class ObjectStorage:
self._delete_metadata(bucket_id, rel)
self._invalidate_bucket_stats_cache(bucket_id)
self._invalidate_object_cache(bucket_id)
self._cleanup_empty_parents(path, bucket_path)
def purge_object(self, bucket_name: str, object_key: str) -> None:
@@ -501,6 +487,7 @@ class ObjectStorage:
# Invalidate bucket stats cache
self._invalidate_bucket_stats_cache(bucket_id)
self._invalidate_object_cache(bucket_id)
self._cleanup_empty_parents(target, bucket_path)
def is_versioning_enabled(self, bucket_name: str) -> bool:
@@ -1163,6 +1150,187 @@ class ObjectStorage:
def _legacy_multipart_dir(self, bucket_name: str, upload_id: str) -> Path:
return self._legacy_multipart_bucket_root(bucket_name) / upload_id
def _fast_list_keys(self, bucket_path: Path) -> List[str]:
"""Fast directory walk using os.scandir instead of pathlib.rglob.
This is significantly faster for large directories (10K+ files).
Returns just the keys (for backward compatibility).
"""
return list(self._build_object_cache(bucket_path).keys())
def _build_object_cache(self, bucket_path: Path) -> Dict[str, ObjectMeta]:
"""Build a complete object metadata cache for a bucket.
Uses os.scandir for fast directory walking and a persistent etag index.
"""
from concurrent.futures import ThreadPoolExecutor
bucket_id = bucket_path.name
objects: Dict[str, ObjectMeta] = {}
bucket_str = str(bucket_path)
bucket_len = len(bucket_str) + 1 # +1 for the separator
# Try to load persisted etag index first (single file read vs thousands)
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
meta_cache: Dict[str, str] = {}
index_mtime: float = 0
if etag_index_path.exists():
try:
index_mtime = etag_index_path.stat().st_mtime
with open(etag_index_path, 'r', encoding='utf-8') as f:
meta_cache = json.load(f)
except (OSError, json.JSONDecodeError):
meta_cache = {}
# Check if we need to rebuild the index
meta_root = self._bucket_meta_root(bucket_id)
needs_rebuild = False
if meta_root.exists() and index_mtime > 0:
# Quick check: if any meta file is newer than index, rebuild
def check_newer(dir_path: str) -> bool:
try:
with os.scandir(dir_path) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
if check_newer(entry.path):
return True
elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
if entry.stat().st_mtime > index_mtime:
return True
except OSError:
pass
return False
needs_rebuild = check_newer(str(meta_root))
elif not meta_cache:
needs_rebuild = True
if needs_rebuild and meta_root.exists():
meta_str = str(meta_root)
meta_len = len(meta_str) + 1
meta_files: list[tuple[str, str]] = []
# Collect all metadata file paths
def collect_meta_files(dir_path: str) -> None:
try:
with os.scandir(dir_path) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
collect_meta_files(entry.path)
elif entry.is_file(follow_symlinks=False) and entry.name.endswith('.meta.json'):
rel = entry.path[meta_len:]
key = rel[:-10].replace(os.sep, '/')
meta_files.append((key, entry.path))
except OSError:
pass
collect_meta_files(meta_str)
# Parallel read of metadata files - only extract __etag__
def read_meta_file(item: tuple[str, str]) -> tuple[str, str | None]:
key, path = item
try:
with open(path, 'rb') as f:
content = f.read()
etag_marker = b'"__etag__"'
idx = content.find(etag_marker)
if idx != -1:
start = content.find(b'"', idx + len(etag_marker) + 1)
if start != -1:
end = content.find(b'"', start + 1)
if end != -1:
return key, content[start+1:end].decode('utf-8')
return key, None
except (OSError, UnicodeDecodeError):
return key, None
if meta_files:
meta_cache = {}
with ThreadPoolExecutor(max_workers=min(64, len(meta_files))) as executor:
for key, etag in executor.map(read_meta_file, meta_files):
if etag:
meta_cache[key] = etag
# Persist the index for next time
try:
etag_index_path.parent.mkdir(parents=True, exist_ok=True)
with open(etag_index_path, 'w', encoding='utf-8') as f:
json.dump(meta_cache, f)
except OSError:
pass
# Now scan objects and use cached etags
def scan_dir(dir_path: str) -> None:
try:
with os.scandir(dir_path) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
# Skip internal folders
rel_start = entry.path[bucket_len:].split(os.sep)[0] if len(entry.path) > bucket_len else entry.name
if rel_start in self.INTERNAL_FOLDERS:
continue
scan_dir(entry.path)
elif entry.is_file(follow_symlinks=False):
# Get relative path and convert to POSIX
rel = entry.path[bucket_len:]
# Check if in internal folder
first_part = rel.split(os.sep)[0] if os.sep in rel else rel
if first_part in self.INTERNAL_FOLDERS:
continue
key = rel.replace(os.sep, '/')
try:
# Use entry.stat() which is cached from scandir
stat = entry.stat()
# Get etag from cache (now just a string, not dict)
etag = meta_cache.get(key)
# Use placeholder for legacy objects without stored etag
if not etag:
etag = f'"{stat.st_size}-{int(stat.st_mtime)}"'
objects[key] = ObjectMeta(
key=key,
size=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime),
etag=etag,
metadata=None, # Don't include user metadata in listing
)
except OSError:
pass
except OSError:
pass
scan_dir(bucket_str)
return objects
def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]:
"""Get cached object metadata for a bucket, refreshing if stale."""
now = time.time()
cached = self._object_cache.get(bucket_id)
if cached:
objects, timestamp = cached
if now - timestamp < self.KEY_INDEX_CACHE_TTL:
return objects
# Rebuild cache
objects = self._build_object_cache(bucket_path)
self._object_cache[bucket_id] = (objects, now)
return objects
def _invalidate_object_cache(self, bucket_id: str) -> None:
"""Invalidate the object cache and etag index for a bucket."""
self._object_cache.pop(bucket_id, None)
# Also invalidate persisted etag index
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
try:
etag_index_path.unlink(missing_ok=True)
except OSError:
pass
def _ensure_system_roots(self) -> None:
for path in (
self._system_root_path(),

View File

@@ -423,7 +423,7 @@ def list_bucket_objects(bucket_name: str):
except IamError as exc:
return jsonify({"error": str(exc)}), 403
max_keys = min(int(request.args.get("max_keys", 100)), 1000)
max_keys = min(int(request.args.get("max_keys", 1000)), 10000)
continuation_token = request.args.get("continuation_token") or None
prefix = request.args.get("prefix") or None
@@ -738,41 +738,30 @@ def bulk_download_objects(bucket_name: str):
unique_keys = list(dict.fromkeys(cleaned))
storage = _storage()
# Check permissions for all keys first (or at least bucket read)
# We'll check bucket read once, then object read for each if needed?
# _authorize_ui checks bucket level if object_key is None, but we need to check each object if fine-grained policies exist.
# For simplicity/performance, we check bucket list/read.
# Verify permission to read bucket contents
try:
_authorize_ui(principal, bucket_name, "read")
except IamError as exc:
return jsonify({"error": str(exc)}), 403
# Create ZIP
# Create ZIP archive of selected objects
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for key in unique_keys:
try:
# Verify individual object permission if needed?
# _authorize_ui(principal, bucket_name, "read", object_key=key)
# This might be slow for many objects. Assuming bucket read is enough for now or we accept the overhead.
# Let's skip individual check for bulk speed, assuming bucket read implies object read unless denied.
# But strictly we should check. Let's check.
_authorize_ui(principal, bucket_name, "read", object_key=key)
# Check if object is encrypted
metadata = storage.get_object_metadata(bucket_name, key)
is_encrypted = "x-amz-server-side-encryption" in metadata
if is_encrypted and hasattr(storage, 'get_object_data'):
# Decrypt and add to zip
data, _ = storage.get_object_data(bucket_name, key)
zf.writestr(key, data)
else:
# Add unencrypted file directly
path = storage.get_object_path(bucket_name, key)
zf.write(path, arcname=key)
except (StorageError, IamError):
# Skip files we can't read or don't exist
# Skip objects that can't be accessed
continue
buffer.seek(0)
@@ -1077,7 +1066,6 @@ def update_bucket_encryption(bucket_name: str):
action = request.form.get("action", "enable")
if action == "disable":
# Disable encryption
try:
_storage().set_bucket_encryption(bucket_name, None)
flash("Default encryption disabled", "info")
@@ -1085,16 +1073,14 @@ def update_bucket_encryption(bucket_name: str):
flash(_friendly_error_message(exc), "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
# Enable or update encryption
algorithm = request.form.get("algorithm", "AES256")
kms_key_id = request.form.get("kms_key_id", "").strip() or None
# Validate algorithm
if algorithm not in ("AES256", "aws:kms"):
flash("Invalid encryption algorithm", "danger")
return redirect(url_for("ui.bucket_detail", bucket_name=bucket_name, tab="properties"))
# Build encryption config following AWS format
# Build encryption configuration in AWS S3 format
encryption_config: dict[str, Any] = {
"Rules": [
{