Add new tests; Fix typo and validations

This commit is contained in:
2026-01-03 23:29:07 +08:00
parent 2d60e36fbf
commit b9cfc45aa2
14 changed files with 1970 additions and 125 deletions

374
tests/test_notifications.py Normal file
View File

@@ -0,0 +1,374 @@
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from app.notifications import (
NotificationConfiguration,
NotificationEvent,
NotificationService,
WebhookDestination,
)
class TestNotificationEvent:
def test_default_values(self):
event = NotificationEvent(
event_name="s3:ObjectCreated:Put",
bucket_name="test-bucket",
object_key="test/key.txt",
)
assert event.event_name == "s3:ObjectCreated:Put"
assert event.bucket_name == "test-bucket"
assert event.object_key == "test/key.txt"
assert event.object_size == 0
assert event.etag == ""
assert event.version_id is None
assert event.request_id != ""
def test_to_s3_event(self):
event = NotificationEvent(
event_name="s3:ObjectCreated:Put",
bucket_name="my-bucket",
object_key="my/object.txt",
object_size=1024,
etag="abc123",
version_id="v1",
source_ip="192.168.1.1",
user_identity="user123",
)
result = event.to_s3_event()
assert "Records" in result
assert len(result["Records"]) == 1
record = result["Records"][0]
assert record["eventVersion"] == "2.1"
assert record["eventSource"] == "myfsio:s3"
assert record["eventName"] == "s3:ObjectCreated:Put"
assert record["s3"]["bucket"]["name"] == "my-bucket"
assert record["s3"]["object"]["key"] == "my/object.txt"
assert record["s3"]["object"]["size"] == 1024
assert record["s3"]["object"]["eTag"] == "abc123"
assert record["s3"]["object"]["versionId"] == "v1"
assert record["userIdentity"]["principalId"] == "user123"
assert record["requestParameters"]["sourceIPAddress"] == "192.168.1.1"
class TestWebhookDestination:
def test_default_values(self):
dest = WebhookDestination(url="http://example.com/webhook")
assert dest.url == "http://example.com/webhook"
assert dest.headers == {}
assert dest.timeout_seconds == 30
assert dest.retry_count == 3
assert dest.retry_delay_seconds == 1
def test_to_dict(self):
dest = WebhookDestination(
url="http://example.com/webhook",
headers={"X-Custom": "value"},
timeout_seconds=60,
retry_count=5,
retry_delay_seconds=2,
)
result = dest.to_dict()
assert result["url"] == "http://example.com/webhook"
assert result["headers"] == {"X-Custom": "value"}
assert result["timeout_seconds"] == 60
assert result["retry_count"] == 5
assert result["retry_delay_seconds"] == 2
def test_from_dict(self):
data = {
"url": "http://hook.example.com",
"headers": {"Authorization": "Bearer token"},
"timeout_seconds": 45,
"retry_count": 2,
"retry_delay_seconds": 5,
}
dest = WebhookDestination.from_dict(data)
assert dest.url == "http://hook.example.com"
assert dest.headers == {"Authorization": "Bearer token"}
assert dest.timeout_seconds == 45
assert dest.retry_count == 2
assert dest.retry_delay_seconds == 5
class TestNotificationConfiguration:
def test_matches_event_exact_match(self):
config = NotificationConfiguration(
id="config1",
events=["s3:ObjectCreated:Put"],
destination=WebhookDestination(url="http://example.com"),
)
assert config.matches_event("s3:ObjectCreated:Put", "any/key.txt") is True
assert config.matches_event("s3:ObjectCreated:Post", "any/key.txt") is False
def test_matches_event_wildcard(self):
config = NotificationConfiguration(
id="config1",
events=["s3:ObjectCreated:*"],
destination=WebhookDestination(url="http://example.com"),
)
assert config.matches_event("s3:ObjectCreated:Put", "key.txt") is True
assert config.matches_event("s3:ObjectCreated:Copy", "key.txt") is True
assert config.matches_event("s3:ObjectRemoved:Delete", "key.txt") is False
def test_matches_event_with_prefix_filter(self):
config = NotificationConfiguration(
id="config1",
events=["s3:ObjectCreated:*"],
destination=WebhookDestination(url="http://example.com"),
prefix_filter="logs/",
)
assert config.matches_event("s3:ObjectCreated:Put", "logs/app.log") is True
assert config.matches_event("s3:ObjectCreated:Put", "data/file.txt") is False
def test_matches_event_with_suffix_filter(self):
config = NotificationConfiguration(
id="config1",
events=["s3:ObjectCreated:*"],
destination=WebhookDestination(url="http://example.com"),
suffix_filter=".jpg",
)
assert config.matches_event("s3:ObjectCreated:Put", "photos/image.jpg") is True
assert config.matches_event("s3:ObjectCreated:Put", "photos/image.png") is False
def test_matches_event_with_both_filters(self):
config = NotificationConfiguration(
id="config1",
events=["s3:ObjectCreated:*"],
destination=WebhookDestination(url="http://example.com"),
prefix_filter="images/",
suffix_filter=".png",
)
assert config.matches_event("s3:ObjectCreated:Put", "images/photo.png") is True
assert config.matches_event("s3:ObjectCreated:Put", "images/photo.jpg") is False
assert config.matches_event("s3:ObjectCreated:Put", "documents/file.png") is False
def test_to_dict(self):
config = NotificationConfiguration(
id="my-config",
events=["s3:ObjectCreated:Put", "s3:ObjectRemoved:Delete"],
destination=WebhookDestination(url="http://example.com"),
prefix_filter="logs/",
suffix_filter=".log",
)
result = config.to_dict()
assert result["Id"] == "my-config"
assert result["Events"] == ["s3:ObjectCreated:Put", "s3:ObjectRemoved:Delete"]
assert "Destination" in result
assert result["Filter"]["Key"]["FilterRules"][0]["Value"] == "logs/"
assert result["Filter"]["Key"]["FilterRules"][1]["Value"] == ".log"
def test_from_dict(self):
data = {
"Id": "parsed-config",
"Events": ["s3:ObjectCreated:*"],
"Destination": {"url": "http://hook.example.com"},
"Filter": {
"Key": {
"FilterRules": [
{"Name": "prefix", "Value": "data/"},
{"Name": "suffix", "Value": ".csv"},
]
}
},
}
config = NotificationConfiguration.from_dict(data)
assert config.id == "parsed-config"
assert config.events == ["s3:ObjectCreated:*"]
assert config.destination.url == "http://hook.example.com"
assert config.prefix_filter == "data/"
assert config.suffix_filter == ".csv"
@pytest.fixture
def notification_service(tmp_path: Path):
service = NotificationService(tmp_path, worker_count=1)
yield service
service.shutdown()
class TestNotificationService:
def test_get_bucket_notifications_empty(self, notification_service):
result = notification_service.get_bucket_notifications("nonexistent-bucket")
assert result == []
def test_set_and_get_bucket_notifications(self, notification_service):
configs = [
NotificationConfiguration(
id="config1",
events=["s3:ObjectCreated:*"],
destination=WebhookDestination(url="http://example.com/webhook1"),
),
NotificationConfiguration(
id="config2",
events=["s3:ObjectRemoved:*"],
destination=WebhookDestination(url="http://example.com/webhook2"),
),
]
notification_service.set_bucket_notifications("my-bucket", configs)
retrieved = notification_service.get_bucket_notifications("my-bucket")
assert len(retrieved) == 2
assert retrieved[0].id == "config1"
assert retrieved[1].id == "config2"
def test_delete_bucket_notifications(self, notification_service):
configs = [
NotificationConfiguration(
id="to-delete",
events=["s3:ObjectCreated:*"],
destination=WebhookDestination(url="http://example.com"),
),
]
notification_service.set_bucket_notifications("delete-bucket", configs)
assert len(notification_service.get_bucket_notifications("delete-bucket")) == 1
notification_service.delete_bucket_notifications("delete-bucket")
notification_service._configs.clear()
assert len(notification_service.get_bucket_notifications("delete-bucket")) == 0
def test_emit_event_no_config(self, notification_service):
event = NotificationEvent(
event_name="s3:ObjectCreated:Put",
bucket_name="no-config-bucket",
object_key="test.txt",
)
notification_service.emit_event(event)
assert notification_service._stats["events_queued"] == 0
def test_emit_event_matching_config(self, notification_service):
configs = [
NotificationConfiguration(
id="match-config",
events=["s3:ObjectCreated:*"],
destination=WebhookDestination(url="http://example.com/webhook"),
),
]
notification_service.set_bucket_notifications("event-bucket", configs)
event = NotificationEvent(
event_name="s3:ObjectCreated:Put",
bucket_name="event-bucket",
object_key="test.txt",
)
notification_service.emit_event(event)
assert notification_service._stats["events_queued"] == 1
def test_emit_event_non_matching_config(self, notification_service):
configs = [
NotificationConfiguration(
id="delete-only",
events=["s3:ObjectRemoved:*"],
destination=WebhookDestination(url="http://example.com/webhook"),
),
]
notification_service.set_bucket_notifications("delete-bucket", configs)
event = NotificationEvent(
event_name="s3:ObjectCreated:Put",
bucket_name="delete-bucket",
object_key="test.txt",
)
notification_service.emit_event(event)
assert notification_service._stats["events_queued"] == 0
def test_emit_object_created(self, notification_service):
configs = [
NotificationConfiguration(
id="create-config",
events=["s3:ObjectCreated:Put"],
destination=WebhookDestination(url="http://example.com/webhook"),
),
]
notification_service.set_bucket_notifications("create-bucket", configs)
notification_service.emit_object_created(
"create-bucket",
"new-file.txt",
size=1024,
etag="abc123",
operation="Put",
)
assert notification_service._stats["events_queued"] == 1
def test_emit_object_removed(self, notification_service):
configs = [
NotificationConfiguration(
id="remove-config",
events=["s3:ObjectRemoved:Delete"],
destination=WebhookDestination(url="http://example.com/webhook"),
),
]
notification_service.set_bucket_notifications("remove-bucket", configs)
notification_service.emit_object_removed(
"remove-bucket",
"deleted-file.txt",
operation="Delete",
)
assert notification_service._stats["events_queued"] == 1
def test_get_stats(self, notification_service):
stats = notification_service.get_stats()
assert "events_queued" in stats
assert "events_sent" in stats
assert "events_failed" in stats
@patch("app.notifications.requests.post")
def test_send_notification_success(self, mock_post, notification_service):
mock_response = MagicMock()
mock_response.status_code = 200
mock_post.return_value = mock_response
event = NotificationEvent(
event_name="s3:ObjectCreated:Put",
bucket_name="test-bucket",
object_key="test.txt",
)
destination = WebhookDestination(url="http://example.com/webhook")
notification_service._send_notification(event, destination)
mock_post.assert_called_once()
@patch("app.notifications.requests.post")
def test_send_notification_retry_on_failure(self, mock_post, notification_service):
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
mock_post.return_value = mock_response
event = NotificationEvent(
event_name="s3:ObjectCreated:Put",
bucket_name="test-bucket",
object_key="test.txt",
)
destination = WebhookDestination(
url="http://example.com/webhook",
retry_count=2,
retry_delay_seconds=0,
)
with pytest.raises(RuntimeError) as exc_info:
notification_service._send_notification(event, destination)
assert "Failed after 2 attempts" in str(exc_info.value)
assert mock_post.call_count == 2
def test_notification_caching(self, notification_service):
configs = [
NotificationConfiguration(
id="cached-config",
events=["s3:ObjectCreated:*"],
destination=WebhookDestination(url="http://example.com"),
),
]
notification_service.set_bucket_notifications("cached-bucket", configs)
notification_service.get_bucket_notifications("cached-bucket")
assert "cached-bucket" in notification_service._configs