From c8eb3de629ea25db886ec90ca3d807499d3c61c5 Mon Sep 17 00:00:00 2001
From: kqjy
Date: Mon, 29 Dec 2025 12:46:23 +0800
Subject: [PATCH 1/3] Fix storage and replication bugs, improve caching and
 uploads, gate SigV4 debug logging

Bug fixes:
- Fix duplicate _legacy_version_dir check in storage.py
- Fix max_size_bytes -> max_bytes param in quota handler
- Move base64 import to module level in s3_api.py
- Add retry logic and atomic file ops to multipart upload
- Add shutdown() method to ReplicationManager

Performance:
- Add LRU eviction with OrderedDict to object cache
- Add cache version tracking for stale read detection
- Add streaming uploads for large files (>10 MiB) in replication
- Create _find_element() XML parsing helpers

Security:
- Gate SigV4 debug logging behind DEBUG_SIGV4 config
---
app/replication.py | 166 +++++++++++++++++++++++----------------------
app/s3_api.py | 90 +++++++++++++-----------
app/storage.py | 129 ++++++++++++++++++++++++++---------
app/version.py | 2 +-
4 files changed, 234 insertions(+), 153 deletions(-)
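
Note: the LRU behaviour added to the object cache below is the standard
OrderedDict pattern (evict from the front, bump hits to the back). A minimal
standalone sketch for reference -- the names here are illustrative, not the
storage.py API:

from collections import OrderedDict
import threading
import time

CACHE_MAX_SIZE = 100   # mirrors OBJECT_CACHE_MAX_SIZE in the patch
CACHE_TTL = 30         # mirrors KEY_INDEX_CACHE_TTL

_cache: OrderedDict[str, tuple[dict, float]] = OrderedDict()
_lock = threading.Lock()

def cache_get(bucket_id: str):
    # Return a fresh entry and bump it to most-recently-used, else None.
    with _lock:
        entry = _cache.get(bucket_id)
        if entry and time.time() - entry[1] < CACHE_TTL:
            _cache.move_to_end(bucket_id)
            return entry[0]
    return None

def cache_put(bucket_id: str, objects: dict) -> None:
    # Insert an entry, evicting least-recently-used entries when full.
    with _lock:
        while len(_cache) >= CACHE_MAX_SIZE:
            _cache.popitem(last=False)   # front of the OrderedDict = oldest
        _cache[bucket_id] = (objects, time.time())
        _cache.move_to_end(bucket_id)
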
diff --git a/app/replication.py b/app/replication.py
index 5a4ad8a..884bbc3 100644
--- a/app/replication.py
+++ b/app/replication.py
@@ -9,7 +9,7 @@ import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from pathlib import Path
-from typing import Dict, Optional
+from typing import Any, Dict, Optional
import boto3
from botocore.config import Config
@@ -24,11 +24,42 @@ logger = logging.getLogger(__name__)
REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"
REPLICATION_CONNECT_TIMEOUT = 5
REPLICATION_READ_TIMEOUT = 30
+STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024 # 10 MiB - use streaming for larger files
REPLICATION_MODE_NEW_ONLY = "new_only"
REPLICATION_MODE_ALL = "all"
+def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any:
+ """Create a boto3 S3 client for the given connection.
+
+ Args:
+ connection: Remote S3 connection configuration
+ health_check: If True, use minimal retries for quick health checks
+
+ Returns:
+ Configured boto3 S3 client
+ """
+ config = Config(
+ user_agent_extra=REPLICATION_USER_AGENT,
+ connect_timeout=REPLICATION_CONNECT_TIMEOUT,
+ read_timeout=REPLICATION_READ_TIMEOUT,
+ retries={'max_attempts': 1 if health_check else 2},
+ signature_version='s3v4',
+ s3={'addressing_style': 'path'},
+ request_checksum_calculation='when_required',
+ response_checksum_validation='when_required',
+ )
+ return boto3.client(
+ "s3",
+ endpoint_url=connection.endpoint_url,
+ aws_access_key_id=connection.access_key,
+ aws_secret_access_key=connection.secret_key,
+ region_name=connection.region or 'us-east-1',
+ config=config,
+ )
+
+
@dataclass
class ReplicationStats:
"""Statistics for replication operations - computed dynamically."""
@@ -102,8 +133,19 @@ class ReplicationManager:
self._rules: Dict[str, ReplicationRule] = {}
self._stats_lock = threading.Lock()
self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker")
+ self._shutdown = False
self.reload_rules()
+ def shutdown(self, wait: bool = True) -> None:
+ """Shutdown the replication executor gracefully.
+
+ Args:
+ wait: If True, wait for pending tasks to complete
+ """
+ self._shutdown = True
+ self._executor.shutdown(wait=wait)
+ logger.info("Replication manager shut down")
+
def reload_rules(self) -> None:
if not self.rules_path.exists():
self._rules = {}
@@ -124,25 +166,12 @@ class ReplicationManager:
def check_endpoint_health(self, connection: RemoteConnection) -> bool:
"""Check if a remote endpoint is reachable and responsive.
-
+
Returns True if endpoint is healthy, False otherwise.
Uses short timeouts to prevent blocking.
"""
try:
- config = Config(
- user_agent_extra=REPLICATION_USER_AGENT,
- connect_timeout=REPLICATION_CONNECT_TIMEOUT,
- read_timeout=REPLICATION_READ_TIMEOUT,
- retries={'max_attempts': 1}
- )
- s3 = boto3.client(
- "s3",
- endpoint_url=connection.endpoint_url,
- aws_access_key_id=connection.access_key,
- aws_secret_access_key=connection.secret_key,
- region_name=connection.region,
- config=config,
- )
+ s3 = _create_s3_client(connection, health_check=True)
s3.list_buckets()
return True
except Exception as e:
@@ -184,15 +213,9 @@ class ReplicationManager:
try:
source_objects = self.storage.list_objects_all(bucket_name)
source_keys = {obj.key: obj.size for obj in source_objects}
-
- s3 = boto3.client(
- "s3",
- endpoint_url=connection.endpoint_url,
- aws_access_key_id=connection.access_key,
- aws_secret_access_key=connection.secret_key,
- region_name=connection.region,
- )
-
+
+ s3 = _create_s3_client(connection)
+
dest_keys = set()
bytes_synced = 0
paginator = s3.get_paginator('list_objects_v2')
@@ -257,13 +280,7 @@ class ReplicationManager:
raise ValueError(f"Connection {connection_id} not found")
try:
- s3 = boto3.client(
- "s3",
- endpoint_url=connection.endpoint_url,
- aws_access_key_id=connection.access_key,
- aws_secret_access_key=connection.secret_key,
- region_name=connection.region,
- )
+ s3 = _create_s3_client(connection)
s3.create_bucket(Bucket=bucket_name)
except ClientError as e:
logger.error(f"Failed to create remote bucket {bucket_name}: {e}")
@@ -286,41 +303,22 @@ class ReplicationManager:
self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)
def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
+ if self._shutdown:
+ return
+
if ".." in object_key or object_key.startswith("/") or object_key.startswith("\\"):
logger.error(f"Invalid object key in replication (path traversal attempt): {object_key}")
return
-
+
try:
from .storage import ObjectStorage
ObjectStorage._sanitize_object_key(object_key)
except StorageError as e:
logger.error(f"Object key validation failed in replication: {e}")
return
-
- file_size = 0
+
try:
- config = Config(
- user_agent_extra=REPLICATION_USER_AGENT,
- connect_timeout=REPLICATION_CONNECT_TIMEOUT,
- read_timeout=REPLICATION_READ_TIMEOUT,
- retries={'max_attempts': 2},
- signature_version='s3v4',
- s3={
- 'addressing_style': 'path',
- },
- # Disable SDK automatic checksums - they cause SignatureDoesNotMatch errors
- # with S3-compatible servers that don't support CRC32 checksum headers
- request_checksum_calculation='when_required',
- response_checksum_validation='when_required',
- )
- s3 = boto3.client(
- "s3",
- endpoint_url=conn.endpoint_url,
- aws_access_key_id=conn.access_key,
- aws_secret_access_key=conn.secret_key,
- region_name=conn.region or 'us-east-1',
- config=config,
- )
+ s3 = _create_s3_client(conn)
if action == "delete":
try:
@@ -337,34 +335,42 @@ class ReplicationManager:
logger.error(f"Source object not found: {bucket_name}/{object_key}")
return
- # Don't replicate metadata - destination server will generate its own
- # __etag__ and __size__. Replicating them causes signature mismatches when they have None/empty values.
-
content_type, _ = mimetypes.guess_type(path)
file_size = path.stat().st_size
logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
- def do_put_object() -> None:
- """Helper to upload object.
-
- Reads the file content into memory first to avoid signature calculation
- issues with certain binary file types (like GIFs) when streaming.
- Do NOT set ContentLength explicitly - boto3 calculates it from the bytes
- and setting it manually can cause SignatureDoesNotMatch errors.
+ def do_upload() -> None:
+ """Upload object using appropriate method based on file size.
+
+ For small files (< 10 MiB): Read into memory for simpler handling
+ For large files: Use streaming upload to avoid memory issues
"""
- file_content = path.read_bytes()
- put_kwargs = {
- "Bucket": rule.target_bucket,
- "Key": object_key,
- "Body": file_content,
- }
+ extra_args = {}
if content_type:
- put_kwargs["ContentType"] = content_type
- s3.put_object(**put_kwargs)
+ extra_args["ContentType"] = content_type
+
+ if file_size >= STREAMING_THRESHOLD_BYTES:
+ # Use multipart upload for large files
+ s3.upload_file(
+ str(path),
+ rule.target_bucket,
+ object_key,
+ ExtraArgs=extra_args if extra_args else None,
+ )
+ else:
+ # Read small files into memory
+ file_content = path.read_bytes()
+ put_kwargs = {
+ "Bucket": rule.target_bucket,
+ "Key": object_key,
+ "Body": file_content,
+ **extra_args,
+ }
+ s3.put_object(**put_kwargs)
try:
- do_put_object()
+ do_upload()
except (ClientError, S3UploadFailedError) as e:
error_code = None
if isinstance(e, ClientError):
@@ -386,13 +392,13 @@ class ReplicationManager:
bucket_ready = True
else:
logger.error(f"Failed to create target bucket {rule.target_bucket}: {bucket_err}")
- raise e
-
+ raise e
+
if bucket_ready:
- do_put_object()
+ do_upload()
else:
raise e
-
+
logger.info(f"Replicated {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})")
self._update_last_sync(bucket_name, object_key)
diff --git a/app/s3_api.py b/app/s3_api.py
index d0d32d9..78b9afb 100644
--- a/app/s3_api.py
+++ b/app/s3_api.py
@@ -1,13 +1,15 @@
"""Flask blueprint exposing a subset of the S3 REST API."""
from __future__ import annotations
+import base64
import hashlib
import hmac
+import logging
import mimetypes
import re
import uuid
from datetime import datetime, timedelta, timezone
-from typing import Any, Dict
+from typing import Any, Dict, Optional
from urllib.parse import quote, urlencode, urlparse, unquote
from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError
@@ -20,6 +22,8 @@ from .iam import IamError, Principal
from .replication import ReplicationManager
from .storage import ObjectStorage, StorageError, QuotaExceededError
+logger = logging.getLogger(__name__)
+
s3_api_bp = Blueprint("s3_api", __name__)
def _storage() -> ObjectStorage:
@@ -118,6 +122,9 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
if header_val is None:
header_val = ""
+ if header.lower() == 'expect' and header_val == "":
+ header_val = "100-continue"
+
header_val = " ".join(header_val.split())
canonical_headers_parts.append(f"{header.lower()}:{header_val}\n")
canonical_headers = "".join(canonical_headers_parts)
@@ -128,15 +135,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
- # Debug logging for signature issues
- import logging
- logger = logging.getLogger(__name__)
- logger.debug(f"SigV4 Debug - Method: {method}, URI: {canonical_uri}")
- logger.debug(f"SigV4 Debug - Payload hash from header: {req.headers.get('X-Amz-Content-Sha256')}")
- logger.debug(f"SigV4 Debug - Signed headers: {signed_headers_str}")
- logger.debug(f"SigV4 Debug - Content-Type: {req.headers.get('Content-Type')}")
- logger.debug(f"SigV4 Debug - Content-Length: {req.headers.get('Content-Length')}")
-
amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date")
if not amz_date:
raise IamError("Missing Date header")
@@ -167,24 +165,18 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
if not hmac.compare_digest(calculated_signature, signature):
- # Debug logging for signature mismatch
- import logging
- logger = logging.getLogger(__name__)
- logger.error(f"Signature mismatch for {req.path}")
- logger.error(f" Content-Type: {req.headers.get('Content-Type')}")
- logger.error(f" Content-Length: {req.headers.get('Content-Length')}")
- logger.error(f" X-Amz-Content-Sha256: {req.headers.get('X-Amz-Content-Sha256')}")
- logger.error(f" Canonical URI: {canonical_uri}")
- logger.error(f" Signed headers: {signed_headers_str}")
- # Log each signed header's value
- for h in signed_headers_list:
- logger.error(f" Header '{h}': {repr(req.headers.get(h))}")
- logger.error(f" Expected sig: {signature[:16]}...")
- logger.error(f" Calculated sig: {calculated_signature[:16]}...")
- # Log first part of canonical request to compare
- logger.error(f" Canonical request hash: {hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()[:16]}...")
- # Log the full canonical request for debugging
- logger.error(f" Canonical request:\n{canonical_request[:500]}...")
+ # Only log detailed signature debug info if DEBUG_SIGV4 is enabled
+ if current_app.config.get("DEBUG_SIGV4"):
+ logger.warning(
+ "SigV4 signature mismatch",
+ extra={
+ "path": req.path,
+ "method": method,
+ "signed_headers": signed_headers_str,
+ "content_type": req.headers.get("Content-Type"),
+ "content_length": req.headers.get("Content-Length"),
+ }
+ )
raise IamError("SignatureDoesNotMatch")
return _iam().get_principal(access_key)
@@ -236,6 +228,8 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
canonical_headers_parts = []
for header in signed_headers_list:
val = req.headers.get(header, "").strip()
+ if header.lower() == 'expect' and val == "":
+ val = "100-continue"
val = " ".join(val.split())
canonical_headers_parts.append(f"{header}:{val}\n")
canonical_headers = "".join(canonical_headers_parts)
@@ -569,6 +563,28 @@ def _strip_ns(tag: str | None) -> str:
return tag.split("}")[-1]
+def _find_element(parent: Element, name: str) -> Optional[Element]:
+ """Find a child element by name, trying both namespaced and non-namespaced variants.
+
+ This handles XML documents that may or may not include namespace prefixes.
+ """
+ el = parent.find(f"{{*}}{name}")
+ if el is None:
+ el = parent.find(name)
+ return el
+
+
+def _find_element_text(parent: Element, name: str, default: str = "") -> str:
+ """Find a child element and return its text content.
+
+ Returns the default value if element not found or has no text.
+ """
+ el = _find_element(parent, name)
+ if el is None or el.text is None:
+ return default
+ return el.text.strip()
+
+
def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]:
try:
root = fromstring(payload)
@@ -585,17 +601,11 @@ def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]:
for tag_el in list(tagset):
if _strip_ns(tag_el.tag) != "Tag":
continue
- key_el = tag_el.find("{*}Key")
- if key_el is None:
- key_el = tag_el.find("Key")
- value_el = tag_el.find("{*}Value")
- if value_el is None:
- value_el = tag_el.find("Value")
- key = (key_el.text or "").strip() if key_el is not None else ""
+ key = _find_element_text(tag_el, "Key")
if not key:
continue
- value = value_el.text if value_el is not None else ""
- tags.append({"Key": key, "Value": value or ""})
+ value = _find_element_text(tag_el, "Value")
+ tags.append({"Key": key, "Value": value})
return tags
@@ -1439,7 +1449,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
if request.method == "DELETE":
try:
- storage.set_bucket_quota(bucket_name, max_size_bytes=None, max_objects=None)
+ storage.set_bucket_quota(bucket_name, max_bytes=None, max_objects=None)
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
current_app.logger.info("Bucket quota deleted", extra={"bucket": bucket_name})
@@ -1473,7 +1483,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
return _error_response("InvalidArgument", f"max_objects {exc}", 400)
try:
- storage.set_bucket_quota(bucket_name, max_size_bytes=max_size_bytes, max_objects=max_objects)
+ storage.set_bucket_quota(bucket_name, max_bytes=max_size_bytes, max_objects=max_objects)
except StorageError as exc:
return _error_response("NoSuchBucket", str(exc), 404)
@@ -1665,7 +1675,6 @@ def bucket_handler(bucket_name: str) -> Response:
effective_start = ""
if list_type == "2":
if continuation_token:
- import base64
try:
effective_start = base64.urlsafe_b64decode(continuation_token.encode()).decode("utf-8")
except Exception:
@@ -1722,7 +1731,6 @@ def bucket_handler(bucket_name: str) -> Response:
next_marker = common_prefixes[-1].rstrip(delimiter) if delimiter else common_prefixes[-1]
if list_type == "2" and next_marker:
- import base64
next_continuation_token = base64.urlsafe_b64encode(next_marker.encode()).decode("utf-8")
if list_type == "2":
diff --git a/app/storage.py b/app/storage.py
index c710847..13b751f 100644
--- a/app/storage.py
+++ b/app/storage.py
@@ -7,9 +7,11 @@ import os
import re
import shutil
import stat
+import threading
import time
import unicodedata
import uuid
+from collections import OrderedDict
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
@@ -129,12 +131,17 @@ class ObjectStorage:
MULTIPART_MANIFEST = "manifest.json"
BUCKET_CONFIG_FILE = ".bucket.json"
KEY_INDEX_CACHE_TTL = 30
+ OBJECT_CACHE_MAX_SIZE = 100 # Maximum number of buckets to cache
def __init__(self, root: Path) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
self._ensure_system_roots()
- self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {}
+ # LRU cache for object metadata with thread-safe access
+ self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float]] = OrderedDict()
+ self._cache_lock = threading.Lock()
+ # Cache version counter for detecting stale reads
+ self._cache_version: Dict[str, int] = {}
def list_buckets(self) -> List[BucketMeta]:
buckets: List[BucketMeta] = []
@@ -729,8 +736,6 @@ class ObjectStorage:
bucket_id = bucket_path.name
safe_key = self._sanitize_object_key(object_key)
version_dir = self._version_dir(bucket_id, safe_key)
- if not version_dir.exists():
- version_dir = self._legacy_version_dir(bucket_id, safe_key)
if not version_dir.exists():
version_dir = self._legacy_version_dir(bucket_id, safe_key)
if not version_dir.exists():
@@ -879,41 +884,73 @@ class ObjectStorage:
part_number: int,
stream: BinaryIO,
) -> str:
+ """Upload a part for a multipart upload.
+
+ Uses file locking to safely update the manifest and handle concurrent uploads.
+ """
if part_number < 1:
raise StorageError("part_number must be >= 1")
bucket_path = self._bucket_path(bucket_name)
-
+
upload_root = self._multipart_dir(bucket_path.name, upload_id)
if not upload_root.exists():
upload_root = self._legacy_multipart_dir(bucket_path.name, upload_id)
if not upload_root.exists():
raise StorageError("Multipart upload not found")
-
+
+ # Write part to temporary file first, then rename atomically
checksum = hashlib.md5()
part_filename = f"part-{part_number:05d}.part"
part_path = upload_root / part_filename
- with part_path.open("wb") as target:
- shutil.copyfileobj(_HashingReader(stream, checksum), target)
+ temp_path = upload_root / f".{part_filename}.tmp"
+
+ try:
+ with temp_path.open("wb") as target:
+ shutil.copyfileobj(_HashingReader(stream, checksum), target)
+
+ # Atomic rename (or replace on Windows)
+ temp_path.replace(part_path)
+ except OSError:
+ # Clean up temp file on failure
+ try:
+ temp_path.unlink(missing_ok=True)
+ except OSError:
+ pass
+ raise
+
record = {
"etag": checksum.hexdigest(),
"size": part_path.stat().st_size,
"filename": part_filename,
}
-
+
manifest_path = upload_root / self.MULTIPART_MANIFEST
lock_path = upload_root / ".manifest.lock"
-
- with lock_path.open("w") as lock_file:
- with _file_lock(lock_file):
- try:
- manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
- except (OSError, json.JSONDecodeError) as exc:
- raise StorageError("Multipart manifest unreadable") from exc
-
- parts = manifest.setdefault("parts", {})
- parts[str(part_number)] = record
- manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
-
+
+ # Retry loop for handling transient lock/read failures
+ max_retries = 3
+ for attempt in range(max_retries):
+ try:
+ with lock_path.open("w") as lock_file:
+ with _file_lock(lock_file):
+ try:
+ manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
+ except (OSError, json.JSONDecodeError) as exc:
+ if attempt < max_retries - 1:
+ time.sleep(0.1 * (attempt + 1))
+ continue
+ raise StorageError("Multipart manifest unreadable") from exc
+
+ parts = manifest.setdefault("parts", {})
+ parts[str(part_number)] = record
+ manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
+ break
+ except OSError as exc:
+ if attempt < max_retries - 1:
+ time.sleep(0.1 * (attempt + 1))
+ continue
+ raise StorageError(f"Failed to update multipart manifest: {exc}") from exc
+
return record["etag"]
def complete_multipart_upload(
@@ -1264,22 +1301,52 @@ class ObjectStorage:
return objects
def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]:
- """Get cached object metadata for a bucket, refreshing if stale."""
+ """Get cached object metadata for a bucket, refreshing if stale.
+
+ Uses LRU eviction to prevent unbounded cache growth.
+ Thread-safe with version tracking to detect concurrent invalidations.
+ """
now = time.time()
- cached = self._object_cache.get(bucket_id)
-
- if cached:
- objects, timestamp = cached
- if now - timestamp < self.KEY_INDEX_CACHE_TTL:
- return objects
-
+
+ with self._cache_lock:
+ cached = self._object_cache.get(bucket_id)
+ cache_version = self._cache_version.get(bucket_id, 0)
+
+ if cached:
+ objects, timestamp = cached
+ if now - timestamp < self.KEY_INDEX_CACHE_TTL:
+ # Move to end (most recently used)
+ self._object_cache.move_to_end(bucket_id)
+ return objects
+
+ # Build cache outside lock to avoid holding lock during I/O
objects = self._build_object_cache(bucket_path)
- self._object_cache[bucket_id] = (objects, now)
+
+ with self._cache_lock:
+ # Check if cache was invalidated while we were building
+ current_version = self._cache_version.get(bucket_id, 0)
+ if current_version != cache_version:
+ # Cache was invalidated, rebuild
+ objects = self._build_object_cache(bucket_path)
+
+ # Evict oldest entries if cache is full
+ while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE:
+ self._object_cache.popitem(last=False)
+
+ self._object_cache[bucket_id] = (objects, time.time())
+ self._object_cache.move_to_end(bucket_id)
+
return objects
def _invalidate_object_cache(self, bucket_id: str) -> None:
- """Invalidate the object cache and etag index for a bucket."""
- self._object_cache.pop(bucket_id, None)
+ """Invalidate the object cache and etag index for a bucket.
+
+ Increments version counter to signal stale reads.
+ """
+ with self._cache_lock:
+ self._object_cache.pop(bucket_id, None)
+ self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
+
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
try:
etag_index_path.unlink(missing_ok=True)
diff --git a/app/version.py b/app/version.py
index ea75040..445cb63 100644
--- a/app/version.py
+++ b/app/version.py
@@ -1,7 +1,7 @@
"""Central location for the application version string."""
from __future__ import annotations
-APP_VERSION = "0.1.8"
+APP_VERSION = "0.1.9"
def get_version() -> str:
From e89bbb62dc08136223f25dc10a3eec9c22c43b4b Mon Sep 17 00:00:00 2001
From: kqjy
Date: Mon, 29 Dec 2025 14:05:17 +0800
Subject: [PATCH 2/3] Fix resuming a paused replication rule not syncing the
 objects that accumulated while paused; improve documentation
---
app/ui.py | 33 ++++++--
templates/bucket_detail.html | 50 +++++++----
templates/docs.html | 158 +++++++++++++++++++++++++++++++++--
tests/test_ui_pagination.py | 13 ++-
4 files changed, 219 insertions(+), 35 deletions(-)
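
Note: the url_templates change replaces per-object url_for() calls with a
single set of templates that the frontend expands by substituting the object
key into KEY_PLACEHOLDER. A small sketch of that substitution on the consumer
side -- the template paths below are placeholders for illustration; the real
values come from url_for in list_bucket_objects:

from urllib.parse import quote

def build_url(template: str, key: str) -> str:
    # Mirror the frontend helper: percent-encode the key but keep '/' separators.
    return template.replace("KEY_PLACEHOLDER", quote(key, safe="/"))

# Illustrative response shape after this patch (paths are made up).
listing = {
    "objects": [{"key": "photos/cat.jpg", "size": 12345}],
    "url_templates": {
        "preview": "/ui/buckets/demo/objects/KEY_PLACEHOLDER/preview",
        "delete": "/ui/buckets/demo/objects/KEY_PLACEHOLDER/delete",
    },
}

for obj in listing["objects"]:
    print(build_url(listing["url_templates"]["preview"], obj["key"]))
    # -> /ui/buckets/demo/objects/photos/cat.jpg/preview
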
diff --git a/app/ui.py b/app/ui.py
index 4baf5c3..3cc6c23 100644
--- a/app/ui.py
+++ b/app/ui.py
@@ -415,7 +415,7 @@ def list_bucket_objects(bucket_name: str):
except IamError as exc:
return jsonify({"error": str(exc)}), 403
- max_keys = min(int(request.args.get("max_keys", 1000)), 10000)
+ max_keys = min(int(request.args.get("max_keys", 1000)), 100000)
continuation_token = request.args.get("continuation_token") or None
prefix = request.args.get("prefix") or None
@@ -434,6 +434,14 @@ def list_bucket_objects(bucket_name: str):
except StorageError:
versioning_enabled = False
+ # Pre-compute URL templates once (not per-object) for performance
+ # Frontend will construct actual URLs by replacing KEY_PLACEHOLDER
+ preview_template = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
+ delete_template = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
+ presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
+ versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
+ restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER")
+
objects_data = []
for obj in result.objects:
objects_data.append({
@@ -442,13 +450,6 @@ def list_bucket_objects(bucket_name: str):
"last_modified": obj.last_modified.isoformat(),
"last_modified_display": obj.last_modified.strftime("%b %d, %Y %H:%M"),
"etag": obj.etag,
- "metadata": obj.metadata or {},
- "preview_url": url_for("ui.object_preview", bucket_name=bucket_name, object_key=obj.key),
- "download_url": url_for("ui.object_preview", bucket_name=bucket_name, object_key=obj.key) + "?download=1",
- "presign_endpoint": url_for("ui.object_presign", bucket_name=bucket_name, object_key=obj.key),
- "delete_endpoint": url_for("ui.delete_object", bucket_name=bucket_name, object_key=obj.key),
- "versions_endpoint": url_for("ui.object_versions", bucket_name=bucket_name, object_key=obj.key),
- "restore_template": url_for("ui.restore_object_version", bucket_name=bucket_name, object_key=obj.key, version_id="VERSION_ID_PLACEHOLDER"),
})
return jsonify({
@@ -457,6 +458,14 @@ def list_bucket_objects(bucket_name: str):
"next_continuation_token": result.next_continuation_token,
"total_count": result.total_count,
"versioning_enabled": versioning_enabled,
+ "url_templates": {
+ "preview": preview_template,
+ "download": preview_template + "?download=1",
+ "presign": presign_template,
+ "delete": delete_template,
+ "versions": versions_template,
+ "restore": restore_template,
+ },
})
@@ -1458,11 +1467,17 @@ def update_bucket_replication(bucket_name: str):
else:
flash("No replication configuration to pause", "warning")
elif action == "resume":
+ from .replication import REPLICATION_MODE_ALL
rule = _replication().get_rule(bucket_name)
if rule:
rule.enabled = True
_replication().set_rule(rule)
- flash("Replication resumed", "success")
+ # When resuming, sync any pending objects that accumulated while paused
+ if rule.mode == REPLICATION_MODE_ALL:
+ _replication().replicate_existing_objects(bucket_name)
+ flash("Replication resumed. Syncing pending objects in background.", "success")
+ else:
+ flash("Replication resumed", "success")
else:
flash("No replication configuration to resume", "warning")
elif action == "create":
diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html
index daa8f45..6db7a77 100644
--- a/templates/bucket_detail.html
+++ b/templates/bucket_detail.html
@@ -173,14 +173,16 @@
Batch
-
+
1K
5K
10K
25K
50K
+ 75K
+ 100K
- objects
+ per batch
@@ -1144,13 +1146,18 @@
{% elif replication_rule and not replication_rule.enabled %}
-
-
+
+
-
Replication Paused —
- Replication is configured but currently paused. New uploads will not be replicated until resumed.
+
Replication Paused
+
Replication is configured but currently paused. New uploads will not be replicated until resumed.
+ {% if replication_rule.mode == 'all' %}
+
Tip: When you resume, any objects uploaded while paused will be automatically synced to the target.
+ {% else %}
+
Note: Objects uploaded while paused will not be synced (mode: new_only). Consider switching to "All Objects" mode if you need to sync missed uploads.
+ {% endif %}
@@ -1882,7 +1889,14 @@
let pageSize = 5000; // Load large batches for virtual scrolling
let currentPrefix = ''; // Current folder prefix for navigation
let allObjects = []; // All loaded object metadata (lightweight)
-
+ let urlTemplates = null; // URL templates from API for constructing object URLs
+
+ // Helper to build URL from template by replacing KEY_PLACEHOLDER with encoded key
+ const buildUrlFromTemplate = (template, key) => {
+ if (!template) return '';
+ return template.replace('KEY_PLACEHOLDER', encodeURIComponent(key).replace(/%2F/g, '/'));
+ };
+
// Virtual scrolling state
const ROW_HEIGHT = 53; // Height of each table row in pixels
const BUFFER_ROWS = 10; // Extra rows to render above/below viewport
@@ -2223,22 +2237,26 @@
objectsLoadingRow.remove();
}
- // Store lightweight object metadata (no DOM elements!)
+ if (data.url_templates && !urlTemplates) {
+ urlTemplates = data.url_templates;
+ }
+
data.objects.forEach(obj => {
loadedObjectCount++;
+ const key = obj.key;
allObjects.push({
- key: obj.key,
+ key: key,
size: obj.size,
lastModified: obj.last_modified,
lastModifiedDisplay: obj.last_modified_display,
etag: obj.etag,
- previewUrl: obj.preview_url,
- downloadUrl: obj.download_url,
- presignEndpoint: obj.presign_endpoint,
- deleteEndpoint: obj.delete_endpoint,
- metadata: JSON.stringify(obj.metadata || {}),
- versionsEndpoint: obj.versions_endpoint,
- restoreTemplate: obj.restore_template
+ previewUrl: urlTemplates ? buildUrlFromTemplate(urlTemplates.preview, key) : '',
+ downloadUrl: urlTemplates ? buildUrlFromTemplate(urlTemplates.download, key) : '',
+ presignEndpoint: urlTemplates ? buildUrlFromTemplate(urlTemplates.presign, key) : '',
+ deleteEndpoint: urlTemplates ? buildUrlFromTemplate(urlTemplates.delete, key) : '',
+ metadata: '{}',
+ versionsEndpoint: urlTemplates ? buildUrlFromTemplate(urlTemplates.versions, key) : '',
+ restoreTemplate: urlTemplates ? urlTemplates.restore.replace('KEY_PLACEHOLDER', encodeURIComponent(key).replace(/%2F/g, '/')) : ''
});
});
diff --git a/templates/docs.html b/templates/docs.html
index fd09517..ddd9fa3 100644
--- a/templates/docs.html
+++ b/templates/docs.html
@@ -407,10 +407,62 @@ curl -X POST {{ api_base }}/presign/demo/notes.txt \
07
API Examples
- Common operations using boto3.
-
- Multipart Upload
-import boto3
+ Common operations using popular SDKs and tools.
+
+ Python (boto3)
+import boto3
+
+s3 = boto3.client(
+ 's3',
+ endpoint_url='{{ api_base }}',
+ aws_access_key_id='<access_key>',
+ aws_secret_access_key='<secret_key>'
+)
+
+# List buckets
+buckets = s3.list_buckets()['Buckets']
+
+# Create bucket
+s3.create_bucket(Bucket='mybucket')
+
+# Upload file
+s3.upload_file('local.txt', 'mybucket', 'remote.txt')
+
+# Download file
+s3.download_file('mybucket', 'remote.txt', 'downloaded.txt')
+
+# Generate presigned URL (valid 1 hour)
+url = s3.generate_presigned_url(
+ 'get_object',
+ Params={'Bucket': 'mybucket', 'Key': 'remote.txt'},
+ ExpiresIn=3600
+)
+
+ JavaScript (AWS SDK v3)
+import { S3Client, ListBucketsCommand, PutObjectCommand } from '@aws-sdk/client-s3';
+
+const s3 = new S3Client({
+ endpoint: '{{ api_base }}',
+ region: 'us-east-1',
+ credentials: {
+ accessKeyId: '<access_key>',
+ secretAccessKey: '<secret_key>'
+ },
+ forcePathStyle: true // Required for S3-compatible services
+});
+
+// List buckets
+const { Buckets } = await s3.send(new ListBucketsCommand({}));
+
+// Upload object
+await s3.send(new PutObjectCommand({
+ Bucket: 'mybucket',
+ Key: 'hello.txt',
+ Body: 'Hello, World!'
+}));
+
+ Multipart Upload (Python)
+import boto3
s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
@@ -418,9 +470,9 @@ s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
response = s3.create_multipart_upload(Bucket='mybucket', Key='large.bin')
upload_id = response['UploadId']
-# Upload parts
+# Upload parts (minimum 5MB each, except last part)
parts = []
-chunks = [b'chunk1', b'chunk2'] # Example data chunks
+chunks = [b'chunk1...', b'chunk2...']
for part_number, chunk in enumerate(chunks, start=1):
response = s3.upload_part(
Bucket='mybucket',
@@ -438,6 +490,19 @@ s3.complete_multipart_upload(
UploadId=upload_id,
MultipartUpload={'Parts': parts}
)
+
+ Presigned URLs for Sharing
+# Generate a download link valid for 15 minutes
+curl -X POST "{{ api_base }}/presign/mybucket/photo.jpg" \
+ -H "Content-Type: application/json" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+ -d '{"method": "GET", "expires_in": 900}'
+
+# Generate an upload link (PUT) valid for 1 hour
+curl -X POST "{{ api_base }}/presign/mybucket/upload.bin" \
+ -H "Content-Type: application/json" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+ -d '{"method": "PUT", "expires_in": 3600}'
@@ -487,6 +552,86 @@ s3.complete_multipart_upload(
+
+
+
+ 09
+
Object Versioning
+
+
Keep multiple versions of objects to protect against accidental deletions and overwrites. Restore previous versions at any time.
+
+
Enabling Versioning
+
+ Navigate to your bucket's Properties tab.
+ Find the Versioning card and click Enable .
+ All subsequent uploads will create new versions instead of overwriting.
+
+
+
Version Operations
+
+
+
+
+ Operation
+ Description
+
+
+
+
+ View Versions
+ Click the version icon on any object to see all historical versions with timestamps and sizes.
+
+
+ Restore Version
+ Click Restore on any version to make it the current version (creates a copy).
+
+
+ Delete Current
+ Deleting an object archives it. Previous versions remain accessible.
+
+
+ Purge All
+ Permanently delete an object and all its versions. This cannot be undone.
+
+
+
+
+
+
Archived Objects
+
When you delete a versioned object, it becomes "archived" - the current version is removed but historical versions remain. The Archived tab shows these objects so you can restore them.
+
+
API Usage
+
# Enable versioning
+curl -X PUT "{{ api_base }}/<bucket>?versioning" \
+ -H "Content-Type: application/json" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+ -d '{"Status": "Enabled"}'
+
+# Get versioning status
+curl "{{ api_base }}/<bucket>?versioning" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# List object versions
+curl "{{ api_base }}/<bucket>?versions" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# Get specific version
+curl "{{ api_base }}/<bucket>/<key>?versionId=<version-id>" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+
+
+
+
+
+
+
+ Storage Impact: Each version consumes storage. Enable quotas to limit total bucket size including all versions.
+
+
+
+
+
@@ -709,6 +854,7 @@ curl -X DELETE "{{ api_base }}/kms/keys/{key-id}?waiting_period_days=30" \
REST endpoints
API Examples
Site Replication
+
Object Versioning
Bucket Quotas
Encryption
Troubleshooting
diff --git a/tests/test_ui_pagination.py b/tests/test_ui_pagination.py
index 5e42d60..683f527 100644
--- a/tests/test_ui_pagination.py
+++ b/tests/test_ui_pagination.py
@@ -150,16 +150,21 @@ class TestPaginatedObjectListing:
assert len(data["objects"]) == 1
obj = data["objects"][0]
-
+
# Check all expected fields
assert obj["key"] == "test.txt"
assert obj["size"] == 12 # len("test content")
assert "last_modified" in obj
assert "last_modified_display" in obj
assert "etag" in obj
- assert "preview_url" in obj
- assert "download_url" in obj
- assert "delete_endpoint" in obj
+
+ # URLs are now returned as templates (not per-object) for performance
+ assert "url_templates" in data
+ templates = data["url_templates"]
+ assert "preview" in templates
+ assert "download" in templates
+ assert "delete" in templates
+ assert "KEY_PLACEHOLDER" in templates["preview"]
def test_bucket_detail_page_loads_without_objects(self, tmp_path):
"""Bucket detail page should load even with many objects."""
From 1cacb80dd6b3774c35b1a00ca7193b7367da44cb Mon Sep 17 00:00:00 2001
From: kqjy
Date: Mon, 29 Dec 2025 14:46:06 +0800
Subject: [PATCH 3/3] Fix replication pause race, multipart cache invalidation,
 and select-all with virtual scrolling
---
app/replication.py | 6 +++++
app/storage.py | 5 ++--
templates/bucket_detail.html | 51 ++++++++++++++++++------------------
3 files changed, 34 insertions(+), 28 deletions(-)
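
Note: the _replicate_task re-check closes a small race: a task queued while the
rule was enabled must not run once the rule is paused. A toy, self-contained
illustration of the pattern (not the ReplicationManager API; the Event only
makes the ordering deterministic for the demo):

import threading
from concurrent.futures import ThreadPoolExecutor

rules_enabled = {"demo-bucket": True}
release = threading.Event()   # holds the worker until the pause has landed

def replicate_task(bucket: str, key: str) -> None:
    release.wait()                                # task sat in the queue
    if not rules_enabled.get(bucket, False):      # re-check at execution time
        print(f"skip {bucket}/{key}: rule disabled after submit")
        return
    print(f"replicate {bucket}/{key}")

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(replicate_task, "demo-bucket", "a.txt")
rules_enabled["demo-bucket"] = False              # pause arrives after submit
release.set()
future.result()                                   # takes the skip branch
executor.shutdown()
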
diff --git a/app/replication.py b/app/replication.py
index 884bbc3..4703bf9 100644
--- a/app/replication.py
+++ b/app/replication.py
@@ -306,6 +306,12 @@ class ReplicationManager:
if self._shutdown:
return
+ # Re-check if rule is still enabled (may have been paused after task was submitted)
+ current_rule = self.get_rule(bucket_name)
+ if not current_rule or not current_rule.enabled:
+ logger.debug(f"Replication skipped for {bucket_name}/{object_key}: rule disabled or removed")
+ return
+
if ".." in object_key or object_key.startswith("/") or object_key.startswith("\\"):
logger.error(f"Invalid object key in replication (path traversal attempt): {object_key}")
return
diff --git a/app/storage.py b/app/storage.py
index 13b751f..702d5bb 100644
--- a/app/storage.py
+++ b/app/storage.py
@@ -1052,9 +1052,10 @@ class ObjectStorage:
pass
shutil.rmtree(upload_root, ignore_errors=True)
-
+
self._invalidate_bucket_stats_cache(bucket_id)
-
+ self._invalidate_object_cache(bucket_id)
+
stat = destination.stat()
return ObjectMeta(
key=safe_key.as_posix(),
diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html
index 6db7a77..f9832e1 100644
--- a/templates/bucket_detail.html
+++ b/templates/bucket_detail.html
@@ -3801,41 +3801,40 @@
selectAllCheckbox?.addEventListener('change', (event) => {
const shouldSelect = Boolean(event.target?.checked);
-
- if (hasFolders()) {
- const objectsInCurrentView = allObjects.filter(obj => obj.key.startsWith(currentPrefix));
- objectsInCurrentView.forEach(obj => {
- const checkbox = obj.element.querySelector('[data-object-select]');
- if (checkbox && !checkbox.disabled) {
- checkbox.checked = shouldSelect;
- }
- toggleRowSelection(obj.element, shouldSelect);
- });
+ // Get all file items in the current view (works with virtual scrolling)
+ const filesInView = visibleItems.filter(item => item.type === 'file');
- document.querySelectorAll('[data-folder-select]').forEach(cb => {
- cb.checked = shouldSelect;
- });
- } else {
+ // Update selectedRows directly using object keys (not DOM elements)
+ filesInView.forEach(item => {
+ if (shouldSelect) {
+ selectedRows.set(item.data.key, item.data);
+ } else {
+ selectedRows.delete(item.data.key);
+ }
+ });
- document.querySelectorAll('[data-object-row]').forEach((row) => {
- if (row.style.display === 'none') return;
- const checkbox = row.querySelector('[data-object-select]');
- if (!checkbox || checkbox.disabled) {
- return;
- }
+ // Update folder checkboxes in DOM (folders are always rendered)
+ document.querySelectorAll('[data-folder-select]').forEach(cb => {
+ cb.checked = shouldSelect;
+ });
+
+ // Update any currently rendered object checkboxes
+ document.querySelectorAll('[data-object-row]').forEach((row) => {
+ const checkbox = row.querySelector('[data-object-select]');
+ if (checkbox) {
checkbox.checked = shouldSelect;
- toggleRowSelection(row, shouldSelect);
- });
- }
+ }
+ });
+
+ updateBulkDeleteState();
setTimeout(updateBulkDownloadState, 0);
});
bulkDownloadButton?.addEventListener('click', async () => {
if (!bulkDownloadEndpoint) return;
- const selected = Array.from(document.querySelectorAll('[data-object-select]:checked')).map(
- (cb) => cb.closest('tr').dataset.key
- );
+ // Use selectedRows which tracks all selected objects (not just rendered ones)
+ const selected = Array.from(selectedRows.keys());
if (selected.length === 0) return;
bulkDownloadButton.disabled = true;