feat: Complete minimal bot refactor - AI providers, models, docs, and migration

Changes:
- Strip AI providers to image-only analysis (remove text/phishing methods)
- Simplify guild models (remove BannedWord, reduce GuildSettings columns)
- Create migration to drop unused tables and columns
- Rewrite README for minimal bot focus
- Update CLAUDE.md architecture documentation

Result: -992 lines, +158 lines (net -834 lines)
Cost-conscious bot ready for deployment.
This commit is contained in:
2026-01-27 19:25:57 +01:00
parent d972f6f51c
commit b4f29a9d5e
7 changed files with 366 additions and 986 deletions

View File

@@ -1,17 +1,10 @@
"""Guild-related database models."""
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import JSON, Boolean, Float, ForeignKey, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import Boolean, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from guardden.models.base import Base, SnowflakeID, TimestampMixin
if TYPE_CHECKING:
from guardden.models.moderation import ModerationLog, Strike
class Guild(Base, TimestampMixin):
"""Represents a Discord guild (server) configuration."""
@@ -27,15 +20,6 @@ class Guild(Base, TimestampMixin):
settings: Mapped["GuildSettings"] = relationship(
back_populates="guild", uselist=False, cascade="all, delete-orphan"
)
banned_words: Mapped[list["BannedWord"]] = relationship(
back_populates="guild", cascade="all, delete-orphan"
)
moderation_logs: Mapped[list["ModerationLog"]] = relationship(
back_populates="guild", cascade="all, delete-orphan"
)
strikes: Mapped[list["Strike"]] = relationship(
back_populates="guild", cascade="all, delete-orphan"
)
class GuildSettings(Base, TimestampMixin):
@@ -51,94 +35,21 @@ class GuildSettings(Base, TimestampMixin):
prefix: Mapped[str] = mapped_column(String(10), default="!", nullable=False)
locale: Mapped[str] = mapped_column(String(10), default="en", nullable=False)
# Channel configuration (stored as snowflake IDs)
log_channel_id: Mapped[int | None] = mapped_column(SnowflakeID, nullable=True)
mod_log_channel_id: Mapped[int | None] = mapped_column(SnowflakeID, nullable=True)
welcome_channel_id: Mapped[int | None] = mapped_column(SnowflakeID, nullable=True)
# Role configuration
mute_role_id: Mapped[int | None] = mapped_column(SnowflakeID, nullable=True)
verified_role_id: Mapped[int | None] = mapped_column(SnowflakeID, nullable=True)
mod_role_ids: Mapped[dict] = mapped_column(
JSONB().with_variant(JSON(), "sqlite"), default=list, nullable=False
)
# Moderation settings
# Spam detection settings
automod_enabled: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
anti_spam_enabled: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
link_filter_enabled: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# Automod thresholds
message_rate_limit: Mapped[int] = mapped_column(Integer, default=5, nullable=False)
message_rate_window: Mapped[int] = mapped_column(Integer, default=5, nullable=False)
duplicate_threshold: Mapped[int] = mapped_column(Integer, default=3, nullable=False)
mention_limit: Mapped[int] = mapped_column(Integer, default=5, nullable=False)
mention_rate_limit: Mapped[int] = mapped_column(Integer, default=10, nullable=False)
mention_rate_window: Mapped[int] = mapped_column(Integer, default=60, nullable=False)
scam_allowlist: Mapped[list[str]] = mapped_column(
JSONB().with_variant(JSON(), "sqlite"), default=list, nullable=False
)
# Strike thresholds (actions at each threshold)
strike_actions: Mapped[dict] = mapped_column(
JSONB().with_variant(JSON(), "sqlite"),
default=lambda: {
"1": {"action": "warn"},
"3": {"action": "timeout", "duration": 300},
"5": {"action": "kick"},
"7": {"action": "ban"},
},
nullable=False,
)
# AI moderation settings
ai_moderation_enabled: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
ai_sensitivity: Mapped[int] = mapped_column(Integer, default=80, nullable=False) # 0-100 scale
ai_confidence_threshold: Mapped[float] = mapped_column(Float, default=0.7, nullable=False)
ai_log_only: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
ai_sensitivity: Mapped[int] = mapped_column(Integer, default=80, nullable=False)
nsfw_detection_enabled: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
nsfw_only_filtering: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
# Notification settings
send_in_channel_warnings: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# Whitelist settings
whitelisted_user_ids: Mapped[list[int]] = mapped_column(
JSONB().with_variant(JSON(), "sqlite"), default=list, nullable=False
)
# Verification settings
verification_enabled: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
verification_type: Mapped[str] = mapped_column(
String(20), default="button", nullable=False
) # button, captcha, questions
# Relationship
guild: Mapped["Guild"] = relationship(back_populates="settings")
class BannedWord(Base, TimestampMixin):
    """A banned word or phrase belonging to one guild; patterns may be regex."""

    __tablename__ = "banned_words"

    # Surrogate, auto-incrementing primary key.
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)

    # Owning guild; the foreign key is declared with ON DELETE CASCADE.
    guild_id: Mapped[int] = mapped_column(
        SnowflakeID,
        ForeignKey("guilds.id", ondelete="CASCADE"),
        nullable=False,
    )

    # The text to look for, and a flag saying whether it is a regex.
    pattern: Mapped[str] = mapped_column(Text, nullable=False)
    is_regex: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)

    # Expected values: delete, warn, strike.
    action: Mapped[str] = mapped_column(String(20), default="delete", nullable=False)

    # Optional free-form metadata about the entry.
    reason: Mapped[str | None] = mapped_column(Text, nullable=True)
    source: Mapped[str | None] = mapped_column(String(100), nullable=True)
    category: Mapped[str | None] = mapped_column(String(20), nullable=True)
    managed: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)

    # Snowflake ID of whoever added the entry (timestamps come from TimestampMixin).
    added_by: Mapped[int] = mapped_column(SnowflakeID, nullable=False)

    # Back-reference to the owning Guild.
    guild: Mapped["Guild"] = relationship(back_populates="banned_words")

