first commit

This commit is contained in:
2026-01-10 21:46:27 +01:00
parent d00593415d
commit 561f1a8fb1
30 changed files with 1932 additions and 1 deletions

View File

@@ -0,0 +1,16 @@
"""Services for external integrations."""
from .ai_service import AIService
from .conversation import ConversationManager
from .providers import AIResponse, Message
from .searxng import SearchResponse, SearchResult, SearXNGService
__all__ = [
"AIService",
"AIResponse",
"Message",
"ConversationManager",
"SearXNGService",
"SearchResponse",
"SearchResult",
]

View File

@@ -0,0 +1,101 @@
"""AI Service - Factory and facade for AI providers."""
import logging
from typing import Literal
from daemon_boyfriend.config import Settings, settings
from .providers import (
AIProvider,
AIResponse,
AnthropicProvider,
Message,
OpenAIProvider,
OpenRouterProvider,
)
logger = logging.getLogger(__name__)
ProviderType = Literal["openai", "openrouter", "anthropic"]
class AIService:
    """Facade over the configured AI provider.

    Builds the concrete provider selected by the settings and exposes a
    single ``chat()`` entry point plus the bot's default system prompt.
    """

    def __init__(self, config: Settings | None = None) -> None:
        self._config = config or settings
        self._provider: AIProvider | None = None
        self._init_provider()

    def _init_provider(self) -> None:
        """Build the provider described by the current configuration."""
        self._provider = self._create_provider(
            self._config.ai_provider,
            self._config.get_api_key(),
            self._config.ai_model,
        )

    def _create_provider(self, provider_type: ProviderType, api_key: str, model: str) -> AIProvider:
        """Instantiate the provider class registered for *provider_type*.

        Raises:
            ValueError: If *provider_type* is not a known provider.
        """
        registry: dict[ProviderType, type[AIProvider]] = {
            "openai": OpenAIProvider,
            "openrouter": OpenRouterProvider,
            "anthropic": AnthropicProvider,
        }
        provider_cls = registry.get(provider_type)
        if provider_cls is None:
            raise ValueError(f"Unknown provider: {provider_type}")
        logger.info(f"Initializing {provider_type} provider with model {model}")
        return provider_cls(api_key=api_key, model=model)

    @property
    def provider(self) -> AIProvider:
        """The active provider instance.

        Raises:
            RuntimeError: If initialization never completed.
        """
        if self._provider is None:
            raise RuntimeError("AI provider not initialized")
        return self._provider

    @property
    def provider_name(self) -> str:
        """Name of the active provider (e.g. "openai")."""
        return self.provider.provider_name

    @property
    def model(self) -> str:
        """Configured model identifier."""
        return self._config.ai_model

    async def chat(
        self,
        messages: list[Message],
        system_prompt: str | None = None,
    ) -> AIResponse:
        """Generate a chat response via the active provider.

        Args:
            messages: Conversation messages, oldest first.
            system_prompt: Optional system prompt.

        Returns:
            AIResponse with the generated content.
        """
        return await self.provider.generate(
            messages=messages,
            system_prompt=system_prompt,
            max_tokens=self._config.ai_max_tokens,
            temperature=self._config.ai_temperature,
        )

    def get_system_prompt(self) -> str:
        """Return the default persona prompt built from the bot settings."""
        return (
            f"You are {self._config.bot_name}, a {self._config.bot_personality} "
            f"Discord bot for the MSC group. Keep your responses concise and engaging. "
            f"You can use Discord markdown formatting in your responses."
        )

View File

