Add monitoring, metrics tracking, and enhanced logging

- Add BotMonitor class for tracking requests, errors, and health status
- Track request success/failure, response times, error rates
- Add !status command (admin) for detailed metrics with embed
- Add !health command (anyone) for quick health check
- Add rotating file logging support (LOG_FILE config)
- Auto-detect unhealthy status when errors exceed threshold
- Record errors with timestamps and context for debugging
This commit is contained in:
2026-01-11 21:04:49 +01:00
parent 7e26adc7a7
commit 853e7c9fcd
7 changed files with 487 additions and 3 deletions

View File

@@ -62,7 +62,19 @@ SEARXNG_ENABLED=true
SEARXNG_MAX_RESULTS=5
# ===========================================
# Logging
# Logging & Monitoring
# ===========================================
# Log level: DEBUG, INFO, WARNING, ERROR
LOG_LEVEL=INFO
# Log file path (optional - logs to console only if not set)
# LOG_FILE=logs/bot.log
# Custom log format (optional)
# LOG_FORMAT=%(asctime)s - %(name)s - %(levelname)s - %(message)s
# ===========================================
# Monitoring Commands
# ===========================================
# Use !status (admin only) for detailed metrics
# Use !health (anyone) for quick health check

View File

@@ -14,6 +14,7 @@ from daemon_boyfriend.services import (
Message,
SearXNGService,
)
from daemon_boyfriend.utils import get_monitor
logger = logging.getLogger(__name__)
@@ -101,6 +102,9 @@ class AIChatCog(commands.Cog):
return
# Show typing indicator while generating response
monitor = get_monitor()
start_time = monitor.record_request_start()
async with message.channel.typing():
try:
response_text = await self._generate_response(message, content)
@@ -132,7 +136,12 @@ class AIChatCog(commands.Cog):
for img_url in remaining_images:
await message.channel.send(embed=self._create_image_embed(img_url))
# Record successful request
monitor.record_request_success(start_time)
except Exception as e:
# Record failed request
monitor.record_request_failure(start_time, e, context="on_message")
logger.error(f"Mention response error: {e}", exc_info=True)
error_message = self._get_error_message(e)
await message.reply(error_message)

View File