View File

@@ -3,41 +3,10 @@
import logging
from typing import Any
from guardden.services.ai.base import (
AIProvider,
ImageAnalysisResult,
ModerationResult,
PhishingAnalysisResult,
parse_categories,
run_with_retries,
)
from guardden.services.ai.base import AIProvider, ImageAnalysisResult, run_with_retries
logger = logging.getLogger(__name__)
# System prompt for text moderation. It enumerates the violation categories
# the model should check and pins the exact JSON object the model must reply
# with (is_flagged, confidence, categories, explanation, suggested_action).
MODERATION_SYSTEM_PROMPT = """You are a content moderation AI for a Discord server. Analyze the given message and determine if it violates community guidelines.
Categories to check:
- harassment: Personal attacks, bullying, intimidation
- hate_speech: Discrimination, slurs, dehumanization based on identity
- sexual: Explicit sexual content, sexual solicitation
- violence: Threats, graphic violence, encouraging harm
- self_harm: Suicide, self-injury content or encouragement
- spam: Repetitive, promotional, or low-quality content
- scam: Phishing attempts, fraudulent offers, impersonation
- misinformation: Dangerous false information
Respond in this exact JSON format:
{
"is_flagged": true/false,
"confidence": 0.0-1.0,
"categories": ["category1", "category2"],
"explanation": "Brief explanation",
"suggested_action": "none/warn/delete/timeout/ban"
}
Be balanced - flag genuinely problematic content but allow normal conversation, jokes, and mild language. Consider context."""
IMAGE_ANALYSIS_PROMPT = """Analyze this image for content moderation. Classify NSFW content with precision.
NSFW CATEGORIES (in order of severity):
@@ -76,24 +45,6 @@ NSFW SEVERITY GUIDELINES:
- nudity: 60-75
- explicit: 80-100"""
# System prompt for URL phishing/scam analysis. It lists the indicators the
# model should look for and pins the exact JSON object the model must reply
# with (is_phishing, confidence, risk_factors, explanation).
PHISHING_ANALYSIS_PROMPT = """Analyze this URL and message context for phishing or scam indicators.
Check for:
- Domain impersonation (typosquatting, lookalike domains)
- Urgency tactics ("act now", "limited time")
- Requests for credentials or personal info
- Too-good-to-be-true offers
- Suspicious redirects or URL shorteners
- Mismatched or hidden URLs
Respond in this exact JSON format:
{
"is_phishing": true/false,
"confidence": 0.0-1.0,
"risk_factors": ["factor1", "factor2"],
"explanation": "Brief explanation"
}"""
class AnthropicProvider(AIProvider):
"""AI provider using Anthropic's Claude API."""
@@ -150,47 +101,6 @@ class AnthropicProvider(AIProvider):
return json.loads(text)
async def moderate_text(
    self,
    content: str,
    context: str | None = None,
    sensitivity: int = 50,
) -> ModerationResult:
    """Ask the model whether ``content`` violates policy.

    Args:
        content: The message text to analyze.
        context: Optional context, prepended to the user message when truthy.
        sensitivity: Below 30 adds a "be lenient" note to the system prompt,
            above 70 adds a "be strict" note; anything in between adds none.

    Returns:
        A ModerationResult built from the model's JSON reply. On any
        exception the error is logged and a non-flagged result carrying the
        error text in ``explanation`` is returned instead.
    """
    # Pick the sensitivity suffix for the system prompt.
    if sensitivity < 30:
        tuning = "\n\nBe lenient - only flag clearly problematic content."
    elif sensitivity > 70:
        tuning = "\n\nBe strict - flag anything potentially problematic."
    else:
        tuning = ""
    system_prompt = MODERATION_SYSTEM_PROMPT + tuning

    # Assemble the user message, context first when one was supplied.
    sections = [f"Message to analyze:\n{content}"]
    if context:
        sections.insert(0, f"Context: {context}")
    user_prompt = "\n\n".join(sections)

    try:
        raw_reply = await self._call_api(system_prompt, user_prompt)
        payload = self._parse_json_response(raw_reply)
        parsed_categories = parse_categories(payload.get("categories", []))
        return ModerationResult(
            is_flagged=payload.get("is_flagged", False),
            confidence=float(payload.get("confidence", 0.0)),
            categories=parsed_categories,
            explanation=payload.get("explanation", ""),
            suggested_action=payload.get("suggested_action", "none"),
        )
    except Exception as exc:
        logger.error(f"Error moderating text: {exc}")
        return ModerationResult(
            is_flagged=False,
            explanation=f"Error analyzing content: {exc!s}",
        )
