297 lines
10 KiB
Python
297 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import threading
|
|
import time
|
|
from typing import Any, Generator, Optional
|
|
|
|
import boto3
|
|
from botocore.config import Config
|
|
from botocore.exceptions import ClientError, EndpointConnectionError, ConnectionClosedError
|
|
from flask import current_app, session
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Handed to botocore as ``user_agent_extra`` for every client built in this module.
UI_PROXY_USER_AGENT = "MyFSIO-UIProxy/1.0"

# S3 error code -> HTTP status reported to the UI caller.
# Codes are grouped by status; insertion order matches the flat table this
# replaces (404s, 409s, 403s, 400s, then QuotaExceeded).
_BOTO_ERROR_MAP: dict[str, int] = {
    error_code: http_status
    for http_status, error_codes in (
        (404, ("NoSuchBucket", "NoSuchKey", "NoSuchUpload")),
        (409, ("BucketAlreadyExists", "BucketAlreadyOwnedByYou", "BucketNotEmpty")),
        (403, ("AccessDenied", "InvalidAccessKeyId", "SignatureDoesNotMatch")),
        (400, ("InvalidBucketName", "InvalidArgument", "MalformedXML", "EntityTooLarge")),
        (403, ("QuotaExceeded",)),
    )
    for error_code in error_codes
}

# Age in seconds (24 h) after which an upload-registry entry counts as expired.
_UPLOAD_REGISTRY_MAX_AGE = 86400
# Minimum number of seconds (1 h) between two sweeps of the upload registry.
_UPLOAD_REGISTRY_CLEANUP_INTERVAL = 3600
|
|
|
|
|
|
class UploadRegistry:
    """In-memory map from upload ID to ``(bucket, object key, timestamp)``.

    All access to the map goes through one ``threading.Lock``. Timestamps come
    from ``time.monotonic()``; an entry older than
    ``_UPLOAD_REGISTRY_MAX_AGE`` seconds is treated as expired. Expired
    entries are swept from ``register`` at most once every
    ``_UPLOAD_REGISTRY_CLEANUP_INTERVAL`` seconds, and individually on lookup.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._last_cleanup = time.monotonic()
        # upload_id -> (bucket_name, object_key, monotonic registration time)
        self._entries: dict[str, tuple[str, str, float]] = {}

    def register(self, upload_id: str, bucket_name: str, object_key: str) -> None:
        """Record (or overwrite) the bucket and key for ``upload_id``."""
        with self._lock:
            record = (bucket_name, object_key, time.monotonic())
            self._entries[upload_id] = record
            # Sweep while the lock is still held; the sweep does not lock itself.
            self._maybe_cleanup()

    def get_key(self, upload_id: str, bucket_name: str) -> Optional[str]:
        """Return the object key registered for ``upload_id`` in ``bucket_name``.

        Returns ``None`` when the ID is unknown, when it was registered for a
        different bucket, or when the entry has expired (an expired entry is
        dropped on the spot).
        """
        with self._lock:
            record = self._entries.get(upload_id)
            if record is None or record[0] != bucket_name:
                return None
            _, object_key, registered_at = record
            age = time.monotonic() - registered_at
            if age <= _UPLOAD_REGISTRY_MAX_AGE:
                return object_key
            del self._entries[upload_id]
            return None

    def remove(self, upload_id: str) -> None:
        """Forget ``upload_id``; unknown IDs are ignored."""
        with self._lock:
            self._entries.pop(upload_id, None)

    def _maybe_cleanup(self) -> None:
        """Drop expired entries if the cleanup interval has elapsed.

        Does not take the lock itself; ``register`` calls it with the lock held.
        """
        now = time.monotonic()
        if now - self._last_cleanup >= _UPLOAD_REGISTRY_CLEANUP_INTERVAL:
            self._last_cleanup = now
            oldest_allowed = now - _UPLOAD_REGISTRY_MAX_AGE
            # Collect first so the dict is not mutated while being iterated.
            expired = [
                uid
                for uid, record in self._entries.items()
                if record[2] < oldest_allowed
            ]
            for uid in expired:
                del self._entries[uid]
|
|
|
|
|
|
class S3ProxyClient:
    """Builds boto3 S3 clients aimed at one API endpoint and owns an UploadRegistry.

    The endpoint URL is stored without trailing slashes. Every call to
    ``get_client`` constructs a new boto3 client for the given key pair.
    """

    def __init__(self, api_base_url: str, region: str = "us-east-1") -> None:
        """Store the endpoint and region.

        Raises:
            ValueError: if ``api_base_url`` is empty.
        """
        if not api_base_url:
            raise ValueError("api_base_url is required for S3ProxyClient")
        self._region = region
        self._api_base_url = api_base_url.rstrip("/")
        self.upload_registry = UploadRegistry()

    @property
    def api_base_url(self) -> str:
        """The endpoint URL, with trailing slashes stripped."""
        return self._api_base_url

    def get_client(self, access_key: str, secret_key: str) -> Any:
        """Create a boto3 S3 client signed with the given credentials.

        Raises:
            ValueError: if either key is empty.
        """
        if not (access_key and secret_key):
            raise ValueError("Both access_key and secret_key are required")

        # SigV4 signing, path-style addressing, 5 s connect / 30 s read
        # timeouts, and ``max_attempts`` set to 0 for retries.
        client_options: dict[str, Any] = {
            "user_agent_extra": UI_PROXY_USER_AGENT,
            "connect_timeout": 5,
            "read_timeout": 30,
            "retries": {"max_attempts": 0},
            "signature_version": "s3v4",
            "s3": {"addressing_style": "path"},
            "request_checksum_calculation": "when_required",
            "response_checksum_validation": "when_required",
        }
        return boto3.client(
            "s3",
            endpoint_url=self._api_base_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=self._region,
            config=Config(**client_options),
        )
|
|
|
|
|
|
def _get_proxy() -> S3ProxyClient:
    """Return the object stored under ``current_app.extensions["s3_proxy"]``.

    Raises:
        RuntimeError: if that entry is missing or ``None``.
    """
    configured = current_app.extensions.get("s3_proxy")
    if configured is not None:
        return configured
    raise RuntimeError(
        "S3 proxy not configured. Set API_BASE_URL or run both API and UI servers."
    )
|
|
|
|
|
|
def _get_session_creds() -> tuple[str, str]:
    """Resolve the current Flask session's S3 key pair from the secret store.

    Purges expired secrets first, then looks up the session's ``cred_token``
    with ``peek``.

    Returns:
        ``(access_key, secret_key)``.

    Raises:
        PermissionError: if the session has no token, the store has nothing
            for the token, or either key is empty.
    """
    store = current_app.extensions["secret_store"]
    store.purge_expired()

    cred_token = session.get("cred_token")
    if not cred_token:
        raise PermissionError("Not authenticated")

    stored = store.peek(cred_token)
    if not stored:
        raise PermissionError("Session expired")

    key_pair = (stored.get("access_key", ""), stored.get("secret_key", ""))
    if not all(key_pair):
        raise PermissionError("Invalid session credentials")
    return key_pair
|
|
|
|
|
|
def get_session_s3_client() -> Any:
    """Build an S3 client for the credentials bound to the current session.

    Propagates ``RuntimeError`` from ``_get_proxy`` and ``PermissionError``
    from ``_get_session_creds``.
    """
    # Resolve the proxy before the credentials so a missing proxy is reported first.
    s3_proxy = _get_proxy()
    return s3_proxy.get_client(*_get_session_creds())
|
|
|
|
|
|
def get_upload_registry() -> UploadRegistry:
    """Return the ``upload_registry`` of the application's configured S3 proxy."""
    s3_proxy = _get_proxy()
    return s3_proxy.upload_registry