@@ -0,0 +1,147 @@
"""Status cog - provides bot health and metrics commands."""
import logging
import discord
from discord.ext import commands
from daemon_boyfriend.utils import HealthStatus, get_monitor
logger = logging.getLogger(__name__)
class StatusCog(commands.Cog):
"""Bot status and monitoring commands."""
def __init__(self, bot: commands.Bot) -> None:
self.bot = bot
@commands.command(name="status")
@commands.has_permissions(administrator=True)
async def status_command(self, ctx: commands.Context) -> None:
"""Show bot status and metrics (admin only)."""
monitor = get_monitor()
metrics = monitor.get_metrics()
# Choose embed color based on health status
color_map = {
HealthStatus.HEALTHY: discord.Color.green(),
HealthStatus.DEGRADED: discord.Color.yellow(),
HealthStatus.UNHEALTHY: discord.Color.red(),
}
color = color_map.get(metrics.health_status, discord.Color.grey())
# Format uptime
uptime_hours = metrics.uptime_seconds / 3600
if uptime_hours >= 24:
uptime_str = f"{uptime_hours / 24:.1f} days"
elif uptime_hours >= 1:
uptime_str = f"{uptime_hours:.1f} hours"
else:
uptime_str = f"{metrics.uptime_seconds / 60:.1f} minutes"
# Create embed
embed = discord.Embed(
title="Bot Status Report",
color=color,
)
# Status field with emoji
status_emoji = {
HealthStatus.HEALTHY: "🟢",
HealthStatus.DEGRADED: "🟡",
HealthStatus.UNHEALTHY: "🔴",
}
embed.add_field(
name="Status",
value=f"{status_emoji.get(metrics.health_status, '')} {metrics.health_status.value.upper()}",
inline=True,
)
embed.add_field(name="Uptime", value=uptime_str, inline=True)
embed.add_field(name="Guilds", value=str(len(self.bot.guilds)), inline=True)
# Request metrics
embed.add_field(
name="Requests",
value=f"Total: {metrics.total_requests}\n"
f"Success: {metrics.successful_requests}\n"
f"Failed: {metrics.failed_requests}",
inline=True,
)
embed.add_field(
name="Performance",
value=f"Avg response: {metrics.avg_response_time_ms:.0f}ms\n"
f"Req/min: {metrics.requests_per_minute:.1f}\n"
f"Error rate: {metrics.error_rate_percent:.1f}%",
inline=True,
)
# Last activity
last_success = "Never"
if metrics.last_successful_request:
last_success = metrics.last_successful_request.strftime("%Y-%m-%d %H:%M:%S UTC")
last_error = "None"
if metrics.last_error:
last_error = metrics.last_error.strftime("%Y-%m-%d %H:%M:%S UTC")
embed.add_field(
name="Last Activity",
value=f"Success: {last_success}\nError: {last_error}",
inline=False,
)
# Recent errors (if any)
if metrics.recent_errors:
error_lines = []
for error in metrics.recent_errors[-3:]: # Last 3 errors
time_str = error.timestamp.strftime("%H:%M:%S")
error_lines.append(f"`{time_str}` **{error.error_type}**: {error.message[:50]}...")
embed.add_field(
name="Recent Errors",
value="\n".join(error_lines),
inline=False,
)
await ctx.send(embed=embed)
@commands.command(name="health")
async def health_command(self, ctx: commands.Context) -> None:
"""Quick health check (anyone can use)."""
monitor = get_monitor()
metrics = monitor.get_metrics()
status_emoji = {
HealthStatus.HEALTHY: "🟢",
HealthStatus.DEGRADED: "🟡",
HealthStatus.UNHEALTHY: "🔴",
}
emoji = status_emoji.get(metrics.health_status, "")
status = metrics.health_status.value
# Format uptime
uptime_hours = metrics.uptime_seconds / 3600
if uptime_hours >= 1:
uptime_str = f"{uptime_hours:.1f}h"
else:
uptime_str = f"{metrics.uptime_seconds / 60:.0f}m"
await ctx.send(
f"{emoji} **{status.upper()}** | Uptime: {uptime_str} | Requests: {metrics.total_requests}"
)
@status_command.error
async def status_error(self, ctx: commands.Context, error: commands.CommandError) -> None:
"""Handle status command errors."""
if isinstance(error, commands.MissingPermissions):
await ctx.send("You need administrator permissions to view detailed status.")
else:
logger.error(f"Status command error: {error}")
async def setup(bot: commands.Bot) -> None:
"""Load the Status cog."""
await bot.add_cog(StatusCog(bot))

View File

@@ -34,6 +34,13 @@ class Settings(BaseSettings):
# Logging
log_level: str = Field("INFO", description="Logging level")
log_file: str | None = Field(
None, description="Log file path (optional, logs to console if not set)"
)
log_format: str = Field(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
description="Log message format",
)
# Bot Identity
bot_name: str = Field("AI Bot", description="Bot display name")

View File

@@ -1,8 +1,13 @@
"""Utility modules."""
from .logging import get_logger, setup_logging
from .monitoring import BotMonitor, HealthStatus, MetricsSnapshot, get_monitor
__all__ = [
"setup_logging",
"get_logger",
"BotMonitor",
"HealthStatus",
"MetricsSnapshot",
"get_monitor",
]

View File

