Compare commits
4 Commits
a2745ff2ee
...
bb366cb4cd
| Author | SHA1 | Date | |
|---|---|---|---|
| bb366cb4cd | |||
| 1cacb80dd6 | |||
| e89bbb62dc | |||
| c8eb3de629 |
@@ -9,7 +9,7 @@ import time
|
|||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
from botocore.config import Config
|
from botocore.config import Config
|
||||||
@@ -24,11 +24,42 @@ logger = logging.getLogger(__name__)
|
|||||||
REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"
|
REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"
|
||||||
REPLICATION_CONNECT_TIMEOUT = 5
|
REPLICATION_CONNECT_TIMEOUT = 5
|
||||||
REPLICATION_READ_TIMEOUT = 30
|
REPLICATION_READ_TIMEOUT = 30
|
||||||
|
STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024 # 10 MiB - use streaming for larger files
|
||||||
|
|
||||||
REPLICATION_MODE_NEW_ONLY = "new_only"
|
REPLICATION_MODE_NEW_ONLY = "new_only"
|
||||||
REPLICATION_MODE_ALL = "all"
|
REPLICATION_MODE_ALL = "all"
|
||||||
|
|
||||||
|
|
||||||
|
def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any:
|
||||||
|
"""Create a boto3 S3 client for the given connection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
connection: Remote S3 connection configuration
|
||||||
|
health_check: If True, use minimal retries for quick health checks
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Configured boto3 S3 client
|
||||||
|
"""
|
||||||
|
config = Config(
|
||||||
|
user_agent_extra=REPLICATION_USER_AGENT,
|
||||||
|
connect_timeout=REPLICATION_CONNECT_TIMEOUT,
|
||||||
|
read_timeout=REPLICATION_READ_TIMEOUT,
|
||||||
|
retries={'max_attempts': 1 if health_check else 2},
|
||||||
|
signature_version='s3v4',
|
||||||
|
s3={'addressing_style': 'path'},
|
||||||
|
request_checksum_calculation='when_required',
|
||||||
|
response_checksum_validation='when_required',
|
||||||
|
)
|
||||||
|
return boto3.client(
|
||||||
|
"s3",
|
||||||
|
endpoint_url=connection.endpoint_url,
|
||||||
|
aws_access_key_id=connection.access_key,
|
||||||
|
aws_secret_access_key=connection.secret_key,
|
||||||
|
region_name=connection.region or 'us-east-1',
|
||||||
|
config=config,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ReplicationStats:
|
class ReplicationStats:
|
||||||
"""Statistics for replication operations - computed dynamically."""
|
"""Statistics for replication operations - computed dynamically."""
|
||||||
@@ -102,8 +133,19 @@ class ReplicationManager:
|
|||||||
self._rules: Dict[str, ReplicationRule] = {}
|
self._rules: Dict[str, ReplicationRule] = {}
|
||||||
self._stats_lock = threading.Lock()
|
self._stats_lock = threading.Lock()
|
||||||
self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker")
|
self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker")
|
||||||
|
self._shutdown = False
|
||||||
self.reload_rules()
|
self.reload_rules()
|
||||||
|
|
||||||
|
def shutdown(self, wait: bool = True) -> None:
|
||||||
|
"""Shutdown the replication executor gracefully.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
wait: If True, wait for pending tasks to complete
|
||||||
|
"""
|
||||||
|
self._shutdown = True
|
||||||
|
self._executor.shutdown(wait=wait)
|
||||||
|
logger.info("Replication manager shut down")
|
||||||
|
|
||||||
def reload_rules(self) -> None:
|
def reload_rules(self) -> None:
|
||||||
if not self.rules_path.exists():
|
if not self.rules_path.exists():
|
||||||
self._rules = {}
|
self._rules = {}
|
||||||
@@ -129,20 +171,7 @@ class ReplicationManager:
|
|||||||
Uses short timeouts to prevent blocking.
|
Uses short timeouts to prevent blocking.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
config = Config(
|
s3 = _create_s3_client(connection, health_check=True)
|
||||||
user_agent_extra=REPLICATION_USER_AGENT,
|
|
||||||
connect_timeout=REPLICATION_CONNECT_TIMEOUT,
|
|
||||||
read_timeout=REPLICATION_READ_TIMEOUT,
|
|
||||||
retries={'max_attempts': 1}
|
|
||||||
)
|
|
||||||
s3 = boto3.client(
|
|
||||||
"s3",
|
|
||||||
endpoint_url=connection.endpoint_url,
|
|
||||||
aws_access_key_id=connection.access_key,
|
|
||||||
aws_secret_access_key=connection.secret_key,
|
|
||||||
region_name=connection.region,
|
|
||||||
config=config,
|
|
||||||
)
|
|
||||||
s3.list_buckets()
|
s3.list_buckets()
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -185,13 +214,7 @@ class ReplicationManager:
|
|||||||
source_objects = self.storage.list_objects_all(bucket_name)
|
source_objects = self.storage.list_objects_all(bucket_name)
|
||||||
source_keys = {obj.key: obj.size for obj in source_objects}
|
source_keys = {obj.key: obj.size for obj in source_objects}
|
||||||
|
|
||||||
s3 = boto3.client(
|
s3 = _create_s3_client(connection)
|
||||||
"s3",
|
|
||||||
endpoint_url=connection.endpoint_url,
|
|
||||||
aws_access_key_id=connection.access_key,
|
|
||||||
aws_secret_access_key=connection.secret_key,
|
|
||||||
region_name=connection.region,
|
|
||||||
)
|
|
||||||
|
|
||||||
dest_keys = set()
|
dest_keys = set()
|
||||||
bytes_synced = 0
|
bytes_synced = 0
|
||||||
@@ -257,13 +280,7 @@ class ReplicationManager:
|
|||||||
raise ValueError(f"Connection {connection_id} not found")
|
raise ValueError(f"Connection {connection_id} not found")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
s3 = boto3.client(
|
s3 = _create_s3_client(connection)
|
||||||
"s3",
|
|
||||||
endpoint_url=connection.endpoint_url,
|
|
||||||
aws_access_key_id=connection.access_key,
|
|
||||||
aws_secret_access_key=connection.secret_key,
|
|
||||||
region_name=connection.region,
|
|
||||||
)
|
|
||||||
s3.create_bucket(Bucket=bucket_name)
|
s3.create_bucket(Bucket=bucket_name)
|
||||||
except ClientError as e:
|
except ClientError as e:
|
||||||
logger.error(f"Failed to create remote bucket {bucket_name}: {e}")
|
logger.error(f"Failed to create remote bucket {bucket_name}: {e}")
|
||||||
@@ -286,6 +303,15 @@ class ReplicationManager:
|
|||||||
self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)
|
self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)
|
||||||
|
|
||||||
def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
|
def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
|
||||||
|
if self._shutdown:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Re-check if rule is still enabled (may have been paused after task was submitted)
|
||||||
|
current_rule = self.get_rule(bucket_name)
|
||||||
|
if not current_rule or not current_rule.enabled:
|
||||||
|
logger.debug(f"Replication skipped for {bucket_name}/{object_key}: rule disabled or removed")
|
||||||
|
return
|
||||||
|
|
||||||
if ".." in object_key or object_key.startswith("/") or object_key.startswith("\\"):
|
if ".." in object_key or object_key.startswith("/") or object_key.startswith("\\"):
|
||||||
logger.error(f"Invalid object key in replication (path traversal attempt): {object_key}")
|
logger.error(f"Invalid object key in replication (path traversal attempt): {object_key}")
|
||||||
return
|
return
|
||||||
@@ -297,30 +323,8 @@ class ReplicationManager:
|
|||||||
logger.error(f"Object key validation failed in replication: {e}")
|
logger.error(f"Object key validation failed in replication: {e}")
|
||||||
return
|
return
|
||||||
|
|
||||||
file_size = 0
|
|
||||||
try:
|
try:
|
||||||
config = Config(
|
s3 = _create_s3_client(conn)
|
||||||
user_agent_extra=REPLICATION_USER_AGENT,
|
|
||||||
connect_timeout=REPLICATION_CONNECT_TIMEOUT,
|
|
||||||
read_timeout=REPLICATION_READ_TIMEOUT,
|
|
||||||
retries={'max_attempts': 2},
|
|
||||||
signature_version='s3v4',
|
|
||||||
s3={
|
|
||||||
'addressing_style': 'path',
|
|
||||||
},
|
|
||||||
# Disable SDK automatic checksums - they cause SignatureDoesNotMatch errors
|
|
||||||
# with S3-compatible servers that don't support CRC32 checksum headers
|
|
||||||
request_checksum_calculation='when_required',
|
|
||||||
response_checksum_validation='when_required',
|
|
||||||
)
|
|
||||||
s3 = boto3.client(
|
|
||||||
"s3",
|
|
||||||
endpoint_url=conn.endpoint_url,
|
|
||||||
aws_access_key_id=conn.access_key,
|
|
||||||
aws_secret_access_key=conn.secret_key,
|
|
||||||
region_name=conn.region or 'us-east-1',
|
|
||||||
config=config,
|
|
||||||
)
|
|
||||||
|
|
||||||
if action == "delete":
|
if action == "delete":
|
||||||
try:
|
try:
|
||||||
@@ -337,34 +341,42 @@ class ReplicationManager:
|
|||||||
logger.error(f"Source object not found: {bucket_name}/{object_key}")
|
logger.error(f"Source object not found: {bucket_name}/{object_key}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Don't replicate metadata - destination server will generate its own
|
|
||||||
# __etag__ and __size__. Replicating them causes signature mismatches when they have None/empty values.
|
|
||||||
|
|
||||||
content_type, _ = mimetypes.guess_type(path)
|
content_type, _ = mimetypes.guess_type(path)
|
||||||
file_size = path.stat().st_size
|
file_size = path.stat().st_size
|
||||||
|
|
||||||
logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
|
logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}")
|
||||||
|
|
||||||
def do_put_object() -> None:
|
def do_upload() -> None:
|
||||||
"""Helper to upload object.
|
"""Upload object using appropriate method based on file size.
|
||||||
|
|
||||||
Reads the file content into memory first to avoid signature calculation
|
For small files (< 10 MiB): Read into memory for simpler handling
|
||||||
issues with certain binary file types (like GIFs) when streaming.
|
For large files: Use streaming upload to avoid memory issues
|
||||||
Do NOT set ContentLength explicitly - boto3 calculates it from the bytes
|
|
||||||
and setting it manually can cause SignatureDoesNotMatch errors.
|
|
||||||
"""
|
"""
|
||||||
file_content = path.read_bytes()
|
extra_args = {}
|
||||||
put_kwargs = {
|
|
||||||
"Bucket": rule.target_bucket,
|
|
||||||
"Key": object_key,
|
|
||||||
"Body": file_content,
|
|
||||||
}
|
|
||||||
if content_type:
|
if content_type:
|
||||||
put_kwargs["ContentType"] = content_type
|
extra_args["ContentType"] = content_type
|
||||||
s3.put_object(**put_kwargs)
|
|
||||||
|
if file_size >= STREAMING_THRESHOLD_BYTES:
|
||||||
|
# Use multipart upload for large files
|
||||||
|
s3.upload_file(
|
||||||
|
str(path),
|
||||||
|
rule.target_bucket,
|
||||||
|
object_key,
|
||||||
|
ExtraArgs=extra_args if extra_args else None,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Read small files into memory
|
||||||
|
file_content = path.read_bytes()
|
||||||
|
put_kwargs = {
|
||||||
|
"Bucket": rule.target_bucket,
|
||||||
|
"Key": object_key,
|
||||||
|
"Body": file_content,
|
||||||
|
**extra_args,
|
||||||
|
}
|
||||||
|
s3.put_object(**put_kwargs)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
do_put_object()
|
do_upload()
|
||||||
except (ClientError, S3UploadFailedError) as e:
|
except (ClientError, S3UploadFailedError) as e:
|
||||||
error_code = None
|
error_code = None
|
||||||
if isinstance(e, ClientError):
|
if isinstance(e, ClientError):
|
||||||
@@ -389,7 +401,7 @@ class ReplicationManager:
|
|||||||
raise e
|
raise e
|
||||||
|
|
||||||
if bucket_ready:
|
if bucket_ready:
|
||||||
do_put_object()
|
do_upload()
|
||||||
else:
|
else:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
|||||||
@@ -1,13 +1,15 @@
|
|||||||
"""Flask blueprint exposing a subset of the S3 REST API."""
|
"""Flask blueprint exposing a subset of the S3 REST API."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
import hmac
|
import hmac
|
||||||
|
import logging
|
||||||
import mimetypes
|
import mimetypes
|
||||||
import re
|
import re
|
||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict, Optional
|
||||||
from urllib.parse import quote, urlencode, urlparse, unquote
|
from urllib.parse import quote, urlencode, urlparse, unquote
|
||||||
from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError
|
from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError
|
||||||
|
|
||||||
@@ -20,6 +22,8 @@ from .iam import IamError, Principal
|
|||||||
from .replication import ReplicationManager
|
from .replication import ReplicationManager
|
||||||
from .storage import ObjectStorage, StorageError, QuotaExceededError
|
from .storage import ObjectStorage, StorageError, QuotaExceededError
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
s3_api_bp = Blueprint("s3_api", __name__)
|
s3_api_bp = Blueprint("s3_api", __name__)
|
||||||
|
|
||||||
def _storage() -> ObjectStorage:
|
def _storage() -> ObjectStorage:
|
||||||
@@ -118,6 +122,9 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
|
|||||||
if header_val is None:
|
if header_val is None:
|
||||||
header_val = ""
|
header_val = ""
|
||||||
|
|
||||||
|
if header.lower() == 'expect' and header_val == "":
|
||||||
|
header_val = "100-continue"
|
||||||
|
|
||||||
header_val = " ".join(header_val.split())
|
header_val = " ".join(header_val.split())
|
||||||
canonical_headers_parts.append(f"{header.lower()}:{header_val}\n")
|
canonical_headers_parts.append(f"{header.lower()}:{header_val}\n")
|
||||||
canonical_headers = "".join(canonical_headers_parts)
|
canonical_headers = "".join(canonical_headers_parts)
|
||||||
@@ -128,15 +135,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
|
|||||||
|
|
||||||
canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
|
canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
|
||||||
|
|
||||||
# Debug logging for signature issues
|
|
||||||
import logging
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
logger.debug(f"SigV4 Debug - Method: {method}, URI: {canonical_uri}")
|
|
||||||
logger.debug(f"SigV4 Debug - Payload hash from header: {req.headers.get('X-Amz-Content-Sha256')}")
|
|
||||||
logger.debug(f"SigV4 Debug - Signed headers: {signed_headers_str}")
|
|
||||||
logger.debug(f"SigV4 Debug - Content-Type: {req.headers.get('Content-Type')}")
|
|
||||||
logger.debug(f"SigV4 Debug - Content-Length: {req.headers.get('Content-Length')}")
|
|
||||||
|
|
||||||
amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date")
|
amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date")
|
||||||
if not amz_date:
|
if not amz_date:
|
||||||
raise IamError("Missing Date header")
|
raise IamError("Missing Date header")
|
||||||
@@ -167,24 +165,18 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
|
|||||||
calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
|
calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
|
||||||
|
|
||||||
if not hmac.compare_digest(calculated_signature, signature):
|
if not hmac.compare_digest(calculated_signature, signature):
|
||||||
# Debug logging for signature mismatch
|
# Only log detailed signature debug info if DEBUG_SIGV4 is enabled
|
||||||
import logging
|
if current_app.config.get("DEBUG_SIGV4"):
|
||||||
logger = logging.getLogger(__name__)
|
logger.warning(
|
||||||
logger.error(f"Signature mismatch for {req.path}")
|
"SigV4 signature mismatch",
|
||||||
logger.error(f" Content-Type: {req.headers.get('Content-Type')}")
|
extra={
|
||||||
logger.error(f" Content-Length: {req.headers.get('Content-Length')}")
|
"path": req.path,
|
||||||
logger.error(f" X-Amz-Content-Sha256: {req.headers.get('X-Amz-Content-Sha256')}")
|
"method": method,
|
||||||
logger.error(f" Canonical URI: {canonical_uri}")
|
"signed_headers": signed_headers_str,
|
||||||
logger.error(f" Signed headers: {signed_headers_str}")
|
"content_type": req.headers.get("Content-Type"),
|
||||||
# Log each signed header's value
|
"content_length": req.headers.get("Content-Length"),
|
||||||
for h in signed_headers_list:
|
}
|
||||||
logger.error(f" Header '{h}': {repr(req.headers.get(h))}")
|
)
|
||||||
logger.error(f" Expected sig: {signature[:16]}...")
|
|
||||||
logger.error(f" Calculated sig: {calculated_signature[:16]}...")
|
|
||||||
# Log first part of canonical request to compare
|
|
||||||
logger.error(f" Canonical request hash: {hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()[:16]}...")
|
|
||||||
# Log the full canonical request for debugging
|
|
||||||
logger.error(f" Canonical request:\n{canonical_request[:500]}...")
|
|
||||||
raise IamError("SignatureDoesNotMatch")
|
raise IamError("SignatureDoesNotMatch")
|
||||||
|
|
||||||
return _iam().get_principal(access_key)
|
return _iam().get_principal(access_key)
|
||||||
@@ -236,6 +228,8 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
|
|||||||
canonical_headers_parts = []
|
canonical_headers_parts = []
|
||||||
for header in signed_headers_list:
|
for header in signed_headers_list:
|
||||||
val = req.headers.get(header, "").strip()
|
val = req.headers.get(header, "").strip()
|
||||||
|
if header.lower() == 'expect' and val == "":
|
||||||
|
val = "100-continue"
|
||||||
val = " ".join(val.split())
|
val = " ".join(val.split())
|
||||||
canonical_headers_parts.append(f"{header}:{val}\n")
|
canonical_headers_parts.append(f"{header}:{val}\n")
|
||||||
canonical_headers = "".join(canonical_headers_parts)
|
canonical_headers = "".join(canonical_headers_parts)
|
||||||
@@ -569,6 +563,28 @@ def _strip_ns(tag: str | None) -> str:
|
|||||||
return tag.split("}")[-1]
|
return tag.split("}")[-1]
|
||||||
|
|
||||||
|
|
||||||
|
def _find_element(parent: Element, name: str) -> Optional[Element]:
|
||||||
|
"""Find a child element by name, trying both namespaced and non-namespaced variants.
|
||||||
|
|
||||||
|
This handles XML documents that may or may not include namespace prefixes.
|
||||||
|
"""
|
||||||
|
el = parent.find(f"{{*}}{name}")
|
||||||
|
if el is None:
|
||||||
|
el = parent.find(name)
|
||||||
|
return el
|
||||||
|
|
||||||
|
|
||||||
|
def _find_element_text(parent: Element, name: str, default: str = "") -> str:
|
||||||
|
"""Find a child element and return its text content.
|
||||||
|
|
||||||
|
Returns the default value if element not found or has no text.
|
||||||
|
"""
|
||||||
|
el = _find_element(parent, name)
|
||||||
|
if el is None or el.text is None:
|
||||||
|
return default
|
||||||
|
return el.text.strip()
|
||||||
|
|
||||||
|
|
||||||
def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]:
|
def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]:
|
||||||
try:
|
try:
|
||||||
root = fromstring(payload)
|
root = fromstring(payload)
|
||||||
@@ -585,17 +601,11 @@ def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]:
|
|||||||
for tag_el in list(tagset):
|
for tag_el in list(tagset):
|
||||||
if _strip_ns(tag_el.tag) != "Tag":
|
if _strip_ns(tag_el.tag) != "Tag":
|
||||||
continue
|
continue
|
||||||
key_el = tag_el.find("{*}Key")
|
key = _find_element_text(tag_el, "Key")
|
||||||
if key_el is None:
|
|
||||||
key_el = tag_el.find("Key")
|
|
||||||
value_el = tag_el.find("{*}Value")
|
|
||||||
if value_el is None:
|
|
||||||
value_el = tag_el.find("Value")
|
|
||||||
key = (key_el.text or "").strip() if key_el is not None else ""
|
|
||||||
if not key:
|
if not key:
|
||||||
continue
|
continue
|
||||||
value = value_el.text if value_el is not None else ""
|
value = _find_element_text(tag_el, "Value")
|
||||||
tags.append({"Key": key, "Value": value or ""})
|
tags.append({"Key": key, "Value": value})
|
||||||
return tags
|
return tags
|
||||||
|
|
||||||
|
|
||||||
@@ -1439,7 +1449,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
|
|||||||
|
|
||||||
if request.method == "DELETE":
|
if request.method == "DELETE":
|
||||||
try:
|
try:
|
||||||
storage.set_bucket_quota(bucket_name, max_size_bytes=None, max_objects=None)
|
storage.set_bucket_quota(bucket_name, max_bytes=None, max_objects=None)
|
||||||
except StorageError as exc:
|
except StorageError as exc:
|
||||||
return _error_response("NoSuchBucket", str(exc), 404)
|
return _error_response("NoSuchBucket", str(exc), 404)
|
||||||
current_app.logger.info("Bucket quota deleted", extra={"bucket": bucket_name})
|
current_app.logger.info("Bucket quota deleted", extra={"bucket": bucket_name})
|
||||||
@@ -1473,7 +1483,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
|
|||||||
return _error_response("InvalidArgument", f"max_objects {exc}", 400)
|
return _error_response("InvalidArgument", f"max_objects {exc}", 400)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
storage.set_bucket_quota(bucket_name, max_size_bytes=max_size_bytes, max_objects=max_objects)
|
storage.set_bucket_quota(bucket_name, max_bytes=max_size_bytes, max_objects=max_objects)
|
||||||
except StorageError as exc:
|
except StorageError as exc:
|
||||||
return _error_response("NoSuchBucket", str(exc), 404)
|
return _error_response("NoSuchBucket", str(exc), 404)
|
||||||
|
|
||||||
@@ -1665,7 +1675,6 @@ def bucket_handler(bucket_name: str) -> Response:
|
|||||||
effective_start = ""
|
effective_start = ""
|
||||||
if list_type == "2":
|
if list_type == "2":
|
||||||
if continuation_token:
|
if continuation_token:
|
||||||
import base64
|
|
||||||
try:
|
try:
|
||||||
effective_start = base64.urlsafe_b64decode(continuation_token.encode()).decode("utf-8")
|
effective_start = base64.urlsafe_b64decode(continuation_token.encode()).decode("utf-8")
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -1722,7 +1731,6 @@ def bucket_handler(bucket_name: str) -> Response:
|
|||||||
next_marker = common_prefixes[-1].rstrip(delimiter) if delimiter else common_prefixes[-1]
|
next_marker = common_prefixes[-1].rstrip(delimiter) if delimiter else common_prefixes[-1]
|
||||||
|
|
||||||
if list_type == "2" and next_marker:
|
if list_type == "2" and next_marker:
|
||||||
import base64
|
|
||||||
next_continuation_token = base64.urlsafe_b64encode(next_marker.encode()).decode("utf-8")
|
next_continuation_token = base64.urlsafe_b64encode(next_marker.encode()).decode("utf-8")
|
||||||
|
|
||||||
if list_type == "2":
|
if list_type == "2":
|
||||||
|
|||||||
114
app/storage.py
114
app/storage.py
@@ -7,9 +7,11 @@ import os
|
|||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import stat
|
import stat
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
import unicodedata
|
import unicodedata
|
||||||
import uuid
|
import uuid
|
||||||
|
from collections import OrderedDict
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
@@ -129,12 +131,17 @@ class ObjectStorage:
|
|||||||
MULTIPART_MANIFEST = "manifest.json"
|
MULTIPART_MANIFEST = "manifest.json"
|
||||||
BUCKET_CONFIG_FILE = ".bucket.json"
|
BUCKET_CONFIG_FILE = ".bucket.json"
|
||||||
KEY_INDEX_CACHE_TTL = 30
|
KEY_INDEX_CACHE_TTL = 30
|
||||||
|
OBJECT_CACHE_MAX_SIZE = 100 # Maximum number of buckets to cache
|
||||||
|
|
||||||
def __init__(self, root: Path) -> None:
|
def __init__(self, root: Path) -> None:
|
||||||
self.root = Path(root)
|
self.root = Path(root)
|
||||||
self.root.mkdir(parents=True, exist_ok=True)
|
self.root.mkdir(parents=True, exist_ok=True)
|
||||||
self._ensure_system_roots()
|
self._ensure_system_roots()
|
||||||
self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {}
|
# LRU cache for object metadata with thread-safe access
|
||||||
|
self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float]] = OrderedDict()
|
||||||
|
self._cache_lock = threading.Lock()
|
||||||
|
# Cache version counter for detecting stale reads
|
||||||
|
self._cache_version: Dict[str, int] = {}
|
||||||
|
|
||||||
def list_buckets(self) -> List[BucketMeta]:
|
def list_buckets(self) -> List[BucketMeta]:
|
||||||
buckets: List[BucketMeta] = []
|
buckets: List[BucketMeta] = []
|
||||||
@@ -729,8 +736,6 @@ class ObjectStorage:
|
|||||||
bucket_id = bucket_path.name
|
bucket_id = bucket_path.name
|
||||||
safe_key = self._sanitize_object_key(object_key)
|
safe_key = self._sanitize_object_key(object_key)
|
||||||
version_dir = self._version_dir(bucket_id, safe_key)
|
version_dir = self._version_dir(bucket_id, safe_key)
|
||||||
if not version_dir.exists():
|
|
||||||
version_dir = self._legacy_version_dir(bucket_id, safe_key)
|
|
||||||
if not version_dir.exists():
|
if not version_dir.exists():
|
||||||
version_dir = self._legacy_version_dir(bucket_id, safe_key)
|
version_dir = self._legacy_version_dir(bucket_id, safe_key)
|
||||||
if not version_dir.exists():
|
if not version_dir.exists():
|
||||||
@@ -879,6 +884,10 @@ class ObjectStorage:
|
|||||||
part_number: int,
|
part_number: int,
|
||||||
stream: BinaryIO,
|
stream: BinaryIO,
|
||||||
) -> str:
|
) -> str:
|
||||||
|
"""Upload a part for a multipart upload.
|
||||||
|
|
||||||
|
Uses file locking to safely update the manifest and handle concurrent uploads.
|
||||||
|
"""
|
||||||
if part_number < 1:
|
if part_number < 1:
|
||||||
raise StorageError("part_number must be >= 1")
|
raise StorageError("part_number must be >= 1")
|
||||||
bucket_path = self._bucket_path(bucket_name)
|
bucket_path = self._bucket_path(bucket_name)
|
||||||
@@ -889,11 +898,26 @@ class ObjectStorage:
|
|||||||
if not upload_root.exists():
|
if not upload_root.exists():
|
||||||
raise StorageError("Multipart upload not found")
|
raise StorageError("Multipart upload not found")
|
||||||
|
|
||||||
|
# Write part to temporary file first, then rename atomically
|
||||||
checksum = hashlib.md5()
|
checksum = hashlib.md5()
|
||||||
part_filename = f"part-{part_number:05d}.part"
|
part_filename = f"part-{part_number:05d}.part"
|
||||||
part_path = upload_root / part_filename
|
part_path = upload_root / part_filename
|
||||||
with part_path.open("wb") as target:
|
temp_path = upload_root / f".{part_filename}.tmp"
|
||||||
shutil.copyfileobj(_HashingReader(stream, checksum), target)
|
|
||||||
|
try:
|
||||||
|
with temp_path.open("wb") as target:
|
||||||
|
shutil.copyfileobj(_HashingReader(stream, checksum), target)
|
||||||
|
|
||||||
|
# Atomic rename (or replace on Windows)
|
||||||
|
temp_path.replace(part_path)
|
||||||
|
except OSError:
|
||||||
|
# Clean up temp file on failure
|
||||||
|
try:
|
||||||
|
temp_path.unlink(missing_ok=True)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
|
|
||||||
record = {
|
record = {
|
||||||
"etag": checksum.hexdigest(),
|
"etag": checksum.hexdigest(),
|
||||||
"size": part_path.stat().st_size,
|
"size": part_path.stat().st_size,
|
||||||
@@ -903,16 +927,29 @@ class ObjectStorage:
|
|||||||
manifest_path = upload_root / self.MULTIPART_MANIFEST
|
manifest_path = upload_root / self.MULTIPART_MANIFEST
|
||||||
lock_path = upload_root / ".manifest.lock"
|
lock_path = upload_root / ".manifest.lock"
|
||||||
|
|
||||||
with lock_path.open("w") as lock_file:
|
# Retry loop for handling transient lock/read failures
|
||||||
with _file_lock(lock_file):
|
max_retries = 3
|
||||||
try:
|
for attempt in range(max_retries):
|
||||||
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
|
try:
|
||||||
except (OSError, json.JSONDecodeError) as exc:
|
with lock_path.open("w") as lock_file:
|
||||||
raise StorageError("Multipart manifest unreadable") from exc
|
with _file_lock(lock_file):
|
||||||
|
try:
|
||||||
|
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
|
||||||
|
except (OSError, json.JSONDecodeError) as exc:
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
time.sleep(0.1 * (attempt + 1))
|
||||||
|
continue
|
||||||
|
raise StorageError("Multipart manifest unreadable") from exc
|
||||||
|
|
||||||
parts = manifest.setdefault("parts", {})
|
parts = manifest.setdefault("parts", {})
|
||||||
parts[str(part_number)] = record
|
parts[str(part_number)] = record
|
||||||
manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
|
manifest_path.write_text(json.dumps(manifest), encoding="utf-8")
|
||||||
|
break
|
||||||
|
except OSError as exc:
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
time.sleep(0.1 * (attempt + 1))
|
||||||
|
continue
|
||||||
|
raise StorageError(f"Failed to update multipart manifest: {exc}") from exc
|
||||||
|
|
||||||
return record["etag"]
|
return record["etag"]
|
||||||
|
|
||||||
@@ -1017,6 +1054,7 @@ class ObjectStorage:
|
|||||||
shutil.rmtree(upload_root, ignore_errors=True)
|
shutil.rmtree(upload_root, ignore_errors=True)
|
||||||
|
|
||||||
self._invalidate_bucket_stats_cache(bucket_id)
|
self._invalidate_bucket_stats_cache(bucket_id)
|
||||||
|
self._invalidate_object_cache(bucket_id)
|
||||||
|
|
||||||
stat = destination.stat()
|
stat = destination.stat()
|
||||||
return ObjectMeta(
|
return ObjectMeta(
|
||||||
@@ -1264,22 +1302,52 @@ class ObjectStorage:
|
|||||||
return objects
|
return objects
|
||||||
|
|
||||||
def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]:
|
def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]:
|
||||||
"""Get cached object metadata for a bucket, refreshing if stale."""
|
"""Get cached object metadata for a bucket, refreshing if stale.
|
||||||
|
|
||||||
|
Uses LRU eviction to prevent unbounded cache growth.
|
||||||
|
Thread-safe with version tracking to detect concurrent invalidations.
|
||||||
|
"""
|
||||||
now = time.time()
|
now = time.time()
|
||||||
cached = self._object_cache.get(bucket_id)
|
|
||||||
|
|
||||||
if cached:
|
with self._cache_lock:
|
||||||
objects, timestamp = cached
|
cached = self._object_cache.get(bucket_id)
|
||||||
if now - timestamp < self.KEY_INDEX_CACHE_TTL:
|
cache_version = self._cache_version.get(bucket_id, 0)
|
||||||
return objects
|
|
||||||
|
|
||||||
|
if cached:
|
||||||
|
objects, timestamp = cached
|
||||||
|
if now - timestamp < self.KEY_INDEX_CACHE_TTL:
|
||||||
|
# Move to end (most recently used)
|
||||||
|
self._object_cache.move_to_end(bucket_id)
|
||||||
|
return objects
|
||||||
|
|
||||||
|
# Build cache outside lock to avoid holding lock during I/O
|
||||||
objects = self._build_object_cache(bucket_path)
|
objects = self._build_object_cache(bucket_path)
|
||||||
self._object_cache[bucket_id] = (objects, now)
|
|
||||||
|
with self._cache_lock:
|
||||||
|
# Check if cache was invalidated while we were building
|
||||||
|
current_version = self._cache_version.get(bucket_id, 0)
|
||||||
|
if current_version != cache_version:
|
||||||
|
# Cache was invalidated, rebuild
|
||||||
|
objects = self._build_object_cache(bucket_path)
|
||||||
|
|
||||||
|
# Evict oldest entries if cache is full
|
||||||
|
while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE:
|
||||||
|
self._object_cache.popitem(last=False)
|
||||||
|
|
||||||
|
self._object_cache[bucket_id] = (objects, time.time())
|
||||||
|
self._object_cache.move_to_end(bucket_id)
|
||||||
|
|
||||||
return objects
|
return objects
|
||||||
|
|
||||||
def _invalidate_object_cache(self, bucket_id: str) -> None:
|
def _invalidate_object_cache(self, bucket_id: str) -> None:
|
||||||
"""Invalidate the object cache and etag index for a bucket."""
|
"""Invalidate the object cache and etag index for a bucket.
|
||||||
self._object_cache.pop(bucket_id, None)
|
|
||||||
|
Increments version counter to signal stale reads.
|
||||||
|
"""
|
||||||
|
with self._cache_lock:
|
||||||
|
self._object_cache.pop(bucket_id, None)
|
||||||
|
self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1
|
||||||
|
|
||||||
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
|
etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json"
|
||||||
try:
|
try:
|
||||||
etag_index_path.unlink(missing_ok=True)
|
etag_index_path.unlink(missing_ok=True)
|
||||||
|
|||||||
33
app/ui.py
33
app/ui.py
@@ -415,7 +415,7 @@ def list_bucket_objects(bucket_name: str):
|
|||||||
except IamError as exc:
|
except IamError as exc:
|
||||||
return jsonify({"error": str(exc)}), 403
|
return jsonify({"error": str(exc)}), 403
|
||||||
|
|
||||||
max_keys = min(int(request.args.get("max_keys", 1000)), 10000)
|
max_keys = min(int(request.args.get("max_keys", 1000)), 100000)
|
||||||
continuation_token = request.args.get("continuation_token") or None
|
continuation_token = request.args.get("continuation_token") or None
|
||||||
prefix = request.args.get("prefix") or None
|
prefix = request.args.get("prefix") or None
|
||||||
|
|
||||||
@@ -434,6 +434,14 @@ def list_bucket_objects(bucket_name: str):
|
|||||||
except StorageError:
|
except StorageError:
|
||||||
versioning_enabled = False
|
versioning_enabled = False
|
||||||
|
|
||||||
|
# Pre-compute URL templates once (not per-object) for performance
|
||||||
|
# Frontend will construct actual URLs by replacing KEY_PLACEHOLDER
|
||||||
|
preview_template = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
|
||||||
|
delete_template = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
|
||||||
|
presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
|
||||||
|
versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
|
||||||
|
restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER")
|
||||||
|
|
||||||
objects_data = []
|
objects_data = []
|
||||||
for obj in result.objects:
|
for obj in result.objects:
|
||||||
objects_data.append({
|
objects_data.append({
|
||||||
@@ -442,13 +450,6 @@ def list_bucket_objects(bucket_name: str):
|
|||||||
"last_modified": obj.last_modified.isoformat(),
|
"last_modified": obj.last_modified.isoformat(),
|
||||||
"last_modified_display": obj.last_modified.strftime("%b %d, %Y %H:%M"),
|
"last_modified_display": obj.last_modified.strftime("%b %d, %Y %H:%M"),
|
||||||
"etag": obj.etag,
|
"etag": obj.etag,
|
||||||
"metadata": obj.metadata or {},
|
|
||||||
"preview_url": url_for("ui.object_preview", bucket_name=bucket_name, object_key=obj.key),
|
|
||||||
"download_url": url_for("ui.object_preview", bucket_name=bucket_name, object_key=obj.key) + "?download=1",
|
|
||||||
"presign_endpoint": url_for("ui.object_presign", bucket_name=bucket_name, object_key=obj.key),
|
|
||||||
"delete_endpoint": url_for("ui.delete_object", bucket_name=bucket_name, object_key=obj.key),
|
|
||||||
"versions_endpoint": url_for("ui.object_versions", bucket_name=bucket_name, object_key=obj.key),
|
|
||||||
"restore_template": url_for("ui.restore_object_version", bucket_name=bucket_name, object_key=obj.key, version_id="VERSION_ID_PLACEHOLDER"),
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
@@ -457,6 +458,14 @@ def list_bucket_objects(bucket_name: str):
|
|||||||
"next_continuation_token": result.next_continuation_token,
|
"next_continuation_token": result.next_continuation_token,
|
||||||
"total_count": result.total_count,
|
"total_count": result.total_count,
|
||||||
"versioning_enabled": versioning_enabled,
|
"versioning_enabled": versioning_enabled,
|
||||||
|
"url_templates": {
|
||||||
|
"preview": preview_template,
|
||||||
|
"download": preview_template + "?download=1",
|
||||||
|
"presign": presign_template,
|
||||||
|
"delete": delete_template,
|
||||||
|
"versions": versions_template,
|
||||||
|
"restore": restore_template,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
@@ -1458,11 +1467,17 @@ def update_bucket_replication(bucket_name: str):
|
|||||||
else:
|
else:
|
||||||
flash("No replication configuration to pause", "warning")
|
flash("No replication configuration to pause", "warning")
|
||||||
elif action == "resume":
|
elif action == "resume":
|
||||||
|
from .replication import REPLICATION_MODE_ALL
|
||||||
rule = _replication().get_rule(bucket_name)
|
rule = _replication().get_rule(bucket_name)
|
||||||
if rule:
|
if rule:
|
||||||
rule.enabled = True
|
rule.enabled = True
|
||||||
_replication().set_rule(rule)
|
_replication().set_rule(rule)
|
||||||
flash("Replication resumed", "success")
|
# When resuming, sync any pending objects that accumulated while paused
|
||||||
|
if rule.mode == REPLICATION_MODE_ALL:
|
||||||
|
_replication().replicate_existing_objects(bucket_name)
|
||||||
|
flash("Replication resumed. Syncing pending objects in background.", "success")
|
||||||
|
else:
|
||||||
|
flash("Replication resumed", "success")
|
||||||
else:
|
else:
|
||||||
flash("No replication configuration to resume", "warning")
|
flash("No replication configuration to resume", "warning")
|
||||||
elif action == "create":
|
elif action == "create":
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
"""Central location for the application version string."""
|
"""Central location for the application version string."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
APP_VERSION = "0.1.8"
|
APP_VERSION = "0.1.9"
|
||||||
|
|
||||||
|
|
||||||
def get_version() -> str:
|
def get_version() -> str:
|
||||||
|
|||||||
@@ -173,14 +173,16 @@
|
|||||||
</div>
|
</div>
|
||||||
<div class="d-flex align-items-center gap-1">
|
<div class="d-flex align-items-center gap-1">
|
||||||
<span class="text-muted">Batch</span>
|
<span class="text-muted">Batch</span>
|
||||||
<select id="page-size-select" class="form-select form-select-sm py-0" style="width: auto; font-size: 0.75rem;">
|
<select id="page-size-select" class="form-select form-select-sm py-0" style="width: auto; font-size: 0.75rem;" title="Number of objects to load per batch">
|
||||||
<option value="1000">1K</option>
|
<option value="1000">1K</option>
|
||||||
<option value="5000" selected>5K</option>
|
<option value="5000" selected>5K</option>
|
||||||
<option value="10000">10K</option>
|
<option value="10000">10K</option>
|
||||||
<option value="25000">25K</option>
|
<option value="25000">25K</option>
|
||||||
<option value="50000">50K</option>
|
<option value="50000">50K</option>
|
||||||
|
<option value="75000">75K</option>
|
||||||
|
<option value="100000">100K</option>
|
||||||
</select>
|
</select>
|
||||||
<span class="text-muted">objects</span>
|
<span class="text-muted">per batch</span>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
@@ -1144,13 +1146,18 @@
|
|||||||
</div>
|
</div>
|
||||||
|
|
||||||
{% elif replication_rule and not replication_rule.enabled %}
|
{% elif replication_rule and not replication_rule.enabled %}
|
||||||
<div class="alert alert-warning d-flex align-items-center mb-4" role="alert">
|
<div class="alert alert-warning d-flex align-items-start mb-4" role="alert">
|
||||||
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="currentColor" class="flex-shrink-0 me-2" viewBox="0 0 16 16">
|
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="currentColor" class="flex-shrink-0 me-2 mt-1" viewBox="0 0 16 16">
|
||||||
<path d="M5.5 3.5A1.5 1.5 0 0 1 7 5v6a1.5 1.5 0 0 1-3 0V5a1.5 1.5 0 0 1 1.5-1.5zm5 0A1.5 1.5 0 0 1 12 5v6a1.5 1.5 0 0 1-3 0V5a1.5 1.5 0 0 1 1.5-1.5z"/>
|
<path d="M5.5 3.5A1.5 1.5 0 0 1 7 5v6a1.5 1.5 0 0 1-3 0V5a1.5 1.5 0 0 1 1.5-1.5zm5 0A1.5 1.5 0 0 1 12 5v6a1.5 1.5 0 0 1-3 0V5a1.5 1.5 0 0 1 1.5-1.5z"/>
|
||||||
</svg>
|
</svg>
|
||||||
<div>
|
<div>
|
||||||
<strong>Replication Paused</strong> —
|
<strong>Replication Paused</strong>
|
||||||
Replication is configured but currently paused. New uploads will not be replicated until resumed.
|
<p class="mb-1">Replication is configured but currently paused. New uploads will not be replicated until resumed.</p>
|
||||||
|
{% if replication_rule.mode == 'all' %}
|
||||||
|
<p class="mb-0 small text-dark"><strong>Tip:</strong> When you resume, any objects uploaded while paused will be automatically synced to the target.</p>
|
||||||
|
{% else %}
|
||||||
|
<p class="mb-0 small text-dark"><strong>Note:</strong> Objects uploaded while paused will not be synced (mode: new_only). Consider switching to "All Objects" mode if you need to sync missed uploads.</p>
|
||||||
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
@@ -1882,6 +1889,13 @@
|
|||||||
let pageSize = 5000; // Load large batches for virtual scrolling
|
let pageSize = 5000; // Load large batches for virtual scrolling
|
||||||
let currentPrefix = ''; // Current folder prefix for navigation
|
let currentPrefix = ''; // Current folder prefix for navigation
|
||||||
let allObjects = []; // All loaded object metadata (lightweight)
|
let allObjects = []; // All loaded object metadata (lightweight)
|
||||||
|
let urlTemplates = null; // URL templates from API for constructing object URLs
|
||||||
|
|
||||||
|
// Helper to build URL from template by replacing KEY_PLACEHOLDER with encoded key
|
||||||
|
const buildUrlFromTemplate = (template, key) => {
|
||||||
|
if (!template) return '';
|
||||||
|
return template.replace('KEY_PLACEHOLDER', encodeURIComponent(key).replace(/%2F/g, '/'));
|
||||||
|
};
|
||||||
|
|
||||||
// Virtual scrolling state
|
// Virtual scrolling state
|
||||||
const ROW_HEIGHT = 53; // Height of each table row in pixels
|
const ROW_HEIGHT = 53; // Height of each table row in pixels
|
||||||
@@ -2223,22 +2237,26 @@
|
|||||||
objectsLoadingRow.remove();
|
objectsLoadingRow.remove();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store lightweight object metadata (no DOM elements!)
|
if (data.url_templates && !urlTemplates) {
|
||||||
|
urlTemplates = data.url_templates;
|
||||||
|
}
|
||||||
|
|
||||||
data.objects.forEach(obj => {
|
data.objects.forEach(obj => {
|
||||||
loadedObjectCount++;
|
loadedObjectCount++;
|
||||||
|
const key = obj.key;
|
||||||
allObjects.push({
|
allObjects.push({
|
||||||
key: obj.key,
|
key: key,
|
||||||
size: obj.size,
|
size: obj.size,
|
||||||
lastModified: obj.last_modified,
|
lastModified: obj.last_modified,
|
||||||
lastModifiedDisplay: obj.last_modified_display,
|
lastModifiedDisplay: obj.last_modified_display,
|
||||||
etag: obj.etag,
|
etag: obj.etag,
|
||||||
previewUrl: obj.preview_url,
|
previewUrl: urlTemplates ? buildUrlFromTemplate(urlTemplates.preview, key) : '',
|
||||||
downloadUrl: obj.download_url,
|
downloadUrl: urlTemplates ? buildUrlFromTemplate(urlTemplates.download, key) : '',
|
||||||
presignEndpoint: obj.presign_endpoint,
|
presignEndpoint: urlTemplates ? buildUrlFromTemplate(urlTemplates.presign, key) : '',
|
||||||
deleteEndpoint: obj.delete_endpoint,
|
deleteEndpoint: urlTemplates ? buildUrlFromTemplate(urlTemplates.delete, key) : '',
|
||||||
metadata: JSON.stringify(obj.metadata || {}),
|
metadata: '{}',
|
||||||
versionsEndpoint: obj.versions_endpoint,
|
versionsEndpoint: urlTemplates ? buildUrlFromTemplate(urlTemplates.versions, key) : '',
|
||||||
restoreTemplate: obj.restore_template
|
restoreTemplate: urlTemplates ? urlTemplates.restore.replace('KEY_PLACEHOLDER', encodeURIComponent(key).replace(/%2F/g, '/')) : ''
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -3784,40 +3802,39 @@
|
|||||||
selectAllCheckbox?.addEventListener('change', (event) => {
|
selectAllCheckbox?.addEventListener('change', (event) => {
|
||||||
const shouldSelect = Boolean(event.target?.checked);
|
const shouldSelect = Boolean(event.target?.checked);
|
||||||
|
|
||||||
if (hasFolders()) {
|
// Get all file items in the current view (works with virtual scrolling)
|
||||||
|
const filesInView = visibleItems.filter(item => item.type === 'file');
|
||||||
|
|
||||||
const objectsInCurrentView = allObjects.filter(obj => obj.key.startsWith(currentPrefix));
|
// Update selectedRows directly using object keys (not DOM elements)
|
||||||
objectsInCurrentView.forEach(obj => {
|
filesInView.forEach(item => {
|
||||||
const checkbox = obj.element.querySelector('[data-object-select]');
|
if (shouldSelect) {
|
||||||
if (checkbox && !checkbox.disabled) {
|
selectedRows.set(item.data.key, item.data);
|
||||||
checkbox.checked = shouldSelect;
|
} else {
|
||||||
}
|
selectedRows.delete(item.data.key);
|
||||||
toggleRowSelection(obj.element, shouldSelect);
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
document.querySelectorAll('[data-folder-select]').forEach(cb => {
|
// Update folder checkboxes in DOM (folders are always rendered)
|
||||||
cb.checked = shouldSelect;
|
document.querySelectorAll('[data-folder-select]').forEach(cb => {
|
||||||
});
|
cb.checked = shouldSelect;
|
||||||
} else {
|
});
|
||||||
|
|
||||||
document.querySelectorAll('[data-object-row]').forEach((row) => {
|
// Update any currently rendered object checkboxes
|
||||||
if (row.style.display === 'none') return;
|
document.querySelectorAll('[data-object-row]').forEach((row) => {
|
||||||
const checkbox = row.querySelector('[data-object-select]');
|
const checkbox = row.querySelector('[data-object-select]');
|
||||||
if (!checkbox || checkbox.disabled) {
|
if (checkbox) {
|
||||||
return;
|
|
||||||
}
|
|
||||||
checkbox.checked = shouldSelect;
|
checkbox.checked = shouldSelect;
|
||||||
toggleRowSelection(row, shouldSelect);
|
}
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
updateBulkDeleteState();
|
||||||
setTimeout(updateBulkDownloadState, 0);
|
setTimeout(updateBulkDownloadState, 0);
|
||||||
});
|
});
|
||||||
|
|
||||||
bulkDownloadButton?.addEventListener('click', async () => {
|
bulkDownloadButton?.addEventListener('click', async () => {
|
||||||
if (!bulkDownloadEndpoint) return;
|
if (!bulkDownloadEndpoint) return;
|
||||||
const selected = Array.from(document.querySelectorAll('[data-object-select]:checked')).map(
|
// Use selectedRows which tracks all selected objects (not just rendered ones)
|
||||||
(cb) => cb.closest('tr').dataset.key
|
const selected = Array.from(selectedRows.keys());
|
||||||
);
|
|
||||||
if (selected.length === 0) return;
|
if (selected.length === 0) return;
|
||||||
|
|
||||||
bulkDownloadButton.disabled = true;
|
bulkDownloadButton.disabled = true;
|
||||||
|
|||||||
@@ -407,10 +407,62 @@ curl -X POST {{ api_base }}/presign/demo/notes.txt \
|
|||||||
<span class="docs-section-kicker">07</span>
|
<span class="docs-section-kicker">07</span>
|
||||||
<h2 class="h4 mb-0">API Examples</h2>
|
<h2 class="h4 mb-0">API Examples</h2>
|
||||||
</div>
|
</div>
|
||||||
<p class="text-muted">Common operations using boto3.</p>
|
<p class="text-muted">Common operations using popular SDKs and tools.</p>
|
||||||
|
|
||||||
<h5 class="mt-4">Multipart Upload</h5>
|
<h3 class="h6 text-uppercase text-muted mt-4">Python (boto3)</h3>
|
||||||
<pre><code class="language-python">import boto3
|
<pre class="mb-4"><code class="language-python">import boto3
|
||||||
|
|
||||||
|
s3 = boto3.client(
|
||||||
|
's3',
|
||||||
|
endpoint_url='{{ api_base }}',
|
||||||
|
aws_access_key_id='<access_key>',
|
||||||
|
aws_secret_access_key='<secret_key>'
|
||||||
|
)
|
||||||
|
|
||||||
|
# List buckets
|
||||||
|
buckets = s3.list_buckets()['Buckets']
|
||||||
|
|
||||||
|
# Create bucket
|
||||||
|
s3.create_bucket(Bucket='mybucket')
|
||||||
|
|
||||||
|
# Upload file
|
||||||
|
s3.upload_file('local.txt', 'mybucket', 'remote.txt')
|
||||||
|
|
||||||
|
# Download file
|
||||||
|
s3.download_file('mybucket', 'remote.txt', 'downloaded.txt')
|
||||||
|
|
||||||
|
# Generate presigned URL (valid 1 hour)
|
||||||
|
url = s3.generate_presigned_url(
|
||||||
|
'get_object',
|
||||||
|
Params={'Bucket': 'mybucket', 'Key': 'remote.txt'},
|
||||||
|
ExpiresIn=3600
|
||||||
|
)</code></pre>
|
||||||
|
|
||||||
|
<h3 class="h6 text-uppercase text-muted mt-4">JavaScript (AWS SDK v3)</h3>
|
||||||
|
<pre class="mb-4"><code class="language-javascript">import { S3Client, ListBucketsCommand, PutObjectCommand } from '@aws-sdk/client-s3';
|
||||||
|
|
||||||
|
const s3 = new S3Client({
|
||||||
|
endpoint: '{{ api_base }}',
|
||||||
|
region: 'us-east-1',
|
||||||
|
credentials: {
|
||||||
|
accessKeyId: '<access_key>',
|
||||||
|
secretAccessKey: '<secret_key>'
|
||||||
|
},
|
||||||
|
forcePathStyle: true // Required for S3-compatible services
|
||||||
|
});
|
||||||
|
|
||||||
|
// List buckets
|
||||||
|
const { Buckets } = await s3.send(new ListBucketsCommand({}));
|
||||||
|
|
||||||
|
// Upload object
|
||||||
|
await s3.send(new PutObjectCommand({
|
||||||
|
Bucket: 'mybucket',
|
||||||
|
Key: 'hello.txt',
|
||||||
|
Body: 'Hello, World!'
|
||||||
|
}));</code></pre>
|
||||||
|
|
||||||
|
<h3 class="h6 text-uppercase text-muted mt-4">Multipart Upload (Python)</h3>
|
||||||
|
<pre class="mb-4"><code class="language-python">import boto3
|
||||||
|
|
||||||
s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
|
s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
|
||||||
|
|
||||||
@@ -418,9 +470,9 @@ s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
|
|||||||
response = s3.create_multipart_upload(Bucket='mybucket', Key='large.bin')
|
response = s3.create_multipart_upload(Bucket='mybucket', Key='large.bin')
|
||||||
upload_id = response['UploadId']
|
upload_id = response['UploadId']
|
||||||
|
|
||||||
# Upload parts
|
# Upload parts (minimum 5MB each, except last part)
|
||||||
parts = []
|
parts = []
|
||||||
chunks = [b'chunk1', b'chunk2'] # Example data chunks
|
chunks = [b'chunk1...', b'chunk2...']
|
||||||
for part_number, chunk in enumerate(chunks, start=1):
|
for part_number, chunk in enumerate(chunks, start=1):
|
||||||
response = s3.upload_part(
|
response = s3.upload_part(
|
||||||
Bucket='mybucket',
|
Bucket='mybucket',
|
||||||
@@ -438,6 +490,19 @@ s3.complete_multipart_upload(
|
|||||||
UploadId=upload_id,
|
UploadId=upload_id,
|
||||||
MultipartUpload={'Parts': parts}
|
MultipartUpload={'Parts': parts}
|
||||||
)</code></pre>
|
)</code></pre>
|
||||||
|
|
||||||
|
<h3 class="h6 text-uppercase text-muted mt-4">Presigned URLs for Sharing</h3>
|
||||||
|
<pre class="mb-0"><code class="language-bash"># Generate a download link valid for 15 minutes
|
||||||
|
curl -X POST "{{ api_base }}/presign/mybucket/photo.jpg" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
|
||||||
|
-d '{"method": "GET", "expires_in": 900}'
|
||||||
|
|
||||||
|
# Generate an upload link (PUT) valid for 1 hour
|
||||||
|
curl -X POST "{{ api_base }}/presign/mybucket/upload.bin" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
|
||||||
|
-d '{"method": "PUT", "expires_in": 3600}'</code></pre>
|
||||||
</div>
|
</div>
|
||||||
</article>
|
</article>
|
||||||
<article id="replication" class="card shadow-sm docs-section">
|
<article id="replication" class="card shadow-sm docs-section">
|
||||||
@@ -487,6 +552,86 @@ s3.complete_multipart_upload(
|
|||||||
</p>
|
</p>
|
||||||
</div>
|
</div>
|
||||||
</article>
|
</article>
|
||||||
|
<article id="versioning" class="card shadow-sm docs-section">
|
||||||
|
<div class="card-body">
|
||||||
|
<div class="d-flex align-items-center gap-2 mb-3">
|
||||||
|
<span class="docs-section-kicker">09</span>
|
||||||
|
<h2 class="h4 mb-0">Object Versioning</h2>
|
||||||
|
</div>
|
||||||
|
<p class="text-muted">Keep multiple versions of objects to protect against accidental deletions and overwrites. Restore previous versions at any time.</p>
|
||||||
|
|
||||||
|
<h3 class="h6 text-uppercase text-muted mt-4">Enabling Versioning</h3>
|
||||||
|
<ol class="docs-steps mb-3">
|
||||||
|
<li>Navigate to your bucket's <strong>Properties</strong> tab.</li>
|
||||||
|
<li>Find the <strong>Versioning</strong> card and click <strong>Enable</strong>.</li>
|
||||||
|
<li>All subsequent uploads will create new versions instead of overwriting.</li>
|
||||||
|
</ol>
|
||||||
|
|
||||||
|
<h3 class="h6 text-uppercase text-muted mt-4">Version Operations</h3>
|
||||||
|
<div class="table-responsive mb-3">
|
||||||
|
<table class="table table-sm table-bordered small">
|
||||||
|
<thead class="table-light">
|
||||||
|
<tr>
|
||||||
|
<th>Operation</th>
|
||||||
|
<th>Description</th>
|
||||||
|
</tr>
|
||||||
|
</thead>
|
||||||
|
<tbody>
|
||||||
|
<tr>
|
||||||
|
<td><strong>View Versions</strong></td>
|
||||||
|
<td>Click the version icon on any object to see all historical versions with timestamps and sizes.</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td><strong>Restore Version</strong></td>
|
||||||
|
<td>Click <strong>Restore</strong> on any version to make it the current version (creates a copy).</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td><strong>Delete Current</strong></td>
|
||||||
|
<td>Deleting an object archives it. Previous versions remain accessible.</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td><strong>Purge All</strong></td>
|
||||||
|
<td>Permanently delete an object and all its versions. This cannot be undone.</td>
|
||||||
|
</tr>
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<h3 class="h6 text-uppercase text-muted mt-4">Archived Objects</h3>
|
||||||
|
<p class="small text-muted mb-3">When you delete a versioned object, it becomes "archived" - the current version is removed but historical versions remain. The <strong>Archived</strong> tab shows these objects so you can restore them.</p>
|
||||||
|
|
||||||
|
<h3 class="h6 text-uppercase text-muted mt-4">API Usage</h3>
|
||||||
|
<pre class="mb-3"><code class="language-bash"># Enable versioning
|
||||||
|
curl -X PUT "{{ api_base }}/<bucket>?versioning" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
|
||||||
|
-d '{"Status": "Enabled"}'
|
||||||
|
|
||||||
|
# Get versioning status
|
||||||
|
curl "{{ api_base }}/<bucket>?versioning" \
|
||||||
|
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
|
||||||
|
|
||||||
|
# List object versions
|
||||||
|
curl "{{ api_base }}/<bucket>?versions" \
|
||||||
|
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
|
||||||
|
|
||||||
|
# Get specific version
|
||||||
|
curl "{{ api_base }}/<bucket>/<key>?versionId=<version-id>" \
|
||||||
|
-H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"</code></pre>
|
||||||
|
|
||||||
|
<div class="alert alert-light border mb-0">
|
||||||
|
<div class="d-flex gap-2">
|
||||||
|
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="bi bi-info-circle text-muted mt-1" viewBox="0 0 16 16">
|
||||||
|
<path d="M8 15A7 7 0 1 1 8 1a7 7 0 0 1 0 14zm0 1A8 8 0 1 0 8 0a8 8 0 0 0 0 16z"/>
|
||||||
|
<path d="m8.93 6.588-2.29.287-.082.38.45.083c.294.07.352.176.288.469l-.738 3.468c-.194.897.105 1.319.808 1.319.545 0 1.178-.252 1.465-.598l.088-.416c-.2.176-.492.246-.686.246-.275 0-.375-.193-.304-.533L8.93 6.588zM9 4.5a1 1 0 1 1-2 0 1 1 0 0 1 2 0z"/>
|
||||||
|
</svg>
|
||||||
|
<div>
|
||||||
|
<strong>Storage Impact:</strong> Each version consumes storage. Enable quotas to limit total bucket size including all versions.
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</article>
|
||||||
<article id="quotas" class="card shadow-sm docs-section">
|
<article id="quotas" class="card shadow-sm docs-section">
|
||||||
<div class="card-body">
|
<div class="card-body">
|
||||||
<div class="d-flex align-items-center gap-2 mb-3">
|
<div class="d-flex align-items-center gap-2 mb-3">
|
||||||
@@ -709,6 +854,7 @@ curl -X DELETE "{{ api_base }}/kms/keys/{key-id}?waiting_period_days=30" \
|
|||||||
<li><a href="#api">REST endpoints</a></li>
|
<li><a href="#api">REST endpoints</a></li>
|
||||||
<li><a href="#examples">API Examples</a></li>
|
<li><a href="#examples">API Examples</a></li>
|
||||||
<li><a href="#replication">Site Replication</a></li>
|
<li><a href="#replication">Site Replication</a></li>
|
||||||
|
<li><a href="#versioning">Object Versioning</a></li>
|
||||||
<li><a href="#quotas">Bucket Quotas</a></li>
|
<li><a href="#quotas">Bucket Quotas</a></li>
|
||||||
<li><a href="#encryption">Encryption</a></li>
|
<li><a href="#encryption">Encryption</a></li>
|
||||||
<li><a href="#troubleshooting">Troubleshooting</a></li>
|
<li><a href="#troubleshooting">Troubleshooting</a></li>
|
||||||
|
|||||||
@@ -157,9 +157,14 @@ class TestPaginatedObjectListing:
|
|||||||
assert "last_modified" in obj
|
assert "last_modified" in obj
|
||||||
assert "last_modified_display" in obj
|
assert "last_modified_display" in obj
|
||||||
assert "etag" in obj
|
assert "etag" in obj
|
||||||
assert "preview_url" in obj
|
|
||||||
assert "download_url" in obj
|
# URLs are now returned as templates (not per-object) for performance
|
||||||
assert "delete_endpoint" in obj
|
assert "url_templates" in data
|
||||||
|
templates = data["url_templates"]
|
||||||
|
assert "preview" in templates
|
||||||
|
assert "download" in templates
|
||||||
|
assert "delete" in templates
|
||||||
|
assert "KEY_PLACEHOLDER" in templates["preview"]
|
||||||
|
|
||||||
def test_bucket_detail_page_loads_without_objects(self, tmp_path):
|
def test_bucket_detail_page_loads_without_objects(self, tmp_path):
|
||||||
"""Bucket detail page should load even with many objects."""
|
"""Bucket detail page should load even with many objects."""
|
||||||
|
|||||||
Reference in New Issue
Block a user