async def analyze_image(
self,
image_url: str,
@@ -276,31 +186,6 @@ SENSITIVITY: BALANCED
logger.error(f"Error analyzing image: {e}")
return ImageAnalysisResult(description=f"Error analyzing image: {str(e)}")
async def analyze_phishing(
    self,
    url: str,
    message_content: str | None = None,
) -> PhishingAnalysisResult:
    """Ask the model whether ``url`` looks like phishing or a scam.

    Args:
        url: The URL to analyze.
        message_content: Optional surrounding message, appended to the
            request when truthy.

    Returns:
        A PhishingAnalysisResult built from the model's JSON reply. On any
        exception the error is logged and a default result carrying the
        error text in ``explanation`` is returned instead.
    """
    context_suffix = (
        f"\n\nFull message context:\n{message_content}" if message_content else ""
    )
    user_prompt = f"URL to analyze: {url}" + context_suffix

    try:
        raw_reply = await self._call_api(PHISHING_ANALYSIS_PROMPT, user_prompt)
        payload = self._parse_json_response(raw_reply)
        return PhishingAnalysisResult(
            is_phishing=payload.get("is_phishing", False),
            confidence=float(payload.get("confidence", 0.0)),
            risk_factors=payload.get("risk_factors", []),
            explanation=payload.get("explanation", ""),
        )
    except Exception as exc:
        logger.error(f"Error analyzing phishing: {exc}")
        return PhishingAnalysisResult(explanation=f"Error analyzing URL: {exc!s}")
async def close(self) -> None:
    """Close the underlying API client."""
    api_client = self.client
    await api_client.close()

View File

@@ -91,53 +91,6 @@ async def run_with_retries(
raise RuntimeError("Retry loop exited unexpectedly")
@dataclass
class ModerationResult:
    """Result of AI content moderation."""

    is_flagged: bool = False
    confidence: float = 0.0  # 0.0 to 1.0
    categories: list[ContentCategory] = field(default_factory=list)
    explanation: str = ""
    suggested_action: Literal["none", "warn", "delete", "timeout", "ban"] = "none"
    severity_override: int | None = None  # Direct severity for NSFW images

    @property
    def severity(self) -> int:
        """Get severity score 0-100 based on confidence and categories.

        Returns:
            0 when the result is not flagged. Otherwise ``severity_override``
            if set, else ``confidence * 50`` plus a per-category bonus
            (30 high, 20 medium, 10 other). The value is always clamped to
            the 0-100 range.
        """
        if not self.is_flagged:
            return 0

        # Use override if provided (e.g., from NSFW image analysis).
        # Clamp on both sides: a negative override must not leak out as a
        # negative severity.
        if self.severity_override is not None:
            return max(0, min(self.severity_override, 100))

        # Base severity from confidence
        severity = int(self.confidence * 50)

        # Add severity based on category
        high_severity = {
            ContentCategory.HATE_SPEECH,
            ContentCategory.SELF_HARM,
            ContentCategory.SCAM,
        }
        medium_severity = {
            ContentCategory.HARASSMENT,
            ContentCategory.VIOLENCE,
            ContentCategory.SEXUAL,
        }
        for cat in self.categories:
            if cat in high_severity:
                severity += 30
            elif cat in medium_severity:
                severity += 20
            else:
                severity += 10

        # Lower bound guards against a negative confidence from upstream.
        return max(0, min(severity, 100))
@dataclass
class ImageAnalysisResult:
"""Result of AI image analysis."""
@@ -152,38 +105,8 @@ class ImageAnalysisResult:
nsfw_severity: int = 0 # 0-100 specific NSFW severity score
@dataclass
class PhishingAnalysisResult:
    """Outcome of an AI phishing/scam check; defaults describe "nothing found"."""

    # Verdict flag.
    is_phishing: bool = False
    # Confidence reported for the verdict.
    confidence: float = 0.0
    # Individual indicators reported; a fresh list per instance.
    risk_factors: list[str] = field(default_factory=list)
    # Short explanation text (also used to carry error text by callers).
    explanation: str = ""
class AIProvider(ABC):
"""Abstract base class for AI providers."""
@abstractmethod
async def moderate_text(
    self,
    content: str,
    context: str | None = None,
    sensitivity: int = 50,
) -> ModerationResult:
    """Check a piece of text for policy violations.

    Args:
        content: Text to be analyzed.
        context: Optional information about the conversation or server.
        sensitivity: Strictness on a 0-100 scale; higher is stricter.

    Returns:
        A ModerationResult describing the analysis.
    """
"""Abstract base class for AI providers - Image analysis only."""
@abstractmethod
async def analyze_image(
@@ -203,24 +126,6 @@ class AIProvider(ABC):
"""
pass
@abstractmethod
async def analyze_phishing(
    self,
    url: str,
    message_content: str | None = None,
) -> PhishingAnalysisResult:
    """Check a URL for phishing or scam indicators.

    Args:
        url: URL to be analyzed.
        message_content: Optional full message, supplied as extra context.

    Returns:
        A PhishingAnalysisResult describing the analysis.
    """
@abstractmethod
async def close(self) -> None:
"""Clean up resources."""

View File

@@ -3,14 +3,7 @@
import logging
from typing import Any
from guardden.services.ai.base import (
AIProvider,
ContentCategory,
ImageAnalysisResult,
ModerationResult,
PhishingAnalysisResult,
run_with_retries,
)
from guardden.services.ai.base import AIProvider, ImageAnalysisResult, run_with_retries
logger = logging.getLogger(__name__)
@@ -35,107 +28,12 @@ class OpenAIProvider(AIProvider):
self.model = model
logger.info(f"Initialized OpenAI provider with model: {model}")
async def _call_api(
    self,
    system: str,
    user_content: Any,
    max_tokens: int = 500,
) -> str:
    """Send one system + user chat completion and return the reply text.

    The request asks for a JSON-object response and is executed through
    ``run_with_retries``. Failures are logged and re-raised.

    Args:
        system: System prompt.
        user_content: Content of the user message.
        max_tokens: Token cap passed to the API.

    Returns:
        The first choice's message content, or "" when that content is empty.
    """

    async def _send() -> str:
        completion = await self.client.chat.completions.create(
            model=self.model,
            max_tokens=max_tokens,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": user_content},
            ],
            response_format={"type": "json_object"},
        )
        reply_text = completion.choices[0].message.content
        return reply_text if reply_text else ""

    try:
        reply = await run_with_retries(
            _send,
            logger=logger,
            operation_name="OpenAI chat completion",
        )
    except Exception as exc:
        logger.error(f"OpenAI API error: {exc}")
        raise
    return reply
def _parse_json_response(self, response: str) -> dict:
"""Parse JSON from response."""
import json
return json.loads(response)
async def moderate_text(
    self,
    content: str,
    context: str | None = None,
    sensitivity: int = 50,
) -> ModerationResult:
    """Analyze text content for policy violations.

    Uses OpenAI's built-in moderation endpoint and maps its categories onto
    ContentCategory values.

    Args:
        content: The text to analyze.
        context: Accepted for interface compatibility; this implementation
            sends only ``content`` to the moderation endpoint.
        sensitivity: 0-100, higher means more strict. Values outside that
            range are clamped.

    Returns:
        A flagged ModerationResult when the API flags the content or any
        category score exceeds the sensitivity threshold; otherwise a
        non-flagged result. On any exception the error is logged and a
        non-flagged result carrying the error text is returned.
    """
    # First, use OpenAI's built-in moderation API for quick check
    try:
        # Threshold runs from 0.7 (sensitivity 0) down to 0.3 (sensitivity 100).
        bounded_sensitivity = max(0, min(100, sensitivity))
        threshold = 0.3 + (0.4 * (100 - bounded_sensitivity) / 100)
        # Scores are considered from 0.5 upward as before, but a stricter
        # (lower) threshold must also see scores below 0.5 - otherwise any
        # sensitivity above 50 would have no effect.
        score_floor = min(threshold, 0.5)

        async def _moderate() -> Any:
            return await self.client.moderations.create(input=content)

        mod_response = await run_with_retries(
            _moderate,
            logger=logger,
            operation_name="OpenAI moderation",
        )
        results = mod_response.results[0]

        # Map OpenAI categories to our categories
        category_mapping = {
            "harassment": ContentCategory.HARASSMENT,
            "harassment/threatening": ContentCategory.HARASSMENT,
            "hate": ContentCategory.HATE_SPEECH,
            "hate/threatening": ContentCategory.HATE_SPEECH,
            "self-harm": ContentCategory.SELF_HARM,
            "self-harm/intent": ContentCategory.SELF_HARM,
            "self-harm/instructions": ContentCategory.SELF_HARM,
            "sexual": ContentCategory.SEXUAL,
            "sexual/minors": ContentCategory.SEXUAL,
            "violence": ContentCategory.VIOLENCE,
            "violence/graphic": ContentCategory.VIOLENCE,
        }

        flagged_categories = []
        max_score = 0.0
        # by_alias=True so the keys are the API names used in the mapping
        # above ("self-harm", "hate/threatening", ...) rather than the
        # Python field names.
        scores = results.category_scores.model_dump(by_alias=True)
        for category, score in scores.items():
            if score > score_floor:
                if category in category_mapping:
                    flagged_categories.append(category_mapping[category])
                max_score = max(max_score, score)

        if results.flagged or max_score > threshold:
            return ModerationResult(
                is_flagged=True,
                confidence=max_score,
                # De-duplicate while keeping a deterministic order.
                categories=list(dict.fromkeys(flagged_categories)),
                explanation="Content flagged by moderation API",
                suggested_action="delete" if max_score > 0.8 else "warn",
            )

        return ModerationResult(is_flagged=False, confidence=1.0 - max_score)

    except Exception as e:
        logger.error(f"Error moderating text: {e}")
        return ModerationResult(
            is_flagged=False,
            explanation=f"Error analyzing content: {str(e)}",
        )
async def analyze_image(
self,
image_url: str,
@@ -223,41 +121,6 @@ NSFW SEVERITY GUIDELINES: none=0, suggestive=20-35, partial_nudity=40-55, nudity
logger.error(f"Error analyzing image: {e}")
return ImageAnalysisResult(description=f"Error analyzing image: {str(e)}")
async def analyze_phishing(
    self,
    url: str,
    message_content: str | None = None,
) -> PhishingAnalysisResult:
    """Ask the model whether ``url`` looks like phishing or a scam.

    Args:
        url: The URL to analyze.
        message_content: Optional surrounding message, appended to the
            request when truthy.

    Returns:
        A PhishingAnalysisResult built from the model's JSON reply. On any
        exception the error is logged and a default result carrying the
        error text in ``explanation`` is returned instead.
    """
    system_prompt = """Analyze the URL for phishing/scam indicators. Respond in JSON:
{
"is_phishing": true/false,
"confidence": 0.0-1.0,
"risk_factors": ["factor1"],
"explanation": "Brief explanation"
}
Check for: domain impersonation, urgency tactics, credential requests, too-good-to-be-true offers."""

    context_suffix = f"\n\nMessage context: {message_content}" if message_content else ""
    user_prompt = f"URL: {url}" + context_suffix

    try:
        raw_reply = await self._call_api(system_prompt, user_prompt)
        payload = self._parse_json_response(raw_reply)
        return PhishingAnalysisResult(
            is_phishing=payload.get("is_phishing", False),
            confidence=float(payload.get("confidence", 0.0)),
            risk_factors=payload.get("risk_factors", []),
            explanation=payload.get("explanation", ""),
        )
    except Exception as exc:
        logger.error(f"Error analyzing phishing: {exc}")
        return PhishingAnalysisResult(explanation=f"Error analyzing URL: {exc!s}")
async def close(self) -> None:
    """Close the underlying API client."""
    api_client = self.client
    await api_client.close()