Files
MyFSIO/app/system_metrics.py

216 lines
7.5 KiB
Python

from __future__ import annotations
import json
import logging
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, TYPE_CHECKING
import psutil
if TYPE_CHECKING:
from .storage import ObjectStorage
logger = logging.getLogger(__name__)
@dataclass
class SystemMetricsSnapshot:
timestamp: datetime
cpu_percent: float
memory_percent: float
disk_percent: float
storage_bytes: int
def to_dict(self) -> Dict[str, Any]:
return {
"timestamp": self.timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
"cpu_percent": round(self.cpu_percent, 2),
"memory_percent": round(self.memory_percent, 2),
"disk_percent": round(self.disk_percent, 2),
"storage_bytes": self.storage_bytes,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "SystemMetricsSnapshot":
timestamp_str = data["timestamp"]
if timestamp_str.endswith("Z"):
timestamp_str = timestamp_str[:-1] + "+00:00"
return cls(
timestamp=datetime.fromisoformat(timestamp_str),
cpu_percent=data.get("cpu_percent", 0.0),
memory_percent=data.get("memory_percent", 0.0),
disk_percent=data.get("disk_percent", 0.0),
storage_bytes=data.get("storage_bytes", 0),
)
class SystemMetricsCollector:
def __init__(
self,
storage_root: Path,
interval_minutes: int = 5,
retention_hours: int = 24,
):
self.storage_root = storage_root
self.interval_seconds = interval_minutes * 60
self.retention_hours = retention_hours
self._lock = threading.Lock()
self._shutdown = threading.Event()
self._snapshots: List[SystemMetricsSnapshot] = []
self._storage_ref: Optional["ObjectStorage"] = None
self._load_history()
self._snapshot_thread = threading.Thread(
target=self._snapshot_loop,
name="system-metrics-snapshot",
daemon=True,
)
self._snapshot_thread.start()
def set_storage(self, storage: "ObjectStorage") -> None:
with self._lock:
self._storage_ref = storage
def _config_path(self) -> Path:
return self.storage_root / ".myfsio.sys" / "config" / "metrics_history.json"
def _load_history(self) -> None:
config_path = self._config_path()
if not config_path.exists():
return
try:
data = json.loads(config_path.read_text(encoding="utf-8"))
history_data = data.get("history", [])
self._snapshots = [SystemMetricsSnapshot.from_dict(s) for s in history_data]
self._prune_old_snapshots()
except (json.JSONDecodeError, OSError, KeyError) as e:
logger.warning(f"Failed to load system metrics history: {e}")
def _save_history(self) -> None:
config_path = self._config_path()
config_path.parent.mkdir(parents=True, exist_ok=True)
try:
data = {"history": [s.to_dict() for s in self._snapshots]}
config_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
except OSError as e:
logger.warning(f"Failed to save system metrics history: {e}")
def _prune_old_snapshots(self) -> None:
if not self._snapshots:
return
cutoff = datetime.now(timezone.utc).timestamp() - (self.retention_hours * 3600)
self._snapshots = [
s for s in self._snapshots if s.timestamp.timestamp() > cutoff
]
def _snapshot_loop(self) -> None:
while not self._shutdown.is_set():
self._shutdown.wait(timeout=self.interval_seconds)
if not self._shutdown.is_set():
self._take_snapshot()
def _take_snapshot(self) -> None:
try:
cpu_percent = psutil.cpu_percent(interval=0.1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage(str(self.storage_root))
storage_bytes = 0
with self._lock:
storage = self._storage_ref
if storage:
try:
buckets = storage.list_buckets()
for bucket in buckets:
stats = storage.bucket_stats(bucket.name, cache_ttl=60)
storage_bytes += stats.get("total_bytes", stats.get("bytes", 0))
except Exception as e:
logger.warning(f"Failed to collect bucket stats: {e}")
snapshot = SystemMetricsSnapshot(
timestamp=datetime.now(timezone.utc),
cpu_percent=cpu_percent,
memory_percent=memory.percent,
disk_percent=disk.percent,
storage_bytes=storage_bytes,
)
with self._lock:
self._snapshots.append(snapshot)
self._prune_old_snapshots()
self._save_history()
logger.debug(f"System metrics snapshot taken: CPU={cpu_percent:.1f}%, Memory={memory.percent:.1f}%")
except Exception as e:
logger.warning(f"Failed to take system metrics snapshot: {e}")
def get_current(self) -> Dict[str, Any]:
cpu_percent = psutil.cpu_percent(interval=0.1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage(str(self.storage_root))
boot_time = psutil.boot_time()
uptime_seconds = time.time() - boot_time
uptime_days = int(uptime_seconds / 86400)
total_buckets = 0
total_objects = 0
total_bytes_used = 0
total_versions = 0
with self._lock:
storage = self._storage_ref
if storage:
try:
buckets = storage.list_buckets()
total_buckets = len(buckets)
for bucket in buckets:
stats = storage.bucket_stats(bucket.name, cache_ttl=60)
total_objects += stats.get("total_objects", stats.get("objects", 0))
total_bytes_used += stats.get("total_bytes", stats.get("bytes", 0))
total_versions += stats.get("version_count", 0)
except Exception as e:
logger.warning(f"Failed to collect current bucket stats: {e}")
return {
"cpu_percent": round(cpu_percent, 2),
"memory": {
"total": memory.total,
"available": memory.available,
"used": memory.used,
"percent": round(memory.percent, 2),
},
"disk": {
"total": disk.total,
"free": disk.free,
"used": disk.used,
"percent": round(disk.percent, 2),
},
"app": {
"buckets": total_buckets,
"objects": total_objects,
"versions": total_versions,
"storage_bytes": total_bytes_used,
"uptime_days": uptime_days,
},
}
def get_history(self, hours: Optional[int] = None) -> List[Dict[str, Any]]:
with self._lock:
snapshots = list(self._snapshots)
if hours:
cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600)
snapshots = [s for s in snapshots if s.timestamp.timestamp() > cutoff]
return [s.to_dict() for s in snapshots]
def shutdown(self) -> None:
self._shutdown.set()
self._take_snapshot()
self._snapshot_thread.join(timeout=5.0)