diff --git a/app/replication.py b/app/replication.py index 5a4ad8a..4703bf9 100644 --- a/app/replication.py +++ b/app/replication.py @@ -9,7 +9,7 @@ import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from pathlib import Path -from typing import Dict, Optional +from typing import Any, Dict, Optional import boto3 from botocore.config import Config @@ -24,11 +24,42 @@ logger = logging.getLogger(__name__) REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0" REPLICATION_CONNECT_TIMEOUT = 5 REPLICATION_READ_TIMEOUT = 30 +STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024 # 10 MiB - use streaming for larger files REPLICATION_MODE_NEW_ONLY = "new_only" REPLICATION_MODE_ALL = "all" +def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any: + """Create a boto3 S3 client for the given connection. + + Args: + connection: Remote S3 connection configuration + health_check: If True, use minimal retries for quick health checks + + Returns: + Configured boto3 S3 client + """ + config = Config( + user_agent_extra=REPLICATION_USER_AGENT, + connect_timeout=REPLICATION_CONNECT_TIMEOUT, + read_timeout=REPLICATION_READ_TIMEOUT, + retries={'max_attempts': 1 if health_check else 2}, + signature_version='s3v4', + s3={'addressing_style': 'path'}, + request_checksum_calculation='when_required', + response_checksum_validation='when_required', + ) + return boto3.client( + "s3", + endpoint_url=connection.endpoint_url, + aws_access_key_id=connection.access_key, + aws_secret_access_key=connection.secret_key, + region_name=connection.region or 'us-east-1', + config=config, + ) + + @dataclass class ReplicationStats: """Statistics for replication operations - computed dynamically.""" @@ -102,8 +133,19 @@ class ReplicationManager: self._rules: Dict[str, ReplicationRule] = {} self._stats_lock = threading.Lock() self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker") + self._shutdown = False self.reload_rules() + def shutdown(self, wait: bool = True) -> None: + """Shutdown the replication executor gracefully. + + Args: + wait: If True, wait for pending tasks to complete + """ + self._shutdown = True + self._executor.shutdown(wait=wait) + logger.info("Replication manager shut down") + def reload_rules(self) -> None: if not self.rules_path.exists(): self._rules = {} @@ -124,25 +166,12 @@ class ReplicationManager: def check_endpoint_health(self, connection: RemoteConnection) -> bool: """Check if a remote endpoint is reachable and responsive. - + Returns True if endpoint is healthy, False otherwise. Uses short timeouts to prevent blocking. 
""" try: - config = Config( - user_agent_extra=REPLICATION_USER_AGENT, - connect_timeout=REPLICATION_CONNECT_TIMEOUT, - read_timeout=REPLICATION_READ_TIMEOUT, - retries={'max_attempts': 1} - ) - s3 = boto3.client( - "s3", - endpoint_url=connection.endpoint_url, - aws_access_key_id=connection.access_key, - aws_secret_access_key=connection.secret_key, - region_name=connection.region, - config=config, - ) + s3 = _create_s3_client(connection, health_check=True) s3.list_buckets() return True except Exception as e: @@ -184,15 +213,9 @@ class ReplicationManager: try: source_objects = self.storage.list_objects_all(bucket_name) source_keys = {obj.key: obj.size for obj in source_objects} - - s3 = boto3.client( - "s3", - endpoint_url=connection.endpoint_url, - aws_access_key_id=connection.access_key, - aws_secret_access_key=connection.secret_key, - region_name=connection.region, - ) - + + s3 = _create_s3_client(connection) + dest_keys = set() bytes_synced = 0 paginator = s3.get_paginator('list_objects_v2') @@ -257,13 +280,7 @@ class ReplicationManager: raise ValueError(f"Connection {connection_id} not found") try: - s3 = boto3.client( - "s3", - endpoint_url=connection.endpoint_url, - aws_access_key_id=connection.access_key, - aws_secret_access_key=connection.secret_key, - region_name=connection.region, - ) + s3 = _create_s3_client(connection) s3.create_bucket(Bucket=bucket_name) except ClientError as e: logger.error(f"Failed to create remote bucket {bucket_name}: {e}") @@ -286,41 +303,28 @@ class ReplicationManager: self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action) def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None: + if self._shutdown: + return + + # Re-check if rule is still enabled (may have been paused after task was submitted) + current_rule = self.get_rule(bucket_name) + if not current_rule or not current_rule.enabled: + logger.debug(f"Replication skipped for {bucket_name}/{object_key}: rule disabled or removed") + return + if ".." in object_key or object_key.startswith("/") or object_key.startswith("\\"): logger.error(f"Invalid object key in replication (path traversal attempt): {object_key}") return - + try: from .storage import ObjectStorage ObjectStorage._sanitize_object_key(object_key) except StorageError as e: logger.error(f"Object key validation failed in replication: {e}") return - - file_size = 0 + try: - config = Config( - user_agent_extra=REPLICATION_USER_AGENT, - connect_timeout=REPLICATION_CONNECT_TIMEOUT, - read_timeout=REPLICATION_READ_TIMEOUT, - retries={'max_attempts': 2}, - signature_version='s3v4', - s3={ - 'addressing_style': 'path', - }, - # Disable SDK automatic checksums - they cause SignatureDoesNotMatch errors - # with S3-compatible servers that don't support CRC32 checksum headers - request_checksum_calculation='when_required', - response_checksum_validation='when_required', - ) - s3 = boto3.client( - "s3", - endpoint_url=conn.endpoint_url, - aws_access_key_id=conn.access_key, - aws_secret_access_key=conn.secret_key, - region_name=conn.region or 'us-east-1', - config=config, - ) + s3 = _create_s3_client(conn) if action == "delete": try: @@ -337,34 +341,42 @@ class ReplicationManager: logger.error(f"Source object not found: {bucket_name}/{object_key}") return - # Don't replicate metadata - destination server will generate its own - # __etag__ and __size__. Replicating them causes signature mismatches when they have None/empty values. 
- content_type, _ = mimetypes.guess_type(path) file_size = path.stat().st_size logger.info(f"Replicating {bucket_name}/{object_key}: Size={file_size}, ContentType={content_type}") - def do_put_object() -> None: - """Helper to upload object. - - Reads the file content into memory first to avoid signature calculation - issues with certain binary file types (like GIFs) when streaming. - Do NOT set ContentLength explicitly - boto3 calculates it from the bytes - and setting it manually can cause SignatureDoesNotMatch errors. + def do_upload() -> None: + """Upload object using appropriate method based on file size. + + For small files (< 10 MiB): Read into memory for simpler handling + For large files: Use streaming upload to avoid memory issues """ - file_content = path.read_bytes() - put_kwargs = { - "Bucket": rule.target_bucket, - "Key": object_key, - "Body": file_content, - } + extra_args = {} if content_type: - put_kwargs["ContentType"] = content_type - s3.put_object(**put_kwargs) + extra_args["ContentType"] = content_type + + if file_size >= STREAMING_THRESHOLD_BYTES: + # Use multipart upload for large files + s3.upload_file( + str(path), + rule.target_bucket, + object_key, + ExtraArgs=extra_args if extra_args else None, + ) + else: + # Read small files into memory + file_content = path.read_bytes() + put_kwargs = { + "Bucket": rule.target_bucket, + "Key": object_key, + "Body": file_content, + **extra_args, + } + s3.put_object(**put_kwargs) try: - do_put_object() + do_upload() except (ClientError, S3UploadFailedError) as e: error_code = None if isinstance(e, ClientError): @@ -386,13 +398,13 @@ class ReplicationManager: bucket_ready = True else: logger.error(f"Failed to create target bucket {rule.target_bucket}: {bucket_err}") - raise e - + raise e + if bucket_ready: - do_put_object() + do_upload() else: raise e - + logger.info(f"Replicated {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})") self._update_last_sync(bucket_name, object_key) diff --git a/app/s3_api.py b/app/s3_api.py index d0d32d9..78b9afb 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -1,13 +1,15 @@ """Flask blueprint exposing a subset of the S3 REST API.""" from __future__ import annotations +import base64 import hashlib import hmac +import logging import mimetypes import re import uuid from datetime import datetime, timedelta, timezone -from typing import Any, Dict +from typing import Any, Dict, Optional from urllib.parse import quote, urlencode, urlparse, unquote from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError @@ -20,6 +22,8 @@ from .iam import IamError, Principal from .replication import ReplicationManager from .storage import ObjectStorage, StorageError, QuotaExceededError +logger = logging.getLogger(__name__) + s3_api_bp = Blueprint("s3_api", __name__) def _storage() -> ObjectStorage: @@ -118,6 +122,9 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: if header_val is None: header_val = "" + if header.lower() == 'expect' and header_val == "": + header_val = "100-continue" + header_val = " ".join(header_val.split()) canonical_headers_parts.append(f"{header.lower()}:{header_val}\n") canonical_headers = "".join(canonical_headers_parts) @@ -128,15 +135,6 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: canonical_request = f"{method}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}" - # Debug logging for signature issues - import logging - logger = 
logging.getLogger(__name__) - logger.debug(f"SigV4 Debug - Method: {method}, URI: {canonical_uri}") - logger.debug(f"SigV4 Debug - Payload hash from header: {req.headers.get('X-Amz-Content-Sha256')}") - logger.debug(f"SigV4 Debug - Signed headers: {signed_headers_str}") - logger.debug(f"SigV4 Debug - Content-Type: {req.headers.get('Content-Type')}") - logger.debug(f"SigV4 Debug - Content-Length: {req.headers.get('Content-Length')}") - amz_date = req.headers.get("X-Amz-Date") or req.headers.get("Date") if not amz_date: raise IamError("Missing Date header") @@ -167,24 +165,18 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None: calculated_signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() if not hmac.compare_digest(calculated_signature, signature): - # Debug logging for signature mismatch - import logging - logger = logging.getLogger(__name__) - logger.error(f"Signature mismatch for {req.path}") - logger.error(f" Content-Type: {req.headers.get('Content-Type')}") - logger.error(f" Content-Length: {req.headers.get('Content-Length')}") - logger.error(f" X-Amz-Content-Sha256: {req.headers.get('X-Amz-Content-Sha256')}") - logger.error(f" Canonical URI: {canonical_uri}") - logger.error(f" Signed headers: {signed_headers_str}") - # Log each signed header's value - for h in signed_headers_list: - logger.error(f" Header '{h}': {repr(req.headers.get(h))}") - logger.error(f" Expected sig: {signature[:16]}...") - logger.error(f" Calculated sig: {calculated_signature[:16]}...") - # Log first part of canonical request to compare - logger.error(f" Canonical request hash: {hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()[:16]}...") - # Log the full canonical request for debugging - logger.error(f" Canonical request:\n{canonical_request[:500]}...") + # Only log detailed signature debug info if DEBUG_SIGV4 is enabled + if current_app.config.get("DEBUG_SIGV4"): + logger.warning( + "SigV4 signature mismatch", + extra={ + "path": req.path, + "method": method, + "signed_headers": signed_headers_str, + "content_type": req.headers.get("Content-Type"), + "content_length": req.headers.get("Content-Length"), + } + ) raise IamError("SignatureDoesNotMatch") return _iam().get_principal(access_key) @@ -236,6 +228,8 @@ def _verify_sigv4_query(req: Any) -> Principal | None: canonical_headers_parts = [] for header in signed_headers_list: val = req.headers.get(header, "").strip() + if header.lower() == 'expect' and val == "": + val = "100-continue" val = " ".join(val.split()) canonical_headers_parts.append(f"{header}:{val}\n") canonical_headers = "".join(canonical_headers_parts) @@ -569,6 +563,28 @@ def _strip_ns(tag: str | None) -> str: return tag.split("}")[-1] +def _find_element(parent: Element, name: str) -> Optional[Element]: + """Find a child element by name, trying both namespaced and non-namespaced variants. + + This handles XML documents that may or may not include namespace prefixes. + """ + el = parent.find(f"{{*}}{name}") + if el is None: + el = parent.find(name) + return el + + +def _find_element_text(parent: Element, name: str, default: str = "") -> str: + """Find a child element and return its text content. + + Returns the default value if element not found or has no text. 
+ """ + el = _find_element(parent, name) + if el is None or el.text is None: + return default + return el.text.strip() + + def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]: try: root = fromstring(payload) @@ -585,17 +601,11 @@ def _parse_tagging_document(payload: bytes) -> list[dict[str, str]]: for tag_el in list(tagset): if _strip_ns(tag_el.tag) != "Tag": continue - key_el = tag_el.find("{*}Key") - if key_el is None: - key_el = tag_el.find("Key") - value_el = tag_el.find("{*}Value") - if value_el is None: - value_el = tag_el.find("Value") - key = (key_el.text or "").strip() if key_el is not None else "" + key = _find_element_text(tag_el, "Key") if not key: continue - value = value_el.text if value_el is not None else "" - tags.append({"Key": key, "Value": value or ""}) + value = _find_element_text(tag_el, "Value") + tags.append({"Key": key, "Value": value}) return tags @@ -1439,7 +1449,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response: if request.method == "DELETE": try: - storage.set_bucket_quota(bucket_name, max_size_bytes=None, max_objects=None) + storage.set_bucket_quota(bucket_name, max_bytes=None, max_objects=None) except StorageError as exc: return _error_response("NoSuchBucket", str(exc), 404) current_app.logger.info("Bucket quota deleted", extra={"bucket": bucket_name}) @@ -1473,7 +1483,7 @@ def _bucket_quota_handler(bucket_name: str) -> Response: return _error_response("InvalidArgument", f"max_objects {exc}", 400) try: - storage.set_bucket_quota(bucket_name, max_size_bytes=max_size_bytes, max_objects=max_objects) + storage.set_bucket_quota(bucket_name, max_bytes=max_size_bytes, max_objects=max_objects) except StorageError as exc: return _error_response("NoSuchBucket", str(exc), 404) @@ -1665,7 +1675,6 @@ def bucket_handler(bucket_name: str) -> Response: effective_start = "" if list_type == "2": if continuation_token: - import base64 try: effective_start = base64.urlsafe_b64decode(continuation_token.encode()).decode("utf-8") except Exception: @@ -1722,7 +1731,6 @@ def bucket_handler(bucket_name: str) -> Response: next_marker = common_prefixes[-1].rstrip(delimiter) if delimiter else common_prefixes[-1] if list_type == "2" and next_marker: - import base64 next_continuation_token = base64.urlsafe_b64encode(next_marker.encode()).decode("utf-8") if list_type == "2": diff --git a/app/storage.py b/app/storage.py index c710847..702d5bb 100644 --- a/app/storage.py +++ b/app/storage.py @@ -7,9 +7,11 @@ import os import re import shutil import stat +import threading import time import unicodedata import uuid +from collections import OrderedDict from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timezone @@ -129,12 +131,17 @@ class ObjectStorage: MULTIPART_MANIFEST = "manifest.json" BUCKET_CONFIG_FILE = ".bucket.json" KEY_INDEX_CACHE_TTL = 30 + OBJECT_CACHE_MAX_SIZE = 100 # Maximum number of buckets to cache def __init__(self, root: Path) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) self._ensure_system_roots() - self._object_cache: Dict[str, tuple[Dict[str, ObjectMeta], float]] = {} + # LRU cache for object metadata with thread-safe access + self._object_cache: OrderedDict[str, tuple[Dict[str, ObjectMeta], float]] = OrderedDict() + self._cache_lock = threading.Lock() + # Cache version counter for detecting stale reads + self._cache_version: Dict[str, int] = {} def list_buckets(self) -> List[BucketMeta]: buckets: List[BucketMeta] = [] @@ -729,8 +736,6 @@ class ObjectStorage: 
bucket_id = bucket_path.name safe_key = self._sanitize_object_key(object_key) version_dir = self._version_dir(bucket_id, safe_key) - if not version_dir.exists(): - version_dir = self._legacy_version_dir(bucket_id, safe_key) if not version_dir.exists(): version_dir = self._legacy_version_dir(bucket_id, safe_key) if not version_dir.exists(): @@ -879,41 +884,73 @@ class ObjectStorage: part_number: int, stream: BinaryIO, ) -> str: + """Upload a part for a multipart upload. + + Uses file locking to safely update the manifest and handle concurrent uploads. + """ if part_number < 1: raise StorageError("part_number must be >= 1") bucket_path = self._bucket_path(bucket_name) - + upload_root = self._multipart_dir(bucket_path.name, upload_id) if not upload_root.exists(): upload_root = self._legacy_multipart_dir(bucket_path.name, upload_id) if not upload_root.exists(): raise StorageError("Multipart upload not found") - + + # Write part to temporary file first, then rename atomically checksum = hashlib.md5() part_filename = f"part-{part_number:05d}.part" part_path = upload_root / part_filename - with part_path.open("wb") as target: - shutil.copyfileobj(_HashingReader(stream, checksum), target) + temp_path = upload_root / f".{part_filename}.tmp" + + try: + with temp_path.open("wb") as target: + shutil.copyfileobj(_HashingReader(stream, checksum), target) + + # Atomic rename (or replace on Windows) + temp_path.replace(part_path) + except OSError: + # Clean up temp file on failure + try: + temp_path.unlink(missing_ok=True) + except OSError: + pass + raise + record = { "etag": checksum.hexdigest(), "size": part_path.stat().st_size, "filename": part_filename, } - + manifest_path = upload_root / self.MULTIPART_MANIFEST lock_path = upload_root / ".manifest.lock" - - with lock_path.open("w") as lock_file: - with _file_lock(lock_file): - try: - manifest = json.loads(manifest_path.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError) as exc: - raise StorageError("Multipart manifest unreadable") from exc - - parts = manifest.setdefault("parts", {}) - parts[str(part_number)] = record - manifest_path.write_text(json.dumps(manifest), encoding="utf-8") - + + # Retry loop for handling transient lock/read failures + max_retries = 3 + for attempt in range(max_retries): + try: + with lock_path.open("w") as lock_file: + with _file_lock(lock_file): + try: + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as exc: + if attempt < max_retries - 1: + time.sleep(0.1 * (attempt + 1)) + continue + raise StorageError("Multipart manifest unreadable") from exc + + parts = manifest.setdefault("parts", {}) + parts[str(part_number)] = record + manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + break + except OSError as exc: + if attempt < max_retries - 1: + time.sleep(0.1 * (attempt + 1)) + continue + raise StorageError(f"Failed to update multipart manifest: {exc}") from exc + return record["etag"] def complete_multipart_upload( @@ -1015,9 +1052,10 @@ class ObjectStorage: pass shutil.rmtree(upload_root, ignore_errors=True) - + self._invalidate_bucket_stats_cache(bucket_id) - + self._invalidate_object_cache(bucket_id) + stat = destination.stat() return ObjectMeta( key=safe_key.as_posix(), @@ -1264,22 +1302,52 @@ class ObjectStorage: return objects def _get_object_cache(self, bucket_id: str, bucket_path: Path) -> Dict[str, ObjectMeta]: - """Get cached object metadata for a bucket, refreshing if stale.""" + """Get cached object metadata for a bucket, 
refreshing if stale. + + Uses LRU eviction to prevent unbounded cache growth. + Thread-safe with version tracking to detect concurrent invalidations. + """ now = time.time() - cached = self._object_cache.get(bucket_id) - - if cached: - objects, timestamp = cached - if now - timestamp < self.KEY_INDEX_CACHE_TTL: - return objects - + + with self._cache_lock: + cached = self._object_cache.get(bucket_id) + cache_version = self._cache_version.get(bucket_id, 0) + + if cached: + objects, timestamp = cached + if now - timestamp < self.KEY_INDEX_CACHE_TTL: + # Move to end (most recently used) + self._object_cache.move_to_end(bucket_id) + return objects + + # Build cache outside lock to avoid holding lock during I/O objects = self._build_object_cache(bucket_path) - self._object_cache[bucket_id] = (objects, now) + + with self._cache_lock: + # Check if cache was invalidated while we were building + current_version = self._cache_version.get(bucket_id, 0) + if current_version != cache_version: + # Cache was invalidated, rebuild + objects = self._build_object_cache(bucket_path) + + # Evict oldest entries if cache is full + while len(self._object_cache) >= self.OBJECT_CACHE_MAX_SIZE: + self._object_cache.popitem(last=False) + + self._object_cache[bucket_id] = (objects, time.time()) + self._object_cache.move_to_end(bucket_id) + return objects def _invalidate_object_cache(self, bucket_id: str) -> None: - """Invalidate the object cache and etag index for a bucket.""" - self._object_cache.pop(bucket_id, None) + """Invalidate the object cache and etag index for a bucket. + + Increments version counter to signal stale reads. + """ + with self._cache_lock: + self._object_cache.pop(bucket_id, None) + self._cache_version[bucket_id] = self._cache_version.get(bucket_id, 0) + 1 + etag_index_path = self._system_bucket_root(bucket_id) / "etag_index.json" try: etag_index_path.unlink(missing_ok=True) diff --git a/app/ui.py b/app/ui.py index 4baf5c3..3cc6c23 100644 --- a/app/ui.py +++ b/app/ui.py @@ -415,7 +415,7 @@ def list_bucket_objects(bucket_name: str): except IamError as exc: return jsonify({"error": str(exc)}), 403 - max_keys = min(int(request.args.get("max_keys", 1000)), 10000) + max_keys = min(int(request.args.get("max_keys", 1000)), 100000) continuation_token = request.args.get("continuation_token") or None prefix = request.args.get("prefix") or None @@ -434,6 +434,14 @@ def list_bucket_objects(bucket_name: str): except StorageError: versioning_enabled = False + # Pre-compute URL templates once (not per-object) for performance + # Frontend will construct actual URLs by replacing KEY_PLACEHOLDER + preview_template = url_for("ui.object_preview", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + delete_template = url_for("ui.delete_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER") + restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER") + objects_data = [] for obj in result.objects: objects_data.append({ @@ -442,13 +450,6 @@ def list_bucket_objects(bucket_name: str): "last_modified": obj.last_modified.isoformat(), "last_modified_display": obj.last_modified.strftime("%b %d, %Y %H:%M"), "etag": obj.etag, - "metadata": obj.metadata or {}, - "preview_url": url_for("ui.object_preview", 
bucket_name=bucket_name, object_key=obj.key), - "download_url": url_for("ui.object_preview", bucket_name=bucket_name, object_key=obj.key) + "?download=1", - "presign_endpoint": url_for("ui.object_presign", bucket_name=bucket_name, object_key=obj.key), - "delete_endpoint": url_for("ui.delete_object", bucket_name=bucket_name, object_key=obj.key), - "versions_endpoint": url_for("ui.object_versions", bucket_name=bucket_name, object_key=obj.key), - "restore_template": url_for("ui.restore_object_version", bucket_name=bucket_name, object_key=obj.key, version_id="VERSION_ID_PLACEHOLDER"), }) return jsonify({ @@ -457,6 +458,14 @@ def list_bucket_objects(bucket_name: str): "next_continuation_token": result.next_continuation_token, "total_count": result.total_count, "versioning_enabled": versioning_enabled, + "url_templates": { + "preview": preview_template, + "download": preview_template + "?download=1", + "presign": presign_template, + "delete": delete_template, + "versions": versions_template, + "restore": restore_template, + }, }) @@ -1458,11 +1467,17 @@ def update_bucket_replication(bucket_name: str): else: flash("No replication configuration to pause", "warning") elif action == "resume": + from .replication import REPLICATION_MODE_ALL rule = _replication().get_rule(bucket_name) if rule: rule.enabled = True _replication().set_rule(rule) - flash("Replication resumed", "success") + # When resuming, sync any pending objects that accumulated while paused + if rule.mode == REPLICATION_MODE_ALL: + _replication().replicate_existing_objects(bucket_name) + flash("Replication resumed. Syncing pending objects in background.", "success") + else: + flash("Replication resumed", "success") else: flash("No replication configuration to resume", "warning") elif action == "create": diff --git a/app/version.py b/app/version.py index ea75040..445cb63 100644 --- a/app/version.py +++ b/app/version.py @@ -1,7 +1,7 @@ """Central location for the application version string.""" from __future__ import annotations -APP_VERSION = "0.1.8" +APP_VERSION = "0.1.9" def get_version() -> str: diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html index daa8f45..f9832e1 100644 --- a/templates/bucket_detail.html +++ b/templates/bucket_detail.html @@ -173,14 +173,16 @@
Common operations using boto3.
-
-import boto3
+ Common operations using popular SDKs and tools.
+
+ Python (boto3)
+import boto3
+
+s3 = boto3.client(
+ 's3',
+ endpoint_url='{{ api_base }}',
+ aws_access_key_id='<access_key>',
+ aws_secret_access_key='<secret_key>'
+)
+
+# List buckets
+buckets = s3.list_buckets()['Buckets']
+
+# Create bucket
+s3.create_bucket(Bucket='mybucket')
+
+# Upload file
+s3.upload_file('local.txt', 'mybucket', 'remote.txt')
+
+# Download file
+s3.download_file('mybucket', 'remote.txt', 'downloaded.txt')
+
+# Generate a presigned URL (valid for 1 hour)
+url = s3.generate_presigned_url(
+ 'get_object',
+ Params={'Bucket': 'mybucket', 'Key': 'remote.txt'},
+ ExpiresIn=3600
+)
+
+ JavaScript (AWS SDK v3)
+import { S3Client, ListBucketsCommand, PutObjectCommand } from '@aws-sdk/client-s3';
+
+const s3 = new S3Client({
+ endpoint: '{{ api_base }}',
+ region: 'us-east-1',
+ credentials: {
+ accessKeyId: '<access_key>',
+ secretAccessKey: '<secret_key>'
+ },
+ forcePathStyle: true // Required for S3-compatible services
+});
+
+// List buckets
+const { Buckets } = await s3.send(new ListBucketsCommand({}));
+
+// Upload object
+await s3.send(new PutObjectCommand({
+ Bucket: 'mybucket',
+ Key: 'hello.txt',
+ Body: 'Hello, World!'
+}));
+
+ Multipart Upload (Python)
+import boto3
s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
@@ -418,9 +470,9 @@ s3 = boto3.client('s3', endpoint_url='{{ api_base }}')
response = s3.create_multipart_upload(Bucket='mybucket', Key='large.bin')
upload_id = response['UploadId']
-# Upload parts
+# Upload parts (each part except the last must be at least 5 MiB)
parts = []
-chunks = [b'chunk1', b'chunk2'] # Example data chunks
+chunks = [b'chunk1...', b'chunk2...']
for part_number, chunk in enumerate(chunks, start=1):
response = s3.upload_part(
Bucket='mybucket',
@@ -438,6 +490,19 @@ s3.complete_multipart_upload(
UploadId=upload_id,
MultipartUpload={'Parts': parts}
)
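+
+# Optional cleanup (a sketch, assuming the server implements AbortMultipartUpload):
+# if you abandon an upload instead of completing it, abort it so partial parts
+# do not linger on disk.
+s3.abort_multipart_upload(
+    Bucket='mybucket',
+    Key='large.bin',
+    UploadId=upload_id
+)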
+
+ Presigned URLs for Sharing
+# Generate a download link valid for 15 minutes
+curl -X POST "{{ api_base }}/presign/mybucket/photo.jpg" \
+ -H "Content-Type: application/json" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+ -d '{"method": "GET", "expires_in": 900}'
+
+# Generate an upload link (PUT) valid for 1 hour
+curl -X POST "{{ api_base }}/presign/mybucket/upload.bin" \
+ -H "Content-Type: application/json" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+ -d '{"method": "PUT", "expires_in": 3600}'
@@ -487,6 +552,86 @@ s3.complete_multipart_upload(
+
+
+
+ 09
+ Object Versioning
+
+ Keep multiple versions of objects to protect against accidental deletions and overwrites. Restore previous versions at any time.
+
+ Enabling Versioning
+
+ - Navigate to your bucket's Properties tab.
+ - Find the Versioning card and click Enable.
+ - All subsequent uploads will create new versions instead of overwriting.
+
+
+ Version Operations
+
+
+
+
+ Operation
+ Description
+
+
+
+
+ View Versions
+ Click the version icon on any object to see all historical versions with timestamps and sizes.
+
+
+ Restore Version
+ Click Restore on any version to make it the current version (creates a copy).
+
+
+ Delete Current
+ Deleting an object archives it. Previous versions remain accessible.
+
+
+ Purge All
+ Permanently delete an object and all its versions. This cannot be undone.
+
+
+
+
+
+ Archived Objects
+ When you delete a versioned object, it becomes "archived": the current version is removed, but historical versions remain. The Archived tab shows these objects so you can restore them.
+
+ API Usage
+# Enable versioning
+curl -X PUT "{{ api_base }}/<bucket>?versioning" \
+ -H "Content-Type: application/json" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>" \
+ -d '{"Status": "Enabled"}'
+
+# Get versioning status
+curl "{{ api_base }}/<bucket>?versioning" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# List object versions
+curl "{{ api_base }}/<bucket>?versions" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+# Get specific version
+curl "{{ api_base }}/<bucket>/<key>?versionId=<version-id>" \
+ -H "X-Access-Key: <key>" -H "X-Secret-Key: <secret>"
+
+
+
+
+
+
+
+
+ Storage Impact: Each version consumes storage. Enable quotas to limit total bucket size including all versions.
+
+
+
+
+
@@ -709,6 +854,7 @@ curl -X DELETE "{{ api_base }}/kms/keys/{key-id}?waiting_period_days=30" \
REST endpoints
API Examples
Site Replication
+ Object Versioning
Bucket Quotas
Encryption
Troubleshooting
diff --git a/tests/test_ui_pagination.py b/tests/test_ui_pagination.py
index 5e42d60..683f527 100644
--- a/tests/test_ui_pagination.py
+++ b/tests/test_ui_pagination.py
@@ -150,16 +150,21 @@ class TestPaginatedObjectListing:
assert len(data["objects"]) == 1
obj = data["objects"][0]
-
+
# Check all expected fields
assert obj["key"] == "test.txt"
assert obj["size"] == 12 # len("test content")
assert "last_modified" in obj
assert "last_modified_display" in obj
assert "etag" in obj
- assert "preview_url" in obj
- assert "download_url" in obj
- assert "delete_endpoint" in obj
+
+ # URLs are now returned as templates (not per-object) for performance
+ assert "url_templates" in data
+ templates = data["url_templates"]
+ assert "preview" in templates
+ assert "download" in templates
+ assert "delete" in templates
+ assert "KEY_PLACEHOLDER" in templates["preview"]
def test_bucket_detail_page_loads_without_objects(self, tmp_path):
"""Bucket detail page should load even with many objects."""