diff --git a/app/__init__.py b/app/__init__.py
index ac059c7..ad555e1 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -16,6 +16,7 @@
 from flask_wtf.csrf import CSRFError
 from werkzeug.middleware.proxy_fix import ProxyFix
 from .access_logging import AccessLoggingService
+from .operation_metrics import OperationMetricsCollector, classify_endpoint
 from .compression import GzipMiddleware
 from .acl import AclService
 from .bucket_policies import BucketPolicyStore
@@ -187,6 +188,15 @@ def create_app(
     app.extensions["notifications"] = notification_service
     app.extensions["access_logging"] = access_logging_service
 
+    operation_metrics_collector = None
+    if app.config.get("OPERATION_METRICS_ENABLED", False):
+        operation_metrics_collector = OperationMetricsCollector(
+            storage_root,
+            interval_minutes=app.config.get("OPERATION_METRICS_INTERVAL_MINUTES", 5),
+            retention_hours=app.config.get("OPERATION_METRICS_RETENTION_HOURS", 24),
+        )
+    app.extensions["operation_metrics"] = operation_metrics_collector
+
     @app.errorhandler(500)
     def internal_error(error):
         return render_template('500.html'), 500
@@ -356,6 +366,7 @@ def _configure_logging(app: Flask) -> None:
     def _log_request_start() -> None:
         g.request_id = uuid.uuid4().hex
         g.request_started_at = time.perf_counter()
+        g.request_bytes_in = request.content_length or 0
         app.logger.info(
             "Request started",
             extra={"path": request.path, "method": request.method, "remote_addr": request.remote_addr},
@@ -377,4 +388,21 @@ def _configure_logging(app: Flask) -> None:
             },
         )
         response.headers["X-Request-Duration-ms"] = f"{duration_ms:.2f}"
+
+        operation_metrics = app.extensions.get("operation_metrics")
+        if operation_metrics:
+            bytes_in = getattr(g, "request_bytes_in", 0)
+            bytes_out = response.content_length or 0
+            error_code = getattr(g, "s3_error_code", None)
+            endpoint_type = classify_endpoint(request.path)
+            operation_metrics.record_request(
+                method=request.method,
+                endpoint_type=endpoint_type,
+                status_code=response.status_code,
+                latency_ms=duration_ms,
+                bytes_in=bytes_in,
+                bytes_out=bytes_out,
+                error_code=error_code,
+            )
+
         return response
diff --git a/app/config.py b/app/config.py
index 80420d9..2778963 100644
--- a/app/config.py
+++ b/app/config.py
@@ -87,6 +87,9 @@ class AppConfig:
     metrics_history_enabled: bool
     metrics_history_retention_hours: int
     metrics_history_interval_minutes: int
+    operation_metrics_enabled: bool
+    operation_metrics_interval_minutes: int
+    operation_metrics_retention_hours: int
 
     @classmethod
     def from_env(cls, overrides: Optional[Dict[str, Any]] = None) -> "AppConfig":
@@ -186,6 +189,9 @@ class AppConfig:
         metrics_history_enabled = str(_get("METRICS_HISTORY_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
         metrics_history_retention_hours = int(_get("METRICS_HISTORY_RETENTION_HOURS", 24))
         metrics_history_interval_minutes = int(_get("METRICS_HISTORY_INTERVAL_MINUTES", 5))
+        operation_metrics_enabled = str(_get("OPERATION_METRICS_ENABLED", "0")).lower() in {"1", "true", "yes", "on"}
+        operation_metrics_interval_minutes = int(_get("OPERATION_METRICS_INTERVAL_MINUTES", 5))
+        operation_metrics_retention_hours = int(_get("OPERATION_METRICS_RETENTION_HOURS", 24))
 
         return cls(storage_root=storage_root,
                    max_upload_size=max_upload_size,
@@ -227,7 +233,10 @@ class AppConfig:
                    lifecycle_interval_seconds=lifecycle_interval_seconds,
                    metrics_history_enabled=metrics_history_enabled,
                    metrics_history_retention_hours=metrics_history_retention_hours,
-                   metrics_history_interval_minutes=metrics_history_interval_minutes)
+                   metrics_history_interval_minutes=metrics_history_interval_minutes,
+                   operation_metrics_enabled=operation_metrics_enabled,
+                   operation_metrics_interval_minutes=operation_metrics_interval_minutes,
+                   operation_metrics_retention_hours=operation_metrics_retention_hours)
 
     def validate_and_report(self) -> list[str]:
         """Validate configuration and return a list of warnings/issues.
@@ -359,4 +368,7 @@ class AppConfig:
             "METRICS_HISTORY_ENABLED": self.metrics_history_enabled,
             "METRICS_HISTORY_RETENTION_HOURS": self.metrics_history_retention_hours,
             "METRICS_HISTORY_INTERVAL_MINUTES": self.metrics_history_interval_minutes,
+            "OPERATION_METRICS_ENABLED": self.operation_metrics_enabled,
+            "OPERATION_METRICS_INTERVAL_MINUTES": self.operation_metrics_interval_minutes,
+            "OPERATION_METRICS_RETENTION_HOURS": self.operation_metrics_retention_hours,
         }
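
The three new settings default to off, a 5-minute snapshot interval, and 24-hour retention. A small sketch of flipping them on through `AppConfig.from_env` overrides (assuming, as with the other settings parsed above, that an override value is read in place of the corresponding environment variable):

```python
from app.config import AppConfig

# Illustrative values only; the keys match the env-var names parsed in from_env().
config = AppConfig.from_env({
    "OPERATION_METRICS_ENABLED": "1",          # parsed with the same "1/true/yes/on" rule
    "OPERATION_METRICS_INTERVAL_MINUTES": 10,  # snapshot window length
    "OPERATION_METRICS_RETENTION_HOURS": 48,   # how long snapshots are kept
})
assert config.operation_metrics_enabled is True
assert config.operation_metrics_interval_minutes == 10
```
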
diff --git a/app/operation_metrics.py b/app/operation_metrics.py
new file mode 100644
index 0000000..67a63c2
--- /dev/null
+++ b/app/operation_metrics.py
@@ -0,0 +1,271 @@
+from __future__ import annotations
+
+import json
+import logging
+import threading
+import time
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class OperationStats:
+    count: int = 0
+    success_count: int = 0
+    error_count: int = 0
+    latency_sum_ms: float = 0.0
+    latency_min_ms: float = float("inf")
+    latency_max_ms: float = 0.0
+    bytes_in: int = 0
+    bytes_out: int = 0
+
+    def record(self, latency_ms: float, success: bool, bytes_in: int = 0, bytes_out: int = 0) -> None:
+        self.count += 1
+        if success:
+            self.success_count += 1
+        else:
+            self.error_count += 1
+        self.latency_sum_ms += latency_ms
+        if latency_ms < self.latency_min_ms:
+            self.latency_min_ms = latency_ms
+        if latency_ms > self.latency_max_ms:
+            self.latency_max_ms = latency_ms
+        self.bytes_in += bytes_in
+        self.bytes_out += bytes_out
+
+    def to_dict(self) -> Dict[str, Any]:
+        avg_latency = self.latency_sum_ms / self.count if self.count > 0 else 0.0
+        min_latency = self.latency_min_ms if self.latency_min_ms != float("inf") else 0.0
+        return {
+            "count": self.count,
+            "success_count": self.success_count,
+            "error_count": self.error_count,
+            "latency_avg_ms": round(avg_latency, 2),
+            "latency_min_ms": round(min_latency, 2),
+            "latency_max_ms": round(self.latency_max_ms, 2),
+            "bytes_in": self.bytes_in,
+            "bytes_out": self.bytes_out,
+        }
+
+    def merge(self, other: "OperationStats") -> None:
+        self.count += other.count
+        self.success_count += other.success_count
+        self.error_count += other.error_count
+        self.latency_sum_ms += other.latency_sum_ms
+        if other.latency_min_ms < self.latency_min_ms:
+            self.latency_min_ms = other.latency_min_ms
+        if other.latency_max_ms > self.latency_max_ms:
+            self.latency_max_ms = other.latency_max_ms
+        self.bytes_in += other.bytes_in
+        self.bytes_out += other.bytes_out
+
+
+@dataclass
+class MetricsSnapshot:
+    timestamp: datetime
+    window_seconds: int
+    by_method: Dict[str, Dict[str, Any]]
+    by_endpoint: Dict[str, Dict[str, Any]]
+    by_status_class: Dict[str, int]
+    error_codes: Dict[str, int]
+    totals: Dict[str, Any]
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "timestamp": self.timestamp.isoformat(),
+            "window_seconds": self.window_seconds,
+            "by_method": self.by_method,
+            "by_endpoint": self.by_endpoint,
+            "by_status_class": self.by_status_class,
+            "error_codes": self.error_codes,
+            "totals": self.totals,
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "MetricsSnapshot":
+        return cls(
+            timestamp=datetime.fromisoformat(data["timestamp"]),
+            window_seconds=data.get("window_seconds", 300),
+            by_method=data.get("by_method", {}),
+            by_endpoint=data.get("by_endpoint", {}),
+            by_status_class=data.get("by_status_class", {}),
+            error_codes=data.get("error_codes", {}),
+            totals=data.get("totals", {}),
+        )
+
+
+class OperationMetricsCollector:
+    def __init__(
+        self,
+        storage_root: Path,
+        interval_minutes: int = 5,
+        retention_hours: int = 24,
+    ):
+        self.storage_root = storage_root
+        self.interval_seconds = interval_minutes * 60
+        self.retention_hours = retention_hours
+        self._lock = threading.Lock()
+        self._by_method: Dict[str, OperationStats] = {}
+        self._by_endpoint: Dict[str, OperationStats] = {}
+        self._by_status_class: Dict[str, int] = {}
+        self._error_codes: Dict[str, int] = {}
+        self._totals = OperationStats()
+        self._window_start = time.time()
+        self._shutdown = threading.Event()
+        self._snapshots: List[MetricsSnapshot] = []
+
+        self._load_history()
+
+        self._snapshot_thread = threading.Thread(
+            target=self._snapshot_loop, name="operation-metrics-snapshot", daemon=True
+        )
+        self._snapshot_thread.start()
+
+    def _config_path(self) -> Path:
+        return self.storage_root / ".myfsio.sys" / "config" / "operation_metrics.json"
+
+    def _load_history(self) -> None:
+        config_path = self._config_path()
+        if not config_path.exists():
+            return
+        try:
+            data = json.loads(config_path.read_text(encoding="utf-8"))
+            snapshots_data = data.get("snapshots", [])
+            self._snapshots = [MetricsSnapshot.from_dict(s) for s in snapshots_data]
+            self._prune_old_snapshots()
+        except (json.JSONDecodeError, OSError, KeyError) as e:
+            logger.warning(f"Failed to load operation metrics history: {e}")
+
+    def _save_history(self) -> None:
+        config_path = self._config_path()
+        config_path.parent.mkdir(parents=True, exist_ok=True)
+        try:
+            data = {"snapshots": [s.to_dict() for s in self._snapshots]}
+            config_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
+        except OSError as e:
+            logger.warning(f"Failed to save operation metrics history: {e}")
+
+    def _prune_old_snapshots(self) -> None:
+        if not self._snapshots:
+            return
+        cutoff = datetime.now(timezone.utc).timestamp() - (self.retention_hours * 3600)
+        self._snapshots = [
+            s for s in self._snapshots if s.timestamp.timestamp() > cutoff
+        ]
+
+    def _snapshot_loop(self) -> None:
+        while not self._shutdown.is_set():
+            self._shutdown.wait(timeout=self.interval_seconds)
+            if not self._shutdown.is_set():
+                self._take_snapshot()
+
+    def _take_snapshot(self) -> None:
+        with self._lock:
+            now = datetime.now(timezone.utc)
+            window_seconds = int(time.time() - self._window_start)
+
+            snapshot = MetricsSnapshot(
+                timestamp=now,
+                window_seconds=window_seconds,
+                by_method={k: v.to_dict() for k, v in self._by_method.items()},
+                by_endpoint={k: v.to_dict() for k, v in self._by_endpoint.items()},
+                by_status_class=dict(self._by_status_class),
+                error_codes=dict(self._error_codes),
+                totals=self._totals.to_dict(),
+            )
+
+            self._snapshots.append(snapshot)
+            self._prune_old_snapshots()
+            self._save_history()
+
+            self._by_method.clear()
+            self._by_endpoint.clear()
+            self._by_status_class.clear()
+            self._error_codes.clear()
+            self._totals = OperationStats()
+            self._window_start = time.time()
+
+    def record_request(
+        self,
+        method: str,
+        endpoint_type: str,
+        status_code: int,
+        latency_ms: float,
+        bytes_in: int = 0,
+        bytes_out: int = 0,
+        error_code: Optional[str] = None,
+    ) -> None:
+        success = 200 <= status_code < 400
+        status_class = f"{status_code // 100}xx"
+
+        with self._lock:
+            if method not in self._by_method:
+                self._by_method[method] = OperationStats()
+            self._by_method[method].record(latency_ms, success, bytes_in, bytes_out)
+
+            if endpoint_type not in self._by_endpoint:
+                self._by_endpoint[endpoint_type] = OperationStats()
+            self._by_endpoint[endpoint_type].record(latency_ms, success, bytes_in, bytes_out)
+
+            self._by_status_class[status_class] = self._by_status_class.get(status_class, 0) + 1
+
+            if error_code:
+                self._error_codes[error_code] = self._error_codes.get(error_code, 0) + 1
+
+            self._totals.record(latency_ms, success, bytes_in, bytes_out)
+
+    def get_current_stats(self) -> Dict[str, Any]:
+        with self._lock:
+            window_seconds = int(time.time() - self._window_start)
+            return {
+                "timestamp": datetime.now(timezone.utc).isoformat(),
+                "window_seconds": window_seconds,
+                "by_method": {k: v.to_dict() for k, v in self._by_method.items()},
+                "by_endpoint": {k: v.to_dict() for k, v in self._by_endpoint.items()},
+                "by_status_class": dict(self._by_status_class),
+                "error_codes": dict(self._error_codes),
+                "totals": self._totals.to_dict(),
+            }
+
+    def get_history(self, hours: Optional[int] = None) -> List[Dict[str, Any]]:
+        with self._lock:
+            snapshots = list(self._snapshots)
+
+        if hours:
+            cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600)
+            snapshots = [s for s in snapshots if s.timestamp.timestamp() > cutoff]
+
+        return [s.to_dict() for s in snapshots]
+
+    def shutdown(self) -> None:
+        self._shutdown.set()
+        self._take_snapshot()
+        self._snapshot_thread.join(timeout=5.0)
+
+
+def classify_endpoint(path: str) -> str:
+    if not path or path == "/":
+        return "service"
+
+    path = path.rstrip("/")
+
+    if path.startswith("/ui"):
+        return "ui"
+
+    if path.startswith("/kms"):
+        return "kms"
+
+    if path.startswith("/myfsio"):
+        return "service"
+
+    parts = path.lstrip("/").split("/")
+    if len(parts) == 0:
+        return "service"
+    elif len(parts) == 1:
+        return "bucket"
+    else:
+        return "object"
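
The new module keeps one in-memory window of counters keyed by HTTP method and endpoint type, snapshots it to `.myfsio.sys/config/operation_metrics.json` on a background thread, and prunes snapshots past the retention horizon. A minimal usage sketch against the public API above (assuming the module is importable as `app.operation_metrics`; the storage path is a throwaway):

```python
from pathlib import Path
from app.operation_metrics import OperationMetricsCollector, classify_endpoint

# Throwaway storage root for illustration; the app factory passes its real storage_root.
collector = OperationMetricsCollector(Path("/tmp/myfsio-demo"), interval_minutes=5, retention_hours=24)

# Path classification used by the request hook.
assert classify_endpoint("/") == "service"
assert classify_endpoint("/my-bucket") == "bucket"
assert classify_endpoint("/my-bucket/key.txt") == "object"
assert classify_endpoint("/ui/metrics") == "ui"

# Record a successful GET and a failed PUT, then inspect the in-memory window.
collector.record_request("GET", "object", 200, latency_ms=12.5, bytes_out=1024)
collector.record_request("PUT", "object", 404, latency_ms=3.1, error_code="NoSuchBucket")
stats = collector.get_current_stats()
print(stats["totals"]["count"])   # 2
print(stats["error_codes"])       # {"NoSuchBucket": 1}

collector.shutdown()              # persists a final snapshot and stops the thread
```
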
diff --git a/app/s3_api.py b/app/s3_api.py
index 77fa81e..f576c32 100644
--- a/app/s3_api.py
+++ b/app/s3_api.py
@@ -88,6 +88,7 @@ def _xml_response(element: Element, status: int = 200) -> Response:
 
 
 def _error_response(code: str, message: str, status: int) -> Response:
+    g.s3_error_code = code
     error = Element("Error")
     SubElement(error, "Code").text = code
     SubElement(error, "Message").text = message
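
The single added line in `_error_response` stashes the S3 error code on `flask.g`, where the after-request hook in `app/__init__.py` reads it back and passes it to `record_request`; because `g` is request-scoped, each request contributes at most one error code. A self-contained sketch of that hand-off pattern (a toy app, not the project's actual factory; the route and names are made up):

```python
# Illustration of the g-based hand-off between an error response and an after-request hook.
from flask import Flask, g, jsonify

app = Flask("demo")

@app.get("/missing-bucket")
def missing_bucket():
    # An S3-style handler records the error code before building the response,
    # analogous to _error_response() setting g.s3_error_code = code.
    g.s3_error_code = "NoSuchBucket"
    return jsonify({"Code": "NoSuchBucket"}), 404

@app.after_request
def count_error(response):
    code = getattr(g, "s3_error_code", None)  # None for successful requests
    if code:
        app.logger.info("S3 error recorded: %s", code)  # the real hook calls record_request()
    return response

if __name__ == "__main__":
    with app.test_client() as client:
        client.get("/missing-bucket")
```
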
diff --git a/app/ui.py b/app/ui.py
index 7e86c61..3808912 100644
--- a/app/ui.py
+++ b/app/ui.py
@@ -141,6 +141,10 @@ def _acl() -> AclService:
     return current_app.extensions["acl"]
 
 
+def _operation_metrics():
+    return current_app.extensions.get("operation_metrics")
+
+
 def _format_bytes(num: int) -> str:
     step = 1024
     units = ["B", "KB", "MB", "GB", "TB", "PB"]
@@ -2196,6 +2200,7 @@
             "uptime_days": uptime_days,
         },
         metrics_history_enabled=current_app.config.get("METRICS_HISTORY_ENABLED", False),
+        operation_metrics_enabled=current_app.config.get("OPERATION_METRICS_ENABLED", False),
     )
 
 
@@ -2329,6 +2334,52 @@
     })
 
 
+@ui_bp.get("/metrics/operations")
+def metrics_operations():
+    principal = _current_principal()
+
+    try:
+        _iam().authorize(principal, None, "iam:list_users")
+    except IamError:
+        return jsonify({"error": "Access denied"}), 403
+
+    collector = _operation_metrics()
+    if not collector:
+        return jsonify({
+            "enabled": False,
+            "stats": None,
+        })
+
+    return jsonify({
+        "enabled": True,
+        "stats": collector.get_current_stats(),
+    })
+
+
+@ui_bp.get("/metrics/operations/history")
+def metrics_operations_history():
+    principal = _current_principal()
+
+    try:
+        _iam().authorize(principal, None, "iam:list_users")
+    except IamError:
+        return jsonify({"error": "Access denied"}), 403
+
+    collector = _operation_metrics()
+    if not collector:
+        return jsonify({
+            "enabled": False,
+            "history": [],
+        })
+
+    hours = request.args.get("hours", type=int)
+    return jsonify({
+        "enabled": True,
+        "history": collector.get_history(hours),
+        "interval_minutes": current_app.config.get("OPERATION_METRICS_INTERVAL_MINUTES", 5),
+    })
+
+
 @ui_bp.route("/buckets/<bucket_name>/lifecycle", methods=["GET", "POST", "DELETE"])
 def bucket_lifecycle(bucket_name: str):
     principal = _current_principal()
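
Both new endpoints are gated on an `iam:list_users` authorization check and report `"enabled": false` rather than an error when the collector is not configured. A hedged polling sketch (the `/ui` URL prefix and the pre-authenticated session are assumptions, since the blueprint's prefix and auth flow are not shown in this diff):

```python
# Hypothetical admin-side polling client; adjust BASE to the blueprint's real prefix.
import requests

BASE = "http://localhost:5000/ui"
session = requests.Session()  # assumed to already carry an authenticated admin session

current = session.get(f"{BASE}/metrics/operations").json()
if current["enabled"]:
    print(current["stats"]["totals"])

history = session.get(f"{BASE}/metrics/operations/history", params={"hours": 6}).json()
print(len(history["history"]), "snapshots at", history.get("interval_minutes"), "minute intervals")
```
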
diff --git a/templates/metrics.html b/templates/metrics.html
index 55a3a28..f108213 100644
--- a/templates/metrics.html
+++ b/templates/metrics.html
@@ -268,6 +268,121 @@
+{% if operation_metrics_enabled %}
+{# API Operations card: header with a loading state; summary counters for Requests,
+   Success (%), Errors, Latency (ms), Bytes In, and Bytes Out; chart panels for
+   Requests by Method, Requests by Status, and Requests by Endpoint; and an
+   S3 Error Codes table (API only) with Code, Count, and Distribution columns
+   plus a "No S3 API errors" empty state. #}
+{% endif %}
+
 {% if metrics_history_enabled %}
@@ -307,7 +422,7 @@
 {% endblock %}
 
 {% block extra_scripts %}
-{% if metrics_history_enabled %}
+{% if metrics_history_enabled or operation_metrics_enabled %}
 {% endif %}
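
For reviewers matching the template's counters and charts to the API, one history entry looks like this (field names come from `MetricsSnapshot.to_dict` and `OperationStats.to_dict` above; the concrete numbers are illustrative only):

```python
# Example entry returned by the /metrics/operations/history endpoint (values are made up).
snapshot = {
    "timestamp": "2024-01-01T12:05:00+00:00",
    "window_seconds": 300,
    "by_method": {"GET": {"count": 42, "success_count": 40, "error_count": 2,
                          "latency_avg_ms": 11.3, "latency_min_ms": 0.9, "latency_max_ms": 87.2,
                          "bytes_in": 0, "bytes_out": 1048576}},
    "by_endpoint": {"object": {"count": 42, "success_count": 40, "error_count": 2,
                               "latency_avg_ms": 11.3, "latency_min_ms": 0.9, "latency_max_ms": 87.2,
                               "bytes_in": 0, "bytes_out": 1048576}},
    "by_status_class": {"2xx": 40, "4xx": 2},
    "error_codes": {"NoSuchKey": 2},
    "totals": {"count": 42, "success_count": 40, "error_count": 2,
               "latency_avg_ms": 11.3, "latency_min_ms": 0.9, "latency_max_ms": 87.2,
               "bytes_in": 0, "bytes_out": 1048576},
}
```
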