461 lines
15 KiB
Python
461 lines
15 KiB
Python
import io
|
|
import json
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from app.connections import ConnectionStore, RemoteConnection
|
|
from app.replication import (
|
|
ReplicationManager,
|
|
ReplicationRule,
|
|
REPLICATION_MODE_BIDIRECTIONAL,
|
|
REPLICATION_MODE_NEW_ONLY,
|
|
)
|
|
from app.site_sync import (
|
|
SiteSyncWorker,
|
|
SyncState,
|
|
SyncedObjectInfo,
|
|
SiteSyncStats,
|
|
RemoteObjectMeta,
|
|
)
|
|
from app.storage import ObjectStorage
|
|
|
|
|
|
@pytest.fixture
def storage(tmp_path: Path):
    """Return an ObjectStorage built on a newly created ``data`` directory under tmp_path."""
    data_dir = tmp_path.joinpath("data")
    data_dir.mkdir(parents=True)
    return ObjectStorage(data_dir)
|
|
|
|
|
|
@pytest.fixture
def connections(tmp_path: Path):
    """Return a ConnectionStore backed by ``connections.json`` holding one remote, ``test-conn``."""
    conn_store = ConnectionStore(tmp_path / "connections.json")
    conn_store.add(
        RemoteConnection(
            id="test-conn",
            name="Test Remote",
            endpoint_url="http://localhost:9000",
            access_key="remote-access",
            secret_key="remote-secret",
            region="us-east-1",
        )
    )
    return conn_store
|
|
|
|
|
|
@pytest.fixture
def replication_manager(storage, connections, tmp_path):
    """Yield a ReplicationManager, then call ``shutdown(wait=False)`` on it after the test."""
    data_dir = tmp_path / "data"
    # exist_ok: the ``storage`` fixture this depends on already created the directory.
    data_dir.mkdir(exist_ok=True)
    mgr = ReplicationManager(
        storage,
        connections,
        tmp_path / "replication_rules.json",
        data_dir,
    )
    yield mgr
    mgr.shutdown(wait=False)
|
|
|
|
|
|
@pytest.fixture
def site_sync_worker(storage, connections, replication_manager, tmp_path):
    """Yield a SiteSyncWorker wired to the other fixtures, then shut it down after the test."""
    worker_options = {
        "storage": storage,
        "connections": connections,
        "replication_manager": replication_manager,
        "storage_root": tmp_path / "data",
        "interval_seconds": 60,
        "batch_size": 100,
    }
    sync_worker = SiteSyncWorker(**worker_options)
    yield sync_worker
    sync_worker.shutdown()
|
|
|
|
|
|
class TestSyncedObjectInfo:
    """Checks for SyncedObjectInfo conversion to and from plain dicts."""

    def test_to_dict(self):
        """to_dict() exposes each constructor field under its own key."""
        serialized = SyncedObjectInfo(
            last_synced_at=1234567890.0,
            remote_etag="abc123",
            source="remote",
        ).to_dict()

        assert serialized["last_synced_at"] == 1234567890.0
        assert serialized["remote_etag"] == "abc123"
        assert serialized["source"] == "remote"

    def test_from_dict(self):
        """from_dict() maps each key onto the matching attribute."""
        payload = dict(
            last_synced_at=9876543210.0,
            remote_etag="def456",
            source="local",
        )

        restored = SyncedObjectInfo.from_dict(payload)

        assert restored.last_synced_at == 9876543210.0
        assert restored.remote_etag == "def456"
        assert restored.source == "local"
|
|
|
|
|
|
class TestSyncState:
    """Checks for SyncState conversion to and from plain dicts."""

    def test_to_dict(self):
        """to_dict() nests per-object info under ``synced_objects`` and keeps ``last_full_sync``."""
        tracked = SyncedObjectInfo(
            last_synced_at=1000.0,
            remote_etag="etag1",
            source="remote",
        )
        serialized = SyncState(
            synced_objects={"test.txt": tracked},
            last_full_sync=2000.0,
        ).to_dict()

        assert "test.txt" in serialized["synced_objects"]
        assert serialized["synced_objects"]["test.txt"]["remote_etag"] == "etag1"
        assert serialized["last_full_sync"] == 2000.0

    def test_from_dict(self):
        """from_dict() rebuilds the tracked objects and the last full sync timestamp."""
        object_entry = {
            "last_synced_at": 3000.0,
            "remote_etag": "etag2",
            "source": "remote",
        }
        payload = {
            "synced_objects": {"file.txt": object_entry},
            "last_full_sync": 4000.0,
        }

        restored = SyncState.from_dict(payload)

        assert "file.txt" in restored.synced_objects
        assert restored.synced_objects["file.txt"].remote_etag == "etag2"
        assert restored.last_full_sync == 4000.0

    def test_from_dict_empty(self):
        """An empty dict yields no tracked objects and ``last_full_sync`` of None."""
        restored = SyncState.from_dict({})

        assert restored.synced_objects == {}
        assert restored.last_full_sync is None