|
|
|
|
|
|
def handle_client_error(exc: ClientError) -> tuple[dict[str, str], int]:
    """Translate a botocore ``ClientError`` into a ``(body, http_status)`` pair.

    The status comes from ``_BOTO_ERROR_MAP`` when the S3 error code is listed
    there. Otherwise the upstream ``HTTPStatusCode`` is used, but only when it
    is an integer in the 400-599 range; anything else becomes 500.

    Args:
        exc: The error whose ``response`` dict is inspected.

    Returns:
        ``({"error": message}, http_status)``; ``message`` falls back to
        ``"S3 operation failed"`` when the response carries none.
    """
    response = exc.response
    # ``or {}`` also covers an explicit ``"Error": None`` in the response.
    error_info = response.get("Error") or {}
    code = error_info.get("Code", "InternalError")
    message = error_info.get("Message") or "S3 operation failed"

    http_status = _BOTO_ERROR_MAP.get(code)
    if http_status is None:
        upstream = (response.get("ResponseMetadata") or {}).get("HTTPStatusCode")
        # Never pair an error body with a non-error status: forward the
        # upstream status only when it really is a 4xx/5xx integer.
        if isinstance(upstream, int) and 400 <= upstream <= 599:
            http_status = upstream
        else:
            http_status = 500
    return {"error": message}, http_status
