Files
2025-12-21 13:42:30 +01:00

372 lines
10 KiB
Python

"""Metrics Collector
Observability metrics for AI agent performance monitoring.
Tracks request counts, latencies, errors, and LLM usage.
"""
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from threading import Lock
@dataclass
class MetricPoint:
    """One timestamped sample of a metric, with optional key/value labels."""

    # When the sample was taken.
    timestamp: datetime
    # The observed value.
    value: float
    # Free-form label set attached to this sample; defaults to a fresh dict
    # per instance (default_factory avoids the shared-mutable-default trap).
    labels: dict = field(default_factory=dict)
class Counter:
    """Monotonically increasing metric, safe for concurrent use.

    All reads and writes of the running total happen under a lock.
    """

    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._lock = Lock()
        self._total = 0.0

    def inc(self, value: float = 1.0):
        """Add ``value`` (default 1.0) to the counter."""
        with self._lock:
            self._total += value

    @property
    def value(self) -> float:
        """Current accumulated total."""
        with self._lock:
            return self._total
class Gauge:
    """Metric that can go up, down, or be set directly; thread-safe.

    A single lock guards every access to the stored value.
    """

    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._lock = Lock()
        self._current = 0.0

    def set(self, value: float):
        """Replace the gauge's value with ``value``."""
        with self._lock:
            self._current = value

    def inc(self, value: float = 1.0):
        """Raise the gauge by ``value`` (default 1.0)."""
        with self._lock:
            self._current += value

    def dec(self, value: float = 1.0):
        """Lower the gauge by ``value`` (default 1.0)."""
        with self._lock:
            self._current -= value

    @property
    def value(self) -> float:
        """Current gauge reading."""
        with self._lock:
            return self._current
