diff --git a/app/replication.py b/app/replication.py index 5a4ad8a..4703bf9 100644 --- a/app/replication.py +++ b/app/replication.py @@ -9,7 +9,7 @@ import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from pathlib import Path -from typing import Dict, Optional +from typing import Any, Dict, Optional import boto3 from botocore.config import Config @@ -24,11 +24,42 @@ logger = logging.getLogger(__name__) REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0" REPLICATION_CONNECT_TIMEOUT = 5 REPLICATION_READ_TIMEOUT = 30 +STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024 # 10 MiB - use streaming for larger files REPLICATION_MODE_NEW_ONLY = "new_only" REPLICATION_MODE_ALL = "all" +def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any: + """Create a boto3 S3 client for the given connection. + + Args: + connection: Remote S3 connection configuration + health_check: If True, use minimal retries for quick health checks + + Returns: + Configured boto3 S3 client + """ + config = Config( + user_agent_extra=REPLICATION_USER_AGENT, + connect_timeout=REPLICATION_CONNECT_TIMEOUT, + read_timeout=REPLICATION_READ_TIMEOUT, + retries={'max_attempts': 1 if health_check else 2}, + signature_version='s3v4', + s3={'addressing_style': 'path'}, + request_checksum_calculation='when_required', + response_checksum_validation='when_required', + ) + return boto3.client( + "s3", + endpoint_url=connection.endpoint_url, + aws_access_key_id=connection.access_key, + aws_secret_access_key=connection.secret_key, + region_name=connection.region or 'us-east-1', + config=config, + ) + + @dataclass class ReplicationStats: """Statistics for replication operations - computed dynamically.""" @@ -102,8 +133,19 @@ class ReplicationManager: self._rules: Dict[str, ReplicationRule] = {} self._stats_lock = threading.Lock() self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker") + self._shutdown = False self.reload_rules() + def shutdown(self, wait: bool = True) -> None: + """Shutdown the replication executor gracefully. + + Args: + wait: If True, wait for pending tasks to complete + """ + self._shutdown = True + self._executor.shutdown(wait=wait) + logger.info("Replication manager shut down") + def reload_rules(self) -> None: if not self.rules_path.exists(): self._rules = {} @@ -124,25 +166,12 @@ class ReplicationManager: def check_endpoint_health(self, connection: RemoteConnection) -> bool: """Check if a remote endpoint is reachable and responsive. - + Returns True if endpoint is healthy, False otherwise. Uses short timeouts to prevent blocking. 
""" try: - config = Config( - user_agent_extra=REPLICATION_USER_AGENT, - connect_timeout=REPLICATION_CONNECT_TIMEOUT, - read_timeout=REPLICATION_READ_TIMEOUT, - retries={'max_attempts': 1} - ) - s3 = boto3.client( - "s3", - endpoint_url=connection.endpoint_url, - aws_access_key_id=connection.access_key, - aws_secret_access_key=connection.secret_key, - region_name=connection.region, - config=config, - ) + s3 = _create_s3_client(connection, health_check=True) s3.list_buckets() return True except Exception as e: @@ -184,15 +213,9 @@ class ReplicationManager: try: source_objects = self.storage.list_objects_all(bucket_name) source_keys = {obj.key: obj.size for obj in source_objects} - - s3 = boto3.client( - "s3", - endpoint_url=connection.endpoint_url, - aws_access_key_id=connection.access_key, - aws_secret_access_key=connection.secret_key, - region_name=connection.region, - ) - + + s3 = _create_s3_client(connection) + dest_keys = set() bytes_synced = 0 paginator = s3.get_paginator('list_objects_v2') @@ -257,13 +280,7 @@ class ReplicationManager: raise ValueError(f"Connection {connection_id} not found") try: - s3 = boto3.client( - "s3", - endpoint_url=connection.endpoint_url, - aws_access_key_id=connection.access_key, - aws_secret_access_key=connection.secret_key, - region_name=connection.region, - ) + s3 = _create_s3_client(connection) s3.create_bucket(Bucket=bucket_name) except ClientError as e: logger.error(f"Failed to create remote bucket {bucket_name}: {e}") @@ -286,41 +303,28 @@ class ReplicationManager: self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action) def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None: + if self._shutdown: + return + + # Re-check if rule is still enabled (may have been paused after task was submitted) + current_rule = self.get_rule(bucket_name) + if not current_rule or not current_rule.enabled: + logger.debug(f"Replication skipped for {bucket_name}/{object_key}: rule disabled or removed") + return + if ".." in object_key or object_key.startswith("/") or object_key.startswith("\\"): logger.error(f"Invalid object key in replication (path traversal attempt): {object_key}") return - + try: from .storage import ObjectStorage ObjectStorage._sanitize_object_key(object_key) except StorageError as e: logger.error(f"Object key validation failed in replication: {e}") return - - file_size = 0 + try: - config = Config( - user_agent_extra=REPLICATION_USER_AGENT, - connect_timeout=REPLICATION_CONNECT_TIMEOUT, - read_timeout=REPLICATION_READ_TIMEOUT, - retries={'max_attempts': 2}, - signature_version='s3v4', - s3={ - 'addressing_style': 'path', - }, - # Disable SDK automatic checksums - they cause SignatureDoesNotMatch errors - # with S3-compatible servers that don't support CRC32 checksum headers - request_checksum_calculation='when_required', - response_checksum_validation='when_required', - ) - s3 = boto3.client( - "s3", - endpoint_url=conn.endpoint_url, - aws_access_key_id=conn.access_key, - aws_secret_access_key=conn.secret_key, - region_name=conn.region or 'us-east-1', - config=config, - ) + s3 = _create_s3_client(conn) if action == "delete": try: @@ -337,34 +341,42 @@ class ReplicationManager: logger.error(f"Source object not found: {bucket_name}/{object_key}") return - # Don't replicate metadata - destination server will generate its own - # __etag__ and __size__. Replicating them causes signature mismatches when they have None/empty values. 
- content_type, _ = mimetypes.guess_type(path) file_size = path.stat().st_size logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}") - def do_put_object() -> None: - """Helper to upload object. - - Reads the file content into memory first to avoid signature calculation - issues with certain binary file types (like GIFs) when streaming. - Do NOT set ContentLength explicitly - boto3 calculates it from the bytes - and setting it manually can cause SignatureDoesNotMatch errors. + def do_upload() -> None: + """Upload object using appropriate method based on file size. + + For small files (< 10 MiB): Read into memory for simpler handling + For large files: Use streaming upload to avoid memory issues """ - file_content = path.read_bytes() - put_kwargs = { - "Bucket": rule.target_bucket, - "Key": object_key, - "Body": file_content, - } + extra_args = {} if content_type: - put_kwargs["ContentType"] = content_type - s3.put_object(**put_kwargs) + extra_args["ContentType"] = content_type + + if file_size >= STREAMING_THRESHOLD_BYTES: + # Use multipart upload for large files + s3.upload_file( + str(path), + rule.target_bucket, + object_key, + ExtraArgs=extra_args if extra_args else None, + ) + else: + # Read small files into memory + file_content = path.read_bytes() + put_kwargs = { + "Bucket": rule.target_bucket, + "Key": object_key, + "Body": file_content, + **extra_args, + } + s3.put_object(**put_kwargs) try: - do_put_object() + do_upload() except (ClientError, S3UploadFailedError) as e: error_code = None if isinstance(e, ClientError): @@ -386,13 +398,13 @@ class ReplicationManager: bucket_ready = True else: logger.error(f"Failed to create target bucket {rule.target_bucket}: {bucket_err}") - raise e - + raise e + if bucket_ready: - do_put_object() + do_upload() else: raise e - + logger.info(f"Replicated {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})") self._update_last_sync(bucket_name, object_key) diff --git a/app/s3_api.py b/app/s3_api.py index d0d32d9..78b9afb 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -1,13 +1,15 @@ """Flask blueprint exposing a subset of the S3 REST API.""" from __future__ import annotations +import base64 import hashlib import hmac +import logging import mimetypes import re import uuid from datetime import datetime, timedelta, timezone -from typing import Any, Dict +from typing import Any, Dict, Optional from urllib.parse import quote, urlencode, urlparse, unquote from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError @@ -20,6 +22,8 @@ from .iam import IamError, Principal from .replication import ReplicationManager from .storage import ObjectStorage, StorageError, QuotaExceededError +logger = logging.getLogger(__name__) + s3_api_bp = Blueprint("s3_api", __name__) def _storage() -> ObjectStorage: @@ -118,6 +122,9 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: if header_val is None: header_val = "" + if header.lower() == 'expect' and header_val == "": + header_val = "100-continue" + header_val = " ".join(header_val.split()) canonical_headers_parts.append(f"{header.lower()}:{header_val}\n") canonical_headers = "".join(canonical_headers_parts) @@ -128,15 +135,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}" - # Debug logging for signature issues - import logging - logger = 
logging.getLogger(__name__) - logger.debug(f"SigV4 Debug - Method: {method}, URI: {canonical_uri}") - logger.debug(f"SigV4 Debug - Payload hash from header: {req.headers.get('X-Amz-Content-Sha256')}") - logger.debug(f"SigV4 Debug - Signed headers: {signed_headers_str}") - logger.debug(f"SigV4 Debug - Content-Type: {req.headers.get('Content-Type')}") - logger.debug(f"SigV4 Debug - Content-Length: {req.headers.get('Content-Length')}") - amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date") if not amz_date: raise IamError("Missing Date header") @@ -167,24 +165,18 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() if not hmac.compare_digest(calculated_signature, signature): - # Debug logging for signature mismatch - import logging - logger = logging.getLogger(__name__) - logger.error(f"Signature mismatch for {req.path}") - logger.error(f" Content-Type: {req.headers.get('Content-Type')}") - logger.error(f" Content-Length: {req.headers.get('Content-Length')}") - logger.error(f" X-Amz-Content-Sha256: {req.headers.get('X-Amz-Content-Sha256')}") - logger.error(f" Canonical URI: {canonical_uri}") - logger.error(f" Signed headers: {signed_headers_str}") - # Log each signed header's value - for h in signed_headers_list: - logger.error(f" Header '{h}': {repr(req.headers.get(h))}") - logger.error(f" Expected sig: {signature[:16]}...") - logger.error(f" Calculated sig: {calculated_signature[:16]}...") - # Log first part of canonical request to compare - logger.error(f" Canonical request hash: {hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()[:16]}...") - # Log the full canonical request for debugging - logger.error(f" Canonical request:\n{canonical_request[:500]}...") + # Only log detailed signature debug info if DEBUG_SIGV4 is enabled + if current_app.config.get("DEBUG_SIGV4"): + logger.warning( + "SigV4 signature mismatch", + extra={ + "path": req.path, + "method": method, + "signed_headers": signed_headers_str, + "content_type": req.headers.get("Content-Type"), + "content_length": req.headers.get("Content-Length"), + } + ) raise IamError("SignatureDoesNotMatch") return _iam().get_principal(access_key) @@ -236,6 +228,8 @@ def _verify_sigv4_query(req: Any) -> Principal | None: canonical_headers_parts = [] for header in signed_headers_list: val = req.headers.get(header, "").strip() + if header.lower() == 'expect' and val == "": + val = "100-continue" val = " ".join(val.split()) canonical_headers_parts.append(f"{header}:{val}\n") canonical_headers = "".join(canonical_headers_parts) @@ -569,6 +563,28 @@ def _strip_ns(tag: str | None) -> str: return tag.split("}")[-1] +def _find_element(parent: Element, name: str) -> Optional[Element]: + """Find a child element by name, trying both namespaced and non-namespaced variants. + + This handles XML documents that may or may not include namespace prefixes. + """ + el = parent.find(f"{{*}}{name}") + if el is None: + el = parent.find(name) + return el + + +def _find_element_text(parent: Element, name: str, default: str = "") -> str: + """Find a child element and return its text content. + + Returns the default value if element not found or has no text. 
+ """ + el = _find_element(parent, name) + if el is None or el.text is None: + return default + return el.text.strip() + + def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]: try: root = fromstring(payload) @@ -585,17 +601,11 @@ def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]: for tag_el in list(tagset): if _strip_ns(tag_el.tag) != "Tag": continue - key_el = tag_el.find("{*}Key") - if key_el is None: - key_el = tag_el.find("Key") - value_el = tag_el.find("{*}Value") - if value_el is None: - value_el = tag_el.find("Value") - key = (key_el.text or "").strip() if key_el is not None else "" + key = _find_element_text(tag_el, "Key") if not key: continue - value = value_el.text if value_el is not None else "" - tags.append({"Key": key, "Value": value or ""}) + value = _find_element_text(tag_el, "Value") + tags.append({"Key": key, "Value": value}) return tags @@ -1439,7 +1449,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response: if request.method == "DELETE": try: - storage.set_bucket_quota(bucket_name, max_size_bytes=None, max_objects=None) + storage.set_bucket_quota(bucket_name, max_bytes=None, max_objects=None) except StorageError as exc: return _error_response("NoSuchBucket", str(exc), 404) current_app.logger.info("Bucket quota deleted", extra={"bucket": bucket_name}) @@ -1473,7 +1483,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response: return _error_response("InvalidArgument", f"max_objects {exc}", 400) try: - storage.set_bucket_quota(bucket_name, max_size_bytes=max_size_bytes, max_objects=max_objects) + storage.set_bucket_quota(bucket_name, max_bytes=max_size_bytes, max_objects=max_objects) except StorageError as exc: return _error_response("NoSuchBucket", str(exc), 404) @@ -1665,7 +1675,6 @@ def bucket_handler(bucket_name: str) -> Response: effective_start = "" if list_type == "2": if continuation_token: - import base64 try: effective_start = base64.urlsafe_b64decode(continuation_token.encode()).decode("utf-8") except Exception: @@ -1722,7 +1731,6 @@ def bucket_handler(bucket_name: str) -> Response: next_marker = common_prefixes[-1].rstrip(delimiter) if delimiter else common_prefixes[-1] if list_type == "2" and next_marker: - import base64 next_continuation_token = base64.urlsafe_b64encode(next_marker.encode()).decode("utf-8") if list_type == "2": diff --git a/app/storage.py b/app/storage.py index c710847..702d5bb 100644 --- a/app/storage.py +++ b/app/storage.py @@ -7,9 +7,11 @@ import os import re import shutil import stat +import threading import time import unicodedata import uuid +from collections import OrderedDict from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timezone @@ -129,12 +131,17 @@ class ObjectStorage: MULTIPART_MANIFEST = "manifest.json" BUCKET_CONFIG_FILE = ".bucket.json" KEY_INDEX_CACHE_TTL = 30 + OBJECT_CACHE_MAX_SIZE = 100 # Maximum number of buckets to cache def __init__(self, root: Path) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) self._ensure_system_roots() - self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {} + # LRU cache for object metadata with thread-safe access + self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float]] = OrderedDict() + self._cache_lock = threading.Lock() + # Cache version counter for detecting stale reads + self._cache_version: Dict[str, int] = {} def list_buckets(self) -> List[BucketMeta]: buckets: List[BucketMeta] = [] @@ -729,8 +736,6 @@ class ObjectStorage: 
bucket_id = bucket_path.name safe_key = self._sanitize_object_key(object_key) version_dir = self._version_dir(bucket_id, safe_key) - if not version_dir.exists(): - version_dir = self._legacy_version_dir(bucket_id, safe_key) if not version_dir.exists(): version_dir = self._legacy_version_dir(bucket_id, safe_key) if not version_dir.exists(): @@ -879,41 +884,73 @@ class ObjectStorage: part_number: int, stream: BinaryIO, ) -> str: + """Upload a part for a multipart upload. + + Uses file locking to safely update the manifest and handle concurrent uploads. + """ if part_number < 1: raise StorageError("part_number must be >= 1") bucket_path = self._bucket_path(bucket_name) - + upload_root = self._multipart_dir(bucket_path.name, upload_id) if not upload_root.exists(): upload_root = self._legacy_multipart_dir(bucket_path.name, upload_id) if not upload_root.exists(): raise StorageError("Multipart upload not found") - + + # Write part to temporary file first, then rename atomically checksum = hashlib.md5() part_filename = f"part-{part_number:05d}.part" part_path = upload_root / part_filename - with part_path.open("wb") as target: - shutil.copyfileobj(_HashingReader(stream, checksum), target) + temp_path = upload_root / f".{part_filename}.tmp" + + try: + with temp_path.open("wb") as target: + shutil.copyfileobj(_HashingReader(stream, checksum), target) + + # Atomic rename (or replace on Windows) + temp_path.replace(part_path) + except OSError: + # Clean up temp file on failure + try: + temp_path.unlink(missing_ok=True) + except OSError: + pass + raise + record = { "etag": checksum.hexdigest(), "size": part_path.stat().st_size, "filename": part_filename, } - + manifest_path = upload_root / self.MULTIPART_MANIFEST lock_path = upload_root / ".manifest.lock" - - with lock_path.open("w") as lock_file: - with _file_lock(lock_file): - try: - manifest = json.loads(manifest_path.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError) as exc: - raise StorageError("Multipart manifest unreadable") from exc - - parts = manifest.setdefault("parts", {}) - parts[str(part_number)] = record - manifest_path.write_text(json.dumps(manifest), encoding="utf-8") - + + # Retry loop for handling transient lock/read failures + max_retries = 3 + for attempt in range(max_retries): + try: + with lock_path.open("w") as lock_file: + with _file_lock(lock_file): + try: + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as exc: + if attempt < max_retries - 1: + time.sleep(0.1 * (attempt + 1)) + continue + raise StorageError("Multipart manifest unreadable") from exc + + parts = manifest.setdefault("parts", {}) + parts[str(part_number)] = record + manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + break + except OSError as exc: + if attempt < max_retries - 1: + time.sleep(0.1 * (attempt + 1)) + continue + raise StorageError(f"Failed to update multipart manifest: {exc}") from exc + return record["etag"] def complete_multipart_upload( @@ -1015,9 +1052,10 @@ class ObjectStorage: pass shutil.rmtree(upload_root, ignore_errors=True) - + self._invalidate_bucket_stats_cache(bucket_id) - + self._invalidate_object_cache(bucket_id) + stat = destination.stat() return ObjectMeta( key=safe_key.as_posix(), @@ -1264,22 +1302,52 @@ class ObjectStorage: return objects def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]: - """Get cached object metadata for a bucket, refreshing if stale.""" + """Get cached object metadata for a bucket, 
refreshing if stale. + + Uses LRU eviction to prevent unbounded cache growth. + Thread-safe with version tracking to detect concurrent invalidations. + """ now = time.time() - cached = self._object_cache.get(bucket_id) - - if cached: - objects, timestamp = cached - if now - timestamp < self.KEY_INDEX_CACHE_TTL: - return objects - + + with self._cache_lock: + cached = self._object_cache.get(bucket_id) + cache_version = self._cache_version.get(bucket_id, 0) + + if cached: + objects, timestamp = cached + if now - timestamp < self.KEY_INDEX_CACHE_TTL: + # Move to end (most recently used) + self._object_cache.move_to_end(bucket_id) + return objects + + # Build cache outside lock to avoid holding lock during I/O objects = self._build_object_cache(bucket_path) - self._object_cache[bucket_id] = (objects, now) + + with self._cache_lock: + # Check if cache was invalidated while we were building + current_version = self._cache_version.get(bucket_id, 0) + if current_version != cache_version: + # Cache was invalidated, rebuild + objects = self._build_object_cache(bucket_path) + + # Evict oldest entries if cache is full + while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE: + self._object_cache.popitem(last=False) + + self._object_cache[bucket_id] = (objects, time.time()) + self._object_cache.move_to_end(bucket_id) + return objects def _invalidate_object_cache(self, bucket_id: str) -> None: - """Invalidate the object cache and etag index for a bucket.""" - self._object_cache.pop(bucket_id, None) + """Invalidate the object cache and etag index for a bucket. + + Increments version counter to signal stale reads. + """ + with self._cache_lock: + self._object_cache.pop(bucket_id, None) + self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1 + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" try: etag_index_path.unlink(missing_ok=True) diff --git a/app/ui.py b/app/ui.py index 4baf5c3..3cc6c23 100644 --- a/app/ui.py +++ b/app/ui.py @@ -415,7 +415,7 @@ def list_bucket_objects(bucket_name: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - max_keys = min(int(request.args.get("max_keys", 1000)), 10000) + max_keys = min(int(request.args.get("max_keys", 1000)), 100000) continuation_token = request.args.get("continuation_token") or None prefix = request.args.get("prefix") or None @@ -434,6 +434,14 @@ def list_bucket_objects(bucket_name: str): except StorageError: versioning_enabled = False + # Pre-compute URL templates once (not per-object) for performance + # Frontend will construct actual URLs by replacing KEY_PLACEHOLDER + preview_template = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + delete_template = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER") + objects_data = [] for obj in result.objects: objects_data.append({ @@ -442,13 +450,6 @@ def list_bucket_objects(bucket_name: str): "last_modified": obj.last_modified.isoformat(), "last_modified_display": obj.last_modified.strftime("%b %d, %Y %H:%M"), "etag": obj.etag, - "metadata": obj.metadata or {}, - "preview_url": url_for("ui.object_preview", 
bucket_name=bucket_name, object_key=obj.key), - "download_url": url_for("ui.object_preview", bucket_name=bucket_name, object_key=obj.key) + "?download=1", - "presign_endpoint": url_for("ui.object_presign", bucket_name=bucket_name, object_key=obj.key), - "delete_endpoint": url_for("ui.delete_object", bucket_name=bucket_name, object_key=obj.key), - "versions_endpoint": url_for("ui.object_versions", bucket_name=bucket_name, object_key=obj.key), - "restore_template": url_for("ui.restore_object_version", bucket_name=bucket_name, object_key=obj.key, version_id="VERSION_ID_PLACEHOLDER"), }) return jsonify({ @@ -457,6 +458,14 @@ def list_bucket_objects(bucket_name: str): "next_continuation_token": result.next_continuation_token, "total_count": result.total_count, "versioning_enabled": versioning_enabled, + "url_templates": { + "preview": preview_template, + "download": preview_template + "?download=1", + "presign": presign_template, + "delete": delete_template, + "versions": versions_template, + "restore": restore_template, + }, }) @@ -1458,11 +1467,17 @@ def update_bucket_replication(bucket_name: str): else: flash("No replication configuration to pause", "warning") elif action == "resume": + from .replication import REPLICATION_MODE_ALL rule = _replication().get_rule(bucket_name) if rule: rule.enabled = True _replication().set_rule(rule) - flash("Replication resumed", "success") + # When resuming, sync any pending objects that accumulated while paused + if rule.mode == REPLICATION_MODE_ALL: + _replication().replicate_existing_objects(bucket_name) + flash("Replication resumed. Syncing pending objects in background.", "success") + else: + flash("Replication resumed", "success") else: flash("No replication configuration to resume", "warning") elif action == "create": diff --git a/app/version.py b/app/version.py index ea75040..445cb63 100644 --- a/app/version.py +++ b/app/version.py @@ -1,7 +1,7 @@ """Central location for the application version string.""" from __future__ import annotations -APP_VERSION = "0.1.8" +APP_VERSION = "0.1.9" def get_version() -> str: diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html index daa8f45..f9832e1 100644 --- a/templates/bucket_detail.html +++ b/templates/bucket_detail.html @@ -173,14 +173,16 @@
Batch - + + - objects + per batch
@@ -1144,13 +1146,18 @@ {% elif replication_rule and not replication_rule.enabled %} - -

Common operations using boto3.

- -
Multipart Upload
-
import boto3
+        

Common operations using popular SDKs and tools.

+ +

Python (boto3)

+
import boto3
+
+s3 = boto3.client(
+    's3',
+    endpoint_url='{{ api_base }}',
+    aws_access_key_id='<access_key>',
+    aws_secret_access_key='<secret_key>'
+)
+
+# List buckets
+buckets = s3.list_buckets()['Buckets']
+
+# Create bucket
+s3.create_bucket(Bucket='mybucket')
+
+# Upload file
+s3.upload_file('local.txt', 'mybucket', 'remote.txt')
+
+# Download file
+s3.download_file('mybucket', 'remote.txt', 'downloaded.txt')
+
+# Generate presigned URL (valid 1 hour)
+url = s3.generate_presigned_url(
+    'get_object',
+    Params={'Bucket': 'mybucket', 'Key': 'remote.txt'},
+    ExpiresIn=3600
+)
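+
+# Optional (sketch): some S3-compatible deployments reject the automatic request
+# checksums that newer boto3/botocore releases send, which surfaces as
+# SignatureDoesNotMatch errors. A client Config similar to the one used by the
+# bundled replication agent may help; the checksum options assume a recent botocore:
+from botocore.config import Config
+
+s3 = boto3.client(
+    's3',
+    endpoint_url='{{ api_base }}',
+    aws_access_key_id='<access_key>',
+    aws_secret_access_key='<secret_key>',
+    config=Config(
+        signature_version='s3v4',
+        s3={'addressing_style': 'path'},
+        request_checksum_calculation='when_required',
+        response_checksum_validation='when_required',
+    ),
+)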
+ +

JavaScript (AWS SDK v3)

+
import { S3Client, ListBucketsCommand, PutObjectCommand } from '@aws-sdk/client-s3';
+
+const s3 = new S3Client({
+  endpoint: '{{ api_base }}',
+  region: 'us-east-1',
+  credentials: {
+    accessKeyId: '<access_key>',
+    secretAccessKey: '<secret_key>'
+  },
+  forcePathStyle: true  // Path-style addressing; required by most S3-compatible servers
+});
+
+// List buckets
+const { Buckets } = await s3.send(new ListBucketsCommand({}));
+
+// Upload object
+await s3.send(new PutObjectCommand({
+  Bucket: 'mybucket',
+  Key: 'hello.txt',
+  Body: 'Hello, World!'
+}));
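+
+// Presigned download URL (sketch - assumes the separate
+// @aws-sdk/s3-request-presigner package is installed):
+import { GetObjectCommand } from '@aws-sdk/client-s3';
+import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
+
+const url = await getSignedUrl(
+  s3,
+  new GetObjectCommand({ Bucket: 'mybucket', Key: 'hello.txt' }),
+  { expiresIn: 3600 }
+);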
+ +

Multipart Upload (Python)

+
import boto3
 
 s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
 
@@ -418,9 +470,9 @@ s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
 response = s3.create_multipart_upload(Bucket='mybucket', Key='large.bin')
 upload_id = response['UploadId']
 
-# Upload parts
+# Upload parts (each part except the last must be at least 5 MiB)
 parts = []
-chunks = [b'chunk1', b'chunk2'] # Example data chunks
+chunks = [b'chunk1...', b'chunk2...']
 for part_number, chunk in enumerate(chunks, start=1):
     response = s3.upload_part(
         Bucket='mybucket',
@@ -438,6 +490,19 @@ s3.complete_multipart_upload(
     UploadId=upload_id,
     MultipartUpload={'Parts': parts}
 )
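+
+# Cleanup sketch: if an upload fails partway, abort it so the already-uploaded
+# parts are discarded instead of lingering on the server:
+#   s3.abort_multipart_upload(Bucket='mybucket', Key='large.bin', UploadId=upload_id)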
+ +

Presigned URLs for Sharing

+
# Generate a download link valid for 15 minutes
+curl -X POST "{{ api_base }}/presign/mybucket/photo.jpg" \
+  -H "Content-Type: application/json" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+  -d '{"method": "GET", "expires_in": 900}'
+
+# Generate an upload link (PUT) valid for 1 hour
+curl -X POST "{{ api_base }}/presign/mybucket/upload.bin" \
+  -H "Content-Type: application/json" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+  -d '{"method": "PUT", "expires_in": 3600}'
@@ -487,6 +552,86 @@ s3.complete_multipart_upload(

+
+
+
+ 09 +

Object Versioning

+
+

Keep multiple versions of objects to protect against accidental deletions and overwrites. Restore previous versions at any time.

+ +

Enabling Versioning

+
    +
+  1. Navigate to your bucket's Properties tab.
+  2. Find the Versioning card and click Enable.
+  3. All subsequent uploads will create new versions instead of overwriting.
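+
+The same switch can be flipped programmatically; a sketch using the Python requests
+package against the versioning endpoint documented under API Usage below:
+
+import requests
+
+requests.put(
+    '{{ api_base }}/mybucket?versioning',
+    json={'Status': 'Enabled'},
+    headers={'X-Access-Key': '<key>', 'X-Secret-Key': '<secret>'},
+)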
+ +

Version Operations

+
+  Operation        | Description
+  -----------------|-------------------------------------------------------------
+  View Versions    | Click the version icon on any object to see all historical versions with timestamps and sizes.
+  Restore Version  | Click Restore on any version to make it the current version (creates a copy).
+  Delete Current   | Deleting an object archives it. Previous versions remain accessible.
+  Purge All        | Permanently delete an object and all its versions. This cannot be undone.
+
+ +

Archived Objects

+

When you delete a versioned object, it becomes "archived" - the current version is removed but historical versions remain. The Archived tab shows these objects so you can restore them.

+ +

API Usage

+
# Enable versioning
+curl -X PUT "{{ api_base }}/<bucket>?versioning" \
+  -H "Content-Type: application/json" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+  -d '{"Status": "Enabled"}'
+
+# Get versioning status
+curl "{{ api_base }}/<bucket>?versioning" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# List object versions
+curl "{{ api_base }}/<bucket>?versions" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# Get specific version
+curl "{{ api_base }}/<bucket>/<key>?versionId=<version-id>" \
+  -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+ +
+
+ + + + +
+ Storage Impact: Each version consumes storage. Enable quotas to limit total bucket size including all versions. +
+
+
+
+
@@ -709,6 +854,7 @@ curl -X DELETE "{{ api_base }}/kms/keys/{key-id}?waiting_period_days=30" \
  • REST endpoints
  • API Examples
  • Site Replication
+  • Object Versioning
  • Bucket Quotas
  • Encryption
  • Troubleshooting
  • diff --git a/tests/test_ui_pagination.py b/tests/test_ui_pagination.py index 5e42d60..683f527 100644 --- a/tests/test_ui_pagination.py +++ b/tests/test_ui_pagination.py @@ -150,16 +150,21 @@ class TestPaginatedObjectListing: assert len(data["objects"]) == 1 obj = data["objects"][0] - + # Check all expected fields assert obj["key"] == "test.txt" assert obj["size"] == 12 # len("test content") assert "last_modified" in obj assert "last_modified_display" in obj assert "etag" in obj - assert "preview_url" in obj - assert "download_url" in obj - assert "delete_endpoint" in obj + + # URLs are now returned as templates (not per-object) for performance + assert "url_templates" in data + templates = data["url_templates"] + assert "preview" in templates + assert "download" in templates + assert "delete" in templates + assert "KEY_PLACEHOLDER" in templates["preview"] def test_bucket_detail_page_loads_without_objects(self, tmp_path): """Bucket detail page should load even with many objects."""