|
|
|
|
|
|
class TestSiteSyncStats:
    """Checks for SiteSyncStats serialization."""

    def test_to_dict(self):
        """to_dict() reports every counter with the value passed to the constructor."""
        expected_counters = {
            "objects_pulled": 10,
            "objects_skipped": 5,
            "conflicts_resolved": 2,
            "deletions_applied": 1,
            "errors": 0,
        }

        serialized = SiteSyncStats(
            last_sync_at=1234567890.0,
            **expected_counters,
        ).to_dict()

        for counter_name, expected_value in expected_counters.items():
            assert serialized[counter_name] == expected_value
|
|
|
|
|
|
class TestRemoteObjectMeta:
    """Checks for building RemoteObjectMeta from an S3-style listing entry."""

    def test_from_s3_object(self):
        """Key, size and timestamp are copied; the expected etag has no surrounding quotes."""
        modified_at = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
        listing_entry = {
            "Key": "test/file.txt",
            "Size": 1024,
            "LastModified": modified_at,
            "ETag": '"abc123def456"',
        }

        parsed = RemoteObjectMeta.from_s3_object(listing_entry)

        assert parsed.key == "test/file.txt"
        assert parsed.size == 1024
        assert parsed.last_modified == modified_at
        assert parsed.etag == "abc123def456"
|
|
|
|
|
|
class TestReplicationRuleBidirectional:
    """Checks for the ``mode``, ``sync_deletions`` and ``last_pull_at`` fields of ReplicationRule."""

    def test_rule_with_bidirectional_mode(self):
        """A bidirectional rule keeps its mode and flag; ``last_pull_at`` is None when not given."""
        bidirectional_rule = ReplicationRule(
            bucket_name="sync-bucket",
            target_connection_id="test-conn",
            target_bucket="remote-bucket",
            enabled=True,
            mode=REPLICATION_MODE_BIDIRECTIONAL,
            sync_deletions=True,
        )

        assert bidirectional_rule.mode == REPLICATION_MODE_BIDIRECTIONAL
        assert bidirectional_rule.sync_deletions is True
        assert bidirectional_rule.last_pull_at is None

    def test_rule_to_dict_includes_new_fields(self):
        """to_dict() emits the mode, deletion flag and last pull timestamp."""
        serialized = ReplicationRule(
            bucket_name="sync-bucket",
            target_connection_id="test-conn",
            target_bucket="remote-bucket",
            mode=REPLICATION_MODE_BIDIRECTIONAL,
            sync_deletions=False,
            last_pull_at=1234567890.0,
        ).to_dict()

        assert serialized["mode"] == REPLICATION_MODE_BIDIRECTIONAL
        assert serialized["sync_deletions"] is False
        assert serialized["last_pull_at"] == 1234567890.0

    def test_rule_from_dict_with_new_fields(self):
        """from_dict() reads the mode, deletion flag and last pull timestamp when present."""
        payload = {
            "bucket_name": "sync-bucket",
            "target_connection_id": "test-conn",
            "target_bucket": "remote-bucket",
            "mode": REPLICATION_MODE_BIDIRECTIONAL,
            "sync_deletions": False,
            "last_pull_at": 1234567890.0,
        }

        parsed_rule = ReplicationRule.from_dict(payload)

        assert parsed_rule.mode == REPLICATION_MODE_BIDIRECTIONAL
        assert parsed_rule.sync_deletions is False
        assert parsed_rule.last_pull_at == 1234567890.0

    def test_rule_from_dict_defaults_new_fields(self):
        """With only the three naming keys, ``sync_deletions`` is True and ``last_pull_at`` is None."""
        minimal_payload = {
            "bucket_name": "sync-bucket",
            "target_connection_id": "test-conn",
            "target_bucket": "remote-bucket",
        }

        parsed_rule = ReplicationRule.from_dict(minimal_payload)

        assert parsed_rule.sync_deletions is True
        assert parsed_rule.last_pull_at is None
