Files
GuardDen/src/guardden/services/verification.py
latte 831eed8dbc
Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 6m9s
CI/CD Pipeline / Security Scanning (push) Successful in 26s
CI/CD Pipeline / Tests (3.11) (push) Failing after 5m24s
CI/CD Pipeline / Tests (3.12) (push) Failing after 5m23s
CI/CD Pipeline / Build Docker Image (push) Has been skipped
CI/CD Pipeline / Deploy to Staging (push) Has been skipped
CI/CD Pipeline / Deploy to Production (push) Has been skipped
CI/CD Pipeline / Notification (push) Successful in 1s
quick commit
2026-01-17 20:24:43 +01:00

322 lines
10 KiB
Python

"""Verification service for new member challenges."""
import asyncio
import logging
import random
import string
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any
logger = logging.getLogger(__name__)
class ChallengeType(str, Enum):
"""Types of verification challenges."""
BUTTON = "button" # Simple button click
CAPTCHA = "captcha" # Text-based captcha
MATH = "math" # Simple math problem
EMOJI = "emoji" # Select correct emoji
QUESTIONS = "questions" # Custom questions
@dataclass
class Challenge:
"""Represents a verification challenge."""
challenge_type: ChallengeType
question: str
answer: str
options: list[str] = field(default_factory=list) # For multiple choice
expires_at: datetime = field(
default_factory=lambda: datetime.now(timezone.utc) + timedelta(minutes=10)
)
attempts: int = 0
max_attempts: int = 3
@property
def is_expired(self) -> bool:
return datetime.now(timezone.utc) > self.expires_at
def check_answer(self, response: str) -> bool:
"""Check if the response is correct."""
self.attempts += 1
return response.strip().lower() == self.answer.lower()
@dataclass
class PendingVerification:
"""Tracks a pending verification for a user."""
user_id: int
guild_id: int
challenge: Challenge
message_id: int | None = None
channel_id: int | None = None
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
class ChallengeGenerator(ABC):
"""Abstract base class for challenge generators."""
@abstractmethod
def generate(self) -> Challenge:
"""Generate a new challenge."""
pass
class ButtonChallengeGenerator(ChallengeGenerator):
"""Generates simple button click challenges."""
def generate(self) -> Challenge:
return Challenge(
challenge_type=ChallengeType.BUTTON,
question="Click the button below to verify you're human.",
answer="verified",
)
class CaptchaChallengeGenerator(ChallengeGenerator):
"""Generates text-based captcha challenges."""
def __init__(self, length: int = 6) -> None:
self.length = length
def generate(self) -> Challenge:
# Generate random alphanumeric code (avoiding confusing chars)
chars = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
code = "".join(random.choices(chars, k=self.length))
# Create visual representation with some obfuscation
visual = self._create_visual(code)
return Challenge(
challenge_type=ChallengeType.CAPTCHA,
question=f"Enter the code shown below:\n```\n{visual}\n```",
answer=code,
)
def _create_visual(self, code: str) -> str:
"""Create a simple text-based visual captcha."""
lines = []
# Add some noise characters
noise_chars = ".-*~^"
for _ in range(2):
lines.append("".join(random.choices(noise_chars, k=len(code) * 2)))
# Add the code with spacing
spaced = " ".join(code)
lines.append(spaced)
for _ in range(2):
lines.append("".join(random.choices(noise_chars, k=len(code) * 2)))
return "\n".join(lines)
class MathChallengeGenerator(ChallengeGenerator):
"""Generates simple math problem challenges."""
def generate(self) -> Challenge:
# Generate simple addition/subtraction/multiplication
operation = random.choice(["+", "-", "*"])
if operation == "*":
a = random.randint(2, 10)
b = random.randint(2, 10)
else:
a = random.randint(10, 50)
b = random.randint(1, 20)
if operation == "+":
answer = a + b
elif operation == "-":
# Ensure positive result
if b > a:
a, b = b, a
answer = a - b
else:
answer = a * b
return Challenge(
challenge_type=ChallengeType.MATH,
question=f"Solve this math problem: **{a} {operation} {b} = ?**",
answer=str(answer),
)
class EmojiChallengeGenerator(ChallengeGenerator):
"""Generates emoji selection challenges."""
EMOJI_SETS = [
("animals", ["🐶", "🐱", "🐭", "🐹", "🐰", "🦊", "🐻", "🐼"]),
("fruits", ["🍎", "🍐", "🍊", "🍋", "🍌", "🍉", "🍇", "🍓"]),
("weather", ["☀️", "🌙", "", "🌧️", "❄️", "🌈", "", "🌪️"]),
("sports", ["", "🏀", "🏈", "", "🎾", "🏐", "🏉", "🎱"]),
]
def generate(self) -> Challenge:
category, emojis = random.choice(self.EMOJI_SETS)
target = random.choice(emojis)
# Create options with the target and some others
options = [target]
other_emojis = [e for e in emojis if e != target]
options.extend(random.sample(other_emojis, min(3, len(other_emojis))))
random.shuffle(options)
return Challenge(
challenge_type=ChallengeType.EMOJI,
question=f"Select the {self._emoji_name(target)} emoji:",
answer=target,
options=options,
)
def _emoji_name(self, emoji: str) -> str:
"""Get a description of the emoji."""
names = {
"🐶": "dog",
"🐱": "cat",
"🐭": "mouse",
"🐹": "hamster",
"🐰": "rabbit",
"🦊": "fox",
"🐻": "bear",
"🐼": "panda",
"🍎": "apple",
"🍐": "pear",
"🍊": "orange",
"🍋": "lemon",
"🍌": "banana",
"🍉": "watermelon",
"🍇": "grapes",
"🍓": "strawberry",
"☀️": "sun",
"🌙": "moon",
"": "star",
"🌧️": "rain",
"❄️": "snowflake",
"🌈": "rainbow",
"": "lightning",
"🌪️": "tornado",
"": "soccer ball",
"🏀": "basketball",
"🏈": "football",
"": "baseball",
"🎾": "tennis",
"🏐": "volleyball",
"🏉": "rugby",
"🎱": "pool ball",
}
return names.get(emoji, "correct")
class QuestionsChallengeGenerator(ChallengeGenerator):
"""Generates custom question challenges."""
DEFAULT_QUESTIONS = [
("What color is the sky on a clear day?", "blue"),
("Type the word 'verified' to continue.", "verified"),
("What is 2 + 2?", "4"),
("What planet do we live on?", "earth"),
]
def __init__(self, questions: list[tuple[str, str]] | None = None) -> None:
self.questions = questions or self.DEFAULT_QUESTIONS
def generate(self) -> Challenge:
question, answer = random.choice(self.questions)
return Challenge(
challenge_type=ChallengeType.QUESTIONS,
question=question,
answer=answer,
)
class VerificationService:
"""Service for managing member verification."""
def __init__(self) -> None:
# Pending verifications: {(guild_id, user_id): PendingVerification}
self._pending: dict[tuple[int, int], PendingVerification] = {}
# Challenge generators
self._generators: dict[ChallengeType, ChallengeGenerator] = {
ChallengeType.BUTTON: ButtonChallengeGenerator(),
ChallengeType.CAPTCHA: CaptchaChallengeGenerator(),
ChallengeType.MATH: MathChallengeGenerator(),
ChallengeType.EMOJI: EmojiChallengeGenerator(),
ChallengeType.QUESTIONS: QuestionsChallengeGenerator(),
}
def create_challenge(
self,
user_id: int,
guild_id: int,
challenge_type: ChallengeType = ChallengeType.BUTTON,
) -> PendingVerification:
"""Create a new verification challenge for a user."""
generator = self._generators.get(challenge_type)
if not generator:
generator = self._generators[ChallengeType.BUTTON]
challenge = generator.generate()
pending = PendingVerification(
user_id=user_id,
guild_id=guild_id,
challenge=challenge,
)
self._pending[(guild_id, user_id)] = pending
return pending
def get_pending(self, guild_id: int, user_id: int) -> PendingVerification | None:
"""Get a pending verification for a user."""
return self._pending.get((guild_id, user_id))
def verify(self, guild_id: int, user_id: int, response: str) -> tuple[bool, str]:
"""
Attempt to verify a user's response.
Returns:
Tuple of (success, message)
"""
pending = self._pending.get((guild_id, user_id))
if not pending:
return False, "No pending verification found."
if pending.challenge.is_expired:
self._pending.pop((guild_id, user_id), None)
return False, "Verification expired. Please request a new one."
if pending.challenge.attempts >= pending.challenge.max_attempts:
self._pending.pop((guild_id, user_id), None)
return False, "Too many failed attempts. Please request a new verification."
if pending.challenge.check_answer(response):
self._pending.pop((guild_id, user_id), None)
return True, "Verification successful!"
remaining = pending.challenge.max_attempts - pending.challenge.attempts
return False, f"Incorrect. {remaining} attempt(s) remaining."
def cancel(self, guild_id: int, user_id: int) -> bool:
"""Cancel a pending verification."""
return self._pending.pop((guild_id, user_id), None) is not None
def cleanup_expired(self) -> int:
"""Remove expired verifications. Returns count of removed."""
expired = [key for key, pending in self._pending.items() if pending.challenge.is_expired]
for key in expired:
self._pending.pop(key, None)
return len(expired)
def get_pending_count(self, guild_id: int) -> int:
"""Get count of pending verifications for a guild."""
return sum(1 for (gid, _) in self._pending if gid == guild_id)