Files
MyFSIO/app/__init__.py

764 lines
31 KiB
Python

from __future__ import annotations
import html as html_module
import itertools
import logging
import mimetypes
import os
import shutil
import sys
import time
from logging.handlers import RotatingFileHandler
from pathlib import Path
from datetime import timedelta
from typing import Any, Dict, List, Optional
from flask import Flask, Response, g, has_request_context, redirect, render_template, request, url_for
from flask_cors import CORS
from flask_wtf.csrf import CSRFError
from werkzeug.middleware.proxy_fix import ProxyFix
import io
from .access_logging import AccessLoggingService
from .operation_metrics import OperationMetricsCollector, classify_endpoint
from .compression import GzipMiddleware
from .acl import AclService
from .bucket_policies import BucketPolicyStore
from .config import AppConfig
from .connections import ConnectionStore
from .encryption import EncryptionManager
from .extensions import limiter, csrf
from .iam import IamService
from .kms import KMSManager
from .gc import GarbageCollector
from .integrity import IntegrityChecker
from .lifecycle import LifecycleManager
from .notifications import NotificationService
from .object_lock import ObjectLockService
from .replication import ReplicationManager
from .secret_store import EphemeralSecretStore
from .site_registry import SiteRegistry, SiteInfo
from .storage import ObjectStorage, StorageError
from .version import get_version
from .website_domains import WebsiteDomainStore
_request_counter = itertools.count(1)
class _ChunkedTransferMiddleware:
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
if environ.get("REQUEST_METHOD") not in ("PUT", "POST"):
return self.app(environ, start_response)
transfer_encoding = environ.get("HTTP_TRANSFER_ENCODING", "")
content_length = environ.get("CONTENT_LENGTH")
if "chunked" in transfer_encoding.lower():
if content_length:
del environ["HTTP_TRANSFER_ENCODING"]
else:
raw = environ.get("wsgi.input")
if raw:
try:
if hasattr(raw, "seek"):
raw.seek(0)
body = raw.read()
except Exception:
body = b""
if body:
environ["wsgi.input"] = io.BytesIO(body)
environ["CONTENT_LENGTH"] = str(len(body))
del environ["HTTP_TRANSFER_ENCODING"]
content_length = environ.get("CONTENT_LENGTH")
if not content_length or content_length == "0":
sha256 = environ.get("HTTP_X_AMZ_CONTENT_SHA256", "")
decoded_len = environ.get("HTTP_X_AMZ_DECODED_CONTENT_LENGTH", "")
content_encoding = environ.get("HTTP_CONTENT_ENCODING", "")
if ("STREAMING" in sha256.upper() or decoded_len
or "aws-chunked" in content_encoding.lower()):
raw = environ.get("wsgi.input")
if raw:
try:
if hasattr(raw, "seek"):
raw.seek(0)
body = raw.read()
except Exception:
body = b""
if body:
environ["wsgi.input"] = io.BytesIO(body)
environ["CONTENT_LENGTH"] = str(len(body))
raw = environ.get("wsgi.input")
if raw and hasattr(raw, "seek"):
try:
raw.seek(0)
except Exception:
pass
return self.app(environ, start_response)
def _migrate_config_file(active_path: Path, legacy_paths: List[Path]) -> Path:
"""Migrate config file from legacy locations to the active path.
Checks each legacy path in order and moves the first one found to the active path.
This ensures backward compatibility for users upgrading from older versions.
"""
active_path.parent.mkdir(parents=True, exist_ok=True)
if active_path.exists():
return active_path
for legacy_path in legacy_paths:
if legacy_path.exists():
try:
shutil.move(str(legacy_path), str(active_path))
except OSError:
shutil.copy2(legacy_path, active_path)
try:
legacy_path.unlink(missing_ok=True)
except OSError:
pass
break
return active_path
def create_app(
test_config: Optional[Dict[str, Any]] = None,
*,
include_api: bool = True,
include_ui: bool = True,
) -> Flask:
"""Create and configure the Flask application."""
config = AppConfig.from_env(test_config)
if getattr(sys, "frozen", False):
project_root = Path(sys._MEIPASS)
else:
project_root = Path(__file__).resolve().parent.parent
app = Flask(
__name__,
static_folder=str(project_root / "static"),
template_folder=str(project_root / "templates"),
)
app.config.update(config.to_flask_config())
if test_config:
app.config.update(test_config)
app.config.setdefault("APP_VERSION", get_version())
app.permanent_session_lifetime = timedelta(days=int(app.config.get("SESSION_LIFETIME_DAYS", 30)))
if app.config.get("TESTING"):
app.config.setdefault("WTF_CSRF_ENABLED", False)
# Trust X-Forwarded-* headers from proxies
num_proxies = app.config.get("NUM_TRUSTED_PROXIES", 1)
if num_proxies:
if "NUM_TRUSTED_PROXIES" not in os.environ:
logging.getLogger(__name__).warning(
"NUM_TRUSTED_PROXIES not set, defaulting to 1. "
"Set NUM_TRUSTED_PROXIES=0 if not behind a reverse proxy."
)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=num_proxies, x_proto=num_proxies, x_host=num_proxies, x_prefix=num_proxies)
if app.config.get("ENABLE_GZIP", True):
app.wsgi_app = GzipMiddleware(app.wsgi_app, compression_level=6)
app.wsgi_app = _ChunkedTransferMiddleware(app.wsgi_app)
_configure_cors(app)
_configure_logging(app)
limiter.init_app(app)
csrf.init_app(app)
storage = ObjectStorage(
Path(app.config["STORAGE_ROOT"]),
cache_ttl=app.config.get("OBJECT_CACHE_TTL", 60),
object_cache_max_size=app.config.get("OBJECT_CACHE_MAX_SIZE", 100),
bucket_config_cache_ttl=app.config.get("BUCKET_CONFIG_CACHE_TTL_SECONDS", 30.0),
object_key_max_length_bytes=app.config.get("OBJECT_KEY_MAX_LENGTH_BYTES", 1024),
meta_read_cache_max=app.config.get("META_READ_CACHE_MAX", 2048),
)
if app.config.get("WARM_CACHE_ON_STARTUP", True) and not app.config.get("TESTING"):
storage.warm_cache_async()
iam = IamService(
Path(app.config["IAM_CONFIG"]),
auth_max_attempts=app.config.get("AUTH_MAX_ATTEMPTS", 5),
auth_lockout_minutes=app.config.get("AUTH_LOCKOUT_MINUTES", 15),
encryption_key=app.config.get("SECRET_KEY"),
)
bucket_policies = BucketPolicyStore(Path(app.config["BUCKET_POLICY_PATH"]))
secret_store = EphemeralSecretStore(default_ttl=app.config.get("SECRET_TTL_SECONDS", 300))
storage_root = Path(app.config["STORAGE_ROOT"])
config_dir = storage_root / ".myfsio.sys" / "config"
config_dir.mkdir(parents=True, exist_ok=True)
connections_path = _migrate_config_file(
active_path=config_dir / "connections.json",
legacy_paths=[
storage_root / ".myfsio.sys" / "connections.json",
storage_root / ".connections.json",
],
)
replication_rules_path = _migrate_config_file(
active_path=config_dir / "replication_rules.json",
legacy_paths=[
storage_root / ".myfsio.sys" / "replication_rules.json",
storage_root / ".replication_rules.json",
],
)
connections = ConnectionStore(connections_path)
replication = ReplicationManager(
storage,
connections,
replication_rules_path,
storage_root,
connect_timeout=app.config.get("REPLICATION_CONNECT_TIMEOUT_SECONDS", 5),
read_timeout=app.config.get("REPLICATION_READ_TIMEOUT_SECONDS", 30),
max_retries=app.config.get("REPLICATION_MAX_RETRIES", 2),
streaming_threshold_bytes=app.config.get("REPLICATION_STREAMING_THRESHOLD_BYTES", 10 * 1024 * 1024),
max_failures_per_bucket=app.config.get("REPLICATION_MAX_FAILURES_PER_BUCKET", 50),
)
site_registry_path = config_dir / "site_registry.json"
site_registry = SiteRegistry(site_registry_path)
if app.config.get("SITE_ID") and not site_registry.get_local_site():
site_registry.set_local_site(SiteInfo(
site_id=app.config["SITE_ID"],
endpoint=app.config.get("SITE_ENDPOINT") or "",
region=app.config.get("SITE_REGION", "us-east-1"),
priority=app.config.get("SITE_PRIORITY", 100),
))
encryption_config = {
"encryption_enabled": app.config.get("ENCRYPTION_ENABLED", False),
"encryption_master_key_path": app.config.get("ENCRYPTION_MASTER_KEY_PATH"),
"default_encryption_algorithm": app.config.get("DEFAULT_ENCRYPTION_ALGORITHM", "AES256"),
"encryption_chunk_size_bytes": app.config.get("ENCRYPTION_CHUNK_SIZE_BYTES", 64 * 1024),
}
encryption_manager = EncryptionManager(encryption_config)
kms_manager = None
if app.config.get("KMS_ENABLED", False):
kms_keys_path = Path(app.config.get("KMS_KEYS_PATH", ""))
kms_master_key_path = Path(app.config.get("ENCRYPTION_MASTER_KEY_PATH", ""))
kms_manager = KMSManager(
kms_keys_path,
kms_master_key_path,
generate_data_key_min_bytes=app.config.get("KMS_GENERATE_DATA_KEY_MIN_BYTES", 1),
generate_data_key_max_bytes=app.config.get("KMS_GENERATE_DATA_KEY_MAX_BYTES", 1024),
)
encryption_manager.set_kms_provider(kms_manager)
if app.config.get("ENCRYPTION_ENABLED", False):
from .encrypted_storage import EncryptedObjectStorage
storage = EncryptedObjectStorage(storage, encryption_manager)
acl_service = AclService(storage_root)
object_lock_service = ObjectLockService(storage_root)
notification_service = NotificationService(
storage_root,
allow_internal_endpoints=app.config.get("ALLOW_INTERNAL_ENDPOINTS", False),
)
access_logging_service = AccessLoggingService(storage_root)
access_logging_service.set_storage(storage)
lifecycle_manager = None
if app.config.get("LIFECYCLE_ENABLED", False):
base_storage = storage.storage if hasattr(storage, 'storage') else storage
lifecycle_manager = LifecycleManager(
base_storage,
interval_seconds=app.config.get("LIFECYCLE_INTERVAL_SECONDS", 3600),
storage_root=storage_root,
max_history_per_bucket=app.config.get("LIFECYCLE_MAX_HISTORY_PER_BUCKET", 50),
)
lifecycle_manager.start()
gc_collector = None
if app.config.get("GC_ENABLED", False):
gc_collector = GarbageCollector(
storage_root=storage_root,
interval_hours=app.config.get("GC_INTERVAL_HOURS", 6.0),
temp_file_max_age_hours=app.config.get("GC_TEMP_FILE_MAX_AGE_HOURS", 24.0),
multipart_max_age_days=app.config.get("GC_MULTIPART_MAX_AGE_DAYS", 7),
lock_file_max_age_hours=app.config.get("GC_LOCK_FILE_MAX_AGE_HOURS", 1.0),
dry_run=app.config.get("GC_DRY_RUN", False),
io_throttle_ms=app.config.get("GC_IO_THROTTLE_MS", 10),
)
gc_collector.start()
integrity_checker = None
if app.config.get("INTEGRITY_ENABLED", False):
integrity_checker = IntegrityChecker(
storage_root=storage_root,
interval_hours=app.config.get("INTEGRITY_INTERVAL_HOURS", 24.0),
batch_size=app.config.get("INTEGRITY_BATCH_SIZE", 1000),
auto_heal=app.config.get("INTEGRITY_AUTO_HEAL", False),
dry_run=app.config.get("INTEGRITY_DRY_RUN", False),
io_throttle_ms=app.config.get("INTEGRITY_IO_THROTTLE_MS", 10),
)
integrity_checker.start()
app.extensions["object_storage"] = storage
app.extensions["iam"] = iam
app.extensions["bucket_policies"] = bucket_policies
app.extensions["secret_store"] = secret_store
app.extensions["limiter"] = limiter
app.extensions["connections"] = connections
app.extensions["replication"] = replication
app.extensions["encryption"] = encryption_manager
app.extensions["kms"] = kms_manager
app.extensions["acl"] = acl_service
app.extensions["lifecycle"] = lifecycle_manager
app.extensions["gc"] = gc_collector
app.extensions["integrity"] = integrity_checker
app.extensions["object_lock"] = object_lock_service
app.extensions["notifications"] = notification_service
app.extensions["access_logging"] = access_logging_service
app.extensions["site_registry"] = site_registry
website_domains_store = None
if app.config.get("WEBSITE_HOSTING_ENABLED", False):
website_domains_path = config_dir / "website_domains.json"
website_domains_store = WebsiteDomainStore(website_domains_path)
app.extensions["website_domains"] = website_domains_store
from .s3_client import S3ProxyClient
api_base = app.config.get("API_BASE_URL") or "http://127.0.0.1:5000"
app.extensions["s3_proxy"] = S3ProxyClient(
api_base_url=api_base,
region=app.config.get("AWS_REGION", "us-east-1"),
)
operation_metrics_collector = None
if app.config.get("OPERATION_METRICS_ENABLED", False):
operation_metrics_collector = OperationMetricsCollector(
storage_root,
interval_minutes=app.config.get("OPERATION_METRICS_INTERVAL_MINUTES", 5),
retention_hours=app.config.get("OPERATION_METRICS_RETENTION_HOURS", 24),
)
app.extensions["operation_metrics"] = operation_metrics_collector
system_metrics_collector = None
if app.config.get("METRICS_HISTORY_ENABLED", False):
from .system_metrics import SystemMetricsCollector
system_metrics_collector = SystemMetricsCollector(
storage_root,
interval_minutes=app.config.get("METRICS_HISTORY_INTERVAL_MINUTES", 5),
retention_hours=app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24),
)
system_metrics_collector.set_storage(storage)
app.extensions["system_metrics"] = system_metrics_collector
site_sync_worker = None
if app.config.get("SITE_SYNC_ENABLED", False):
from .site_sync import SiteSyncWorker
site_sync_worker = SiteSyncWorker(
storage=storage,
connections=connections,
replication_manager=replication,
storage_root=storage_root,
interval_seconds=app.config.get("SITE_SYNC_INTERVAL_SECONDS", 60),
batch_size=app.config.get("SITE_SYNC_BATCH_SIZE", 100),
connect_timeout=app.config.get("SITE_SYNC_CONNECT_TIMEOUT_SECONDS", 10),
read_timeout=app.config.get("SITE_SYNC_READ_TIMEOUT_SECONDS", 120),
max_retries=app.config.get("SITE_SYNC_MAX_RETRIES", 2),
clock_skew_tolerance_seconds=app.config.get("SITE_SYNC_CLOCK_SKEW_TOLERANCE_SECONDS", 1.0),
)
site_sync_worker.start()
app.extensions["site_sync"] = site_sync_worker
@app.errorhandler(500)
def internal_error(error):
wants_html = request.accept_mimetypes.accept_html
path = request.path or ""
if include_ui and wants_html and (path.startswith("/ui") or path == "/"):
return render_template('500.html'), 500
error_xml = (
'<?xml version="1.0" encoding="UTF-8"?>'
'<Error>'
'<Code>InternalError</Code>'
'<Message>An internal server error occurred</Message>'
f'<Resource>{path}</Resource>'
f'<RequestId>{getattr(g, "request_id", "-")}</RequestId>'
'</Error>'
)
return error_xml, 500, {'Content-Type': 'application/xml'}
@app.errorhandler(CSRFError)
def handle_csrf_error(e):
wants_html = request.accept_mimetypes.accept_html
path = request.path or ""
if include_ui and wants_html and (path.startswith("/ui") or path == "/"):
return render_template('csrf_error.html', reason=e.description), 400
error_xml = (
'<?xml version="1.0" encoding="UTF-8"?>'
'<Error>'
'<Code>CSRFError</Code>'
f'<Message>{e.description}</Message>'
f'<Resource>{path}</Resource>'
f'<RequestId>{getattr(g, "request_id", "-")}</RequestId>'
'</Error>'
)
return error_xml, 400, {'Content-Type': 'application/xml'}
@app.template_filter("filesizeformat")
def filesizeformat(value: int) -> str:
"""Format bytes as human-readable file size."""
for unit in ["B", "KB", "MB", "GB", "TB", "PB"]:
if abs(value) < 1024.0 or unit == "PB":
if unit == "B":
return f"{int(value)} {unit}"
return f"{value:.1f} {unit}"
value /= 1024.0
return f"{value:.1f} PB"
@app.template_filter("timestamp_to_datetime")
def timestamp_to_datetime(value: float) -> str:
"""Format Unix timestamp as human-readable datetime in configured timezone."""
from datetime import datetime, timezone as dt_timezone
from zoneinfo import ZoneInfo
if not value:
return "Never"
try:
dt_utc = datetime.fromtimestamp(value, dt_timezone.utc)
display_tz = app.config.get("DISPLAY_TIMEZONE", "UTC")
if display_tz and display_tz != "UTC":
try:
tz = ZoneInfo(display_tz)
dt_local = dt_utc.astimezone(tz)
return dt_local.strftime("%Y-%m-%d %H:%M:%S")
except (KeyError, ValueError):
pass
return dt_utc.strftime("%Y-%m-%d %H:%M:%S UTC")
except (ValueError, OSError):
return "Unknown"
@app.template_filter("format_datetime")
def format_datetime_filter(dt, include_tz: bool = True) -> str:
"""Format datetime object as human-readable string in configured timezone."""
from datetime import datetime, timezone as dt_timezone
from zoneinfo import ZoneInfo
if not dt:
return ""
try:
display_tz = app.config.get("DISPLAY_TIMEZONE", "UTC")
if display_tz and display_tz != "UTC":
try:
tz = ZoneInfo(display_tz)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=dt_timezone.utc)
dt = dt.astimezone(tz)
except (KeyError, ValueError):
pass
tz_abbr = dt.strftime("%Z") or "UTC"
if include_tz:
return f"{dt.strftime('%b %d, %Y %H:%M')} ({tz_abbr})"
return dt.strftime("%b %d, %Y %H:%M")
except (ValueError, AttributeError):
return str(dt)
if include_api:
from .s3_api import s3_api_bp
from .kms_api import kms_api_bp
from .admin_api import admin_api_bp
app.register_blueprint(s3_api_bp)
app.register_blueprint(kms_api_bp)
app.register_blueprint(admin_api_bp)
csrf.exempt(s3_api_bp)
csrf.exempt(kms_api_bp)
csrf.exempt(admin_api_bp)
if include_ui:
from .ui import ui_bp
app.register_blueprint(ui_bp)
if not include_api:
@app.get("/")
def ui_root_redirect():
return redirect(url_for("ui.buckets_overview"))
@app.errorhandler(404)
def handle_not_found(error):
wants_html = request.accept_mimetypes.accept_html
path = request.path or ""
if include_ui and wants_html:
if not include_api or path.startswith("/ui") or path == "/":
return render_template("404.html"), 404
return error
@app.get("/myfsio/health")
def healthcheck() -> Dict[str, str]:
return {"status": "ok"}
return app
def create_api_app(test_config: Optional[Dict[str, Any]] = None) -> Flask:
return create_app(test_config, include_api=True, include_ui=False)
def create_ui_app(test_config: Optional[Dict[str, Any]] = None) -> Flask:
return create_app(test_config, include_api=False, include_ui=True)
def _configure_cors(app: Flask) -> None:
origins = app.config.get("CORS_ORIGINS", ["*"])
methods = app.config.get("CORS_METHODS", ["GET", "PUT", "POST", "DELETE", "OPTIONS", "HEAD"])
allow_headers = app.config.get("CORS_ALLOW_HEADERS", ["*"])
expose_headers = app.config.get("CORS_EXPOSE_HEADERS", ["*"])
CORS(
app,
resources={r"/*": {"origins": origins, "methods": methods, "allow_headers": allow_headers, "expose_headers": expose_headers}},
supports_credentials=True,
)
class _RequestContextFilter(logging.Filter):
"""Inject request-specific attributes into log records."""
def filter(self, record: logging.LogRecord) -> bool:
if has_request_context():
record.request_id = getattr(g, "request_id", "-")
record.path = request.path
record.method = request.method
record.remote_addr = request.remote_addr or "-"
else:
record.request_id = getattr(record, "request_id", "-")
record.path = getattr(record, "path", "-")
record.method = getattr(record, "method", "-")
record.remote_addr = getattr(record, "remote_addr", "-")
return True
def _configure_logging(app: Flask) -> None:
formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(request_id)s | %(method)s %(path)s | %(message)s"
)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
stream_handler.addFilter(_RequestContextFilter())
logger = app.logger
for handler in logger.handlers[:]:
handler.close()
logger.handlers.clear()
logger.addHandler(stream_handler)
if app.config.get("LOG_TO_FILE"):
log_file = Path(app.config["LOG_FILE"])
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = RotatingFileHandler(
log_file,
maxBytes=int(app.config.get("LOG_MAX_BYTES", 5 * 1024 * 1024)),
backupCount=int(app.config.get("LOG_BACKUP_COUNT", 3)),
encoding="utf-8",
)
file_handler.setFormatter(formatter)
file_handler.addFilter(_RequestContextFilter())
logger.addHandler(file_handler)
logger.setLevel(getattr(logging, app.config.get("LOG_LEVEL", "INFO"), logging.INFO))
@app.before_request
def _log_request_start() -> None:
g.request_id = f"{os.getpid():x}{next(_request_counter):012x}"
g.request_started_at = time.perf_counter()
g.request_bytes_in = request.content_length or 0
@app.before_request
def _maybe_serve_website():
if not app.config.get("WEBSITE_HOSTING_ENABLED"):
return None
if request.method not in {"GET", "HEAD"}:
return None
host = request.host
if ":" in host:
host = host.rsplit(":", 1)[0]
host = host.lower()
store = app.extensions.get("website_domains")
if not store:
return None
bucket = store.get_bucket(host)
if not bucket:
return None
storage = app.extensions["object_storage"]
if not storage.bucket_exists(bucket):
return _website_error_response(404, "Not Found")
website_config = storage.get_bucket_website(bucket)
if not website_config:
return _website_error_response(404, "Not Found")
index_doc = website_config.get("index_document", "index.html")
error_doc = website_config.get("error_document")
req_path = request.path.lstrip("/")
if not req_path or req_path.endswith("/"):
object_key = req_path + index_doc
else:
object_key = req_path
try:
obj_path = storage.get_object_path(bucket, object_key)
except (StorageError, OSError):
if object_key == req_path:
try:
obj_path = storage.get_object_path(bucket, req_path + "/" + index_doc)
object_key = req_path + "/" + index_doc
except (StorageError, OSError):
return _serve_website_error(storage, bucket, error_doc, 404)
else:
return _serve_website_error(storage, bucket, error_doc, 404)
content_type = mimetypes.guess_type(object_key)[0] or "application/octet-stream"
is_encrypted = False
try:
metadata = storage.get_object_metadata(bucket, object_key)
is_encrypted = "x-amz-server-side-encryption" in metadata
except (StorageError, OSError):
pass
if is_encrypted and hasattr(storage, "get_object_data"):
try:
data, _ = storage.get_object_data(bucket, object_key)
file_size = len(data)
except (StorageError, OSError):
return _website_error_response(500, "Internal Server Error")
else:
data = None
try:
stat = obj_path.stat()
file_size = stat.st_size
except OSError:
return _website_error_response(500, "Internal Server Error")
if request.method == "HEAD":
response = Response(status=200)
response.headers["Content-Length"] = file_size
response.headers["Content-Type"] = content_type
response.headers["Accept-Ranges"] = "bytes"
return response
from .s3_api import _parse_range_header
range_header = request.headers.get("Range")
if range_header:
ranges = _parse_range_header(range_header, file_size)
if ranges is None:
return Response(status=416, headers={"Content-Range": f"bytes */{file_size}"})
start, end = ranges[0]
length = end - start + 1
if data is not None:
partial_data = data[start:end + 1]
response = Response(partial_data, status=206, mimetype=content_type)
else:
def _stream_range(file_path, start_pos, length_to_read):
with file_path.open("rb") as f:
f.seek(start_pos)
remaining = length_to_read
while remaining > 0:
chunk = f.read(min(262144, remaining))
if not chunk:
break
remaining -= len(chunk)
yield chunk
response = Response(_stream_range(obj_path, start, length), status=206, mimetype=content_type, direct_passthrough=True)
response.headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
response.headers["Content-Length"] = length
response.headers["Accept-Ranges"] = "bytes"
return response
if data is not None:
response = Response(data, mimetype=content_type)
response.headers["Content-Length"] = file_size
response.headers["Accept-Ranges"] = "bytes"
return response
def _stream(file_path):
with file_path.open("rb") as f:
while True:
chunk = f.read(65536)
if not chunk:
break
yield chunk
response = Response(_stream(obj_path), mimetype=content_type, direct_passthrough=True)
response.headers["Content-Length"] = file_size
response.headers["Accept-Ranges"] = "bytes"
return response
def _serve_website_error(storage, bucket, error_doc_key, status_code):
if not error_doc_key:
return _website_error_response(status_code, "Not Found" if status_code == 404 else "Error")
try:
obj_path = storage.get_object_path(bucket, error_doc_key)
except (StorageError, OSError):
return _website_error_response(status_code, "Not Found")
content_type = mimetypes.guess_type(error_doc_key)[0] or "text/html"
is_encrypted = False
try:
metadata = storage.get_object_metadata(bucket, error_doc_key)
is_encrypted = "x-amz-server-side-encryption" in metadata
except (StorageError, OSError):
pass
if is_encrypted and hasattr(storage, "get_object_data"):
try:
data, _ = storage.get_object_data(bucket, error_doc_key)
response = Response(data, status=status_code, mimetype=content_type)
response.headers["Content-Length"] = len(data)
return response
except (StorageError, OSError):
return _website_error_response(status_code, "Not Found")
try:
data = obj_path.read_bytes()
response = Response(data, status=status_code, mimetype=content_type)
response.headers["Content-Length"] = len(data)
return response
except OSError:
return _website_error_response(status_code, "Not Found")
def _website_error_response(status_code, message):
safe_msg = html_module.escape(str(message))
safe_code = html_module.escape(str(status_code))
body = f"<html><head><title>{safe_code} {safe_msg}</title></head><body><h1>{safe_code} {safe_msg}</h1></body></html>"
return Response(body, status=status_code, mimetype="text/html")
@app.after_request
def _log_request_end(response):
duration_ms = 0.0
if hasattr(g, "request_started_at"):
duration_ms = (time.perf_counter() - g.request_started_at) * 1000
request_id = getattr(g, "request_id", f"{os.getpid():x}{next(_request_counter):012x}")
response.headers.setdefault("X-Request-ID", request_id)
if app.logger.isEnabledFor(logging.INFO):
app.logger.info(
"Request completed",
extra={
"path": request.path,
"method": request.method,
"remote_addr": request.remote_addr,
},
)
response.headers["X-Request-Duration-ms"] = f"{duration_ms:.2f}"
response.headers["Server"] = "MyFSIO"
operation_metrics = app.extensions.get("operation_metrics")
if operation_metrics:
bytes_in = getattr(g, "request_bytes_in", 0)
bytes_out = response.content_length or 0
error_code = getattr(g, "s3_error_code", None)
endpoint_type = classify_endpoint(request.path)
operation_metrics.record_request(
method=request.method,
endpoint_type=endpoint_type,
status_code=response.status_code,
latency_ms=duration_ms,
bytes_in=bytes_in,
bytes_out=bytes_out,
error_code=error_code,
)
return response