Enhance replication functionality

2025-11-22 14:32:28 +08:00
parent a32d9dbd77
commit 8c00d7bd4b
5 changed files with 59 additions and 12 deletions

View File

@@ -12,6 +12,7 @@ from typing import Any, Dict, Optional
 from flask import Flask, g, has_request_context, redirect, render_template, request, url_for
 from flask_cors import CORS
 from flask_wtf.csrf import CSRFError
+from werkzeug.middleware.proxy_fix import ProxyFix
 
 from .bucket_policies import BucketPolicyStore
 from .config import AppConfig
@@ -47,6 +48,9 @@ def create_app(
     if app.config.get("TESTING"):
         app.config.setdefault("WTF_CSRF_ENABLED", False)
 
+    # Trust X-Forwarded-* headers from proxies
+    app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
+
     _configure_cors(app)
     _configure_logging(app)
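
A minimal sketch, separate from the app factory above, of what the ProxyFix line buys when the service sits behind a reverse proxy: with one trusted hop per header, Flask's request.host and request.scheme reflect the forwarded values instead of the backend socket. The route and hostname below are placeholders, not part of this repo.

    from flask import Flask, request
    from werkzeug.middleware.proxy_fix import ProxyFix

    app = Flask(__name__)
    # Trust exactly one proxy hop for each X-Forwarded-* header.
    app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)

    @app.route("/whoami")
    def whoami():
        # Behind the proxy these come from X-Forwarded-Host / X-Forwarded-Proto.
        return {"host": request.host, "scheme": request.scheme}

    # curl -H "X-Forwarded-Host: files.example.com" -H "X-Forwarded-Proto: https" \
    #      http://127.0.0.1:5000/whoami   ->   {"host": "files.example.com", "scheme": "https"}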

View File

@@ -39,7 +39,7 @@ class AppConfig:
     secret_key: str
     iam_config_path: Path
     bucket_policy_path: Path
-    api_base_url: str
+    api_base_url: Optional[str]
     aws_region: str
     aws_service: str
     ui_enforce_bucket_policies: bool
@@ -100,7 +100,10 @@ class AppConfig:
             bucket_policy_path,
             legacy_path=None if bucket_policy_override else PROJECT_ROOT / "data" / "bucket_policies.json",
         )
-        api_base_url = str(_get("API_BASE_URL", "http://127.0.0.1:5000"))
+        api_base_url = _get("API_BASE_URL", None)
+        if api_base_url:
+            api_base_url = str(api_base_url)
         aws_region = str(_get("AWS_REGION", "us-east-1"))
         aws_service = str(_get("AWS_SERVICE", "s3"))
         enforce_ui_policies = str(_get("UI_ENFORCE_BUCKET_POLICIES", "0")).lower() in {"1", "true", "yes", "on"}
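
API_BASE_URL is now optional (None when unset) instead of silently defaulting to a loopback address here, so callers choose their own fallback. A small sketch of that call-site pattern, using a hypothetical resolve_api_base helper rather than a function from this repo:

    import os
    from typing import Optional

    def resolve_api_base(configured: Optional[str], default: str = "http://127.0.0.1:5000") -> str:
        # Fall back only when nothing was configured, then normalise the
        # trailing slash so later f-string URL joins stay predictable.
        base = configured or os.environ.get("API_BASE_URL") or default
        return base.rstrip("/")

    print(resolve_api_base(None))                        # http://127.0.0.1:5000
    print(resolve_api_base("https://api.example.com/"))  # https://api.example.com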

View File

@@ -10,6 +10,7 @@ from pathlib import Path
 from typing import Dict, Optional
 
 import boto3
+from botocore.config import Config
 from botocore.exceptions import ClientError
 from boto3.exceptions import S3UploadFailedError
@@ -18,6 +19,8 @@ from .storage import ObjectStorage, StorageError
 logger = logging.getLogger(__name__)
 
+REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"
+
 
 @dataclass
 class ReplicationRule:
@@ -87,7 +90,7 @@ class ReplicationManager:
             logger.error(f"Failed to create remote bucket {bucket_name}: {e}")
             raise
 
-    def trigger_replication(self, bucket_name: str, object_key: str) -> None:
+    def trigger_replication(self, bucket_name: str, object_key: str, action: str = "write") -> None:
         rule = self.get_rule(bucket_name)
         if not rule or not rule.enabled:
             return
@@ -97,24 +100,34 @@ class ReplicationManager:
             logger.warning(f"Replication skipped for {bucket_name}/{object_key}: Connection {rule.target_connection_id} not found")
             return
 
-        self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection)
+        self._executor.submit(self._replicate_task, bucket_name, object_key, rule, connection, action)
 
-    def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection) -> None:
+    def _replicate_task(self, bucket_name: str, object_key: str, rule: ReplicationRule, conn: RemoteConnection, action: str) -> None:
         try:
-            # 1. Get local file path
-            # Note: We are accessing internal storage structure here.
-            # Ideally storage.py should expose a 'get_file_path' or we read the stream.
-            # For efficiency, we'll try to read the file directly if we can, or use storage.get_object
             # Using boto3 to upload
+            config = Config(user_agent_extra=REPLICATION_USER_AGENT)
             s3 = boto3.client(
                 "s3",
                 endpoint_url=conn.endpoint_url,
                 aws_access_key_id=conn.access_key,
                 aws_secret_access_key=conn.secret_key,
                 region_name=conn.region,
+                config=config,
             )
 
+            if action == "delete":
+                try:
+                    s3.delete_object(Bucket=rule.target_bucket, Key=object_key)
+                    logger.info(f"Replicated DELETE {bucket_name}/{object_key} to {conn.name} ({rule.target_bucket})")
+                except ClientError as e:
+                    logger.error(f"Replication DELETE failed for {bucket_name}/{object_key}: {e}")
+                return
+
+            # 1. Get local file path
+            # Note: We are accessing internal storage structure here.
+            # Ideally storage.py should expose a 'get_file_path' or we read the stream.
+            # For efficiency, we'll try to read the file directly if we can, or use storage.get_object
             # We need the file content.
             # Since ObjectStorage is filesystem based, let's get the stream.
             # We need to be careful about closing it.
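
For context, a stand-alone sketch of the loop-avoidance tagging introduced in this file (not the project's ReplicationManager): every client the replicator builds carries REPLICATION_USER_AGENT via botocore's user_agent_extra, so the receiving API can recognise mirrored traffic. Endpoint, credentials and bucket names below are placeholders.

    import boto3
    from botocore.config import Config
    from botocore.exceptions import ClientError

    REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"

    def mirror_delete(endpoint_url: str, access_key: str, secret_key: str,
                      target_bucket: str, key: str) -> None:
        # user_agent_extra appends the marker to boto3's default User-Agent,
        # so mirrored requests arrive tagged with "S3ReplicationAgent/1.0".
        s3 = boto3.client(
            "s3",
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(user_agent_extra=REPLICATION_USER_AGENT),
        )
        try:
            s3.delete_object(Bucket=target_bucket, Key=key)
        except ClientError as exc:
            # Mirrored deletes are best-effort; the local delete already succeeded.
            print(f"replication delete failed for {target_bucket}/{key}: {exc}")

    # mirror_delete("http://replica.example:9000", "ACCESS", "SECRET", "photos-replica", "cat.jpg")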

View File

@@ -17,6 +17,7 @@ from werkzeug.http import http_date
 from .bucket_policies import BucketPolicyStore
 from .extensions import limiter
 from .iam import IamError, Principal
+from .replication import ReplicationManager
 from .storage import ObjectStorage, StorageError
 
 s3_api_bp = Blueprint("s3_api", __name__)
@@ -31,6 +32,9 @@ def _iam():
     return current_app.extensions["iam"]
 
 
+def _replication_manager() -> ReplicationManager:
+    return current_app.extensions["replication"]
+
 
 def _bucket_policies() -> BucketPolicyStore:
     store: BucketPolicyStore = current_app.extensions["bucket_policies"]
@@ -1107,6 +1111,12 @@ def object_handler(bucket_name: str, object_key: str):
         )
 
         response = Response(status=200)
         response.headers["ETag"] = f'"{meta.etag}"'
+
+        # Trigger replication if not a replication request
+        user_agent = request.headers.get("User-Agent", "")
+        if "S3ReplicationAgent" not in user_agent:
+            _replication_manager().trigger_replication(bucket_name, object_key, action="write")
+
         return response
 
     if request.method in {"GET", "HEAD"}:
@@ -1141,6 +1151,12 @@ def object_handler(bucket_name: str, object_key: str):
             return error
 
         storage.delete_object(bucket_name, object_key)
         current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key})
+
+        # Trigger replication if not a replication request
+        user_agent = request.headers.get("User-Agent", "")
+        if "S3ReplicationAgent" not in user_agent:
+            _replication_manager().trigger_replication(bucket_name, object_key, action="delete")
+
         return Response(status=204)
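
The guard in both handlers is the other half of the loop prevention: mirrored writes and deletes arrive tagged with the replicator's User-Agent, and skipping them keeps two paired instances from replicating each other's replication traffic indefinitely. Expressed as a small helper (the name is_replication_request is assumed; the handlers above inline the check):

    from flask import Request

    REPLICATION_MARKER = "S3ReplicationAgent"

    def is_replication_request(request: Request) -> bool:
        # True when the request was issued by the replication agent itself.
        return REPLICATION_MARKER in request.headers.get("User-Agent", "")

    # Usage inside a handler:
    #     if not is_replication_request(request):
    #         _replication_manager().trigger_replication(bucket_name, object_key, action="write")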

View File

@@ -703,10 +703,21 @@ def object_presign(bucket_name: str, object_key: str):
         _authorize_ui(principal, bucket_name, action, object_key=object_key)
     except IamError as exc:
         return jsonify({"error": str(exc)}), 403
 
-    api_base = current_app.config["API_BASE_URL"].rstrip("/")
+    api_base = current_app.config.get("API_BASE_URL")
+    if not api_base:
+        api_base = "http://127.0.0.1:5000"
+    api_base = api_base.rstrip("/")
     url = f"{api_base}/presign/{bucket_name}/{object_key}"
+
+    headers = _api_headers()
+    # Forward the host so the API knows the public URL
+    headers["X-Forwarded-Host"] = request.host
+    headers["X-Forwarded-Proto"] = request.scheme
+
     try:
-        response = requests.post(url, headers=_api_headers(), json=payload, timeout=5)
+        response = requests.post(url, headers=headers, json=payload, timeout=5)
     except requests.RequestException as exc:
         return jsonify({"error": f"API unavailable: {exc}"}), 502
 
     try:
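
How the forwarded headers are meant to be consumed: combined with the ProxyFix change in the first file, the API sees X-Forwarded-Host / X-Forwarded-Proto as its own host and scheme, so any URL it hands back points at the address the browser actually uses. A sketch with a hypothetical route, not the project's /presign handler:

    from flask import Flask, request
    from werkzeug.middleware.proxy_fix import ProxyFix

    app = Flask(__name__)
    app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)

    @app.route("/presign-demo/<bucket>/<path:key>", methods=["POST"])
    def presign_demo(bucket: str, key: str):
        # With the UI forwarding X-Forwarded-Host/Proto, this base is the
        # public address rather than the API's internal 127.0.0.1 binding.
        public_base = f"{request.scheme}://{request.host}"
        return {"url": f"{public_base}/{bucket}/{key}"}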