diff --git a/app/replication.py b/app/replication.py
index 994bd7b..5f6d19f 100644
--- a/app/replication.py
+++ b/app/replication.py
@@ -2,6 +2,7 @@
 from __future__ import annotations
 
 import logging
+import mimetypes
 import threading
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
@@ -10,9 +11,10 @@
 from typing import Dict, Optional
 
 import boto3
 from botocore.exceptions import ClientError
+from boto3.exceptions import S3UploadFailedError
 
 from .connections import ConnectionStore, RemoteConnection
-from .storage import ObjectStorage
+from .storage import ObjectStorage, StorageError
 
 logger = logging.getLogger(__name__)
@@ -116,21 +118,73 @@ class ReplicationManager:
-        # We need the file content.
-        # Since ObjectStorage is filesystem based, let's get the stream.
-        # We need to be careful about closing it.
-        meta = self.storage.get_object_meta(bucket_name, object_key)
-        if not meta:
+        # ObjectStorage is filesystem-backed, so read the object directly
+        # from its on-disk path instead of managing a stream.
+        try:
+            path = self.storage.get_object_path(bucket_name, object_key)
+        except StorageError:
             return
-        with self.storage.open_object(bucket_name, object_key) as f:
-            extra_args = {}
-            if meta.metadata:
-                extra_args["Metadata"] = meta.metadata
-
-            s3.upload_fileobj(
-                f,
-                rule.target_bucket,
-                object_key,
-                ExtraArgs=extra_args
-            )
+        metadata = self.storage.get_object_metadata(bucket_name, object_key)
+
+        # Guess the content type so the target service stores the object correctly.
+        content_type, _ = mimetypes.guess_type(path)
+        file_size = path.stat().st_size
+
+        # Debug: calculate the MD5 of the source file so it can be compared
+        # against what the target receives.
+        import hashlib
+        md5_hash = hashlib.md5()
+        with path.open("rb") as f:
+            # Log the first 32 bytes to spot header corruption quickly.
+            header = f.read(32)
+            logger.info(f"Source first 32 bytes: {header.hex()}")
+            md5_hash.update(header)
+            for chunk in iter(lambda: f.read(4096), b""):
+                md5_hash.update(chunk)
+        source_md5 = md5_hash.hexdigest()
+        logger.info(
+            f"Replicating {bucket_name}/{object_key}: "
+            f"Size={file_size}, MD5={source_md5}, ContentType={content_type}"
+        )
+
+        try:
+            with path.open("rb") as f:
+                s3.put_object(
+                    Bucket=rule.target_bucket,
+                    Key=object_key,
+                    Body=f,
+                    ContentLength=file_size,
+                    ContentType=content_type or "application/octet-stream",
+                    Metadata=metadata or {},
+                )
+        except (ClientError, S3UploadFailedError) as e:
+            # NoSuchBucket can arrive directly (ClientError) or wrapped in
+            # boto3's S3UploadFailedError.
+            is_no_bucket = False
+            if isinstance(e, ClientError):
+                if e.response["Error"]["Code"] == "NoSuchBucket":
+                    is_no_bucket = True
+            elif isinstance(e, S3UploadFailedError):
+                if "NoSuchBucket" in str(e):
+                    is_no_bucket = True
+
+            if not is_no_bucket:
+                raise
+
+            logger.info(f"Target bucket {rule.target_bucket} not found. Attempting to create it.")
+            try:
+                s3.create_bucket(Bucket=rule.target_bucket)
+                # Retry the upload now that the bucket exists.
+                with path.open("rb") as f:
+                    s3.put_object(
+                        Bucket=rule.target_bucket,
+                        Key=object_key,
+                        Body=f,
+                        ContentLength=file_size,
+                        ContentType=content_type or "application/octet-stream",
+                        Metadata=metadata or {},
+                    )
+            except Exception as create_err:
+                logger.error(f"Failed to create target bucket {rule.target_bucket}: {create_err}")
+                raise e  # Surface the original upload error.
 
         logger.info(f"Replicated {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})")
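Because the upload is now a single `put_object` call, the replica's ETag should equal the hex MD5 of the body on most S3-compatible backends (single-part uploads without SSE-KMS), which gives a cheap way to check the debug MD5 logged above. A minimal sketch, assuming a boto3 client configured for the target connection (`replica_matches` is a hypothetical helper, not part of this change):

```python
import hashlib
from pathlib import Path


def replica_matches(s3, bucket: str, key: str, source_path: Path) -> bool:
    """Compare a local file's MD5 against the replica's ETag."""
    md5 = hashlib.md5()
    with source_path.open("rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            md5.update(chunk)
    head = s3.head_object(Bucket=bucket, Key=key)
    # S3 returns the ETag wrapped in double quotes; strip them before comparing.
    return head["ETag"].strip('"') == md5.hexdigest()
```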
diff --git a/app/s3_api.py b/app/s3_api.py
index f9d496f..86d4e11 100644
--- a/app/s3_api.py
+++ b/app/s3_api.py
@@ -1078,7 +1078,13 @@ def object_handler(bucket_name: str, object_key: str):
         _, error = _object_principal("write", bucket_name, object_key)
         if error:
             return error
-        stream = request.stream
+
+        # Debug: log incoming request details
+        current_app.logger.info(f"Receiving PUT {bucket_name}/{object_key}")
+        current_app.logger.info(f"Headers: {dict(request.headers)}")
+        current_app.logger.info(f"Content-Length: {request.content_length}")
+
+        stream = DebugStream(request.stream, current_app.logger)
         metadata = _extract_request_metadata()
         try:
             meta = storage.put_object(
@@ -1252,3 +1258,19 @@ def head_object(bucket_name: str, object_key: str) -> Response:
         return _error_response("NoSuchKey", "Object not found", 404)
     except IamError as exc:
         return _error_response("AccessDenied", str(exc), 403)
+
+
+class DebugStream:
+    def __init__(self, stream, logger):
+        self.stream = stream
+        self.logger = logger
+        self.first_chunk = True
+
+    def read(self, size=-1):
+        chunk = self.stream.read(size)
+        if self.first_chunk and chunk:
+            # Log the first 32 bytes to spot corruption at ingest time.
+            prefix = chunk[:32]
+            self.logger.info(f"Received first 32 bytes: {prefix.hex()}")
+            self.first_chunk = False
+        return chunk
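`DebugStream` only implements `read()`, which is presumably all that `storage.put_object` consumes from `request.stream`. A quick sketch of its behaviour against an in-memory stand-in for a request body (assumes `DebugStream` is importable from `app.s3_api`):

```python
import io
import logging

from app.s3_api import DebugStream

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("debug")

# Any object with a read() method works as the wrapped stream.
body = io.BytesIO(b"\x89PNG\r\n\x1a\n" + b"\x00" * 64)

stream = DebugStream(body, logger)
first = stream.read(4096)  # logs "Received first 32 bytes: 89504e470d0a1a0a0000..."
rest = stream.read()       # later reads are passed through silently
```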
diff --git a/app/ui.py b/app/ui.py
index fbf48a3..3b2517f 100644
--- a/app/ui.py
+++ b/app/ui.py
@@ -6,7 +6,8 @@ import uuid
 from typing import Any
 from urllib.parse import urlparse
 
+import boto3
 import requests
 from flask import (
     Blueprint,
     Response,
@@ -1070,6 +1072,73 @@ def create_connection():
     return redirect(url_for("ui.connections_dashboard"))
 
 
+@ui_bp.post("/connections/test")
+def test_connection():
+    principal = _current_principal()
+    try:
+        _iam().authorize(principal, None, "iam:list_users")
+    except IamError:
+        return jsonify({"status": "error", "message": "Access denied"}), 403
+
+    data = request.get_json(silent=True) or request.form
+    endpoint = data.get("endpoint_url", "").strip()
+    access_key = data.get("access_key", "").strip()
+    secret_key = data.get("secret_key", "").strip()
+    region = data.get("region", "us-east-1").strip()
+
+    if not all([endpoint, access_key, secret_key]):
+        return jsonify({"status": "error", "message": "Missing credentials"}), 400
+
+    try:
+        s3 = boto3.client(
+            "s3",
+            endpoint_url=endpoint,
+            aws_access_key_id=access_key,
+            aws_secret_access_key=secret_key,
+            region_name=region,
+        )
+        # List buckets to verify both the credentials and the endpoint.
+        s3.list_buckets()
+        return jsonify({"status": "ok", "message": "Connection successful"})
+    except Exception as e:
+        return jsonify({"status": "error", "message": str(e)}), 400
+
+
+@ui_bp.post("/connections/<connection_id>/update")
+def update_connection(connection_id: str):
+    principal = _current_principal()
+    try:
+        _iam().authorize(principal, None, "iam:list_users")
+    except IamError:
+        flash("Access denied", "danger")
+        return redirect(url_for("ui.buckets_overview"))
+
+    conn = _connections().get(connection_id)
+    if not conn:
+        flash("Connection not found", "danger")
+        return redirect(url_for("ui.connections_dashboard"))
+
+    name = request.form.get("name", "").strip()
+    endpoint = request.form.get("endpoint_url", "").strip()
+    access_key = request.form.get("access_key", "").strip()
+    secret_key = request.form.get("secret_key", "").strip()
+    region = request.form.get("region", "us-east-1").strip()
+
+    if not all([name, endpoint, access_key, secret_key]):
+        flash("All fields are required", "danger")
+        return redirect(url_for("ui.connections_dashboard"))
+
+    conn.name = name
+    conn.endpoint_url = endpoint
+    conn.access_key = access_key
+    conn.secret_key = secret_key
+    conn.region = region
+
+    _connections().save()
+    flash(f"Connection '{name}' updated", "success")
+    return redirect(url_for("ui.connections_dashboard"))
+
+
 @ui_bp.post("/connections/<connection_id>/delete")
 def delete_connection(connection_id: str):
     principal = _current_principal()
@@ -1105,16 +1174,6 @@ def update_bucket_replication(bucket_name: str):
         if not target_conn_id or not target_bucket:
             flash("Target connection and bucket are required", "danger")
         else:
-            # Check if user wants to create the remote bucket
-            create_remote = request.form.get("create_remote_bucket") == "on"
-            if create_remote:
-                try:
-                    _replication().create_remote_bucket(target_conn_id, target_bucket)
-                    flash(f"Created remote bucket '{target_bucket}'", "success")
-                except Exception as e:
-                    flash(f"Failed to create remote bucket: {e}", "warning")
-                # We continue to set the rule even if creation fails (maybe it exists?)
-
             rule = ReplicationRule(
                 bucket_name=bucket_name,
                 target_connection_id=target_conn_id,
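The test endpoint returns JSON, so it can be exercised without the UI. A hypothetical call against a local instance (the base URL and credentials are illustrative, and the session must already be authenticated since the route runs the `iam:list_users` check):

```python
import requests

session = requests.Session()
# Assumes a prior login has populated the session cookie.
resp = session.post(
    "http://localhost:5000/connections/test",
    json={
        "endpoint_url": "http://localhost:9000",
        "access_key": "example-key",
        "secret_key": "example-secret",
        "region": "us-east-1",
    },
)
print(resp.status_code, resp.json())
# 200 {"status": "ok", "message": "Connection successful"} on success;
# 400 with the underlying boto3 error message otherwise.
```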
diff --git a/templates/bucket_detail.html b/templates/bucket_detail.html
index df6604b..7a2a2a1 100644
--- a/templates/bucket_detail.html
+++ b/templates/bucket_detail.html
@@ -408,11 +408,9 @@
-
-
-
-
-
+
 {% else %}
 
 Replication allows you to automatically copy new objects from this bucket to a bucket in another S3-compatible service.
 
@@ -436,12 +434,7 @@
-
-
-
-
+If the target bucket does not exist, it will be created automatically.
@@ -715,6 +708,30 @@
+
+
+
 {% endblock %}
 
 {% block extra_scripts %}
diff --git a/templates/connections.html b/templates/connections.html
index 93e7e1e..3a1e603 100644
--- a/templates/connections.html
+++ b/templates/connections.html
@@ -12,12 +12,12 @@
-
-
+
+
 Add New Connection
-
+
 
@@ -37,44 +37,69 @@
-
+
+
+
-
+
+
+
+
-
-
-
+
+
+
 Existing Connections
 {% if connections %}
-
+
 Name
 Endpoint
 Region
 Access Key
-Actions
+Actions
 {% for conn in connections %}
-{{ conn.name }}
-{{ conn.endpoint_url }}
-{{ conn.region }}
-{{ conn.access_key }}
-
-
-
+{{ conn.name }}
+{{ conn.endpoint_url }}
+{{ conn.region }}
+{{ conn.access_key }}
+
+
+
 {% endfor %}
@@ -82,10 +107,164 @@
 {% else %}
-
-No remote connections configured.
-
+
+
+
+
+
+No remote connections configured.
+
+
 {% endif %}
+
+
+
+
+
+
+
+
 {% endblock %}