|
|
|
|
|
|
class TestSiteSyncWorker:
    """Unit tests for SiteSyncWorker: lifecycle, sync gating, conflict resolution, state and listing."""

    @staticmethod
    def _local_meta(modified, etag):
        """Build a MagicMock stand-in for local object metadata."""
        stub = MagicMock()
        stub.last_modified = modified
        stub.etag = etag
        return stub

    @staticmethod
    def _remote_meta(modified, etag):
        """Build a RemoteObjectMeta for ``test.txt`` (size 100) with the given timestamp and etag."""
        return RemoteObjectMeta(
            key="test.txt",
            size=100,
            last_modified=modified,
            etag=etag,
        )

    def test_start_and_shutdown(self, site_sync_worker):
        """start() leaves a live sync thread; after shutdown() that thread is no longer alive."""
        worker = site_sync_worker

        worker.start()
        assert worker._sync_thread is not None
        assert worker._sync_thread.is_alive()

        worker.shutdown()
        # Re-read the attribute after shutdown rather than reusing an earlier reference.
        assert not worker._sync_thread.is_alive()

    def test_trigger_sync_no_rule(self, site_sync_worker):
        """trigger_sync() returns None for a bucket that has no rule."""
        assert site_sync_worker.trigger_sync("nonexistent-bucket") is None

    def test_trigger_sync_wrong_mode(self, site_sync_worker, replication_manager):
        """trigger_sync() returns None when the bucket's rule uses the new-only mode."""
        bucket = "new-only-bucket"
        replication_manager.set_rule(
            ReplicationRule(
                bucket_name=bucket,
                target_connection_id="test-conn",
                target_bucket="remote-bucket",
                mode=REPLICATION_MODE_NEW_ONLY,
                enabled=True,
            )
        )

        assert site_sync_worker.trigger_sync(bucket) is None

    def test_trigger_sync_disabled_rule(self, site_sync_worker, replication_manager):
        """trigger_sync() returns None when the bidirectional rule is disabled."""
        bucket = "disabled-bucket"
        replication_manager.set_rule(
            ReplicationRule(
                bucket_name=bucket,
                target_connection_id="test-conn",
                target_bucket="remote-bucket",
                mode=REPLICATION_MODE_BIDIRECTIONAL,
                enabled=False,
            )
        )

        assert site_sync_worker.trigger_sync(bucket) is None

    def test_get_stats_no_sync(self, site_sync_worker):
        """get_stats() returns None for a bucket that was never synced."""
        assert site_sync_worker.get_stats("nonexistent") is None

    def test_resolve_conflict_remote_newer(self, site_sync_worker):
        """A remote copy modified one day later than the local one resolves to ``pull``."""
        local_time = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
        remote_time = datetime(2025, 1, 2, 12, 0, 0, tzinfo=timezone.utc)

        decision = site_sync_worker._resolve_conflict(
            self._local_meta(local_time, "local123"),
            self._remote_meta(remote_time, "remote456"),
        )

        assert decision == "pull"

    def test_resolve_conflict_local_newer(self, site_sync_worker):
        """A local copy modified one day later than the remote one resolves to ``keep``."""
        local_time = datetime(2025, 1, 2, 12, 0, 0, tzinfo=timezone.utc)
        remote_time = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

        decision = site_sync_worker._resolve_conflict(
            self._local_meta(local_time, "local123"),
            self._remote_meta(remote_time, "remote456"),
        )

        assert decision == "keep"

    def test_resolve_conflict_same_time_same_etag(self, site_sync_worker):
        """Identical timestamps and identical etags resolve to ``skip``."""
        shared_time = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

        decision = site_sync_worker._resolve_conflict(
            self._local_meta(shared_time, "same123"),
            self._remote_meta(shared_time, "same123"),
        )

        assert decision == "skip"

    def test_resolve_conflict_same_time_different_etag(self, site_sync_worker):
        """Identical timestamps with local etag ``aaa`` and remote etag ``zzz`` resolve to ``pull``."""
        shared_time = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

        decision = site_sync_worker._resolve_conflict(
            self._local_meta(shared_time, "aaa"),
            self._remote_meta(shared_time, "zzz"),
        )

        assert decision == "pull"

    def test_sync_state_persistence(self, site_sync_worker, tmp_path):
        """State written by _save_sync_state() is returned by _load_sync_state() for the same bucket."""
        bucket_name = "test-bucket"
        tracked = SyncedObjectInfo(
            last_synced_at=time.time(),
            remote_etag="etag1",
            source="remote",
        )

        site_sync_worker._save_sync_state(
            bucket_name,
            SyncState(
                synced_objects={"file1.txt": tracked},
                last_full_sync=time.time(),
            ),
        )
        reloaded = site_sync_worker._load_sync_state(bucket_name)

        assert "file1.txt" in reloaded.synced_objects
        assert reloaded.synced_objects["file1.txt"].remote_etag == "etag1"

    def test_load_sync_state_nonexistent(self, site_sync_worker):
        """Loading state for a bucket with nothing saved gives an empty SyncState."""
        empty_state = site_sync_worker._load_sync_state("nonexistent-bucket")

        assert empty_state.synced_objects == {}
        assert empty_state.last_full_sync is None

    @patch("app.site_sync._create_sync_client")
    def test_list_remote_objects(
        self,
        mock_create_client,
        site_sync_worker,
        connections,
        replication_manager,
    ):
        """_list_remote_objects() returns one entry per key in the mocked paginator page."""
        remote_listing = (
            ("file1.txt", 100, datetime(2025, 1, 1, tzinfo=timezone.utc), '"etag1"'),
            ("file2.txt", 200, datetime(2025, 1, 2, tzinfo=timezone.utc), '"etag2"'),
        )
        page = {
            "Contents": [
                {"Key": key, "Size": size, "LastModified": modified, "ETag": etag}
                for key, size, modified, etag in remote_listing
            ]
        }

        # The patched client factory hands back a client whose paginator yields that single page.
        paginator = MagicMock()
        paginator.paginate.return_value = [page]
        s3_client = MagicMock()
        s3_client.get_paginator.return_value = paginator
        mock_create_client.return_value = s3_client

        rule = ReplicationRule(
            bucket_name="local-bucket",
            target_connection_id="test-conn",
            target_bucket="remote-bucket",
            mode=REPLICATION_MODE_BIDIRECTIONAL,
        )
        remote_conn = connections.get("test-conn")

        listed = site_sync_worker._list_remote_objects(rule, remote_conn)

        for key, _size, _modified, _etag in remote_listing:
            assert key in listed
        for key, size, _modified, _etag in remote_listing:
            assert listed[key].size == size

    def test_list_local_objects(self, site_sync_worker, storage):
        """_list_local_objects() includes every key stored in the bucket."""
        bucket = "test-bucket"
        seeded = (("file1.txt", b"content1"), ("file2.txt", b"content2"))

        storage.create_bucket(bucket)
        for key, body in seeded:
            storage.put_object(bucket, key, io.BytesIO(body))

        listed = site_sync_worker._list_local_objects(bucket)

        for key, _body in seeded:
            assert key in listed

    @patch("app.site_sync._create_sync_client")
    def test_sync_bucket_connection_not_found(
        self,
        mock_create_client,
        site_sync_worker,
        replication_manager,
    ):
        """_sync_bucket() reports one error for a rule targeting connection id ``missing-conn``."""
        orphan_rule = ReplicationRule(
            bucket_name="test-bucket",
            target_connection_id="missing-conn",
            target_bucket="remote-bucket",
            mode=REPLICATION_MODE_BIDIRECTIONAL,
            enabled=True,
        )
        replication_manager.set_rule(orphan_rule)

        outcome = site_sync_worker._sync_bucket(orphan_rule)

        assert outcome.errors == 1