|
|
|
|
|
|
def handle_connection_error(exc: Exception) -> tuple[dict[str, str], int]:
    """Log a failed connection to the S3 API and return a 502 error payload.

    Args:
        exc: The exception to log; it does not influence the returned payload.

    Returns:
        A fixed ``({"error": ...}, 502)`` pair.
    """
    logger.error("S3 API connection failed: %s", exc)
    body = {"error": "S3 API server is unreachable. Ensure the API server is running."}
    return body, 502
|
|
|
|
|
|
def format_datetime_display(dt: Any, display_tz: str = "UTC") -> str:
    """Forward to ``_format_datetime_display`` from the sibling ``ui`` module.

    The import happens at call time rather than at module load.
    NOTE(review): presumably to avoid a circular import with ``.ui`` — confirm.
    """
    from .ui import _format_datetime_display as render_display

    return render_display(dt, display_tz)
|
|
|
|
|
|
def format_datetime_iso(dt: Any, display_tz: str = "UTC") -> str:
    """Forward to ``_format_datetime_iso`` from the sibling ``ui`` module.

    The import happens at call time rather than at module load.
    NOTE(review): presumably to avoid a circular import with ``.ui`` — confirm.
    """
    from .ui import _format_datetime_iso as render_iso

    return render_iso(dt, display_tz)
|
|
|
|
|
|
def build_url_templates(bucket_name: str) -> dict[str, str]:
    """Build per-object UI URL templates for ``bucket_name``.

    Each URL is produced by ``url_for`` with the literal ``KEY_PLACEHOLDER``
    as the object key. The ``restore`` template additionally carries
    ``VERSION_ID_PLACEHOLDER`` as the version id.
    NOTE(review): the placeholders are presumably substituted by the consumer
    of these templates — confirm against the caller.

    Returns:
        A dict with the keys ``preview``, ``download``, ``presign``,
        ``delete``, ``versions``, ``restore``, ``tags``, ``copy``, ``move``
        and ``metadata``. ``download`` is the preview URL with
        ``?download=1`` appended.
    """
    from flask import url_for

    def template(endpoint: str, **extra: str) -> str:
        # One URL for ``endpoint`` on this bucket with the key placeholder.
        return url_for(
            endpoint,
            bucket_name=bucket_name,
            object_key="KEY_PLACEHOLDER",
            **extra,
        )

    # Built up front so the sequence of ``url_for`` calls stays
    # preview, delete, presign, ... regardless of the key order below.
    preview_url = template("ui.object_preview")
    delete_url = template("ui.delete_object")

    return {
        "preview": preview_url,
        "download": preview_url + "?download=1",
        "presign": template("ui.object_presign"),
        "delete": delete_url,
        "versions": template("ui.object_versions"),
        "restore": template(
            "ui.restore_object_version",
            version_id="VERSION_ID_PLACEHOLDER",
        ),
        "tags": template("ui.object_tags"),
        "copy": template("ui.copy_object"),
        "move": template("ui.move_object"),
        "metadata": template("ui.object_metadata"),
    }
|
|
|
|
|
|
def translate_list_objects(
    boto3_response: dict[str, Any],
    url_templates: dict[str, str],
    display_tz: str = "UTC",
    versioning_enabled: bool = False,
) -> dict[str, Any]:
    """Reshape a list-objects response dict into the payload the UI consumes.

    Args:
        boto3_response: Response dict; ``Contents``, ``IsTruncated``,
            ``NextContinuationToken`` and ``KeyCount`` are read from it.
        url_templates: Passed through unchanged under ``url_templates``.
        display_tz: Time zone name handed to the datetime formatters.
        versioning_enabled: Passed through unchanged.

    Returns:
        A dict with ``objects`` (one dict per ``Contents`` entry),
        ``is_truncated``, ``next_continuation_token``, ``total_count``
        (``KeyCount`` if present, else the number of objects),
        ``versioning_enabled`` and ``url_templates``.
    """

    def describe(entry: dict[str, Any]) -> dict[str, Any]:
        # Flatten one ``Contents`` entry; the ETag's surrounding quotes are stripped.
        modified = entry["LastModified"]
        return {
            "key": entry["Key"],
            "size": entry["Size"],
            "last_modified": modified.isoformat(),
            "last_modified_display": format_datetime_display(modified, display_tz),
            "last_modified_iso": format_datetime_iso(modified, display_tz),
            "etag": entry.get("ETag", "").strip('"'),
        }

    objects_data = [describe(entry) for entry in boto3_response.get("Contents", [])]
    return {
        "objects": objects_data,
        "is_truncated": boto3_response.get("IsTruncated", False),
        "next_continuation_token": boto3_response.get("NextContinuationToken"),
        "total_count": boto3_response.get("KeyCount", len(objects_data)),
        "versioning_enabled": versioning_enabled,
        "url_templates": url_templates,
    }
