"""AI Chat cog - handles mention responses.""" import logging import re import discord from discord.ext import commands from daemon_boyfriend.config import settings from daemon_boyfriend.services import ( AIService, ConversationManager, ImageAttachment, Message, SearXNGService, ) logger = logging.getLogger(__name__) # Discord message character limit MAX_MESSAGE_LENGTH = 2000 def split_message(content: str, max_length: int = MAX_MESSAGE_LENGTH) -> list[str]: """Split a long message into chunks that fit Discord's limit. Tries to split on paragraph breaks, then sentence breaks, then word breaks. """ if len(content) <= max_length: return [content] chunks: list[str] = [] remaining = content while remaining: if len(remaining) <= max_length: chunks.append(remaining) break # Find a good split point split_point = max_length # Try to split on paragraph break para_break = remaining.rfind("\n\n", 0, max_length) if para_break > max_length // 2: split_point = para_break + 2 else: # Try to split on line break line_break = remaining.rfind("\n", 0, max_length) if line_break > max_length // 2: split_point = line_break + 1 else: # Try to split on sentence sentence_end = max( remaining.rfind(". ", 0, max_length), remaining.rfind("! ", 0, max_length), remaining.rfind("? ", 0, max_length), ) if sentence_end > max_length // 2: split_point = sentence_end + 2 else: # Fall back to word break word_break = remaining.rfind(" ", 0, max_length) if word_break > 0: split_point = word_break + 1 chunks.append(remaining[:split_point].rstrip()) remaining = remaining[split_point:].lstrip() return chunks class AIChatCog(commands.Cog): """AI conversation via mentions.""" def __init__(self, bot: commands.Bot) -> None: self.bot = bot self.ai_service = AIService() self.conversations = ConversationManager() self.search_service: SearXNGService | None = None if settings.searxng_enabled and settings.searxng_url: self.search_service = SearXNGService(settings.searxng_url) @commands.Cog.listener() async def on_message(self, message: discord.Message) -> None: """Respond when the bot is mentioned.""" # Ignore messages from bots if message.author.bot: return # Check if bot is mentioned if self.bot.user is None or self.bot.user not in message.mentions: return # Extract message content without the mention content = self._extract_message_content(message) if not content: # Just a mention with no message - use configured description await message.reply(f"Hey {message.author.display_name}! {settings.bot_description}") return # Show typing indicator while generating response async with message.channel.typing(): try: response_text = await self._generate_response(message, content) # Extract image URLs and clean response text text_content, image_urls = self._extract_image_urls(response_text) # Split and send response chunks = split_message(text_content) if text_content.strip() else [] # Send first chunk as reply (or just images if no text) if chunks: first_embed = self._create_image_embed(image_urls[0]) if image_urls else None await message.reply(chunks[0], embed=first_embed) remaining_images = image_urls[1:] if image_urls else [] elif image_urls: # Only images, no text await message.reply(embed=self._create_image_embed(image_urls[0])) remaining_images = image_urls[1:] else: await message.reply("I don't have a response for that.") return # Send remaining text chunks for chunk in chunks[1:]: await message.channel.send(chunk) # Send remaining images as separate embeds for img_url in remaining_images: await message.channel.send(embed=self._create_image_embed(img_url)) except Exception as e: logger.error(f"Mention response error: {e}", exc_info=True) error_message = self._get_error_message(e) await message.reply(error_message) def _extract_image_urls(self, text: str) -> tuple[str, list[str]]: """Extract image URLs from text and return cleaned text with URLs. Args: text: The response text that may contain image URLs Returns: Tuple of (cleaned text, list of image URLs) """ # Pattern to match image URLs (common formats) image_extensions = r"\.(png|jpg|jpeg|gif|webp|bmp)" url_pattern = rf"(https?://[^\s<>\"\')]+{image_extensions}(?:\?[^\s<>\"\')]*)?)" # Find all image URLs image_urls = re.findall(url_pattern, text, re.IGNORECASE) # The findall returns tuples when there are groups, extract full URLs image_urls = re.findall( rf"https?://[^\s<>\"\')]+{image_extensions}(?:\?[^\s<>\"\')]*)?", text, re.IGNORECASE, ) # Also check for markdown image syntax ![alt](url) markdown_images = re.findall(r"!\[[^\]]*\]\(([^)]+)\)", text) for url in markdown_images: if url not in image_urls: # Check if it looks like an image URL if re.search(image_extensions, url, re.IGNORECASE) or "image" in url.lower(): image_urls.append(url) # Clean the text by removing standalone image URLs (but keep them if part of markdown links) cleaned_text = text for url in image_urls: # Remove standalone URLs (not part of markdown) cleaned_text = re.sub( rf"(? discord.Embed: """Create a Discord embed with an image. Args: image_url: The URL of the image Returns: Discord Embed object with the image """ embed = discord.Embed() embed.set_image(url=image_url) return embed def _get_error_message(self, error: Exception) -> str: """Get a user-friendly error message based on the exception type. Args: error: The exception that occurred Returns: A user-friendly error message """ error_str = str(error).lower() # Check for credit/quota/billing errors credit_keywords = [ "insufficient_quota", "insufficient credits", "quota exceeded", "rate limit", "billing", "payment required", "credit", "exceeded your current quota", "out of credits", "no credits", "balance", "insufficient funds", ] if any(keyword in error_str for keyword in credit_keywords): return "I'm currently out of API credits. Please try again later or contact the bot administrator." # Check for authentication errors auth_keywords = ["invalid api key", "unauthorized", "authentication", "invalid_api_key"] if any(keyword in error_str for keyword in auth_keywords): return ( "There's an issue with my API configuration. Please contact the bot administrator." ) # Check for model errors if "model" in error_str and ("not found" in error_str or "does not exist" in error_str): return "The configured AI model is not available. Please contact the bot administrator." # Check for content policy violations if "content policy" in error_str or "safety" in error_str or "blocked" in error_str: return "I can't respond to that request due to content policy restrictions." # Default error message return "Sorry, I encountered an error. Please try again." def _extract_message_content(self, message: discord.Message) -> str: """Extract the actual message content, removing bot mentions.""" content = message.content # Remove all mentions of the bot if self.bot.user: # Remove <@BOT_ID> and <@!BOT_ID> patterns content = re.sub( rf"<@!?{self.bot.user.id}>", "", content, ) return content.strip() def _extract_image_attachments(self, message: discord.Message) -> list[ImageAttachment]: """Extract image attachments from a Discord message. Args: message: The Discord message Returns: List of ImageAttachment objects """ images = [] # Supported image types image_types = { "image/png": "image/png", "image/jpeg": "image/jpeg", "image/jpg": "image/jpeg", "image/gif": "image/gif", "image/webp": "image/webp", } # Check message attachments for attachment in message.attachments: content_type = attachment.content_type or "" if content_type in image_types: images.append( ImageAttachment( url=attachment.url, media_type=image_types[content_type], ) ) # Also check by file extension if content_type not set elif attachment.filename: ext = attachment.filename.lower().split(".")[-1] if ext in ("png", "jpg", "jpeg", "gif", "webp"): media_type = f"image/{ext}" if ext != "jpg" else "image/jpeg" images.append( ImageAttachment( url=attachment.url, media_type=media_type, ) ) # Check embeds for images for embed in message.embeds: if embed.image and embed.image.url: # Guess media type from URL url = embed.image.url.lower() media_type = "image/png" # default if ".jpg" in url or ".jpeg" in url: media_type = "image/jpeg" elif ".gif" in url: media_type = "image/gif" elif ".webp" in url: media_type = "image/webp" images.append(ImageAttachment(url=embed.image.url, media_type=media_type)) logger.debug(f"Extracted {len(images)} images from message") return images def _get_mentioned_users_context(self, message: discord.Message) -> str | None: """Get context about mentioned users (excluding the bot). Args: message: The Discord message Returns: Formatted string with user info, or None if no other users mentioned """ # Filter out the bot from mentions other_mentions = [ m for m in message.mentions if self.bot.user is None or m.id != self.bot.user.id ] if not other_mentions: return None user_info = [] for user in other_mentions: # Get member info if available (for nickname, roles, etc.) member = message.guild.get_member(user.id) if message.guild else None if member: info = f"- {member.display_name} (username: {member.name})" if member.nick and member.nick != member.name: info += f" [nickname: {member.nick}]" # Add top role if not @everyone if len(member.roles) > 1: top_role = member.roles[-1] # Highest role if top_role.name != "@everyone": info += f" [role: {top_role.name}]" else: info = f"- {user.display_name} (username: {user.name})" user_info.append(info) return "Mentioned users:\n" + "\n".join(user_info) async def _generate_response(self, message: discord.Message, user_message: str) -> str: """Generate an AI response for a user message. Args: message: The Discord message object user_message: The user's message content Returns: The AI's response text """ user_id = message.author.id # Get conversation history history = self.conversations.get_history(user_id) # Extract any image attachments from the message images = self._extract_image_attachments(message) # Add current message to history for the API call (with images if any) current_message = Message(role="user", content=user_message, images=images) messages = history + [current_message] # Check if we should search the web search_context = await self._maybe_search(user_message) # Get context about mentioned users mentioned_users_context = self._get_mentioned_users_context(message) # Build system prompt with additional context system_prompt = self.ai_service.get_system_prompt() # Add info about the user talking to the bot author_info = f"\n\nYou are talking to: {message.author.display_name} (username: {message.author.name})" if isinstance(message.author, discord.Member) and message.author.nick: author_info += f" [nickname: {message.author.nick}]" system_prompt += author_info # Add mentioned users context if mentioned_users_context: system_prompt += f"\n\n--- {mentioned_users_context} ---" # Add search results if available if search_context: system_prompt += ( "\n\n--- Web Search Results ---\n" "Use the following current information from the web to help answer the user's question. " "Cite sources when relevant.\n\n" f"{search_context}" ) # Generate response response = await self.ai_service.chat( messages=messages, system_prompt=system_prompt, ) # Save the exchange to history self.conversations.add_exchange(user_id, user_message, response.content) logger.debug( f"Generated response for user {user_id}: " f"{len(response.content)} chars, {response.usage}" ) return response.content async def _maybe_search(self, query: str) -> str | None: """Determine if a search is needed and perform it. Args: query: The user's message Returns: Formatted search results or None if search not needed/available """ if not self.search_service: return None # Ask the AI if this query needs current information decision_prompt = ( "You are a search decision assistant. Your ONLY job is to decide if the user's " "question requires current/real-time information from the internet.\n\n" "Respond with ONLY 'SEARCH: ' if a web search would help answer the question " "(replace with optimal search terms), or 'NO_SEARCH' if the question can be " "answered with general knowledge.\n\n" "Examples that NEED search:\n" "- Current events, news, recent happenings\n" "- Current weather, stock prices, sports scores\n" "- Latest version of software, current documentation\n" "- Information about specific people, companies, or products that may have changed\n" "- 'What time is it in Tokyo?' or any real-time data\n\n" "Examples that DON'T need search:\n" "- General knowledge, science, math, history\n" "- Coding help, programming concepts\n" "- Personal advice, opinions, creative writing\n" "- Explanations of concepts or 'how does X work'" ) try: decision = await self.ai_service.chat( messages=[Message(role="user", content=query)], system_prompt=decision_prompt, ) response_text = decision.content.strip() if response_text.startswith("SEARCH:"): search_query = response_text[7:].strip() logger.info(f"AI decided to search for: {search_query}") results = await self.search_service.search( query=search_query, max_results=settings.searxng_max_results, ) if results: return self.search_service.format_results_for_context(results) return None except Exception as e: logger.warning(f"Search decision/execution failed: {e}") return None async def setup(bot: commands.Bot) -> None: """Load the AI Chat cog.""" await bot.add_cog(AIChatCog(bot))