Compare commits
3 commits: 1df8ff9d25 ... e792b86485

| Author | SHA1 | Date |
|---|---|---|
| | e792b86485 | |
| | cdb86aeea7 | |
| | cdbc156b5b | |
@@ -1,4 +1,3 @@
"""Application factory for the mini S3-compatible object store."""
from __future__ import annotations

import logging
@@ -16,6 +15,8 @@ from flask_cors import CORS
from flask_wtf.csrf import CSRFError
from werkzeug.middleware.proxy_fix import ProxyFix

from .access_logging import AccessLoggingService
from .acl import AclService
from .bucket_policies import BucketPolicyStore
from .config import AppConfig
from .connections import ConnectionStore
@@ -23,6 +24,9 @@ from .encryption import EncryptionManager
from .extensions import limiter, csrf
from .iam import IamService
from .kms import KMSManager
from .lifecycle import LifecycleManager
from .notifications import NotificationService
from .object_lock import ObjectLockService
from .replication import ReplicationManager
from .secret_store import EphemeralSecretStore
from .storage import ObjectStorage
@@ -140,6 +144,21 @@ def create_app(
        from .encrypted_storage import EncryptedObjectStorage
        storage = EncryptedObjectStorage(storage, encryption_manager)

    acl_service = AclService(storage_root)
    object_lock_service = ObjectLockService(storage_root)
    notification_service = NotificationService(storage_root)
    access_logging_service = AccessLoggingService(storage_root)
    access_logging_service.set_storage(storage)

    lifecycle_manager = None
    if app.config.get("LIFECYCLE_ENABLED", False):
        base_storage = storage.storage if hasattr(storage, 'storage') else storage
        lifecycle_manager = LifecycleManager(
            base_storage,
            interval_seconds=app.config.get("LIFECYCLE_INTERVAL_SECONDS", 3600),
        )
        lifecycle_manager.start()

    app.extensions["object_storage"] = storage
    app.extensions["iam"] = iam
    app.extensions["bucket_policies"] = bucket_policies
@@ -149,6 +168,11 @@ def create_app(
    app.extensions["replication"] = replication
    app.extensions["encryption"] = encryption_manager
    app.extensions["kms"] = kms_manager
    app.extensions["acl"] = acl_service
    app.extensions["lifecycle"] = lifecycle_manager
    app.extensions["object_lock"] = object_lock_service
    app.extensions["notifications"] = notification_service
    app.extensions["access_logging"] = access_logging_service

    @app.errorhandler(500)
    def internal_error(error):
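Review note: a minimal sketch (not part of the diff) of how the keys registered above are meant to be consumed; the accessor helpers added to app/s3_api.py further down follow the same pattern. The handler name is illustrative.

from flask import current_app

def handle_request():
    # Keys as registered in create_app(); "lifecycle" may be None when
    # LIFECYCLE_ENABLED is false, so use .get() for it.
    acl_service = current_app.extensions["acl"]
    object_lock = current_app.extensions["object_lock"]
    notifications = current_app.extensions["notifications"]
    access_logging = current_app.extensions["access_logging"]
    lifecycle = current_app.extensions.get("lifecycle")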
app/access_logging.py (new file, 262 lines)
@@ -0,0 +1,262 @@
from __future__ import annotations

import io
import json
import logging
import queue
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class AccessLogEntry:
    bucket_owner: str = "-"
    bucket: str = "-"
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    remote_ip: str = "-"
    requester: str = "-"
    request_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16].upper())
    operation: str = "-"
    key: str = "-"
    request_uri: str = "-"
    http_status: int = 200
    error_code: str = "-"
    bytes_sent: int = 0
    object_size: int = 0
    total_time_ms: int = 0
    turn_around_time_ms: int = 0
    referrer: str = "-"
    user_agent: str = "-"
    version_id: str = "-"
    host_id: str = "-"
    signature_version: str = "SigV4"
    cipher_suite: str = "-"
    authentication_type: str = "AuthHeader"
    host_header: str = "-"
    tls_version: str = "-"

    def to_log_line(self) -> str:
        time_str = self.timestamp.strftime("[%d/%b/%Y:%H:%M:%S %z]")
        return (
            f'{self.bucket_owner} {self.bucket} {time_str} {self.remote_ip} '
            f'{self.requester} {self.request_id} {self.operation} {self.key} '
            f'"{self.request_uri}" {self.http_status} {self.error_code or "-"} '
            f'{self.bytes_sent or "-"} {self.object_size or "-"} {self.total_time_ms or "-"} '
            f'{self.turn_around_time_ms or "-"} "{self.referrer}" "{self.user_agent}" {self.version_id}'
        )

    def to_dict(self) -> Dict[str, Any]:
        return {
            "bucket_owner": self.bucket_owner,
            "bucket": self.bucket,
            "timestamp": self.timestamp.isoformat(),
            "remote_ip": self.remote_ip,
            "requester": self.requester,
            "request_id": self.request_id,
            "operation": self.operation,
            "key": self.key,
            "request_uri": self.request_uri,
            "http_status": self.http_status,
            "error_code": self.error_code,
            "bytes_sent": self.bytes_sent,
            "object_size": self.object_size,
            "total_time_ms": self.total_time_ms,
            "referrer": self.referrer,
            "user_agent": self.user_agent,
            "version_id": self.version_id,
        }


@dataclass
class LoggingConfiguration:
    target_bucket: str
    target_prefix: str = ""
    enabled: bool = True

    def to_dict(self) -> Dict[str, Any]:
        return {
            "LoggingEnabled": {
                "TargetBucket": self.target_bucket,
                "TargetPrefix": self.target_prefix,
            }
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Optional["LoggingConfiguration"]:
        logging_enabled = data.get("LoggingEnabled")
        if not logging_enabled:
            return None
        return cls(
            target_bucket=logging_enabled.get("TargetBucket", ""),
            target_prefix=logging_enabled.get("TargetPrefix", ""),
            enabled=True,
        )


class AccessLoggingService:
    def __init__(self, storage_root: Path, flush_interval: int = 60, max_buffer_size: int = 1000):
        self.storage_root = storage_root
        self.flush_interval = flush_interval
        self.max_buffer_size = max_buffer_size
        self._configs: Dict[str, LoggingConfiguration] = {}
        self._buffer: Dict[str, List[AccessLogEntry]] = {}
        self._buffer_lock = threading.Lock()
        self._shutdown = threading.Event()
        self._storage = None

        self._flush_thread = threading.Thread(target=self._flush_loop, name="access-log-flush", daemon=True)
        self._flush_thread.start()

    def set_storage(self, storage: Any) -> None:
        self._storage = storage

    def _config_path(self, bucket_name: str) -> Path:
        return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "logging.json"

    def get_bucket_logging(self, bucket_name: str) -> Optional[LoggingConfiguration]:
        if bucket_name in self._configs:
            return self._configs[bucket_name]

        config_path = self._config_path(bucket_name)
        if not config_path.exists():
            return None

        try:
            data = json.loads(config_path.read_text(encoding="utf-8"))
            config = LoggingConfiguration.from_dict(data)
            if config:
                self._configs[bucket_name] = config
            return config
        except (json.JSONDecodeError, OSError) as e:
            logger.warning(f"Failed to load logging config for {bucket_name}: {e}")
            return None

    def set_bucket_logging(self, bucket_name: str, config: LoggingConfiguration) -> None:
        config_path = self._config_path(bucket_name)
        config_path.parent.mkdir(parents=True, exist_ok=True)
        config_path.write_text(json.dumps(config.to_dict(), indent=2), encoding="utf-8")
        self._configs[bucket_name] = config

    def delete_bucket_logging(self, bucket_name: str) -> None:
        config_path = self._config_path(bucket_name)
        try:
            if config_path.exists():
                config_path.unlink()
        except OSError:
            pass
        self._configs.pop(bucket_name, None)

    def log_request(
        self,
        bucket_name: str,
        *,
        operation: str,
        key: str = "-",
        remote_ip: str = "-",
        requester: str = "-",
        request_uri: str = "-",
        http_status: int = 200,
        error_code: str = "",
        bytes_sent: int = 0,
        object_size: int = 0,
        total_time_ms: int = 0,
        referrer: str = "-",
        user_agent: str = "-",
        version_id: str = "-",
        request_id: str = "",
    ) -> None:
        config = self.get_bucket_logging(bucket_name)
        if not config or not config.enabled:
            return

        entry = AccessLogEntry(
            bucket_owner="local-owner",
            bucket=bucket_name,
            remote_ip=remote_ip,
            requester=requester,
            request_id=request_id or uuid.uuid4().hex[:16].upper(),
            operation=operation,
            key=key,
            request_uri=request_uri,
            http_status=http_status,
            error_code=error_code,
            bytes_sent=bytes_sent,
            object_size=object_size,
            total_time_ms=total_time_ms,
            referrer=referrer,
            user_agent=user_agent,
            version_id=version_id,
        )

        target_key = f"{config.target_bucket}:{config.target_prefix}"
        with self._buffer_lock:
            if target_key not in self._buffer:
                self._buffer[target_key] = []
            self._buffer[target_key].append(entry)

        if len(self._buffer[target_key]) >= self.max_buffer_size:
            self._flush_buffer(target_key)

    def _flush_loop(self) -> None:
        while not self._shutdown.is_set():
            time.sleep(self.flush_interval)
            self._flush_all()

    def _flush_all(self) -> None:
        with self._buffer_lock:
            targets = list(self._buffer.keys())

        for target_key in targets:
            self._flush_buffer(target_key)

    def _flush_buffer(self, target_key: str) -> None:
        with self._buffer_lock:
            entries = self._buffer.pop(target_key, [])

        if not entries or not self._storage:
            return

        try:
            bucket_name, prefix = target_key.split(":", 1)
        except ValueError:
            logger.error(f"Invalid target key: {target_key}")
            return

        now = datetime.now(timezone.utc)
        log_key = f"{prefix}{now.strftime('%Y-%m-%d-%H-%M-%S')}-{uuid.uuid4().hex[:8]}"

        log_content = "\n".join(entry.to_log_line() for entry in entries) + "\n"

        try:
            stream = io.BytesIO(log_content.encode("utf-8"))
            self._storage.put_object(bucket_name, log_key, stream, enforce_quota=False)
            logger.info(f"Flushed {len(entries)} access log entries to {bucket_name}/{log_key}")
        except Exception as e:
            logger.error(f"Failed to write access log to {bucket_name}/{log_key}: {e}")
            with self._buffer_lock:
                if target_key not in self._buffer:
                    self._buffer[target_key] = []
                self._buffer[target_key] = entries + self._buffer[target_key]

    def flush(self) -> None:
        self._flush_all()

    def shutdown(self) -> None:
        self._shutdown.set()
        self._flush_all()
        self._flush_thread.join(timeout=5.0)

    def get_stats(self) -> Dict[str, Any]:
        with self._buffer_lock:
            buffered = sum(len(entries) for entries in self._buffer.values())
            return {
                "buffered_entries": buffered,
                "target_buckets": len(self._buffer),
            }
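Review note: a minimal usage sketch for the new service (not part of the diff). The bucket names, key, operation string, and root path are illustrative, and `storage` stands for the ObjectStorage instance wired in create_app().

from pathlib import Path

svc = AccessLoggingService(Path("./data"), flush_interval=5)
svc.set_storage(storage)  # storage: the ObjectStorage wired in create_app() (assumed in scope)

# Deliver access logs for "my-bucket" into "log-bucket" under the "logs/" prefix.
svc.set_bucket_logging("my-bucket", LoggingConfiguration(target_bucket="log-bucket", target_prefix="logs/"))

# Entries are buffered and written by the background flush thread, or sooner
# once max_buffer_size entries accumulate for a target.
svc.log_request(
    "my-bucket",
    operation="REST.GET.OBJECT",
    key="photos/cat.jpg",
    remote_ip="203.0.113.7",
    requester="demo-user",
    request_uri="GET /my-bucket/photos/cat.jpg HTTP/1.1",
    http_status=200,
    bytes_sent=1024,
)
svc.flush()  # force a write to log-bucket/logs/<timestamp>-<id>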
app/acl.py (new file, 204 lines)
@@ -0,0 +1,204 @@
from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Set


ACL_PERMISSION_FULL_CONTROL = "FULL_CONTROL"
ACL_PERMISSION_WRITE = "WRITE"
ACL_PERMISSION_WRITE_ACP = "WRITE_ACP"
ACL_PERMISSION_READ = "READ"
ACL_PERMISSION_READ_ACP = "READ_ACP"

ALL_PERMISSIONS = {
    ACL_PERMISSION_FULL_CONTROL,
    ACL_PERMISSION_WRITE,
    ACL_PERMISSION_WRITE_ACP,
    ACL_PERMISSION_READ,
    ACL_PERMISSION_READ_ACP,
}

PERMISSION_TO_ACTIONS = {
    ACL_PERMISSION_FULL_CONTROL: {"read", "write", "delete", "list", "share"},
    ACL_PERMISSION_WRITE: {"write", "delete"},
    ACL_PERMISSION_WRITE_ACP: {"share"},
    ACL_PERMISSION_READ: {"read", "list"},
    ACL_PERMISSION_READ_ACP: {"share"},
}

GRANTEE_ALL_USERS = "*"
GRANTEE_AUTHENTICATED_USERS = "authenticated"


@dataclass
class AclGrant:
    grantee: str
    permission: str

    def to_dict(self) -> Dict[str, str]:
        return {"grantee": self.grantee, "permission": self.permission}

    @classmethod
    def from_dict(cls, data: Dict[str, str]) -> "AclGrant":
        return cls(grantee=data["grantee"], permission=data["permission"])


@dataclass
class Acl:
    owner: str
    grants: List[AclGrant] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "owner": self.owner,
            "grants": [g.to_dict() for g in self.grants],
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Acl":
        return cls(
            owner=data.get("owner", ""),
            grants=[AclGrant.from_dict(g) for g in data.get("grants", [])],
        )

    def get_allowed_actions(self, principal_id: Optional[str], is_authenticated: bool = True) -> Set[str]:
        actions: Set[str] = set()
        if principal_id and principal_id == self.owner:
            actions.update(PERMISSION_TO_ACTIONS[ACL_PERMISSION_FULL_CONTROL])
        for grant in self.grants:
            if grant.grantee == GRANTEE_ALL_USERS:
                actions.update(PERMISSION_TO_ACTIONS.get(grant.permission, set()))
            elif grant.grantee == GRANTEE_AUTHENTICATED_USERS and is_authenticated:
                actions.update(PERMISSION_TO_ACTIONS.get(grant.permission, set()))
            elif principal_id and grant.grantee == principal_id:
                actions.update(PERMISSION_TO_ACTIONS.get(grant.permission, set()))
        return actions


CANNED_ACLS = {
    "private": lambda owner: Acl(
        owner=owner,
        grants=[AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL)],
    ),
    "public-read": lambda owner: Acl(
        owner=owner,
        grants=[
            AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
            AclGrant(grantee=GRANTEE_ALL_USERS, permission=ACL_PERMISSION_READ),
        ],
    ),
    "public-read-write": lambda owner: Acl(
        owner=owner,
        grants=[
            AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
            AclGrant(grantee=GRANTEE_ALL_USERS, permission=ACL_PERMISSION_READ),
            AclGrant(grantee=GRANTEE_ALL_USERS, permission=ACL_PERMISSION_WRITE),
        ],
    ),
    "authenticated-read": lambda owner: Acl(
        owner=owner,
        grants=[
            AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
            AclGrant(grantee=GRANTEE_AUTHENTICATED_USERS, permission=ACL_PERMISSION_READ),
        ],
    ),
    "bucket-owner-read": lambda owner: Acl(
        owner=owner,
        grants=[
            AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
        ],
    ),
    "bucket-owner-full-control": lambda owner: Acl(
        owner=owner,
        grants=[
            AclGrant(grantee=owner, permission=ACL_PERMISSION_FULL_CONTROL),
        ],
    ),
}


def create_canned_acl(canned_acl: str, owner: str) -> Acl:
    factory = CANNED_ACLS.get(canned_acl)
    if not factory:
        return CANNED_ACLS["private"](owner)
    return factory(owner)


class AclService:
    def __init__(self, storage_root: Path):
        self.storage_root = storage_root
        self._bucket_acl_cache: Dict[str, Acl] = {}

    def _bucket_acl_path(self, bucket_name: str) -> Path:
        return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / ".acl.json"

    def get_bucket_acl(self, bucket_name: str) -> Optional[Acl]:
        if bucket_name in self._bucket_acl_cache:
            return self._bucket_acl_cache[bucket_name]
        acl_path = self._bucket_acl_path(bucket_name)
        if not acl_path.exists():
            return None
        try:
            data = json.loads(acl_path.read_text(encoding="utf-8"))
            acl = Acl.from_dict(data)
            self._bucket_acl_cache[bucket_name] = acl
            return acl
        except (OSError, json.JSONDecodeError):
            return None

    def set_bucket_acl(self, bucket_name: str, acl: Acl) -> None:
        acl_path = self._bucket_acl_path(bucket_name)
        acl_path.parent.mkdir(parents=True, exist_ok=True)
        acl_path.write_text(json.dumps(acl.to_dict(), indent=2), encoding="utf-8")
        self._bucket_acl_cache[bucket_name] = acl

    def set_bucket_canned_acl(self, bucket_name: str, canned_acl: str, owner: str) -> Acl:
        acl = create_canned_acl(canned_acl, owner)
        self.set_bucket_acl(bucket_name, acl)
        return acl

    def delete_bucket_acl(self, bucket_name: str) -> None:
        acl_path = self._bucket_acl_path(bucket_name)
        if acl_path.exists():
            acl_path.unlink()
        self._bucket_acl_cache.pop(bucket_name, None)

    def evaluate_bucket_acl(
        self,
        bucket_name: str,
        principal_id: Optional[str],
        action: str,
        is_authenticated: bool = True,
    ) -> bool:
        acl = self.get_bucket_acl(bucket_name)
        if not acl:
            return False
        allowed_actions = acl.get_allowed_actions(principal_id, is_authenticated)
        return action in allowed_actions

    def get_object_acl(self, bucket_name: str, object_key: str, object_metadata: Dict[str, Any]) -> Optional[Acl]:
        acl_data = object_metadata.get("__acl__")
        if not acl_data:
            return None
        try:
            return Acl.from_dict(acl_data)
        except (TypeError, KeyError):
            return None

    def create_object_acl_metadata(self, acl: Acl) -> Dict[str, Any]:
        return {"__acl__": acl.to_dict()}

    def evaluate_object_acl(
        self,
        object_metadata: Dict[str, Any],
        principal_id: Optional[str],
        action: str,
        is_authenticated: bool = True,
    ) -> bool:
        acl = self.get_object_acl("", "", object_metadata)
        if not acl:
            return False
        allowed_actions = acl.get_allowed_actions(principal_id, is_authenticated)
        return action in allowed_actions
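Review note: a short sketch of the canned-ACL flow (not part of the diff); the bucket name, owner id, and root path are illustrative.

from pathlib import Path

acl_service = AclService(Path("./data"))

# "public-read" grants the owner FULL_CONTROL and everyone ("*") READ.
acl_service.set_bucket_canned_acl("website-assets", "public-read", owner="local-owner")

# Anonymous principals can read/list but not write.
assert acl_service.evaluate_bucket_acl("website-assets", None, "read", is_authenticated=False)
assert not acl_service.evaluate_bucket_acl("website-assets", None, "write", is_authenticated=False)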
@@ -1,4 +1,3 @@
"""Bucket policy loader/enforcer with a subset of AWS semantics."""
from __future__ import annotations

import json
@@ -1,4 +1,3 @@
"""Configuration helpers for the S3 clone application."""
from __future__ import annotations

import os
@@ -74,6 +73,8 @@ class AppConfig:
    kms_keys_path: Path
    default_encryption_algorithm: str
    display_timezone: str
    lifecycle_enabled: bool
    lifecycle_interval_seconds: int

    @classmethod
    def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
@@ -91,6 +92,8 @@ class AppConfig:
        secret_ttl_seconds = int(_get("SECRET_TTL_SECONDS", 300))
        stream_chunk_size = int(_get("STREAM_CHUNK_SIZE", 64 * 1024))
        multipart_min_part_size = int(_get("MULTIPART_MIN_PART_SIZE", 5 * 1024 * 1024))
        lifecycle_enabled = _get("LIFECYCLE_ENABLED", "false").lower() in ("true", "1", "yes")
        lifecycle_interval_seconds = int(_get("LIFECYCLE_INTERVAL_SECONDS", 3600))
        default_secret = "dev-secret-key"
        secret_key = str(_get("SECRET_KEY", default_secret))

@@ -198,7 +201,9 @@ class AppConfig:
            kms_enabled=kms_enabled,
            kms_keys_path=kms_keys_path,
            default_encryption_algorithm=default_encryption_algorithm,
            display_timezone=display_timezone)
            display_timezone=display_timezone,
            lifecycle_enabled=lifecycle_enabled,
            lifecycle_interval_seconds=lifecycle_interval_seconds)

    def validate_and_report(self) -> list[str]:
        """Validate configuration and return a list of warnings/issues.
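Review note: a sketch of the two new configuration knobs (not part of the diff), assuming _get() falls back to the process environment when no override is supplied; the values are illustrative.

import os

os.environ["LIFECYCLE_ENABLED"] = "true"          # "true", "1" or "yes" enable it
os.environ["LIFECYCLE_INTERVAL_SECONDS"] = "900"  # defaults to 3600 when unset

config = AppConfig.from_env()
assert config.lifecycle_enabled
assert config.lifecycle_interval_seconds == 900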
@@ -1,4 +1,3 @@
"""Manage remote S3 connections."""
from __future__ import annotations

import json
@@ -1,4 +1,3 @@
"""Encrypted storage layer that wraps ObjectStorage with encryption support."""
from __future__ import annotations

import io
@@ -353,6 +353,106 @@ class EncryptionManager:
        return encryptor.decrypt_stream(stream, metadata)


class SSECEncryption(EncryptionProvider):
    """SSE-C: Server-Side Encryption with Customer-Provided Keys.

    The client provides the encryption key with each request.
    Server encrypts/decrypts but never stores the key.

    Required headers for PUT:
    - x-amz-server-side-encryption-customer-algorithm: AES256
    - x-amz-server-side-encryption-customer-key: Base64-encoded 256-bit key
    - x-amz-server-side-encryption-customer-key-MD5: Base64-encoded MD5 of key
    """

    KEY_ID = "customer-provided"

    def __init__(self, customer_key: bytes):
        if len(customer_key) != 32:
            raise EncryptionError("Customer key must be exactly 256 bits (32 bytes)")
        self.customer_key = customer_key

    @classmethod
    def from_headers(cls, headers: Dict[str, str]) -> "SSECEncryption":
        algorithm = headers.get("x-amz-server-side-encryption-customer-algorithm", "")
        if algorithm.upper() != "AES256":
            raise EncryptionError(f"Unsupported SSE-C algorithm: {algorithm}. Only AES256 is supported.")

        key_b64 = headers.get("x-amz-server-side-encryption-customer-key", "")
        if not key_b64:
            raise EncryptionError("Missing x-amz-server-side-encryption-customer-key header")

        key_md5_b64 = headers.get("x-amz-server-side-encryption-customer-key-md5", "")

        try:
            customer_key = base64.b64decode(key_b64)
        except Exception as e:
            raise EncryptionError(f"Invalid base64 in customer key: {e}") from e

        if len(customer_key) != 32:
            raise EncryptionError(f"Customer key must be 256 bits, got {len(customer_key) * 8} bits")

        if key_md5_b64:
            import hashlib
            expected_md5 = base64.b64encode(hashlib.md5(customer_key).digest()).decode()
            if key_md5_b64 != expected_md5:
                raise EncryptionError("Customer key MD5 mismatch")

        return cls(customer_key)

    def encrypt(self, plaintext: bytes, context: Dict[str, str] | None = None) -> EncryptionResult:
        aesgcm = AESGCM(self.customer_key)
        nonce = secrets.token_bytes(12)
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)

        return EncryptionResult(
            ciphertext=ciphertext,
            nonce=nonce,
            key_id=self.KEY_ID,
            encrypted_data_key=b"",
        )

    def decrypt(self, ciphertext: bytes, nonce: bytes, encrypted_data_key: bytes,
                key_id: str, context: Dict[str, str] | None = None) -> bytes:
        aesgcm = AESGCM(self.customer_key)
        try:
            return aesgcm.decrypt(nonce, ciphertext, None)
        except Exception as exc:
            raise EncryptionError(f"SSE-C decryption failed: {exc}") from exc

    def generate_data_key(self) -> tuple[bytes, bytes]:
        return self.customer_key, b""


@dataclass
class SSECMetadata:
    algorithm: str = "AES256"
    nonce: bytes = b""
    key_md5: str = ""

    def to_dict(self) -> Dict[str, str]:
        return {
            "x-amz-server-side-encryption-customer-algorithm": self.algorithm,
            "x-amz-encryption-nonce": base64.b64encode(self.nonce).decode(),
            "x-amz-server-side-encryption-customer-key-MD5": self.key_md5,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, str]) -> Optional["SSECMetadata"]:
        algorithm = data.get("x-amz-server-side-encryption-customer-algorithm")
        if not algorithm:
            return None
        try:
            nonce = base64.b64decode(data.get("x-amz-encryption-nonce", ""))
            return cls(
                algorithm=algorithm,
                nonce=nonce,
                key_md5=data.get("x-amz-server-side-encryption-customer-key-MD5", ""),
            )
        except Exception:
            return None


class ClientEncryptionHelper:
    """Helpers for client-side encryption.
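Review note: a sketch of an SSE-C round trip driven by the three request headers listed in the class docstring (not part of the diff); the key is generated here only for illustration.

import base64
import hashlib
import secrets

key = secrets.token_bytes(32)  # the client keeps this key; the server never stores it
headers = {
    "x-amz-server-side-encryption-customer-algorithm": "AES256",
    "x-amz-server-side-encryption-customer-key": base64.b64encode(key).decode(),
    "x-amz-server-side-encryption-customer-key-md5": base64.b64encode(hashlib.md5(key).digest()).decode(),
}

provider = SSECEncryption.from_headers(headers)
result = provider.encrypt(b"hello world")
plaintext = provider.decrypt(result.ciphertext, result.nonce, b"", provider.KEY_ID)
assert plaintext == b"hello world"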
@@ -1,4 +1,3 @@
"""Standardized error handling for API and UI responses."""
from __future__ import annotations

import logging
@@ -1,4 +1,3 @@
"""Application-wide extension instances."""
from flask import g
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
app/iam.py (36 changed lines)
@@ -1,4 +1,3 @@
"""Lightweight IAM-style user and policy management."""
from __future__ import annotations

import json
@@ -121,6 +120,7 @@ class IamService:
        self._cache_ttl = 60.0  # Cache credentials for 60 seconds
        self._last_stat_check = 0.0
        self._stat_check_interval = 1.0  # Only stat() file every 1 second
        self._sessions: Dict[str, Dict[str, Any]] = {}
        self._load()

    def _maybe_reload(self) -> None:
@@ -192,6 +192,40 @@ class IamService:
        elapsed = (datetime.now(timezone.utc) - oldest).total_seconds()
        return int(max(0, self.auth_lockout_window.total_seconds() - elapsed))

    def create_session_token(self, access_key: str, duration_seconds: int = 3600) -> str:
        """Create a temporary session token for an access key."""
        self._maybe_reload()
        record = self._users.get(access_key)
        if not record:
            raise IamError("Unknown access key")
        self._cleanup_expired_sessions()
        token = secrets.token_urlsafe(32)
        expires_at = time.time() + duration_seconds
        self._sessions[token] = {
            "access_key": access_key,
            "expires_at": expires_at,
        }
        return token

    def validate_session_token(self, access_key: str, session_token: str) -> bool:
        """Validate a session token for an access key."""
        session = self._sessions.get(session_token)
        if not session:
            return False
        if session["access_key"] != access_key:
            return False
        if time.time() > session["expires_at"]:
            del self._sessions[session_token]
            return False
        return True

    def _cleanup_expired_sessions(self) -> None:
        """Remove expired session tokens."""
        now = time.time()
        expired = [token for token, data in self._sessions.items() if now > data["expires_at"]]
        for token in expired:
            del self._sessions[token]

    def principal_for_key(self, access_key: str) -> Principal:
        # Performance: Check cache first
        now = time.time()
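Review note: a sketch of the new session-token API (not part of the diff), assuming an application context and an existing access key; the key names are illustrative.

from flask import current_app

iam = current_app.extensions["iam"]
token = iam.create_session_token("AKIA-EXAMPLE", duration_seconds=900)  # raises IamError for unknown keys

assert iam.validate_session_token("AKIA-EXAMPLE", token)
assert not iam.validate_session_token("SOME-OTHER-KEY", token)  # the token is bound to the issuing key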
app/kms.py (21 changed lines)
@@ -1,4 +1,3 @@
"""Key Management Service (KMS) for encryption key management."""
from __future__ import annotations

import base64
@@ -212,6 +211,26 @@ class KMSManager:
        self._load_keys()
        return list(self._keys.values())

    def get_default_key_id(self) -> str:
        """Get the default KMS key ID, creating one if none exist."""
        self._load_keys()
        for key in self._keys.values():
            if key.enabled:
                return key.key_id
        default_key = self.create_key(description="Default KMS Key")
        return default_key.key_id

    def get_provider(self, key_id: str | None = None) -> "KMSEncryptionProvider":
        """Get a KMS encryption provider for the specified key."""
        if key_id is None:
            key_id = self.get_default_key_id()
        key = self.get_key(key_id)
        if not key:
            raise EncryptionError(f"Key not found: {key_id}")
        if not key.enabled:
            raise EncryptionError(f"Key is disabled: {key_id}")
        return KMSEncryptionProvider(self, key_id)

    def enable_key(self, key_id: str) -> None:
        """Enable a key."""
        self._load_keys()
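Review note: a sketch of the new default-key helpers (not part of the diff), assuming an application context.

from flask import current_app

kms = current_app.extensions["kms"]
key_id = kms.get_default_key_id()    # creates a "Default KMS Key" if no enabled key exists
provider = kms.get_provider(key_id)  # raises EncryptionError if the key is missing or disabled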
@@ -1,4 +1,3 @@
"""KMS and encryption API endpoints."""
from __future__ import annotations

import base64
app/lifecycle.py (new file, 235 lines)
@@ -0,0 +1,235 @@
from __future__ import annotations

import logging
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

from .storage import ObjectStorage, StorageError

logger = logging.getLogger(__name__)


@dataclass
class LifecycleResult:
    bucket_name: str
    objects_deleted: int = 0
    versions_deleted: int = 0
    uploads_aborted: int = 0
    errors: List[str] = field(default_factory=list)
    execution_time_seconds: float = 0.0


class LifecycleManager:
    def __init__(self, storage: ObjectStorage, interval_seconds: int = 3600):
        self.storage = storage
        self.interval_seconds = interval_seconds
        self._timer: Optional[threading.Timer] = None
        self._shutdown = False
        self._lock = threading.Lock()

    def start(self) -> None:
        if self._timer is not None:
            return
        self._shutdown = False
        self._schedule_next()
        logger.info(f"Lifecycle manager started with interval {self.interval_seconds}s")

    def stop(self) -> None:
        self._shutdown = True
        if self._timer:
            self._timer.cancel()
            self._timer = None
        logger.info("Lifecycle manager stopped")

    def _schedule_next(self) -> None:
        if self._shutdown:
            return
        self._timer = threading.Timer(self.interval_seconds, self._run_enforcement)
        self._timer.daemon = True
        self._timer.start()

    def _run_enforcement(self) -> None:
        if self._shutdown:
            return
        try:
            self.enforce_all_buckets()
        except Exception as e:
            logger.error(f"Lifecycle enforcement failed: {e}")
        finally:
            self._schedule_next()

    def enforce_all_buckets(self) -> Dict[str, LifecycleResult]:
        results = {}
        try:
            buckets = self.storage.list_buckets()
            for bucket in buckets:
                result = self.enforce_rules(bucket.name)
                if result.objects_deleted > 0 or result.versions_deleted > 0 or result.uploads_aborted > 0:
                    results[bucket.name] = result
        except StorageError as e:
            logger.error(f"Failed to list buckets for lifecycle: {e}")
        return results

    def enforce_rules(self, bucket_name: str) -> LifecycleResult:
        start_time = time.time()
        result = LifecycleResult(bucket_name=bucket_name)

        try:
            lifecycle = self.storage.get_bucket_lifecycle(bucket_name)
            if not lifecycle:
                return result

            for rule in lifecycle:
                if rule.get("Status") != "Enabled":
                    continue
                rule_id = rule.get("ID", "unknown")
                prefix = rule.get("Prefix", rule.get("Filter", {}).get("Prefix", ""))

                self._enforce_expiration(bucket_name, rule, prefix, result)
                self._enforce_noncurrent_expiration(bucket_name, rule, prefix, result)
                self._enforce_abort_multipart(bucket_name, rule, result)

        except StorageError as e:
            result.errors.append(str(e))
            logger.error(f"Lifecycle enforcement error for {bucket_name}: {e}")

        result.execution_time_seconds = time.time() - start_time
        if result.objects_deleted > 0 or result.versions_deleted > 0 or result.uploads_aborted > 0:
            logger.info(
                f"Lifecycle enforcement for {bucket_name}: "
                f"deleted={result.objects_deleted}, versions={result.versions_deleted}, "
                f"aborted={result.uploads_aborted}, time={result.execution_time_seconds:.2f}s"
            )
        return result

    def _enforce_expiration(
        self, bucket_name: str, rule: Dict[str, Any], prefix: str, result: LifecycleResult
    ) -> None:
        expiration = rule.get("Expiration", {})
        if not expiration:
            return

        days = expiration.get("Days")
        date_str = expiration.get("Date")

        if days:
            cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        elif date_str:
            try:
                cutoff = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
            except ValueError:
                return
        else:
            return

        try:
            objects = self.storage.list_objects_all(bucket_name)
            for obj in objects:
                if prefix and not obj.key.startswith(prefix):
                    continue
                if obj.last_modified < cutoff:
                    try:
                        self.storage.delete_object(bucket_name, obj.key)
                        result.objects_deleted += 1
                    except StorageError as e:
                        result.errors.append(f"Failed to delete {obj.key}: {e}")
        except StorageError as e:
            result.errors.append(f"Failed to list objects: {e}")

    def _enforce_noncurrent_expiration(
        self, bucket_name: str, rule: Dict[str, Any], prefix: str, result: LifecycleResult
    ) -> None:
        noncurrent = rule.get("NoncurrentVersionExpiration", {})
        noncurrent_days = noncurrent.get("NoncurrentDays")
        if not noncurrent_days:
            return

        cutoff = datetime.now(timezone.utc) - timedelta(days=noncurrent_days)

        try:
            objects = self.storage.list_objects_all(bucket_name)
            for obj in objects:
                if prefix and not obj.key.startswith(prefix):
                    continue
                try:
                    versions = self.storage.list_object_versions(bucket_name, obj.key)
                    for version in versions:
                        archived_at_str = version.get("archived_at", "")
                        if not archived_at_str:
                            continue
                        try:
                            archived_at = datetime.fromisoformat(archived_at_str.replace("Z", "+00:00"))
                            if archived_at < cutoff:
                                version_id = version.get("version_id")
                                if version_id:
                                    self.storage.delete_object_version(bucket_name, obj.key, version_id)
                                    result.versions_deleted += 1
                        except (ValueError, StorageError) as e:
                            result.errors.append(f"Failed to process version: {e}")
                except StorageError:
                    pass
        except StorageError as e:
            result.errors.append(f"Failed to list objects: {e}")

        try:
            orphaned = self.storage.list_orphaned_objects(bucket_name)
            for item in orphaned:
                obj_key = item.get("key", "")
                if prefix and not obj_key.startswith(prefix):
                    continue
                try:
                    versions = self.storage.list_object_versions(bucket_name, obj_key)
                    for version in versions:
                        archived_at_str = version.get("archived_at", "")
                        if not archived_at_str:
                            continue
                        try:
                            archived_at = datetime.fromisoformat(archived_at_str.replace("Z", "+00:00"))
                            if archived_at < cutoff:
                                version_id = version.get("version_id")
                                if version_id:
                                    self.storage.delete_object_version(bucket_name, obj_key, version_id)
                                    result.versions_deleted += 1
                        except (ValueError, StorageError) as e:
                            result.errors.append(f"Failed to process orphaned version: {e}")
                except StorageError:
                    pass
        except StorageError as e:
            result.errors.append(f"Failed to list orphaned objects: {e}")

    def _enforce_abort_multipart(
        self, bucket_name: str, rule: Dict[str, Any], result: LifecycleResult
    ) -> None:
        abort_config = rule.get("AbortIncompleteMultipartUpload", {})
        days_after = abort_config.get("DaysAfterInitiation")
        if not days_after:
            return

        cutoff = datetime.now(timezone.utc) - timedelta(days=days_after)

        try:
            uploads = self.storage.list_multipart_uploads(bucket_name)
            for upload in uploads:
                created_at_str = upload.get("created_at", "")
                if not created_at_str:
                    continue
                try:
                    created_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
                    if created_at < cutoff:
                        upload_id = upload.get("upload_id")
                        if upload_id:
                            self.storage.abort_multipart_upload(bucket_name, upload_id)
                            result.uploads_aborted += 1
                except (ValueError, StorageError) as e:
                    result.errors.append(f"Failed to abort upload: {e}")
        except StorageError as e:
            result.errors.append(f"Failed to list multipart uploads: {e}")

    def run_now(self, bucket_name: Optional[str] = None) -> Dict[str, LifecycleResult]:
        if bucket_name:
            return {bucket_name: self.enforce_rules(bucket_name)}
        return self.enforce_all_buckets()
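Review note: a sketch of a lifecycle rule in the shape this manager reads, plus an on-demand run (not part of the diff). The bucket name and values are illustrative, and `storage` stands for the ObjectStorage instance; the rule list is assumed to come back from storage.get_bucket_lifecycle().

rule = {
    "ID": "expire-tmp",
    "Status": "Enabled",
    "Filter": {"Prefix": "tmp/"},
    "Expiration": {"Days": 7},
    "NoncurrentVersionExpiration": {"NoncurrentDays": 30},
    "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 3},
}

manager = LifecycleManager(storage, interval_seconds=900)
results = manager.run_now("my-bucket")
print(results["my-bucket"].objects_deleted, results["my-bucket"].errors)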
app/notifications.py (new file, 334 lines)
@@ -0,0 +1,334 @@
from __future__ import annotations

import json
import logging
import queue
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import requests

logger = logging.getLogger(__name__)


@dataclass
class NotificationEvent:
    event_name: str
    bucket_name: str
    object_key: str
    object_size: int = 0
    etag: str = ""
    version_id: Optional[str] = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    request_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    source_ip: str = ""
    user_identity: str = ""

    def to_s3_event(self) -> Dict[str, Any]:
        return {
            "Records": [
                {
                    "eventVersion": "2.1",
                    "eventSource": "myfsio:s3",
                    "awsRegion": "local",
                    "eventTime": self.timestamp.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
                    "eventName": self.event_name,
                    "userIdentity": {
                        "principalId": self.user_identity or "ANONYMOUS",
                    },
                    "requestParameters": {
                        "sourceIPAddress": self.source_ip or "127.0.0.1",
                    },
                    "responseElements": {
                        "x-amz-request-id": self.request_id,
                        "x-amz-id-2": self.request_id,
                    },
                    "s3": {
                        "s3SchemaVersion": "1.0",
                        "configurationId": "notification",
                        "bucket": {
                            "name": self.bucket_name,
                            "ownerIdentity": {"principalId": "local"},
                            "arn": f"arn:aws:s3:::{self.bucket_name}",
                        },
                        "object": {
                            "key": self.object_key,
                            "size": self.object_size,
                            "eTag": self.etag,
                            "versionId": self.version_id or "null",
                            "sequencer": f"{int(time.time() * 1000):016X}",
                        },
                    },
                }
            ]
        }


@dataclass
class WebhookDestination:
    url: str
    headers: Dict[str, str] = field(default_factory=dict)
    timeout_seconds: int = 30
    retry_count: int = 3
    retry_delay_seconds: int = 1

    def to_dict(self) -> Dict[str, Any]:
        return {
            "url": self.url,
            "headers": self.headers,
            "timeout_seconds": self.timeout_seconds,
            "retry_count": self.retry_count,
            "retry_delay_seconds": self.retry_delay_seconds,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "WebhookDestination":
        return cls(
            url=data.get("url", ""),
            headers=data.get("headers", {}),
            timeout_seconds=data.get("timeout_seconds", 30),
            retry_count=data.get("retry_count", 3),
            retry_delay_seconds=data.get("retry_delay_seconds", 1),
        )


@dataclass
class NotificationConfiguration:
    id: str
    events: List[str]
    destination: WebhookDestination
    prefix_filter: str = ""
    suffix_filter: str = ""

    def matches_event(self, event_name: str, object_key: str) -> bool:
        event_match = False
        for pattern in self.events:
            if pattern.endswith("*"):
                base = pattern[:-1]
                if event_name.startswith(base):
                    event_match = True
                    break
            elif pattern == event_name:
                event_match = True
                break

        if not event_match:
            return False

        if self.prefix_filter and not object_key.startswith(self.prefix_filter):
            return False
        if self.suffix_filter and not object_key.endswith(self.suffix_filter):
            return False

        return True

    def to_dict(self) -> Dict[str, Any]:
        return {
            "Id": self.id,
            "Events": self.events,
            "Destination": self.destination.to_dict(),
            "Filter": {
                "Key": {
                    "FilterRules": [
                        {"Name": "prefix", "Value": self.prefix_filter},
                        {"Name": "suffix", "Value": self.suffix_filter},
                    ]
                }
            },
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "NotificationConfiguration":
        prefix = ""
        suffix = ""
        filter_data = data.get("Filter", {})
        key_filter = filter_data.get("Key", {})
        for rule in key_filter.get("FilterRules", []):
            if rule.get("Name") == "prefix":
                prefix = rule.get("Value", "")
            elif rule.get("Name") == "suffix":
                suffix = rule.get("Value", "")

        return cls(
            id=data.get("Id", uuid.uuid4().hex),
            events=data.get("Events", []),
            destination=WebhookDestination.from_dict(data.get("Destination", {})),
            prefix_filter=prefix,
            suffix_filter=suffix,
        )


class NotificationService:
    def __init__(self, storage_root: Path, worker_count: int = 2):
        self.storage_root = storage_root
        self._configs: Dict[str, List[NotificationConfiguration]] = {}
        self._queue: queue.Queue[tuple[NotificationEvent, WebhookDestination]] = queue.Queue()
        self._workers: List[threading.Thread] = []
        self._shutdown = threading.Event()
        self._stats = {
            "events_queued": 0,
            "events_sent": 0,
            "events_failed": 0,
        }

        for i in range(worker_count):
            worker = threading.Thread(target=self._worker_loop, name=f"notification-worker-{i}", daemon=True)
            worker.start()
            self._workers.append(worker)

    def _config_path(self, bucket_name: str) -> Path:
        return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "notifications.json"

    def get_bucket_notifications(self, bucket_name: str) -> List[NotificationConfiguration]:
        if bucket_name in self._configs:
            return self._configs[bucket_name]

        config_path = self._config_path(bucket_name)
        if not config_path.exists():
            return []

        try:
            data = json.loads(config_path.read_text(encoding="utf-8"))
            configs = [NotificationConfiguration.from_dict(c) for c in data.get("configurations", [])]
            self._configs[bucket_name] = configs
            return configs
        except (json.JSONDecodeError, OSError) as e:
            logger.warning(f"Failed to load notification config for {bucket_name}: {e}")
            return []

    def set_bucket_notifications(
        self, bucket_name: str, configurations: List[NotificationConfiguration]
    ) -> None:
        config_path = self._config_path(bucket_name)
        config_path.parent.mkdir(parents=True, exist_ok=True)

        data = {"configurations": [c.to_dict() for c in configurations]}
        config_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
        self._configs[bucket_name] = configurations

    def delete_bucket_notifications(self, bucket_name: str) -> None:
        config_path = self._config_path(bucket_name)
        try:
            if config_path.exists():
                config_path.unlink()
        except OSError:
            pass
        self._configs.pop(bucket_name, None)

    def emit_event(self, event: NotificationEvent) -> None:
        configurations = self.get_bucket_notifications(event.bucket_name)
        if not configurations:
            return

        for config in configurations:
            if config.matches_event(event.event_name, event.object_key):
                self._queue.put((event, config.destination))
                self._stats["events_queued"] += 1
                logger.debug(
                    f"Queued notification for {event.event_name} on {event.bucket_name}/{event.object_key}"
                )

    def emit_object_created(
        self,
        bucket_name: str,
        object_key: str,
        *,
        size: int = 0,
        etag: str = "",
        version_id: Optional[str] = None,
        request_id: str = "",
        source_ip: str = "",
        user_identity: str = "",
        operation: str = "Put",
    ) -> None:
        event = NotificationEvent(
            event_name=f"s3:ObjectCreated:{operation}",
            bucket_name=bucket_name,
            object_key=object_key,
            object_size=size,
            etag=etag,
            version_id=version_id,
            request_id=request_id or uuid.uuid4().hex,
            source_ip=source_ip,
            user_identity=user_identity,
        )
        self.emit_event(event)

    def emit_object_removed(
        self,
        bucket_name: str,
        object_key: str,
        *,
        version_id: Optional[str] = None,
        request_id: str = "",
        source_ip: str = "",
        user_identity: str = "",
        operation: str = "Delete",
    ) -> None:
        event = NotificationEvent(
            event_name=f"s3:ObjectRemoved:{operation}",
            bucket_name=bucket_name,
            object_key=object_key,
            version_id=version_id,
            request_id=request_id or uuid.uuid4().hex,
            source_ip=source_ip,
            user_identity=user_identity,
        )
        self.emit_event(event)

    def _worker_loop(self) -> None:
        while not self._shutdown.is_set():
            try:
                event, destination = self._queue.get(timeout=1.0)
            except queue.Empty:
                continue

            try:
                self._send_notification(event, destination)
                self._stats["events_sent"] += 1
            except Exception as e:
                self._stats["events_failed"] += 1
                logger.error(f"Failed to send notification: {e}")
            finally:
                self._queue.task_done()

    def _send_notification(self, event: NotificationEvent, destination: WebhookDestination) -> None:
        payload = event.to_s3_event()
        headers = {"Content-Type": "application/json", **destination.headers}

        last_error = None
        for attempt in range(destination.retry_count):
            try:
                response = requests.post(
                    destination.url,
                    json=payload,
                    headers=headers,
                    timeout=destination.timeout_seconds,
                )
                if response.status_code < 400:
                    logger.info(
                        f"Notification sent: {event.event_name} -> {destination.url} (status={response.status_code})"
                    )
                    return
                last_error = f"HTTP {response.status_code}: {response.text[:200]}"
            except requests.RequestException as e:
                last_error = str(e)

            if attempt < destination.retry_count - 1:
                time.sleep(destination.retry_delay_seconds * (attempt + 1))

        raise RuntimeError(f"Failed after {destination.retry_count} attempts: {last_error}")

    def get_stats(self) -> Dict[str, int]:
        return dict(self._stats)

    def shutdown(self) -> None:
        self._shutdown.set()
        for worker in self._workers:
            worker.join(timeout=5.0)
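Review note: a sketch of wiring a webhook notification (not part of the diff); the URL, bucket, filters, and root path are illustrative.

from pathlib import Path

notifications = NotificationService(Path("./data"))
notifications.set_bucket_notifications("photos", [
    NotificationConfiguration(
        id="new-uploads",
        events=["s3:ObjectCreated:*"],
        destination=WebhookDestination(url="https://example.com/hook", retry_count=3),
        suffix_filter=".jpg",
    )
])

# Matching events are queued and delivered by the worker threads with retries and backoff.
notifications.emit_object_created("photos", "cats/1.jpg", size=1024, etag="abc123")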
app/object_lock.py (new file, 234 lines)
@@ -0,0 +1,234 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
|
||||
class RetentionMode(Enum):
|
||||
GOVERNANCE = "GOVERNANCE"
|
||||
COMPLIANCE = "COMPLIANCE"
|
||||
|
||||
|
||||
class ObjectLockError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObjectLockRetention:
|
||||
mode: RetentionMode
|
||||
retain_until_date: datetime
|
||||
|
||||
def to_dict(self) -> Dict[str, str]:
|
||||
return {
|
||||
"Mode": self.mode.value,
|
||||
"RetainUntilDate": self.retain_until_date.isoformat(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> Optional["ObjectLockRetention"]:
|
||||
if not data:
|
||||
return None
|
||||
mode_str = data.get("Mode")
|
||||
date_str = data.get("RetainUntilDate")
|
||||
if not mode_str or not date_str:
|
||||
return None
|
||||
try:
|
||||
mode = RetentionMode(mode_str)
|
||||
retain_until = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
|
||||
return cls(mode=mode, retain_until_date=retain_until)
|
||||
except (ValueError, KeyError):
|
||||
return None
|
||||
|
||||
def is_expired(self) -> bool:
|
||||
return datetime.now(timezone.utc) > self.retain_until_date
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObjectLockConfig:
|
||||
enabled: bool = False
|
||||
default_retention: Optional[ObjectLockRetention] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
result: Dict[str, Any] = {"ObjectLockEnabled": "Enabled" if self.enabled else "Disabled"}
|
||||
if self.default_retention:
|
||||
result["Rule"] = {
|
||||
"DefaultRetention": {
|
||||
"Mode": self.default_retention.mode.value,
|
||||
"Days": None,
|
||||
"Years": None,
|
||||
}
|
||||
}
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "ObjectLockConfig":
|
||||
enabled = data.get("ObjectLockEnabled") == "Enabled"
|
||||
default_retention = None
|
||||
rule = data.get("Rule")
|
||||
if rule and "DefaultRetention" in rule:
|
||||
dr = rule["DefaultRetention"]
|
||||
mode_str = dr.get("Mode", "GOVERNANCE")
|
||||
days = dr.get("Days")
|
||||
years = dr.get("Years")
|
||||
if days or years:
|
||||
from datetime import timedelta
|
||||
now = datetime.now(timezone.utc)
|
||||
if years:
|
||||
delta = timedelta(days=int(years) * 365)
|
||||
else:
|
||||
delta = timedelta(days=int(days))
|
||||
default_retention = ObjectLockRetention(
|
||||
mode=RetentionMode(mode_str),
|
||||
retain_until_date=now + delta,
|
||||
)
|
||||
return cls(enabled=enabled, default_retention=default_retention)
|
||||
|
||||
|
||||
class ObjectLockService:
|
||||
def __init__(self, storage_root: Path):
|
||||
self.storage_root = storage_root
|
||||
self._config_cache: Dict[str, ObjectLockConfig] = {}
|
||||
|
||||
def _bucket_lock_config_path(self, bucket_name: str) -> Path:
|
||||
return self.storage_root / ".myfsio.sys" / "buckets" / bucket_name / "object_lock.json"
|
||||
|
||||
def _object_lock_meta_path(self, bucket_name: str, object_key: str) -> Path:
|
||||
safe_key = object_key.replace("/", "_").replace("\\", "_")
|
||||
return (
|
||||
self.storage_root / ".myfsio.sys" / "buckets" / bucket_name /
|
||||
"locks" / f"{safe_key}.lock.json"
|
||||
)
|
||||
|
||||
def get_bucket_lock_config(self, bucket_name: str) -> ObjectLockConfig:
|
||||
if bucket_name in self._config_cache:
|
||||
return self._config_cache[bucket_name]
|
||||
|
||||
config_path = self._bucket_lock_config_path(bucket_name)
|
||||
if not config_path.exists():
|
||||
return ObjectLockConfig(enabled=False)
|
||||
|
||||
try:
|
||||
data = json.loads(config_path.read_text(encoding="utf-8"))
|
||||
config = ObjectLockConfig.from_dict(data)
|
||||
self._config_cache[bucket_name] = config
|
||||
return config
|
||||
except (json.JSONDecodeError, OSError):
|
||||
return ObjectLockConfig(enabled=False)
|
||||
|
||||
def set_bucket_lock_config(self, bucket_name: str, config: ObjectLockConfig) -> None:
|
||||
config_path = self._bucket_lock_config_path(bucket_name)
|
||||
config_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
config_path.write_text(json.dumps(config.to_dict()), encoding="utf-8")
|
||||
self._config_cache[bucket_name] = config
|
||||
|
||||
def enable_bucket_lock(self, bucket_name: str) -> None:
|
||||
config = self.get_bucket_lock_config(bucket_name)
|
||||
config.enabled = True
|
||||
self.set_bucket_lock_config(bucket_name, config)
|
||||
|
||||
def is_bucket_lock_enabled(self, bucket_name: str) -> bool:
|
||||
return self.get_bucket_lock_config(bucket_name).enabled
|
||||
|
||||
    def get_object_retention(self, bucket_name: str, object_key: str) -> Optional[ObjectLockRetention]:
        meta_path = self._object_lock_meta_path(bucket_name, object_key)
        if not meta_path.exists():
            return None
        try:
            data = json.loads(meta_path.read_text(encoding="utf-8"))
            return ObjectLockRetention.from_dict(data.get("retention", {}))
        except (json.JSONDecodeError, OSError):
            return None

    def set_object_retention(
        self,
        bucket_name: str,
        object_key: str,
        retention: ObjectLockRetention,
        bypass_governance: bool = False,
    ) -> None:
        existing = self.get_object_retention(bucket_name, object_key)
        if existing and not existing.is_expired():
            if existing.mode == RetentionMode.COMPLIANCE:
                raise ObjectLockError(
                    "Cannot modify retention on object with COMPLIANCE mode until retention expires"
                )
            if existing.mode == RetentionMode.GOVERNANCE and not bypass_governance:
                raise ObjectLockError(
                    "Cannot modify GOVERNANCE retention without bypass-governance permission"
                )

        meta_path = self._object_lock_meta_path(bucket_name, object_key)
        meta_path.parent.mkdir(parents=True, exist_ok=True)

        existing_data: Dict[str, Any] = {}
        if meta_path.exists():
            try:
                existing_data = json.loads(meta_path.read_text(encoding="utf-8"))
            except (json.JSONDecodeError, OSError):
                pass

        existing_data["retention"] = retention.to_dict()
        meta_path.write_text(json.dumps(existing_data), encoding="utf-8")

    def get_legal_hold(self, bucket_name: str, object_key: str) -> bool:
        meta_path = self._object_lock_meta_path(bucket_name, object_key)
        if not meta_path.exists():
            return False
        try:
            data = json.loads(meta_path.read_text(encoding="utf-8"))
            return data.get("legal_hold", False)
        except (json.JSONDecodeError, OSError):
            return False

    def set_legal_hold(self, bucket_name: str, object_key: str, enabled: bool) -> None:
        meta_path = self._object_lock_meta_path(bucket_name, object_key)
        meta_path.parent.mkdir(parents=True, exist_ok=True)

        existing_data: Dict[str, Any] = {}
        if meta_path.exists():
            try:
                existing_data = json.loads(meta_path.read_text(encoding="utf-8"))
            except (json.JSONDecodeError, OSError):
                pass

        existing_data["legal_hold"] = enabled
        meta_path.write_text(json.dumps(existing_data), encoding="utf-8")

    def can_delete_object(
        self,
        bucket_name: str,
        object_key: str,
        bypass_governance: bool = False,
    ) -> tuple[bool, str]:
        if self.get_legal_hold(bucket_name, object_key):
            return False, "Object is under legal hold"

        retention = self.get_object_retention(bucket_name, object_key)
        if retention and not retention.is_expired():
            if retention.mode == RetentionMode.COMPLIANCE:
                return False, f"Object is locked in COMPLIANCE mode until {retention.retain_until_date.isoformat()}"
            if retention.mode == RetentionMode.GOVERNANCE:
                if not bypass_governance:
                    return False, f"Object is locked in GOVERNANCE mode until {retention.retain_until_date.isoformat()}"

        return True, ""

    def can_overwrite_object(
        self,
        bucket_name: str,
        object_key: str,
        bypass_governance: bool = False,
    ) -> tuple[bool, str]:
        return self.can_delete_object(bucket_name, object_key, bypass_governance)

    def delete_object_lock_metadata(self, bucket_name: str, object_key: str) -> None:
        meta_path = self._object_lock_meta_path(bucket_name, object_key)
        try:
            if meta_path.exists():
                meta_path.unlink()
        except OSError:
            pass
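# --- Illustrative usage sketch (editorial note, not part of the diff) ---------------------
# Shows how the retention and legal-hold helpers above compose. Assumes the module is
# importable as app.object_lock and that ObjectLockService accepts a storage-root path,
# as create_app wires it; the bucket and key names below are made up.
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path

from app.object_lock import ObjectLockRetention, ObjectLockService, RetentionMode

svc = ObjectLockService(Path(tempfile.mkdtemp()))
svc.set_object_retention(
    "reports",
    "q3.pdf",
    ObjectLockRetention(
        mode=RetentionMode.GOVERNANCE,
        retain_until_date=datetime.now(timezone.utc) + timedelta(days=30),
    ),
)
print(svc.can_delete_object("reports", "q3.pdf"))                          # (False, "Object is locked in GOVERNANCE mode until ...")
print(svc.can_delete_object("reports", "q3.pdf", bypass_governance=True))  # (True, "")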
@@ -1,4 +1,3 @@
|
||||
"""Background replication worker."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
@@ -32,13 +31,9 @@ REPLICATION_MODE_ALL = "all"
|
||||
|
||||
def _create_s3_client(connection: RemoteConnection, *, health_check: bool = False) -> Any:
|
||||
"""Create a boto3 S3 client for the given connection.
|
||||
|
||||
Args:
|
||||
connection: Remote S3 connection configuration
|
||||
health_check: If True, use minimal retries for quick health checks
|
||||
|
||||
Returns:
|
||||
Configured boto3 S3 client
|
||||
"""
|
||||
config = Config(
|
||||
user_agent_extra=REPLICATION_USER_AGENT,
|
||||
@@ -182,9 +177,15 @@ class ReplicationManager:
|
||||
return self._rules.get(bucket_name)
|
||||
|
||||
def set_rule(self, rule: ReplicationRule) -> None:
|
||||
old_rule = self._rules.get(rule.bucket_name)
|
||||
was_all_mode = old_rule and old_rule.mode == REPLICATION_MODE_ALL if old_rule else False
|
||||
self._rules[rule.bucket_name] = rule
|
||||
self.save_rules()
|
||||
|
||||
if rule.mode == REPLICATION_MODE_ALL and rule.enabled and not was_all_mode:
|
||||
logger.info(f"Replication mode ALL enabled for {rule.bucket_name}, triggering sync of existing objects")
|
||||
self._executor.submit(self.replicate_existing_objects, rule.bucket_name)
|
||||
|
||||
def delete_rule(self, bucket_name: str) -> None:
|
||||
if bucket_name in self._rules:
|
||||
del self._rules[bucket_name]
|
||||
|
||||
624
app/s3_api.py
@@ -16,9 +16,14 @@ from xml.etree.ElementTree import Element, SubElement, tostring, fromstring, Par
|
||||
from flask import Blueprint, Response, current_app, jsonify, request, g
|
||||
from werkzeug.http import http_date
|
||||
|
||||
from .access_logging import AccessLoggingService, LoggingConfiguration
|
||||
from .acl import AclService
|
||||
from .bucket_policies import BucketPolicyStore
|
||||
from .encryption import SSECEncryption, SSECMetadata, EncryptionError
|
||||
from .extensions import limiter
|
||||
from .iam import IamError, Principal
|
||||
from .notifications import NotificationService, NotificationConfiguration, WebhookDestination
|
||||
from .object_lock import ObjectLockService, ObjectLockRetention, ObjectLockConfig, ObjectLockError, RetentionMode
|
||||
from .replication import ReplicationManager
|
||||
from .storage import ObjectStorage, StorageError, QuotaExceededError
|
||||
|
||||
@@ -30,6 +35,10 @@ def _storage() -> ObjectStorage:
|
||||
return current_app.extensions["object_storage"]
|
||||
|
||||
|
||||
def _acl() -> AclService:
|
||||
return current_app.extensions["acl"]
|
||||
|
||||
|
||||
def _iam():
|
||||
return current_app.extensions["iam"]
|
||||
|
||||
@@ -44,6 +53,18 @@ def _bucket_policies() -> BucketPolicyStore:
|
||||
return store
|
||||
|
||||
|
||||
def _object_lock() -> ObjectLockService:
|
||||
return current_app.extensions["object_lock"]
|
||||
|
||||
|
||||
def _notifications() -> NotificationService:
|
||||
return current_app.extensions["notifications"]
|
||||
|
||||
|
||||
def _access_logging() -> AccessLoggingService:
|
||||
return current_app.extensions["access_logging"]
|
||||
|
||||
|
||||
def _xml_response(element: Element, status: int = 200) -> Response:
|
||||
xml_bytes = tostring(element, encoding="utf-8")
|
||||
return Response(xml_bytes, status=status, mimetype="application/xml")
|
||||
@@ -58,6 +79,37 @@ def _error_response(code: str, message: str, status: int) -> Response:
|
||||
return _xml_response(error, status)
|
||||
|
||||
|
||||
def _parse_range_header(range_header: str, file_size: int) -> list[tuple[int, int]] | None:
|
||||
if not range_header.startswith("bytes="):
|
||||
return None
|
||||
ranges = []
|
||||
range_spec = range_header[6:]
|
||||
for part in range_spec.split(","):
|
||||
part = part.strip()
|
||||
if not part:
|
||||
continue
|
||||
if part.startswith("-"):
|
||||
suffix_length = int(part[1:])
|
||||
if suffix_length <= 0:
|
||||
return None
|
||||
start = max(0, file_size - suffix_length)
|
||||
end = file_size - 1
|
||||
elif part.endswith("-"):
|
||||
start = int(part[:-1])
|
||||
if start >= file_size:
|
||||
return None
|
||||
end = file_size - 1
|
||||
else:
|
||||
start_str, end_str = part.split("-", 1)
|
||||
start = int(start_str)
|
||||
end = int(end_str)
|
||||
if start > end or start >= file_size:
|
||||
return None
|
||||
end = min(end, file_size - 1)
|
||||
ranges.append((start, end))
|
||||
return ranges if ranges else None
|
||||
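# Illustrative note (editorial, not part of the diff): expected results of _parse_range_header
# above for a 1,000-byte object; offsets are inclusive, matching RFC 7233 byte ranges.
assert _parse_range_header("bytes=0-99", 1000) == [(0, 99)]      # first 100 bytes
assert _parse_range_header("bytes=-100", 1000) == [(900, 999)]   # suffix form: last 100 bytes
assert _parse_range_header("bytes=900-", 1000) == [(900, 999)]   # open-ended range
assert _parse_range_header("bytes=0-4999", 1000) == [(0, 999)]   # end is clamped to the object size
assert _parse_range_header("bytes=1200-1300", 1000) is None      # start beyond EOF -> unsatisfiable
assert _parse_range_header("pages=1-2", 1000) is None            # only the bytes= unit is understood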
|
||||
|
||||
def _sign(key: bytes, msg: str) -> bytes:
|
||||
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
|
||||
|
||||
@@ -179,6 +231,11 @@ def _verify_sigv4_header(req: Any, auth_header: str) -> Principal | None:
|
||||
)
|
||||
raise IamError("SignatureDoesNotMatch")
|
||||
|
||||
session_token = req.headers.get("X-Amz-Security-Token")
|
||||
if session_token:
|
||||
if not _iam().validate_session_token(access_key, session_token):
|
||||
raise IamError("InvalidToken")
|
||||
|
||||
return _iam().get_principal(access_key)
|
||||
|
||||
|
||||
@@ -261,6 +318,11 @@ def _verify_sigv4_query(req: Any) -> Principal | None:
|
||||
if not hmac.compare_digest(calculated_signature, signature):
|
||||
raise IamError("SignatureDoesNotMatch")
|
||||
|
||||
session_token = req.args.get("X-Amz-Security-Token")
|
||||
if session_token:
|
||||
if not _iam().validate_session_token(access_key, session_token):
|
||||
raise IamError("InvalidToken")
|
||||
|
||||
return _iam().get_principal(access_key)
|
||||
|
||||
|
||||
@@ -321,6 +383,19 @@ def _authorize_action(principal: Principal | None, bucket_name: str | None, acti
|
||||
return
|
||||
if policy_decision == "allow":
|
||||
return
|
||||
|
||||
acl_allowed = False
|
||||
if bucket_name:
|
||||
acl_service = _acl()
|
||||
acl_allowed = acl_service.evaluate_bucket_acl(
|
||||
bucket_name,
|
||||
access_key,
|
||||
action,
|
||||
is_authenticated=principal is not None,
|
||||
)
|
||||
if acl_allowed:
|
||||
return
|
||||
|
||||
raise iam_error or IamError("Access denied")
|
||||
|
||||
|
||||
@@ -838,6 +913,9 @@ def _maybe_handle_bucket_subresource(bucket_name: str) -> Response | None:
|
||||
"versions": _bucket_list_versions_handler,
|
||||
"lifecycle": _bucket_lifecycle_handler,
|
||||
"quota": _bucket_quota_handler,
|
||||
"object-lock": _bucket_object_lock_handler,
|
||||
"notification": _bucket_notification_handler,
|
||||
"logging": _bucket_logging_handler,
|
||||
}
|
||||
requested = [key for key in handlers if key in request.args]
|
||||
if not requested:
|
||||
@@ -1131,6 +1209,8 @@ def _bucket_location_handler(bucket_name: str) -> Response:
|
||||
|
||||
|
||||
def _bucket_acl_handler(bucket_name: str) -> Response:
|
||||
from .acl import create_canned_acl, Acl, AclGrant, GRANTEE_ALL_USERS, GRANTEE_AUTHENTICATED_USERS
|
||||
|
||||
if request.method not in {"GET", "PUT"}:
|
||||
return _method_not_allowed(["GET", "PUT"])
|
||||
principal, error = _require_principal()
|
||||
@@ -1144,24 +1224,39 @@ def _bucket_acl_handler(bucket_name: str) -> Response:
|
||||
if not storage.bucket_exists(bucket_name):
|
||||
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
|
||||
|
||||
acl_service = _acl()
|
||||
owner_id = principal.access_key if principal else "anonymous"
|
||||
|
||||
if request.method == "PUT":
|
||||
# Accept canned ACL headers for S3 compatibility (not fully implemented)
|
||||
canned_acl = request.headers.get("x-amz-acl", "private")
|
||||
current_app.logger.info("Bucket ACL set (canned)", extra={"bucket": bucket_name, "acl": canned_acl})
|
||||
acl = acl_service.set_bucket_canned_acl(bucket_name, canned_acl, owner_id)
|
||||
current_app.logger.info("Bucket ACL set", extra={"bucket": bucket_name, "acl": canned_acl})
|
||||
return Response(status=200)
|
||||
|
||||
root = Element("AccessControlPolicy")
|
||||
owner = SubElement(root, "Owner")
|
||||
SubElement(owner, "ID").text = principal.access_key if principal else "anonymous"
|
||||
SubElement(owner, "DisplayName").text = principal.display_name if principal else "Anonymous"
|
||||
acl = acl_service.get_bucket_acl(bucket_name)
|
||||
if not acl:
|
||||
acl = create_canned_acl("private", owner_id)
|
||||
|
||||
acl = SubElement(root, "AccessControlList")
|
||||
grant = SubElement(acl, "Grant")
|
||||
grantee = SubElement(grant, "Grantee")
|
||||
grantee.set("{http://www.w3.org/2001/XMLSchema-instance}type", "CanonicalUser")
|
||||
SubElement(grantee, "ID").text = principal.access_key if principal else "anonymous"
|
||||
SubElement(grantee, "DisplayName").text = principal.display_name if principal else "Anonymous"
|
||||
SubElement(grant, "Permission").text = "FULL_CONTROL"
|
||||
root = Element("AccessControlPolicy")
|
||||
owner_el = SubElement(root, "Owner")
|
||||
SubElement(owner_el, "ID").text = acl.owner
|
||||
SubElement(owner_el, "DisplayName").text = acl.owner
|
||||
|
||||
acl_el = SubElement(root, "AccessControlList")
|
||||
for grant in acl.grants:
|
||||
grant_el = SubElement(acl_el, "Grant")
|
||||
grantee = SubElement(grant_el, "Grantee")
|
||||
if grant.grantee == GRANTEE_ALL_USERS:
|
||||
grantee.set("{http://www.w3.org/2001/XMLSchema-instance}type", "Group")
|
||||
SubElement(grantee, "URI").text = "http://acs.amazonaws.com/groups/global/AllUsers"
|
||||
elif grant.grantee == GRANTEE_AUTHENTICATED_USERS:
|
||||
grantee.set("{http://www.w3.org/2001/XMLSchema-instance}type", "Group")
|
||||
SubElement(grantee, "URI").text = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
|
||||
else:
|
||||
grantee.set("{http://www.w3.org/2001/XMLSchema-instance}type", "CanonicalUser")
|
||||
SubElement(grantee, "ID").text = grant.grantee
|
||||
SubElement(grantee, "DisplayName").text = grant.grantee
|
||||
SubElement(grant_el, "Permission").text = grant.permission
|
||||
|
||||
return _xml_response(root)
|
||||
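# Illustrative note (editorial, not part of the diff): approximate shape of the GET ?acl response
# built above for a bucket whose ACL grants public read. The owner id is a made-up access key,
# the READ permission name assumes what a public-read canned ACL would grant, and ElementTree
# may emit a generated namespace prefix instead of the conventional xsi: shown here.
SAMPLE_BUCKET_ACL = """\
<AccessControlPolicy>
  <Owner><ID>AKIAEXAMPLE</ID><DisplayName>AKIAEXAMPLE</DisplayName></Owner>
  <AccessControlList>
    <Grant>
      <Grantee xsi:type="CanonicalUser"><ID>AKIAEXAMPLE</ID><DisplayName>AKIAEXAMPLE</DisplayName></Grantee>
      <Permission>FULL_CONTROL</Permission>
    </Grant>
    <Grant>
      <Grantee xsi:type="Group"><URI>http://acs.amazonaws.com/groups/global/AllUsers</URI></Grantee>
      <Permission>READ</Permission>
    </Grant>
  </AccessControlList>
</AccessControlPolicy>"""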
|
||||
@@ -1491,6 +1586,336 @@ def _bucket_quota_handler(bucket_name: str) -> Response:
|
||||
return Response(status=204)
|
||||
|
||||
|
||||
def _bucket_object_lock_handler(bucket_name: str) -> Response:
|
||||
if request.method not in {"GET", "PUT"}:
|
||||
return _method_not_allowed(["GET", "PUT"])
|
||||
|
||||
principal, error = _require_principal()
|
||||
if error:
|
||||
return error
|
||||
try:
|
||||
_authorize_action(principal, bucket_name, "policy")
|
||||
except IamError as exc:
|
||||
return _error_response("AccessDenied", str(exc), 403)
|
||||
|
||||
storage = _storage()
|
||||
if not storage.bucket_exists(bucket_name):
|
||||
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
|
||||
|
||||
lock_service = _object_lock()
|
||||
|
||||
if request.method == "GET":
|
||||
config = lock_service.get_bucket_lock_config(bucket_name)
|
||||
root = Element("ObjectLockConfiguration", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
|
||||
SubElement(root, "ObjectLockEnabled").text = "Enabled" if config.enabled else "Disabled"
|
||||
return _xml_response(root)
|
||||
|
||||
payload = request.get_data(cache=False) or b""
|
||||
if not payload.strip():
|
||||
return _error_response("MalformedXML", "Request body is required", 400)
|
||||
|
||||
try:
|
||||
root = fromstring(payload)
|
||||
except ParseError:
|
||||
return _error_response("MalformedXML", "Unable to parse XML document", 400)
|
||||
|
||||
enabled_el = root.find("{*}ObjectLockEnabled") or root.find("ObjectLockEnabled")
|
||||
enabled = (enabled_el.text or "").strip() == "Enabled" if enabled_el is not None else False
|
||||
|
||||
config = ObjectLockConfig(enabled=enabled)
|
||||
lock_service.set_bucket_lock_config(bucket_name, config)
|
||||
|
||||
current_app.logger.info("Bucket object lock updated", extra={"bucket": bucket_name, "enabled": enabled})
|
||||
return Response(status=200)
|
||||
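# Illustrative note (editorial, not part of the diff): minimal PUT ?object-lock body accepted by
# the parser above; any value other than "Enabled" leaves object lock disabled.
OBJECT_LOCK_BODY = """\
<ObjectLockConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <ObjectLockEnabled>Enabled</ObjectLockEnabled>
</ObjectLockConfiguration>"""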
|
||||
|
||||
def _bucket_notification_handler(bucket_name: str) -> Response:
|
||||
if request.method not in {"GET", "PUT", "DELETE"}:
|
||||
return _method_not_allowed(["GET", "PUT", "DELETE"])
|
||||
|
||||
principal, error = _require_principal()
|
||||
if error:
|
||||
return error
|
||||
try:
|
||||
_authorize_action(principal, bucket_name, "policy")
|
||||
except IamError as exc:
|
||||
return _error_response("AccessDenied", str(exc), 403)
|
||||
|
||||
storage = _storage()
|
||||
if not storage.bucket_exists(bucket_name):
|
||||
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
|
||||
|
||||
notification_service = _notifications()
|
||||
|
||||
if request.method == "GET":
|
||||
configs = notification_service.get_bucket_notifications(bucket_name)
|
||||
root = Element("NotificationConfiguration", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
|
||||
for config in configs:
|
||||
webhook_el = SubElement(root, "WebhookConfiguration")
|
||||
SubElement(webhook_el, "Id").text = config.id
|
||||
for event in config.events:
|
||||
SubElement(webhook_el, "Event").text = event
|
||||
dest_el = SubElement(webhook_el, "Destination")
|
||||
SubElement(dest_el, "Url").text = config.destination.url
|
||||
if config.prefix_filter or config.suffix_filter:
|
||||
filter_el = SubElement(webhook_el, "Filter")
|
||||
key_el = SubElement(filter_el, "S3Key")
|
||||
if config.prefix_filter:
|
||||
rule_el = SubElement(key_el, "FilterRule")
|
||||
SubElement(rule_el, "Name").text = "prefix"
|
||||
SubElement(rule_el, "Value").text = config.prefix_filter
|
||||
if config.suffix_filter:
|
||||
rule_el = SubElement(key_el, "FilterRule")
|
||||
SubElement(rule_el, "Name").text = "suffix"
|
||||
SubElement(rule_el, "Value").text = config.suffix_filter
|
||||
return _xml_response(root)
|
||||
|
||||
if request.method == "DELETE":
|
||||
notification_service.delete_bucket_notifications(bucket_name)
|
||||
current_app.logger.info("Bucket notifications deleted", extra={"bucket": bucket_name})
|
||||
return Response(status=204)
|
||||
|
||||
payload = request.get_data(cache=False) or b""
|
||||
if not payload.strip():
|
||||
notification_service.delete_bucket_notifications(bucket_name)
|
||||
return Response(status=200)
|
||||
|
||||
try:
|
||||
root = fromstring(payload)
|
||||
except ParseError:
|
||||
return _error_response("MalformedXML", "Unable to parse XML document", 400)
|
||||
|
||||
configs: list[NotificationConfiguration] = []
|
||||
for webhook_el in root.findall("{*}WebhookConfiguration") or root.findall("WebhookConfiguration"):
|
||||
config_id = _find_element_text(webhook_el, "Id") or uuid.uuid4().hex
|
||||
events = [el.text for el in webhook_el.findall("{*}Event") or webhook_el.findall("Event") if el.text]
|
||||
|
||||
dest_el = _find_element(webhook_el, "Destination")
|
||||
url = _find_element_text(dest_el, "Url") if dest_el else ""
|
||||
if not url:
|
||||
return _error_response("InvalidArgument", "Destination URL is required", 400)
|
||||
|
||||
prefix = ""
|
||||
suffix = ""
|
||||
filter_el = _find_element(webhook_el, "Filter")
|
||||
if filter_el:
|
||||
key_el = _find_element(filter_el, "S3Key")
|
||||
if key_el:
|
||||
for rule_el in key_el.findall("{*}FilterRule") or key_el.findall("FilterRule"):
|
||||
name = _find_element_text(rule_el, "Name")
|
||||
value = _find_element_text(rule_el, "Value")
|
||||
if name == "prefix":
|
||||
prefix = value
|
||||
elif name == "suffix":
|
||||
suffix = value
|
||||
|
||||
configs.append(NotificationConfiguration(
|
||||
id=config_id,
|
||||
events=events,
|
||||
destination=WebhookDestination(url=url),
|
||||
prefix_filter=prefix,
|
||||
suffix_filter=suffix,
|
||||
))
|
||||
|
||||
notification_service.set_bucket_notifications(bucket_name, configs)
|
||||
current_app.logger.info("Bucket notifications updated", extra={"bucket": bucket_name, "configs": len(configs)})
|
||||
return Response(status=200)
|
||||
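# Illustrative note (editorial, not part of the diff): PUT ?notification body matching the parser
# above - one webhook destination with prefix/suffix filters. The id and URL are made up, and the
# event names assume the usual s3:ObjectCreated:* / s3:ObjectRemoved:* conventions.
NOTIFICATION_BODY = """\
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <WebhookConfiguration>
    <Id>invoice-events</Id>
    <Event>s3:ObjectCreated:*</Event>
    <Event>s3:ObjectRemoved:*</Event>
    <Destination>
      <Url>https://hooks.example.test/s3-events</Url>
    </Destination>
    <Filter>
      <S3Key>
        <FilterRule><Name>prefix</Name><Value>invoices/</Value></FilterRule>
        <FilterRule><Name>suffix</Name><Value>.pdf</Value></FilterRule>
      </S3Key>
    </Filter>
  </WebhookConfiguration>
</NotificationConfiguration>"""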
|
||||
|
||||
def _bucket_logging_handler(bucket_name: str) -> Response:
|
||||
if request.method not in {"GET", "PUT", "DELETE"}:
|
||||
return _method_not_allowed(["GET", "PUT", "DELETE"])
|
||||
|
||||
principal, error = _require_principal()
|
||||
if error:
|
||||
return error
|
||||
try:
|
||||
_authorize_action(principal, bucket_name, "policy")
|
||||
except IamError as exc:
|
||||
return _error_response("AccessDenied", str(exc), 403)
|
||||
|
||||
storage = _storage()
|
||||
if not storage.bucket_exists(bucket_name):
|
||||
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
|
||||
|
||||
logging_service = _access_logging()
|
||||
|
||||
if request.method == "GET":
|
||||
config = logging_service.get_bucket_logging(bucket_name)
|
||||
root = Element("BucketLoggingStatus", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
|
||||
if config and config.enabled:
|
||||
logging_enabled = SubElement(root, "LoggingEnabled")
|
||||
SubElement(logging_enabled, "TargetBucket").text = config.target_bucket
|
||||
SubElement(logging_enabled, "TargetPrefix").text = config.target_prefix
|
||||
return _xml_response(root)
|
||||
|
||||
if request.method == "DELETE":
|
||||
logging_service.delete_bucket_logging(bucket_name)
|
||||
current_app.logger.info("Bucket logging deleted", extra={"bucket": bucket_name})
|
||||
return Response(status=204)
|
||||
|
||||
payload = request.get_data(cache=False) or b""
|
||||
if not payload.strip():
|
||||
logging_service.delete_bucket_logging(bucket_name)
|
||||
return Response(status=200)
|
||||
|
||||
try:
|
||||
root = fromstring(payload)
|
||||
except ParseError:
|
||||
return _error_response("MalformedXML", "Unable to parse XML document", 400)
|
||||
|
||||
logging_enabled = _find_element(root, "LoggingEnabled")
|
||||
if logging_enabled is None:
|
||||
logging_service.delete_bucket_logging(bucket_name)
|
||||
return Response(status=200)
|
||||
|
||||
target_bucket = _find_element_text(logging_enabled, "TargetBucket")
|
||||
if not target_bucket:
|
||||
return _error_response("InvalidArgument", "TargetBucket is required", 400)
|
||||
|
||||
if not storage.bucket_exists(target_bucket):
|
||||
return _error_response("InvalidTargetBucketForLogging", "Target bucket does not exist", 400)
|
||||
|
||||
target_prefix = _find_element_text(logging_enabled, "TargetPrefix")
|
||||
|
||||
config = LoggingConfiguration(
|
||||
target_bucket=target_bucket,
|
||||
target_prefix=target_prefix,
|
||||
enabled=True,
|
||||
)
|
||||
logging_service.set_bucket_logging(bucket_name, config)
|
||||
|
||||
current_app.logger.info(
|
||||
"Bucket logging updated",
|
||||
extra={"bucket": bucket_name, "target_bucket": target_bucket, "target_prefix": target_prefix}
|
||||
)
|
||||
return Response(status=200)
|
||||
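# Illustrative note (editorial, not part of the diff): PUT ?logging body as understood above.
# TargetBucket must name an existing bucket; the names here are made up. An empty body or a
# missing <LoggingEnabled> element disables logging instead.
LOGGING_BODY = """\
<BucketLoggingStatus xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <LoggingEnabled>
    <TargetBucket>access-logs</TargetBucket>
    <TargetPrefix>photos-bucket/</TargetPrefix>
  </LoggingEnabled>
</BucketLoggingStatus>"""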
|
||||
|
||||
def _object_retention_handler(bucket_name: str, object_key: str) -> Response:
|
||||
if request.method not in {"GET", "PUT"}:
|
||||
return _method_not_allowed(["GET", "PUT"])
|
||||
|
||||
principal, error = _require_principal()
|
||||
if error:
|
||||
return error
|
||||
try:
|
||||
_authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key)
|
||||
except IamError as exc:
|
||||
return _error_response("AccessDenied", str(exc), 403)
|
||||
|
||||
storage = _storage()
|
||||
if not storage.bucket_exists(bucket_name):
|
||||
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
|
||||
|
||||
try:
|
||||
storage.get_object_path(bucket_name, object_key)
|
||||
except StorageError:
|
||||
return _error_response("NoSuchKey", "Object does not exist", 404)
|
||||
|
||||
lock_service = _object_lock()
|
||||
|
||||
if request.method == "GET":
|
||||
retention = lock_service.get_object_retention(bucket_name, object_key)
|
||||
if not retention:
|
||||
return _error_response("NoSuchObjectLockConfiguration", "No retention policy", 404)
|
||||
|
||||
root = Element("Retention", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
|
||||
SubElement(root, "Mode").text = retention.mode.value
|
||||
SubElement(root, "RetainUntilDate").text = retention.retain_until_date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
|
||||
return _xml_response(root)
|
||||
|
||||
payload = request.get_data(cache=False) or b""
|
||||
if not payload.strip():
|
||||
return _error_response("MalformedXML", "Request body is required", 400)
|
||||
|
||||
try:
|
||||
root = fromstring(payload)
|
||||
except ParseError:
|
||||
return _error_response("MalformedXML", "Unable to parse XML document", 400)
|
||||
|
||||
mode_str = _find_element_text(root, "Mode")
|
||||
retain_until_str = _find_element_text(root, "RetainUntilDate")
|
||||
|
||||
if not mode_str or not retain_until_str:
|
||||
return _error_response("InvalidArgument", "Mode and RetainUntilDate are required", 400)
|
||||
|
||||
try:
|
||||
mode = RetentionMode(mode_str)
|
||||
except ValueError:
|
||||
return _error_response("InvalidArgument", f"Invalid retention mode: {mode_str}", 400)
|
||||
|
||||
try:
|
||||
retain_until = datetime.fromisoformat(retain_until_str.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return _error_response("InvalidArgument", f"Invalid date format: {retain_until_str}", 400)
|
||||
|
||||
bypass = request.headers.get("x-amz-bypass-governance-retention", "").lower() == "true"
|
||||
|
||||
retention = ObjectLockRetention(mode=mode, retain_until_date=retain_until)
|
||||
try:
|
||||
lock_service.set_object_retention(bucket_name, object_key, retention, bypass_governance=bypass)
|
||||
except ObjectLockError as exc:
|
||||
return _error_response("AccessDenied", str(exc), 403)
|
||||
|
||||
current_app.logger.info(
|
||||
"Object retention set",
|
||||
extra={"bucket": bucket_name, "key": object_key, "mode": mode_str, "until": retain_until_str}
|
||||
)
|
||||
return Response(status=200)
|
||||
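# Illustrative note (editorial, not part of the diff): PUT ?retention body accepted above. Changing
# an unexpired GOVERNANCE lock additionally requires the x-amz-bypass-governance-retention: true
# header, and COMPLIANCE locks cannot be modified until they expire.
RETENTION_BODY = """\
<Retention xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Mode>GOVERNANCE</Mode>
  <RetainUntilDate>2026-01-01T00:00:00.000Z</RetainUntilDate>
</Retention>"""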
|
||||
|
||||
def _object_legal_hold_handler(bucket_name: str, object_key: str) -> Response:
|
||||
if request.method not in {"GET", "PUT"}:
|
||||
return _method_not_allowed(["GET", "PUT"])
|
||||
|
||||
principal, error = _require_principal()
|
||||
if error:
|
||||
return error
|
||||
try:
|
||||
_authorize_action(principal, bucket_name, "write" if request.method == "PUT" else "read", object_key=object_key)
|
||||
except IamError as exc:
|
||||
return _error_response("AccessDenied", str(exc), 403)
|
||||
|
||||
storage = _storage()
|
||||
if not storage.bucket_exists(bucket_name):
|
||||
return _error_response("NoSuchBucket", "Bucket does not exist", 404)
|
||||
|
||||
try:
|
||||
storage.get_object_path(bucket_name, object_key)
|
||||
except StorageError:
|
||||
return _error_response("NoSuchKey", "Object does not exist", 404)
|
||||
|
||||
lock_service = _object_lock()
|
||||
|
||||
if request.method == "GET":
|
||||
enabled = lock_service.get_legal_hold(bucket_name, object_key)
|
||||
root = Element("LegalHold", xmlns="http://s3.amazonaws.com/doc/2006-03-01/")
|
||||
SubElement(root, "Status").text = "ON" if enabled else "OFF"
|
||||
return _xml_response(root)
|
||||
|
||||
payload = request.get_data(cache=False) or b""
|
||||
if not payload.strip():
|
||||
return _error_response("MalformedXML", "Request body is required", 400)
|
||||
|
||||
try:
|
||||
root = fromstring(payload)
|
||||
except ParseError:
|
||||
return _error_response("MalformedXML", "Unable to parse XML document", 400)
|
||||
|
||||
status = _find_element_text(root, "Status")
|
||||
if status not in {"ON", "OFF"}:
|
||||
return _error_response("InvalidArgument", "Status must be ON or OFF", 400)
|
||||
|
||||
lock_service.set_legal_hold(bucket_name, object_key, status == "ON")
|
||||
|
||||
current_app.logger.info(
|
||||
"Object legal hold set",
|
||||
extra={"bucket": bucket_name, "key": object_key, "status": status}
|
||||
)
|
||||
return Response(status=200)
|
||||
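# Illustrative note (editorial, not part of the diff): PUT ?legal-hold body; Status must be exactly
# ON or OFF, anything else is rejected with InvalidArgument.
LEGAL_HOLD_BODY = """\
<LegalHold xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Status>ON</Status>
</LegalHold>"""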
|
||||
|
||||
def _bulk_delete_handler(bucket_name: str) -> Response:
|
||||
principal, error = _require_principal()
|
||||
if error:
|
||||
@@ -1537,29 +1962,28 @@ def _bulk_delete_handler(bucket_name: str) -> Response:
|
||||
return _error_response("MalformedXML", "A maximum of 1000 objects can be deleted per request", 400)
|
||||
|
||||
storage = _storage()
|
||||
deleted: list[str] = []
|
||||
deleted: list[dict[str, str | None]] = []
|
||||
errors: list[dict[str, str]] = []
|
||||
for entry in objects:
|
||||
key = entry["Key"] or ""
|
||||
version_id = entry.get("VersionId")
|
||||
if version_id:
|
||||
errors.append({
|
||||
"Key": key,
|
||||
"Code": "InvalidRequest",
|
||||
"Message": "VersionId is not supported for bulk deletes",
|
||||
})
|
||||
continue
|
||||
try:
|
||||
storage.delete_object(bucket_name, key)
|
||||
deleted.append(key)
|
||||
if version_id:
|
||||
storage.delete_object_version(bucket_name, key, version_id)
|
||||
deleted.append({"Key": key, "VersionId": version_id})
|
||||
else:
|
||||
storage.delete_object(bucket_name, key)
|
||||
deleted.append({"Key": key, "VersionId": None})
|
||||
except StorageError as exc:
|
||||
errors.append({"Key": key, "Code": "InvalidRequest", "Message": str(exc)})
|
||||
|
||||
result = Element("DeleteResult")
|
||||
if not quiet:
|
||||
for key in deleted:
|
||||
for item in deleted:
|
||||
deleted_el = SubElement(result, "Deleted")
|
||||
SubElement(deleted_el, "Key").text = key
|
||||
SubElement(deleted_el, "Key").text = item["Key"]
|
||||
if item.get("VersionId"):
|
||||
SubElement(deleted_el, "VersionId").text = item["VersionId"]
|
||||
for err in errors:
|
||||
error_el = SubElement(result, "Error")
|
||||
SubElement(error_el, "Key").text = err.get("Key", "")
|
||||
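# Illustrative note (editorial, not part of the diff): with the change above a bulk-delete entry may
# carry a VersionId, and the matching <Deleted> element echoes it back (non-quiet mode only). The
# keys and version id below are made up.
SAMPLE_DELETE_RESULT = """\
<DeleteResult>
  <Deleted><Key>logs/2024-01-01.gz</Key></Deleted>
  <Deleted><Key>logs/2024-01-02.gz</Key><VersionId>v-000042</VersionId></Deleted>
</DeleteResult>"""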
@@ -1796,6 +2220,12 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
if "tagging" in request.args:
|
||||
return _object_tagging_handler(bucket_name, object_key)
|
||||
|
||||
if "retention" in request.args:
|
||||
return _object_retention_handler(bucket_name, object_key)
|
||||
|
||||
if "legal-hold" in request.args:
|
||||
return _object_legal_hold_handler(bucket_name, object_key)
|
||||
|
||||
if request.method == "POST":
|
||||
if "uploads" in request.args:
|
||||
return _initiate_multipart_upload(bucket_name, object_key)
|
||||
@@ -1811,10 +2241,16 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
if copy_source:
|
||||
return _copy_object(bucket_name, object_key, copy_source)
|
||||
|
||||
_, error = _object_principal("write", bucket_name, object_key)
|
||||
principal, error = _object_principal("write", bucket_name, object_key)
|
||||
if error:
|
||||
return error
|
||||
|
||||
bypass_governance = request.headers.get("x-amz-bypass-governance-retention", "").lower() == "true"
|
||||
lock_service = _object_lock()
|
||||
can_overwrite, lock_reason = lock_service.can_overwrite_object(bucket_name, object_key, bypass_governance=bypass_governance)
|
||||
if not can_overwrite:
|
||||
return _error_response("AccessDenied", lock_reason, 403)
|
||||
|
||||
stream = request.stream
|
||||
content_encoding = request.headers.get("Content-Encoding", "").lower()
|
||||
if "aws-chunked" in content_encoding:
|
||||
@@ -1848,6 +2284,17 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
response = Response(status=200)
|
||||
response.headers["ETag"] = f'"{meta.etag}"'
|
||||
|
||||
_notifications().emit_object_created(
|
||||
bucket_name,
|
||||
object_key,
|
||||
size=meta.size,
|
||||
etag=meta.etag,
|
||||
request_id=getattr(g, "request_id", ""),
|
||||
source_ip=request.remote_addr or "",
|
||||
user_identity=principal.access_key if principal else "",
|
||||
operation="Put",
|
||||
)
|
||||
|
||||
if "S3ReplicationAgent" not in request.headers.get("User-Agent", ""):
|
||||
_replication_manager().trigger_replication(bucket_name, object_key, action="write")
|
||||
|
||||
@@ -1870,20 +2317,67 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
is_encrypted = "x-amz-server-side-encryption" in metadata
|
||||
|
||||
if request.method == "GET":
|
||||
range_header = request.headers.get("Range")
|
||||
|
||||
if is_encrypted and hasattr(storage, 'get_object_data'):
|
||||
try:
|
||||
data, clean_metadata = storage.get_object_data(bucket_name, object_key)
|
||||
response = Response(data, mimetype=mimetype)
|
||||
logged_bytes = len(data)
|
||||
response.headers["Content-Length"] = len(data)
|
||||
file_size = len(data)
|
||||
etag = hashlib.md5(data).hexdigest()
|
||||
|
||||
if range_header:
|
||||
try:
|
||||
ranges = _parse_range_header(range_header, file_size)
|
||||
except (ValueError, TypeError):
|
||||
ranges = None
|
||||
if ranges is None:
|
||||
return _error_response("InvalidRange", "Range Not Satisfiable", 416)
|
||||
start, end = ranges[0]
|
||||
partial_data = data[start:end + 1]
|
||||
response = Response(partial_data, status=206, mimetype=mimetype)
|
||||
response.headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
|
||||
response.headers["Content-Length"] = len(partial_data)
|
||||
logged_bytes = len(partial_data)
|
||||
else:
|
||||
response = Response(data, mimetype=mimetype)
|
||||
response.headers["Content-Length"] = file_size
|
||||
logged_bytes = file_size
|
||||
except StorageError as exc:
|
||||
return _error_response("InternalError", str(exc), 500)
|
||||
else:
|
||||
stat = path.stat()
|
||||
response = Response(_stream_file(path), mimetype=mimetype, direct_passthrough=True)
|
||||
logged_bytes = stat.st_size
|
||||
file_size = stat.st_size
|
||||
etag = storage._compute_etag(path)
|
||||
|
||||
if range_header:
|
||||
try:
|
||||
ranges = _parse_range_header(range_header, file_size)
|
||||
except (ValueError, TypeError):
|
||||
ranges = None
|
||||
if ranges is None:
|
||||
return _error_response("InvalidRange", "Range Not Satisfiable", 416)
|
||||
start, end = ranges[0]
|
||||
length = end - start + 1
|
||||
|
||||
def stream_range(file_path, start_pos, length_to_read):
|
||||
with open(file_path, "rb") as f:
|
||||
f.seek(start_pos)
|
||||
remaining = length_to_read
|
||||
while remaining > 0:
|
||||
chunk_size = min(65536, remaining)
|
||||
chunk = f.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
remaining -= len(chunk)
|
||||
yield chunk
|
||||
|
||||
response = Response(stream_range(path, start, length), status=206, mimetype=mimetype, direct_passthrough=True)
|
||||
response.headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
|
||||
response.headers["Content-Length"] = length
|
||||
logged_bytes = length
|
||||
else:
|
||||
response = Response(_stream_file(path), mimetype=mimetype, direct_passthrough=True)
|
||||
logged_bytes = file_size
|
||||
else:
|
||||
if is_encrypted and hasattr(storage, 'get_object_data'):
|
||||
try:
|
||||
@@ -1901,6 +2395,21 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
logged_bytes = 0
|
||||
|
||||
_apply_object_headers(response, file_stat=path.stat() if not is_encrypted else None, metadata=metadata, etag=etag)
|
||||
|
||||
if request.method == "GET":
|
||||
response_overrides = {
|
||||
"response-content-type": "Content-Type",
|
||||
"response-content-language": "Content-Language",
|
||||
"response-expires": "Expires",
|
||||
"response-cache-control": "Cache-Control",
|
||||
"response-content-disposition": "Content-Disposition",
|
||||
"response-content-encoding": "Content-Encoding",
|
||||
}
|
||||
for param, header in response_overrides.items():
|
||||
value = request.args.get(param)
|
||||
if value:
|
||||
response.headers[header] = value
|
||||
|
||||
action = "Object read" if request.method == "GET" else "Object head"
|
||||
current_app.logger.info(action, extra={"bucket": bucket_name, "key": object_key, "bytes": logged_bytes})
|
||||
return response
|
||||
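# Illustrative note (editorial, not part of the diff): header arithmetic for the 206 path above.
# For a 10,000-byte object and "Range: bytes=2000-3999":
#   start, end = 2000, 3999
#   Content-Range: bytes 2000-3999/10000
#   Content-Length: 2000        # end - start + 1
# Only the first range of a multi-range request is served; unsatisfiable ranges return 416.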
@@ -1911,9 +2420,26 @@ def object_handler(bucket_name: str, object_key: str):
|
||||
_, error = _object_principal("delete", bucket_name, object_key)
|
||||
if error:
|
||||
return error
|
||||
|
||||
bypass_governance = request.headers.get("x-amz-bypass-governance-retention", "").lower() == "true"
|
||||
lock_service = _object_lock()
|
||||
can_delete, lock_reason = lock_service.can_delete_object(bucket_name, object_key, bypass_governance=bypass_governance)
|
||||
if not can_delete:
|
||||
return _error_response("AccessDenied", lock_reason, 403)
|
||||
|
||||
storage.delete_object(bucket_name, object_key)
|
||||
lock_service.delete_object_lock_metadata(bucket_name, object_key)
|
||||
current_app.logger.info("Object deleted", extra={"bucket": bucket_name, "key": object_key})
|
||||
|
||||
principal, _ = _require_principal()
|
||||
_notifications().emit_object_removed(
|
||||
bucket_name,
|
||||
object_key,
|
||||
request_id=getattr(g, "request_id", ""),
|
||||
source_ip=request.remote_addr or "",
|
||||
user_identity=principal.access_key if principal else "",
|
||||
)
|
||||
|
||||
user_agent = request.headers.get("User-Agent", "")
|
||||
if "S3ReplicationAgent" not in user_agent:
|
||||
_replication_manager().trigger_replication(bucket_name, object_key, action="delete")
|
||||
@@ -2120,6 +2646,42 @@ def _copy_object(dest_bucket: str, dest_key: str, copy_source: str) -> Response:
|
||||
except StorageError:
|
||||
return _error_response("NoSuchKey", "Source object not found", 404)
|
||||
|
||||
source_stat = source_path.stat()
|
||||
source_etag = storage._compute_etag(source_path)
|
||||
source_mtime = datetime.fromtimestamp(source_stat.st_mtime, timezone.utc)
|
||||
|
||||
copy_source_if_match = request.headers.get("x-amz-copy-source-if-match")
|
||||
if copy_source_if_match:
|
||||
expected_etag = copy_source_if_match.strip('"')
|
||||
if source_etag != expected_etag:
|
||||
return _error_response("PreconditionFailed", "Source ETag does not match", 412)
|
||||
|
||||
copy_source_if_none_match = request.headers.get("x-amz-copy-source-if-none-match")
|
||||
if copy_source_if_none_match:
|
||||
not_expected_etag = copy_source_if_none_match.strip('"')
|
||||
if source_etag == not_expected_etag:
|
||||
return _error_response("PreconditionFailed", "Source ETag matches", 412)
|
||||
|
||||
copy_source_if_modified_since = request.headers.get("x-amz-copy-source-if-modified-since")
|
||||
if copy_source_if_modified_since:
|
||||
from email.utils import parsedate_to_datetime
|
||||
try:
|
||||
if_modified = parsedate_to_datetime(copy_source_if_modified_since)
|
||||
if source_mtime <= if_modified:
|
||||
return _error_response("PreconditionFailed", "Source not modified since specified date", 412)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
copy_source_if_unmodified_since = request.headers.get("x-amz-copy-source-if-unmodified-since")
|
||||
if copy_source_if_unmodified_since:
|
||||
from email.utils import parsedate_to_datetime
|
||||
try:
|
||||
if_unmodified = parsedate_to_datetime(copy_source_if_unmodified_since)
|
||||
if source_mtime > if_unmodified:
|
||||
return _error_response("PreconditionFailed", "Source modified since specified date", 412)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
source_metadata = storage.get_object_metadata(source_bucket, source_key)
|
||||
|
||||
metadata_directive = request.headers.get("x-amz-metadata-directive", "COPY").upper()
|
||||
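# Illustrative note (editorial, not part of the diff): the conditional-copy headers above follow
# S3 semantics - the copy proceeds only when every supplied condition holds:
#   x-amz-copy-source-if-match: "<etag>"               -> source ETag must equal <etag>
#   x-amz-copy-source-if-none-match: "<etag>"          -> source ETag must differ from <etag>
#   x-amz-copy-source-if-modified-since: <http-date>   -> source mtime must be newer than the date
#   x-amz-copy-source-if-unmodified-since: <http-date> -> source mtime must not be newer than the date
# Any failed check returns 412 PreconditionFailed before the source object is read.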
|
||||
@@ -1,4 +1,3 @@
|
||||
"""Ephemeral store for one-time secrets communicated to the UI."""
|
||||
from __future__ import annotations
|
||||
|
||||
import secrets
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
"""Filesystem-backed object storage helpers."""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
@@ -809,6 +808,29 @@ class ObjectStorage:
|
||||
metadata=metadata or None,
|
||||
)
|
||||
|
||||
def delete_object_version(self, bucket_name: str, object_key: str, version_id: str) -> None:
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
if not bucket_path.exists():
|
||||
raise StorageError("Bucket does not exist")
|
||||
bucket_id = bucket_path.name
|
||||
safe_key = self._sanitize_object_key(object_key)
|
||||
version_dir = self._version_dir(bucket_id, safe_key)
|
||||
data_path = version_dir / f"{version_id}.bin"
|
||||
meta_path = version_dir / f"{version_id}.json"
|
||||
if not data_path.exists() and not meta_path.exists():
|
||||
legacy_version_dir = self._legacy_version_dir(bucket_id, safe_key)
|
||||
data_path = legacy_version_dir / f"{version_id}.bin"
|
||||
meta_path = legacy_version_dir / f"{version_id}.json"
|
||||
if not data_path.exists() and not meta_path.exists():
|
||||
raise StorageError(f"Version {version_id} not found")
|
||||
if data_path.exists():
|
||||
data_path.unlink()
|
||||
if meta_path.exists():
|
||||
meta_path.unlink()
|
||||
parent = data_path.parent
|
||||
if parent.exists() and not any(parent.iterdir()):
|
||||
parent.rmdir()
|
||||
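# Illustrative usage sketch (editorial, not part of the diff): pruning a single noncurrent version.
# Assumes an ObjectStorage instance "storage"; the version id below is made up and would normally
# come from the existing version-listing helpers.
#
#   storage.delete_object_version("reports", "q3.pdf", "20240101T000000-ab12cd")
#
# The call removes <version_dir>/<id>.bin and <id>.json, falls back to the legacy version layout,
# raises StorageError if neither exists, and removes the version directory once it is empty.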
|
||||
def list_orphaned_objects(self, bucket_name: str) -> List[Dict[str, Any]]:
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
if not bucket_path.exists():
|
||||
@@ -1124,6 +1146,49 @@ class ObjectStorage:
|
||||
parts.sort(key=lambda x: x["PartNumber"])
|
||||
return parts
|
||||
|
||||
def list_multipart_uploads(self, bucket_name: str) -> List[Dict[str, Any]]:
|
||||
"""List all active multipart uploads for a bucket."""
|
||||
bucket_path = self._bucket_path(bucket_name)
|
||||
if not bucket_path.exists():
|
||||
raise StorageError("Bucket does not exist")
|
||||
bucket_id = bucket_path.name
|
||||
uploads = []
|
||||
multipart_root = self._bucket_multipart_root(bucket_id)
|
||||
if multipart_root.exists():
|
||||
for upload_dir in multipart_root.iterdir():
|
||||
if not upload_dir.is_dir():
|
||||
continue
|
||||
manifest_path = upload_dir / "manifest.json"
|
||||
if not manifest_path.exists():
|
||||
continue
|
||||
try:
|
||||
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
|
||||
uploads.append({
|
||||
"upload_id": manifest.get("upload_id", upload_dir.name),
|
||||
"object_key": manifest.get("object_key", ""),
|
||||
"created_at": manifest.get("created_at", ""),
|
||||
})
|
||||
except (OSError, json.JSONDecodeError):
|
||||
continue
|
||||
legacy_root = self._legacy_multipart_root(bucket_id)
|
||||
if legacy_root.exists():
|
||||
for upload_dir in legacy_root.iterdir():
|
||||
if not upload_dir.is_dir():
|
||||
continue
|
||||
manifest_path = upload_dir / "manifest.json"
|
||||
if not manifest_path.exists():
|
||||
continue
|
||||
try:
|
||||
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
|
||||
uploads.append({
|
||||
"upload_id": manifest.get("upload_id", upload_dir.name),
|
||||
"object_key": manifest.get("object_key", ""),
|
||||
"created_at": manifest.get("created_at", ""),
|
||||
})
|
||||
except (OSError, json.JSONDecodeError):
|
||||
continue
|
||||
return uploads
|
||||
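# Illustrative note (editorial, not part of the diff): shape of list_multipart_uploads() entries,
# merged from the current and legacy multipart roots; the values below are made up.
#
#   [{"upload_id": "9f3c2e07", "object_key": "videos/intro.mp4", "created_at": "2024-05-01T12:00:00+00:00"}]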
|
||||
def _bucket_path(self, bucket_name: str) -> Path:
|
||||
safe_name = self._sanitize_bucket_name(bucket_name)
|
||||
return self.root / safe_name
|
||||
|
||||
344
app/ui.py
@@ -1,4 +1,3 @@
|
||||
"""Authenticated HTML UI for browsing buckets and objects."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
@@ -26,6 +25,7 @@ from flask import (
|
||||
)
|
||||
from flask_wtf.csrf import generate_csrf
|
||||
|
||||
from .acl import AclService, create_canned_acl, CANNED_ACLS
|
||||
from .bucket_policies import BucketPolicyStore
|
||||
from .connections import ConnectionStore, RemoteConnection
|
||||
from .extensions import limiter
|
||||
@@ -75,6 +75,10 @@ def _secret_store() -> EphemeralSecretStore:
|
||||
return store
|
||||
|
||||
|
||||
def _acl() -> AclService:
|
||||
return current_app.extensions["acl"]
|
||||
|
||||
|
||||
def _format_bytes(num: int) -> str:
|
||||
step = 1024
|
||||
units = ["B", "KB", "MB", "GB", "TB", "PB"]
|
||||
@@ -379,10 +383,21 @@ def bucket_detail(bucket_name: str):
|
||||
|
||||
objects_api_url = url_for("ui.list_bucket_objects", bucket_name=bucket_name)
|
||||
|
||||
lifecycle_url = url_for("ui.bucket_lifecycle", bucket_name=bucket_name)
|
||||
cors_url = url_for("ui.bucket_cors", bucket_name=bucket_name)
|
||||
acl_url = url_for("ui.bucket_acl", bucket_name=bucket_name)
|
||||
folders_url = url_for("ui.create_folder", bucket_name=bucket_name)
|
||||
buckets_for_copy_url = url_for("ui.list_buckets_for_copy", bucket_name=bucket_name)
|
||||
|
||||
return render_template(
|
||||
"bucket_detail.html",
|
||||
bucket_name=bucket_name,
|
||||
objects_api_url=objects_api_url,
|
||||
lifecycle_url=lifecycle_url,
|
||||
cors_url=cors_url,
|
||||
acl_url=acl_url,
|
||||
folders_url=folders_url,
|
||||
buckets_for_copy_url=buckets_for_copy_url,
|
||||
principal=principal,
|
||||
bucket_policy_text=policy_text,
|
||||
bucket_policy=bucket_policy,
|
||||
@@ -441,6 +456,9 @@ def list_bucket_objects(bucket_name: str):
|
||||
presign_template = url_for("ui.object_presign", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
|
||||
versions_template = url_for("ui.object_versions", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
|
||||
restore_template = url_for("ui.restore_object_version", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER", version_id="VERSION_ID_PLACEHOLDER")
|
||||
tags_template = url_for("ui.object_tags", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
|
||||
copy_template = url_for("ui.copy_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
|
||||
move_template = url_for("ui.move_object", bucket_name=bucket_name, object_key="KEY_PLACEHOLDER")
|
||||
|
||||
objects_data = []
|
||||
for obj in result.objects:
|
||||
@@ -465,6 +483,9 @@ def list_bucket_objects(bucket_name: str):
|
||||
"delete": delete_template,
|
||||
"versions": versions_template,
|
||||
"restore": restore_template,
|
||||
"tags": tags_template,
|
||||
"copy": copy_template,
|
||||
"move": move_template,
|
||||
},
|
||||
})
|
||||
|
||||
@@ -1666,6 +1687,327 @@ def metrics_dashboard():
|
||||
)
|
||||
|
||||
|
||||
@ui_bp.route("/buckets/<bucket_name>/lifecycle", methods=["GET", "POST", "DELETE"])
|
||||
def bucket_lifecycle(bucket_name: str):
|
||||
principal = _current_principal()
|
||||
try:
|
||||
_authorize_ui(principal, bucket_name, "policy")
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
storage = _storage()
|
||||
if not storage.bucket_exists(bucket_name):
|
||||
return jsonify({"error": "Bucket does not exist"}), 404
|
||||
|
||||
if request.method == "GET":
|
||||
rules = storage.get_bucket_lifecycle(bucket_name) or []
|
||||
return jsonify({"rules": rules})
|
||||
|
||||
if request.method == "DELETE":
|
||||
storage.set_bucket_lifecycle(bucket_name, None)
|
||||
return jsonify({"status": "ok", "message": "Lifecycle configuration deleted"})
|
||||
|
||||
payload = request.get_json(silent=True) or {}
|
||||
rules = payload.get("rules", [])
|
||||
if not isinstance(rules, list):
|
||||
return jsonify({"error": "rules must be a list"}), 400
|
||||
|
||||
validated_rules = []
|
||||
for i, rule in enumerate(rules):
|
||||
if not isinstance(rule, dict):
|
||||
return jsonify({"error": f"Rule {i} must be an object"}), 400
|
||||
validated = {
|
||||
"ID": str(rule.get("ID", f"rule-{i+1}")),
|
||||
"Status": "Enabled" if rule.get("Status", "Enabled") == "Enabled" else "Disabled",
|
||||
}
|
||||
if rule.get("Prefix"):
|
||||
validated["Prefix"] = str(rule["Prefix"])
|
||||
if rule.get("Expiration"):
|
||||
exp = rule["Expiration"]
|
||||
if isinstance(exp, dict) and exp.get("Days"):
|
||||
validated["Expiration"] = {"Days": int(exp["Days"])}
|
||||
if rule.get("NoncurrentVersionExpiration"):
|
||||
nve = rule["NoncurrentVersionExpiration"]
|
||||
if isinstance(nve, dict) and nve.get("NoncurrentDays"):
|
||||
validated["NoncurrentVersionExpiration"] = {"NoncurrentDays": int(nve["NoncurrentDays"])}
|
||||
if rule.get("AbortIncompleteMultipartUpload"):
|
||||
aimu = rule["AbortIncompleteMultipartUpload"]
|
||||
if isinstance(aimu, dict) and aimu.get("DaysAfterInitiation"):
|
||||
validated["AbortIncompleteMultipartUpload"] = {"DaysAfterInitiation": int(aimu["DaysAfterInitiation"])}
|
||||
validated_rules.append(validated)
|
||||
|
||||
storage.set_bucket_lifecycle(bucket_name, validated_rules if validated_rules else None)
|
||||
return jsonify({"status": "ok", "message": "Lifecycle configuration saved", "rules": validated_rules})
|
||||
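# Illustrative note (editorial, not part of the diff): JSON body the UI lifecycle endpoint above
# accepts; only the fields validated in the handler are persisted, everything else is dropped.
LIFECYCLE_PAYLOAD = {
    "rules": [
        {
            "ID": "expire-tmp",
            "Status": "Enabled",
            "Prefix": "tmp/",
            "Expiration": {"Days": 7},
            "NoncurrentVersionExpiration": {"NoncurrentDays": 30},
            "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 2},
        }
    ]
}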
|
||||
|
||||
@ui_bp.route("/buckets/<bucket_name>/cors", methods=["GET", "POST", "DELETE"])
|
||||
def bucket_cors(bucket_name: str):
|
||||
principal = _current_principal()
|
||||
try:
|
||||
_authorize_ui(principal, bucket_name, "policy")
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
storage = _storage()
|
||||
if not storage.bucket_exists(bucket_name):
|
||||
return jsonify({"error": "Bucket does not exist"}), 404
|
||||
|
||||
if request.method == "GET":
|
||||
rules = storage.get_bucket_cors(bucket_name) or []
|
||||
return jsonify({"rules": rules})
|
||||
|
||||
if request.method == "DELETE":
|
||||
storage.set_bucket_cors(bucket_name, None)
|
||||
return jsonify({"status": "ok", "message": "CORS configuration deleted"})
|
||||
|
||||
payload = request.get_json(silent=True) or {}
|
||||
rules = payload.get("rules", [])
|
||||
if not isinstance(rules, list):
|
||||
return jsonify({"error": "rules must be a list"}), 400
|
||||
|
||||
validated_rules = []
|
||||
for i, rule in enumerate(rules):
|
||||
if not isinstance(rule, dict):
|
||||
return jsonify({"error": f"Rule {i} must be an object"}), 400
|
||||
origins = rule.get("AllowedOrigins", [])
|
||||
methods = rule.get("AllowedMethods", [])
|
||||
if not origins or not methods:
|
||||
return jsonify({"error": f"Rule {i} must have AllowedOrigins and AllowedMethods"}), 400
|
||||
validated = {
|
||||
"AllowedOrigins": [str(o) for o in origins if o],
|
||||
"AllowedMethods": [str(m).upper() for m in methods if m],
|
||||
}
|
||||
if rule.get("AllowedHeaders"):
|
||||
validated["AllowedHeaders"] = [str(h) for h in rule["AllowedHeaders"] if h]
|
||||
if rule.get("ExposeHeaders"):
|
||||
validated["ExposeHeaders"] = [str(h) for h in rule["ExposeHeaders"] if h]
|
||||
if rule.get("MaxAgeSeconds") is not None:
|
||||
try:
|
||||
validated["MaxAgeSeconds"] = int(rule["MaxAgeSeconds"])
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
validated_rules.append(validated)
|
||||
|
||||
storage.set_bucket_cors(bucket_name, validated_rules if validated_rules else None)
|
||||
return jsonify({"status": "ok", "message": "CORS configuration saved", "rules": validated_rules})
|
||||
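# Illustrative note (editorial, not part of the diff): JSON body for the UI CORS endpoint above;
# AllowedOrigins and AllowedMethods are required per rule, the remaining keys are optional.
CORS_PAYLOAD = {
    "rules": [
        {
            "AllowedOrigins": ["https://app.example.test"],
            "AllowedMethods": ["GET", "PUT"],
            "AllowedHeaders": ["*"],
            "ExposeHeaders": ["ETag"],
            "MaxAgeSeconds": 3600,
        }
    ]
}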
|
||||
|
||||
@ui_bp.route("/buckets/<bucket_name>/acl", methods=["GET", "POST"])
|
||||
def bucket_acl(bucket_name: str):
|
||||
principal = _current_principal()
|
||||
action = "read" if request.method == "GET" else "write"
|
||||
try:
|
||||
_authorize_ui(principal, bucket_name, action)
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
storage = _storage()
|
||||
if not storage.bucket_exists(bucket_name):
|
||||
return jsonify({"error": "Bucket does not exist"}), 404
|
||||
|
||||
acl_service = _acl()
|
||||
owner_id = principal.access_key if principal else "anonymous"
|
||||
|
||||
if request.method == "GET":
|
||||
try:
|
||||
acl = acl_service.get_bucket_acl(bucket_name)
|
||||
if not acl:
|
||||
acl = create_canned_acl("private", owner_id)
|
||||
return jsonify({
|
||||
"owner": acl.owner,
|
||||
"grants": [g.to_dict() for g in acl.grants],
|
||||
"canned_acls": list(CANNED_ACLS.keys()),
|
||||
})
|
||||
except Exception as exc:
|
||||
return jsonify({"error": str(exc)}), 500
|
||||
|
||||
payload = request.get_json(silent=True) or {}
|
||||
canned_acl = payload.get("canned_acl")
|
||||
if canned_acl:
|
||||
if canned_acl not in CANNED_ACLS:
|
||||
return jsonify({"error": f"Invalid canned ACL: {canned_acl}"}), 400
|
||||
acl_service.set_bucket_canned_acl(bucket_name, canned_acl, owner_id)
|
||||
return jsonify({"status": "ok", "message": f"ACL set to {canned_acl}"})
|
||||
|
||||
return jsonify({"error": "canned_acl is required"}), 400
|
||||
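# Illustrative note (editorial, not part of the diff): the UI ACL endpoint above only accepts canned
# ACLs; valid names are whatever CANNED_ACLS defines ("private" is the default used elsewhere).
ACL_PAYLOAD = {"canned_acl": "private"}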
|
||||
|
||||
@ui_bp.route("/buckets/<bucket_name>/objects/<path:object_key>/tags", methods=["GET", "POST"])
|
||||
def object_tags(bucket_name: str, object_key: str):
|
||||
principal = _current_principal()
|
||||
try:
|
||||
_authorize_ui(principal, bucket_name, "read", object_key=object_key)
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
storage = _storage()
|
||||
|
||||
if request.method == "GET":
|
||||
try:
|
||||
tags = storage.get_object_tags(bucket_name, object_key)
|
||||
return jsonify({"tags": tags})
|
||||
except StorageError as exc:
|
||||
return jsonify({"error": str(exc)}), 404
|
||||
|
||||
try:
|
||||
_authorize_ui(principal, bucket_name, "write", object_key=object_key)
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
payload = request.get_json(silent=True) or {}
|
||||
tags = payload.get("tags", [])
|
||||
if not isinstance(tags, list):
|
||||
return jsonify({"error": "tags must be a list"}), 400
|
||||
if len(tags) > 10:
|
||||
return jsonify({"error": "Maximum 10 tags allowed"}), 400
|
||||
|
||||
validated_tags = []
|
||||
for tag in tags:
|
||||
if isinstance(tag, dict) and tag.get("Key"):
|
||||
validated_tags.append({
|
||||
"Key": str(tag["Key"]),
|
||||
"Value": str(tag.get("Value", ""))
|
||||
})
|
||||
|
||||
try:
|
||||
storage.set_object_tags(bucket_name, object_key, validated_tags if validated_tags else None)
|
||||
return jsonify({"status": "ok", "message": "Tags saved", "tags": validated_tags})
|
||||
except StorageError as exc:
|
||||
return jsonify({"error": str(exc)}), 400
|
||||
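# Illustrative note (editorial, not part of the diff): tag payload for the endpoint above - at most
# ten tags, each requiring a Key; Value defaults to an empty string. The tag names are made up.
TAGS_PAYLOAD = {
    "tags": [
        {"Key": "team", "Value": "finance"},
        {"Key": "retention", "Value": "7y"},
    ]
}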
|
||||
|
||||
@ui_bp.post("/buckets/<bucket_name>/folders")
|
||||
def create_folder(bucket_name: str):
|
||||
principal = _current_principal()
|
||||
try:
|
||||
_authorize_ui(principal, bucket_name, "write")
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
payload = request.get_json(silent=True) or {}
|
||||
folder_name = str(payload.get("folder_name", "")).strip()
|
||||
prefix = str(payload.get("prefix", "")).strip()
|
||||
|
||||
if not folder_name:
|
||||
return jsonify({"error": "folder_name is required"}), 400
|
||||
|
||||
folder_name = folder_name.rstrip("/")
|
||||
if "/" in folder_name:
|
||||
return jsonify({"error": "Folder name cannot contain /"}), 400
|
||||
|
||||
folder_key = f"{prefix}{folder_name}/" if prefix else f"{folder_name}/"
|
||||
|
||||
import io
|
||||
try:
|
||||
_storage().put_object(bucket_name, folder_key, io.BytesIO(b""))
|
||||
return jsonify({"status": "ok", "message": f"Folder '{folder_name}' created", "key": folder_key})
|
||||
except StorageError as exc:
|
||||
return jsonify({"error": str(exc)}), 400
|
||||
|
||||
|
||||
@ui_bp.post("/buckets/<bucket_name>/objects/<path:object_key>/copy")
|
||||
def copy_object(bucket_name: str, object_key: str):
|
||||
principal = _current_principal()
|
||||
try:
|
||||
_authorize_ui(principal, bucket_name, "read", object_key=object_key)
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
payload = request.get_json(silent=True) or {}
|
||||
dest_bucket = str(payload.get("dest_bucket", bucket_name)).strip()
|
||||
dest_key = str(payload.get("dest_key", "")).strip()
|
||||
|
||||
if not dest_key:
|
||||
return jsonify({"error": "dest_key is required"}), 400
|
||||
|
||||
try:
|
||||
_authorize_ui(principal, dest_bucket, "write", object_key=dest_key)
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
storage = _storage()
|
||||
|
||||
try:
|
||||
source_path = storage.get_object_path(bucket_name, object_key)
|
||||
source_metadata = storage.get_object_metadata(bucket_name, object_key)
|
||||
except StorageError as exc:
|
||||
return jsonify({"error": str(exc)}), 404
|
||||
|
||||
try:
|
||||
with source_path.open("rb") as stream:
|
||||
storage.put_object(dest_bucket, dest_key, stream, metadata=source_metadata or None)
|
||||
return jsonify({
|
||||
"status": "ok",
|
||||
"message": f"Copied to {dest_bucket}/{dest_key}",
|
||||
"dest_bucket": dest_bucket,
|
||||
"dest_key": dest_key,
|
||||
})
|
||||
except StorageError as exc:
|
||||
return jsonify({"error": str(exc)}), 400
|
||||
|
||||
|
||||
@ui_bp.post("/buckets/<bucket_name>/objects/<path:object_key>/move")
|
||||
def move_object(bucket_name: str, object_key: str):
|
||||
principal = _current_principal()
|
||||
try:
|
||||
_authorize_ui(principal, bucket_name, "read", object_key=object_key)
|
||||
_authorize_ui(principal, bucket_name, "delete", object_key=object_key)
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
payload = request.get_json(silent=True) or {}
|
||||
dest_bucket = str(payload.get("dest_bucket", bucket_name)).strip()
|
||||
dest_key = str(payload.get("dest_key", "")).strip()
|
||||
|
||||
if not dest_key:
|
||||
return jsonify({"error": "dest_key is required"}), 400
|
||||
|
||||
if dest_bucket == bucket_name and dest_key == object_key:
|
||||
return jsonify({"error": "Cannot move object to the same location"}), 400
|
||||
|
||||
try:
|
||||
_authorize_ui(principal, dest_bucket, "write", object_key=dest_key)
|
||||
except IamError as exc:
|
||||
return jsonify({"error": str(exc)}), 403
|
||||
|
||||
storage = _storage()
|
||||
|
||||
try:
|
||||
source_path = storage.get_object_path(bucket_name, object_key)
|
||||
source_metadata = storage.get_object_metadata(bucket_name, object_key)
|
||||
except StorageError as exc:
|
||||
return jsonify({"error": str(exc)}), 404
|
||||
|
||||
try:
|
||||
import io
|
||||
with source_path.open("rb") as f:
|
||||
data = f.read()
|
||||
storage.put_object(dest_bucket, dest_key, io.BytesIO(data), metadata=source_metadata or None)
|
||||
storage.delete_object(bucket_name, object_key)
|
||||
return jsonify({
|
||||
"status": "ok",
|
||||
"message": f"Moved to {dest_bucket}/{dest_key}",
|
||||
"dest_bucket": dest_bucket,
|
||||
"dest_key": dest_key,
|
||||
})
|
||||
except StorageError as exc:
|
||||
return jsonify({"error": str(exc)}), 400
|
||||
|
||||
|
||||
@ui_bp.get("/buckets/<bucket_name>/list-for-copy")
|
||||
def list_buckets_for_copy(bucket_name: str):
|
||||
principal = _current_principal()
|
||||
buckets = _storage().list_buckets()
|
||||
allowed = []
|
||||
for bucket in buckets:
|
||||
try:
|
||||
_authorize_ui(principal, bucket.name, "write")
|
||||
allowed.append(bucket.name)
|
||||
except IamError:
|
||||
pass
|
||||
return jsonify({"buckets": allowed})
|
||||
|
||||
|
||||
@ui_bp.app_errorhandler(404)
|
||||
def ui_not_found(error): # type: ignore[override]
|
||||
prefix = ui_bp.url_prefix or ""
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
"""Central location for the application version string."""
|
||||
from __future__ import annotations
|
||||
|
||||
APP_VERSION = "0.2.0"
|
||||
|
||||
@@ -1097,6 +1097,9 @@ pre code {
|
||||
|
||||
.modal-body {
|
||||
padding: 1.5rem;
|
||||
overflow-wrap: break-word;
|
||||
word-wrap: break-word;
|
||||
word-break: break-word;
|
||||
}
|
||||
|
||||
.modal-footer {
|
||||
@@ -1750,3 +1753,67 @@ body.theme-transitioning * {
|
||||
border: 2px solid transparent;
|
||||
background: linear-gradient(var(--myfsio-card-bg), var(--myfsio-card-bg)) padding-box, linear-gradient(135deg, #3b82f6, #8b5cf6) border-box;
|
||||
}
|
||||
|
||||
#objects-table .dropdown-menu {
|
||||
position: fixed !important;
|
||||
z-index: 1050;
|
||||
}
|
||||
|
||||
.objects-header-responsive {
|
||||
display: flex;
|
||||
flex-wrap: wrap;
|
||||
gap: 0.5rem;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
.objects-header-responsive > .header-title {
|
||||
flex: 0 0 auto;
|
||||
}
|
||||
|
||||
.objects-header-responsive > .header-actions {
|
||||
display: flex;
|
||||
flex-wrap: wrap;
|
||||
gap: 0.5rem;
|
||||
align-items: center;
|
||||
flex: 1;
|
||||
}
|
||||
|
||||
@media (max-width: 640px) {
|
||||
.objects-header-responsive {
|
||||
flex-direction: column;
|
||||
align-items: stretch;
|
||||
}
|
||||
|
||||
.objects-header-responsive > .header-title {
|
||||
margin-bottom: 0.5rem;
|
||||
}
|
||||
|
||||
.objects-header-responsive > .header-actions {
|
||||
display: grid;
|
||||
grid-template-columns: 1fr 1fr;
|
||||
gap: 0.5rem;
|
||||
}
|
||||
|
||||
.objects-header-responsive > .header-actions .btn {
|
||||
justify-content: center;
|
||||
}
|
||||
|
||||
.objects-header-responsive > .header-actions .search-wrapper {
|
||||
grid-column: span 2;
|
||||
}
|
||||
|
||||
.objects-header-responsive > .header-actions .search-wrapper input {
|
||||
max-width: 100% !important;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
.objects-header-responsive > .header-actions .bulk-actions {
|
||||
grid-column: span 2;
|
||||
display: flex;
|
||||
gap: 0.5rem;
|
||||
}
|
||||
|
||||
.objects-header-responsive > .header-actions .bulk-actions .btn {
|
||||
flex: 1;
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large