class Histogram:
"""Simple histogram for tracking distributions."""
def __init__(
self,
name: str,
description: str = "",
buckets: list[float] | None = None,
):
self.name = name
self.description = description
self.buckets = buckets or [0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
self._values: list[float] = []
self._lock = Lock()
def observe(self, value: float):
"""Record an observation."""
with self._lock:
self._values.append(value)
# Keep only last 1000 observations
if len(self._values) > 1000:
self._values = self._values[-1000:]
def get_percentile(self, percentile: float) -> float:
"""Get a percentile value."""
with self._lock:
if not self._values:
return 0.0
sorted_values = sorted(self._values)
idx = int(len(sorted_values) * percentile / 100)
return sorted_values[min(idx, len(sorted_values) - 1)]
@property
def count(self) -> int:
"""Get observation count."""
with self._lock:
return len(self._values)
@property
def sum(self) -> float:
"""Get sum of observations."""
with self._lock:
return sum(self._values)
class MetricsCollector:
    """Central metrics collector for AI agents.

    Aggregates counters, gauges, and histograms covering request
    throughput, LLM usage, and review actions. The individual metric
    objects are internally thread-safe; the per-agent bookkeeping dict
    is guarded by its own lock (previously it was mutated with an
    unsynchronized check-then-insert, which races under concurrent
    requests).
    """

    def __init__(self, enabled: bool = True):
        """Initialize metrics collector.

        Args:
            enabled: Whether metrics collection is enabled. When False,
                every ``record_*`` method is a no-op.
        """
        self.enabled = enabled
        self._start_time = time.time()
        # Counters
        self.requests_total = Counter(
            "ai_review_requests_total",
            "Total number of review requests processed",
        )
        self.requests_success = Counter(
            "ai_review_requests_success",
            "Number of successful review requests",
        )
        self.requests_failed = Counter(
            "ai_review_requests_failed",
            "Number of failed review requests",
        )
        self.llm_calls_total = Counter(
            "ai_review_llm_calls_total",
            "Total number of LLM API calls",
        )
        self.llm_tokens_total = Counter(
            "ai_review_llm_tokens_total",
            "Total LLM tokens consumed",
        )
        self.comments_posted = Counter(
            "ai_review_comments_posted_total",
            "Total comments posted",
        )
        self.labels_applied = Counter(
            "ai_review_labels_applied_total",
            "Total labels applied",
        )
        self.security_findings = Counter(
            "ai_review_security_findings_total",
            "Total security findings detected",
        )
        # Gauges
        self.active_requests = Gauge(
            "ai_review_active_requests",
            "Currently active review requests",
        )
        # Histograms
        self.request_duration = Histogram(
            "ai_review_request_duration_seconds",
            "Request processing duration",
        )
        self.llm_duration = Histogram(
            "ai_review_llm_duration_seconds",
            "LLM API call duration",
        )
        # Per-agent metrics: agent name -> {"total", "success", "failed"}.
        # Guarded by _agent_lock: dict check-then-insert is not atomic.
        self._agent_metrics: dict[str, dict] = {}
        self._agent_lock = Lock()

    def record_request_start(self, agent: str):
        """Record the start of a request.

        Args:
            agent: Name of the agent handling the request.
        """
        if not self.enabled:
            return
        self.requests_total.inc()
        self.active_requests.inc()
        with self._agent_lock:
            # setdefault makes the create-or-fetch a single atomic step
            # under the lock (the old unlocked `if agent not in ...` could
            # lose increments when two threads raced the insert).
            stats = self._agent_metrics.setdefault(
                agent, {"total": 0, "success": 0, "failed": 0}
            )
            stats["total"] += 1

    def record_request_end(
        self,
        agent: str,
        success: bool,
        duration_seconds: float,
    ):
        """Record the end of a request.

        Args:
            agent: Name of the agent.
            success: Whether the request succeeded.
            duration_seconds: Request duration.
        """
        if not self.enabled:
            return
        self.active_requests.dec()
        self.request_duration.observe(duration_seconds)
        if success:
            self.requests_success.inc()
        else:
            self.requests_failed.inc()
        with self._agent_lock:
            # Silently skipped if record_request_start was never called
            # for this agent, matching the original behavior.
            if agent in self._agent_metrics:
                key = "success" if success else "failed"
                self._agent_metrics[agent][key] += 1

    def record_llm_call(
        self,
        provider: str,
        model: str,
        tokens: int | None,
        duration_seconds: float,
    ):
        """Record an LLM API call.

        Args:
            provider: LLM provider name.
            model: Model used.
            tokens: Tokens consumed; falsy values (None or 0) are not
                added to the token counter.
            duration_seconds: Call duration.
        """
        # NOTE(review): provider and model are currently unused; kept for
        # interface compatibility. Consider per-provider breakdowns later.
        if not self.enabled:
            return
        self.llm_calls_total.inc()
        self.llm_duration.observe(duration_seconds)
        if tokens:
            self.llm_tokens_total.inc(tokens)

    def record_comment_posted(self):
        """Record a comment being posted."""
        if self.enabled:
            self.comments_posted.inc()

    def record_labels_applied(self, count: int = 1):
        """Record ``count`` labels being applied (default 1)."""
        if self.enabled:
            self.labels_applied.inc(count)

    def record_security_finding(self, severity: str):
        """Record a security finding.

        Args:
            severity: Finding severity.
        """
        # NOTE(review): severity is currently not tracked per-level; only
        # the total count is incremented. Kept for interface compatibility.
        if self.enabled:
            self.security_findings.inc()

    def get_summary(self) -> dict:
        """Get a summary of all metrics.

        Returns:
            Dictionary with metric summaries. The "by_agent" entry is a
            snapshot copy — mutating it cannot corrupt internal state
            (previously the live internal dict was returned by reference).
        """
        uptime = time.time() - self._start_time
        with self._agent_lock:
            by_agent = {
                agent: dict(stats)
                for agent, stats in self._agent_metrics.items()
            }
        return {
            "uptime_seconds": uptime,
            "requests": {
                "total": self.requests_total.value,
                "success": self.requests_success.value,
                "failed": self.requests_failed.value,
                "active": self.active_requests.value,
                # max(..., 1) avoids division by zero before any traffic.
                "success_rate": (
                    self.requests_success.value / max(self.requests_total.value, 1)
                ),
            },
            "llm": {
                "calls": self.llm_calls_total.value,
                "tokens": self.llm_tokens_total.value,
                "avg_duration_ms": (
                    (self.llm_duration.sum / max(self.llm_duration.count, 1)) * 1000
                ),
                "p50_duration_ms": self.llm_duration.get_percentile(50) * 1000,
                "p95_duration_ms": self.llm_duration.get_percentile(95) * 1000,
            },
            "actions": {
                "comments_posted": self.comments_posted.value,
                "labels_applied": self.labels_applied.value,
                "security_findings": self.security_findings.value,
            },
            "latency": {
                "avg_ms": (
                    (self.request_duration.sum / max(self.request_duration.count, 1))
                    * 1000
                ),
                "p50_ms": self.request_duration.get_percentile(50) * 1000,
                "p95_ms": self.request_duration.get_percentile(95) * 1000,
                "p99_ms": self.request_duration.get_percentile(99) * 1000,
            },
            "by_agent": by_agent,
        }

    def export_prometheus(self) -> str:
        """Export metrics in Prometheus exposition format.

        Returns:
            Prometheus-formatted metrics string (HELP + sample lines).
        """
        lines = []

        def add_metric(name: str, value: float, help_text: str = ""):
            # Emit an optional HELP line followed by the sample line.
            if help_text:
                lines.append(f"# HELP {name} {help_text}")
            lines.append(f"{name} {value}")

        add_metric(
            "ai_review_requests_total",
            self.requests_total.value,
            "Total review requests",
        )
        add_metric(
            "ai_review_requests_success_total",
            self.requests_success.value,
            "Successful requests",
        )
        add_metric(
            "ai_review_requests_failed_total",
            self.requests_failed.value,
            "Failed requests",
        )
        add_metric(
            "ai_review_llm_calls_total",
            self.llm_calls_total.value,
            "Total LLM calls",
        )
        add_metric(
            "ai_review_llm_tokens_total",
            self.llm_tokens_total.value,
            "Total LLM tokens",
        )
        add_metric(
            "ai_review_comments_posted_total",
            self.comments_posted.value,
            "Comments posted",
        )
        return "\n".join(lines)
# Global singleton instance and the lock protecting its lazy creation.
_metrics: MetricsCollector | None = None
_metrics_lock = Lock()


def get_metrics() -> MetricsCollector:
    """Get the global metrics collector instance.

    Lazily creates the singleton using double-checked locking: the
    previous unguarded check-then-set could let two concurrent first
    callers each construct (and partially record into) separate
    collectors.

    Returns:
        The process-wide MetricsCollector.
    """
    global _metrics
    if _metrics is None:
        with _metrics_lock:
            # Re-check under the lock: another thread may have won.
            if _metrics is None:
                _metrics = MetricsCollector()
    return _metrics