9 Commits

8 changed files with 216 additions and 487 deletions

View File

@@ -9,7 +9,7 @@ import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Dict, Optional
import boto3
from botocore.config import Config
@@ -24,42 +24,11 @@ logger = logging.getLogger(__name__)
REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"
REPLICATION_CONNECT_TIMEOUT = 5
REPLICATION_READ_TIMEOUT = 30
STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024 # 10 MiB - use streaming for larger files
REPLICATION_MODE_NEW_ONLY = "new_only"
REPLICATION_MODE_ALL = "all"
def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any:
"""Create a boto3 S3 client for the given connection.
Args:
connection: Remote S3 connection configuration
health_check: If True, use minimal retries for quick health checks
Returns:
Configured boto3 S3 client
"""
config = Config(
user_agent_extra=REPLICATION_USER_AGENT,
connect_timeout=REPLICATION_CONNECT_TIMEOUT,
read_timeout=REPLICATION_READ_TIMEOUT,
retries={'max_attempts': 1 if health_check else 2},
signature_version='s3v4',
s3={'addressing_style': 'path'},
request_checksum_calculation='when_required',
response_checksum_validation='when_required',
)
return boto3.client(
"s3",
endpoint_url=connection.endpoint_url,
aws_access_key_id=connection.access_key,
aws_secret_access_key=connection.secret_key,
region_name=connection.region or 'us-east-1',
config=config,
)
@dataclass
class ReplicationStats:
"""Statistics for replication operations - computed dynamically."""
@@ -133,19 +102,8 @@ class ReplicationManager:
self._rules: Dict[str, ReplicationRule] = {}
self._stats_lock = threading.Lock()
self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker")
self._shutdown = False
self.reload_rules()
def shutdown(self, wait: bool = True) -> None:
"""Shutdown the replication executor gracefully.
Args:
wait: If True, wait for pending tasks to complete
"""
self._shutdown = True
self._executor.shutdown(wait=wait)
logger.info("Replication manager shut down")
def reload_rules(self) -> None:
if not self.rules_path.exists():
self._rules = {}
@@ -166,12 +124,25 @@ class ReplicationManager:
def check_endpoint_health(self, connection: RemoteConnection) -> bool:
"""Check if a remote endpoint is reachable and responsive.
Returns True if endpoint is healthy, False otherwise.
Uses short timeouts to prevent blocking.
"""
try:
s3 = _create_s3_client(connection, health_check=True)
config = Config(
user_agent_extra=REPLICATION_USER_AGENT,
connect_timeout=REPLICATION_CONNECT_TIMEOUT,
read_timeout=REPLICATION_READ_TIMEOUT,
retries={'max_attempts': 1}
)
s3 = boto3.client(
"s3",
endpoint_url=connection.endpoint_url,
aws_access_key_id=connection.access_key,
aws_secret_access_key=connection.secret_key,
region_name=connection.region,
config=config,
)
s3.list_buckets()
return True
except Exception as e:
@@ -213,9 +184,15 @@ class ReplicationManager:
try:
source_objects = self.storage.list_objects_all(bucket_name)
source_keys = {obj.key: obj.size for obj in source_objects}
s3 = _create_s3_client(connection)
s3 = boto3.client(
"s3",
endpoint_url=connection.endpoint_url,
aws_access_key_id=connection.access_key,
aws_secret_access_key=connection.secret_key,
region_name=connection.region,
)
dest_keys = set()
bytes_synced = 0
paginator = s3.get_paginator('list_objects_v2')
@@ -280,7 +257,13 @@ class ReplicationManager:
raise ValueError(f"Connection {connection_id} not found")
try:
s3 = _create_s3_client(connection)
s3 = boto3.client(
"s3",
endpoint_url=connection.endpoint_url,
aws_access_key_id=connection.access_key,
aws_secret_access_key=connection.secret_key,
region_name=connection.region,
)
s3.create_bucket(Bucket=bucket_name)
except ClientError as e:
logger.error(f"Failed to create remote bucket {bucket_name}: {e}")
@@ -303,28 +286,41 @@ class ReplicationManager:
self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)
def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
if self._shutdown:
return
# Re-check if rule is still enabled (may have been paused after task was submitted)
current_rule = self.get_rule(bucket_name)
if not current_rule or not current_rule.enabled:
logger.debug(f"Replication skipped for {bucket_name}/{object_key}: rule disabled or removed")
return
if ".." in object_key or object_key.startswith("/") or object_key.startswith("\\"):
logger.error(f"Invalid object key in replication (path traversal attempt): {object_key}")
return
try:
from .storage import ObjectStorage
ObjectStorage._sanitize_object_key(object_key)
except StorageError as e:
logger.error(f"Object key validation failed in replication: {e}")
return
file_size = 0
try:
s3 = _create_s3_client(conn)
config = Config(
user_agent_extra=REPLICATION_USER_AGENT,
connect_timeout=REPLICATION_CONNECT_TIMEOUT,
read_timeout=REPLICATION_READ_TIMEOUT,
retries={'max_attempts': 2},
signature_version='s3v4',
s3={
'addressing_style': 'path',
},
# Disable SDK automatic checksums - they cause SignatureDoesNotMatch errors
# with S3-compatible servers that don't support CRC32 checksum headers
request_checksum_calculation='when_required',
response_checksum_validation='when_required',
)
s3 = boto3.client(
"s3",
endpoint_url=conn.endpoint_url,
aws_access_key_id=conn.access_key,
aws_secret_access_key=conn.secret_key,
region_name=conn.region or 'us-east-1',
config=config,
)
if action == "delete":
try:
@@ -341,42 +337,34 @@ class ReplicationManager:
logger.error(f"Source object not found: {bucket_name}/{object_key}")
return
# Don't replicate metadata - destination server will generate its own
# __etag__ and __size__. Replicating them causes signature mismatches when they have None/empty values.
content_type, _ = mimetypes.guess_type(path)
file_size = path.stat().st_size
logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
def do_upload() -> None:
"""Upload object using appropriate method based on file size.
For small files (< 10 MiB): Read into memory for simpler handling
For large files: Use streaming upload to avoid memory issues
def do_put_object() -> None:
"""Helper to upload object.
Reads the file content into memory first to avoid signature calculation
issues with certain binary file types (like GIFs) when streaming.
Do NOT set ContentLength explicitly - boto3 calculates it from the bytes
and setting it manually can cause SignatureDoesNotMatch errors.
"""
extra_args = {}
file_content = path.read_bytes()
put_kwargs = {
"Bucket": rule.target_bucket,
"Key": object_key,
"Body": file_content,
}
if content_type:
extra_args["ContentType"] = content_type
if file_size >= STREAMING_THRESHOLD_BYTES:
# Use multipart upload for large files
s3.upload_file(
str(path),
rule.target_bucket,
object_key,
ExtraArgs=extra_args if extra_args else None,
)
else:
# Read small files into memory
file_content = path.read_bytes()
put_kwargs = {
"Bucket": rule.target_bucket,
"Key": object_key,
"Body": file_content,
**extra_args,
}
s3.put_object(**put_kwargs)
put_kwargs["ContentType"] = content_type
s3.put_object(**put_kwargs)
try:
do_upload()
do_put_object()
except (ClientError, S3UploadFailedError) as e:
error_code = None
if isinstance(e, ClientError):
@@ -398,13 +386,13 @@ class ReplicationManager:
bucket_ready = True
else:
logger.error(f"Failed to create target bucket {rule.target_bucket}: {bucket_err}")
raise e
raise e
if bucket_ready:
do_upload()
do_put_object()
else:
raise e
logger.info(f"Replicated {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})")
self._update_last_sync(bucket_name, object_key)

View File

@@ -1,15 +1,13 @@
"""Flask blueprint exposing a subset of the S3 REST API."""
from __future__ import annotations
import base64
import hashlib
import hmac
import logging
import mimetypes
import re
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from typing import Any, Dict
from urllib.parse import quote, urlencode, urlparse, unquote
from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError
@@ -22,8 +20,6 @@ from .iam import IamError, Principal
from .replication import ReplicationManager
from .storage import ObjectStorage, StorageError, QuotaExceededError
logger = logging.getLogger(__name__)
s3_api_bp = Blueprint("s3_api", __name__)
def _storage() -> ObjectStorage:
@@ -122,9 +118,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
if header_val is None:
header_val = ""
if header.lower() == 'expect' and header_val == "":
header_val = "100-continue"
header_val = " ".join(header_val.split())
canonical_headers_parts.append(f"{header.lower()}:{header_val}\n")
canonical_headers = "".join(canonical_headers_parts)
@@ -135,6 +128,15 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
# Debug logging for signature issues
import logging
logger = logging.getLogger(__name__)
logger.debug(f"SigV4 Debug - Method: {method}, URI: {canonical_uri}")
logger.debug(f"SigV4 Debug - Payload hash from header: {req.headers.get('X-Amz-Content-Sha256')}")
logger.debug(f"SigV4 Debug - Signed headers: {signed_headers_str}")
logger.debug(f"SigV4 Debug - Content-Type: {req.headers.get('Content-Type')}")
logger.debug(f"SigV4 Debug - Content-Length: {req.headers.get('Content-Length')}")
amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date")
if not amz_date:
raise IamError("Missing Date header")
@@ -165,18 +167,24 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
if not hmac.compare_digest(calculated_signature, signature):
# Only log detailed signature debug info if DEBUG_SIGV4 is enabled
if current_app.config.get("DEBUG_SIGV4"):
logger.warning(
"SigV4 signature mismatch",
extra={
"path": req.path,
"method": method,
"signed_headers": signed_headers_str,
"content_type": req.headers.get("Content-Type"),
"content_length": req.headers.get("Content-Length"),
}
)
# Debug logging for signature mismatch
import logging
logger = logging.getLogger(__name__)
logger.error(f"Signature mismatch for {req.path}")
logger.error(f" Content-Type: {req.headers.get('Content-Type')}")
logger.error(f" Content-Length: {req.headers.get('Content-Length')}")
logger.error(f" X-Amz-Content-Sha256: {req.headers.get('X-Amz-Content-Sha256')}")
logger.error(f" Canonical URI: {canonical_uri}")
logger.error(f" Signed headers: {signed_headers_str}")
# Log each signed header's value
for h in signed_headers_list:
logger.error(f" Header '{h}': {repr(req.headers.get(h))}")
logger.error(f" Expected sig: {signature[:16]}...")
logger.error(f" Calculated sig: {calculated_signature[:16]}...")
# Log first part of canonical request to compare
logger.error(f" Canonical request hash: {hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()[:16]}...")
# Log the full canonical request for debugging
logger.error(f" Canonical request:\n{canonical_request[:500]}...")
raise IamError("SignatureDoesNotMatch")
return _iam().get_principal(access_key)
@@ -228,8 +236,6 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
canonical_headers_parts = []
for header in signed_headers_list:
val = req.headers.get(header, "").strip()
if header.lower() == 'expect' and val == "":
val = "100-continue"
val = " ".join(val.split())
canonical_headers_parts.append(f"{header}:{val}\n")
canonical_headers = "".join(canonical_headers_parts)
@@ -563,28 +569,6 @@ def _strip_ns(tag: str | None) -> str:
return tag.split("}")[-1]
def _find_element(parent: Element, name: str) -> Optional[Element]:
"""Find a child element by name, trying both namespaced and non-namespaced variants.
This handles XML documents that may or may not include namespace prefixes.
"""
el = parent.find(f"{{*}}{name}")
if el is None:
el = parent.find(name)
return el
def _find_element_text(parent: Element, name: str, default: str = "") -> str:
"""Find a child element and return its text content.
Returns the default value if element not found or has no text.
"""
el = _find_element(parent, name)
if el is None or el.text is None:
return default
return el.text.strip()
def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]:
try:
root = fromstring(payload)
@@ -601,11 +585,17 @@ def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]:
for tag_el in list(tagset):
if _strip_ns(tag_el.tag) != "Tag":
continue
key = _find_element_text(tag_el, "Key")
key_el = tag_el.find("{*}Key")
if key_el is None:
key_el = tag_el.find("Key")
value_el = tag_el.find("{*}Value")
if value_el is None:
value_el = tag_el.find("Value")
key = (key_el.text or "").strip() if key_el is not None else ""
if not key:
continue
value = _find_element_text(tag_el, "Value")
tags.append({"Key": key, "Value": value})
value = value_el.text if value_el is not None else ""
tags.append({"Key": key, "Value": value or ""})
return tags
@@ -1449,7 +1439,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
if request.method == "DELETE":
try:
storage.set_bucket_quota(bucket_name, max_bytes=None, max_objects=None)
storage.set_bucket_quota(bucket_name, max_size_bytes=None, max_objects=None)
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
current_app.logger.info("Bucket quota deleted", extra={"bucket": bucket_name})
@@ -1483,7 +1473,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
return _error_response("InvalidArgument", f"max_objects {exc}", 400)
try:
storage.set_bucket_quota(bucket_name, max_bytes=max_size_bytes, max_objects=max_objects)
storage.set_bucket_quota(bucket_name, max_size_bytes=max_size_bytes, max_objects=max_objects)
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
@@ -1675,6 +1665,7 @@ def bucket_handler(bucket_name: str) -> Response:
effective_start = ""
if list_type == "2":
if continuation_token:
import base64
try:
effective_start = base64.urlsafe_b64decode(continuation_token.encode()).decode("utf-8")
except Exception:
@@ -1731,6 +1722,7 @@ def bucket_handler(bucket_name: str) -> Response:
next_marker = common_prefixes[-1].rstrip(delimiter) if delimiter else common_prefixes[-1]
if list_type == "2" and next_marker:
import base64
next_continuation_token = base64.urlsafe_b64encode(next_marker.encode()).decode("utf-8")
if list_type == "2":

View File

@@ -7,11 +7,9 @@ import os
import re
import shutil
import stat
import threading
import time
import unicodedata
import uuid
from collections import OrderedDict
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
@@ -131,17 +129,12 @@ class ObjectStorage:
MULTIPART_MANIFEST = "manifest.json"
BUCKET_CONFIG_FILE = ".bucket.json"
KEY_INDEX_CACHE_TTL = 30
OBJECT_CACHE_MAX_SIZE = 100 # Maximum number of buckets to cache
def __init__(self, root: Path) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
self._ensure_system_roots()
# LRU cache for object metadata with thread-safe access
self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float]] = OrderedDict()
self._cache_lock = threading.Lock()
# Cache version counter for detecting stale reads
self._cache_version: Dict[str, int] = {}
self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {}
def list_buckets(self) -> List[BucketMeta]:
buckets: List[BucketMeta] = []
@@ -736,6 +729,8 @@ class ObjectStorage:
bucket_id = bucket_path.name
safe_key = self._sanitize_object_key(object_key)
version_dir = self._version_dir(bucket_id, safe_key)
if not version_dir.exists():
version_dir = self._legacy_version_dir(bucket_id, safe_key)
if not version_dir.exists():
version_dir = self._legacy_version_dir(bucket_id, safe_key)
if not version_dir.exists():
@@ -884,73 +879,41 @@ class ObjectStorage:
part_number: int,
stream: BinaryIO,
) -> str:
"""Upload a part for a multipart upload.
Uses file locking to safely update the manifest and handle concurrent uploads.
"""
if part_number < 1:
raise StorageError("part_number must be >= 1")
bucket_path = self._bucket_path(bucket_name)
upload_root = self._multipart_dir(bucket_path.name, upload_id)
if not upload_root.exists():
upload_root = self._legacy_multipart_dir(bucket_path.name, upload_id)
if not upload_root.exists():
raise StorageError("Multipart upload not found")
# Write part to temporary file first, then rename atomically
checksum = hashlib.md5()
part_filename = f"part-{part_number:05d}.part"
part_path = upload_root / part_filename
temp_path = upload_root / f".{part_filename}.tmp"
try:
with temp_path.open("wb") as target:
shutil.copyfileobj(_HashingReader(stream, checksum), target)
# Atomic rename (or replace on Windows)
temp_path.replace(part_path)
except OSError:
# Clean up temp file on failure
try:
temp_path.unlink(missing_ok=True)
except OSError:
pass
raise
with part_path.open("wb") as target:
shutil.copyfileobj(_HashingReader(stream, checksum), target)
record = {
"etag": checksum.hexdigest(),
"size": part_path.stat().st_size,
"filename": part_filename,
}
manifest_path = upload_root / self.MULTIPART_MANIFEST
lock_path = upload_root / ".manifest.lock"
# Retry loop for handling transient lock/read failures
max_retries = 3
for attempt in range(max_retries):
try:
with lock_path.open("w") as lock_file:
with _file_lock(lock_file):
try:
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
if attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1))
continue
raise StorageError("Multipart manifest unreadable") from exc
parts = manifest.setdefault("parts", {})
parts[str(part_number)] = record
manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
break
except OSError as exc:
if attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1))
continue
raise StorageError(f"Failed to update multipart manifest: {exc}") from exc
with lock_path.open("w") as lock_file:
with _file_lock(lock_file):
try:
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
raise StorageError("Multipart manifest unreadable") from exc
parts = manifest.setdefault("parts", {})
parts[str(part_number)] = record
manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
return record["etag"]
def complete_multipart_upload(
@@ -1052,10 +1015,9 @@ class ObjectStorage:
pass
shutil.rmtree(upload_root, ignore_errors=True)
self._invalidate_bucket_stats_cache(bucket_id)
self._invalidate_object_cache(bucket_id)
stat = destination.stat()
return ObjectMeta(
key=safe_key.as_posix(),
@@ -1302,52 +1264,22 @@ class ObjectStorage:
return objects
def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]:
"""Get cached object metadata for a bucket, refreshing if stale.
Uses LRU eviction to prevent unbounded cache growth.
Thread-safe with version tracking to detect concurrent invalidations.
"""
"""Get cached object metadata for a bucket, refreshing if stale."""
now = time.time()
with self._cache_lock:
cached = self._object_cache.get(bucket_id)
cache_version = self._cache_version.get(bucket_id, 0)
if cached:
objects, timestamp = cached
if now - timestamp < self.KEY_INDEX_CACHE_TTL:
# Move to end (most recently used)
self._object_cache.move_to_end(bucket_id)
return objects
# Build cache outside lock to avoid holding lock during I/O
cached = self._object_cache.get(bucket_id)
if cached:
objects, timestamp = cached
if now - timestamp < self.KEY_INDEX_CACHE_TTL:
return objects
objects = self._build_object_cache(bucket_path)
with self._cache_lock:
# Check if cache was invalidated while we were building
current_version = self._cache_version.get(bucket_id, 0)
if current_version != cache_version:
# Cache was invalidated, rebuild
objects = self._build_object_cache(bucket_path)
# Evict oldest entries if cache is full
while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE:
self._object_cache.popitem(last=False)
self._object_cache[bucket_id] = (objects, time.time())
self._object_cache.move_to_end(bucket_id)
self._object_cache[bucket_id] = (objects, now)
return objects
def _invalidate_object_cache(self, bucket_id: str) -> None:
"""Invalidate the object cache and etag index for a bucket.
Increments version counter to signal stale reads.
"""
with self._cache_lock:
self._object_cache.pop(bucket_id, None)
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
"""Invalidate the object cache and etag index for a bucket."""
self._object_cache.pop(bucket_id, None)
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
try:
etag_index_path.unlink(missing_ok=True)

View File

@@ -415,7 +415,7 @@ def list_bucket_objects(bucket_name: str):
except IamError as exc:
return jsonify({"error": str(exc)}), 403
max_keys = min(int(request.args.get("max_keys", 1000)), 100000)
max_keys = min(int(request.args.get("max_keys", 1000)), 10000)
continuation_token = request.args.get("continuation_token") or None
prefix = request.args.get("prefix") or None
@@ -434,14 +434,6 @@ def list_bucket_objects(bucket_name: str):
except StorageError:
versioning_enabled = False
# Pre-compute URL templates once (not per-object) for performance
# Frontend will construct actual URLs by replacing KEY_PLACEHOLDER
preview_template = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
delete_template = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER")
objects_data = []
for obj in result.objects:
objects_data.append({
@@ -450,6 +442,13 @@ def list_bucket_objects(bucket_name: str):
"last_modified": obj.last_modified.isoformat(),
"last_modified_display": obj.last_modified.strftime("%b %d, %Y %H:%M"),
"etag": obj.etag,
"metadata": obj.metadata or {},
"preview_url": url_for("ui.object_preview", bucket_name=bucket_name, object_key=obj.key),
"download_url": url_for("ui.object_preview", bucket_name=bucket_name, object_key=obj.key) + "?download=1",
"presign_endpoint": url_for("ui.object_presign", bucket_name=bucket_name, object_key=obj.key),
"delete_endpoint": url_for("ui.delete_object", bucket_name=bucket_name, object_key=obj.key),
"versions_endpoint": url_for("ui.object_versions", bucket_name=bucket_name, object_key=obj.key),
"restore_template": url_for("ui.restore_object_version", bucket_name=bucket_name, object_key=obj.key, version_id="VERSION_ID_PLACEHOLDER"),
})
return jsonify({
@@ -458,14 +457,6 @@ def list_bucket_objects(bucket_name: str):
"next_continuation_token": result.next_continuation_token,
"total_count": result.total_count,
"versioning_enabled": versioning_enabled,
"url_templates": {
"preview": preview_template,
"download": preview_template + "?download=1",
"presign": presign_template,
"delete": delete_template,
"versions": versions_template,
"restore": restore_template,
},
})
@@ -1467,17 +1458,11 @@ def update_bucket_replication(bucket_name: str):
else:
flash("No replication configuration to pause", "warning")
elif action == "resume":
from .replication import REPLICATION_MODE_ALL
rule = _replication().get_rule(bucket_name)
if rule:
rule.enabled = True
_replication().set_rule(rule)
# When resuming, sync any pending objects that accumulated while paused
if rule.mode == REPLICATION_MODE_ALL:
_replication().replicate_existing_objects(bucket_name)
flash("Replication resumed. Syncing pending objects in background.", "success")
else:
flash("Replication resumed", "success")
flash("Replication resumed", "success")
else:
flash("No replication configuration to resume", "warning")
elif action == "create":

View File

@@ -1,7 +1,7 @@
"""Central location for the application version string."""
from __future__ import annotations
APP_VERSION = "0.1.9"
APP_VERSION = "0.1.8"
def get_version() -> str:

View File

@@ -173,16 +173,14 @@
</div>
<div class="d-flex align-items-center gap-1">
<span class="text-muted">Batch</span>
<select id="page-size-select" class="form-select form-select-sm py-0" style="width: auto; font-size: 0.75rem;" title="Number of objects to load per batch">
<select id="page-size-select" class="form-select form-select-sm py-0" style="width: auto; font-size: 0.75rem;">
<option value="1000">1K</option>
<option value="5000" selected>5K</option>
<option value="10000">10K</option>
<option value="25000">25K</option>
<option value="50000">50K</option>
<option value="75000">75K</option>
<option value="100000">100K</option>
</select>
<span class="text-muted">per batch</span>
<span class="text-muted">objects</span>
</div>
</div>
</div>
@@ -1146,18 +1144,13 @@
</div>
{% elif replication_rule and not replication_rule.enabled %}
<div class="alert alert-warning d-flex align-items-start mb-4" role="alert">
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="currentColor" class="flex-shrink-0 me-2 mt-1" viewBox="0 0 16 16">
<div class="alert alert-warning d-flex align-items-center mb-4" role="alert">
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="currentColor" class="flex-shrink-0 me-2" viewBox="0 0 16 16">
<path d="M5.5 3.5A1.5 1.5 0 0 1 7 5v6a1.5 1.5 0 0 1-3 0V5a1.5 1.5 0 0 1 1.5-1.5zm5 0A1.5 1.5 0 0 1 12 5v6a1.5 1.5 0 0 1-3 0V5a1.5 1.5 0 0 1 1.5-1.5z"/>
</svg>
<div>
<strong>Replication Paused</strong>
<p class="mb-1">Replication is configured but currently paused. New uploads will not be replicated until resumed.</p>
{% if replication_rule.mode == 'all' %}
<p class="mb-0 small text-dark"><strong>Tip:</strong> When you resume, any objects uploaded while paused will be automatically synced to the target.</p>
{% else %}
<p class="mb-0 small text-dark"><strong>Note:</strong> Objects uploaded while paused will not be synced (mode: new_only). Consider switching to "All Objects" mode if you need to sync missed uploads.</p>
{% endif %}
<strong>Replication Paused</strong>
Replication is configured but currently paused. New uploads will not be replicated until resumed.
</div>
</div>
@@ -1889,14 +1882,7 @@
let pageSize = 5000; // Load large batches for virtual scrolling
let currentPrefix = ''; // Current folder prefix for navigation
let allObjects = []; // All loaded object metadata (lightweight)
let urlTemplates = null; // URL templates from API for constructing object URLs
// Helper to build URL from template by replacing KEY_PLACEHOLDER with encoded key
const buildUrlFromTemplate = (template, key) => {
if (!template) return '';
return template.replace('KEY_PLACEHOLDER', encodeURIComponent(key).replace(/%2F/g, '/'));
};
// Virtual scrolling state
const ROW_HEIGHT = 53; // Height of each table row in pixels
const BUFFER_ROWS = 10; // Extra rows to render above/below viewport
@@ -2237,26 +2223,22 @@
objectsLoadingRow.remove();
}
if (data.url_templates && !urlTemplates) {
urlTemplates = data.url_templates;
}
// Store lightweight object metadata (no DOM elements!)
data.objects.forEach(obj => {
loadedObjectCount++;
const key = obj.key;
allObjects.push({
key: key,
key: obj.key,
size: obj.size,
lastModified: obj.last_modified,
lastModifiedDisplay: obj.last_modified_display,
etag: obj.etag,
previewUrl: urlTemplates ? buildUrlFromTemplate(urlTemplates.preview, key) : '',
downloadUrl: urlTemplates ? buildUrlFromTemplate(urlTemplates.download, key) : '',
presignEndpoint: urlTemplates ? buildUrlFromTemplate(urlTemplates.presign, key) : '',
deleteEndpoint: urlTemplates ? buildUrlFromTemplate(urlTemplates.delete, key) : '',
metadata: '{}',
versionsEndpoint: urlTemplates ? buildUrlFromTemplate(urlTemplates.versions, key) : '',
restoreTemplate: urlTemplates ? urlTemplates.restore.replace('KEY_PLACEHOLDER', encodeURIComponent(key).replace(/%2F/g, '/')) : ''
previewUrl: obj.preview_url,
downloadUrl: obj.download_url,
presignEndpoint: obj.presign_endpoint,
deleteEndpoint: obj.delete_endpoint,
metadata: JSON.stringify(obj.metadata || {}),
versionsEndpoint: obj.versions_endpoint,
restoreTemplate: obj.restore_template
});
});
@@ -3801,40 +3783,41 @@
selectAllCheckbox?.addEventListener('change', (event) => {
const shouldSelect = Boolean(event.target?.checked);
if (hasFolders()) {
// Get all file items in the current view (works with virtual scrolling)
const filesInView = visibleItems.filter(item => item.type === 'file');
const objectsInCurrentView = allObjects.filter(obj => obj.key.startsWith(currentPrefix));
objectsInCurrentView.forEach(obj => {
const checkbox = obj.element.querySelector('[data-object-select]');
if (checkbox && !checkbox.disabled) {
checkbox.checked = shouldSelect;
}
toggleRowSelection(obj.element, shouldSelect);
});
// Update selectedRows directly using object keys (not DOM elements)
filesInView.forEach(item => {
if (shouldSelect) {
selectedRows.set(item.data.key, item.data);
} else {
selectedRows.delete(item.data.key);
}
});
document.querySelectorAll('[data-folder-select]').forEach(cb => {
cb.checked = shouldSelect;
});
} else {
// Update folder checkboxes in DOM (folders are always rendered)
document.querySelectorAll('[data-folder-select]').forEach(cb => {
cb.checked = shouldSelect;
});
// Update any currently rendered object checkboxes
document.querySelectorAll('[data-object-row]').forEach((row) => {
const checkbox = row.querySelector('[data-object-select]');
if (checkbox) {
document.querySelectorAll('[data-object-row]').forEach((row) => {
if (row.style.display === 'none') return;
const checkbox = row.querySelector('[data-object-select]');
if (!checkbox || checkbox.disabled) {
return;
}
checkbox.checked = shouldSelect;
}
});
updateBulkDeleteState();
toggleRowSelection(row, shouldSelect);
});
}
setTimeout(updateBulkDownloadState, 0);
});
bulkDownloadButton?.addEventListener('click', async () => {
if (!bulkDownloadEndpoint) return;
// Use selectedRows which tracks all selected objects (not just rendered ones)
const selected = Array.from(selectedRows.keys());
const selected = Array.from(document.querySelectorAll('[data-object-select]:checked')).map(
(cb) => cb.closest('tr').dataset.key
);
if (selected.length === 0) return;
bulkDownloadButton.disabled = true;

View File

@@ -407,62 +407,10 @@ curl -X POST {{ api_base }}/presign/demo/notes.txt \
<span class="docs-section-kicker">07</span>
<h2 class="h4 mb-0">API Examples</h2>
</div>
<p class="text-muted">Common operations using popular SDKs and tools.</p>
<h3 class="h6 text-uppercase text-muted mt-4">Python (boto3)</h3>
<pre class="mb-4"><code class="language-python">import boto3
s3 = boto3.client(
's3',
endpoint_url='{{ api_base }}',
aws_access_key_id='&lt;access_key&gt;',
aws_secret_access_key='&lt;secret_key&gt;'
)
# List buckets
buckets = s3.list_buckets()['Buckets']
# Create bucket
s3.create_bucket(Bucket='mybucket')
# Upload file
s3.upload_file('local.txt', 'mybucket', 'remote.txt')
# Download file
s3.download_file('mybucket', 'remote.txt', 'downloaded.txt')
# Generate presigned URL (valid 1 hour)
url = s3.generate_presigned_url(
'get_object',
Params={'Bucket': 'mybucket', 'Key': 'remote.txt'},
ExpiresIn=3600
)</code></pre>
<h3 class="h6 text-uppercase text-muted mt-4">JavaScript (AWS SDK v3)</h3>
<pre class="mb-4"><code class="language-javascript">import { S3Client, ListBucketsCommand, PutObjectCommand } from '@aws-sdk/client-s3';
const s3 = new S3Client({
endpoint: '{{ api_base }}',
region: 'us-east-1',
credentials: {
accessKeyId: '&lt;access_key&gt;',
secretAccessKey: '&lt;secret_key&gt;'
},
forcePathStyle: true // Required for S3-compatible services
});
// List buckets
const { Buckets } = await s3.send(new ListBucketsCommand({}));
// Upload object
await s3.send(new PutObjectCommand({
Bucket: 'mybucket',
Key: 'hello.txt',
Body: 'Hello, World!'
}));</code></pre>
<h3 class="h6 text-uppercase text-muted mt-4">Multipart Upload (Python)</h3>
<pre class="mb-4"><code class="language-python">import boto3
<p class="text-muted">Common operations using boto3.</p>
<h5 class="mt-4">Multipart Upload</h5>
<pre><code class="language-python">import boto3
s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
@@ -470,9 +418,9 @@ s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
response = s3.create_multipart_upload(Bucket='mybucket', Key='large.bin')
upload_id = response['UploadId']
# Upload parts (minimum 5MB each, except last part)
# Upload parts
parts = []
chunks = [b'chunk1...', b'chunk2...']
chunks = [b'chunk1', b'chunk2'] # Example data chunks
for part_number, chunk in enumerate(chunks, start=1):
response = s3.upload_part(
Bucket='mybucket',
@@ -490,19 +438,6 @@ s3.complete_multipart_upload(
UploadId=upload_id,
MultipartUpload={'Parts': parts}
)</code></pre>
<h3 class="h6 text-uppercase text-muted mt-4">Presigned URLs for Sharing</h3>
<pre class="mb-0"><code class="language-bash"># Generate a download link valid for 15 minutes
curl -X POST "{{ api_base }}/presign/mybucket/photo.jpg" \
-H "Content-Type: application/json" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;" \
-d '{"method": "GET", "expires_in": 900}'
# Generate an upload link (PUT) valid for 1 hour
curl -X POST "{{ api_base }}/presign/mybucket/upload.bin" \
-H "Content-Type: application/json" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;" \
-d '{"method": "PUT", "expires_in": 3600}'</code></pre>
</div>
</article>
<article id="replication" class="card shadow-sm docs-section">
@@ -552,86 +487,6 @@ curl -X POST "{{ api_base }}/presign/mybucket/upload.bin" \
</p>
</div>
</article>
<article id="versioning" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
<span class="docs-section-kicker">09</span>
<h2 class="h4 mb-0">Object Versioning</h2>
</div>
<p class="text-muted">Keep multiple versions of objects to protect against accidental deletions and overwrites. Restore previous versions at any time.</p>
<h3 class="h6 text-uppercase text-muted mt-4">Enabling Versioning</h3>
<ol class="docs-steps mb-3">
<li>Navigate to your bucket's <strong>Properties</strong> tab.</li>
<li>Find the <strong>Versioning</strong> card and click <strong>Enable</strong>.</li>
<li>All subsequent uploads will create new versions instead of overwriting.</li>
</ol>
<h3 class="h6 text-uppercase text-muted mt-4">Version Operations</h3>
<div class="table-responsive mb-3">
<table class="table table-sm table-bordered small">
<thead class="table-light">
<tr>
<th>Operation</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>View Versions</strong></td>
<td>Click the version icon on any object to see all historical versions with timestamps and sizes.</td>
</tr>
<tr>
<td><strong>Restore Version</strong></td>
<td>Click <strong>Restore</strong> on any version to make it the current version (creates a copy).</td>
</tr>
<tr>
<td><strong>Delete Current</strong></td>
<td>Deleting an object archives it. Previous versions remain accessible.</td>
</tr>
<tr>
<td><strong>Purge All</strong></td>
<td>Permanently delete an object and all its versions. This cannot be undone.</td>
</tr>
</tbody>
</table>
</div>
<h3 class="h6 text-uppercase text-muted mt-4">Archived Objects</h3>
<p class="small text-muted mb-3">When you delete a versioned object, it becomes "archived" - the current version is removed but historical versions remain. The <strong>Archived</strong> tab shows these objects so you can restore them.</p>
<h3 class="h6 text-uppercase text-muted mt-4">API Usage</h3>
<pre class="mb-3"><code class="language-bash"># Enable versioning
curl -X PUT "{{ api_base }}/&lt;bucket&gt;?versioning" \
-H "Content-Type: application/json" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;" \
-d '{"Status": "Enabled"}'
# Get versioning status
curl "{{ api_base }}/&lt;bucket&gt;?versioning" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;"
# List object versions
curl "{{ api_base }}/&lt;bucket&gt;?versions" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;"
# Get specific version
curl "{{ api_base }}/&lt;bucket&gt;/&lt;key&gt;?versionId=&lt;version-id&gt;" \
-H "X-Access-Key: &lt;key&gt;" -H "X-Secret-Key: &lt;secret&gt;"</code></pre>
<div class="alert alert-light border mb-0">
<div class="d-flex gap-2">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="bi bi-info-circle text-muted mt-1" viewBox="0 0 16 16">
<path d="M8 15A7 7 0 1 1 8 1a7 7 0 0 1 0 14zm0 1A8 8 0 1 0 8 0a8 8 0 0 0 0 16z"/>
<path d="m8.93 6.588-2.29.287-.082.38.45.083c.294.07.352.176.288.469l-.738 3.468c-.194.897.105 1.319.808 1.319.545 0 1.178-.252 1.465-.598l.088-.416c-.2.176-.492.246-.686.246-.275 0-.375-.193-.304-.533L8.93 6.588zM9 4.5a1 1 0 1 1-2 0 1 1 0 0 1 2 0z"/>
</svg>
<div>
<strong>Storage Impact:</strong> Each version consumes storage. Enable quotas to limit total bucket size including all versions.
</div>
</div>
</div>
</div>
</article>
<article id="quotas" class="card shadow-sm docs-section">
<div class="card-body">
<div class="d-flex align-items-center gap-2 mb-3">
@@ -854,7 +709,6 @@ curl -X DELETE "{{ api_base }}/kms/keys/{key-id}?waiting_period_days=30" \
<li><a href="#api">REST endpoints</a></li>
<li><a href="#examples">API Examples</a></li>
<li><a href="#replication">Site Replication</a></li>
<li><a href="#versioning">Object Versioning</a></li>
<li><a href="#quotas">Bucket Quotas</a></li>
<li><a href="#encryption">Encryption</a></li>
<li><a href="#troubleshooting">Troubleshooting</a></li>

View File

@@ -150,21 +150,16 @@ class TestPaginatedObjectListing:
assert len(data["objects"]) == 1
obj = data["objects"][0]
# Check all expected fields
assert obj["key"] == "test.txt"
assert obj["size"] == 12 # len("test content")
assert "last_modified" in obj
assert "last_modified_display" in obj
assert "etag" in obj
# URLs are now returned as templates (not per-object) for performance
assert "url_templates" in data
templates = data["url_templates"]
assert "preview" in templates
assert "download" in templates
assert "delete" in templates
assert "KEY_PLACEHOLDER" in templates["preview"]
assert "preview_url" in obj
assert "download_url" in obj
assert "delete_endpoint" in obj
def test_bucket_detail_page_loads_without_objects(self, tmp_path):
"""Bucket detail page should load even with many objects."""