Implement GuardDen Discord moderation bot
Features: - Core moderation: warn, kick, ban, timeout, strike system - Automod: banned words filter, scam detection, anti-spam, link filtering - AI moderation: Claude/OpenAI integration, NSFW detection, phishing analysis - Verification system: button, captcha, math, emoji challenges - Rate limiting system with configurable scopes - Event logging: joins, leaves, message edits/deletes, voice activity - Per-guild configuration with caching - Docker deployment support Bug fixes applied: - Fixed await on session.delete() in guild_config.py - Fixed memory leak in AI moderation message tracking (use deque) - Added error handling to bot shutdown - Added error handling to timeout command - Removed unused Literal import - Added prefix validation - Added image analysis limit (3 per message) - Fixed test mock for SQLAlchemy model
This commit is contained in:
300
src/guardden/services/verification.py
Normal file
300
src/guardden/services/verification.py
Normal file
@@ -0,0 +1,300 @@
|
||||
"""Verification service for new member challenges."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import random
|
||||
import string
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
import discord
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChallengeType(str, Enum):
|
||||
"""Types of verification challenges."""
|
||||
|
||||
BUTTON = "button" # Simple button click
|
||||
CAPTCHA = "captcha" # Text-based captcha
|
||||
MATH = "math" # Simple math problem
|
||||
EMOJI = "emoji" # Select correct emoji
|
||||
QUESTIONS = "questions" # Custom questions
|
||||
|
||||
|
||||
@dataclass
|
||||
class Challenge:
|
||||
"""Represents a verification challenge."""
|
||||
|
||||
challenge_type: ChallengeType
|
||||
question: str
|
||||
answer: str
|
||||
options: list[str] = field(default_factory=list) # For multiple choice
|
||||
expires_at: datetime = field(
|
||||
default_factory=lambda: datetime.now(timezone.utc) + timedelta(minutes=10)
|
||||
)
|
||||
attempts: int = 0
|
||||
max_attempts: int = 3
|
||||
|
||||
@property
|
||||
def is_expired(self) -> bool:
|
||||
return datetime.now(timezone.utc) > self.expires_at
|
||||
|
||||
def check_answer(self, response: str) -> bool:
|
||||
"""Check if the response is correct."""
|
||||
self.attempts += 1
|
||||
return response.strip().lower() == self.answer.lower()
|
||||
|
||||
|
||||
@dataclass
|
||||
class PendingVerification:
|
||||
"""Tracks a pending verification for a user."""
|
||||
|
||||
user_id: int
|
||||
guild_id: int
|
||||
challenge: Challenge
|
||||
message_id: int | None = None
|
||||
channel_id: int | None = None
|
||||
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
|
||||
|
||||
class ChallengeGenerator(ABC):
|
||||
"""Abstract base class for challenge generators."""
|
||||
|
||||
@abstractmethod
|
||||
def generate(self) -> Challenge:
|
||||
"""Generate a new challenge."""
|
||||
pass
|
||||
|
||||
|
||||
class ButtonChallengeGenerator(ChallengeGenerator):
|
||||
"""Generates simple button click challenges."""
|
||||
|
||||
def generate(self) -> Challenge:
|
||||
return Challenge(
|
||||
challenge_type=ChallengeType.BUTTON,
|
||||
question="Click the button below to verify you're human.",
|
||||
answer="verified",
|
||||
)
|
||||
|
||||
|
||||
class CaptchaChallengeGenerator(ChallengeGenerator):
|
||||
"""Generates text-based captcha challenges."""
|
||||
|
||||
def __init__(self, length: int = 6) -> None:
|
||||
self.length = length
|
||||
|
||||
def generate(self) -> Challenge:
|
||||
# Generate random alphanumeric code (avoiding confusing chars)
|
||||
chars = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
|
||||
code = "".join(random.choices(chars, k=self.length))
|
||||
|
||||
# Create visual representation with some obfuscation
|
||||
visual = self._create_visual(code)
|
||||
|
||||
return Challenge(
|
||||
challenge_type=ChallengeType.CAPTCHA,
|
||||
question=f"Enter the code shown below:\n```\n{visual}\n```",
|
||||
answer=code,
|
||||
)
|
||||
|
||||
def _create_visual(self, code: str) -> str:
|
||||
"""Create a simple text-based visual captcha."""
|
||||
lines = []
|
||||
# Add some noise characters
|
||||
noise_chars = ".-*~^"
|
||||
|
||||
for _ in range(2):
|
||||
lines.append("".join(random.choices(noise_chars, k=len(code) * 2)))
|
||||
|
||||
# Add the code with spacing
|
||||
spaced = " ".join(code)
|
||||
lines.append(spaced)
|
||||
|
||||
for _ in range(2):
|
||||
lines.append("".join(random.choices(noise_chars, k=len(code) * 2)))
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class MathChallengeGenerator(ChallengeGenerator):
|
||||
"""Generates simple math problem challenges."""
|
||||
|
||||
def generate(self) -> Challenge:
|
||||
# Generate simple addition/subtraction/multiplication
|
||||
operation = random.choice(["+", "-", "*"])
|
||||
|
||||
if operation == "*":
|
||||
a = random.randint(2, 10)
|
||||
b = random.randint(2, 10)
|
||||
else:
|
||||
a = random.randint(10, 50)
|
||||
b = random.randint(1, 20)
|
||||
|
||||
if operation == "+":
|
||||
answer = a + b
|
||||
elif operation == "-":
|
||||
# Ensure positive result
|
||||
if b > a:
|
||||
a, b = b, a
|
||||
answer = a - b
|
||||
else:
|
||||
answer = a * b
|
||||
|
||||
return Challenge(
|
||||
challenge_type=ChallengeType.MATH,
|
||||
question=f"Solve this math problem: **{a} {operation} {b} = ?**",
|
||||
answer=str(answer),
|
||||
)
|
||||
|
||||
|
||||
class EmojiChallengeGenerator(ChallengeGenerator):
|
||||
"""Generates emoji selection challenges."""
|
||||
|
||||
EMOJI_SETS = [
|
||||
("animals", ["🐶", "🐱", "🐭", "🐹", "🐰", "🦊", "🐻", "🐼"]),
|
||||
("fruits", ["🍎", "🍐", "🍊", "🍋", "🍌", "🍉", "🍇", "🍓"]),
|
||||
("weather", ["☀️", "🌙", "⭐", "🌧️", "❄️", "🌈", "⚡", "🌪️"]),
|
||||
("sports", ["⚽", "🏀", "🏈", "⚾", "🎾", "🏐", "🏉", "🎱"]),
|
||||
]
|
||||
|
||||
def generate(self) -> Challenge:
|
||||
category, emojis = random.choice(self.EMOJI_SETS)
|
||||
target = random.choice(emojis)
|
||||
|
||||
# Create options with the target and some others
|
||||
options = [target]
|
||||
other_emojis = [e for e in emojis if e != target]
|
||||
options.extend(random.sample(other_emojis, min(3, len(other_emojis))))
|
||||
random.shuffle(options)
|
||||
|
||||
return Challenge(
|
||||
challenge_type=ChallengeType.EMOJI,
|
||||
question=f"Select the {self._emoji_name(target)} emoji:",
|
||||
answer=target,
|
||||
options=options,
|
||||
)
|
||||
|
||||
def _emoji_name(self, emoji: str) -> str:
|
||||
"""Get a description of the emoji."""
|
||||
names = {
|
||||
"🐶": "dog",
|
||||
"🐱": "cat",
|
||||
"🐭": "mouse",
|
||||
"🐹": "hamster",
|
||||
"🐰": "rabbit",
|
||||
"🦊": "fox",
|
||||
"🐻": "bear",
|
||||
"🐼": "panda",
|
||||
"🍎": "apple",
|
||||
"🍐": "pear",
|
||||
"🍊": "orange",
|
||||
"🍋": "lemon",
|
||||
"🍌": "banana",
|
||||
"🍉": "watermelon",
|
||||
"🍇": "grapes",
|
||||
"🍓": "strawberry",
|
||||
"☀️": "sun",
|
||||
"🌙": "moon",
|
||||
"⭐": "star",
|
||||
"🌧️": "rain",
|
||||
"❄️": "snowflake",
|
||||
"🌈": "rainbow",
|
||||
"⚡": "lightning",
|
||||
"🌪️": "tornado",
|
||||
"⚽": "soccer ball",
|
||||
"🏀": "basketball",
|
||||
"🏈": "football",
|
||||
"⚾": "baseball",
|
||||
"🎾": "tennis",
|
||||
"🏐": "volleyball",
|
||||
"🏉": "rugby",
|
||||
"🎱": "pool ball",
|
||||
}
|
||||
return names.get(emoji, "correct")
|
||||
|
||||
|
||||
class VerificationService:
|
||||
"""Service for managing member verification."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
# Pending verifications: {(guild_id, user_id): PendingVerification}
|
||||
self._pending: dict[tuple[int, int], PendingVerification] = {}
|
||||
|
||||
# Challenge generators
|
||||
self._generators: dict[ChallengeType, ChallengeGenerator] = {
|
||||
ChallengeType.BUTTON: ButtonChallengeGenerator(),
|
||||
ChallengeType.CAPTCHA: CaptchaChallengeGenerator(),
|
||||
ChallengeType.MATH: MathChallengeGenerator(),
|
||||
ChallengeType.EMOJI: EmojiChallengeGenerator(),
|
||||
}
|
||||
|
||||
def create_challenge(
|
||||
self,
|
||||
user_id: int,
|
||||
guild_id: int,
|
||||
challenge_type: ChallengeType = ChallengeType.BUTTON,
|
||||
) -> PendingVerification:
|
||||
"""Create a new verification challenge for a user."""
|
||||
generator = self._generators.get(challenge_type)
|
||||
if not generator:
|
||||
generator = self._generators[ChallengeType.BUTTON]
|
||||
|
||||
challenge = generator.generate()
|
||||
pending = PendingVerification(
|
||||
user_id=user_id,
|
||||
guild_id=guild_id,
|
||||
challenge=challenge,
|
||||
)
|
||||
|
||||
self._pending[(guild_id, user_id)] = pending
|
||||
return pending
|
||||
|
||||
def get_pending(self, guild_id: int, user_id: int) -> PendingVerification | None:
|
||||
"""Get a pending verification for a user."""
|
||||
return self._pending.get((guild_id, user_id))
|
||||
|
||||
def verify(self, guild_id: int, user_id: int, response: str) -> tuple[bool, str]:
|
||||
"""
|
||||
Attempt to verify a user's response.
|
||||
|
||||
Returns:
|
||||
Tuple of (success, message)
|
||||
"""
|
||||
pending = self._pending.get((guild_id, user_id))
|
||||
|
||||
if not pending:
|
||||
return False, "No pending verification found."
|
||||
|
||||
if pending.challenge.is_expired:
|
||||
self._pending.pop((guild_id, user_id), None)
|
||||
return False, "Verification expired. Please request a new one."
|
||||
|
||||
if pending.challenge.attempts >= pending.challenge.max_attempts:
|
||||
self._pending.pop((guild_id, user_id), None)
|
||||
return False, "Too many failed attempts. Please request a new verification."
|
||||
|
||||
if pending.challenge.check_answer(response):
|
||||
self._pending.pop((guild_id, user_id), None)
|
||||
return True, "Verification successful!"
|
||||
|
||||
remaining = pending.challenge.max_attempts - pending.challenge.attempts
|
||||
return False, f"Incorrect. {remaining} attempt(s) remaining."
|
||||
|
||||
def cancel(self, guild_id: int, user_id: int) -> bool:
|
||||
"""Cancel a pending verification."""
|
||||
return self._pending.pop((guild_id, user_id), None) is not None
|
||||
|
||||
def cleanup_expired(self) -> int:
|
||||
"""Remove expired verifications. Returns count of removed."""
|
||||
expired = [key for key, pending in self._pending.items() if pending.challenge.is_expired]
|
||||
for key in expired:
|
||||
self._pending.pop(key, None)
|
||||
return len(expired)
|
||||
|
||||
def get_pending_count(self, guild_id: int) -> int:
|
||||
"""Get count of pending verifications for a guild."""
|
||||
return sum(1 for (gid, _) in self._pending if gid == guild_id)
|
||||
Reference in New Issue
Block a user