|
|
|
|
|
|
class TestSiteSyncIntegration:
    """End-to-end sync of one bucket against a mocked remote client."""

    @patch("app.site_sync._create_sync_client")
    def test_full_sync_cycle(
        self,
        mock_create_client,
        site_sync_worker,
        storage,
        connections,
        replication_manager,
    ):
        """One remote-only object is pulled without errors and the local-only object remains listed."""
        bucket = "sync-bucket"
        storage.create_bucket(bucket)
        storage.put_object(bucket, "local-only.txt", io.BytesIO(b"local content"))

        remote_entry = {
            "Key": "remote-only.txt",
            "Size": 100,
            "LastModified": datetime(2025, 1, 15, tzinfo=timezone.utc),
            "ETag": '"remoteetag"',
        }

        def _fake_download(bucket, key, path):
            # Stand in for the real download by writing fixed bytes to the destination path.
            Path(path).write_bytes(b"remote content")

        paginator = MagicMock()
        paginator.paginate.return_value = [{"Contents": [remote_entry]}]
        s3_client = MagicMock()
        s3_client.get_paginator.return_value = paginator
        s3_client.head_object.return_value = {"Metadata": {}}
        s3_client.download_file.side_effect = _fake_download
        mock_create_client.return_value = s3_client

        sync_rule = ReplicationRule(
            bucket_name="sync-bucket",
            target_connection_id="test-conn",
            target_bucket="remote-bucket",
            mode=REPLICATION_MODE_BIDIRECTIONAL,
            enabled=True,
        )
        replication_manager.set_rule(sync_rule)

        outcome = site_sync_worker._sync_bucket(sync_rule)

        assert outcome.objects_pulled == 1
        assert outcome.errors == 0

        local_keys = site_sync_worker._list_local_objects("sync-bucket")
        assert "local-only.txt" in local_keys
        assert "remote-only.txt" in local_keys
|