3 Commits

24 changed files with 3615 additions and 119 deletions

@@ -1,4 +1,3 @@
"""Application factory for the mini S3-compatible object store."""
from __future__ import annotations
import logging
@@ -16,6 +15,8 @@ from flask_cors import CORS
from flask_wtf.csrf import CSRFError
from werkzeug.middleware.proxy_fix import ProxyFix
from .access_logging import AccessLoggingService
from .acl import AclService
from .bucket_policies import BucketPolicyStore
from .config import AppConfig
from .connections import ConnectionStore
@@ -23,6 +24,9 @@ from .encryption import EncryptionManager
from .extensions import limiter, csrf
from .iam import IamService
from .kms import KMSManager
from .lifecycle import LifecycleManager
from .notifications import NotificationService
from .object_lock import ObjectLockService
from .replication import ReplicationManager
from .secret_store import EphemeralSecretStore
from .storage import ObjectStorage
@@ -140,6 +144,21 @@ def create_app(
from .encrypted_storage import EncryptedObjectStorage
storage = EncryptedObjectStorage(storage, encryption_manager)
acl_service = AclService(storage_root)
object_lock_service = ObjectLockService(storage_root)
notification_service = NotificationService(storage_root)
access_logging_service = AccessLoggingService(storage_root)
access_logging_service.set_storage(storage)
lifecycle_manager = None
if app.config.get("LIFECYCLE_ENABLED", False):
base_storage = storage.storage if hasattr(storage, 'storage') else storage
lifecycle_manager = LifecycleManager(
base_storage,
interval_seconds=app.config.get("LIFECYCLE_INTERVAL_SECONDS", 3600),
)
lifecycle_manager.start()
app.extensions["object_storage"] = storage app.extensions["object_storage"] = storage
app.extensions["iam"] = iam app.extensions["iam"] = iam
app.extensions["bucket_policies"] = bucket_policies app.extensions["bucket_policies"] = bucket_policies
@@ -149,6 +168,11 @@ def create_app(
app.extensions["replication"] = replication app.extensions["replication"] = replication
app.extensions["encryption"] = encryption_manager app.extensions["encryption"] = encryption_manager
app.extensions["kms"] = kms_manager app.extensions["kms"] = kms_manager
app.extensions["acl"] = acl_service
app.extensions["lifecycle"] = lifecycle_manager
app.extensions["object_lock"] = object_lock_service
app.extensions["notifications"] = notification_service
app.extensions["access_logging"] = access_logging_service
@app.errorhandler(500) @app.errorhandler(500)
def internal_error(error): def internal_error(error):
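All of the new services are registered under app.extensions, so request handlers can reach them the same way the existing ones are reached. A minimal sketch, assuming an app built by create_app above (note the lifecycle slot holds None when LIFECYCLE_ENABLED is false, so callers must guard for that):

from flask import current_app

def _object_lock_service():
    return current_app.extensions["object_lock"]

def _lifecycle_manager():
    # None unless LIFECYCLE_ENABLED was true at startup; the key itself is always registered.
    return current_app.extensions["lifecycle"]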

app/access_logging.py Normal file
@@ -0,0 +1,262 @@
from __future__ import annotations
import io
import json
import logging
import queue
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class AccessLogEntry:
bucket_owner: str = "-"
bucket: str = "-"
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
remote_ip: str = "-"
requester: str = "-"
request_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16].upper())
operation: str = "-"
key: str = "-"
request_uri: str = "-"
http_status: int = 200
error_code: str = "-"
bytes_sent: int = 0
object_size: int = 0
total_time_ms: int = 0
turn_around_time_ms: int = 0
referrer: str = "-"
user_agent: str = "-"
version_id: str = "-"
host_id: str = "-"
signature_version: str = "SigV4"
cipher_suite: str = "-"
authentication_type: str = "AuthHeader"
host_header: str = "-"
tls_version: str = "-"
def to_log_line(self) -> str:
time_str = self.timestamp.strftime("[%d/%b/%Y:%H:%M:%S %z]")
return (
f'{self.bucket_owner} {self.bucket} {time_str} {self.remote_ip} '
f'{self.requester} {self.request_id} {self.operation} {self.key} '
f'"{self.request_uri}" {self.http_status} {self.error_code or "-"} '
f'{self.bytes_sent or "-"} {self.object_size or "-"} {self.total_time_ms or "-"} '
f'{self.turn_around_time_ms or "-"} "{self.referrer}" "{self.user_agent}" {self.version_id}'
)
def to_dict(self) -> Dict[str, Any]:
return {
"bucket_owner": self.bucket_owner,
"bucket": self.bucket,
"timestamp": self.timestamp.isoformat(),
"remote_ip": self.remote_ip,
"requester": self.requester,
"request_id": self.request_id,
"operation": self.operation,
"key": self.key,
"request_uri": self.request_uri,
"http_status": self.http_status,
"error_code": self.error_code,
"bytes_sent": self.bytes_sent,
"object_size": self.object_size,
"total_time_ms": self.total_time_ms,
"referrer": self.referrer,
"user_agent": self.user_agent,
"version_id": self.version_id,
}
@dataclass
class LoggingConfiguration:
target_bucket: str
target_prefix: str = ""
enabled: bool = True
def to_dict(self) -> Dict[str, Any]:
return {
"LoggingEnabled": {
"TargetBucket": self.target_bucket,
"TargetPrefix": self.target_prefix,
}
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Optional["LoggingConfiguration"]:
logging_enabled = data.get("LoggingEnabled")
if not logging_enabled:
return None
return cls(
target_bucket=logging_enabled.get("TargetBucket", ""),
target_prefix=logging_enabled.get("TargetPrefix", ""),
enabled=True,
)
class AccessLoggingService:
def __init__(self, storage_root: Path, flush_interval: int = 60, max_buffer_size: int = 1000):
self.storage_root = storage_root
self.flush_interval = flush_interval
self.max_buffer_size = max_buffer_size
self._configs: Dict[str, LoggingConfiguration] = {}
self._buffer: Dict[str, List[AccessLogEntry]] = {}
self._buffer_lock = threading.Lock()
self._shutdown = threading.Event()
self._storage = None
self._flush_thread = threading.Thread(target=self._flush_loop, name="access-log-flush", daemon=True)
self._flush_thread.start()
def set_storage(self, storage: Any) -> None:
self._storage = storage
def _config_path(self, bucket_name: str) -> Path:
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "logging.json"
def get_bucket_logging(self, bucket_name: str) -> Optional[LoggingConfiguration]:
if bucket_name in self._configs:
return self._configs[bucket_name]
config_path = self._config_path(bucket_name)
if not config_path.exists():
return None
try:
data = json.loads(config_path.read_text(encoding="utf-8"))
config = LoggingConfiguration.from_dict(data)
if config:
self._configs[bucket_name] = config
return config
except (json.JSONDecodeError, OSError) as e:
logger.warning(f"Failed to load logging config for {bucket_name}: {e}")
return None
def set_bucket_logging(self, bucket_name: str, config: LoggingConfiguration) -> None:
config_path = self._config_path(bucket_name)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(json.dumps(config.to_dict(), indent=2), encoding="utf-8")
self._configs[bucket_name] = config
def delete_bucket_logging(self, bucket_name: str) -> None:
config_path = self._config_path(bucket_name)
try:
if config_path.exists():
config_path.unlink()
except OSError:
pass
self._configs.pop(bucket_name, None)
def log_request(
self,
bucket_name: str,
*,
operation: str,
key: str = "-",
remote_ip: str = "-",
requester: str = "-",
request_uri: str = "-",
http_status: int = 200,
error_code: str = "",
bytes_sent: int = 0,
object_size: int = 0,
total_time_ms: int = 0,
referrer: str = "-",
user_agent: str = "-",
version_id: str = "-",
request_id: str = "",
) -> None:
config = self.get_bucket_logging(bucket_name)
if not config or not config.enabled:
return
entry = AccessLogEntry(
bucket_owner="local-owner",
bucket=bucket_name,
remote_ip=remote_ip,
requester=requester,
request_id=request_id or uuid.uuid4().hex[:16].upper(),
operation=operation,
key=key,
request_uri=request_uri,
http_status=http_status,
error_code=error_code,
bytes_sent=bytes_sent,
object_size=object_size,
total_time_ms=total_time_ms,
referrer=referrer,
user_agent=user_agent,
version_id=version_id,
)
target_key = f"{config.target_bucket}:{config.target_prefix}"
# Decide under the lock, flush outside it: _flush_buffer re-acquires the
# non-reentrant buffer lock, so calling it while still holding the lock would deadlock.
should_flush = False
with self._buffer_lock:
if target_key not in self._buffer:
self._buffer[target_key] = []
self._buffer[target_key].append(entry)
if len(self._buffer[target_key]) >= self.max_buffer_size:
should_flush = True
if should_flush:
self._flush_buffer(target_key)
def _flush_loop(self) -> None:
while not self._shutdown.is_set():
time.sleep(self.flush_interval)
self._flush_all()
def _flush_all(self) -> None:
with self._buffer_lock:
targets = list(self._buffer.keys())
for target_key in targets:
self._flush_buffer(target_key)
def _flush_buffer(self, target_key: str) -> None:
with self._buffer_lock:
entries = self._buffer.pop(target_key, [])
if not entries or not self._storage:
return
try:
bucket_name, prefix = target_key.split(":", 1)
except ValueError:
logger.error(f"Invalid target key: {target_key}")
return
now = datetime.now(timezone.utc)
log_key = f"{prefix}{now.strftime('%Y-%m-%d-%H-%M-%S')}-{uuid.uuid4().hex[:8]}"
log_content = "\n".join(entry.to_log_line() for entry in entries) + "\n"
try:
stream = io.BytesIO(log_content.encode("utf-8"))
self._storage.put_object(bucket_name, log_key, stream, enforce_quota=False)
logger.info(f"Flushed {len(entries)} access log entries to {bucket_name}/{log_key}")
except Exception as e:
logger.error(f"Failed to write access log to {bucket_name}/{log_key}: {e}")
with self._buffer_lock:
if target_key not in self._buffer:
self._buffer[target_key] = []
self._buffer[target_key] = entries + self._buffer[target_key]
def flush(self) -> None:
self._flush_all()
def shutdown(self) -> None:
self._shutdown.set()
self._flush_all()
self._flush_thread.join(timeout=5.0)
def get_stats(self) -> Dict[str, Any]:
with self._buffer_lock:
buffered = sum(len(entries) for entries in self._buffer.values())
return {
"buffered_entries": buffered,
"target_buckets": len(self._buffer),
}
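A minimal usage sketch for the service above. The storage argument is an assumption here: per _flush_buffer, anything exposing put_object(bucket, key, stream, enforce_quota=False) will do.

from pathlib import Path

service = AccessLoggingService(Path("./data"), flush_interval=60)
service.set_storage(object_storage)  # hypothetical ObjectStorage-compatible backend
service.set_bucket_logging("photos", LoggingConfiguration(target_bucket="logs", target_prefix="photos/"))
service.log_request(
    "photos",
    operation="REST.GET.OBJECT",
    key="cat.jpg",
    remote_ip="10.0.0.5",
    http_status=200,
    bytes_sent=4096,
)
service.flush()  # writes a photos/<timestamp>-<suffix> object into the "logs" bucket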

app/acl.py Normal file
@@ -0,0 +1,204 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
ACL_PERMISSION_FULL_CONTROL = "FULL_CONTROL"
ACL_PERMISSION_WRITE = "WRITE"
ACL_PERMISSION_WRITE_ACP = "WRITE_ACP"
ACL_PERMISSION_READ = "READ"
ACL_PERMISSION_READ_ACP = "READ_ACP"
ALL_PERMISSIONS = {
ACL_PERMISSION_FULL_CONTROL,
ACL_PERMISSION_WRITE,
ACL_PERMISSION_WRITE_ACP,
ACL_PERMISSION_READ,
ACL_PERMISSION_READ_ACP,
}
PERMISSION_TO_ACTIONS = {
ACL_PERMISSION_FULL_CONTROL: {"read", "write", "delete", "list", "share"},
ACL_PERMISSION_WRITE: {"write", "delete"},
ACL_PERMISSION_WRITE_ACP: {"share"},
ACL_PERMISSION_READ: {"read", "list"},
ACL_PERMISSION_READ_ACP: {"share"},
}
GRANTEE_ALL_USERS = "*"
GRANTEE_AUTHENTICATED_USERS = "authenticated"
@dataclass
class AclGrant:
grantee: str
permission: str
def to_dict(self) -> Dict[str, str]:
return {"grantee": self.grantee, "permission": self.permission}
@classmethod
def from_dict(cls, data: Dict[str, str]) -> "AclGrant":
return cls(grantee=data["grantee"], permission=data["permission"])
@dataclass
class Acl:
owner: str
grants: List[AclGrant] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return {
"owner": self.owner,
"grants": [g.to_dict() for g in self.grants],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Acl":
return cls(
owner=data.get("owner", ""),
grants=[AclGrant.from_dict(g) for g in data.get("grants", [])],
)
def get_allowed_actions(self, principal_id: Optional[str], is_authenticated: bool = True) -> Set[str]:
actions: Set[str] = set()
if principal_id and principal_id == self.owner:
actions.update(PERMISSION_TO_ACTIONS[ACL_PERMISSION_FULL_CONTROL])
for grant in self.grants:
if grant.grantee == GRANTEE_ALL_USERS:
actions.update(PERMISSION_TO_ACTIONS.get(grant.permission, set()))
elif grant.grantee == GRANTEE_AUTHENTICATED_USERS and is_authenticated:
actions.update(PERMISSION_TO_ACTIONS.get(grant.permission, set()))
elif principal_id and grant.grantee == principal_id:
actions.update(PERMISSION_TO_ACTIONS.get(grant.permission, set()))
return actions
CANNED_ACLS = {
"private": lambda owner: Acl(
owner=owner,
grants=[AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL)],
),
"public-read": lambda owner: Acl(
owner=owner,
grants=[
AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
AclGrant(grantee=GRANTEE_ALL_USERS, permission=ACL_PERMISSION_READ),
],
),
"public-read-write": lambda owner: Acl(
owner=owner,
grants=[
AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
AclGrant(grantee=GRANTEE_ALL_USERS, permission=ACL_PERMISSION_READ),
AclGrant(grantee=GRANTEE_ALL_USERS, permission=ACL_PERMISSION_WRITE),
],
),
"authenticated-read": lambda owner: Acl(
owner=owner,
grants=[
AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
AclGrant(grantee=GRANTEE_AUTHENTICATED_USERS, permission=ACL_PERMISSION_READ),
],
),
"bucket-owner-read": lambda owner: Acl(
owner=owner,
grants=[
AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
],
),
"bucket-owner-full-control": lambda owner: Acl(
owner=owner,
grants=[
AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
],
),
}
def create_canned_acl(canned_acl: str, owner: str) -> Acl:
factory = CANNED_ACLS.get(canned_acl)
if not factory:
return CANNED_ACLS["private"](owner)
return factory(owner)
class AclService:
def __init__(self, storage_root: Path):
self.storage_root = storage_root
self._bucket_acl_cache: Dict[str, Acl] = {}
def _bucket_acl_path(self, bucket_name: str) -> Path:
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / ".acl.json"
def get_bucket_acl(self, bucket_name: str) -> Optional[Acl]:
if bucket_name in self._bucket_acl_cache:
return self._bucket_acl_cache[bucket_name]
acl_path = self._bucket_acl_path(bucket_name)
if not acl_path.exists():
return None
try:
data = json.loads(acl_path.read_text(encoding="utf-8"))
acl = Acl.from_dict(data)
self._bucket_acl_cache[bucket_name] = acl
return acl
except (OSError, json.JSONDecodeError):
return None
def set_bucket_acl(self, bucket_name: str, acl: Acl) -> None:
acl_path = self._bucket_acl_path(bucket_name)
acl_path.parent.mkdir(parents=True, exist_ok=True)
acl_path.write_text(json.dumps(acl.to_dict(), indent=2), encoding="utf-8")
self._bucket_acl_cache[bucket_name] = acl
def set_bucket_canned_acl(self, bucket_name: str, canned_acl: str, owner: str) -> Acl:
acl = create_canned_acl(canned_acl, owner)
self.set_bucket_acl(bucket_name, acl)
return acl
def delete_bucket_acl(self, bucket_name: str) -> None:
acl_path = self._bucket_acl_path(bucket_name)
if acl_path.exists():
acl_path.unlink()
self._bucket_acl_cache.pop(bucket_name, None)
def evaluate_bucket_acl(
self,
bucket_name: str,
principal_id: Optional[str],
action: str,
is_authenticated: bool = True,
) -> bool:
acl = self.get_bucket_acl(bucket_name)
if not acl:
return False
allowed_actions = acl.get_allowed_actions(principal_id, is_authenticated)
return action in allowed_actions
def get_object_acl(self, bucket_name: str, object_key: str, object_metadata: Dict[str, Any]) -> Optional[Acl]:
acl_data = object_metadata.get("__acl__")
if not acl_data:
return None
try:
return Acl.from_dict(acl_data)
except (TypeError, KeyError):
return None
def create_object_acl_metadata(self, acl: Acl) -> Dict[str, Any]:
return {"__acl__": acl.to_dict()}
def evaluate_object_acl(
self,
object_metadata: Dict[str, Any],
principal_id: Optional[str],
action: str,
is_authenticated: bool = True,
) -> bool:
acl = self.get_object_acl("", "", object_metadata)
if not acl:
return False
allowed_actions = acl.get_allowed_actions(principal_id, is_authenticated)
return action in allowed_actions
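A short sketch of how the pieces above compose (the owner ID "alice" and bucket name are illustrative):

from pathlib import Path

acl = create_canned_acl("public-read", "alice")
acl.get_allowed_actions(None, is_authenticated=False)  # {"read", "list"} via the AllUsers grant
acl.get_allowed_actions("alice")  # owner gets the FULL_CONTROL action set

service = AclService(Path("./data"))
service.set_bucket_acl("photos", acl)
assert service.evaluate_bucket_acl("photos", None, "read", is_authenticated=False)
assert not service.evaluate_bucket_acl("photos", None, "write", is_authenticated=False)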

@@ -1,4 +1,3 @@
"""Bucket policy loader/enforcer with a subset of AWS semantics."""
from __future__ import annotations
import json

@@ -1,4 +1,3 @@
"""Configuration helpers for the S3 clone application."""
from __future__ import annotations
import os
@@ -74,6 +73,8 @@ class AppConfig:
kms_keys_path: Path
default_encryption_algorithm: str
display_timezone: str
lifecycle_enabled: bool
lifecycle_interval_seconds: int
@classmethod
def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
@@ -91,6 +92,8 @@ class AppConfig:
secret_ttl_seconds = int(_get("SECRET_TTL_SECONDS", 300))
stream_chunk_size = int(_get("STREAM_CHUNK_SIZE", 64 * 1024))
multipart_min_part_size = int(_get("MULTIPART_MIN_PART_SIZE", 5 * 1024 * 1024))
lifecycle_enabled = _get("LIFECYCLE_ENABLED", "false").lower() in ("true", "1", "yes")
lifecycle_interval_seconds = int(_get("LIFECYCLE_INTERVAL_SECONDS", 3600))
default_secret = "dev-secret-key"
secret_key = str(_get("SECRET_KEY", default_secret))
@@ -198,7 +201,9 @@ class AppConfig:
kms_enabled=kms_enabled,
kms_keys_path=kms_keys_path,
default_encryption_algorithm=default_encryption_algorithm,
display_timezone=display_timezone,
lifecycle_enabled=lifecycle_enabled,
lifecycle_interval_seconds=lifecycle_interval_seconds)
def validate_and_report(self) -> list[str]:
"""Validate configuration and return a list of warnings/issues.

@@ -1,4 +1,3 @@
"""Manage remote S3 connections."""
from __future__ import annotations
import json

@@ -1,4 +1,3 @@
"""Encrypted storage layer that wraps ObjectStorage with encryption support."""
from __future__ import annotations
import io

@@ -353,6 +353,106 @@ class EncryptionManager:
return encryptor.decrypt_stream(stream, metadata)
class SSECEncryption(EncryptionProvider):
"""SSE-C: Server-Side Encryption with Customer-Provided Keys.
The client provides the encryption key with each request.
Server encrypts/decrypts but never stores the key.
Required headers for PUT:
- x-amz-server-side-encryption-customer-algorithm: AES256
- x-amz-server-side-encryption-customer-key: Base64-encoded 256-bit key
- x-amz-server-side-encryption-customer-key-MD5: Base64-encoded MD5 of key
"""
KEY_ID = "customer-provided"
def __init__(self, customer_key: bytes):
if len(customer_key) != 32:
raise EncryptionError("Customer key must be exactly 256 bits (32 bytes)")
self.customer_key = customer_key
@classmethod
def from_headers(cls, headers: Dict[str, str]) -> "SSECEncryption":
algorithm = headers.get("x-amz-server-side-encryption-customer-algorithm", "")
if algorithm.upper() != "AES256":
raise EncryptionError(f"Unsupported SSE-C algorithm: {algorithm}. Only AES256 is supported.")
key_b64 = headers.get("x-amz-server-side-encryption-customer-key", "")
if not key_b64:
raise EncryptionError("Missing x-amz-server-side-encryption-customer-key header")
key_md5_b64 = headers.get("x-amz-server-side-encryption-customer-key-md5", "")
try:
customer_key = base64.b64decode(key_b64)
except Exception as e:
raise EncryptionError(f"Invalid base64 in customer key: {e}") from e
if len(customer_key) != 32:
raise EncryptionError(f"Customer key must be 256 bits, got {len(customer_key) * 8} bits")
if key_md5_b64:
import hashlib
expected_md5 = base64.b64encode(hashlib.md5(customer_key).digest()).decode()
if key_md5_b64 != expected_md5:
raise EncryptionError("Customer key MD5 mismatch")
return cls(customer_key)
def encrypt(self, plaintext: bytes, context: Dict[str, str] | None = None) -> EncryptionResult:
aesgcm = AESGCM(self.customer_key)
nonce = secrets.token_bytes(12)
ciphertext = aesgcm.encrypt(nonce, plaintext, None)
return EncryptionResult(
ciphertext=ciphertext,
nonce=nonce,
key_id=self.KEY_ID,
encrypted_data_key=b"",
)
def decrypt(self, ciphertext: bytes, nonce: bytes, encrypted_data_key: bytes,
key_id: str, context: Dict[str, str] | None = None) -> bytes:
aesgcm = AESGCM(self.customer_key)
try:
return aesgcm.decrypt(nonce, ciphertext, None)
except Exception as exc:
raise EncryptionError(f"SSE-C decryption failed: {exc}") from exc
def generate_data_key(self) -> tuple[bytes, bytes]:
return self.customer_key, b""
@dataclass
class SSECMetadata:
algorithm: str = "AES256"
nonce: bytes = b""
key_md5: str = ""
def to_dict(self) -> Dict[str, str]:
return {
"x-amz-server-side-encryption-customer-algorithm": self.algorithm,
"x-amz-encryption-nonce": base64.b64encode(self.nonce).decode(),
"x-amz-server-side-encryption-customer-key-MD5": self.key_md5,
}
@classmethod
def from_dict(cls, data: Dict[str, str]) -> Optional["SSECMetadata"]:
algorithm = data.get("x-amz-server-side-encryption-customer-algorithm")
if not algorithm:
return None
try:
nonce = base64.b64decode(data.get("x-amz-encryption-nonce", ""))
return cls(
algorithm=algorithm,
nonce=nonce,
key_md5=data.get("x-amz-server-side-encryption-customer-key-MD5", ""),
)
except Exception:
return None
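On the client side, all three SSE-C headers derive from one 256-bit key. A sketch using the lowercase header names from_headers actually reads (Flask's request.headers is case-insensitive, so the canonical ...-key-MD5 spelling also works in real requests):

import base64
import hashlib
import os

key = os.urandom(32)  # 256-bit key; the server never persists it
headers = {
    "x-amz-server-side-encryption-customer-algorithm": "AES256",
    "x-amz-server-side-encryption-customer-key": base64.b64encode(key).decode(),
    "x-amz-server-side-encryption-customer-key-md5": base64.b64encode(hashlib.md5(key).digest()).decode(),
}
provider = SSECEncryption.from_headers(headers)  # raises EncryptionError on a bad key or MD5 mismatch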
class ClientEncryptionHelper:
"""Helpers for client-side encryption.

@@ -1,4 +1,3 @@
"""Standardized error handling for API and UI responses."""
from __future__ import annotations
import logging

@@ -1,4 +1,3 @@
"""Application-wide extension instances."""
from flask import g
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

@@ -1,4 +1,3 @@
"""Lightweight IAM-style user and policy management."""
from __future__ import annotations
import json
@@ -121,6 +120,7 @@ class IamService:
self._cache_ttl = 60.0  # Cache credentials for 60 seconds
self._last_stat_check = 0.0
self._stat_check_interval = 1.0  # Only stat() file every 1 second
self._sessions: Dict[str, Dict[str, Any]] = {}
self._load()
def _maybe_reload(self) -> None:
@@ -192,6 +192,40 @@ class IamService:
elapsed = (datetime.now(timezone.utc) - oldest).total_seconds()
return int(max(0, self.auth_lockout_window.total_seconds() - elapsed))
def create_session_token(self, access_key: str, duration_seconds: int = 3600) -> str:
"""Create a temporary session token for an access key."""
self._maybe_reload()
record = self._users.get(access_key)
if not record:
raise IamError("Unknown access key")
self._cleanup_expired_sessions()
token = secrets.token_urlsafe(32)
expires_at = time.time() + duration_seconds
self._sessions[token] = {
"access_key": access_key,
"expires_at": expires_at,
}
return token
def validate_session_token(self, access_key: str, session_token: str) -> bool:
"""Validate a session token for an access key."""
session = self._sessions.get(session_token)
if not session:
return False
if session["access_key"] != access_key:
return False
if time.time() > session["expires_at"]:
del self._sessions[session_token]
return False
return True
def _cleanup_expired_sessions(self) -> None:
"""Remove expired session tokens."""
now = time.time()
expired = [token for token, data in self._sessions.items() if now > data["expires_at"]]
for token in expired:
del self._sessions[token]
def principal_for_key(self, access_key: str) -> Principal:
# Performance: Check cache first
now = time.time()
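A sketch of the token round trip (the access key is hypothetical and must already exist in the IAM store, otherwise create_session_token raises IamError):

iam = app.extensions["iam"]  # assuming an app built by create_app
token = iam.create_session_token("AKIAEXAMPLE", duration_seconds=900)
assert iam.validate_session_token("AKIAEXAMPLE", token)
assert not iam.validate_session_token("AKIAOTHERKEY", token)  # tokens are bound to their access key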

@@ -1,4 +1,3 @@
"""Key Management Service (KMS) for encryption key management."""
from __future__ import annotations
import base64
@@ -212,6 +211,26 @@ class KMSManager:
self._load_keys()
return list(self._keys.values())
def get_default_key_id(self) -> str:
"""Get the default KMS key ID, creating one if none exist."""
self._load_keys()
for key in self._keys.values():
if key.enabled:
return key.key_id
default_key = self.create_key(description="Default KMS Key")
return default_key.key_id
def get_provider(self, key_id: str | None = None) -> "KMSEncryptionProvider":
"""Get a KMS encryption provider for the specified key."""
if key_id is None:
key_id = self.get_default_key_id()
key = self.get_key(key_id)
if not key:
raise EncryptionError(f"Key not found: {key_id}")
if not key.enabled:
raise EncryptionError(f"Key is disabled: {key_id}")
return KMSEncryptionProvider(self, key_id)
def enable_key(self, key_id: str) -> None:
"""Enable a key."""
self._load_keys()
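Callers no longer need to track key IDs themselves; a sketch, assuming KMSEncryptionProvider follows the same EncryptionProvider interface seen elsewhere in this PR (encrypt(plaintext, context=None) returning an EncryptionResult):

kms = app.extensions["kms"]  # assuming an app built by create_app
provider = kms.get_provider()  # default key, created on first use if none is enabled
result = provider.encrypt(b"secret payload")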

@@ -1,4 +1,3 @@
"""KMS and encryption API endpoints."""
from __future__ import annotations
import base64

app/lifecycle.py Normal file
@@ -0,0 +1,235 @@
from __future__ import annotations
import logging
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from .storage import ObjectStorage, StorageError
logger = logging.getLogger(__name__)
@dataclass
class LifecycleResult:
bucket_name: str
objects_deleted: int = 0
versions_deleted: int = 0
uploads_aborted: int = 0
errors: List[str] = field(default_factory=list)
execution_time_seconds: float = 0.0
class LifecycleManager:
def __init__(self, storage: ObjectStorage, interval_seconds: int = 3600):
self.storage = storage
self.interval_seconds = interval_seconds
self._timer: Optional[threading.Timer] = None
self._shutdown = False
self._lock = threading.Lock()
def start(self) -> None:
if self._timer is not None:
return
self._shutdown = False
self._schedule_next()
logger.info(f"Lifecycle manager started with interval {self.interval_seconds}s")
def stop(self) -> None:
self._shutdown = True
if self._timer:
self._timer.cancel()
self._timer = None
logger.info("Lifecycle manager stopped")
def _schedule_next(self) -> None:
if self._shutdown:
return
self._timer = threading.Timer(self.interval_seconds, self._run_enforcement)
self._timer.daemon = True
self._timer.start()
def _run_enforcement(self) -> None:
if self._shutdown:
return
try:
self.enforce_all_buckets()
except Exception as e:
logger.error(f"Lifecycle enforcement failed: {e}")
finally:
self._schedule_next()
def enforce_all_buckets(self) -> Dict[str, LifecycleResult]:
results = {}
try:
buckets = self.storage.list_buckets()
for bucket in buckets:
result = self.enforce_rules(bucket.name)
if result.objects_deleted > 0 or result.versions_deleted > 0 or result.uploads_aborted > 0:
results[bucket.name] = result
except StorageError as e:
logger.error(f"Failed to list buckets for lifecycle: {e}")
return results
def enforce_rules(self, bucket_name: str) -> LifecycleResult:
start_time = time.time()
result = LifecycleResult(bucket_name=bucket_name)
try:
lifecycle = self.storage.get_bucket_lifecycle(bucket_name)
if not lifecycle:
return result
for rule in lifecycle:
if rule.get("Status") != "Enabled":
continue
rule_id = rule.get("ID", "unknown")
prefix = rule.get("Prefix", rule.get("Filter", {}).get("Prefix", ""))
self._enforce_expiration(bucket_name, rule, prefix, result)
self._enforce_noncurrent_expiration(bucket_name, rule, prefix, result)
self._enforce_abort_multipart(bucket_name, rule, result)
except StorageError as e:
result.errors.append(str(e))
logger.error(f"Lifecycle enforcement error for {bucket_name}: {e}")
result.execution_time_seconds = time.time() - start_time
if result.objects_deleted > 0 or result.versions_deleted > 0 or result.uploads_aborted > 0:
logger.info(
f"Lifecycle enforcement for {bucket_name}: "
f"deleted={result.objects_deleted}, versions={result.versions_deleted}, "
f"aborted={result.uploads_aborted}, time={result.execution_time_seconds:.2f}s"
)
return result
def _enforce_expiration(
self, bucket_name: str, rule: Dict[str, Any], prefix: str, result: LifecycleResult
) -> None:
expiration = rule.get("Expiration", {})
if not expiration:
return
days = expiration.get("Days")
date_str = expiration.get("Date")
if days:
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
elif date_str:
try:
cutoff = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
except ValueError:
return
else:
return
try:
objects = self.storage.list_objects_all(bucket_name)
for obj in objects:
if prefix and not obj.key.startswith(prefix):
continue
if obj.last_modified < cutoff:
try:
self.storage.delete_object(bucket_name, obj.key)
result.objects_deleted += 1
except StorageError as e:
result.errors.append(f"Failed to delete {obj.key}: {e}")
except StorageError as e:
result.errors.append(f"Failed to list objects: {e}")
def _enforce_noncurrent_expiration(
self, bucket_name: str, rule: Dict[str, Any], prefix: str, result: LifecycleResult
) -> None:
noncurrent = rule.get("NoncurrentVersionExpiration", {})
noncurrent_days = noncurrent.get("NoncurrentDays")
if not noncurrent_days:
return
cutoff = datetime.now(timezone.utc) - timedelta(days=noncurrent_days)
try:
objects = self.storage.list_objects_all(bucket_name)
for obj in objects:
if prefix and not obj.key.startswith(prefix):
continue
try:
versions = self.storage.list_object_versions(bucket_name, obj.key)
for version in versions:
archived_at_str = version.get("archived_at", "")
if not archived_at_str:
continue
try:
archived_at = datetime.fromisoformat(archived_at_str.replace("Z", "+00:00"))
if archived_at < cutoff:
version_id = version.get("version_id")
if version_id:
self.storage.delete_object_version(bucket_name, obj.key, version_id)
result.versions_deleted += 1
except (ValueError, StorageError) as e:
result.errors.append(f"Failed to process version: {e}")
except StorageError:
pass
except StorageError as e:
result.errors.append(f"Failed to list objects: {e}")
try:
orphaned = self.storage.list_orphaned_objects(bucket_name)
for item in orphaned:
obj_key = item.get("key", "")
if prefix and not obj_key.startswith(prefix):
continue
try:
versions = self.storage.list_object_versions(bucket_name, obj_key)
for version in versions:
archived_at_str = version.get("archived_at", "")
if not archived_at_str:
continue
try:
archived_at = datetime.fromisoformat(archived_at_str.replace("Z", "+00:00"))
if archived_at < cutoff:
version_id = version.get("version_id")
if version_id:
self.storage.delete_object_version(bucket_name, obj_key, version_id)
result.versions_deleted += 1
except (ValueError, StorageError) as e:
result.errors.append(f"Failed to process orphaned version: {e}")
except StorageError:
pass
except StorageError as e:
result.errors.append(f"Failed to list orphaned objects: {e}")
def _enforce_abort_multipart(
self, bucket_name: str, rule: Dict[str, Any], result: LifecycleResult
) -> None:
abort_config = rule.get("AbortIncompleteMultipartUpload", {})
days_after = abort_config.get("DaysAfterInitiation")
if not days_after:
return
cutoff = datetime.now(timezone.utc) - timedelta(days=days_after)
try:
uploads = self.storage.list_multipart_uploads(bucket_name)
for upload in uploads:
created_at_str = upload.get("created_at", "")
if not created_at_str:
continue
try:
created_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
if created_at < cutoff:
upload_id = upload.get("upload_id")
if upload_id:
self.storage.abort_multipart_upload(bucket_name, upload_id)
result.uploads_aborted += 1
except (ValueError, StorageError) as e:
result.errors.append(f"Failed to abort upload: {e}")
except StorageError as e:
result.errors.append(f"Failed to list multipart uploads: {e}")
def run_now(self, bucket_name: Optional[str] = None) -> Dict[str, LifecycleResult]:
if bucket_name:
return {bucket_name: self.enforce_rules(bucket_name)}
return self.enforce_all_buckets()
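The enforcer consumes plain dict rules from storage.get_bucket_lifecycle. Inferred from the keys read above (not a full AWS lifecycle document), a rule exercising all three supported actions looks like:

rules = [
    {
        "ID": "expire-tmp",
        "Status": "Enabled",  # anything else is skipped
        "Prefix": "tmp/",  # {"Filter": {"Prefix": "tmp/"}} is also honored
        "Expiration": {"Days": 7},
        "NoncurrentVersionExpiration": {"NoncurrentDays": 30},
        "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 2},
    }
]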

app/notifications.py Normal file
@@ -0,0 +1,334 @@
from __future__ import annotations
import json
import logging
import queue
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
import requests
logger = logging.getLogger(__name__)
@dataclass
class NotificationEvent:
event_name: str
bucket_name: str
object_key: str
object_size: int = 0
etag: str = ""
version_id: Optional[str] = None
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
request_id: str = field(default_factory=lambda: uuid.uuid4().hex)
source_ip: str = ""
user_identity: str = ""
def to_s3_event(self) -> Dict[str, Any]:
return {
"Records": [
{
"eventVersion": "2.1",
"eventSource": "myfsio:s3",
"awsRegion": "local",
"eventTime": self.timestamp.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
"eventName": self.event_name,
"userIdentity": {
"principalId": self.user_identity or "ANONYMOUS",
},
"requestParameters": {
"sourceIPAddress": self.source_ip or "127.0.0.1",
},
"responseElements": {
"x-amz-request-id": self.request_id,
"x-amz-id-2": self.request_id,
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "notification",
"bucket": {
"name": self.bucket_name,
"ownerIdentity": {"principalId": "local"},
"arn": f"arn:aws:s3:::{self.bucket_name}",
},
"object": {
"key": self.object_key,
"size": self.object_size,
"eTag": self.etag,
"versionId": self.version_id or "null",
"sequencer": f"{int(time.time() * 1000):016X}",
},
},
}
]
}
@dataclass
class WebhookDestination:
url: str
headers: Dict[str, str] = field(default_factory=dict)
timeout_seconds: int = 30
retry_count: int = 3
retry_delay_seconds: int = 1
def to_dict(self) -> Dict[str, Any]:
return {
"url": self.url,
"headers": self.headers,
"timeout_seconds": self.timeout_seconds,
"retry_count": self.retry_count,
"retry_delay_seconds": self.retry_delay_seconds,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "WebhookDestination":
return cls(
url=data.get("url", ""),
headers=data.get("headers", {}),
timeout_seconds=data.get("timeout_seconds", 30),
retry_count=data.get("retry_count", 3),
retry_delay_seconds=data.get("retry_delay_seconds", 1),
)
@dataclass
class NotificationConfiguration:
id: str
events: List[str]
destination: WebhookDestination
prefix_filter: str = ""
suffix_filter: str = ""
def matches_event(self, event_name: str, object_key: str) -> bool:
event_match = False
for pattern in self.events:
if pattern.endswith("*"):
base = pattern[:-1]
if event_name.startswith(base):
event_match = True
break
elif pattern == event_name:
event_match = True
break
if not event_match:
return False
if self.prefix_filter and not object_key.startswith(self.prefix_filter):
return False
if self.suffix_filter and not object_key.endswith(self.suffix_filter):
return False
return True
def to_dict(self) -> Dict[str, Any]:
return {
"Id": self.id,
"Events": self.events,
"Destination": self.destination.to_dict(),
"Filter": {
"Key": {
"FilterRules": [
{"Name": "prefix", "Value": self.prefix_filter},
{"Name": "suffix", "Value": self.suffix_filter},
]
}
},
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "NotificationConfiguration":
prefix = ""
suffix = ""
filter_data = data.get("Filter", {})
key_filter = filter_data.get("Key", {})
for rule in key_filter.get("FilterRules", []):
if rule.get("Name") == "prefix":
prefix = rule.get("Value", "")
elif rule.get("Name") == "suffix":
suffix = rule.get("Value", "")
return cls(
id=data.get("Id", uuid.uuid4().hex),
events=data.get("Events", []),
destination=WebhookDestination.from_dict(data.get("Destination", {})),
prefix_filter=prefix,
suffix_filter=suffix,
)
class NotificationService:
def __init__(self, storage_root: Path, worker_count: int = 2):
self.storage_root = storage_root
self._configs: Dict[str, List[NotificationConfiguration]] = {}
self._queue: queue.Queue[tuple[NotificationEvent, WebhookDestination]] = queue.Queue()
self._workers: List[threading.Thread] = []
self._shutdown = threading.Event()
self._stats = {
"events_queued": 0,
"events_sent": 0,
"events_failed": 0,
}
for i in range(worker_count):
worker = threading.Thread(target=self._worker_loop, name=f"notification-worker-{i}", daemon=True)
worker.start()
self._workers.append(worker)
def _config_path(self, bucket_name: str) -> Path:
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "notifications.json"
def get_bucket_notifications(self, bucket_name: str) -> List[NotificationConfiguration]:
if bucket_name in self._configs:
return self._configs[bucket_name]
config_path = self._config_path(bucket_name)
if not config_path.exists():
return []
try:
data = json.loads(config_path.read_text(encoding="utf-8"))
configs = [NotificationConfiguration.from_dict(c) for c in data.get("configurations", [])]
self._configs[bucket_name] = configs
return configs
except (json.JSONDecodeError, OSError) as e:
logger.warning(f"Failed to load notification config for {bucket_name}: {e}")
return []
def set_bucket_notifications(
self, bucket_name: str, configurations: List[NotificationConfiguration]
) -> None:
config_path = self._config_path(bucket_name)
config_path.parent.mkdir(parents=True, exist_ok=True)
data = {"configurations": [c.to_dict() for c in configurations]}
config_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
self._configs[bucket_name] = configurations
def delete_bucket_notifications(self, bucket_name: str) -> None:
config_path = self._config_path(bucket_name)
try:
if config_path.exists():
config_path.unlink()
except OSError:
pass
self._configs.pop(bucket_name, None)
def emit_event(self, event: NotificationEvent) -> None:
configurations = self.get_bucket_notifications(event.bucket_name)
if not configurations:
return
for config in configurations:
if config.matches_event(event.event_name, event.object_key):
self._queue.put((event, config.destination))
self._stats["events_queued"] += 1
logger.debug(
f"Queued notification for {event.event_name} on {event.bucket_name}/{event.object_key}"
)
def emit_object_created(
self,
bucket_name: str,
object_key: str,
*,
size: int = 0,
etag: str = "",
version_id: Optional[str] = None,
request_id: str = "",
source_ip: str = "",
user_identity: str = "",
operation: str = "Put",
) -> None:
event = NotificationEvent(
event_name=f"s3:ObjectCreated:{operation}",
bucket_name=bucket_name,
object_key=object_key,
object_size=size,
etag=etag,
version_id=version_id,
request_id=request_id or uuid.uuid4().hex,
source_ip=source_ip,
user_identity=user_identity,
)
self.emit_event(event)
def emit_object_removed(
self,
bucket_name: str,
object_key: str,
*,
version_id: Optional[str] = None,
request_id: str = "",
source_ip: str = "",
user_identity: str = "",
operation: str = "Delete",
) -> None:
event = NotificationEvent(
event_name=f"s3:ObjectRemoved:{operation}",
bucket_name=bucket_name,
object_key=object_key,
version_id=version_id,
request_id=request_id or uuid.uuid4().hex,
source_ip=source_ip,
user_identity=user_identity,
)
self.emit_event(event)
def _worker_loop(self) -> None:
while not self._shutdown.is_set():
try:
event, destination = self._queue.get(timeout=1.0)
except queue.Empty:
continue
try:
self._send_notification(event, destination)
self._stats["events_sent"] += 1
except Exception as e:
self._stats["events_failed"] += 1
logger.error(f"Failed to send notification: {e}")
finally:
self._queue.task_done()
def _send_notification(self, event: NotificationEvent, destination: WebhookDestination) -> None:
payload = event.to_s3_event()
headers = {"Content-Type": "application/json", **destination.headers}
last_error = None
for attempt in range(destination.retry_count):
try:
response = requests.post(
destination.url,
json=payload,
headers=headers,
timeout=destination.timeout_seconds,
)
if response.status_code < 400:
logger.info(
f"Notification sent: {event.event_name} -> {destination.url} (status={response.status_code})"
)
return
last_error = f"HTTP {response.status_code}: {response.text[:200]}"
except requests.RequestException as e:
last_error = str(e)
if attempt < destination.retry_count - 1:
time.sleep(destination.retry_delay_seconds * (attempt + 1))
raise RuntimeError(f"Failed after {destination.retry_count} attempts: {last_error}")
def get_stats(self) -> Dict[str, int]:
return dict(self._stats)
def shutdown(self) -> None:
self._shutdown.set()
for worker in self._workers:
worker.join(timeout=5.0)
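A sketch wiring a webhook for uploads under a prefix (the URL is hypothetical; any HTTP endpoint accepting the S3-style JSON event works):

from pathlib import Path

service = NotificationService(Path("./data"))
config = NotificationConfiguration(
    id="new-uploads",
    events=["s3:ObjectCreated:*"],  # the trailing * matches Put, Copy, and so on
    destination=WebhookDestination(url="http://127.0.0.1:9000/hook"),
    prefix_filter="uploads/",
)
service.set_bucket_notifications("photos", [config])
service.emit_object_created("photos", "uploads/cat.jpg", size=4096, etag="d41d8cd9")  # queued, sent by a worker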

app/object_lock.py Normal file
@@ -0,0 +1,234 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional
class RetentionMode(Enum):
GOVERNANCE = "GOVERNANCE"
COMPLIANCE = "COMPLIANCE"
class ObjectLockError(Exception):
pass
@dataclass
class ObjectLockRetention:
mode: RetentionMode
retain_until_date: datetime
def to_dict(self) -> Dict[str, str]:
return {
"Mode": self.mode.value,
"RetainUntilDate": self.retain_until_date.isoformat(),
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Optional["ObjectLockRetention"]:
if not data:
return None
mode_str = data.get("Mode")
date_str = data.get("RetainUntilDate")
if not mode_str or not date_str:
return None
try:
mode = RetentionMode(mode_str)
retain_until = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
return cls(mode=mode, retain_until_date=retain_until)
except (ValueError, KeyError):
return None
def is_expired(self) -> bool:
return datetime.now(timezone.utc) > self.retain_until_date
@dataclass
class ObjectLockConfig:
enabled: bool = False
default_retention: Optional[ObjectLockRetention] = None
def to_dict(self) -> Dict[str, Any]:
result: Dict[str, Any] = {"ObjectLockEnabled": "Enabled" if self.enabled else "Disabled"}
if self.default_retention:
result["Rule"] = {
"DefaultRetention": {
"Mode": self.default_retention.mode.value,
"Days": None,
"Years": None,
}
}
return result
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ObjectLockConfig":
enabled = data.get("ObjectLockEnabled") == "Enabled"
default_retention = None
rule = data.get("Rule")
if rule and "DefaultRetention" in rule:
dr = rule["DefaultRetention"]
mode_str = dr.get("Mode", "GOVERNANCE")
days = dr.get("Days")
years = dr.get("Years")
if days or years:
from datetime import timedelta
now = datetime.now(timezone.utc)
if years:
delta = timedelta(days=int(years) * 365)
else:
delta = timedelta(days=int(days))
default_retention = ObjectLockRetention(
mode=RetentionMode(mode_str),
retain_until_date=now + delta,
)
return cls(enabled=enabled, default_retention=default_retention)
class ObjectLockService:
def __init__(self, storage_root: Path):
self.storage_root = storage_root
self._config_cache: Dict[str, ObjectLockConfig] = {}
def _bucket_lock_config_path(self, bucket_name: str) -> Path:
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "object_lock.json"
def _object_lock_meta_path(self, bucket_name: str, object_key: str) -> Path:
safe_key = object_key.replace("/", "_").replace("\\", "_")
return (
self.storage_root / ".myfsio.sys" / "buckets" / bucket_name /
"locks" / f"{safe_key}.lock.json"
)
def get_bucket_lock_config(self, bucket_name: str) -> ObjectLockConfig:
if bucket_name in self._config_cache:
return self._config_cache[bucket_name]
config_path = self._bucket_lock_config_path(bucket_name)
if not config_path.exists():
return ObjectLockConfig(enabled=False)
try:
data = json.loads(config_path.read_text(encoding="utf-8"))
config = ObjectLockConfig.from_dict(data)
self._config_cache[bucket_name] = config
return config
except (json.JSONDecodeError, OSError):
return ObjectLockConfig(enabled=False)
def set_bucket_lock_config(self, bucket_name: str, config: ObjectLockConfig) -> None:
config_path = self._bucket_lock_config_path(bucket_name)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(json.dumps(config.to_dict()), encoding="utf-8")
self._config_cache[bucket_name] = config
def enable_bucket_lock(self, bucket_name: str) -> None:
config = self.get_bucket_lock_config(bucket_name)
config.enabled = True
self.set_bucket_lock_config(bucket_name, config)
def is_bucket_lock_enabled(self, bucket_name: str) -> bool:
return self.get_bucket_lock_config(bucket_name).enabled
def get_object_retention(self, bucket_name: str, object_key: str) -> Optional[ObjectLockRetention]:
meta_path = self._object_lock_meta_path(bucket_name, object_key)
if not meta_path.exists():
return None
try:
data = json.loads(meta_path.read_text(encoding="utf-8"))
return ObjectLockRetention.from_dict(data.get("retention", {}))
except (json.JSONDecodeError, OSError):
return None
def set_object_retention(
self,
bucket_name: str,
object_key: str,
retention: ObjectLockRetention,
bypass_governance: bool = False,
) -> None:
existing = self.get_object_retention(bucket_name, object_key)
if existing and not existing.is_expired():
if existing.mode == RetentionMode.COMPLIANCE:
raise ObjectLockError(
"Cannot modify retention on object with COMPLIANCE mode until retention expires"
)
if existing.mode == RetentionMode.GOVERNANCE and not bypass_governance:
raise ObjectLockError(
"Cannot modify GOVERNANCE retention without bypass-governance permission"
)
meta_path = self._object_lock_meta_path(bucket_name, object_key)
meta_path.parent.mkdir(parents=True, exist_ok=True)
existing_data: Dict[str, Any] = {}
if meta_path.exists():
try:
existing_data = json.loads(meta_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
pass
existing_data["retention"] = retention.to_dict()
meta_path.write_text(json.dumps(existing_data), encoding="utf-8")
def get_legal_hold(self, bucket_name: str, object_key: str) -> bool:
meta_path = self._object_lock_meta_path(bucket_name, object_key)
if not meta_path.exists():
return False
try:
data = json.loads(meta_path.read_text(encoding="utf-8"))
return data.get("legal_hold", False)
except (json.JSONDecodeError, OSError):
return False
def set_legal_hold(self, bucket_name: str, object_key: str, enabled: bool) -> None:
meta_path = self._object_lock_meta_path(bucket_name, object_key)
meta_path.parent.mkdir(parents=True, exist_ok=True)
existing_data: Dict[str, Any] = {}
if meta_path.exists():
try:
existing_data = json.loads(meta_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
pass
existing_data["legal_hold"] = enabled
meta_path.write_text(json.dumps(existing_data), encoding="utf-8")
def can_delete_object(
self,
bucket_name: str,
object_key: str,
bypass_governance: bool = False,
) -> tuple[bool, str]:
if self.get_legal_hold(bucket_name, object_key):
return False, "Object is under legal hold"
retention = self.get_object_retention(bucket_name, object_key)
if retention and not retention.is_expired():
if retention.mode == RetentionMode.COMPLIANCE:
return False, f"Object is locked in COMPLIANCE mode until {retention.retain_until_date.isoformat()}"
if retention.mode == RetentionMode.GOVERNANCE:
if not bypass_governance:
return False, f"Object is locked in GOVERNANCE mode until {retention.retain_until_date.isoformat()}"
return True, ""
def can_overwrite_object(
self,
bucket_name: str,
object_key: str,
bypass_governance: bool = False,
) -> tuple[bool, str]:
return self.can_delete_object(bucket_name, object_key, bypass_governance)
def delete_object_lock_metadata(self, bucket_name: str, object_key: str) -> None:
meta_path = self._object_lock_meta_path(bucket_name, object_key)
try:
if meta_path.exists():
meta_path.unlink()
except OSError:
pass
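A sketch of the retention flow (bucket and key names are illustrative):

from datetime import datetime, timedelta, timezone
from pathlib import Path

lock = ObjectLockService(Path("./data"))
lock.enable_bucket_lock("vault")
retention = ObjectLockRetention(
    mode=RetentionMode.GOVERNANCE,
    retain_until_date=datetime.now(timezone.utc) + timedelta(days=30),
)
lock.set_object_retention("vault", "report.pdf", retention)
allowed, reason = lock.can_delete_object("vault", "report.pdf")  # False until the date passes
allowed, _ = lock.can_delete_object("vault", "report.pdf", bypass_governance=True)  # GOVERNANCE can be bypassed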

@@ -1,4 +1,3 @@
"""Background replication worker."""
from __future__ import annotations
import json
@@ -32,13 +31,9 @@ REPLICATION_MODE_ALL = "all"
def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any:
"""Create a boto3 S3 client for the given connection.
Args:
connection: Remote S3 connection configuration
health_check: If True, use minimal retries for quick health checks
Returns:
Configured boto3 S3 client
"""
config = Config(
user_agent_extra=REPLICATION_USER_AGENT,
@@ -182,9 +177,15 @@ class ReplicationManager:
return self._rules.get(bucket_name)
def set_rule(self, rule: ReplicationRule) -> None:
old_rule = self._rules.get(rule.bucket_name)
was_all_mode = old_rule is not None and old_rule.mode == REPLICATION_MODE_ALL
self._rules[rule.bucket_name] = rule
self.save_rules()
if rule.mode == REPLICATION_MODE_ALL and rule.enabled and not was_all_mode:
logger.info(f"Replication mode ALL enabled for {rule.bucket_name}, triggering sync of existing objects")
self._executor.submit(self.replicate_existing_objects, rule.bucket_name)
def delete_rule(self, bucket_name: str) -> None:
if bucket_name in self._rules:
del self._rules[bucket_name]

@@ -16,9 +16,14 @@ from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, ParseError
from flask import Blueprint, Response, current_app, jsonify, request, g
from werkzeug.http import http_date
from .access_logging import AccessLoggingService, LoggingConfiguration
from .acl import AclService
from .bucket_policies import BucketPolicyStore
from .encryption import SSECEncryption, SSECMetadata, EncryptionError
from .extensions import limiter
from .iam import IamError, Principal
from .notifications import NotificationService, NotificationConfiguration, WebhookDestination
from .object_lock import ObjectLockService, ObjectLockRetention, ObjectLockConfig, ObjectLockError, RetentionMode
from .replication import ReplicationManager
from .storage import ObjectStorage, StorageError, QuotaExceededError
@@ -30,6 +35,10 @@ def _storage() -> ObjectStorage:
return current_app.extensions["object_storage"] return current_app.extensions["object_storage"]
def _acl() -> AclService:
return current_app.extensions["acl"]
def _iam():
return current_app.extensions["iam"]
@@ -44,6 +53,18 @@ def _bucket_policies() -> BucketPolicyStore:
return store
def _object_lock() -> ObjectLockService:
return current_app.extensions["object_lock"]
def _notifications() -> NotificationService:
return current_app.extensions["notifications"]
def _access_logging() -> AccessLoggingService:
return current_app.extensions["access_logging"]
def _xml_response(element: Element, status: int = 200) -> Response:
xml_bytes = tostring(element, encoding="utf-8")
return Response(xml_bytes, status=status, mimetype="application/xml")
@@ -58,6 +79,37 @@ def _error_response(code: str, message: str, status: int) -> Response:
return _xml_response(error, status)
def _parse_range_header(range_header: str, file_size: int) -> list[tuple[int, int]] | None:
if not range_header.startswith("bytes="):
return None
ranges = []
range_spec = range_header[6:]
for part in range_spec.split(","):
part = part.strip()
if not part:
continue
if part.startswith("-"):
suffix_length = int(part[1:])
if suffix_length <= 0:
return None
start = max(0, file_size - suffix_length)
end = file_size - 1
elif part.endswith("-"):
start = int(part[:-1])
if start >= file_size:
return None
end = file_size - 1
else:
start_str, end_str = part.split("-", 1)
start = int(start_str)
end = int(end_str)
if start > end or start >= file_size:
return None
end = min(end, file_size - 1)
ranges.append((start, end))
return ranges if ranges else None
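For reference, a few inputs and what the parser above returns for a 1000-byte object:

_parse_range_header("bytes=0-99,-50", 1000)  # [(0, 99), (950, 999)]
_parse_range_header("bytes=500-", 1000)      # [(500, 999)]
_parse_range_header("bytes=1200-", 1000)     # None (start beyond EOF)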
def _sign(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
@@ -179,6 +231,11 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
)
raise IamError("SignatureDoesNotMatch")
session_token = req.headers.get("X-Amz-Security-Token")
if session_token:
if not _iam().validate_session_token(access_key, session_token):
raise IamError("InvalidToken")
return _iam().get_principal(access_key)
@@ -261,6 +318,11 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
if not hmac.compare_digest(calculated_signature, signature):
raise IamError("SignatureDoesNotMatch")
session_token = req.args.get("X-Amz-Security-Token")
if session_token:
if not _iam().validate_session_token(access_key, session_token):
raise IamError("InvalidToken")
return _iam().get_principal(access_key)
@@ -321,6 +383,19 @@ def _authorize_action(principal: Principal | None, bucket_name: str | None, acti
return
if policy_decision == "allow":
return
acl_allowed = False
if bucket_name:
acl_service = _acl()
acl_allowed = acl_service.evaluate_bucket_acl(
bucket_name,
access_key,
action,
is_authenticated=principal is not None,
)
if acl_allowed:
return
raise iam_error or IamError("Access denied")
@@ -838,6 +913,9 @@ def _maybe_handle_bucket_subresource(bucket_name: str) -> Response | None:
"versions": _bucket_list_versions_handler, "versions": _bucket_list_versions_handler,
"lifecycle": _bucket_lifecycle_handler, "lifecycle": _bucket_lifecycle_handler,
"quota": _bucket_quota_handler, "quota": _bucket_quota_handler,
"object-lock": _bucket_object_lock_handler,
"notification": _bucket_notification_handler,
"logging": _bucket_logging_handler,
}
requested = [key for key in handlers if key in request.args]
if not requested:
@@ -1131,6 +1209,8 @@ def _bucket_location_handler(bucket_name: str) -> Response:
def _bucket_acl_handler(bucket_name: str) -> Response:
from .acl import create_canned_acl, Acl, AclGrant, GRANTEE_ALL_USERS, GRANTEE_AUTHENTICATED_USERS
if request.method not in {"GET", "PUT"}:
return _method_not_allowed(["GET", "PUT"])
principal, error = _require_principal()
@@ -1144,24 +1224,39 @@ def _bucket_acl_handler(bucket_name: str) -> Response:
if not storage.bucket_exists(bucket_name):
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
acl_service = _acl()
owner_id = principal.access_key if principal else "anonymous"
if request.method == "PUT": if request.method == "PUT":
# Accept canned ACL headers for S3 compatibility (not fully implemented)
canned_acl = request.headers.get("x-amz-acl", "private") canned_acl = request.headers.get("x-amz-acl", "private")
current_app.logger.info("Bucket ACL set (canned)", extra={"bucket": bucket_name, "acl": canned_acl}) acl = acl_service.set_bucket_canned_acl(bucket_name, canned_acl, owner_id)
current_app.logger.info("Bucket ACL set", extra={"bucket": bucket_name, "acl": canned_acl})
return Response(status=200) return Response(status=200)
root = Element("AccessControlPolicy") acl = acl_service.get_bucket_acl(bucket_name)
owner = SubElement(root, "Owner") if not acl:
SubElement(owner, "ID").text = principal.access_key if principal else "anonymous" acl = create_canned_acl("private", owner_id)
SubElement(owner, "DisplayName").text = principal.display_name if principal else "Anonymous"
acl = SubElement(root, "AccessControlList") root = Element("AccessControlPolicy")
grant = SubElement(acl, "Grant") owner_el = SubElement(root, "Owner")
grantee = SubElement(grant, "Grantee") SubElement(owner_el, "ID").text = acl.owner
SubElement(owner_el, "DisplayName").text = acl.owner
acl_el = SubElement(root, "AccessControlList")
for grant in acl.grants:
grant_el = SubElement(acl_el, "Grant")
grantee = SubElement(grant_el, "Grantee")
if grant.grantee == GRANTEE_ALL_USERS:
grantee.set("{http://www.w3.org/2001/XMLSchema-instance}type", "Group")
SubElement(grantee, "URI").text = "http://acs.amazonaws.com/groups/global/AllUsers"
elif grant.grantee == GRANTEE_AUTHENTICATED_USERS:
grantee.set("{http://www.w3.org/2001/XMLSchema-instance}type", "Group")
SubElement(grantee, "URI").text = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
else:
grantee.set("{http://www.w3.org/2001/XMLSchema-instance}type", "CanonicalUser") grantee.set("{http://www.w3.org/2001/XMLSchema-instance}type", "CanonicalUser")
SubElement(grantee, "ID").text = principal.access_key if principal else "anonymous" SubElement(grantee, "ID").text = grant.grantee
SubElement(grantee, "DisplayName").text = principal.display_name if principal else "Anonymous" SubElement(grantee, "DisplayName").text = grant.grantee
SubElement(grant, "Permission").text = "FULL_CONTROL" SubElement(grant_el, "Permission").text = grant.permission
return _xml_response(root) return _xml_response(root)
@@ -1491,6 +1586,336 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
return Response(status=204)
def _bucket_object_lock_handler(bucket_name: str) -> Response:
if request.method not in {"GET", "PUT"}:
return _method_not_allowed(["GET", "PUT"])
principal, error = _require_principal()
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
if not storage.bucket_exists(bucket_name):
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
lock_service = _object_lock()
if request.method == "GET":
config = lock_service.get_bucket_lock_config(bucket_name)
root = Element("ObjectLockConfiguration", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
SubElement(root, "ObjectLockEnabled").text = "Enabled" if config.enabled else "Disabled"
return _xml_response(root)
payload = request.get_data(cache=False) or b""
if not payload.strip():
return _error_response("MalformedXML", "Request body is required", 400)
try:
root = fromstring(payload)
except ParseError:
return _error_response("MalformedXML", "Unable to parse XML document", 400)
enabled_el = root.find("{*}ObjectLockEnabled")
enabled = (enabled_el.text or "").strip() == "Enabled" if enabled_el is not None else False
config = ObjectLockConfig(enabled=enabled)
lock_service.set_bucket_lock_config(bucket_name, config)
current_app.logger.info("Bucket object lock updated", extra={"bucket": bucket_name, "enabled": enabled})
return Response(status=200)
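A hedged client-side sketch of exercising this subresource with boto3; the endpoint URL and credentials are placeholders, and boto3 sends the same ObjectLockConfiguration XML the handler parses:

import boto3

# Placeholder endpoint and key pair for a locally running instance of this app.
s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:5000",
    aws_access_key_id="EXAMPLE_KEY",
    aws_secret_access_key="EXAMPLE_SECRET",
)
s3.put_object_lock_configuration(
    Bucket="my-bucket",
    ObjectLockConfiguration={"ObjectLockEnabled": "Enabled"},
)
print(s3.get_object_lock_configuration(Bucket="my-bucket"))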
def _bucket_notification_handler(bucket_name: str) -> Response:
if request.method not in {"GET", "PUT", "DELETE"}:
return _method_not_allowed(["GET", "PUT", "DELETE"])
principal, error = _require_principal()
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
if not storage.bucket_exists(bucket_name):
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
notification_service = _notifications()
if request.method == "GET":
configs = notification_service.get_bucket_notifications(bucket_name)
root = Element("NotificationConfiguration", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
for config in configs:
webhook_el = SubElement(root, "WebhookConfiguration")
SubElement(webhook_el, "Id").text = config.id
for event in config.events:
SubElement(webhook_el, "Event").text = event
dest_el = SubElement(webhook_el, "Destination")
SubElement(dest_el, "Url").text = config.destination.url
if config.prefix_filter or config.suffix_filter:
filter_el = SubElement(webhook_el, "Filter")
key_el = SubElement(filter_el, "S3Key")
if config.prefix_filter:
rule_el = SubElement(key_el, "FilterRule")
SubElement(rule_el, "Name").text = "prefix"
SubElement(rule_el, "Value").text = config.prefix_filter
if config.suffix_filter:
rule_el = SubElement(key_el, "FilterRule")
SubElement(rule_el, "Name").text = "suffix"
SubElement(rule_el, "Value").text = config.suffix_filter
return _xml_response(root)
if request.method == "DELETE":
notification_service.delete_bucket_notifications(bucket_name)
current_app.logger.info("Bucket notifications deleted", extra={"bucket": bucket_name})
return Response(status=204)
payload = request.get_data(cache=False) or b""
if not payload.strip():
notification_service.delete_bucket_notifications(bucket_name)
return Response(status=200)
try:
root = fromstring(payload)
except ParseError:
return _error_response("MalformedXML", "Unable to parse XML document", 400)
configs: list[NotificationConfiguration] = []
for webhook_el in root.findall("{*}WebhookConfiguration") or root.findall("WebhookConfiguration"):
config_id = _find_element_text(webhook_el, "Id") or uuid.uuid4().hex
events = [el.text for el in webhook_el.findall("{*}Event") or webhook_el.findall("Event") if el.text]
dest_el = _find_element(webhook_el, "Destination")
url = _find_element_text(dest_el, "Url") if dest_el is not None else ""
if not url:
return _error_response("InvalidArgument", "Destination URL is required", 400)
prefix = ""
suffix = ""
filter_el = _find_element(webhook_el, "Filter")
if filter_el is not None:
key_el = _find_element(filter_el, "S3Key")
if key_el is not None:
for rule_el in key_el.findall("{*}FilterRule") or key_el.findall("FilterRule"):
name = _find_element_text(rule_el, "Name")
value = _find_element_text(rule_el, "Value")
if name == "prefix":
prefix = value
elif name == "suffix":
suffix = value
configs.append(NotificationConfiguration(
id=config_id,
events=events,
destination=WebhookDestination(url=url),
prefix_filter=prefix,
suffix_filter=suffix,
))
notification_service.set_bucket_notifications(bucket_name, configs)
current_app.logger.info("Bucket notifications updated", extra={"bucket": bucket_name, "configs": len(configs)})
return Response(status=200)
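For reference, a payload the parser above would accept; WebhookConfiguration appears to be this project's own element rather than stock S3, and the id, URL, and event names are placeholders:

<NotificationConfiguration>
  <WebhookConfiguration>
    <Id>audit-webhook</Id>
    <Event>s3:ObjectCreated:*</Event>
    <Event>s3:ObjectRemoved:*</Event>
    <Destination>
      <Url>https://hooks.example.internal/s3-events</Url>
    </Destination>
    <Filter>
      <S3Key>
        <FilterRule><Name>prefix</Name><Value>logs/</Value></FilterRule>
        <FilterRule><Name>suffix</Name><Value>.json</Value></FilterRule>
      </S3Key>
    </Filter>
  </WebhookConfiguration>
</NotificationConfiguration>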
def _bucket_logging_handler(bucket_name: str) -> Response:
if request.method not in {"GET", "PUT", "DELETE"}:
return _method_not_allowed(["GET", "PUT", "DELETE"])
principal, error = _require_principal()
if error:
return error
try:
_authorize_action(principal, bucket_name, "policy")
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
if not storage.bucket_exists(bucket_name):
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
logging_service = _access_logging()
if request.method == "GET":
config = logging_service.get_bucket_logging(bucket_name)
root = Element("BucketLoggingStatus", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
if config and config.enabled:
logging_enabled = SubElement(root, "LoggingEnabled")
SubElement(logging_enabled, "TargetBucket").text = config.target_bucket
SubElement(logging_enabled, "TargetPrefix").text = config.target_prefix
return _xml_response(root)
if request.method == "DELETE":
logging_service.delete_bucket_logging(bucket_name)
current_app.logger.info("Bucket logging deleted", extra={"bucket": bucket_name})
return Response(status=204)
payload = request.get_data(cache=False) or b""
if not payload.strip():
logging_service.delete_bucket_logging(bucket_name)
return Response(status=200)
try:
root = fromstring(payload)
except ParseError:
return _error_response("MalformedXML", "Unable to parse XML document", 400)
logging_enabled = _find_element(root, "LoggingEnabled")
if logging_enabled is None:
logging_service.delete_bucket_logging(bucket_name)
return Response(status=200)
target_bucket = _find_element_text(logging_enabled, "TargetBucket")
if not target_bucket:
return _error_response("InvalidArgument", "TargetBucket is required", 400)
if not storage.bucket_exists(target_bucket):
return _error_response("InvalidTargetBucketForLogging", "Target bucket does not exist", 400)
target_prefix = _find_element_text(logging_enabled, "TargetPrefix")
config = LoggingConfiguration(
target_bucket=target_bucket,
target_prefix=target_prefix,
enabled=True,
)
logging_service.set_bucket_logging(bucket_name, config)
current_app.logger.info(
"Bucket logging updated",
extra={"bucket": bucket_name, "target_bucket": target_bucket, "target_prefix": target_prefix}
)
return Response(status=200)
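The PUT body follows the standard S3 BucketLoggingStatus shape; a minimal example with placeholder bucket names (per the handler above, a body without LoggingEnabled disables logging):

<BucketLoggingStatus xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <LoggingEnabled>
    <TargetBucket>access-logs</TargetBucket>
    <TargetPrefix>my-bucket/</TargetPrefix>
  </LoggingEnabled>
</BucketLoggingStatus>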
def _object_retention_handler(bucket_name: str, object_key: str) -> Response:
if request.method not in {"GET", "PUT"}:
return _method_not_allowed(["GET", "PUT"])
principal, error = _require_principal()
if error:
return error
try:
_authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key)
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
if not storage.bucket_exists(bucket_name):
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
try:
storage.get_object_path(bucket_name, object_key)
except StorageError:
return _error_response("NoSuchKey", "Object does not exist", 404)
lock_service = _object_lock()
if request.method == "GET":
retention = lock_service.get_object_retention(bucket_name, object_key)
if not retention:
return _error_response("NoSuchObjectLockConfiguration", "No retention policy", 404)
root = Element("Retention", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
SubElement(root, "Mode").text = retention.mode.value
SubElement(root, "RetainUntilDate").text = retention.retain_until_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
return _xml_response(root)
payload = request.get_data(cache=False) or b""
if not payload.strip():
return _error_response("MalformedXML", "Request body is required", 400)
try:
root = fromstring(payload)
except ParseError:
return _error_response("MalformedXML", "Unable to parse XML document", 400)
mode_str = _find_element_text(root, "Mode")
retain_until_str = _find_element_text(root, "RetainUntilDate")
if not mode_str or not retain_until_str:
return _error_response("InvalidArgument", "Mode and RetainUntilDate are required", 400)
try:
mode = RetentionMode(mode_str)
except ValueError:
return _error_response("InvalidArgument", f"Invalid retention mode: {mode_str}", 400)
try:
retain_until = datetime.fromisoformat(retain_until_str.replace("Z", "+00:00"))
except ValueError:
return _error_response("InvalidArgument", f"Invalid date format: {retain_until_str}", 400)
bypass = request.headers.get("x-amz-bypass-governance-retention", "").lower() == "true"
retention = ObjectLockRetention(mode=mode, retain_until_date=retain_until)
try:
lock_service.set_object_retention(bucket_name, object_key, retention, bypass_governance=bypass)
except ObjectLockError as exc:
return _error_response("AccessDenied", str(exc), 403)
current_app.logger.info(
"Object retention set",
extra={"bucket": bucket_name, "key": object_key, "mode": mode_str, "until": retain_until_str}
)
return Response(status=200)
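A hedged boto3 sketch of setting retention through this endpoint; endpoint, credentials, bucket, and key are placeholders:

from datetime import datetime, timedelta, timezone
import boto3

s3 = boto3.client("s3", endpoint_url="http://127.0.0.1:5000",
                  aws_access_key_id="EXAMPLE_KEY", aws_secret_access_key="EXAMPLE_SECRET")
s3.put_object_retention(
    Bucket="my-bucket",
    Key="reports/q1.csv",
    Retention={
        "Mode": "GOVERNANCE",
        "RetainUntilDate": datetime.now(timezone.utc) + timedelta(days=30),
    },
    # BypassGovernanceRetention=True would set the bypass header the handler reads.
)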
def _object_legal_hold_handler(bucket_name: str, object_key: str) -> Response:
if request.method not in {"GET", "PUT"}:
return _method_not_allowed(["GET", "PUT"])
principal, error = _require_principal()
if error:
return error
try:
_authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key)
except IamError as exc:
return _error_response("AccessDenied", str(exc), 403)
storage = _storage()
if not storage.bucket_exists(bucket_name):
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
try:
storage.get_object_path(bucket_name, object_key)
except StorageError:
return _error_response("NoSuchKey", "Object does not exist", 404)
lock_service = _object_lock()
if request.method == "GET":
enabled = lock_service.get_legal_hold(bucket_name, object_key)
root = Element("LegalHold", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
SubElement(root, "Status").text = "ON" if enabled else "OFF"
return _xml_response(root)
payload = request.get_data(cache=False) or b""
if not payload.strip():
return _error_response("MalformedXML", "Request body is required", 400)
try:
root = fromstring(payload)
except ParseError:
return _error_response("MalformedXML", "Unable to parse XML document", 400)
status = _find_element_text(root, "Status")
if status not in {"ON", "OFF"}:
return _error_response("InvalidArgument", "Status must be ON or OFF", 400)
lock_service.set_legal_hold(bucket_name, object_key, status == "ON")
current_app.logger.info(
"Object legal hold set",
extra={"bucket": bucket_name, "key": object_key, "status": status}
)
return Response(status=200)
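And the matching legal-hold calls, reusing the placeholder client from the retention sketch above:

s3.put_object_legal_hold(Bucket="my-bucket", Key="reports/q1.csv", LegalHold={"Status": "ON"})
hold = s3.get_object_legal_hold(Bucket="my-bucket", Key="reports/q1.csv")
print(hold["LegalHold"]["Status"])  # "ON"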
def _bulk_delete_handler(bucket_name: str) -> Response:
principal, error = _require_principal()
if error:
@@ -1537,29 +1962,28 @@ def _bulk_delete_handler(bucket_name: str) -> Response:
return _error_response("MalformedXML", "A maximum of 1000 objects can be deleted per request", 400) return _error_response("MalformedXML", "A maximum of 1000 objects can be deleted per request", 400)
storage = _storage() storage = _storage()
deleted: list[str] = [] deleted: list[dict[str, str | None]] = []
errors: list[dict[str, str]] = [] errors: list[dict[str, str]] = []
for entry in objects: for entry in objects:
key = entry["Key"] or "" key = entry["Key"] or ""
version_id = entry.get("VersionId") version_id = entry.get("VersionId")
if version_id:
errors.append({
"Key": key,
"Code": "InvalidRequest",
"Message": "VersionId is not supported for bulk deletes",
})
continue
try: try:
if version_id:
storage.delete_object_version(bucket_name, key, version_id)
deleted.append({"Key": key, "VersionId": version_id})
else:
storage.delete_object(bucket_name, key) storage.delete_object(bucket_name, key)
deleted.append(key) deleted.append({"Key": key, "VersionId": None})
except StorageError as exc: except StorageError as exc:
errors.append({"Key": key, "Code": "InvalidRequest", "Message": str(exc)}) errors.append({"Key": key, "Code": "InvalidRequest", "Message": str(exc)})
result = Element("DeleteResult") result = Element("DeleteResult")
if not quiet: if not quiet:
for key in deleted: for item in deleted:
deleted_el = SubElement(result, "Deleted") deleted_el = SubElement(result, "Deleted")
SubElement(deleted_el, "Key").text = key SubElement(deleted_el, "Key").text = item["Key"]
if item.get("VersionId"):
SubElement(deleted_el, "VersionId").text = item["VersionId"]
for err in errors: for err in errors:
error_el = SubElement(result, "Error") error_el = SubElement(result, "Error")
SubElement(error_el, "Key").text = err.get("Key", "") SubElement(error_el, "Key").text = err.get("Key", "")
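The request body is the standard multi-object Delete document; with the change above, an entry may now carry a VersionId (keys and version id below are placeholders):

<Delete>
  <Object><Key>tmp/a.txt</Key></Object>
  <Object><Key>tmp/b.txt</Key><VersionId>v-20240501-1</VersionId></Object>
  <Quiet>false</Quiet>
</Delete>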
@@ -1796,6 +2220,12 @@ def object_handler(bucket_name: str, object_key: str):
if "tagging" in request.args: if "tagging" in request.args:
return _object_tagging_handler(bucket_name, object_key) return _object_tagging_handler(bucket_name, object_key)
if "retention" in request.args:
return _object_retention_handler(bucket_name, object_key)
if "legal-hold" in request.args:
return _object_legal_hold_handler(bucket_name, object_key)
if request.method == "POST":
if "uploads" in request.args:
return _initiate_multipart_upload(bucket_name, object_key)
@@ -1811,10 +2241,16 @@ def object_handler(bucket_name: str, object_key: str):
if copy_source:
return _copy_object(bucket_name, object_key, copy_source)
- _, error = _object_principal("write", bucket_name, object_key)
principal, error = _object_principal("write", bucket_name, object_key)
if error:
return error
bypass_governance = request.headers.get("x-amz-bypass-governance-retention", "").lower() == "true"
lock_service = _object_lock()
can_overwrite, lock_reason = lock_service.can_overwrite_object(bucket_name, object_key, bypass_governance=bypass_governance)
if not can_overwrite:
return _error_response("AccessDenied", lock_reason, 403)
stream = request.stream
content_encoding = request.headers.get("Content-Encoding", "").lower()
if "aws-chunked" in content_encoding:
@@ -1848,6 +2284,17 @@ def object_handler(bucket_name: str, object_key: str):
response = Response(status=200)
response.headers["ETag"] = f'"{meta.etag}"'
_notifications().emit_object_created(
bucket_name,
object_key,
size=meta.size,
etag=meta.etag,
request_id=getattr(g, "request_id", ""),
source_ip=request.remote_addr or "",
user_identity=principal.access_key if principal else "",
operation="Put",
)
if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""): if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""):
_replication_manager().trigger_replication(bucket_name, object_key, action="write") _replication_manager().trigger_replication(bucket_name, object_key, action="write")
@@ -1870,20 +2317,67 @@ def object_handler(bucket_name: str, object_key: str):
is_encrypted = "x-amz-server-side-encryption" in metadata
if request.method == "GET":
range_header = request.headers.get("Range")
if is_encrypted and hasattr(storage, 'get_object_data'):
try:
data, clean_metadata = storage.get_object_data(bucket_name, object_key)
- response = Response(data, mimetype=mimetype)
- logged_bytes = len(data)
- response.headers["Content-Length"] = len(data)
file_size = len(data)
etag = hashlib.md5(data).hexdigest()
if range_header:
try:
ranges = _parse_range_header(range_header, file_size)
except (ValueError, TypeError):
ranges = None
if ranges is None:
return _error_response("InvalidRange", "Range Not Satisfiable", 416)
start, end = ranges[0]
partial_data = data[start:end + 1]
response = Response(partial_data, status=206, mimetype=mimetype)
response.headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
response.headers["Content-Length"] = len(partial_data)
logged_bytes = len(partial_data)
else:
response = Response(data, mimetype=mimetype)
response.headers["Content-Length"] = file_size
logged_bytes = file_size
except StorageError as exc:
return _error_response("InternalError", str(exc), 500)
else:
stat = path.stat()
- response = Response(_stream_file(path), mimetype=mimetype, direct_passthrough=True)
- logged_bytes = stat.st_size
file_size = stat.st_size
etag = storage._compute_etag(path)
if range_header:
try:
ranges = _parse_range_header(range_header, file_size)
except (ValueError, TypeError):
ranges = None
if ranges is None:
return _error_response("InvalidRange", "Range Not Satisfiable", 416)
start, end = ranges[0]
length = end - start + 1
def stream_range(file_path, start_pos, length_to_read):
with open(file_path, "rb") as f:
f.seek(start_pos)
remaining = length_to_read
while remaining > 0:
chunk_size = min(65536, remaining)
chunk = f.read(chunk_size)
if not chunk:
break
remaining -= len(chunk)
yield chunk
response = Response(stream_range(path, start, length), status=206, mimetype=mimetype, direct_passthrough=True)
response.headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
response.headers["Content-Length"] = length
logged_bytes = length
else:
response = Response(_stream_file(path), mimetype=mimetype, direct_passthrough=True)
logged_bytes = file_size
else:
if is_encrypted and hasattr(storage, 'get_object_data'):
try:
@@ -1901,6 +2395,21 @@ def object_handler(bucket_name: str, object_key: str):
logged_bytes = 0
_apply_object_headers(response, file_stat=path.stat() if not is_encrypted else None, metadata=metadata, etag=etag)
if request.method == "GET":
response_overrides = {
"response-content-type": "Content-Type",
"response-content-language": "Content-Language",
"response-expires": "Expires",
"response-cache-control": "Cache-Control",
"response-content-disposition": "Content-Disposition",
"response-content-encoding": "Content-Encoding",
}
for param, header in response_overrides.items():
value = request.args.get(param)
if value:
response.headers[header] = value
action = "Object read" if request.method == "GET" else "Object head"
current_app.logger.info(action, extra={"bucket": bucket_name, "key": object_key, "bytes": logged_bytes})
return response
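A hedged sketch of both GET features above from the client side; the URL is a placeholder and request signing is elided, so this assumes a bucket the caller may read:

import requests

resp = requests.get(
    "http://127.0.0.1:5000/my-bucket/video.mp4",
    headers={"Range": "bytes=0-1023"},
    params={"response-content-disposition": "attachment; filename=clip.mp4"},
)
print(resp.status_code)                    # 206 for a satisfiable range
print(resp.headers.get("Content-Range"))   # e.g. "bytes 0-1023/5242880"
print(resp.headers.get("Content-Disposition"))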
@@ -1911,9 +2420,26 @@ def object_handler(bucket_name: str, object_key: str):
_, error = _object_principal("delete", bucket_name, object_key)
if error:
return error
bypass_governance = request.headers.get("x-amz-bypass-governance-retention", "").lower() == "true"
lock_service = _object_lock()
can_delete, lock_reason = lock_service.can_delete_object(bucket_name, object_key, bypass_governance=bypass_governance)
if not can_delete:
return _error_response("AccessDenied", lock_reason, 403)
storage.delete_object(bucket_name, object_key)
lock_service.delete_object_lock_metadata(bucket_name, object_key)
current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key}) current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key})
principal, _ = _require_principal()
_notifications().emit_object_removed(
bucket_name,
object_key,
request_id=getattr(g, "request_id", ""),
source_ip=request.remote_addr or "",
user_identity=principal.access_key if principal else "",
)
user_agent = request.headers.get("User-Agent", "")
if "S3ReplicationAgent" not in user_agent:
_replication_manager().trigger_replication(bucket_name, object_key, action="delete")
@@ -2120,6 +2646,42 @@ def _copy_object(dest_bucket: str, dest_key: str, copy_source: str) -> Response:
except StorageError:
return _error_response("NoSuchKey", "Source object not found", 404)
source_stat = source_path.stat()
source_etag = storage._compute_etag(source_path)
source_mtime = datetime.fromtimestamp(source_stat.st_mtime, timezone.utc)
copy_source_if_match = request.headers.get("x-amz-copy-source-if-match")
if copy_source_if_match:
expected_etag = copy_source_if_match.strip('"')
if source_etag != expected_etag:
return _error_response("PreconditionFailed", "Source ETag does not match", 412)
copy_source_if_none_match = request.headers.get("x-amz-copy-source-if-none-match")
if copy_source_if_none_match:
not_expected_etag = copy_source_if_none_match.strip('"')
if source_etag == not_expected_etag:
return _error_response("PreconditionFailed", "Source ETag matches", 412)
copy_source_if_modified_since = request.headers.get("x-amz-copy-source-if-modified-since")
if copy_source_if_modified_since:
from email.utils import parsedate_to_datetime
try:
if_modified = parsedate_to_datetime(copy_source_if_modified_since)
if source_mtime <= if_modified:
return _error_response("PreconditionFailed", "Source not modified since specified date", 412)
except (TypeError, ValueError):
pass
copy_source_if_unmodified_since = request.headers.get("x-amz-copy-source-if-unmodified-since")
if copy_source_if_unmodified_since:
from email.utils import parsedate_to_datetime
try:
if_unmodified = parsedate_to_datetime(copy_source_if_unmodified_since)
if source_mtime > if_unmodified:
return _error_response("PreconditionFailed", "Source modified since specified date", 412)
except (TypeError, ValueError):
pass
source_metadata = storage.get_object_metadata(source_bucket, source_key)
metadata_directive = request.headers.get("x-amz-metadata-directive", "COPY").upper()
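A hedged boto3 sketch of a conditional copy against these checks; the client setup, names, and ETag value are placeholders:

import boto3

s3 = boto3.client("s3", endpoint_url="http://127.0.0.1:5000",
                  aws_access_key_id="EXAMPLE_KEY", aws_secret_access_key="EXAMPLE_SECRET")
s3.copy_object(
    Bucket="dest-bucket",
    Key="reports/q1-copy.csv",
    CopySource={"Bucket": "my-bucket", "Key": "reports/q1.csv"},
    CopySourceIfMatch="9b2cf535f27731c974343645a3985328",  # proceeds only if the source ETag still matches
    MetadataDirective="COPY",
)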


@@ -1,4 +1,3 @@
"""Ephemeral store for one-time secrets communicated to the UI."""
from __future__ import annotations
import secrets


@@ -1,4 +1,3 @@
"""Filesystem-backed object storage helpers."""
from __future__ import annotations
import hashlib
@@ -809,6 +808,29 @@ class ObjectStorage:
metadata=metadata or None,
)
def delete_object_version(self, bucket_name: str, object_key: str, version_id: str) -> None:
bucket_path = self._bucket_path(bucket_name)
if not bucket_path.exists():
raise StorageError("Bucket does not exist")
bucket_id = bucket_path.name
safe_key = self._sanitize_object_key(object_key)
version_dir = self._version_dir(bucket_id, safe_key)
data_path = version_dir / f"{version_id}.bin"
meta_path = version_dir / f"{version_id}.json"
if not data_path.exists() and not meta_path.exists():
legacy_version_dir = self._legacy_version_dir(bucket_id, safe_key)
data_path = legacy_version_dir / f"{version_id}.bin"
meta_path = legacy_version_dir / f"{version_id}.json"
if not data_path.exists() and not meta_path.exists():
raise StorageError(f"Version {version_id} not found")
if data_path.exists():
data_path.unlink()
if meta_path.exists():
meta_path.unlink()
parent = data_path.parent
if parent.exists() and not any(parent.iterdir()):
parent.rmdir()
def list_orphaned_objects(self, bucket_name: str) -> List[Dict[str, Any]]:
bucket_path = self._bucket_path(bucket_name)
if not bucket_path.exists():
@@ -1124,6 +1146,49 @@ class ObjectStorage:
parts.sort(key=lambda x: x["PartNumber"])
return parts
def list_multipart_uploads(self, bucket_name: str) -> List[Dict[str, Any]]:
"""List all active multipart uploads for a bucket."""
bucket_path = self._bucket_path(bucket_name)
if not bucket_path.exists():
raise StorageError("Bucket does not exist")
bucket_id = bucket_path.name
uploads = []
multipart_root = self._bucket_multipart_root(bucket_id)
if multipart_root.exists():
for upload_dir in multipart_root.iterdir():
if not upload_dir.is_dir():
continue
manifest_path = upload_dir / "manifest.json"
if not manifest_path.exists():
continue
try:
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
uploads.append({
"upload_id": manifest.get("upload_id", upload_dir.name),
"object_key": manifest.get("object_key", ""),
"created_at": manifest.get("created_at", ""),
})
except (OSError, json.JSONDecodeError):
continue
legacy_root = self._legacy_multipart_root(bucket_id)
if legacy_root.exists():
for upload_dir in legacy_root.iterdir():
if not upload_dir.is_dir():
continue
manifest_path = upload_dir / "manifest.json"
if not manifest_path.exists():
continue
try:
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
uploads.append({
"upload_id": manifest.get("upload_id", upload_dir.name),
"object_key": manifest.get("object_key", ""),
"created_at": manifest.get("created_at", ""),
})
except (OSError, json.JSONDecodeError):
continue
return uploads
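Roughly, the method yields one entry per readable manifest; an illustrative return value (all values are placeholders):

[
    {
        "upload_id": "a1b2c3d4",
        "object_key": "backups/db.tar",
        "created_at": "2024-05-01T12:00:00+00:00",
    },
]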
def _bucket_path(self, bucket_name: str) -> Path:
safe_name = self._sanitize_bucket_name(bucket_name)
return self.root / safe_name

app/ui.py

@@ -1,4 +1,3 @@
"""Authenticated HTML UI for browsing buckets and objects."""
from __future__ import annotations
import json
@@ -26,6 +25,7 @@ from flask import (
)
from flask_wtf.csrf import generate_csrf
from .acl import AclService, create_canned_acl, CANNED_ACLS
from .bucket_policies import BucketPolicyStore
from .connections import ConnectionStore, RemoteConnection
from .extensions import limiter
@@ -75,6 +75,10 @@ def _secret_store() -> EphemeralSecretStore:
return store
def _acl() -> AclService:
return current_app.extensions["acl"]
def _format_bytes(num: int) -> str:
step = 1024
units = ["B", "KB", "MB", "GB", "TB", "PB"]
@@ -379,10 +383,21 @@ def bucket_detail(bucket_name: str):
objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
lifecycle_url = url_for("ui.bucket_lifecycle", bucket_name=bucket_name)
cors_url = url_for("ui.bucket_cors", bucket_name=bucket_name)
acl_url = url_for("ui.bucket_acl", bucket_name=bucket_name)
folders_url = url_for("ui.create_folder", bucket_name=bucket_name)
buckets_for_copy_url = url_for("ui.list_buckets_for_copy", bucket_name=bucket_name)
return render_template(
"bucket_detail.html",
bucket_name=bucket_name,
objects_api_url=objects_api_url,
lifecycle_url=lifecycle_url,
cors_url=cors_url,
acl_url=acl_url,
folders_url=folders_url,
buckets_for_copy_url=buckets_for_copy_url,
principal=principal,
bucket_policy_text=policy_text,
bucket_policy=bucket_policy,
@@ -441,6 +456,9 @@ def list_bucket_objects(bucket_name: str):
presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER")
tags_template = url_for("ui.object_tags", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
copy_template = url_for("ui.copy_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
move_template = url_for("ui.move_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
objects_data = []
for obj in result.objects:
@@ -465,6 +483,9 @@ def list_bucket_objects(bucket_name: str):
"delete": delete_template, "delete": delete_template,
"versions": versions_template, "versions": versions_template,
"restore": restore_template, "restore": restore_template,
"tags": tags_template,
"copy": copy_template,
"move": move_template,
},
})
@@ -1666,6 +1687,327 @@ def metrics_dashboard():
)
@ui_bp.route("/buckets/<bucket_name>/lifecycle", methods=["GET", "POST", "DELETE"])
def bucket_lifecycle(bucket_name: str):
principal = _current_principal()
try:
_authorize_ui(principal, bucket_name, "policy")
except IamError as exc:
return jsonify({"error": str(exc)}), 403
storage = _storage()
if not storage.bucket_exists(bucket_name):
return jsonify({"error": "Bucket does not exist"}), 404
if request.method == "GET":
rules = storage.get_bucket_lifecycle(bucket_name) or []
return jsonify({"rules": rules})
if request.method == "DELETE":
storage.set_bucket_lifecycle(bucket_name, None)
return jsonify({"status": "ok", "message": "Lifecycle configuration deleted"})
payload = request.get_json(silent=True) or {}
rules = payload.get("rules", [])
if not isinstance(rules, list):
return jsonify({"error": "rules must be a list"}), 400
validated_rules = []
for i, rule in enumerate(rules):
if not isinstance(rule, dict):
return jsonify({"error": f"Rule {i} must be an object"}), 400
validated = {
"ID": str(rule.get("ID", f"rule-{i+1}")),
"Status": "Enabled" if rule.get("Status", "Enabled") == "Enabled" else "Disabled",
}
if rule.get("Prefix"):
validated["Prefix"] = str(rule["Prefix"])
if rule.get("Expiration"):
exp = rule["Expiration"]
if isinstance(exp, dict) and exp.get("Days"):
validated["Expiration"] = {"Days": int(exp["Days"])}
if rule.get("NoncurrentVersionExpiration"):
nve = rule["NoncurrentVersionExpiration"]
if isinstance(nve, dict) and nve.get("NoncurrentDays"):
validated["NoncurrentVersionExpiration"] = {"NoncurrentDays": int(nve["NoncurrentDays"])}
if rule.get("AbortIncompleteMultipartUpload"):
aimu = rule["AbortIncompleteMultipartUpload"]
if isinstance(aimu, dict) and aimu.get("DaysAfterInitiation"):
validated["AbortIncompleteMultipartUpload"] = {"DaysAfterInitiation": int(aimu["DaysAfterInitiation"])}
validated_rules.append(validated)
storage.set_bucket_lifecycle(bucket_name, validated_rules if validated_rules else None)
return jsonify({"status": "ok", "message": "Lifecycle configuration saved", "rules": validated_rules})
@ui_bp.route("/buckets/<bucket_name>/cors", methods=["GET", "POST", "DELETE"])
def bucket_cors(bucket_name: str):
principal = _current_principal()
try:
_authorize_ui(principal, bucket_name, "policy")
except IamError as exc:
return jsonify({"error": str(exc)}), 403
storage = _storage()
if not storage.bucket_exists(bucket_name):
return jsonify({"error": "Bucket does not exist"}), 404
if request.method == "GET":
rules = storage.get_bucket_cors(bucket_name) or []
return jsonify({"rules": rules})
if request.method == "DELETE":
storage.set_bucket_cors(bucket_name, None)
return jsonify({"status": "ok", "message": "CORS configuration deleted"})
payload = request.get_json(silent=True) or {}
rules = payload.get("rules", [])
if not isinstance(rules, list):
return jsonify({"error": "rules must be a list"}), 400
validated_rules = []
for i, rule in enumerate(rules):
if not isinstance(rule, dict):
return jsonify({"error": f"Rule {i} must be an object"}), 400
origins = rule.get("AllowedOrigins", [])
methods = rule.get("AllowedMethods", [])
if not origins or not methods:
return jsonify({"error": f"Rule {i} must have AllowedOrigins and AllowedMethods"}), 400
validated = {
"AllowedOrigins": [str(o) for o in origins if o],
"AllowedMethods": [str(m).upper() for m in methods if m],
}
if rule.get("AllowedHeaders"):
validated["AllowedHeaders"] = [str(h) for h in rule["AllowedHeaders"] if h]
if rule.get("ExposeHeaders"):
validated["ExposeHeaders"] = [str(h) for h in rule["ExposeHeaders"] if h]
if rule.get("MaxAgeSeconds") is not None:
try:
validated["MaxAgeSeconds"] = int(rule["MaxAgeSeconds"])
except (ValueError, TypeError):
pass
validated_rules.append(validated)
storage.set_bucket_cors(bucket_name, validated_rules if validated_rules else None)
return jsonify({"status": "ok", "message": "CORS configuration saved", "rules": validated_rules})
@ui_bp.route("/buckets/<bucket_name>/acl", methods=["GET", "POST"])
def bucket_acl(bucket_name: str):
principal = _current_principal()
action = "read" if request.method == "GET" else "write"
try:
_authorize_ui(principal, bucket_name, action)
except IamError as exc:
return jsonify({"error": str(exc)}), 403
storage = _storage()
if not storage.bucket_exists(bucket_name):
return jsonify({"error": "Bucket does not exist"}), 404
acl_service = _acl()
owner_id = principal.access_key if principal else "anonymous"
if request.method == "GET":
try:
acl = acl_service.get_bucket_acl(bucket_name)
if not acl:
acl = create_canned_acl("private", owner_id)
return jsonify({
"owner": acl.owner,
"grants": [g.to_dict() for g in acl.grants],
"canned_acls": list(CANNED_ACLS.keys()),
})
except Exception as exc:
return jsonify({"error": str(exc)}), 500
payload = request.get_json(silent=True) or {}
canned_acl = payload.get("canned_acl")
if canned_acl:
if canned_acl not in CANNED_ACLS:
return jsonify({"error": f"Invalid canned ACL: {canned_acl}"}), 400
acl_service.set_bucket_canned_acl(bucket_name, canned_acl, owner_id)
return jsonify({"status": "ok", "message": f"ACL set to {canned_acl}"})
return jsonify({"error": "canned_acl is required"}), 400
@ui_bp.route("/buckets/<bucket_name>/objects/<path:object_key>/tags", methods=["GET", "POST"])
def object_tags(bucket_name: str, object_key: str):
principal = _current_principal()
try:
_authorize_ui(principal, bucket_name, "read", object_key=object_key)
except IamError as exc:
return jsonify({"error": str(exc)}), 403
storage = _storage()
if request.method == "GET":
try:
tags = storage.get_object_tags(bucket_name, object_key)
return jsonify({"tags": tags})
except StorageError as exc:
return jsonify({"error": str(exc)}), 404
try:
_authorize_ui(principal, bucket_name, "write", object_key=object_key)
except IamError as exc:
return jsonify({"error": str(exc)}), 403
payload = request.get_json(silent=True) or {}
tags = payload.get("tags", [])
if not isinstance(tags, list):
return jsonify({"error": "tags must be a list"}), 400
if len(tags) > 10:
return jsonify({"error": "Maximum 10 tags allowed"}), 400
validated_tags = []
for tag in tags:
if isinstance(tag, dict) and tag.get("Key"):
validated_tags.append({
"Key": str(tag["Key"]),
"Value": str(tag.get("Value", ""))
})
try:
storage.set_object_tags(bucket_name, object_key, validated_tags if validated_tags else None)
return jsonify({"status": "ok", "message": "Tags saved", "tags": validated_tags})
except StorageError as exc:
return jsonify({"error": str(exc)}), 400
@ui_bp.post("/buckets/<bucket_name>/folders")
def create_folder(bucket_name: str):
principal = _current_principal()
try:
_authorize_ui(principal, bucket_name, "write")
except IamError as exc:
return jsonify({"error": str(exc)}), 403
payload = request.get_json(silent=True) or {}
folder_name = str(payload.get("folder_name", "")).strip()
prefix = str(payload.get("prefix", "")).strip()
if not folder_name:
return jsonify({"error": "folder_name is required"}), 400
folder_name = folder_name.rstrip("/")
if "/" in folder_name:
return jsonify({"error": "Folder name cannot contain /"}), 400
folder_key = f"{prefix}{folder_name}/" if prefix else f"{folder_name}/"
import io
try:
_storage().put_object(bucket_name, folder_key, io.BytesIO(b""))
return jsonify({"status": "ok", "message": f"Folder '{folder_name}' created", "key": folder_key})
except StorageError as exc:
return jsonify({"error": str(exc)}), 400
@ui_bp.post("/buckets/<bucket_name>/objects/<path:object_key>/copy")
def copy_object(bucket_name: str, object_key: str):
principal = _current_principal()
try:
_authorize_ui(principal, bucket_name, "read", object_key=object_key)
except IamError as exc:
return jsonify({"error": str(exc)}), 403
payload = request.get_json(silent=True) or {}
dest_bucket = str(payload.get("dest_bucket", bucket_name)).strip()
dest_key = str(payload.get("dest_key", "")).strip()
if not dest_key:
return jsonify({"error": "dest_key is required"}), 400
try:
_authorize_ui(principal, dest_bucket, "write", object_key=dest_key)
except IamError as exc:
return jsonify({"error": str(exc)}), 403
storage = _storage()
try:
source_path = storage.get_object_path(bucket_name, object_key)
source_metadata = storage.get_object_metadata(bucket_name, object_key)
except StorageError as exc:
return jsonify({"error": str(exc)}), 404
try:
with source_path.open("rb") as stream:
storage.put_object(dest_bucket, dest_key, stream, metadata=source_metadata or None)
return jsonify({
"status": "ok",
"message": f"Copied to {dest_bucket}/{dest_key}",
"dest_bucket": dest_bucket,
"dest_key": dest_key,
})
except StorageError as exc:
return jsonify({"error": str(exc)}), 400
@ui_bp.post("/buckets/<bucket_name>/objects/<path:object_key>/move")
def move_object(bucket_name: str, object_key: str):
principal = _current_principal()
try:
_authorize_ui(principal, bucket_name, "read", object_key=object_key)
_authorize_ui(principal, bucket_name, "delete", object_key=object_key)
except IamError as exc:
return jsonify({"error": str(exc)}), 403
payload = request.get_json(silent=True) or {}
dest_bucket = str(payload.get("dest_bucket", bucket_name)).strip()
dest_key = str(payload.get("dest_key", "")).strip()
if not dest_key:
return jsonify({"error": "dest_key is required"}), 400
if dest_bucket == bucket_name and dest_key == object_key:
return jsonify({"error": "Cannot move object to the same location"}), 400
try:
_authorize_ui(principal, dest_bucket, "write", object_key=dest_key)
except IamError as exc:
return jsonify({"error": str(exc)}), 403
storage = _storage()
try:
source_path = storage.get_object_path(bucket_name, object_key)
source_metadata = storage.get_object_metadata(bucket_name, object_key)
except StorageError as exc:
return jsonify({"error": str(exc)}), 404
try:
import io
with source_path.open("rb") as f:
data = f.read()
storage.put_object(dest_bucket, dest_key, io.BytesIO(data), metadata=source_metadata or None)
storage.delete_object(bucket_name, object_key)
return jsonify({
"status": "ok",
"message": f"Moved to {dest_bucket}/{dest_key}",
"dest_bucket": dest_bucket,
"dest_key": dest_key,
})
except StorageError as exc:
return jsonify({"error": str(exc)}), 400
@ui_bp.get("/buckets/<bucket_name>/list-for-copy")
def list_buckets_for_copy(bucket_name: str):
principal = _current_principal()
buckets = _storage().list_buckets()
allowed = []
for bucket in buckets:
try:
_authorize_ui(principal, bucket.name, "write")
allowed.append(bucket.name)
except IamError:
pass
return jsonify({"buckets": allowed})
@ui_bp.app_errorhandler(404)
def ui_not_found(error):  # type: ignore[override]
prefix = ui_bp.url_prefix or ""


@@ -1,4 +1,3 @@
"""Central location for the application version string."""
from __future__ import annotations
APP_VERSION = "0.2.0"


@@ -1097,6 +1097,9 @@ pre code {
.modal-body {
padding: 1.5rem;
overflow-wrap: break-word;
word-wrap: break-word;
word-break: break-word;
}
.modal-footer {
@@ -1750,3 +1753,67 @@ body.theme-transitioning * {
border: 2px solid transparent;
background: linear-gradient(var(--myfsio-card-bg), var(--myfsio-card-bg)) padding-box, linear-gradient(135deg, #3b82f6, #8b5cf6) border-box;
}
#objects-table .dropdown-menu {
position: fixed !important;
z-index: 1050;
}
.objects-header-responsive {
display: flex;
flex-wrap: wrap;
gap: 0.5rem;
align-items: center;
}
.objects-header-responsive > .header-title {
flex: 0 0 auto;
}
.objects-header-responsive > .header-actions {
display: flex;
flex-wrap: wrap;
gap: 0.5rem;
align-items: center;
flex: 1;
}
@media (max-width: 640px) {
.objects-header-responsive {
flex-direction: column;
align-items: stretch;
}
.objects-header-responsive > .header-title {
margin-bottom: 0.5rem;
}
.objects-header-responsive > .header-actions {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 0.5rem;
}
.objects-header-responsive > .header-actions .btn {
justify-content: center;
}
.objects-header-responsive > .header-actions .search-wrapper {
grid-column: span 2;
}
.objects-header-responsive > .header-actions .search-wrapper input {
max-width: 100% !important;
width: 100%;
}
.objects-header-responsive > .header-actions .bulk-actions {
grid-column: span 2;
display: flex;
gap: 0.5rem;
}
.objects-header-responsive > .header-actions .bulk-actions .btn {
flex: 1;
}
}

File diff suppressed because it is too large