@@ -2,6 +2,8 @@
import logging
import sys
from logging.handlers import RotatingFileHandler
from pathlib import Path
from daemon_boyfriend.config import settings
@@ -10,7 +12,7 @@ def setup_logging() -> None:
"""Configure application-wide logging."""
# Create formatter
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
settings.log_format,
datefmt="%Y-%m-%d %H:%M:%S",
)
@@ -21,11 +23,29 @@ def setup_logging() -> None:
# Remove existing handlers
root_logger.handlers.clear()
# Console handler
# Console handler (always enabled)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# File handler (if configured)
if settings.log_file:
log_path = Path(settings.log_file)
# Create directory if it doesn't exist
log_path.parent.mkdir(parents=True, exist_ok=True)
# Use rotating file handler (10MB max, keep 5 backups)
file_handler = RotatingFileHandler(
log_path,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding="utf-8",
)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
logging.info(f"Logging to file: {log_path}")
# Reduce noise from discord.py
logging.getLogger("discord").setLevel(logging.WARNING)
logging.getLogger("discord.http").setLevel(logging.WARNING)
@@ -38,6 +58,8 @@ def setup_logging() -> None:
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.info(f"Logging initialized at {settings.log_level} level")
def get_logger(name: str) -> logging.Logger:
"""Get a logger with the given name."""

View File