@@ -0,0 +1,81 @@
"""Conversation history management."""
import logging
from collections import defaultdict
from daemon_boyfriend.config import settings
from .providers import Message
logger = logging.getLogger(__name__)
class ConversationManager:
"""Manages conversation history per user.
Stores conversation messages in memory with a configurable
maximum history length per user.
"""
def __init__(self, max_history: int | None = None) -> None:
self.max_history = max_history or settings.max_conversation_history
self._conversations: dict[int, list[Message]] = defaultdict(list)
def get_history(self, user_id: int) -> list[Message]:
"""Get the conversation history for a user.
Args:
user_id: Discord user ID
Returns:
List of messages in the conversation
"""
return list(self._conversations[user_id])
def add_message(self, user_id: int, message: Message) -> None:
"""Add a message to a user's conversation history.
Args:
user_id: Discord user ID
message: The message to add
"""
history = self._conversations[user_id]
history.append(message)
# Trim history if it exceeds max length
if len(history) > self.max_history:
# Keep only the most recent messages
self._conversations[user_id] = history[-self.max_history :]
logger.debug(f"Trimmed conversation history for user {user_id}")
def add_exchange(self, user_id: int, user_message: str, assistant_message: str) -> None:
"""Add a user/assistant exchange to the conversation.
Args:
user_id: Discord user ID
user_message: The user's message
assistant_message: The assistant's response
"""
self.add_message(user_id, Message(role="user", content=user_message))
self.add_message(user_id, Message(role="assistant", content=assistant_message))
def clear_history(self, user_id: int) -> None:
"""Clear the conversation history for a user.
Args:
user_id: Discord user ID
"""
if user_id in self._conversations:
del self._conversations[user_id]
logger.debug(f"Cleared conversation history for user {user_id}")
def get_history_length(self, user_id: int) -> int:
"""Get the number of messages in a user's history.
Args:
user_id: Discord user ID
Returns:
Number of messages in history
"""
return len(self._conversations[user_id])

View File

@@ -0,0 +1,15 @@
"""AI Provider implementations."""
from .anthropic import AnthropicProvider
from .base import AIProvider, AIResponse, Message
from .openai import OpenAIProvider
from .openrouter import OpenRouterProvider
__all__ = [
"AIProvider",
"AIResponse",
"Message",
"OpenAIProvider",
"OpenRouterProvider",
"AnthropicProvider",
]

View File

@@ -0,0 +1,59 @@
"""Anthropic (Claude) provider implementation."""
import logging
import anthropic
from .base import AIProvider, AIResponse, Message
logger = logging.getLogger(__name__)
class AnthropicProvider(AIProvider):
    """Anthropic Claude API provider."""

    def __init__(self, api_key: str, model: str = "claude-sonnet-4-20250514") -> None:
        """
        Args:
            api_key: Anthropic API key.
            model: Claude model identifier.
        """
        self.client = anthropic.AsyncAnthropic(api_key=api_key)
        self.model = model

    @property
    def provider_name(self) -> str:
        return "anthropic"

    async def generate(
        self,
        messages: list[Message],
        system_prompt: str | None = None,
        max_tokens: int = 1024,
        temperature: float = 0.7,
    ) -> AIResponse:
        """Generate a response using Claude.

        Args:
            messages: List of conversation messages
            system_prompt: Optional system prompt
            max_tokens: Maximum tokens in response
            temperature: Sampling temperature

        Returns:
            AIResponse with the generated content
        """
        # Build messages list (Anthropic format)
        api_messages = [{"role": m.role, "content": m.content} for m in messages]
        logger.debug(f"Sending {len(api_messages)} messages to Anthropic")

        # `system` is optional in the Messages API; omit it entirely when
        # no prompt is given instead of sending an empty string.
        extra_kwargs: dict[str, str] = {}
        if system_prompt:
            extra_kwargs["system"] = system_prompt

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=max_tokens,
            temperature=temperature,
            messages=api_messages,  # type: ignore[arg-type]
            **extra_kwargs,
        )

        # Concatenate all text blocks; non-text blocks (e.g. tool use) are skipped.
        content = "".join(block.text for block in response.content if block.type == "text")
        usage = {
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        }
        return AIResponse(
            content=content,
            model=response.model,
            usage=usage,
        )

View File

