From cdb86aeea7faadedfdfedf1cf6e3ebe0f8892319 Mon Sep 17 00:00:00 2001 From: kqjy Date: Wed, 31 Dec 2025 23:40:46 +0800 Subject: [PATCH] Implement Object Lock, Event Notifications, SSE-C, and Access Logging --- app/__init__.py | 11 +- app/access_logging.py | 262 +++++++++++++++++++++++++ app/acl.py | 1 - app/bucket_policies.py | 1 - app/config.py | 1 - app/connections.py | 1 - app/encrypted_storage.py | 1 - app/encryption.py | 104 +++++++++- app/errors.py | 1 - app/extensions.py | 1 - app/iam.py | 1 - app/kms.py | 1 - app/kms_api.py | 1 - app/lifecycle.py | 1 - app/notifications.py | 334 ++++++++++++++++++++++++++++++++ app/object_lock.py | 234 ++++++++++++++++++++++ app/replication.py | 5 - app/s3_api.py | 405 ++++++++++++++++++++++++++++++++++++++- app/secret_store.py | 1 - app/storage.py | 1 - app/ui.py | 1 - app/version.py | 1 - 22 files changed, 1339 insertions(+), 31 deletions(-) create mode 100644 app/access_logging.py create mode 100644 app/notifications.py create mode 100644 app/object_lock.py diff --git a/app/__init__.py b/app/__init__.py index 6fcc8dc..68ff222 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,4 +1,3 @@ -"""Application factory for the mini S3-compatible object store.""" from __future__ import annotations import logging @@ -16,6 +15,7 @@ from flask_cors import CORS from flask_wtf.csrf import CSRFError from werkzeug.middleware.proxy_fix import ProxyFix +from .access_logging import AccessLoggingService from .acl import AclService from .bucket_policies import BucketPolicyStore from .config import AppConfig @@ -25,6 +25,8 @@ from .extensions import limiter, csrf from .iam import IamService from .kms import KMSManager from .lifecycle import LifecycleManager +from .notifications import NotificationService +from .object_lock import ObjectLockService from .replication import ReplicationManager from .secret_store import EphemeralSecretStore from .storage import ObjectStorage @@ -143,6 +145,10 @@ def create_app( storage = EncryptedObjectStorage(storage, encryption_manager) acl_service = AclService(storage_root) + object_lock_service = ObjectLockService(storage_root) + notification_service = NotificationService(storage_root) + access_logging_service = AccessLoggingService(storage_root) + access_logging_service.set_storage(storage) lifecycle_manager = None if app.config.get("LIFECYCLE_ENABLED", False): @@ -164,6 +170,9 @@ def create_app( app.extensions["kms"] = kms_manager app.extensions["acl"] = acl_service app.extensions["lifecycle"] = lifecycle_manager + app.extensions["object_lock"] = object_lock_service + app.extensions["notifications"] = notification_service + app.extensions["access_logging"] = access_logging_service @app.errorhandler(500) def internal_error(error): diff --git a/app/access_logging.py b/app/access_logging.py new file mode 100644 index 0000000..03132a8 --- /dev/null +++ b/app/access_logging.py @@ -0,0 +1,262 @@ +from __future__ import annotations + +import io +import json +import logging +import queue +import threading +import time +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class AccessLogEntry: + bucket_owner: str = "-" + bucket: str = "-" + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + remote_ip: str = "-" + requester: str = "-" + request_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16].upper()) + operation: str = "-" + 
key: str = "-" + request_uri: str = "-" + http_status: int = 200 + error_code: str = "-" + bytes_sent: int = 0 + object_size: int = 0 + total_time_ms: int = 0 + turn_around_time_ms: int = 0 + referrer: str = "-" + user_agent: str = "-" + version_id: str = "-" + host_id: str = "-" + signature_version: str = "SigV4" + cipher_suite: str = "-" + authentication_type: str = "AuthHeader" + host_header: str = "-" + tls_version: str = "-" + + def to_log_line(self) -> str: + time_str = self.timestamp.strftime("[%d/%b/%Y:%H:%M:%S %z]") + return ( + f'{self.bucket_owner} {self.bucket} {time_str} {self.remote_ip} ' + f'{self.requester} {self.request_id} {self.operation} {self.key} ' + f'"{self.request_uri}" {self.http_status} {self.error_code or "-"} ' + f'{self.bytes_sent or "-"} {self.object_size or "-"} {self.total_time_ms or "-"} ' + f'{self.turn_around_time_ms or "-"} "{self.referrer}" "{self.user_agent}" {self.version_id}' + ) + + def to_dict(self) -> Dict[str, Any]: + return { + "bucket_owner": self.bucket_owner, + "bucket": self.bucket, + "timestamp": self.timestamp.isoformat(), + "remote_ip": self.remote_ip, + "requester": self.requester, + "request_id": self.request_id, + "operation": self.operation, + "key": self.key, + "request_uri": self.request_uri, + "http_status": self.http_status, + "error_code": self.error_code, + "bytes_sent": self.bytes_sent, + "object_size": self.object_size, + "total_time_ms": self.total_time_ms, + "referrer": self.referrer, + "user_agent": self.user_agent, + "version_id": self.version_id, + } + + +@dataclass +class LoggingConfiguration: + target_bucket: str + target_prefix: str = "" + enabled: bool = True + + def to_dict(self) -> Dict[str, Any]: + return { + "LoggingEnabled": { + "TargetBucket": self.target_bucket, + "TargetPrefix": self.target_prefix, + } + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> Optional["LoggingConfiguration"]: + logging_enabled = data.get("LoggingEnabled") + if not logging_enabled: + return None + return cls( + target_bucket=logging_enabled.get("TargetBucket", ""), + target_prefix=logging_enabled.get("TargetPrefix", ""), + enabled=True, + ) + + +class AccessLoggingService: + def __init__(self, storage_root: Path, flush_interval: int = 60, max_buffer_size: int = 1000): + self.storage_root = storage_root + self.flush_interval = flush_interval + self.max_buffer_size = max_buffer_size + self._configs: Dict[str, LoggingConfiguration] = {} + self._buffer: Dict[str, List[AccessLogEntry]] = {} + self._buffer_lock = threading.Lock() + self._shutdown = threading.Event() + self._storage = None + + self._flush_thread = threading.Thread(target=self._flush_loop, name="access-log-flush", daemon=True) + self._flush_thread.start() + + def set_storage(self, storage: Any) -> None: + self._storage = storage + + def _config_path(self, bucket_name: str) -> Path: + return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "logging.json" + + def get_bucket_logging(self, bucket_name: str) -> Optional[LoggingConfiguration]: + if bucket_name in self._configs: + return self._configs[bucket_name] + + config_path = self._config_path(bucket_name) + if not config_path.exists(): + return None + + try: + data = json.loads(config_path.read_text(encoding="utf-8")) + config = LoggingConfiguration.from_dict(data) + if config: + self._configs[bucket_name] = config + return config + except (json.JSONDecodeError, OSError) as e: + logger.warning(f"Failed to load logging config for {bucket_name}: {e}") + return None + + def set_bucket_logging(self, 
bucket_name: str, config: LoggingConfiguration) -> None: + config_path = self._config_path(bucket_name) + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(json.dumps(config.to_dict(), indent=2), encoding="utf-8") + self._configs[bucket_name] = config + + def delete_bucket_logging(self, bucket_name: str) -> None: + config_path = self._config_path(bucket_name) + try: + if config_path.exists(): + config_path.unlink() + except OSError: + pass + self._configs.pop(bucket_name, None) + + def log_request( + self, + bucket_name: str, + *, + operation: str, + key: str = "-", + remote_ip: str = "-", + requester: str = "-", + request_uri: str = "-", + http_status: int = 200, + error_code: str = "", + bytes_sent: int = 0, + object_size: int = 0, + total_time_ms: int = 0, + referrer: str = "-", + user_agent: str = "-", + version_id: str = "-", + request_id: str = "", + ) -> None: + config = self.get_bucket_logging(bucket_name) + if not config or not config.enabled: + return + + entry = AccessLogEntry( + bucket_owner="local-owner", + bucket=bucket_name, + remote_ip=remote_ip, + requester=requester, + request_id=request_id or uuid.uuid4().hex[:16].upper(), + operation=operation, + key=key, + request_uri=request_uri, + http_status=http_status, + error_code=error_code, + bytes_sent=bytes_sent, + object_size=object_size, + total_time_ms=total_time_ms, + referrer=referrer, + user_agent=user_agent, + version_id=version_id, + ) + + target_key = f"{config.target_bucket}:{config.target_prefix}" + with self._buffer_lock: + if target_key not in self._buffer: + self._buffer[target_key] = [] + self._buffer[target_key].append(entry) + + if len(self._buffer[target_key]) >= self.max_buffer_size: + self._flush_buffer(target_key) + + def _flush_loop(self) -> None: + while not self._shutdown.is_set(): + time.sleep(self.flush_interval) + self._flush_all() + + def _flush_all(self) -> None: + with self._buffer_lock: + targets = list(self._buffer.keys()) + + for target_key in targets: + self._flush_buffer(target_key) + + def _flush_buffer(self, target_key: str) -> None: + with self._buffer_lock: + entries = self._buffer.pop(target_key, []) + + if not entries or not self._storage: + return + + try: + bucket_name, prefix = target_key.split(":", 1) + except ValueError: + logger.error(f"Invalid target key: {target_key}") + return + + now = datetime.now(timezone.utc) + log_key = f"{prefix}{now.strftime('%Y-%m-%d-%H-%M-%S')}-{uuid.uuid4().hex[:8]}" + + log_content = "\n".join(entry.to_log_line() for entry in entries) + "\n" + + try: + stream = io.BytesIO(log_content.encode("utf-8")) + self._storage.put_object(bucket_name, log_key, stream, enforce_quota=False) + logger.info(f"Flushed {len(entries)} access log entries to {bucket_name}/{log_key}") + except Exception as e: + logger.error(f"Failed to write access log to {bucket_name}/{log_key}: {e}") + with self._buffer_lock: + if target_key not in self._buffer: + self._buffer[target_key] = [] + self._buffer[target_key] = entries + self._buffer[target_key] + + def flush(self) -> None: + self._flush_all() + + def shutdown(self) -> None: + self._shutdown.set() + self._flush_all() + self._flush_thread.join(timeout=5.0) + + def get_stats(self) -> Dict[str, Any]: + with self._buffer_lock: + buffered = sum(len(entries) for entries in self._buffer.values()) + return { + "buffered_entries": buffered, + "target_buckets": len(self._buffer), + } diff --git a/app/acl.py b/app/acl.py index 7f78a11..6ee2be0 100644 --- a/app/acl.py +++ b/app/acl.py @@ -1,4 +1,3 @@ 
-"""S3-compatible Access Control List (ACL) management.""" from __future__ import annotations import json diff --git a/app/bucket_policies.py b/app/bucket_policies.py index b5f0b8f..48257f2 100644 --- a/app/bucket_policies.py +++ b/app/bucket_policies.py @@ -1,4 +1,3 @@ -"""Bucket policy loader/enforcer with a subset of AWS semantics.""" from __future__ import annotations import json diff --git a/app/config.py b/app/config.py index e6bd127..241bcdf 100644 --- a/app/config.py +++ b/app/config.py @@ -1,4 +1,3 @@ -"""Configuration helpers for the S3 clone application.""" from __future__ import annotations import os diff --git a/app/connections.py b/app/connections.py index c5a7b33..b694af9 100644 --- a/app/connections.py +++ b/app/connections.py @@ -1,4 +1,3 @@ -"""Manage remote S3 connections.""" from __future__ import annotations import json diff --git a/app/encrypted_storage.py b/app/encrypted_storage.py index fca73b8..6f419c3 100644 --- a/app/encrypted_storage.py +++ b/app/encrypted_storage.py @@ -1,4 +1,3 @@ -"""Encrypted storage layer that wraps ObjectStorage with encryption support.""" from __future__ import annotations import io diff --git a/app/encryption.py b/app/encryption.py index 7123891..4b1d817 100644 --- a/app/encryption.py +++ b/app/encryption.py @@ -353,13 +353,113 @@ class EncryptionManager: return encryptor.decrypt_stream(stream, metadata) +class SSECEncryption(EncryptionProvider): + """SSE-C: Server-Side Encryption with Customer-Provided Keys. + + The client provides the encryption key with each request. + Server encrypts/decrypts but never stores the key. + + Required headers for PUT: + - x-amz-server-side-encryption-customer-algorithm: AES256 + - x-amz-server-side-encryption-customer-key: Base64-encoded 256-bit key + - x-amz-server-side-encryption-customer-key-MD5: Base64-encoded MD5 of key + """ + + KEY_ID = "customer-provided" + + def __init__(self, customer_key: bytes): + if len(customer_key) != 32: + raise EncryptionError("Customer key must be exactly 256 bits (32 bytes)") + self.customer_key = customer_key + + @classmethod + def from_headers(cls, headers: Dict[str, str]) -> "SSECEncryption": + algorithm = headers.get("x-amz-server-side-encryption-customer-algorithm", "") + if algorithm.upper() != "AES256": + raise EncryptionError(f"Unsupported SSE-C algorithm: {algorithm}. 
Only AES256 is supported.") + + key_b64 = headers.get("x-amz-server-side-encryption-customer-key", "") + if not key_b64: + raise EncryptionError("Missing x-amz-server-side-encryption-customer-key header") + + key_md5_b64 = headers.get("x-amz-server-side-encryption-customer-key-md5", "") + + try: + customer_key = base64.b64decode(key_b64) + except Exception as e: + raise EncryptionError(f"Invalid base64 in customer key: {e}") from e + + if len(customer_key) != 32: + raise EncryptionError(f"Customer key must be 256 bits, got {len(customer_key) * 8} bits") + + if key_md5_b64: + import hashlib + expected_md5 = base64.b64encode(hashlib.md5(customer_key).digest()).decode() + if key_md5_b64 != expected_md5: + raise EncryptionError("Customer key MD5 mismatch") + + return cls(customer_key) + + def encrypt(self, plaintext: bytes, context: Dict[str, str] | None = None) -> EncryptionResult: + aesgcm = AESGCM(self.customer_key) + nonce = secrets.token_bytes(12) + ciphertext = aesgcm.encrypt(nonce, plaintext, None) + + return EncryptionResult( + ciphertext=ciphertext, + nonce=nonce, + key_id=self.KEY_ID, + encrypted_data_key=b"", + ) + + def decrypt(self, ciphertext: bytes, nonce: bytes, encrypted_data_key: bytes, + key_id: str, context: Dict[str, str] | None = None) -> bytes: + aesgcm = AESGCM(self.customer_key) + try: + return aesgcm.decrypt(nonce, ciphertext, None) + except Exception as exc: + raise EncryptionError(f"SSE-C decryption failed: {exc}") from exc + + def generate_data_key(self) -> tuple[bytes, bytes]: + return self.customer_key, b"" + + +@dataclass +class SSECMetadata: + algorithm: str = "AES256" + nonce: bytes = b"" + key_md5: str = "" + + def to_dict(self) -> Dict[str, str]: + return { + "x-amz-server-side-encryption-customer-algorithm": self.algorithm, + "x-amz-encryption-nonce": base64.b64encode(self.nonce).decode(), + "x-amz-server-side-encryption-customer-key-MD5": self.key_md5, + } + + @classmethod + def from_dict(cls, data: Dict[str, str]) -> Optional["SSECMetadata"]: + algorithm = data.get("x-amz-server-side-encryption-customer-algorithm") + if not algorithm: + return None + try: + nonce = base64.b64decode(data.get("x-amz-encryption-nonce", "")) + return cls( + algorithm=algorithm, + nonce=nonce, + key_md5=data.get("x-amz-server-side-encryption-customer-key-MD5", ""), + ) + except Exception: + return None + + class ClientEncryptionHelper: """Helpers for client-side encryption. - + Client-side encryption is performed by the client, but this helper provides key generation and materials for clients that need them. 
""" - + @staticmethod def generate_client_key() -> Dict[str, str]: """Generate a new client encryption key.""" diff --git a/app/errors.py b/app/errors.py index bdf2004..7e5d711 100644 --- a/app/errors.py +++ b/app/errors.py @@ -1,4 +1,3 @@ -"""Standardized error handling for API and UI responses.""" from __future__ import annotations import logging diff --git a/app/extensions.py b/app/extensions.py index 0fc97a6..7da36ae 100644 --- a/app/extensions.py +++ b/app/extensions.py @@ -1,4 +1,3 @@ -"""Application-wide extension instances.""" from flask import g from flask_limiter import Limiter from flask_limiter.util import get_remote_address diff --git a/app/iam.py b/app/iam.py index 5e5c26a..5ec91d8 100644 --- a/app/iam.py +++ b/app/iam.py @@ -1,4 +1,3 @@ -"""Lightweight IAM-style user and policy management.""" from __future__ import annotations import json diff --git a/app/kms.py b/app/kms.py index 65e98ab..548e7ea 100644 --- a/app/kms.py +++ b/app/kms.py @@ -1,4 +1,3 @@ -"""Key Management Service (KMS) for encryption key management.""" from __future__ import annotations import base64 diff --git a/app/kms_api.py b/app/kms_api.py index 2ff9e35..332c012 100644 --- a/app/kms_api.py +++ b/app/kms_api.py @@ -1,4 +1,3 @@ -"""KMS and encryption API endpoints.""" from __future__ import annotations import base64 diff --git a/app/lifecycle.py b/app/lifecycle.py index bacf32e..8ad0636 100644 --- a/app/lifecycle.py +++ b/app/lifecycle.py @@ -1,4 +1,3 @@ -"""Lifecycle rule enforcement for S3-compatible storage.""" from __future__ import annotations import logging diff --git a/app/notifications.py b/app/notifications.py new file mode 100644 index 0000000..c449088 --- /dev/null +++ b/app/notifications.py @@ -0,0 +1,334 @@ +from __future__ import annotations + +import json +import logging +import queue +import threading +import time +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse + +import requests + +logger = logging.getLogger(__name__) + + +@dataclass +class NotificationEvent: + event_name: str + bucket_name: str + object_key: str + object_size: int = 0 + etag: str = "" + version_id: Optional[str] = None + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + request_id: str = field(default_factory=lambda: uuid.uuid4().hex) + source_ip: str = "" + user_identity: str = "" + + def to_s3_event(self) -> Dict[str, Any]: + return { + "Records": [ + { + "eventVersion": "2.1", + "eventSource": "myfsio:s3", + "awsRegion": "local", + "eventTime": self.timestamp.strftime("%Y-%m-%dT%H:%M:%S.000Z"), + "eventName": self.event_name, + "userIdentity": { + "principalId": self.user_identity or "ANONYMOUS", + }, + "requestParameters": { + "sourceIPAddress": self.source_ip or "127.0.0.1", + }, + "responseElements": { + "x-amz-request-id": self.request_id, + "x-amz-id-2": self.request_id, + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "notification", + "bucket": { + "name": self.bucket_name, + "ownerIdentity": {"principalId": "local"}, + "arn": f"arn:aws:s3:::{self.bucket_name}", + }, + "object": { + "key": self.object_key, + "size": self.object_size, + "eTag": self.etag, + "versionId": self.version_id or "null", + "sequencer": f"{int(time.time() * 1000):016X}", + }, + }, + } + ] + } + + +@dataclass +class WebhookDestination: + url: str + headers: Dict[str, str] = field(default_factory=dict) + timeout_seconds: int = 30 + 
retry_count: int = 3 + retry_delay_seconds: int = 1 + + def to_dict(self) -> Dict[str, Any]: + return { + "url": self.url, + "headers": self.headers, + "timeout_seconds": self.timeout_seconds, + "retry_count": self.retry_count, + "retry_delay_seconds": self.retry_delay_seconds, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "WebhookDestination": + return cls( + url=data.get("url", ""), + headers=data.get("headers", {}), + timeout_seconds=data.get("timeout_seconds", 30), + retry_count=data.get("retry_count", 3), + retry_delay_seconds=data.get("retry_delay_seconds", 1), + ) + + +@dataclass +class NotificationConfiguration: + id: str + events: List[str] + destination: WebhookDestination + prefix_filter: str = "" + suffix_filter: str = "" + + def matches_event(self, event_name: str, object_key: str) -> bool: + event_match = False + for pattern in self.events: + if pattern.endswith("*"): + base = pattern[:-1] + if event_name.startswith(base): + event_match = True + break + elif pattern == event_name: + event_match = True + break + + if not event_match: + return False + + if self.prefix_filter and not object_key.startswith(self.prefix_filter): + return False + if self.suffix_filter and not object_key.endswith(self.suffix_filter): + return False + + return True + + def to_dict(self) -> Dict[str, Any]: + return { + "Id": self.id, + "Events": self.events, + "Destination": self.destination.to_dict(), + "Filter": { + "Key": { + "FilterRules": [ + {"Name": "prefix", "Value": self.prefix_filter}, + {"Name": "suffix", "Value": self.suffix_filter}, + ] + } + }, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "NotificationConfiguration": + prefix = "" + suffix = "" + filter_data = data.get("Filter", {}) + key_filter = filter_data.get("Key", {}) + for rule in key_filter.get("FilterRules", []): + if rule.get("Name") == "prefix": + prefix = rule.get("Value", "") + elif rule.get("Name") == "suffix": + suffix = rule.get("Value", "") + + return cls( + id=data.get("Id", uuid.uuid4().hex), + events=data.get("Events", []), + destination=WebhookDestination.from_dict(data.get("Destination", {})), + prefix_filter=prefix, + suffix_filter=suffix, + ) + + +class NotificationService: + def __init__(self, storage_root: Path, worker_count: int = 2): + self.storage_root = storage_root + self._configs: Dict[str, List[NotificationConfiguration]] = {} + self._queue: queue.Queue[tuple[NotificationEvent, WebhookDestination]] = queue.Queue() + self._workers: List[threading.Thread] = [] + self._shutdown = threading.Event() + self._stats = { + "events_queued": 0, + "events_sent": 0, + "events_failed": 0, + } + + for i in range(worker_count): + worker = threading.Thread(target=self._worker_loop, name=f"notification-worker-{i}", daemon=True) + worker.start() + self._workers.append(worker) + + def _config_path(self, bucket_name: str) -> Path: + return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "notifications.json" + + def get_bucket_notifications(self, bucket_name: str) -> List[NotificationConfiguration]: + if bucket_name in self._configs: + return self._configs[bucket_name] + + config_path = self._config_path(bucket_name) + if not config_path.exists(): + return [] + + try: + data = json.loads(config_path.read_text(encoding="utf-8")) + configs = [NotificationConfiguration.from_dict(c) for c in data.get("configurations", [])] + self._configs[bucket_name] = configs + return configs + except (json.JSONDecodeError, OSError) as e: + logger.warning(f"Failed to load notification 
config for {bucket_name}: {e}") + return [] + + def set_bucket_notifications( + self, bucket_name: str, configurations: List[NotificationConfiguration] + ) -> None: + config_path = self._config_path(bucket_name) + config_path.parent.mkdir(parents=True, exist_ok=True) + + data = {"configurations": [c.to_dict() for c in configurations]} + config_path.write_text(json.dumps(data, indent=2), encoding="utf-8") + self._configs[bucket_name] = configurations + + def delete_bucket_notifications(self, bucket_name: str) -> None: + config_path = self._config_path(bucket_name) + try: + if config_path.exists(): + config_path.unlink() + except OSError: + pass + self._configs.pop(bucket_name, None) + + def emit_event(self, event: NotificationEvent) -> None: + configurations = self.get_bucket_notifications(event.bucket_name) + if not configurations: + return + + for config in configurations: + if config.matches_event(event.event_name, event.object_key): + self._queue.put((event, config.destination)) + self._stats["events_queued"] += 1 + logger.debug( + f"Queued notification for {event.event_name} on {event.bucket_name}/{event.object_key}" + ) + + def emit_object_created( + self, + bucket_name: str, + object_key: str, + *, + size: int = 0, + etag: str = "", + version_id: Optional[str] = None, + request_id: str = "", + source_ip: str = "", + user_identity: str = "", + operation: str = "Put", + ) -> None: + event = NotificationEvent( + event_name=f"s3:ObjectCreated:{operation}", + bucket_name=bucket_name, + object_key=object_key, + object_size=size, + etag=etag, + version_id=version_id, + request_id=request_id or uuid.uuid4().hex, + source_ip=source_ip, + user_identity=user_identity, + ) + self.emit_event(event) + + def emit_object_removed( + self, + bucket_name: str, + object_key: str, + *, + version_id: Optional[str] = None, + request_id: str = "", + source_ip: str = "", + user_identity: str = "", + operation: str = "Delete", + ) -> None: + event = NotificationEvent( + event_name=f"s3:ObjectRemoved:{operation}", + bucket_name=bucket_name, + object_key=object_key, + version_id=version_id, + request_id=request_id or uuid.uuid4().hex, + source_ip=source_ip, + user_identity=user_identity, + ) + self.emit_event(event) + + def _worker_loop(self) -> None: + while not self._shutdown.is_set(): + try: + event, destination = self._queue.get(timeout=1.0) + except queue.Empty: + continue + + try: + self._send_notification(event, destination) + self._stats["events_sent"] += 1 + except Exception as e: + self._stats["events_failed"] += 1 + logger.error(f"Failed to send notification: {e}") + finally: + self._queue.task_done() + + def _send_notification(self, event: NotificationEvent, destination: WebhookDestination) -> None: + payload = event.to_s3_event() + headers = {"Content-Type": "application/json", **destination.headers} + + last_error = None + for attempt in range(destination.retry_count): + try: + response = requests.post( + destination.url, + json=payload, + headers=headers, + timeout=destination.timeout_seconds, + ) + if response.status_code < 400: + logger.info( + f"Notification sent: {event.event_name} -> {destination.url} (status={response.status_code})" + ) + return + last_error = f"HTTP {response.status_code}: {response.text[:200]}" + except requests.RequestException as e: + last_error = str(e) + + if attempt < destination.retry_count - 1: + time.sleep(destination.retry_delay_seconds * (attempt + 1)) + + raise RuntimeError(f"Failed after {destination.retry_count} attempts: {last_error}") + + def 
get_stats(self) -> Dict[str, int]: + return dict(self._stats) + + def shutdown(self) -> None: + self._shutdown.set() + for worker in self._workers: + worker.join(timeout=5.0) diff --git a/app/object_lock.py b/app/object_lock.py new file mode 100644 index 0000000..a5aab2c --- /dev/null +++ b/app/object_lock.py @@ -0,0 +1,234 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import Enum +from pathlib import Path +from typing import Any, Dict, Optional + + +class RetentionMode(Enum): + GOVERNANCE = "GOVERNANCE" + COMPLIANCE = "COMPLIANCE" + + +class ObjectLockError(Exception): + pass + + +@dataclass +class ObjectLockRetention: + mode: RetentionMode + retain_until_date: datetime + + def to_dict(self) -> Dict[str, str]: + return { + "Mode": self.mode.value, + "RetainUntilDate": self.retain_until_date.isoformat(), + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> Optional["ObjectLockRetention"]: + if not data: + return None + mode_str = data.get("Mode") + date_str = data.get("RetainUntilDate") + if not mode_str or not date_str: + return None + try: + mode = RetentionMode(mode_str) + retain_until = datetime.fromisoformat(date_str.replace("Z", "+00:00")) + return cls(mode=mode, retain_until_date=retain_until) + except (ValueError, KeyError): + return None + + def is_expired(self) -> bool: + return datetime.now(timezone.utc) > self.retain_until_date + + +@dataclass +class ObjectLockConfig: + enabled: bool = False + default_retention: Optional[ObjectLockRetention] = None + + def to_dict(self) -> Dict[str, Any]: + result: Dict[str, Any] = {"ObjectLockEnabled": "Enabled" if self.enabled else "Disabled"} + if self.default_retention: + result["Rule"] = { + "DefaultRetention": { + "Mode": self.default_retention.mode.value, + "Days": None, + "Years": None, + } + } + return result + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "ObjectLockConfig": + enabled = data.get("ObjectLockEnabled") == "Enabled" + default_retention = None + rule = data.get("Rule") + if rule and "DefaultRetention" in rule: + dr = rule["DefaultRetention"] + mode_str = dr.get("Mode", "GOVERNANCE") + days = dr.get("Days") + years = dr.get("Years") + if days or years: + from datetime import timedelta + now = datetime.now(timezone.utc) + if years: + delta = timedelta(days=int(years) * 365) + else: + delta = timedelta(days=int(days)) + default_retention = ObjectLockRetention( + mode=RetentionMode(mode_str), + retain_until_date=now + delta, + ) + return cls(enabled=enabled, default_retention=default_retention) + + +class ObjectLockService: + def __init__(self, storage_root: Path): + self.storage_root = storage_root + self._config_cache: Dict[str, ObjectLockConfig] = {} + + def _bucket_lock_config_path(self, bucket_name: str) -> Path: + return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "object_lock.json" + + def _object_lock_meta_path(self, bucket_name: str, object_key: str) -> Path: + safe_key = object_key.replace("/", "_").replace("\\", "_") + return ( + self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / + "locks" / f"{safe_key}.lock.json" + ) + + def get_bucket_lock_config(self, bucket_name: str) -> ObjectLockConfig: + if bucket_name in self._config_cache: + return self._config_cache[bucket_name] + + config_path = self._bucket_lock_config_path(bucket_name) + if not config_path.exists(): + return ObjectLockConfig(enabled=False) + + try: + data = 
json.loads(config_path.read_text(encoding="utf-8")) + config = ObjectLockConfig.from_dict(data) + self._config_cache[bucket_name] = config + return config + except (json.JSONDecodeError, OSError): + return ObjectLockConfig(enabled=False) + + def set_bucket_lock_config(self, bucket_name: str, config: ObjectLockConfig) -> None: + config_path = self._bucket_lock_config_path(bucket_name) + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(json.dumps(config.to_dict()), encoding="utf-8") + self._config_cache[bucket_name] = config + + def enable_bucket_lock(self, bucket_name: str) -> None: + config = self.get_bucket_lock_config(bucket_name) + config.enabled = True + self.set_bucket_lock_config(bucket_name, config) + + def is_bucket_lock_enabled(self, bucket_name: str) -> bool: + return self.get_bucket_lock_config(bucket_name).enabled + + def get_object_retention(self, bucket_name: str, object_key: str) -> Optional[ObjectLockRetention]: + meta_path = self._object_lock_meta_path(bucket_name, object_key) + if not meta_path.exists(): + return None + try: + data = json.loads(meta_path.read_text(encoding="utf-8")) + return ObjectLockRetention.from_dict(data.get("retention", {})) + except (json.JSONDecodeError, OSError): + return None + + def set_object_retention( + self, + bucket_name: str, + object_key: str, + retention: ObjectLockRetention, + bypass_governance: bool = False, + ) -> None: + existing = self.get_object_retention(bucket_name, object_key) + if existing and not existing.is_expired(): + if existing.mode == RetentionMode.COMPLIANCE: + raise ObjectLockError( + "Cannot modify retention on object with COMPLIANCE mode until retention expires" + ) + if existing.mode == RetentionMode.GOVERNANCE and not bypass_governance: + raise ObjectLockError( + "Cannot modify GOVERNANCE retention without bypass-governance permission" + ) + + meta_path = self._object_lock_meta_path(bucket_name, object_key) + meta_path.parent.mkdir(parents=True, exist_ok=True) + + existing_data: Dict[str, Any] = {} + if meta_path.exists(): + try: + existing_data = json.loads(meta_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + pass + + existing_data["retention"] = retention.to_dict() + meta_path.write_text(json.dumps(existing_data), encoding="utf-8") + + def get_legal_hold(self, bucket_name: str, object_key: str) -> bool: + meta_path = self._object_lock_meta_path(bucket_name, object_key) + if not meta_path.exists(): + return False + try: + data = json.loads(meta_path.read_text(encoding="utf-8")) + return data.get("legal_hold", False) + except (json.JSONDecodeError, OSError): + return False + + def set_legal_hold(self, bucket_name: str, object_key: str, enabled: bool) -> None: + meta_path = self._object_lock_meta_path(bucket_name, object_key) + meta_path.parent.mkdir(parents=True, exist_ok=True) + + existing_data: Dict[str, Any] = {} + if meta_path.exists(): + try: + existing_data = json.loads(meta_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + pass + + existing_data["legal_hold"] = enabled + meta_path.write_text(json.dumps(existing_data), encoding="utf-8") + + def can_delete_object( + self, + bucket_name: str, + object_key: str, + bypass_governance: bool = False, + ) -> tuple[bool, str]: + if self.get_legal_hold(bucket_name, object_key): + return False, "Object is under legal hold" + + retention = self.get_object_retention(bucket_name, object_key) + if retention and not retention.is_expired(): + if retention.mode == RetentionMode.COMPLIANCE: 
+ return False, f"Object is locked in COMPLIANCE mode until {retention.retain_until_date.isoformat()}" + if retention.mode == RetentionMode.GOVERNANCE: + if not bypass_governance: + return False, f"Object is locked in GOVERNANCE mode until {retention.retain_until_date.isoformat()}" + + return True, "" + + def can_overwrite_object( + self, + bucket_name: str, + object_key: str, + bypass_governance: bool = False, + ) -> tuple[bool, str]: + return self.can_delete_object(bucket_name, object_key, bypass_governance) + + def delete_object_lock_metadata(self, bucket_name: str, object_key: str) -> None: + meta_path = self._object_lock_meta_path(bucket_name, object_key) + try: + if meta_path.exists(): + meta_path.unlink() + except OSError: + pass diff --git a/app/replication.py b/app/replication.py index 7301dc2..1a91af9 100644 --- a/app/replication.py +++ b/app/replication.py @@ -1,4 +1,3 @@ -"""Background replication worker.""" from __future__ import annotations import json @@ -32,13 +31,9 @@ REPLICATION_MODE_ALL = "all" def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any: """Create a boto3 S3 client for the given connection. - Args: connection: Remote S3 connection configuration health_check: If True, use minimal retries for quick health checks - - Returns: - Configured boto3 S3 client """ config = Config( user_agent_extra=REPLICATION_USER_AGENT, diff --git a/app/s3_api.py b/app/s3_api.py index 5e7fb31..25a3dd1 100644 --- a/app/s3_api.py +++ b/app/s3_api.py @@ -16,10 +16,14 @@ from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, Par from flask import Blueprint, Response, current_app, jsonify, request, g from werkzeug.http import http_date +from .access_logging import AccessLoggingService, LoggingConfiguration from .acl import AclService from .bucket_policies import BucketPolicyStore +from .encryption import SSECEncryption, SSECMetadata, EncryptionError from .extensions import limiter from .iam import IamError, Principal +from .notifications import NotificationService, NotificationConfiguration, WebhookDestination +from .object_lock import ObjectLockService, ObjectLockRetention, ObjectLockConfig, ObjectLockError, RetentionMode from .replication import ReplicationManager from .storage import ObjectStorage, StorageError, QuotaExceededError @@ -49,6 +53,18 @@ def _bucket_policies() -> BucketPolicyStore: return store +def _object_lock() -> ObjectLockService: + return current_app.extensions["object_lock"] + + +def _notifications() -> NotificationService: + return current_app.extensions["notifications"] + + +def _access_logging() -> AccessLoggingService: + return current_app.extensions["access_logging"] + + def _xml_response(element: Element, status: int = 200) -> Response: xml_bytes = tostring(element, encoding="utf-8") return Response(xml_bytes, status=status, mimetype="application/xml") @@ -897,6 +913,9 @@ def _maybe_handle_bucket_subresource(bucket_name: str) -> Response | None: "versions": _bucket_list_versions_handler, "lifecycle": _bucket_lifecycle_handler, "quota": _bucket_quota_handler, + "object-lock": _bucket_object_lock_handler, + "notification": _bucket_notification_handler, + "logging": _bucket_logging_handler, } requested = [key for key in handlers if key in request.args] if not requested: @@ -1567,6 +1586,336 @@ def _bucket_quota_handler(bucket_name: str) -> Response: return Response(status=204) +def _bucket_object_lock_handler(bucket_name: str) -> Response: + if request.method not in {"GET", "PUT"}: + return 
_method_not_allowed(["GET", "PUT"]) + + principal, error = _require_principal() + if error: + return error + try: + _authorize_action(principal, bucket_name, "policy") + except IamError as exc: + return _error_response("AccessDenied", str(exc), 403) + + storage = _storage() + if not storage.bucket_exists(bucket_name): + return _error_response("NoSuchBucket", "Bucket does not exist", 404) + + lock_service = _object_lock() + + if request.method == "GET": + config = lock_service.get_bucket_lock_config(bucket_name) + root = Element("ObjectLockConfiguration", xmlns="http://s3.amazonaws.com/doc/2006-03-01/") + SubElement(root, "ObjectLockEnabled").text = "Enabled" if config.enabled else "Disabled" + return _xml_response(root) + + payload = request.get_data(cache=False) or b"" + if not payload.strip(): + return _error_response("MalformedXML", "Request body is required", 400) + + try: + root = fromstring(payload) + except ParseError: + return _error_response("MalformedXML", "Unable to parse XML document", 400) + + enabled_el = root.find("{*}ObjectLockEnabled") or root.find("ObjectLockEnabled") + enabled = (enabled_el.text or "").strip() == "Enabled" if enabled_el is not None else False + + config = ObjectLockConfig(enabled=enabled) + lock_service.set_bucket_lock_config(bucket_name, config) + + current_app.logger.info("Bucket object lock updated", extra={"bucket": bucket_name, "enabled": enabled}) + return Response(status=200) + + +def _bucket_notification_handler(bucket_name: str) -> Response: + if request.method not in {"GET", "PUT", "DELETE"}: + return _method_not_allowed(["GET", "PUT", "DELETE"]) + + principal, error = _require_principal() + if error: + return error + try: + _authorize_action(principal, bucket_name, "policy") + except IamError as exc: + return _error_response("AccessDenied", str(exc), 403) + + storage = _storage() + if not storage.bucket_exists(bucket_name): + return _error_response("NoSuchBucket", "Bucket does not exist", 404) + + notification_service = _notifications() + + if request.method == "GET": + configs = notification_service.get_bucket_notifications(bucket_name) + root = Element("NotificationConfiguration", xmlns="http://s3.amazonaws.com/doc/2006-03-01/") + for config in configs: + webhook_el = SubElement(root, "WebhookConfiguration") + SubElement(webhook_el, "Id").text = config.id + for event in config.events: + SubElement(webhook_el, "Event").text = event + dest_el = SubElement(webhook_el, "Destination") + SubElement(dest_el, "Url").text = config.destination.url + if config.prefix_filter or config.suffix_filter: + filter_el = SubElement(webhook_el, "Filter") + key_el = SubElement(filter_el, "S3Key") + if config.prefix_filter: + rule_el = SubElement(key_el, "FilterRule") + SubElement(rule_el, "Name").text = "prefix" + SubElement(rule_el, "Value").text = config.prefix_filter + if config.suffix_filter: + rule_el = SubElement(key_el, "FilterRule") + SubElement(rule_el, "Name").text = "suffix" + SubElement(rule_el, "Value").text = config.suffix_filter + return _xml_response(root) + + if request.method == "DELETE": + notification_service.delete_bucket_notifications(bucket_name) + current_app.logger.info("Bucket notifications deleted", extra={"bucket": bucket_name}) + return Response(status=204) + + payload = request.get_data(cache=False) or b"" + if not payload.strip(): + notification_service.delete_bucket_notifications(bucket_name) + return Response(status=200) + + try: + root = fromstring(payload) + except ParseError: + return _error_response("MalformedXML", "Unable to 
parse XML document", 400) + + configs: list[NotificationConfiguration] = [] + for webhook_el in root.findall("{*}WebhookConfiguration") or root.findall("WebhookConfiguration"): + config_id = _find_element_text(webhook_el, "Id") or uuid.uuid4().hex + events = [el.text for el in webhook_el.findall("{*}Event") or webhook_el.findall("Event") if el.text] + + dest_el = _find_element(webhook_el, "Destination") + url = _find_element_text(dest_el, "Url") if dest_el else "" + if not url: + return _error_response("InvalidArgument", "Destination URL is required", 400) + + prefix = "" + suffix = "" + filter_el = _find_element(webhook_el, "Filter") + if filter_el: + key_el = _find_element(filter_el, "S3Key") + if key_el: + for rule_el in key_el.findall("{*}FilterRule") or key_el.findall("FilterRule"): + name = _find_element_text(rule_el, "Name") + value = _find_element_text(rule_el, "Value") + if name == "prefix": + prefix = value + elif name == "suffix": + suffix = value + + configs.append(NotificationConfiguration( + id=config_id, + events=events, + destination=WebhookDestination(url=url), + prefix_filter=prefix, + suffix_filter=suffix, + )) + + notification_service.set_bucket_notifications(bucket_name, configs) + current_app.logger.info("Bucket notifications updated", extra={"bucket": bucket_name, "configs": len(configs)}) + return Response(status=200) + + +def _bucket_logging_handler(bucket_name: str) -> Response: + if request.method not in {"GET", "PUT", "DELETE"}: + return _method_not_allowed(["GET", "PUT", "DELETE"]) + + principal, error = _require_principal() + if error: + return error + try: + _authorize_action(principal, bucket_name, "policy") + except IamError as exc: + return _error_response("AccessDenied", str(exc), 403) + + storage = _storage() + if not storage.bucket_exists(bucket_name): + return _error_response("NoSuchBucket", "Bucket does not exist", 404) + + logging_service = _access_logging() + + if request.method == "GET": + config = logging_service.get_bucket_logging(bucket_name) + root = Element("BucketLoggingStatus", xmlns="http://s3.amazonaws.com/doc/2006-03-01/") + if config and config.enabled: + logging_enabled = SubElement(root, "LoggingEnabled") + SubElement(logging_enabled, "TargetBucket").text = config.target_bucket + SubElement(logging_enabled, "TargetPrefix").text = config.target_prefix + return _xml_response(root) + + if request.method == "DELETE": + logging_service.delete_bucket_logging(bucket_name) + current_app.logger.info("Bucket logging deleted", extra={"bucket": bucket_name}) + return Response(status=204) + + payload = request.get_data(cache=False) or b"" + if not payload.strip(): + logging_service.delete_bucket_logging(bucket_name) + return Response(status=200) + + try: + root = fromstring(payload) + except ParseError: + return _error_response("MalformedXML", "Unable to parse XML document", 400) + + logging_enabled = _find_element(root, "LoggingEnabled") + if logging_enabled is None: + logging_service.delete_bucket_logging(bucket_name) + return Response(status=200) + + target_bucket = _find_element_text(logging_enabled, "TargetBucket") + if not target_bucket: + return _error_response("InvalidArgument", "TargetBucket is required", 400) + + if not storage.bucket_exists(target_bucket): + return _error_response("InvalidTargetBucketForLogging", "Target bucket does not exist", 400) + + target_prefix = _find_element_text(logging_enabled, "TargetPrefix") + + config = LoggingConfiguration( + target_bucket=target_bucket, + target_prefix=target_prefix, + enabled=True, + ) + 
logging_service.set_bucket_logging(bucket_name, config) + + current_app.logger.info( + "Bucket logging updated", + extra={"bucket": bucket_name, "target_bucket": target_bucket, "target_prefix": target_prefix} + ) + return Response(status=200) + + +def _object_retention_handler(bucket_name: str, object_key: str) -> Response: + if request.method not in {"GET", "PUT"}: + return _method_not_allowed(["GET", "PUT"]) + + principal, error = _require_principal() + if error: + return error + try: + _authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key) + except IamError as exc: + return _error_response("AccessDenied", str(exc), 403) + + storage = _storage() + if not storage.bucket_exists(bucket_name): + return _error_response("NoSuchBucket", "Bucket does not exist", 404) + + try: + storage.get_object_path(bucket_name, object_key) + except StorageError: + return _error_response("NoSuchKey", "Object does not exist", 404) + + lock_service = _object_lock() + + if request.method == "GET": + retention = lock_service.get_object_retention(bucket_name, object_key) + if not retention: + return _error_response("NoSuchObjectLockConfiguration", "No retention policy", 404) + + root = Element("Retention", xmlns="http://s3.amazonaws.com/doc/2006-03-01/") + SubElement(root, "Mode").text = retention.mode.value + SubElement(root, "RetainUntilDate").text = retention.retain_until_date.strftime("%Y-%m-%dT%H:%M:%S.000Z") + return _xml_response(root) + + payload = request.get_data(cache=False) or b"" + if not payload.strip(): + return _error_response("MalformedXML", "Request body is required", 400) + + try: + root = fromstring(payload) + except ParseError: + return _error_response("MalformedXML", "Unable to parse XML document", 400) + + mode_str = _find_element_text(root, "Mode") + retain_until_str = _find_element_text(root, "RetainUntilDate") + + if not mode_str or not retain_until_str: + return _error_response("InvalidArgument", "Mode and RetainUntilDate are required", 400) + + try: + mode = RetentionMode(mode_str) + except ValueError: + return _error_response("InvalidArgument", f"Invalid retention mode: {mode_str}", 400) + + try: + retain_until = datetime.fromisoformat(retain_until_str.replace("Z", "+00:00")) + except ValueError: + return _error_response("InvalidArgument", f"Invalid date format: {retain_until_str}", 400) + + bypass = request.headers.get("x-amz-bypass-governance-retention", "").lower() == "true" + + retention = ObjectLockRetention(mode=mode, retain_until_date=retain_until) + try: + lock_service.set_object_retention(bucket_name, object_key, retention, bypass_governance=bypass) + except ObjectLockError as exc: + return _error_response("AccessDenied", str(exc), 403) + + current_app.logger.info( + "Object retention set", + extra={"bucket": bucket_name, "key": object_key, "mode": mode_str, "until": retain_until_str} + ) + return Response(status=200) + + +def _object_legal_hold_handler(bucket_name: str, object_key: str) -> Response: + if request.method not in {"GET", "PUT"}: + return _method_not_allowed(["GET", "PUT"]) + + principal, error = _require_principal() + if error: + return error + try: + _authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key) + except IamError as exc: + return _error_response("AccessDenied", str(exc), 403) + + storage = _storage() + if not storage.bucket_exists(bucket_name): + return _error_response("NoSuchBucket", "Bucket does not exist", 404) + + try: + 
storage.get_object_path(bucket_name, object_key) + except StorageError: + return _error_response("NoSuchKey", "Object does not exist", 404) + + lock_service = _object_lock() + + if request.method == "GET": + enabled = lock_service.get_legal_hold(bucket_name, object_key) + root = Element("LegalHold", xmlns="http://s3.amazonaws.com/doc/2006-03-01/") + SubElement(root, "Status").text = "ON" if enabled else "OFF" + return _xml_response(root) + + payload = request.get_data(cache=False) or b"" + if not payload.strip(): + return _error_response("MalformedXML", "Request body is required", 400) + + try: + root = fromstring(payload) + except ParseError: + return _error_response("MalformedXML", "Unable to parse XML document", 400) + + status = _find_element_text(root, "Status") + if status not in {"ON", "OFF"}: + return _error_response("InvalidArgument", "Status must be ON or OFF", 400) + + lock_service.set_legal_hold(bucket_name, object_key, status == "ON") + + current_app.logger.info( + "Object legal hold set", + extra={"bucket": bucket_name, "key": object_key, "status": status} + ) + return Response(status=200) + + def _bulk_delete_handler(bucket_name: str) -> Response: principal, error = _require_principal() if error: @@ -1871,6 +2220,12 @@ def object_handler(bucket_name: str, object_key: str): if "tagging" in request.args: return _object_tagging_handler(bucket_name, object_key) + if "retention" in request.args: + return _object_retention_handler(bucket_name, object_key) + + if "legal-hold" in request.args: + return _object_legal_hold_handler(bucket_name, object_key) + if request.method == "POST": if "uploads" in request.args: return _initiate_multipart_upload(bucket_name, object_key) @@ -1886,22 +2241,28 @@ def object_handler(bucket_name: str, object_key: str): if copy_source: return _copy_object(bucket_name, object_key, copy_source) - _, error = _object_principal("write", bucket_name, object_key) + principal, error = _object_principal("write", bucket_name, object_key) if error: return error - + + bypass_governance = request.headers.get("x-amz-bypass-governance-retention", "").lower() == "true" + lock_service = _object_lock() + can_overwrite, lock_reason = lock_service.can_overwrite_object(bucket_name, object_key, bypass_governance=bypass_governance) + if not can_overwrite: + return _error_response("AccessDenied", lock_reason, 403) + stream = request.stream content_encoding = request.headers.get("Content-Encoding", "").lower() if "aws-chunked" in content_encoding: stream = AwsChunkedDecoder(stream) metadata = _extract_request_metadata() - + content_type = request.headers.get("Content-Type") validation_error = _validate_content_type(object_key, content_type) if validation_error: return _error_response("InvalidArgument", validation_error, 400) - + try: meta = storage.put_object( bucket_name, @@ -1922,10 +2283,21 @@ def object_handler(bucket_name: str, object_key: str): ) response = Response(status=200) response.headers["ETag"] = f'"{meta.etag}"' - + + _notifications().emit_object_created( + bucket_name, + object_key, + size=meta.size, + etag=meta.etag, + request_id=getattr(g, "request_id", ""), + source_ip=request.remote_addr or "", + user_identity=principal.access_key if principal else "", + operation="Put", + ) + if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""): _replication_manager().trigger_replication(bucket_name, object_key, action="write") - + return response if request.method in {"GET", "HEAD"}: @@ -2048,13 +2420,30 @@ def object_handler(bucket_name: str, object_key: 
str): _, error = _object_principal("delete", bucket_name, object_key) if error: return error + + bypass_governance = request.headers.get("x-amz-bypass-governance-retention", "").lower() == "true" + lock_service = _object_lock() + can_delete, lock_reason = lock_service.can_delete_object(bucket_name, object_key, bypass_governance=bypass_governance) + if not can_delete: + return _error_response("AccessDenied", lock_reason, 403) + storage.delete_object(bucket_name, object_key) + lock_service.delete_object_lock_metadata(bucket_name, object_key) current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key}) - + + principal, _ = _require_principal() + _notifications().emit_object_removed( + bucket_name, + object_key, + request_id=getattr(g, "request_id", ""), + source_ip=request.remote_addr or "", + user_identity=principal.access_key if principal else "", + ) + user_agent = request.headers.get("User-Agent", "") if "S3ReplicationAgent" not in user_agent: _replication_manager().trigger_replication(bucket_name, object_key, action="delete") - + return Response(status=204) diff --git a/app/secret_store.py b/app/secret_store.py index a7c3416..5c14f6e 100644 --- a/app/secret_store.py +++ b/app/secret_store.py @@ -1,4 +1,3 @@ -"""Ephemeral store for one-time secrets communicated to the UI.""" from __future__ import annotations import secrets diff --git a/app/storage.py b/app/storage.py index 27a6e75..013d8d6 100644 --- a/app/storage.py +++ b/app/storage.py @@ -1,4 +1,3 @@ -"""Filesystem-backed object storage helpers.""" from __future__ import annotations import hashlib diff --git a/app/ui.py b/app/ui.py index 3cc6c23..091bfcb 100644 --- a/app/ui.py +++ b/app/ui.py @@ -1,4 +1,3 @@ -"""Authenticated HTML UI for browsing buckets and objects.""" from __future__ import annotations import json diff --git a/app/version.py b/app/version.py index 8b40c27..6f91c30 100644 --- a/app/version.py +++ b/app/version.py @@ -1,4 +1,3 @@ -"""Central location for the application version string.""" from __future__ import annotations APP_VERSION = "0.2.0"
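--
Usage sketches for reviewers (illustrative, not part of the patch). The
localhost URLs, ports, and bucket names below are assumptions, as is running
with authentication disabled; the SSE-C example also presumes the object
PUT/GET paths honor the headers that the newly imported SSECEncryption
expects.

1. SSE-C. The key never touches disk: it rides along on every request, and
SSECEncryption.from_headers() checks its length and base64 MD5 before
encrypting with AES-GCM. A client round trip could look like:

    # SSE-C round trip (sketch; endpoint and disabled auth are assumptions).
    import base64
    import hashlib
    import os

    import requests

    key = os.urandom(32)  # exactly 256 bits, as from_headers() requires
    sse_headers = {
        "x-amz-server-side-encryption-customer-algorithm": "AES256",
        "x-amz-server-side-encryption-customer-key": base64.b64encode(key).decode(),
        "x-amz-server-side-encryption-customer-key-MD5": base64.b64encode(
            hashlib.md5(key).digest()
        ).decode(),
    }
    url = "http://localhost:5000/demo-bucket/secret.txt"
    requests.put(url, data=b"hello", headers=sse_headers)
    # The same key headers must be presented again on GET; the server keeps
    # only the nonce and the key's MD5 (SSECMetadata), never the key itself.
    print(requests.get(url, headers=sse_headers).content)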
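2. Object Lock. PUT ?object-lock enables locking per bucket; ?retention and
?legal-hold act per object. GOVERNANCE retention yields to the
x-amz-bypass-governance-retention header, COMPLIANCE does not, and a legal
hold (Status ON/OFF) blocks deletion regardless of retention dates:

    # Object Lock walk-through (sketch; endpoint and disabled auth are assumptions).
    from datetime import datetime, timedelta, timezone

    import requests

    ns = "http://s3.amazonaws.com/doc/2006-03-01/"
    base = "http://localhost:5000/demo-bucket"

    # Enable object lock on the bucket, then create an object to protect.
    requests.put(f"{base}?object-lock", data=(
        f'<ObjectLockConfiguration xmlns="{ns}">'
        "<ObjectLockEnabled>Enabled</ObjectLockEnabled>"
        "</ObjectLockConfiguration>"
    ))
    requests.put(f"{base}/report.csv", data=b"q1,q2\n")

    # Lock it in GOVERNANCE mode for one day.
    until = (datetime.now(timezone.utc) + timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    requests.put(f"{base}/report.csv?retention", data=(
        f'<Retention xmlns="{ns}"><Mode>GOVERNANCE</Mode>'
        f"<RetainUntilDate>{until}</RetainUntilDate></Retention>"
    ))

    # A plain DELETE is now refused with 403 AccessDenied...
    assert requests.delete(f"{base}/report.csv").status_code == 403
    # ...but GOVERNANCE, unlike COMPLIANCE, honors the bypass header.
    requests.delete(f"{base}/report.csv",
                    headers={"x-amz-bypass-governance-retention": "true"})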
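3. Event notifications. PUT ?notification registers webhook destinations;
daemon workers then POST an S3-shaped Records payload for each matching
s3:ObjectCreated:* / s3:ObjectRemoved:* event, retrying with linear backoff
(retry_delay_seconds times the attempt number) up to retry_count times. Any
response below HTTP 400 counts as delivered. Registration plus a matching
receiver (URLs and ports are assumptions):

    # Register a webhook, then run a receiver for its deliveries (sketch).
    import requests
    from flask import Flask, request as freq

    ns = "http://s3.amazonaws.com/doc/2006-03-01/"
    requests.put("http://localhost:5000/demo-bucket?notification", data=(
        f'<NotificationConfiguration xmlns="{ns}"><WebhookConfiguration>'
        "<Id>demo</Id><Event>s3:ObjectCreated:*</Event>"
        "<Destination><Url>http://localhost:9000/s3-events</Url></Destination>"
        "<Filter><S3Key><FilterRule><Name>suffix</Name><Value>.csv</Value>"
        "</FilterRule></S3Key></Filter>"
        "</WebhookConfiguration></NotificationConfiguration>"
    ))

    app = Flask(__name__)

    @app.post("/s3-events")
    def s3_events():
        # Field names follow NotificationEvent.to_s3_event().
        for record in freq.get_json(force=True)["Records"]:
            obj = record["s3"]["object"]
            print(record["eventName"], record["s3"]["bucket"]["name"],
                  obj["key"], obj["size"])
        return "", 204  # any status < 400 stops the worker's retry loop

    app.run(port=9000)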
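4. Access logging. PUT ?logging points a source bucket at a target bucket
(which must already exist, or the PUT fails with
InvalidTargetBucketForLogging). Entries buffer in memory and flush to
objects named TargetPrefix + YYYY-MM-DD-HH-MM-SS-<rand> every flush_interval
seconds (default 60), or sooner once max_buffer_size entries accumulate, so
log objects appear with a delay. Note the config endpoint is wired up here,
while the per-request log_request() call sites are not visible in this diff.

    # Route access logs for demo-bucket into log-bucket (sketch; endpoint
    # and disabled auth are assumptions).
    import requests

    ns = "http://s3.amazonaws.com/doc/2006-03-01/"
    requests.put("http://localhost:5000/demo-bucket?logging", data=(
        f'<BucketLoggingStatus xmlns="{ns}"><LoggingEnabled>'
        "<TargetBucket>log-bucket</TargetBucket>"
        "<TargetPrefix>logs/demo-bucket/</TargetPrefix>"
        "</LoggingEnabled></BucketLoggingStatus>"
    ))
    # An empty BucketLoggingStatus body (or DELETE ?logging) disables
    # logging, mirroring how AWS turns it off.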