Add configurable env variables for hardcoded timeouts and limits
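The previously hardcoded constants (connect/read timeouts, retry count, streaming threshold, per-bucket failure cap) become keyword arguments with the same defaults, so existing call sites behave exactly as before. The env-variable wiring itself is not visible in the hunks below; as a rough sketch of how a caller might populate the new parameters, assuming hypothetical variable names (REPLICATION_CONNECT_TIMEOUT etc.) that are not part of this diff:

    import os

    # Sketch only: the env var names below are assumptions, not from this commit.
    def replication_settings_from_env() -> dict:
        """Read the new ReplicationManager tunables from the environment,
        falling back to the same defaults the constructor declares."""
        return {
            "connect_timeout": int(os.environ.get("REPLICATION_CONNECT_TIMEOUT", "5")),
            "read_timeout": int(os.environ.get("REPLICATION_READ_TIMEOUT", "30")),
            "max_retries": int(os.environ.get("REPLICATION_MAX_RETRIES", "2")),
            "streaming_threshold_bytes": int(
                os.environ.get("REPLICATION_STREAMING_THRESHOLD_BYTES", str(10 * 1024 * 1024))
            ),
            "max_failures_per_bucket": int(
                os.environ.get("REPLICATION_MAX_FAILURES_PER_BUCKET", "50")
            ),
        }

    # manager = ReplicationManager(storage, connections, rules_path, storage_root,
    #                              **replication_settings_from_env())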
@@ -21,16 +21,20 @@ from .storage import ObjectStorage, StorageError
 logger = logging.getLogger(__name__)
 
 REPLICATION_USER_AGENT = "S3ReplicationAgent/1.0"
-REPLICATION_CONNECT_TIMEOUT = 5
-REPLICATION_READ_TIMEOUT = 30
-STREAMING_THRESHOLD_BYTES = 10 * 1024 * 1024
 
 REPLICATION_MODE_NEW_ONLY = "new_only"
 REPLICATION_MODE_ALL = "all"
 REPLICATION_MODE_BIDIRECTIONAL = "bidirectional"
 
 
-def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any:
+def _create_s3_client(
+    connection: RemoteConnection,
+    *,
+    health_check: bool = False,
+    connect_timeout: int = 5,
+    read_timeout: int = 30,
+    max_retries: int = 2,
+) -> Any:
     """Create a boto3 S3 client for the given connection.
     Args:
         connection: Remote S3 connection configuration
@@ -38,9 +42,9 @@ def _create_s3_client(connection: RemoteConnection, *, health_check: bool = Fals
     """
     config = Config(
         user_agent_extra=REPLICATION_USER_AGENT,
-        connect_timeout=REPLICATION_CONNECT_TIMEOUT,
-        read_timeout=REPLICATION_READ_TIMEOUT,
-        retries={'max_attempts': 1 if health_check else 2},
+        connect_timeout=connect_timeout,
+        read_timeout=read_timeout,
+        retries={'max_attempts': 1 if health_check else max_retries},
         signature_version='s3v4',
         s3={'addressing_style': 'path'},
         request_checksum_calculation='when_required',
@@ -164,10 +168,9 @@ class ReplicationRule:
 
 
 class ReplicationFailureStore:
-    MAX_FAILURES_PER_BUCKET = 50
-
-    def __init__(self, storage_root: Path) -> None:
+    def __init__(self, storage_root: Path, max_failures_per_bucket: int = 50) -> None:
         self.storage_root = storage_root
+        self.max_failures_per_bucket = max_failures_per_bucket
         self._lock = threading.Lock()
 
     def _get_failures_path(self, bucket_name: str) -> Path:
@@ -188,7 +191,7 @@ class ReplicationFailureStore:
     def save_failures(self, bucket_name: str, failures: List[ReplicationFailure]) -> None:
         path = self._get_failures_path(bucket_name)
         path.parent.mkdir(parents=True, exist_ok=True)
-        data = {"failures": [f.to_dict() for f in failures[:self.MAX_FAILURES_PER_BUCKET]]}
+        data = {"failures": [f.to_dict() for f in failures[:self.max_failures_per_bucket]]}
         try:
             with open(path, "w") as f:
                 json.dump(data, f, indent=2)
@@ -233,18 +236,43 @@ class ReplicationFailureStore:
 
 
 class ReplicationManager:
-    def __init__(self, storage: ObjectStorage, connections: ConnectionStore, rules_path: Path, storage_root: Path) -> None:
+    def __init__(
+        self,
+        storage: ObjectStorage,
+        connections: ConnectionStore,
+        rules_path: Path,
+        storage_root: Path,
+        connect_timeout: int = 5,
+        read_timeout: int = 30,
+        max_retries: int = 2,
+        streaming_threshold_bytes: int = 10 * 1024 * 1024,
+        max_failures_per_bucket: int = 50,
+    ) -> None:
         self.storage = storage
         self.connections = connections
         self.rules_path = rules_path
         self.storage_root = storage_root
+        self.connect_timeout = connect_timeout
+        self.read_timeout = read_timeout
+        self.max_retries = max_retries
+        self.streaming_threshold_bytes = streaming_threshold_bytes
         self._rules: Dict[str, ReplicationRule] = {}
         self._stats_lock = threading.Lock()
         self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ReplicationWorker")
         self._shutdown = False
-        self.failure_store = ReplicationFailureStore(storage_root)
+        self.failure_store = ReplicationFailureStore(storage_root, max_failures_per_bucket)
         self.reload_rules()
 
+    def _create_client(self, connection: RemoteConnection, *, health_check: bool = False) -> Any:
+        """Create an S3 client with the manager's configured timeouts."""
+        return _create_s3_client(
+            connection,
+            health_check=health_check,
+            connect_timeout=self.connect_timeout,
+            read_timeout=self.read_timeout,
+            max_retries=self.max_retries,
+        )
+
     def shutdown(self, wait: bool = True) -> None:
         """Shutdown the replication executor gracefully.
 
@@ -280,7 +308,7 @@ class ReplicationManager:
         Uses short timeouts to prevent blocking.
         """
         try:
-            s3 = _create_s3_client(connection, health_check=True)
+            s3 = self._create_client(connection, health_check=True)
             s3.list_buckets()
             return True
         except Exception as e:
@@ -329,7 +357,7 @@ class ReplicationManager:
         source_objects = self.storage.list_objects_all(bucket_name)
         source_keys = {obj.key: obj.size for obj in source_objects}
 
-        s3 = _create_s3_client(connection)
+        s3 = self._create_client(connection)
 
         dest_keys = set()
         bytes_synced = 0
@@ -395,7 +423,7 @@ class ReplicationManager:
             raise ValueError(f"Connection {connection_id} not found")
 
         try:
-            s3 = _create_s3_client(connection)
+            s3 = self._create_client(connection)
             s3.create_bucket(Bucket=bucket_name)
         except ClientError as e:
             logger.error(f"Failed to create remote bucket {bucket_name}: {e}")
@@ -438,7 +466,7 @@ class ReplicationManager:
             return
 
         try:
-            s3 = _create_s3_client(conn)
+            s3 = self._create_client(conn)
 
             if action == "delete":
                 try:
@@ -481,7 +509,7 @@ class ReplicationManager:
         if content_type:
            extra_args["ContentType"] = content_type
 
-        if file_size >= STREAMING_THRESHOLD_BYTES:
+        if file_size >= self.streaming_threshold_bytes:
             s3.upload_file(
                 str(path),
                 rule.target_bucket,
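Design note: health checks remain pinned to a single retry attempt regardless of max_retries, which keeps the connectivity probe fast. For reference, a standalone sketch of the botocore Config the updated helper builds with all defaults and health_check=True (only the fields visible in this diff are reproduced; request_checksum_calculation needs a recent botocore):

    from botocore.config import Config

    # Equivalent of _create_s3_client's Config under the new defaults,
    # with health_check=True forcing retries down to one attempt.
    cfg = Config(
        user_agent_extra="S3ReplicationAgent/1.0",
        connect_timeout=5,
        read_timeout=30,
        retries={"max_attempts": 1},
        signature_version="s3v4",
        s3={"addressing_style": "path"},
        request_checksum_calculation="when_required",
    )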