Add background collection for system metrics
This commit is contained in:
@@ -197,6 +197,17 @@ def create_app(
|
||||
)
|
||||
app.extensions["operation_metrics"] = operation_metrics_collector
|
||||
|
||||
system_metrics_collector = None
|
||||
if app.config.get("METRICS_HISTORY_ENABLED", False):
|
||||
from .system_metrics import SystemMetricsCollector
|
||||
system_metrics_collector = SystemMetricsCollector(
|
||||
storage_root,
|
||||
interval_minutes=app.config.get("METRICS_HISTORY_INTERVAL_MINUTES", 5),
|
||||
retention_hours=app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24),
|
||||
)
|
||||
system_metrics_collector.set_storage(storage)
|
||||
app.extensions["system_metrics"] = system_metrics_collector
|
||||
|
||||
@app.errorhandler(500)
|
||||
def internal_error(error):
|
||||
return render_template('500.html'), 500
|
||||
|
||||
215
app/system_metrics.py
Normal file
215
app/system_metrics.py
Normal file
@@ -0,0 +1,215 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, TYPE_CHECKING
|
||||
|
||||
import psutil
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .storage import ObjectStorage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SystemMetricsSnapshot:
|
||||
timestamp: datetime
|
||||
cpu_percent: float
|
||||
memory_percent: float
|
||||
disk_percent: float
|
||||
storage_bytes: int
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"timestamp": self.timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||
"cpu_percent": round(self.cpu_percent, 2),
|
||||
"memory_percent": round(self.memory_percent, 2),
|
||||
"disk_percent": round(self.disk_percent, 2),
|
||||
"storage_bytes": self.storage_bytes,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "SystemMetricsSnapshot":
|
||||
timestamp_str = data["timestamp"]
|
||||
if timestamp_str.endswith("Z"):
|
||||
timestamp_str = timestamp_str[:-1] + "+00:00"
|
||||
return cls(
|
||||
timestamp=datetime.fromisoformat(timestamp_str),
|
||||
cpu_percent=data.get("cpu_percent", 0.0),
|
||||
memory_percent=data.get("memory_percent", 0.0),
|
||||
disk_percent=data.get("disk_percent", 0.0),
|
||||
storage_bytes=data.get("storage_bytes", 0),
|
||||
)
|
||||
|
||||
|
||||
class SystemMetricsCollector:
|
||||
def __init__(
|
||||
self,
|
||||
storage_root: Path,
|
||||
interval_minutes: int = 5,
|
||||
retention_hours: int = 24,
|
||||
):
|
||||
self.storage_root = storage_root
|
||||
self.interval_seconds = interval_minutes * 60
|
||||
self.retention_hours = retention_hours
|
||||
self._lock = threading.Lock()
|
||||
self._shutdown = threading.Event()
|
||||
self._snapshots: List[SystemMetricsSnapshot] = []
|
||||
self._storage_ref: Optional["ObjectStorage"] = None
|
||||
|
||||
self._load_history()
|
||||
|
||||
self._snapshot_thread = threading.Thread(
|
||||
target=self._snapshot_loop,
|
||||
name="system-metrics-snapshot",
|
||||
daemon=True,
|
||||
)
|
||||
self._snapshot_thread.start()
|
||||
|
||||
def set_storage(self, storage: "ObjectStorage") -> None:
|
||||
with self._lock:
|
||||
self._storage_ref = storage
|
||||
|
||||
def _config_path(self) -> Path:
|
||||
return self.storage_root / ".myfsio.sys" / "config" / "metrics_history.json"
|
||||
|
||||
def _load_history(self) -> None:
|
||||
config_path = self._config_path()
|
||||
if not config_path.exists():
|
||||
return
|
||||
try:
|
||||
data = json.loads(config_path.read_text(encoding="utf-8"))
|
||||
history_data = data.get("history", [])
|
||||
self._snapshots = [SystemMetricsSnapshot.from_dict(s) for s in history_data]
|
||||
self._prune_old_snapshots()
|
||||
except (json.JSONDecodeError, OSError, KeyError) as e:
|
||||
logger.warning(f"Failed to load system metrics history: {e}")
|
||||
|
||||
def _save_history(self) -> None:
|
||||
config_path = self._config_path()
|
||||
config_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
data = {"history": [s.to_dict() for s in self._snapshots]}
|
||||
config_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
|
||||
except OSError as e:
|
||||
logger.warning(f"Failed to save system metrics history: {e}")
|
||||
|
||||
def _prune_old_snapshots(self) -> None:
|
||||
if not self._snapshots:
|
||||
return
|
||||
cutoff = datetime.now(timezone.utc).timestamp() - (self.retention_hours * 3600)
|
||||
self._snapshots = [
|
||||
s for s in self._snapshots if s.timestamp.timestamp() > cutoff
|
||||
]
|
||||
|
||||
def _snapshot_loop(self) -> None:
|
||||
while not self._shutdown.is_set():
|
||||
self._shutdown.wait(timeout=self.interval_seconds)
|
||||
if not self._shutdown.is_set():
|
||||
self._take_snapshot()
|
||||
|
||||
def _take_snapshot(self) -> None:
|
||||
try:
|
||||
cpu_percent = psutil.cpu_percent(interval=0.1)
|
||||
memory = psutil.virtual_memory()
|
||||
disk = psutil.disk_usage(str(self.storage_root))
|
||||
|
||||
storage_bytes = 0
|
||||
with self._lock:
|
||||
storage = self._storage_ref
|
||||
if storage:
|
||||
try:
|
||||
buckets = storage.list_buckets()
|
||||
for bucket in buckets:
|
||||
stats = storage.bucket_stats(bucket.name, cache_ttl=60)
|
||||
storage_bytes += stats.get("total_bytes", stats.get("bytes", 0))
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to collect bucket stats: {e}")
|
||||
|
||||
snapshot = SystemMetricsSnapshot(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
cpu_percent=cpu_percent,
|
||||
memory_percent=memory.percent,
|
||||
disk_percent=disk.percent,
|
||||
storage_bytes=storage_bytes,
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
self._snapshots.append(snapshot)
|
||||
self._prune_old_snapshots()
|
||||
self._save_history()
|
||||
|
||||
logger.debug(f"System metrics snapshot taken: CPU={cpu_percent:.1f}%, Memory={memory.percent:.1f}%")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to take system metrics snapshot: {e}")
|
||||
|
||||
def get_current(self) -> Dict[str, Any]:
|
||||
cpu_percent = psutil.cpu_percent(interval=0.1)
|
||||
memory = psutil.virtual_memory()
|
||||
disk = psutil.disk_usage(str(self.storage_root))
|
||||
boot_time = psutil.boot_time()
|
||||
uptime_seconds = time.time() - boot_time
|
||||
uptime_days = int(uptime_seconds / 86400)
|
||||
|
||||
total_buckets = 0
|
||||
total_objects = 0
|
||||
total_bytes_used = 0
|
||||
total_versions = 0
|
||||
|
||||
with self._lock:
|
||||
storage = self._storage_ref
|
||||
if storage:
|
||||
try:
|
||||
buckets = storage.list_buckets()
|
||||
total_buckets = len(buckets)
|
||||
for bucket in buckets:
|
||||
stats = storage.bucket_stats(bucket.name, cache_ttl=60)
|
||||
total_objects += stats.get("total_objects", stats.get("objects", 0))
|
||||
total_bytes_used += stats.get("total_bytes", stats.get("bytes", 0))
|
||||
total_versions += stats.get("version_count", 0)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to collect current bucket stats: {e}")
|
||||
|
||||
return {
|
||||
"cpu_percent": round(cpu_percent, 2),
|
||||
"memory": {
|
||||
"total": memory.total,
|
||||
"available": memory.available,
|
||||
"used": memory.used,
|
||||
"percent": round(memory.percent, 2),
|
||||
},
|
||||
"disk": {
|
||||
"total": disk.total,
|
||||
"free": disk.free,
|
||||
"used": disk.used,
|
||||
"percent": round(disk.percent, 2),
|
||||
},
|
||||
"app": {
|
||||
"buckets": total_buckets,
|
||||
"objects": total_objects,
|
||||
"versions": total_versions,
|
||||
"storage_bytes": total_bytes_used,
|
||||
"uptime_days": uptime_days,
|
||||
},
|
||||
}
|
||||
|
||||
def get_history(self, hours: Optional[int] = None) -> List[Dict[str, Any]]:
|
||||
with self._lock:
|
||||
snapshots = list(self._snapshots)
|
||||
|
||||
if hours:
|
||||
cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600)
|
||||
snapshots = [s for s in snapshots if s.timestamp.timestamp() > cutoff]
|
||||
|
||||
return [s.to_dict() for s in snapshots]
|
||||
|
||||
def shutdown(self) -> None:
|
||||
self._shutdown.set()
|
||||
self._take_snapshot()
|
||||
self._snapshot_thread.join(timeout=5.0)
|
||||
79
app/ui.py
79
app/ui.py
@@ -158,69 +158,6 @@ def _format_bytes(num: int) -> str:
|
||||
return f"{value:.1f} PB"
|
||||
|
||||
|
||||
_metrics_last_save_time: float = 0.0
|
||||
|
||||
|
||||
def _get_metrics_history_path() -> Path:
|
||||
storage_root = Path(current_app.config["STORAGE_ROOT"])
|
||||
return storage_root / ".myfsio.sys" / "config" / "metrics_history.json"
|
||||
|
||||
|
||||
def _load_metrics_history() -> dict:
|
||||
path = _get_metrics_history_path()
|
||||
if not path.exists():
|
||||
return {"history": []}
|
||||
try:
|
||||
return json.loads(path.read_text(encoding="utf-8"))
|
||||
except (json.JSONDecodeError, OSError):
|
||||
return {"history": []}
|
||||
|
||||
|
||||
def _save_metrics_snapshot(cpu_percent: float, memory_percent: float, disk_percent: float, storage_bytes: int) -> None:
|
||||
global _metrics_last_save_time
|
||||
|
||||
if not current_app.config.get("METRICS_HISTORY_ENABLED", False):
|
||||
return
|
||||
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
interval_minutes = current_app.config.get("METRICS_HISTORY_INTERVAL_MINUTES", 5)
|
||||
now_ts = time.time()
|
||||
if now_ts - _metrics_last_save_time < interval_minutes * 60:
|
||||
return
|
||||
|
||||
path = _get_metrics_history_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
data = _load_metrics_history()
|
||||
history = data.get("history", [])
|
||||
retention_hours = current_app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24)
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
snapshot = {
|
||||
"timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||
"cpu_percent": round(cpu_percent, 2),
|
||||
"memory_percent": round(memory_percent, 2),
|
||||
"disk_percent": round(disk_percent, 2),
|
||||
"storage_bytes": storage_bytes,
|
||||
}
|
||||
history.append(snapshot)
|
||||
|
||||
cutoff = now.timestamp() - (retention_hours * 3600)
|
||||
history = [
|
||||
h for h in history
|
||||
if datetime.fromisoformat(h["timestamp"].replace("Z", "+00:00")).timestamp() > cutoff
|
||||
]
|
||||
|
||||
data["history"] = history
|
||||
try:
|
||||
path.write_text(json.dumps(data, indent=2), encoding="utf-8")
|
||||
_metrics_last_save_time = now_ts
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def _friendly_error_message(exc: Exception) -> str:
|
||||
message = str(exc) or "An unexpected error occurred"
|
||||
if isinstance(exc, IamError):
|
||||
@@ -2240,8 +2177,6 @@ def metrics_api():
|
||||
uptime_seconds = time.time() - boot_time
|
||||
uptime_days = int(uptime_seconds / 86400)
|
||||
|
||||
_save_metrics_snapshot(cpu_percent, memory.percent, disk.percent, total_bytes_used)
|
||||
|
||||
return jsonify({
|
||||
"cpu_percent": round(cpu_percent, 2),
|
||||
"memory": {
|
||||
@@ -2276,23 +2211,15 @@ def metrics_history():
|
||||
except IamError:
|
||||
return jsonify({"error": "Access denied"}), 403
|
||||
|
||||
if not current_app.config.get("METRICS_HISTORY_ENABLED", False):
|
||||
system_metrics = current_app.extensions.get("system_metrics")
|
||||
if not system_metrics:
|
||||
return jsonify({"enabled": False, "history": []})
|
||||
|
||||
hours = request.args.get("hours", type=int)
|
||||
if hours is None:
|
||||
hours = current_app.config.get("METRICS_HISTORY_RETENTION_HOURS", 24)
|
||||
|
||||
data = _load_metrics_history()
|
||||
history = data.get("history", [])
|
||||
|
||||
if hours:
|
||||
from datetime import datetime, timezone
|
||||
cutoff = datetime.now(timezone.utc).timestamp() - (hours * 3600)
|
||||
history = [
|
||||
h for h in history
|
||||
if datetime.fromisoformat(h["timestamp"].replace("Z", "+00:00")).timestamp() > cutoff
|
||||
]
|
||||
history = system_metrics.get_history(hours=hours)
|
||||
|
||||
return jsonify({
|
||||
"enabled": True,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
APP_VERSION = "0.2.2"
|
||||
APP_VERSION = "0.2.3"
|
||||
|
||||
|
||||
def get_version() -> str:
|
||||
|
||||
Reference in New Issue
Block a user