@@ -0,0 +1,282 @@
"""Monitoring and metrics tracking for the bot."""
import asyncio
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Callable
logger = logging.getLogger(__name__)
class HealthStatus(Enum):
"""Health status levels."""
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class ErrorRecord:
"""Record of an error occurrence."""
timestamp: datetime
error_type: str
message: str
context: str = ""
@dataclass
class MetricsSnapshot:
"""Snapshot of current metrics."""
uptime_seconds: float
total_requests: int
successful_requests: int
failed_requests: int
avg_response_time_ms: float
requests_per_minute: float
error_rate_percent: float
recent_errors: list[ErrorRecord]
health_status: HealthStatus
last_successful_request: datetime | None
last_error: datetime | None
class BotMonitor:
"""Monitors bot health and collects metrics."""
def __init__(
self,
error_threshold: int = 10,
error_window_minutes: int = 5,
max_error_history: int = 50,
) -> None:
"""Initialize the monitor.
Args:
error_threshold: Number of errors in window to trigger unhealthy status
error_window_minutes: Time window for error rate calculation
max_error_history: Maximum number of errors to keep in history
"""
self._start_time = time.time()
self._total_requests = 0
self._successful_requests = 0
self._failed_requests = 0
self._response_times: deque[float] = deque(maxlen=100)
self._request_timestamps: deque[float] = deque(maxlen=1000)
self._errors: deque[ErrorRecord] = deque(maxlen=max_error_history)
self._error_threshold = error_threshold
self._error_window_minutes = error_window_minutes
self._last_successful_request: datetime | None = None
self._last_error: datetime | None = None
self._status_callbacks: list[Callable[[HealthStatus], None]] = []
def record_request_start(self) -> float:
"""Record the start of a request.
Returns:
Start timestamp for use with record_request_end
"""
self._total_requests += 1
start_time = time.time()
self._request_timestamps.append(start_time)
return start_time
def record_request_success(self, start_time: float) -> None:
"""Record a successful request.
Args:
start_time: Timestamp from record_request_start
"""
self._successful_requests += 1
response_time = (time.time() - start_time) * 1000 # Convert to ms
self._response_times.append(response_time)
self._last_successful_request = datetime.now(timezone.utc)
logger.debug(f"Request completed in {response_time:.2f}ms")
def record_request_failure(
self, start_time: float, error: Exception, context: str = ""
) -> None:
"""Record a failed request.
Args:
start_time: Timestamp from record_request_start
error: The exception that occurred
context: Additional context about where the error occurred
"""
self._failed_requests += 1
response_time = (time.time() - start_time) * 1000
self._response_times.append(response_time)
error_record = ErrorRecord(
timestamp=datetime.now(timezone.utc),
error_type=type(error).__name__,
message=str(error)[:500], # Truncate long messages
context=context,
)
self._errors.append(error_record)
self._last_error = error_record.timestamp
logger.warning(
f"Request failed after {response_time:.2f}ms: "
f"[{error_record.error_type}] {error_record.message[:100]}"
)
# Check if we need to alert about health status change
new_status = self._calculate_health_status()
if new_status == HealthStatus.UNHEALTHY:
self._notify_status_change(new_status)
def record_error(self, error: Exception, context: str = "") -> None:
"""Record an error without request timing.
Args:
error: The exception that occurred
context: Additional context about where the error occurred
"""
error_record = ErrorRecord(
timestamp=datetime.now(timezone.utc),
error_type=type(error).__name__,
message=str(error)[:500],
context=context,
)
self._errors.append(error_record)
self._last_error = error_record.timestamp
logger.error(f"Error recorded: [{error_record.error_type}] {error_record.message}")
def on_status_change(self, callback: Callable[[HealthStatus], None]) -> None:
"""Register a callback for health status changes.
Args:
callback: Function to call when status changes to unhealthy
"""
self._status_callbacks.append(callback)
def _notify_status_change(self, status: HealthStatus) -> None:
"""Notify all registered callbacks of a status change."""
for callback in self._status_callbacks:
try:
callback(status)
except Exception as e:
logger.error(f"Status callback failed: {e}")
def _calculate_health_status(self) -> HealthStatus:
"""Calculate the current health status."""
now = time.time()
window_start = now - (self._error_window_minutes * 60)
# Count recent errors
recent_error_count = sum(1 for e in self._errors if e.timestamp.timestamp() > window_start)
if recent_error_count >= self._error_threshold:
return HealthStatus.UNHEALTHY
elif recent_error_count >= self._error_threshold // 2:
return HealthStatus.DEGRADED
else:
return HealthStatus.HEALTHY
def _calculate_requests_per_minute(self) -> float:
"""Calculate the current requests per minute rate."""
now = time.time()
one_minute_ago = now - 60
recent_requests = sum(1 for t in self._request_timestamps if t > one_minute_ago)
return float(recent_requests)
def get_metrics(self) -> MetricsSnapshot:
"""Get a snapshot of current metrics.
Returns:
MetricsSnapshot with all current metrics
"""
uptime = time.time() - self._start_time
avg_response_time = 0.0
if self._response_times:
avg_response_time = sum(self._response_times) / len(self._response_times)
error_rate = 0.0
if self._total_requests > 0:
error_rate = (self._failed_requests / self._total_requests) * 100
return MetricsSnapshot(
uptime_seconds=uptime,
total_requests=self._total_requests,
successful_requests=self._successful_requests,
failed_requests=self._failed_requests,
avg_response_time_ms=avg_response_time,
requests_per_minute=self._calculate_requests_per_minute(),
error_rate_percent=error_rate,
recent_errors=list(self._errors)[-10:], # Last 10 errors
health_status=self._calculate_health_status(),
last_successful_request=self._last_successful_request,
last_error=self._last_error,
)
def get_status_report(self) -> str:
"""Get a formatted status report.
Returns:
Human-readable status report
"""
metrics = self.get_metrics()
# Format uptime
uptime_hours = metrics.uptime_seconds / 3600
uptime_str = (
f"{uptime_hours:.1f} hours"
if uptime_hours >= 1
else f"{metrics.uptime_seconds / 60:.1f} minutes"
)
report = [
"=== Bot Status Report ===",
f"Status: {metrics.health_status.value.upper()}",
f"Uptime: {uptime_str}",
"",
"--- Request Metrics ---",
f"Total requests: {metrics.total_requests}",
f"Successful: {metrics.successful_requests}",
f"Failed: {metrics.failed_requests}",
f"Error rate: {metrics.error_rate_percent:.1f}%",
f"Avg response time: {metrics.avg_response_time_ms:.0f}ms",
f"Requests/min: {metrics.requests_per_minute:.1f}",
]
if metrics.last_successful_request:
report.append(f"Last success: {metrics.last_successful_request.isoformat()}")
if metrics.recent_errors:
report.append("")
report.append("--- Recent Errors ---")
for error in metrics.recent_errors[-5:]:
report.append(
f"[{error.timestamp.strftime('%H:%M:%S')}] {error.error_type}: {error.message[:50]}"
)
return "\n".join(report)
# Global monitor instance
_monitor: BotMonitor | None = None
def get_monitor() -> BotMonitor:
"""Get the global monitor instance."""
global _monitor
if _monitor is None:
_monitor = BotMonitor()
return _monitor
def reset_monitor() -> None:
"""Reset the global monitor (mainly for testing)."""
global _monitor
_monitor = None