@@ -0,0 +1,52 @@
"""Abstract base class for AI providers."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class Message:
    """A single chat message exchanged with an AI provider."""

    role: str  # One of "user", "assistant", or "system"
    content: str  # The message text
@dataclass
class AIResponse:
    """Response from an AI provider."""

    content: str  # Generated text
    model: str  # Model name as reported by the provider's API
    usage: dict[str, int]  # Token usage info; key names vary by provider
class AIProvider(ABC):
    """Interface that every concrete AI provider must implement."""

    @property
    @abstractmethod
    def provider_name(self) -> str:
        """Short identifier for this provider (e.g. "openai")."""
        ...

    @abstractmethod
    async def generate(
        self,
        messages: list[Message],
        system_prompt: str | None = None,
        max_tokens: int = 1024,
        temperature: float = 0.7,
    ) -> AIResponse:
        """Produce a completion for the given conversation.

        Args:
            messages: Conversation messages, oldest first.
            system_prompt: Optional system prompt.
            max_tokens: Upper bound on response tokens.
            temperature: Sampling temperature.

        Returns:
            AIResponse with the generated content.
        """
        ...

View File

@@ -0,0 +1,61 @@
"""OpenAI provider implementation."""
import logging
from openai import AsyncOpenAI
from .base import AIProvider, AIResponse, Message
logger = logging.getLogger(__name__)
class OpenAIProvider(AIProvider):
    """OpenAI API provider."""

    def __init__(self, api_key: str, model: str = "gpt-4o") -> None:
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    @property
    def provider_name(self) -> str:
        return "openai"

    async def generate(
        self,
        messages: list[Message],
        system_prompt: str | None = None,
        max_tokens: int = 1024,
        temperature: float = 0.7,
    ) -> AIResponse:
        """Generate a response using OpenAI."""
        # Optional system prompt goes first, followed by the conversation.
        payload: list[dict[str, str]] = (
            [{"role": "system", "content": system_prompt}] if system_prompt else []
        )
        payload.extend({"role": m.role, "content": m.content} for m in messages)
        logger.debug(f"Sending {len(payload)} messages to OpenAI")

        response = await self.client.chat.completions.create(
            model=self.model,
            max_tokens=max_tokens,
            temperature=temperature,
            messages=payload,  # type: ignore[arg-type]
        )

        content = response.choices[0].message.content or ""
        token_counts: dict[str, int] = {}
        if response.usage is not None:
            token_counts = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            }
        return AIResponse(
            content=content,
            model=response.model,
            usage=token_counts,
        )

View File

@@ -0,0 +1,77 @@
"""OpenRouter provider implementation.
OpenRouter exposes an OpenAI-compatible API, so this provider mirrors the
OpenAI provider's request/response handling (implemented standalone rather
than by subclassing OpenAIProvider).
"""
import logging
from openai import AsyncOpenAI
from .base import AIProvider, AIResponse, Message
logger = logging.getLogger(__name__)
# OpenRouter API base URL
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
class OpenRouterProvider(AIProvider):
    """OpenRouter API provider.

    OpenRouter provides access to 100+ models through an OpenAI-compatible API.
    """

    def __init__(self, api_key: str, model: str = "openai/gpt-4o") -> None:
        self.client = AsyncOpenAI(
            api_key=api_key,
            base_url=OPENROUTER_BASE_URL,
        )
        self.model = model

    @property
    def provider_name(self) -> str:
        return "openrouter"

    async def generate(
        self,
        messages: list[Message],
        system_prompt: str | None = None,
        max_tokens: int = 1024,
        temperature: float = 0.7,
    ) -> AIResponse:
        """Generate a response using OpenRouter."""
        # Optional system prompt goes first, followed by the conversation.
        payload: list[dict[str, str]] = (
            [{"role": "system", "content": system_prompt}] if system_prompt else []
        )
        payload.extend({"role": m.role, "content": m.content} for m in messages)
        logger.debug(f"Sending {len(payload)} messages to OpenRouter ({self.model})")

        response = await self.client.chat.completions.create(
            model=self.model,
            max_tokens=max_tokens,
            temperature=temperature,
            messages=payload,  # type: ignore[arg-type]
            # OpenRouter attribution headers (optional, used for rankings).
            extra_headers={
                "HTTP-Referer": "https://github.com/daemon-boyfriend",
                "X-Title": "Daemon Boyfriend",
            },
        )

        content = response.choices[0].message.content or ""
        token_counts: dict[str, int] = {}
        if response.usage is not None:
            token_counts = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            }
        return AIResponse(
            content=content,
            model=response.model,
            usage=token_counts,
        )

View File

@@ -0,0 +1,119 @@
"""SearXNG search service."""
import logging
from dataclasses import dataclass
from typing import Literal
import aiohttp
from daemon_boyfriend.config import settings
logger = logging.getLogger(__name__)
@dataclass
class SearchResult:
    """A single search result returned by SearXNG."""

    title: str  # Result title
    url: str  # Result URL
    content: str  # snippet/description
    engine: str  # Name of the upstream search engine that produced the hit
@dataclass
class SearchResponse:
    """Response from a SearXNG search."""

    query: str  # The original query string
    results: list[SearchResult]  # Parsed results, capped by the caller's num_results
    suggestions: list[str]  # Query suggestions offered by SearXNG (may be empty)
    number_of_results: int  # Total count reported by SearXNG, or len(results) if absent
class SearXNGService:
    """Client for querying a SearXNG metasearch instance."""

    def __init__(
        self,
        base_url: str | None = None,
        timeout: int | None = None,
    ) -> None:
        """
        Args:
            base_url: Instance URL; defaults to ``settings.searxng_base_url``.
            timeout: Total request timeout in seconds; defaults to
                ``settings.searxng_timeout``.
        """
        self.base_url = (base_url or settings.searxng_base_url).rstrip("/")
        self.timeout = aiohttp.ClientTimeout(total=timeout or settings.searxng_timeout)

    async def search(
        self,
        query: str,
        categories: list[str] | None = None,
        language: str = "en",
        safesearch: Literal[0, 1, 2] = 1,  # 0=off, 1=moderate, 2=strict
        num_results: int = 5,
    ) -> SearchResponse:
        """Search using SearXNG.

        Args:
            query: Search query
            categories: Search categories (general, images, videos, news, it, science)
            language: Language code (e.g., "en", "nl")
            safesearch: Safe search level (0=off, 1=moderate, 2=strict)
            num_results: Maximum number of results to return

        Returns:
            SearchResponse with results

        Raises:
            aiohttp.ClientError: On network errors
        """
        query_params: dict[str, str | int] = {
            "q": query,
            "format": "json",  # JSON output must be enabled on the instance
            "language": language,
            "safesearch": safesearch,
        }
        if categories:
            query_params["categories"] = ",".join(categories)

        logger.debug(f"Searching SearXNG: {query}")
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            async with session.get(f"{self.base_url}/search", params=query_params) as response:
                response.raise_for_status()
                data = await response.json()

        # Convert raw result dicts into SearchResult records, capping at
        # num_results; missing fields get safe placeholder values.
        hits = [
            SearchResult(
                title=raw.get("title", "No title"),
                url=raw.get("url", ""),
                content=raw.get("content", "No description"),
                engine=raw.get("engine", "unknown"),
            )
            for raw in data.get("results", [])[:num_results]
        ]
        return SearchResponse(
            query=query,
            results=hits,
            suggestions=data.get("suggestions", []),
            number_of_results=data.get("number_of_results", len(hits)),
        )

    async def health_check(self) -> bool:
        """Check if the SearXNG instance is reachable.

        Returns:
            True if healthy, False otherwise
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                # A trivial search exercises the same endpoint real queries use.
                async with session.get(
                    f"{self.base_url}/search",
                    params={"q": "test", "format": "json"},
                ) as response:
                    return response.status == 200
        except Exception as e:  # broad by design: any failure means "unhealthy"
            logger.error(f"SearXNG health check failed: {e}")
            return False