diff --git a/app/__init__.py b/app/__init__.py index 17da538..cf75a06 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -12,6 +12,7 @@ from typing import Any, Dict, Optional from flask import Flask, g, has_request_context, redirect, render_template, request, url_for from flask_cors import CORS from flask_wtf.csrf import CSRFError +from werkzeug.middleware.proxy_fix import ProxyFix from .bucket_policies import BucketPolicyStore from .config import AppConfig @@ -47,6 +48,9 @@ def create_app( if app.config.get("TESTING"): app.config.setdefault("WTF_CSRF_ENABLED", False) + # Trust X-Forwarded-* headers from proxies + app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) + _configure_cors(app) _configure_logging(app) diff --git a/app/config.py b/app/config.py index 3bfd14f..282b3a9 100644 --- a/app/config.py +++ b/app/config.py @@ -39,7 +39,7 @@ class AppConfig: secret_key: str iam_config_path: Path bucket_policy_path: Path - api_base_url: str + api_base_url: Optional[str] aws_region: str aws_service: str ui_enforce_bucket_policies: bool @@ -100,7 +100,10 @@ class AppConfig: bucket_policy_path, legacy_path=None if bucket_policy_override else PROJECT_ROOT / "data" / "bucket_policies.json", ) - api_base_url = str(_get("API_BASE_URL", "http://127.0.0.1:5000")) + api_base_url = _get("API_BASE_URL", None) + if api_base_url: + api_base_url = str(api_base_url) + aws_region = str(_get("AWS_REGION", "us-east-1")) aws_service = str(_get("AWS_SERVICE", "s3")) enforce_ui_policies = str(_get("UI_ENFORCE_BUCKET_POLICIES", "0")).lower() in {"1", "true", "yes", "on"} diff --git a/app/replication.py b/app/replication.py index d6313b5..c604727 100644 --- a/app/replication.py +++ b/app/replication.py @@ -10,6 +10,7 @@ from pathlib import Path from typing import Dict, Optional import boto3 +from botocore.config import Config from botocore.exceptions import ClientError from boto3.exceptions import S3UploadFailedError @@ -18,6 +19,8 @@ from .storage import ObjectStorage, StorageError logger = logging.getLogger(__name__) +REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0" + @dataclass class ReplicationRule: @@ -87,7 +90,7 @@ class ReplicationManager: logger.error(f"Failed to create remote bucket {bucket_name}: {e}") raise - def trigger_replication(self, bucket_name: str, object_key: str) -> None: + def trigger_replication(self, bucket_name: str, object_key: str, action: str = "write") -> None: rule = self.get_rule(bucket_name) if not rule or not rule.enabled: return @@ -97,24 +100,34 @@ class ReplicationManager: logger.warning(f"Replication skipped for {bucket_name}/{object_key}: Connection {rule.target_connection_id} not found") return - self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection) + self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action) - def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection) -> None: + def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None: try: - # 1. Get local file path - # Note: We are accessing internal storage structure here. - # Ideally storage.py should expose a 'get_file_path' or we read the stream. - # For efficiency, we'll try to read the file directly if we can, or use storage.get_object - # Using boto3 to upload + config = Config(user_agent_extra=REPLICATION_USER_AGENT) s3 = boto3.client( "s3", endpoint_url=conn.endpoint_url, aws_access_key_id=conn.access_key, aws_secret_access_key=conn.secret_key, region_name=conn.region, + config=config, ) + if action == "delete": + try: + s3.delete_object(Bucket=rule.target_bucket, Key=object_key) + logger.info(f"Replicated DELETE {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})") + except ClientError as e: + logger.error(f"Replication DELETE failed for {bucket_name}/{object_key}: {e}") + return + + # 1. Get local file path + # Note: We are accessing internal storage structure here. + # Ideally storage.py should expose a 'get_file_path' or we read the stream. + # For efficiency, we'll try to read the file directly if we can, or use storage.get_object + # We need the file content. # Since ObjectStorage is filesystem based, let's get the stream. # We need to be careful about closing it. diff --git a/app/s3_api.py b/app/s3_api.py index a0e51b0..950990a 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -17,6 +17,7 @@ from werkzeug.http import http_date from .bucket_policies import BucketPolicyStore from .extensions import limiter from .iam import IamError, Principal +from .replication import ReplicationManager from .storage import ObjectStorage, StorageError s3_api_bp = Blueprint("s3_api", __name__) @@ -31,6 +32,9 @@ def _iam(): return current_app.extensions["iam"] +def _replication_manager() -> ReplicationManager: + return current_app.extensions["replication"] + def _bucket_policies() -> BucketPolicyStore: store: BucketPolicyStore = current_app.extensions["bucket_policies"] @@ -1107,6 +1111,12 @@ def object_handler(bucket_name: str, object_key: str): ) response = Response(status=200) response.headers["ETag"] = f'"{meta.etag}"' + + # Trigger replication if not a replication request + user_agent = request.headers.get("User-Agent", "") + if "S3ReplicationAgent" not in user_agent: + _replication_manager().trigger_replication(bucket_name, object_key, action="write") + return response if request.method in {"GET", "HEAD"}: @@ -1141,6 +1151,12 @@ def object_handler(bucket_name: str, object_key: str): return error storage.delete_object(bucket_name, object_key) current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key}) + + # Trigger replication if not a replication request + user_agent = request.headers.get("User-Agent", "") + if "S3ReplicationAgent" not in user_agent: + _replication_manager().trigger_replication(bucket_name, object_key, action="delete") + return Response(status=204) diff --git a/app/ui.py b/app/ui.py index 3b2517f..5f7a1f3 100644 --- a/app/ui.py +++ b/app/ui.py @@ -703,10 +703,21 @@ def object_presign(bucket_name: str, object_key: str): _authorize_ui(principal, bucket_name, action, object_key=object_key) except IamError as exc: return jsonify({"error": str(exc)}), 403 - api_base = current_app.config["API_BASE_URL"].rstrip("/") + + api_base = current_app.config.get("API_BASE_URL") + if not api_base: + api_base = "http://127.0.0.1:5000" + api_base = api_base.rstrip("/") + url = f"{api_base}/presign/{bucket_name}/{object_key}" + + headers = _api_headers() + # Forward the host so the API knows the public URL + headers["X-Forwarded-Host"] = request.host + headers["X-Forwarded-Proto"] = request.scheme + try: - response = requests.post(url, headers=_api_headers(), json=payload, timeout=5) + response = requests.post(url, headers=headers, json=payload, timeout=5) except requests.RequestException as exc: return jsonify({"error": f"API unavailable: {exc}"}), 502 try: