272 lines
9.4 KiB
Python
272 lines
9.4 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class OperationStats:
    """Running counters for one group of recorded operations.

    Tracks how many operations were recorded, how many succeeded or failed,
    latency sum/min/max in milliseconds, and byte totals in each direction.
    """

    count: int = 0
    success_count: int = 0
    error_count: int = 0
    latency_sum_ms: float = 0.0
    # Starts at +inf so the first recorded latency always becomes the minimum.
    latency_min_ms: float = float("inf")
    latency_max_ms: float = 0.0
    bytes_in: int = 0
    bytes_out: int = 0

    def record(self, latency_ms: float, success: bool, bytes_in: int = 0, bytes_out: int = 0) -> None:
        """Fold a single operation into the counters.

        Args:
            latency_ms: Duration of the operation in milliseconds.
            success: Whether the operation counts as a success.
            bytes_in: Bytes added to the inbound total.
            bytes_out: Bytes added to the outbound total.
        """
        self.count += 1
        if not success:
            self.error_count += 1
        else:
            self.success_count += 1

        self.latency_sum_ms += latency_ms
        # min()/max() keep the stored value on ties, like a strict comparison.
        self.latency_min_ms = min(self.latency_min_ms, latency_ms)
        self.latency_max_ms = max(self.latency_max_ms, latency_ms)

        self.bytes_in += bytes_in
        self.bytes_out += bytes_out

    def to_dict(self) -> Dict[str, Any]:
        """Return the counters as a plain dict with latencies rounded to 2 places.

        The average is 0.0 when nothing has been recorded, and the minimum is
        reported as 0.0 while it still holds its initial +inf value.
        """
        if self.count > 0:
            mean_latency = self.latency_sum_ms / self.count
        else:
            mean_latency = 0.0

        lowest_latency = 0.0 if self.latency_min_ms == float("inf") else self.latency_min_ms

        summary: Dict[str, Any] = {}
        summary["count"] = self.count
        summary["success_count"] = self.success_count
        summary["error_count"] = self.error_count
        summary["latency_avg_ms"] = round(mean_latency, 2)
        summary["latency_min_ms"] = round(lowest_latency, 2)
        summary["latency_max_ms"] = round(self.latency_max_ms, 2)
        summary["bytes_in"] = self.bytes_in
        summary["bytes_out"] = self.bytes_out
        return summary

    def merge(self, other: "OperationStats") -> None:
        """Add the counters of ``other`` into this instance in place."""
        self.count += other.count
        self.success_count += other.success_count
        self.error_count += other.error_count
        self.latency_sum_ms += other.latency_sum_ms
        self.latency_min_ms = min(self.latency_min_ms, other.latency_min_ms)
        self.latency_max_ms = max(self.latency_max_ms, other.latency_max_ms)
        self.bytes_in += other.bytes_in
        self.bytes_out += other.bytes_out
|
|
|
|
|
|
@dataclass
class MetricsSnapshot:
    """Serialisable record of the metrics gathered during one window."""

    timestamp: datetime
    window_seconds: int
    by_method: Dict[str, Dict[str, Any]]
    by_endpoint: Dict[str, Dict[str, Any]]
    by_status_class: Dict[str, int]
    error_codes: Dict[str, int]
    totals: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict with the timestamp rendered as an ISO 8601 string."""
        payload: Dict[str, Any] = {"timestamp": self.timestamp.isoformat()}
        # The remaining fields are copied through unchanged, in declaration order.
        passthrough = (
            "window_seconds",
            "by_method",
            "by_endpoint",
            "by_status_class",
            "error_codes",
            "totals",
        )
        for attr_name in passthrough:
            payload[attr_name] = getattr(self, attr_name)
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MetricsSnapshot":
        """Build a snapshot from a dict shaped like the output of ``to_dict``.

        ``timestamp`` is required (a missing key raises ``KeyError``).
        ``window_seconds`` defaults to 300 and every mapping field defaults to
        a fresh empty dict when absent.
        """
        parsed_timestamp = datetime.fromisoformat(data["timestamp"])
        window = data.get("window_seconds", 300)
        mapping_fields = ("by_method", "by_endpoint", "by_status_class", "error_codes", "totals")
        mappings = {key: data.get(key, {}) for key in mapping_fields}
        return cls(timestamp=parsed_timestamp, window_seconds=window, **mappings)
|
|
|
|
|
|
class OperationMetricsCollector:
    """Accumulates request metrics in memory and periodically snapshots them to disk.

    Counters are kept per method, per endpoint type, per status class and per
    error code, plus a grand total. A daemon thread started by the constructor
    turns the current window into a ``MetricsSnapshot`` every
    ``interval_minutes``, appends it to the retained history, writes the
    history to a JSON file under ``storage_root`` and resets the counters.
    All counter and history access goes through ``self._lock``.
    """

    def __init__(
        self,
        storage_root: Path,
        interval_minutes: int = 5,
        retention_hours: int = 24,
    ):
        """Load any saved history and start the background snapshot thread.

        Args:
            storage_root: Directory under which the history file is stored.
            interval_minutes: Minutes between automatic snapshots.
            retention_hours: Snapshots older than this many hours are dropped.
        """
        self.storage_root = storage_root
        self.interval_seconds = interval_minutes * 60
        self.retention_hours = retention_hours
        self._lock = threading.Lock()
        self._by_method: Dict[str, OperationStats] = {}
        self._by_endpoint: Dict[str, OperationStats] = {}
        self._by_status_class: Dict[str, int] = {}
        self._error_codes: Dict[str, int] = {}
        self._totals = OperationStats()
        self._window_start = time.time()
        self._shutdown = threading.Event()
        self._snapshots: List[MetricsSnapshot] = []

        self._load_history()

        self._snapshot_thread = threading.Thread(
            target=self._snapshot_loop, name="operation-metrics-snapshot", daemon=True
        )
        self._snapshot_thread.start()

    def _config_path(self) -> Path:
        """Return the location of the persisted history file."""
        return self.storage_root / ".myfsio.sys" / "config" / "operation_metrics.json"

    def _load_history(self) -> None:
        """Populate ``self._snapshots`` from disk, ignoring a missing or bad file.

        Any read, decode or shape problem is logged and leaves the history
        empty instead of propagating out of the constructor.
        """
        config_path = self._config_path()
        if not config_path.exists():
            return
        try:
            data = json.loads(config_path.read_text(encoding="utf-8"))
            snapshots_data = data.get("snapshots", [])
            self._snapshots = [MetricsSnapshot.from_dict(s) for s in snapshots_data]
            self._prune_old_snapshots()
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
            # ValueError covers json.JSONDecodeError, undecodable bytes and bad
            # ISO timestamps; TypeError/AttributeError cover JSON of the wrong
            # shape (e.g. a top-level list or a non-string timestamp).
            logger.warning("Failed to load operation metrics history: %s", e)

    def _save_history(self) -> None:
        """Write the retained snapshots to disk, logging (not raising) on failure."""
        config_path = self._config_path()
        # Write to a sibling temp file and rename it over the target so an
        # interrupted write cannot leave a truncated history file behind.
        tmp_path = config_path.with_name(config_path.name + ".tmp")
        try:
            # mkdir lives inside the try so a failure here is logged too,
            # instead of escaping into the snapshot thread.
            config_path.parent.mkdir(parents=True, exist_ok=True)
            data = {"snapshots": [s.to_dict() for s in self._snapshots]}
            tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
            tmp_path.replace(config_path)
        except OSError as e:
            logger.warning("Failed to save operation metrics history: %s", e)

    def _prune_old_snapshots(self) -> None:
        """Drop snapshots whose timestamp is older than the retention window."""
        if not self._snapshots:
            return
        cutoff = datetime.now(timezone.utc).timestamp() - (self.retention_hours * 3600)
        self._snapshots = [
            s for s in self._snapshots if s.timestamp.timestamp() > cutoff
        ]

    def _snapshot_loop(self) -> None:
        """Background thread body: snapshot every interval until shutdown is set."""
        # Event.wait returns True once the shutdown flag is set, False on timeout.
        while not self._shutdown.wait(timeout=self.interval_seconds):
            try:
                self._take_snapshot()
            except Exception:
                # An unexpected failure must not end periodic snapshots.
                logger.exception("Failed to take operation metrics snapshot")

    def _take_snapshot(self) -> None:
        """Freeze the current window into a snapshot, persist it, reset counters."""
        with self._lock:
            now = datetime.now(timezone.utc)
            window_seconds = int(time.time() - self._window_start)

            snapshot = MetricsSnapshot(
                timestamp=now,
                window_seconds=window_seconds,
                by_method={k: v.to_dict() for k, v in self._by_method.items()},
                by_endpoint={k: v.to_dict() for k, v in self._by_endpoint.items()},
                by_status_class=dict(self._by_status_class),
                error_codes=dict(self._error_codes),
                totals=self._totals.to_dict(),
            )

            self._snapshots.append(snapshot)
            self._prune_old_snapshots()
            self._save_history()

            # Start a fresh window.
            self._by_method.clear()
            self._by_endpoint.clear()
            self._by_status_class.clear()
            self._error_codes.clear()
            self._totals = OperationStats()
            self._window_start = time.time()

    def record_request(
        self,
        method: str,
        endpoint_type: str,
        status_code: int,
        latency_ms: float,
        bytes_in: int = 0,
        bytes_out: int = 0,
        error_code: Optional[str] = None,
    ) -> None:
        """Record one request in the current window.

        Args:
            method: Key for the per-method counters.
            endpoint_type: Key for the per-endpoint counters.
            status_code: Numeric status; 200-399 counts as success, and the
                hundreds digit forms the status class (e.g. ``"4xx"``).
            latency_ms: Request duration in milliseconds.
            bytes_in: Bytes received for the request.
            bytes_out: Bytes sent for the request.
            error_code: Optional error identifier; tallied when non-empty.
        """
        success = 200 <= status_code < 400
        status_class = f"{status_code // 100}xx"

        with self._lock:
            method_stats = self._by_method.get(method)
            if method_stats is None:
                method_stats = self._by_method[method] = OperationStats()
            method_stats.record(latency_ms, success, bytes_in, bytes_out)

            endpoint_stats = self._by_endpoint.get(endpoint_type)
            if endpoint_stats is None:
                endpoint_stats = self._by_endpoint[endpoint_type] = OperationStats()
            endpoint_stats.record(latency_ms, success, bytes_in, bytes_out)

            self._by_status_class[status_class] = self._by_status_class.get(status_class, 0) + 1

            if error_code:
                self._error_codes[error_code] = self._error_codes.get(error_code, 0) + 1

            self._totals.record(latency_ms, success, bytes_in, bytes_out)

    def get_current_stats(self) -> Dict[str, Any]:
        """Return the in-progress window as a dict shaped like a snapshot."""
        with self._lock:
            window_seconds = int(time.time() - self._window_start)
            return {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "window_seconds": window_seconds,
                "by_method": {k: v.to_dict() for k, v in self._by_method.items()},
                "by_endpoint": {k: v.to_dict() for k, v in self._by_endpoint.items()},
                "by_status_class": dict(self._by_status_class),
                "error_codes": dict(self._error_codes),
                "totals": self._totals.to_dict(),
            }

    def get_history(self, hours: Optional[int] = None) -> List[Dict[str, Any]]:
        """Return retained snapshots as dicts, optionally limited to recent hours.

        Args:
            hours: When truthy, only snapshots newer than this many hours are
                returned. ``None`` and ``0`` both return the full history.
        """
        with self._lock:
            snapshots = list(self._snapshots)

        if hours:
            cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600)
            snapshots = [s for s in snapshots if s.timestamp.timestamp() > cutoff]

        return [s.to_dict() for s in snapshots]

    def shutdown(self) -> None:
        """Stop the snapshot thread after persisting one final snapshot."""
        self._shutdown.set()
        self._take_snapshot()
        self._snapshot_thread.join(timeout=5.0)
|
|
|
|
|
|
def classify_endpoint(path: str) -> str:
    """Map a request path to a coarse endpoint category.

    Args:
        path: URL path of the request, without a query string.

    Returns:
        ``"service"`` for an empty or slash-only path and for ``/myfsio``
        routes, ``"ui"`` for ``/ui`` routes, ``"kms"`` for ``/kms`` routes,
        ``"bucket"`` for a single remaining path segment, and ``"object"``
        for anything deeper.
    """
    if not path:
        return "service"

    trimmed = path.rstrip("/")
    if not trimmed:
        # Nothing but slashes ("/", "//", ...) is the service root, not a bucket.
        return "service"

    # Reserved prefixes must match a whole first segment, so a bucket such as
    # "/uiassets" or "/kmsdata" is not mistaken for a built-in route.
    reserved_routes = (("/ui", "ui"), ("/kms", "kms"), ("/myfsio", "service"))
    for prefix, category in reserved_routes:
        if trimmed == prefix or trimmed.startswith(prefix + "/"):
            return category

    segments = trimmed.lstrip("/").split("/")
    return "bucket" if len(segments) == 1 else "object"
|