|
|
|
|
|
|
def get_versioning_via_s3(client: Any, bucket_name: str) -> bool:
    """Report whether ``get_bucket_versioning`` says the bucket is ``"Enabled"``.

    A ``ClientError`` is treated as "not enabled". It is logged as a warning
    unless its error code is ``NoSuchBucket``.

    Args:
        client: Object exposing ``get_bucket_versioning(Bucket=...)``.
        bucket_name: Bucket to query.

    Returns:
        ``True`` only when the response's ``Status`` equals ``"Enabled"``.
    """
    try:
        status = client.get_bucket_versioning(Bucket=bucket_name).get("Status")
    except ClientError as exc:
        error_code = exc.response.get("Error", {}).get("Code", "")
        if error_code != "NoSuchBucket":
            logger.warning("Failed to check versioning for %s: %s", bucket_name, error_code)
        return False
    return status == "Enabled"
|
|
|
|
|
|
def stream_objects_ndjson(
    client: Any,
    bucket_name: str,
    prefix: Optional[str],
    url_templates: dict[str, str],
    display_tz: str = "UTC",
    versioning_enabled: bool = False,
    delimiter: Optional[str] = None,
) -> Generator[str, None, None]:
    """Stream a bucket listing as newline-delimited JSON records.

    Record sequence: one ``meta`` record, an initial ``count`` of 0, then for
    each ``list_objects_v2`` page its ``folder`` records (from
    ``CommonPrefixes``), its ``object`` records (from ``Contents``) and a
    ``count`` record with the running number of objects. The stream ends with
    ``done``. On failure a single ``error`` record is emitted instead and the
    stream stops without ``done``.

    Args:
        client: Object exposing ``get_paginator("list_objects_v2")``.
        bucket_name: Bucket to list.
        prefix: Optional key prefix; omitted from the request when falsy.
        url_templates: Echoed in the ``meta`` record.
        display_tz: Time zone name handed to the datetime formatters.
        versioning_enabled: Echoed in the ``meta`` record.
        delimiter: Optional delimiter; omitted from the request when falsy.

    Yields:
        One JSON document per line, each terminated by a newline.
    """

    def ndjson(record: dict[str, Any]) -> str:
        # Serialize one record as a single NDJSON line.
        return json.dumps(record) + "\n"

    yield ndjson({
        "type": "meta",
        "versioning_enabled": versioning_enabled,
        "url_templates": url_templates,
    })
    # Emit a zero count right away, before any request is made.
    yield ndjson({"type": "count", "total_count": 0})

    kwargs: dict[str, Any] = {"Bucket": bucket_name, "MaxKeys": 1000}
    if prefix:
        kwargs["Prefix"] = prefix
    if delimiter:
        kwargs["Delimiter"] = delimiter

    running_count = 0
    try:
        paginator = client.get_paginator("list_objects_v2")
        for page in paginator.paginate(**kwargs):
            for cp in page.get("CommonPrefixes", []):
                yield ndjson({"type": "folder", "prefix": cp["Prefix"]})
            page_contents = page.get("Contents", [])
            for obj in page_contents:
                last_mod = obj["LastModified"]
                yield ndjson({
                    "type": "object",
                    "key": obj["Key"],
                    "size": obj["Size"],
                    "last_modified": last_mod.isoformat(),
                    "last_modified_display": format_datetime_display(last_mod, display_tz),
                    "last_modified_iso": format_datetime_iso(last_mod, display_tz),
                    "etag": obj.get("ETag", "").strip('"'),
                })
            # Only objects are counted; folder records do not add to the total.
            running_count += len(page_contents)
            yield ndjson({"type": "count", "total_count": running_count})
    except ClientError as exc:
        # ``or`` (not a .get default) so an empty or null Message still
        # produces a readable error, as handle_client_error does.
        error_msg = exc.response.get("Error", {}).get("Message") or "S3 operation failed"
        yield ndjson({"type": "error", "error": error_msg})
        return
    except (EndpointConnectionError, ConnectionClosedError) as exc:
        # Log the cause, mirroring handle_connection_error; the client only
        # receives the generic message.
        logger.error("S3 API connection failed: %s", exc)
        yield ndjson({"type": "error", "error": "S3 API server is unreachable"})
        return

    yield ndjson({"type": "done"})
|