"""Automod service for spam detection - Minimal Version.""" import logging import time from collections import defaultdict from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING if TYPE_CHECKING: import discord else: try: import discord # type: ignore except ModuleNotFoundError: # pragma: no cover class _DiscordStub: class Message: pass discord = _DiscordStub() # type: ignore logger = logging.getLogger(__name__) @dataclass class SpamConfig: """Spam detection configuration.""" message_rate_limit: int = 5 message_rate_window: int = 5 duplicate_threshold: int = 3 mention_limit: int = 5 mention_rate_limit: int = 10 mention_rate_window: int = 60 @dataclass class AutomodResult: """Result of an automod check.""" matched_filter: str reason: str should_delete: bool = True should_warn: bool = False should_strike: bool = False should_timeout: bool = False timeout_duration: int | None = None class SpamTracker: """Track user spam behavior.""" def __init__(self): # guild_id -> user_id -> deque of message timestamps self.message_times: dict[int, dict[int, list[float]]] = defaultdict(lambda: defaultdict(list)) # guild_id -> user_id -> deque of message contents for duplicate detection self.message_contents: dict[int, dict[int, list[str]]] = defaultdict(lambda: defaultdict(list)) # guild_id -> user_id -> deque of mention timestamps self.mention_times: dict[int, dict[int, list[float]]] = defaultdict(lambda: defaultdict(list)) # Last cleanup time self.last_cleanup = time.time() def cleanup_old_entries(self): """Periodically cleanup old entries to prevent memory leaks.""" now = time.time() if now - self.last_cleanup < 300: # Cleanup every 5 minutes return cutoff = now - 3600 # Keep last hour of data for guild_data in [self.message_times, self.mention_times]: for guild_id in list(guild_data.keys()): for user_id in list(guild_data[guild_id].keys()): # Remove old timestamps guild_data[guild_id][user_id] = [ ts for ts in guild_data[guild_id][user_id] if ts > cutoff ] # Remove empty users if not guild_data[guild_id][user_id]: del guild_data[guild_id][user_id] # Remove empty guilds if not guild_data[guild_id]: del guild_data[guild_id] # Cleanup message contents for guild_id in list(self.message_contents.keys()): for user_id in list(self.message_contents[guild_id].keys()): # Keep only last 10 messages per user self.message_contents[guild_id][user_id] = self.message_contents[guild_id][user_id][-10:] if not self.message_contents[guild_id][user_id]: del self.message_contents[guild_id][user_id] if not self.message_contents[guild_id]: del self.message_contents[guild_id] self.last_cleanup = now class AutomodService: """Service for spam detection - no banned words, no scam links, no invites.""" def __init__(self): self.spam_tracker = SpamTracker() self.default_spam_config = SpamConfig() def check_spam( self, message: "discord.Message", anti_spam_enabled: bool = True, spam_config: SpamConfig | None = None, ) -> AutomodResult | None: """Check message for spam patterns. Args: message: Discord message to check anti_spam_enabled: Whether spam detection is enabled spam_config: Spam configuration settings Returns: AutomodResult if spam detected, None otherwise """ if not anti_spam_enabled: return None config = spam_config or self.default_spam_config guild_id = message.guild.id user_id = message.author.id now = time.time() # Periodic cleanup self.spam_tracker.cleanup_old_entries() # Check 1: Message rate limiting message_times = self.spam_tracker.message_times[guild_id][user_id] cutoff_time = now - config.message_rate_window # Remove old timestamps message_times = [ts for ts in message_times if ts > cutoff_time] self.spam_tracker.message_times[guild_id][user_id] = message_times # Add current message message_times.append(now) if len(message_times) > config.message_rate_limit: return AutomodResult( matched_filter="spam_rate_limit", reason=f"Exceeded message rate limit ({len(message_times)} messages in {config.message_rate_window}s)", should_delete=True, ) # Check 2: Duplicate messages message_contents = self.spam_tracker.message_contents[guild_id][user_id] message_contents.append(message.content) self.spam_tracker.message_contents[guild_id][user_id] = message_contents[-10:] # Keep last 10 # Count duplicates in recent messages duplicate_count = message_contents.count(message.content) if duplicate_count >= config.duplicate_threshold: return AutomodResult( matched_filter="spam_duplicate", reason=f"Duplicate message posted {duplicate_count} times", should_delete=True, ) # Check 3: Mass mentions in single message mention_count = len(message.mentions) if mention_count > config.mention_limit: return AutomodResult( matched_filter="spam_mass_mentions", reason=f"Too many mentions in single message ({mention_count})", should_delete=True, ) # Check 4: Mention rate limiting if mention_count > 0: mention_times = self.spam_tracker.mention_times[guild_id][user_id] mention_cutoff = now - config.mention_rate_window # Remove old timestamps mention_times = [ts for ts in mention_times if ts > mention_cutoff] # Add current mentions mention_times.extend([now] * mention_count) self.spam_tracker.mention_times[guild_id][user_id] = mention_times if len(mention_times) > config.mention_rate_limit: return AutomodResult( matched_filter="spam_mention_rate", reason=f"Exceeded mention rate limit ({len(mention_times)} mentions in {config.mention_rate_window}s)", should_delete=True, ) return None