WIP: Minimal bot refactor - new files and core updates

- Created config.yml template with conservative AI limits
- Created owner.py cog (status, reload, ping commands)
- Created config_loader.py service for YAML config
- Created ai_rate_limiter.py for AI cost control
- Updated bot.py to load only 3 cogs (automod, ai_moderation, owner)
- Simplified config.py (removed unused settings)
- Deleted unnecessary cogs, services, models
- Updated models/__init__.py

Next: Update automod and ai_moderation cogs
This commit is contained in:
2026-01-27 18:58:56 +01:00
parent d4cc23f7cf
commit 08815a3dd0
23 changed files with 469 additions and 4638 deletions

61
config.yml Normal file
View File

@@ -0,0 +1,61 @@
# GuardDen Configuration
# Single YAML file for bot configuration
# Bot Settings
bot:
prefix: "!"
owner_ids:
# Add your Discord user ID here
# Example: - 123456789012345678
# Spam Detection (No AI cost)
automod:
enabled: true
anti_spam_enabled: true
message_rate_limit: 5 # Max messages per window
message_rate_window: 5 # Window in seconds
duplicate_threshold: 3 # Duplicate messages trigger
mention_limit: 5 # Max mentions per message
mention_rate_limit: 10 # Max mentions per window
mention_rate_window: 60 # Mention window in seconds
# AI Moderation (Images, GIFs only)
ai_moderation:
enabled: true
sensitivity: 80 # 0-100, higher = stricter
nsfw_only_filtering: true # Only filter sexual/nude content
# Cost Controls (Conservative: ~$25/month for 1-2 guilds)
max_checks_per_hour_per_guild: 25 # Very conservative limit
max_checks_per_user_per_hour: 5 # Prevent user abuse
max_images_per_message: 2 # Check max 2 images per message
max_image_size_mb: 3 # Skip images larger than 3MB
# Feature Toggles
check_embed_images: true # Check GIFs from Discord picker (enabled per user request)
check_video_thumbnails: false # Skip video thumbnails (disabled per user request)
url_image_check_enabled: false # Skip URL image downloads (disabled per user request)
# NSFW Video Domain Blocklist (No AI cost)
# These domains are blocked instantly without AI analysis
nsfw_video_domains:
- pornhub.com
- xvideos.com
- xnxx.com
- redtube.com
- youporn.com
- tube8.com
- spankwire.com
- keezmovies.com
- extremetube.com
- pornerbros.com
- eporner.com
- tnaflix.com
- drtuber.com
- upornia.com
- perfectgirls.net
- xhamster.com
- hqporner.com
- porn.com
- sex.com
- wetpussy.com

View File

@@ -1,28 +1,25 @@
"""Main bot class for GuardDen."""
"""Main bot class for GuardDen - Minimal Version."""
import inspect
import logging
import platform
from typing import TYPE_CHECKING
from pathlib import Path
import discord
from discord.ext import commands
from guardden.config import Settings
from guardden.services.ai import AIProvider, create_ai_provider
from guardden.services.ai_rate_limiter import AIRateLimiter
from guardden.services.config_loader import ConfigLoader
from guardden.services.database import Database
from guardden.services.ratelimit import RateLimiter
from guardden.utils.logging import get_logger, get_logging_middleware, setup_logging
if TYPE_CHECKING:
from guardden.services.guild_config import GuildConfigService
from guardden.utils.logging import get_logger, setup_logging
logger = get_logger(__name__)
logging_middleware = get_logging_middleware()
class GuardDen(commands.Bot):
"""The main GuardDen Discord bot."""
"""The main GuardDen Discord bot - Minimal spam & NSFW detection."""
def __init__(self, settings: Settings) -> None:
self.settings = settings
@@ -30,55 +27,48 @@ class GuardDen(commands.Bot):
intents = discord.Intents.default()
intents.message_content = True
intents.members = True
intents.voice_states = True
# Load config from YAML
config_path = settings.config_file
if not config_path.exists():
raise FileNotFoundError(
f"Config file not found: {config_path}\n"
f"Please create config.yml from the template."
)
super().__init__(
command_prefix=self._get_prefix,
command_prefix=settings.discord_prefix,
intents=intents,
help_command=None, # Set by help cog
help_command=None,
)
# Services
self.database = Database(settings)
self.guild_config: "GuildConfigService | None" = None
self.ai_provider: AIProvider | None = None
self.wordlist_service = None
self.rate_limiter = RateLimiter()
async def _get_prefix(self, bot: "GuardDen", message: discord.Message) -> list[str]:
"""Get the command prefix for a guild."""
if not message.guild:
return [self.settings.discord_prefix]
if self.guild_config:
config = await self.guild_config.get_config(message.guild.id)
if config:
return [config.prefix]
return [self.settings.discord_prefix]
def is_guild_allowed(self, guild_id: int) -> bool:
"""Check if a guild is allowed to run the bot."""
return not self.settings.allowed_guilds or guild_id in self.settings.allowed_guilds
def is_owner_allowed(self, user_id: int) -> bool:
"""Check if a user is allowed elevated access."""
return not self.settings.owner_ids or user_id in self.settings.owner_ids
self.config_loader = ConfigLoader(config_path)
self.ai_rate_limiter = AIRateLimiter()
async def setup_hook(self) -> None:
"""Called when the bot is starting up."""
logger.info("Starting GuardDen setup...")
logger.info("Starting GuardDen Minimal...")
# Load configuration from YAML
try:
await self.config_loader.load()
logger.info(f"Configuration loaded from {self.config_loader.config_path}")
except Exception as e:
logger.error(f"Failed to load configuration: {e}")
raise
self.settings.validate_configuration()
logger.info(
"Configuration loaded: ai_provider=%s, log_level=%s, allowed_guilds=%s, owner_ids=%s",
"Settings: ai_provider=%s, log_level=%s, owner_ids=%s",
self.settings.ai_provider,
self.settings.log_level,
self.settings.allowed_guilds or "all",
self.settings.owner_ids or "admins",
self.settings.owner_ids or "none",
)
logger.info(
"Runtime versions: python=%s, discord.py=%s",
"Runtime: python=%s, discord.py=%s",
platform.python_version(),
discord.__version__,
)
@@ -87,14 +77,6 @@ class GuardDen(commands.Bot):
await self.database.connect()
await self.database.create_tables()
# Initialize services
from guardden.services.guild_config import GuildConfigService
self.guild_config = GuildConfigService(self.database, settings=self.settings)
from guardden.services.wordlist import WordlistService
self.wordlist_service = WordlistService(self.database, self.settings)
# Initialize AI provider
api_key = None
if self.settings.ai_provider == "anthropic" and self.settings.anthropic_api_key:
@@ -104,23 +86,22 @@ class GuardDen(commands.Bot):
self.ai_provider = create_ai_provider(self.settings.ai_provider, api_key)
if self.settings.ai_provider != "none":
logger.info(f"AI provider initialized: {self.settings.ai_provider}")
else:
logger.warning("AI provider is disabled (provider=none)")
# Load cogs
await self._load_cogs()
logger.info("GuardDen setup complete")
async def _load_cogs(self) -> None:
"""Load all cog extensions."""
"""Load minimal cog extensions."""
cogs = [
"guardden.cogs.events",
"guardden.cogs.moderation",
"guardden.cogs.admin",
"guardden.cogs.automod",
"guardden.cogs.ai_moderation",
"guardden.cogs.verification",
"guardden.cogs.health",
"guardden.cogs.wordlist_sync",
"guardden.cogs.help",
"guardden.cogs.automod", # Spam detection only
"guardden.cogs.ai_moderation", # Image detection only
"guardden.cogs.owner", # Owner commands
]
failed_cogs = []
@@ -139,8 +120,8 @@ class GuardDen(commands.Bot):
failed_cogs.append(cog)
if failed_cogs:
logger.warning(f"Failed to load {len(failed_cogs)} cog(s): {', '.join(failed_cogs)}")
# Don't fail startup if some cogs fail to load, but log it prominently
logger.error(f"Failed to load {len(failed_cogs)} cog(s): {', '.join(failed_cogs)}")
raise RuntimeError(f"Critical cogs failed to load: {failed_cogs}")
async def on_ready(self) -> None:
"""Called when the bot is fully connected and ready."""
@@ -148,54 +129,29 @@ class GuardDen(commands.Bot):
logger.info(f"Logged in as {self.user} (ID: {self.user.id})")
logger.info(f"Connected to {len(self.guilds)} guild(s)")
# Ensure all guilds have database entries
if self.guild_config:
initialized = 0
failed_guilds = []
for guild in self.guilds:
try:
if not self.is_guild_allowed(guild.id):
logger.warning(
"Leaving unauthorized guild %s (ID: %s)", guild.name, guild.id
)
try:
await guild.leave()
except discord.HTTPException as e:
logger.error(f"Failed to leave guild {guild.id}: {e}")
continue
await self.guild_config.create_guild(guild)
initialized += 1
except Exception as e:
logger.error(
f"Failed to initialize config for guild {guild.id} ({guild.name}): {e}",
exc_info=True,
)
failed_guilds.append(guild.id)
logger.info("Initialized config for %s guild(s)", initialized)
if failed_guilds:
logger.warning(
f"Failed to initialize {len(failed_guilds)} guild(s): {failed_guilds}"
)
logger.info(f" - {guild.name} (ID: {guild.id}, Members: {guild.member_count})")
# Set presence
activity = discord.Activity(
type=discord.ActivityType.watching,
name="over your community",
name="for NSFW content",
)
await self.change_presence(activity=activity)
logger.info("Bot is ready!")
async def close(self) -> None:
"""Clean up when shutting down."""
logger.info("Shutting down GuardDen...")
await self._shutdown_cogs()
if self.ai_provider:
try:
await self.ai_provider.close()
except Exception as e:
logger.error(f"Error closing AI provider: {e}")
await self.database.disconnect()
await super().close()
@@ -216,14 +172,6 @@ class GuardDen(commands.Bot):
"""Called when the bot joins a new guild."""
logger.info(f"Joined guild: {guild.name} (ID: {guild.id})")
if not self.is_guild_allowed(guild.id):
logger.warning("Guild %s (ID: %s) not in allowlist, leaving.", guild.name, guild.id)
await guild.leave()
return
if self.guild_config:
await self.guild_config.create_guild(guild)
async def on_guild_remove(self, guild: discord.Guild) -> None:
"""Called when the bot is removed from a guild."""
logger.info(f"Removed from guild: {guild.name} (ID: {guild.id})")

View File

@@ -1 +0,0 @@
"""GuardDen CLI tools for configuration management."""

View File

@@ -1,559 +0,0 @@
#!/usr/bin/env python3
"""GuardDen Configuration CLI Tool.
This CLI tool allows you to manage GuardDen bot configurations without
using Discord commands. You can create, edit, validate, and migrate
configurations using this command-line interface.
Usage:
python -m guardden.cli.config --help
python -m guardden.cli.config guild create 123456789 "My Server"
python -m guardden.cli.config guild edit 123456789 ai_moderation.sensitivity 75
python -m guardden.cli.config migrate from-database
python -m guardden.cli.config validate all
"""
import asyncio
import sys
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List
import argparse
import yaml
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from guardden.services.file_config import FileConfigurationManager, ConfigurationError
from guardden.services.config_migration import ConfigurationMigrator
from guardden.services.database import Database
from guardden.services.guild_config import GuildConfigService
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ConfigurationCLI:
"""Command-line interface for GuardDen configuration management."""
def __init__(self, config_dir: str = "config"):
"""Initialize the CLI with configuration directory."""
self.config_dir = Path(config_dir)
self.file_manager: Optional[FileConfigurationManager] = None
self.database: Optional[Database] = None
self.migrator: Optional[ConfigurationMigrator] = None
async def initialize(self):
"""Initialize the configuration system."""
self.file_manager = FileConfigurationManager(str(self.config_dir))
await self.file_manager.initialize()
# Initialize database connection if available
try:
import os
database_url = os.getenv('GUARDDEN_DATABASE_URL', 'postgresql://guardden:guardden@localhost:5432/guardden')
self.database = Database(database_url)
guild_config_service = GuildConfigService(self.database)
self.migrator = ConfigurationMigrator(self.database, guild_config_service, self.file_manager)
logger.info("Database connection established")
except Exception as e:
logger.warning(f"Database not available: {e}")
async def cleanup(self):
"""Clean up resources."""
if self.file_manager:
await self.file_manager.shutdown()
if self.database:
await self.database.close()
# Guild management commands
async def guild_create(self, guild_id: int, name: str, owner_id: Optional[int] = None):
"""Create a new guild configuration."""
try:
file_path = await self.file_manager.create_guild_config(guild_id, name, owner_id)
print(f"✅ Created guild configuration: {file_path}")
print(f"📝 Edit the file to customize settings for {name}")
return True
except ConfigurationError as e:
print(f"❌ Failed to create guild configuration: {e.error_message}")
return False
except Exception as e:
print(f"❌ Unexpected error: {str(e)}")
return False
async def guild_list(self):
"""List all configured guilds."""
configs = self.file_manager.get_all_guild_configs()
if not configs:
print("📄 No guild configurations found")
print("💡 Use 'guild create <guild_id> <name>' to create a new configuration")
return
print(f"📋 Found {len(configs)} guild configuration(s):")
print()
for guild_id, config in configs.items():
status_icon = "" if config else ""
premium_icon = "" if config.premium else ""
print(f"{status_icon} {premium_icon} {guild_id}: {config.name}")
print(f" 📁 File: {config.file_path}")
print(f" 🕐 Updated: {config.last_updated.strftime('%Y-%m-%d %H:%M:%S')}")
# Show key settings
settings = config.settings
ai_enabled = settings.get("ai_moderation", {}).get("enabled", False)
nsfw_only = settings.get("ai_moderation", {}).get("nsfw_only_filtering", False)
automod_enabled = settings.get("moderation", {}).get("automod_enabled", False)
print(f" 🤖 AI: {'' if ai_enabled else ''} | "
f"🔞 NSFW-Only: {'' if nsfw_only else ''} | "
f"⚡ AutoMod: {'' if automod_enabled else ''}")
print()
async def guild_edit(self, guild_id: int, setting_path: str, value: Any):
"""Edit a guild configuration setting."""
config = self.file_manager.get_guild_config(guild_id)
if not config:
print(f"❌ Guild {guild_id} configuration not found")
return False
try:
# Load current configuration
with open(config.file_path, 'r', encoding='utf-8') as f:
file_config = yaml.safe_load(f)
# Parse setting path (e.g., "ai_moderation.sensitivity")
path_parts = setting_path.split('.')
current = file_config
# Navigate to the parent of the target setting
for part in path_parts[:-1]:
if part not in current:
print(f"❌ Setting path not found: {setting_path}")
return False
current = current[part]
# Set the value
final_key = path_parts[-1]
old_value = current.get(final_key, "Not set")
# Convert value to appropriate type
if isinstance(old_value, bool):
value = str(value).lower() in ('true', '1', 'yes', 'on')
elif isinstance(old_value, int):
value = int(value)
elif isinstance(old_value, float):
value = float(value)
elif isinstance(old_value, list):
value = value.split(',') if isinstance(value, str) else value
current[final_key] = value
# Write back to file
with open(config.file_path, 'w', encoding='utf-8') as f:
yaml.dump(file_config, f, default_flow_style=False, indent=2)
print(f"✅ Updated {setting_path} for guild {guild_id}")
print(f" 📝 Changed from: {old_value}")
print(f" 📝 Changed to: {value}")
print(f"🔄 Configuration will be hot-reloaded automatically")
return True
except Exception as e:
print(f"❌ Failed to edit configuration: {str(e)}")
return False
async def guild_validate(self, guild_id: Optional[int] = None):
"""Validate guild configuration(s)."""
if guild_id:
configs = {guild_id: self.file_manager.get_guild_config(guild_id)}
if not configs[guild_id]:
print(f"❌ Guild {guild_id} configuration not found")
return False
else:
configs = self.file_manager.get_all_guild_configs()
if not configs:
print("📄 No configurations to validate")
return True
all_valid = True
print(f"🔍 Validating {len(configs)} configuration(s)...")
print()
for guild_id, config in configs.items():
if not config:
continue
try:
# Load and validate configuration
with open(config.file_path, 'r', encoding='utf-8') as f:
file_config = yaml.safe_load(f)
errors = self.file_manager.validate_config(file_config)
if errors:
all_valid = False
print(f"❌ Guild {guild_id} ({config.name}) - INVALID")
for error in errors:
print(f" 🔸 {error}")
else:
print(f"✅ Guild {guild_id} ({config.name}) - VALID")
except Exception as e:
all_valid = False
print(f"❌ Guild {guild_id} - ERROR: {str(e)}")
print()
if all_valid:
print("🎉 All configurations are valid!")
else:
print("⚠️ Some configurations have errors. Please fix them before running the bot.")
return all_valid
async def guild_backup(self, guild_id: int):
"""Create a backup of guild configuration."""
try:
backup_path = await self.file_manager.backup_config(guild_id)
print(f"✅ Created backup: {backup_path}")
return True
except Exception as e:
print(f"❌ Failed to create backup: {str(e)}")
return False
# Migration commands
async def migrate_from_database(self, backup_existing: bool = True):
"""Migrate all configurations from database to files."""
if not self.migrator:
print("❌ Database not available for migration")
return False
print("🔄 Starting migration from database to files...")
print("⚠️ This will convert Discord command configurations to YAML files")
if backup_existing:
print("📦 Existing files will be backed up")
try:
results = await self.migrator.migrate_all_guilds(backup_existing)
print("\n📊 Migration Results:")
print(f" ✅ Migrated: {len(results['migrated_guilds'])} guilds")
print(f" ❌ Failed: {len(results['failed_guilds'])} guilds")
print(f" ⏭️ Skipped: {len(results['skipped_guilds'])} guilds")
print(f" 📝 Banned words migrated: {results['banned_words_migrated']}")
if results['migrated_guilds']:
print("\n✅ Successfully migrated guilds:")
for guild in results['migrated_guilds']:
print(f"{guild['guild_id']}: {guild['guild_name']} "
f"({guild['banned_words_count']} banned words)")
if results['failed_guilds']:
print("\n❌ Failed migrations:")
for guild in results['failed_guilds']:
print(f"{guild['guild_id']}: {guild['guild_name']} - {guild['error']}")
if results['skipped_guilds']:
print("\n⏭️ Skipped guilds:")
for guild in results['skipped_guilds']:
print(f"{guild['guild_id']}: {guild['guild_name']} - {guild['reason']}")
if results['errors']:
print("\n⚠️ Errors encountered:")
for error in results['errors']:
print(f"{error}")
return len(results['failed_guilds']) == 0
except Exception as e:
print(f"❌ Migration failed: {str(e)}")
return False
async def migrate_verify(self, guild_ids: Optional[List[int]] = None):
"""Verify migration by comparing database and file configurations."""
if not self.migrator:
print("❌ Database not available for verification")
return False
print("🔍 Verifying migration results...")
try:
results = await self.migrator.verify_migration(guild_ids)
print("\n📊 Verification Results:")
print(f" ✅ Verified: {len(results['verified_guilds'])} guilds")
print(f" ⚠️ Mismatches: {len(results['mismatches'])} guilds")
print(f" 📄 Missing files: {len(results['missing_files'])} guilds")
if results['verified_guilds']:
print("\n✅ Verified guilds:")
for guild in results['verified_guilds']:
print(f"{guild['guild_id']}: {guild['guild_name']}")
if results['mismatches']:
print("\n⚠️ Configuration mismatches:")
for guild in results['mismatches']:
print(f"{guild['guild_id']}: {guild['guild_name']}")
print(f" Mismatched fields: {', '.join(guild['mismatched_fields'])}")
if results['missing_files']:
print("\n📄 Missing configuration files:")
for guild in results['missing_files']:
print(f"{guild['guild_id']}: {guild['guild_name']}")
print(f" Expected: {guild['expected_file']}")
return len(results['mismatches']) == 0 and len(results['missing_files']) == 0
except Exception as e:
print(f"❌ Verification failed: {str(e)}")
return False
# Wordlist management
async def wordlist_info(self):
"""Show information about wordlist configurations."""
banned_words = self.file_manager.get_wordlist_config()
allowlists = self.file_manager.get_allowlist_config()
external_sources = self.file_manager.get_external_sources_config()
print("📝 Wordlist Configuration Status:")
print()
if banned_words:
global_patterns = len(banned_words.get('global_patterns', []))
guild_patterns = sum(
len(patterns) for patterns in banned_words.get('guild_patterns', {}).values()
)
print(f"🚫 Banned Words: {global_patterns} global, {guild_patterns} guild-specific")
else:
print("🚫 Banned Words: Not configured")
if allowlists:
global_allowlist = len(allowlists.get('global_allowlist', []))
guild_allowlists = sum(
len(domains) for domains in allowlists.get('guild_allowlists', {}).values()
)
print(f"✅ Domain Allowlists: {global_allowlist} global, {guild_allowlists} guild-specific")
else:
print("✅ Domain Allowlists: Not configured")
if external_sources:
sources = external_sources.get('sources', [])
enabled_sources = len([s for s in sources if s.get('enabled', False)])
print(f"🌐 External Sources: {len(sources)} total, {enabled_sources} enabled")
else:
print("🌐 External Sources: Not configured")
print()
print("📁 Configuration files:")
print(f"{self.config_dir / 'wordlists' / 'banned-words.yml'}")
print(f"{self.config_dir / 'wordlists' / 'domain-allowlists.yml'}")
print(f"{self.config_dir / 'wordlists' / 'external-sources.yml'}")
# Template management
async def template_create(self, guild_id: int, name: str):
"""Create a new guild configuration from template."""
return await self.guild_create(guild_id, name)
async def template_info(self):
"""Show available configuration templates."""
template_dir = self.config_dir / "templates"
templates = list(template_dir.glob("*.yml"))
if not templates:
print("📄 No configuration templates found")
return
print(f"📋 Available Templates ({len(templates)}):")
print()
for template in templates:
try:
with open(template, 'r', encoding='utf-8') as f:
content = yaml.safe_load(f)
description = "Default guild configuration template"
if '_description' in content:
description = content['_description']
print(f"📄 {template.name}")
print(f" {description}")
print(f" 📁 {template}")
print()
except Exception as e:
print(f"❌ Error reading template {template.name}: {str(e)}")
async def main():
"""Main CLI entry point."""
parser = argparse.ArgumentParser(
description="GuardDen Configuration CLI Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Create a new guild configuration
python -m guardden.cli.config guild create 123456789 "My Server"
# List all guild configurations
python -m guardden.cli.config guild list
# Edit a configuration setting
python -m guardden.cli.config guild edit 123456789 ai_moderation.sensitivity 75
python -m guardden.cli.config guild edit 123456789 ai_moderation.nsfw_only_filtering true
# Validate configurations
python -m guardden.cli.config guild validate
python -m guardden.cli.config guild validate 123456789
# Migration from database
python -m guardden.cli.config migrate from-database
python -m guardden.cli.config migrate verify
# Wordlist management
python -m guardden.cli.config wordlist info
# Template management
python -m guardden.cli.config template info
"""
)
parser.add_argument(
'--config-dir', '-c',
default='config',
help='Configuration directory (default: config)'
)
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Guild management
guild_parser = subparsers.add_parser('guild', help='Guild configuration management')
guild_subparsers = guild_parser.add_subparsers(dest='guild_command')
# Guild create
create_parser = guild_subparsers.add_parser('create', help='Create new guild configuration')
create_parser.add_argument('guild_id', type=int, help='Discord guild ID')
create_parser.add_argument('name', help='Guild name')
create_parser.add_argument('--owner-id', type=int, help='Guild owner Discord user ID')
# Guild list
guild_subparsers.add_parser('list', help='List all guild configurations')
# Guild edit
edit_parser = guild_subparsers.add_parser('edit', help='Edit guild configuration setting')
edit_parser.add_argument('guild_id', type=int, help='Discord guild ID')
edit_parser.add_argument('setting', help='Setting path (e.g., ai_moderation.sensitivity)')
edit_parser.add_argument('value', help='New value')
# Guild validate
validate_parser = guild_subparsers.add_parser('validate', help='Validate guild configurations')
validate_parser.add_argument('guild_id', type=int, nargs='?', help='Specific guild ID (optional)')
# Guild backup
backup_parser = guild_subparsers.add_parser('backup', help='Backup guild configuration')
backup_parser.add_argument('guild_id', type=int, help='Discord guild ID')
# Migration
migrate_parser = subparsers.add_parser('migrate', help='Configuration migration')
migrate_subparsers = migrate_parser.add_subparsers(dest='migrate_command')
# Migrate from database
from_db_parser = migrate_subparsers.add_parser('from-database', help='Migrate from database to files')
from_db_parser.add_argument('--no-backup', action='store_true', help='Skip backing up existing files')
# Migrate verify
verify_parser = migrate_subparsers.add_parser('verify', help='Verify migration results')
verify_parser.add_argument('guild_ids', type=int, nargs='*', help='Specific guild IDs to verify')
# Wordlist management
wordlist_parser = subparsers.add_parser('wordlist', help='Wordlist management')
wordlist_subparsers = wordlist_parser.add_subparsers(dest='wordlist_command')
wordlist_subparsers.add_parser('info', help='Show wordlist information')
# Template management
template_parser = subparsers.add_parser('template', help='Template management')
template_subparsers = template_parser.add_subparsers(dest='template_command')
template_subparsers.add_parser('info', help='Show available templates')
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
# Initialize CLI
cli = ConfigurationCLI(args.config_dir)
try:
await cli.initialize()
success = True
# Execute command
if args.command == 'guild':
if args.guild_command == 'create':
success = await cli.guild_create(args.guild_id, args.name, args.owner_id)
elif args.guild_command == 'list':
await cli.guild_list()
elif args.guild_command == 'edit':
success = await cli.guild_edit(args.guild_id, args.setting, args.value)
elif args.guild_command == 'validate':
success = await cli.guild_validate(args.guild_id)
elif args.guild_command == 'backup':
success = await cli.guild_backup(args.guild_id)
else:
print("❌ Unknown guild command. Use --help for available commands.")
success = False
elif args.command == 'migrate':
if args.migrate_command == 'from-database':
success = await cli.migrate_from_database(not args.no_backup)
elif args.migrate_command == 'verify':
guild_ids = args.guild_ids if args.guild_ids else None
success = await cli.migrate_verify(guild_ids)
else:
print("❌ Unknown migrate command. Use --help for available commands.")
success = False
elif args.command == 'wordlist':
if args.wordlist_command == 'info':
await cli.wordlist_info()
else:
print("❌ Unknown wordlist command. Use --help for available commands.")
success = False
elif args.command == 'template':
if args.template_command == 'info':
await cli.template_info()
else:
print("❌ Unknown template command. Use --help for available commands.")
success = False
return 0 if success else 1
except KeyboardInterrupt:
print("\n⚠️ Interrupted by user")
return 1
except Exception as e:
print(f"❌ Unexpected error: {str(e)}")
logger.exception("CLI error")
return 1
finally:
await cli.cleanup()
if __name__ == '__main__':
sys.exit(asyncio.run(main()))

View File

@@ -1,444 +0,0 @@
"""Admin commands for bot configuration."""
import logging
from typing import Literal
import discord
from discord.ext import commands
from guardden.bot import GuardDen
from guardden.utils.ratelimit import RateLimitExceeded
logger = logging.getLogger(__name__)
class Admin(commands.Cog):
"""Administrative commands for bot configuration."""
def __init__(self, bot: GuardDen) -> None:
self.bot = bot
def cog_check(self, ctx: commands.Context) -> bool:
"""Ensure only administrators can use these commands."""
if not ctx.guild:
return False
if not self.bot.is_owner_allowed(ctx.author.id):
return False
return ctx.author.guild_permissions.administrator
async def cog_before_invoke(self, ctx: commands.Context) -> None:
if not ctx.command:
return
result = self.bot.rate_limiter.acquire_command(
ctx.command.qualified_name,
user_id=ctx.author.id,
guild_id=ctx.guild.id if ctx.guild else None,
channel_id=ctx.channel.id,
)
if result.is_limited:
raise RateLimitExceeded(result.reset_after)
async def cog_command_error(self, ctx: commands.Context, error: Exception) -> None:
if isinstance(error, RateLimitExceeded):
await ctx.send(
f"You're being rate limited. Try again in {error.retry_after:.1f} seconds."
)
@commands.group(name="config", invoke_without_command=True)
@commands.guild_only()
async def config(self, ctx: commands.Context) -> None:
"""View or modify bot configuration."""
config = await self.bot.guild_config.get_config(ctx.guild.id)
if not config:
await ctx.send("No configuration found. Run a config command to initialize.")
return
embed = discord.Embed(
title=f"Configuration for {ctx.guild.name}",
color=discord.Color.blue(),
)
# General settings
embed.add_field(name="Prefix", value=f"`{config.prefix}`", inline=True)
embed.add_field(name="Locale", value=config.locale, inline=True)
embed.add_field(name="\u200b", value="\u200b", inline=True)
# Channels
log_ch = ctx.guild.get_channel(config.log_channel_id) if config.log_channel_id else None
mod_log_ch = (
ctx.guild.get_channel(config.mod_log_channel_id) if config.mod_log_channel_id else None
)
welcome_ch = (
ctx.guild.get_channel(config.welcome_channel_id) if config.welcome_channel_id else None
)
embed.add_field(
name="Log Channel", value=log_ch.mention if log_ch else "Not set", inline=True
)
embed.add_field(
name="Mod Log Channel",
value=mod_log_ch.mention if mod_log_ch else "Not set",
inline=True,
)
embed.add_field(
name="Welcome Channel",
value=welcome_ch.mention if welcome_ch else "Not set",
inline=True,
)
# Features
features = []
if config.automod_enabled:
features.append("AutoMod")
if config.anti_spam_enabled:
features.append("Anti-Spam")
if config.link_filter_enabled:
features.append("Link Filter")
if config.ai_moderation_enabled:
features.append("AI Moderation")
if config.verification_enabled:
features.append("Verification")
embed.add_field(
name="Enabled Features",
value=", ".join(features) if features else "None",
inline=False,
)
# Notification settings
embed.add_field(
name="In-Channel Warnings",
value="✅ Enabled" if config.send_in_channel_warnings else "❌ Disabled",
inline=True,
)
await ctx.send(embed=embed)
@config.command(name="prefix")
@commands.guild_only()
async def config_prefix(self, ctx: commands.Context, prefix: str) -> None:
"""Set the command prefix for this server."""
if not prefix or not prefix.strip():
await ctx.send("Prefix cannot be empty or whitespace only.")
return
if len(prefix) > 10:
await ctx.send("Prefix must be 10 characters or less.")
return
await self.bot.guild_config.update_settings(ctx.guild.id, prefix=prefix)
await ctx.send(f"Command prefix set to `{prefix}`")
@config.command(name="logchannel")
@commands.guild_only()
async def config_log_channel(
self, ctx: commands.Context, channel: discord.TextChannel | None = None
) -> None:
"""Set the channel for general event logs."""
channel_id = channel.id if channel else None
await self.bot.guild_config.update_settings(ctx.guild.id, log_channel_id=channel_id)
if channel:
await ctx.send(f"Log channel set to {channel.mention}")
else:
await ctx.send("Log channel has been disabled.")
@config.command(name="modlogchannel")
@commands.guild_only()
async def config_mod_log_channel(
self, ctx: commands.Context, channel: discord.TextChannel | None = None
) -> None:
"""Set the channel for moderation action logs."""
channel_id = channel.id if channel else None
await self.bot.guild_config.update_settings(ctx.guild.id, mod_log_channel_id=channel_id)
if channel:
await ctx.send(f"Moderation log channel set to {channel.mention}")
else:
await ctx.send("Moderation log channel has been disabled.")
@config.command(name="welcomechannel")
@commands.guild_only()
async def config_welcome_channel(
self, ctx: commands.Context, channel: discord.TextChannel | None = None
) -> None:
"""Set the welcome channel for new members."""
channel_id = channel.id if channel else None
await self.bot.guild_config.update_settings(ctx.guild.id, welcome_channel_id=channel_id)
if channel:
await ctx.send(f"Welcome channel set to {channel.mention}")
else:
await ctx.send("Welcome channel has been disabled.")
@config.command(name="muterole")
@commands.guild_only()
async def config_mute_role(
self, ctx: commands.Context, role: discord.Role | None = None
) -> None:
"""Set the role to assign when muting members."""
role_id = role.id if role else None
await self.bot.guild_config.update_settings(ctx.guild.id, mute_role_id=role_id)
if role:
await ctx.send(f"Mute role set to {role.mention}")
else:
await ctx.send("Mute role has been cleared.")
@config.command(name="automod")
@commands.guild_only()
async def config_automod(self, ctx: commands.Context, enabled: bool) -> None:
"""Enable or disable automod features."""
await self.bot.guild_config.update_settings(ctx.guild.id, automod_enabled=enabled)
status = "enabled" if enabled else "disabled"
await ctx.send(f"AutoMod has been {status}.")
@config.command(name="antispam")
@commands.guild_only()
async def config_antispam(self, ctx: commands.Context, enabled: bool) -> None:
"""Enable or disable anti-spam protection."""
await self.bot.guild_config.update_settings(ctx.guild.id, anti_spam_enabled=enabled)
status = "enabled" if enabled else "disabled"
await ctx.send(f"Anti-spam has been {status}.")
@config.command(name="linkfilter")
@commands.guild_only()
async def config_linkfilter(self, ctx: commands.Context, enabled: bool) -> None:
"""Enable or disable link filtering."""
await self.bot.guild_config.update_settings(ctx.guild.id, link_filter_enabled=enabled)
status = "enabled" if enabled else "disabled"
await ctx.send(f"Link filter has been {status}.")
@commands.group(name="bannedwords", aliases=["bw"], invoke_without_command=True)
@commands.guild_only()
async def banned_words(self, ctx: commands.Context) -> None:
"""Manage banned words list."""
words = await self.bot.guild_config.get_banned_words(ctx.guild.id)
if not words:
await ctx.send("No banned words configured.")
return
embed = discord.Embed(
title="Banned Words",
color=discord.Color.red(),
)
for word in words[:25]: # Discord embed limit
word_type = "Regex" if word.is_regex else "Text"
embed.add_field(
name=f"#{word.id}: {word.pattern[:30]}",
value=f"Type: {word_type} | Action: {word.action}",
inline=True,
)
if len(words) > 25:
embed.set_footer(text=f"Showing 25 of {len(words)} banned words")
await ctx.send(embed=embed)
@banned_words.command(name="add")
@commands.guild_only()
async def banned_words_add(
self,
ctx: commands.Context,
pattern: str,
action: Literal["delete", "warn", "strike"] = "delete",
is_regex: bool = False,
) -> None:
"""Add a banned word or pattern."""
word = await self.bot.guild_config.add_banned_word(
guild_id=ctx.guild.id,
pattern=pattern,
added_by=ctx.author.id,
is_regex=is_regex,
action=action,
)
word_type = "regex pattern" if is_regex else "word"
await ctx.send(f"Added banned {word_type}: `{pattern}` (ID: {word.id}, Action: {action})")
@banned_words.command(name="remove", aliases=["delete"])
@commands.guild_only()
async def banned_words_remove(self, ctx: commands.Context, word_id: int) -> None:
"""Remove a banned word by ID."""
success = await self.bot.guild_config.remove_banned_word(ctx.guild.id, word_id)
if success:
await ctx.send(f"Removed banned word #{word_id}")
else:
await ctx.send(f"Banned word #{word_id} not found.")
@commands.command(name="channelwarnings")
@commands.guild_only()
async def channel_warnings(self, ctx: commands.Context, enabled: bool) -> None:
"""Enable or disable PUBLIC in-channel warnings when DMs fail.
WARNING: In-channel messages are PUBLIC and visible to all users in the channel.
They are NOT private due to Discord API limitations.
When enabled, if a user has DMs disabled, moderation warnings will be sent
as temporary PUBLIC messages in the channel (auto-deleted after 10 seconds).
Args:
enabled: True to enable PUBLIC warnings, False to disable (default: False)
"""
await self.bot.guild_config.update_settings(ctx.guild.id, send_in_channel_warnings=enabled)
status = "enabled" if enabled else "disabled"
embed = discord.Embed(
title="In-Channel Warnings Updated",
description=f"In-channel warnings are now **{status}**.",
color=discord.Color.green() if enabled else discord.Color.orange(),
)
if enabled:
embed.add_field(
name="⚠️ Privacy Warning",
value="**Messages are PUBLIC and visible to ALL users in the channel.**\n"
"When a user has DMs disabled, moderation warnings will be sent "
"as temporary PUBLIC messages in the channel (auto-deleted after 10 seconds).",
inline=False,
)
else:
embed.add_field(
name="✅ Privacy Protected",
value="When users have DMs disabled, they will not receive any notification. "
"This protects user privacy and prevents public embarrassment.",
inline=False,
)
await ctx.send(embed=embed)
@commands.group(name="whitelist", invoke_without_command=True)
@commands.guild_only()
async def whitelist_cmd(self, ctx: commands.Context) -> None:
"""Manage the moderation whitelist."""
config = await self.bot.guild_config.get_config(ctx.guild.id)
whitelisted_ids = config.whitelisted_user_ids if config else []
if not whitelisted_ids:
await ctx.send("No users are whitelisted.")
return
embed = discord.Embed(
title="Whitelisted Users",
description="These users bypass all moderation checks:",
color=discord.Color.blue(),
)
users_text = []
for user_id in whitelisted_ids[:25]: # Limit to 25 to avoid embed limits
user = ctx.guild.get_member(user_id)
if user:
users_text.append(f"{user.mention} (`{user_id}`)")
else:
users_text.append(f"• Unknown User (`{user_id}`)")
embed.add_field(
name=f"Total: {len(whitelisted_ids)} users",
value="\n".join(users_text) if users_text else "None",
inline=False,
)
if len(whitelisted_ids) > 25:
embed.set_footer(text=f"Showing 25 of {len(whitelisted_ids)} users")
await ctx.send(embed=embed)
@whitelist_cmd.command(name="add")
@commands.guild_only()
async def whitelist_add(self, ctx: commands.Context, user: discord.Member) -> None:
"""Add a user to the whitelist.
Whitelisted users bypass ALL moderation checks (automod and AI moderation).
"""
config = await self.bot.guild_config.get_config(ctx.guild.id)
whitelisted_ids = list(config.whitelisted_user_ids) if config else []
if user.id in whitelisted_ids:
await ctx.send(f"{user.mention} is already whitelisted.")
return
whitelisted_ids.append(user.id)
await self.bot.guild_config.update_settings(
ctx.guild.id, whitelisted_user_ids=whitelisted_ids
)
embed = discord.Embed(
title="✅ User Whitelisted",
description=f"{user.mention} has been added to the whitelist.",
color=discord.Color.green(),
)
embed.add_field(
name="What this means",
value="This user will bypass all automod and AI moderation checks.",
inline=False,
)
await ctx.send(embed=embed)
@whitelist_cmd.command(name="remove")
@commands.guild_only()
async def whitelist_remove(self, ctx: commands.Context, user: discord.Member) -> None:
"""Remove a user from the whitelist."""
config = await self.bot.guild_config.get_config(ctx.guild.id)
whitelisted_ids = list(config.whitelisted_user_ids) if config else []
if user.id not in whitelisted_ids:
await ctx.send(f"{user.mention} is not whitelisted.")
return
whitelisted_ids.remove(user.id)
await self.bot.guild_config.update_settings(
ctx.guild.id, whitelisted_user_ids=whitelisted_ids
)
embed = discord.Embed(
title="🚫 User Removed from Whitelist",
description=f"{user.mention} has been removed from the whitelist.",
color=discord.Color.orange(),
)
embed.add_field(
name="What this means",
value="This user will now be subject to normal moderation checks.",
inline=False,
)
await ctx.send(embed=embed)
@whitelist_cmd.command(name="clear")
@commands.guild_only()
async def whitelist_clear(self, ctx: commands.Context) -> None:
"""Clear the entire whitelist."""
config = await self.bot.guild_config.get_config(ctx.guild.id)
count = len(config.whitelisted_user_ids) if config else 0
if count == 0:
await ctx.send("The whitelist is already empty.")
return
await self.bot.guild_config.update_settings(ctx.guild.id, whitelisted_user_ids=[])
embed = discord.Embed(
title="🧹 Whitelist Cleared",
description=f"Removed {count} user(s) from the whitelist.",
color=discord.Color.red(),
)
embed.add_field(
name="What this means",
value="All users will now be subject to normal moderation checks.",
inline=False,
)
await ctx.send(embed=embed)
@commands.command(name="sync")
@commands.is_owner()
async def sync_commands(self, ctx: commands.Context) -> None:
"""Sync slash commands (bot owner only)."""
await self.bot.tree.sync()
await ctx.send("Slash commands synced.")
async def setup(bot: GuardDen) -> None:
"""Load the Admin cog."""
await bot.add_cog(Admin(bot))

View File

@@ -1,237 +0,0 @@
"""Event handlers for logging and monitoring."""
import logging
from datetime import datetime, timezone
import discord
from discord.ext import commands
from guardden.bot import GuardDen
logger = logging.getLogger(__name__)
class Events(commands.Cog):
"""Handles Discord events for logging and monitoring."""
def __init__(self, bot: GuardDen) -> None:
self.bot = bot
@commands.Cog.listener()
async def on_member_join(self, member: discord.Member) -> None:
"""Called when a member joins a guild."""
logger.debug(f"Member joined: {member} in {member.guild}")
config = await self.bot.guild_config.get_config(member.guild.id)
if not config or not config.log_channel_id:
return
channel = member.guild.get_channel(config.log_channel_id)
if not channel or not isinstance(channel, discord.TextChannel):
return
embed = discord.Embed(
title="Member Joined",
description=f"{member.mention} ({member})",
color=discord.Color.green(),
timestamp=datetime.now(timezone.utc),
)
embed.set_thumbnail(url=member.display_avatar.url)
embed.add_field(
name="Account Created", value=discord.utils.format_dt(member.created_at, "R")
)
embed.add_field(name="Member ID", value=str(member.id))
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_member_remove(self, member: discord.Member) -> None:
"""Called when a member leaves a guild."""
logger.debug(f"Member left: {member} from {member.guild}")
config = await self.bot.guild_config.get_config(member.guild.id)
if not config or not config.log_channel_id:
return
channel = member.guild.get_channel(config.log_channel_id)
if not channel or not isinstance(channel, discord.TextChannel):
return
embed = discord.Embed(
title="Member Left",
description=f"{member} ({member.id})",
color=discord.Color.orange(),
timestamp=datetime.now(timezone.utc),
)
embed.set_thumbnail(url=member.display_avatar.url)
if member.joined_at:
embed.add_field(name="Joined", value=discord.utils.format_dt(member.joined_at, "R"))
roles = [r.mention for r in member.roles if r != member.guild.default_role]
if roles:
embed.add_field(name="Roles", value=", ".join(roles[:10]), inline=False)
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_message_delete(self, message: discord.Message) -> None:
"""Called when a message is deleted."""
if message.author.bot or not message.guild:
return
config = await self.bot.guild_config.get_config(message.guild.id)
if not config or not config.log_channel_id:
return
channel = message.guild.get_channel(config.log_channel_id)
if not channel or not isinstance(channel, discord.TextChannel):
return
embed = discord.Embed(
title="Message Deleted",
description=f"In {message.channel.mention}",
color=discord.Color.red(),
timestamp=datetime.now(timezone.utc),
)
embed.set_author(name=str(message.author), icon_url=message.author.display_avatar.url)
if message.content:
content = message.content[:1024] if len(message.content) > 1024 else message.content
embed.add_field(name="Content", value=content, inline=False)
if message.attachments:
attachments = "\n".join(a.filename for a in message.attachments)
embed.add_field(name="Attachments", value=attachments, inline=False)
embed.set_footer(text=f"Author ID: {message.author.id} | Message ID: {message.id}")
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_message_edit(self, before: discord.Message, after: discord.Message) -> None:
"""Called when a message is edited."""
if before.author.bot or not before.guild:
return
if before.content == after.content:
return
config = await self.bot.guild_config.get_config(before.guild.id)
if not config or not config.log_channel_id:
return
channel = before.guild.get_channel(config.log_channel_id)
if not channel or not isinstance(channel, discord.TextChannel):
return
embed = discord.Embed(
title="Message Edited",
description=f"In {before.channel.mention} | [Jump to message]({after.jump_url})",
color=discord.Color.blue(),
timestamp=datetime.now(timezone.utc),
)
embed.set_author(name=str(before.author), icon_url=before.author.display_avatar.url)
before_content = before.content[:1024] if len(before.content) > 1024 else before.content
after_content = after.content[:1024] if len(after.content) > 1024 else after.content
embed.add_field(name="Before", value=before_content or "*empty*", inline=False)
embed.add_field(name="After", value=after_content or "*empty*", inline=False)
embed.set_footer(text=f"Author ID: {before.author.id}")
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_voice_state_update(
self,
member: discord.Member,
before: discord.VoiceState,
after: discord.VoiceState,
) -> None:
"""Called when a member's voice state changes."""
if member.bot:
return
config = await self.bot.guild_config.get_config(member.guild.id)
if not config or not config.log_channel_id:
return
channel = member.guild.get_channel(config.log_channel_id)
if not channel or not isinstance(channel, discord.TextChannel):
return
embed = None
if before.channel is None and after.channel is not None:
embed = discord.Embed(
title="Voice Channel Joined",
description=f"{member.mention} joined {after.channel.mention}",
color=discord.Color.green(),
timestamp=datetime.now(timezone.utc),
)
elif before.channel is not None and after.channel is None:
embed = discord.Embed(
title="Voice Channel Left",
description=f"{member.mention} left {before.channel.mention}",
color=discord.Color.orange(),
timestamp=datetime.now(timezone.utc),
)
elif before.channel != after.channel and before.channel and after.channel:
embed = discord.Embed(
title="Voice Channel Moved",
description=f"{member.mention} moved from {before.channel.mention} to {after.channel.mention}",
color=discord.Color.blue(),
timestamp=datetime.now(timezone.utc),
)
if embed:
embed.set_author(name=str(member), icon_url=member.display_avatar.url)
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_member_ban(self, guild: discord.Guild, user: discord.User) -> None:
"""Called when a user is banned."""
config = await self.bot.guild_config.get_config(guild.id)
if not config or not config.mod_log_channel_id:
return
channel = guild.get_channel(config.mod_log_channel_id)
if not channel or not isinstance(channel, discord.TextChannel):
return
embed = discord.Embed(
title="Member Banned",
description=f"{user} ({user.id})",
color=discord.Color.dark_red(),
timestamp=datetime.now(timezone.utc),
)
embed.set_thumbnail(url=user.display_avatar.url)
await channel.send(embed=embed)
@commands.Cog.listener()
async def on_member_unban(self, guild: discord.Guild, user: discord.User) -> None:
"""Called when a user is unbanned."""
config = await self.bot.guild_config.get_config(guild.id)
if not config or not config.mod_log_channel_id:
return
channel = guild.get_channel(config.mod_log_channel_id)
if not channel or not isinstance(channel, discord.TextChannel):
return
embed = discord.Embed(
title="Member Unbanned",
description=f"{user} ({user.id})",
color=discord.Color.green(),
timestamp=datetime.now(timezone.utc),
)
embed.set_thumbnail(url=user.display_avatar.url)
await channel.send(embed=embed)
async def setup(bot: GuardDen) -> None:
"""Load the Events cog."""
await bot.add_cog(Events(bot))

View File

@@ -1,71 +0,0 @@
"""Health check commands."""
import logging
import discord
from discord.ext import commands
from sqlalchemy import select
from guardden.bot import GuardDen
from guardden.utils.ratelimit import RateLimitExceeded
logger = logging.getLogger(__name__)
class Health(commands.Cog):
"""Health checks for the bot."""
def __init__(self, bot: GuardDen) -> None:
self.bot = bot
def cog_check(self, ctx: commands.Context) -> bool:
if not ctx.guild:
return False
if not self.bot.is_owner_allowed(ctx.author.id):
return False
return ctx.author.guild_permissions.administrator
async def cog_before_invoke(self, ctx: commands.Context) -> None:
if not ctx.command:
return
result = self.bot.rate_limiter.acquire_command(
ctx.command.qualified_name,
user_id=ctx.author.id,
guild_id=ctx.guild.id if ctx.guild else None,
channel_id=ctx.channel.id,
)
if result.is_limited:
raise RateLimitExceeded(result.reset_after)
async def cog_command_error(self, ctx: commands.Context, error: Exception) -> None:
if isinstance(error, RateLimitExceeded):
await ctx.send(
f"You're being rate limited. Try again in {error.retry_after:.1f} seconds."
)
@commands.command(name="health")
@commands.guild_only()
async def health(self, ctx: commands.Context) -> None:
"""Check database and AI provider health."""
db_status = "ok"
try:
async with self.bot.database.session() as session:
await session.execute(select(1))
except Exception as exc: # pragma: no cover - external dependency
logger.exception("Health check database failure")
db_status = f"error: {exc}"
ai_status = "disabled"
if self.bot.settings.ai_provider != "none":
ai_status = "ok" if self.bot.ai_provider else "unavailable"
embed = discord.Embed(title="GuardDen Health", color=discord.Color.green())
embed.add_field(name="Database", value=db_status, inline=False)
embed.add_field(name="AI Provider", value=ai_status, inline=False)
await ctx.send(embed=embed)
async def setup(bot: GuardDen) -> None:
"""Load the health cog."""
await bot.add_cog(Health(bot))

View File

@@ -1,381 +0,0 @@
"""Custom help command for GuardDen."""
import logging
import discord
from discord.ext import commands
from guardden.bot import GuardDen
logger = logging.getLogger(__name__)
class GuardDenHelpCommand(commands.HelpCommand):
"""Custom help command with embed formatting and permission filtering."""
# Friendly category names with emojis
CATEGORY_NAMES = {
"Moderation": "🛡️ Moderation",
"Admin": "⚙️ Server Configuration",
"Automod": "🤖 Automatic Moderation",
"AiModeration": "🧠 AI Moderation",
"Verification": "✅ Member Verification",
"Health": "💊 System Health",
"WordlistSync": "📝 Wordlist Sync",
}
# Category descriptions
CATEGORY_DESCRIPTIONS = {
"Moderation": "Server moderation tools",
"Admin": "Bot settings and configuration",
"Automod": "Automatic content filtering rules",
"AiModeration": "AI-powered content moderation",
"Verification": "New member verification system",
"Health": "System diagnostics",
"WordlistSync": "Wordlist synchronization",
}
def get_command_signature(self, command: commands.Command) -> str:
"""Get the command signature showing usage."""
parent = command.full_parent_name
alias = command.name if not parent else f"{parent} {command.name}"
return f"{self.context.clean_prefix}{alias} {command.signature}"
def get_cog_display_name(self, cog_name: str) -> str:
"""Get user-friendly display name for a cog."""
return self.CATEGORY_NAMES.get(cog_name, cog_name)
def get_cog_description(self, cog_name: str) -> str:
"""Get description for a cog."""
return self.CATEGORY_DESCRIPTIONS.get(cog_name, "Commands")
def _get_permission_info(self, command: commands.Command) -> tuple[str, discord.Color]:
"""Get permission requirement text and color for a command."""
# Check cog-level restrictions
if command.cog:
cog_name = command.cog.qualified_name
if cog_name == "Admin":
return "🔒 Admin Only", discord.Color.red()
elif cog_name == "Moderation":
return "🛡️ Moderator/Owner", discord.Color.orange()
elif cog_name == "WordlistSync":
return "🔒 Admin Only", discord.Color.red()
# Check command-level checks
if hasattr(command.callback, "__commands_checks__"):
checks = command.callback.__commands_checks__
for check in checks:
check_name = getattr(check, "__name__", "")
if "is_owner" in check_name:
return "👑 Bot Owner Only", discord.Color.dark_red()
elif "has_permissions" in check_name or "administrator" in check_name:
return "🔒 Admin Only", discord.Color.red()
return "👥 Everyone", discord.Color.green()
async def send_bot_help(self, mapping: dict) -> None:
"""Send the main help menu showing all commands with detailed information."""
embeds = []
prefix = self.context.clean_prefix
# Create overview embed
overview = discord.Embed(
title="📚 GuardDen Help - All Commands",
description=f"A comprehensive Discord moderation bot\n\n"
f"**Legend:**\n"
f"👥 Everyone can use | 🛡️ Moderators/Owners | 🔒 Admins | 👑 Bot Owner",
color=discord.Color.blue(),
)
overview.set_footer(text=f"Prefix: {prefix} (customizable per server)")
embeds.append(overview)
# Collect all commands organized by category
for cog, cog_commands in mapping.items():
if cog is None:
continue
# Get all commands (don't filter by permissions for full overview)
all_commands = sorted(cog_commands, key=lambda c: c.qualified_name)
if not all_commands:
continue
cog_name = cog.qualified_name
display_name = self.get_cog_display_name(cog_name)
# Create embed for this category
embed = discord.Embed(
title=display_name,
description=self.get_cog_description(cog_name),
color=discord.Color.gold() if "Admin" in display_name else discord.Color.blue(),
)
# Add each command with full details
for command in all_commands:
perm_text, _ = self._get_permission_info(command)
# Build command signature with all parameters
signature_parts = [command.name]
if command.signature:
signature_parts.append(command.signature)
full_signature = f"{prefix}{' '.join(signature_parts)}"
# Build description
desc_parts = []
# Add help text
if command.help:
desc_parts.append(command.help.split("\n")[0])
else:
desc_parts.append("No description available")
# Add aliases if present
if command.aliases:
desc_parts.append(f"*Aliases: {', '.join(command.aliases)}*")
# Add permission requirement
desc_parts.append(f"**Permission:** {perm_text}")
# Add parameter details if present
if command.clean_params:
param_details = []
for param_name, param in command.clean_params.items():
if param.default is param.empty:
param_details.append(f"`{param_name}` (required)")
else:
default_val = param.default if param.default is not None else "None"
param_details.append(f"`{param_name}` (default: {default_val})")
if param_details:
desc_parts.append(f"**Options:** {', '.join(param_details)}")
# Handle subcommands for groups
if isinstance(command, commands.Group):
subcommands = list(command.commands)
if subcommands:
subcommand_names = ", ".join(
f"`{cmd.name}`" for cmd in sorted(subcommands, key=lambda c: c.name)
)
desc_parts.append(f"**Subcommands:** {subcommand_names}")
description = "\n".join(desc_parts)
embed.add_field(
name=f"`{full_signature}`",
value=description,
inline=False,
)
embeds.append(embed)
# Send all embeds
channel = self.get_destination()
for embed in embeds:
await channel.send(embed=embed)
async def send_cog_help(self, cog: commands.Cog) -> None:
"""Send help for a specific category/cog."""
# Get all commands (show all, not just what user can run)
all_commands = sorted(cog.get_commands(), key=lambda c: c.qualified_name)
if not all_commands:
await self.get_destination().send(f"No commands available in this category.")
return
cog_name = cog.qualified_name
display_name = self.get_cog_display_name(cog_name)
embed = discord.Embed(
title=f"{display_name} Commands",
description=f"{cog.description or 'Commands in this category'}\n\n"
f"**Legend:** 👥 Everyone | 🛡️ Moderators/Owners | 🔒 Admins | 👑 Bot Owner",
color=discord.Color.gold() if "Admin" in display_name else discord.Color.blue(),
)
# Show each command with full details
for command in all_commands:
# Get permission info
perm_text, _ = self._get_permission_info(command)
# Get command signature
signature = self.get_command_signature(command)
# Build description
desc_parts = []
if command.help:
desc_parts.append(command.help.split("\n")[0]) # First line only
else:
desc_parts.append("No description available")
if command.aliases:
desc_parts.append(f"*Aliases: {', '.join(command.aliases)}*")
# Add permission info
desc_parts.append(f"**Permission:** {perm_text}")
# Add parameter info
if command.clean_params:
param_count = len(command.clean_params)
required_count = sum(
1 for p in command.clean_params.values() if p.default is p.empty
)
desc_parts.append(
f"**Parameters:** {required_count} required, {param_count - required_count} optional"
)
description = "\n".join(desc_parts)
embed.add_field(
name=f"`{signature}`",
value=description,
inline=False,
)
embed.set_footer(text=f"Use {self.context.clean_prefix}help <command> for detailed info")
channel = self.get_destination()
await channel.send(embed=embed)
async def send_group_help(self, group: commands.Group) -> None:
"""Send help for a command group."""
embed = discord.Embed(
title=f"Command Group: {group.qualified_name}",
description=group.help or "No description available",
color=discord.Color.blurple(),
)
# Add usage
signature = self.get_command_signature(group)
embed.add_field(
name="Usage",
value=f"`{signature}`",
inline=False,
)
# List subcommands
filtered = await self.filter_commands(group.commands, sort=True)
if filtered:
subcommands_text = []
for command in filtered:
sig = f"{self.context.clean_prefix}{command.qualified_name} {command.signature}"
desc = command.help.split("\n")[0] if command.help else "No description"
subcommands_text.append(f"`{sig}`\n{desc}")
embed.add_field(
name="Subcommands",
value="\n\n".join(subcommands_text[:10]), # Limit to 10 to avoid embed size limits
inline=False,
)
if len(filtered) > 10:
embed.add_field(
name="More...",
value=f"And {len(filtered) - 10} more subcommands",
inline=False,
)
# Add aliases
if group.aliases:
embed.add_field(
name="Aliases",
value=", ".join(f"`{alias}`" for alias in group.aliases),
inline=False,
)
channel = self.get_destination()
await channel.send(embed=embed)
async def send_command_help(self, command: commands.Command) -> None:
"""Send help for a specific command."""
perm_text, perm_color = self._get_permission_info(command)
embed = discord.Embed(
title=f"Command: {command.qualified_name}",
description=command.help or "No description available",
color=perm_color,
)
# Add usage
signature = self.get_command_signature(command)
embed.add_field(
name="Usage",
value=f"`{signature}`",
inline=False,
)
# Add permission requirement prominently
embed.add_field(
name="Permission Required",
value=perm_text,
inline=False,
)
# Add aliases
if command.aliases:
embed.add_field(
name="Aliases",
value=", ".join(f"`{alias}`" for alias in command.aliases),
inline=False,
)
# Add parameter details if available
if command.clean_params:
params_text = []
for param_name, param in command.clean_params.items():
# Get parameter annotation for type hint
param_type = ""
if param.annotation is not param.empty:
type_name = getattr(param.annotation, "__name__", str(param.annotation))
param_type = f" ({type_name})"
# Determine if required or optional
if param.default is param.empty:
params_text.append(f"`{param_name}`{param_type} - **Required**")
else:
default_val = param.default if param.default is not None else "None"
params_text.append(
f"`{param_name}`{param_type} - Optional (default: `{default_val}`)"
)
if params_text:
embed.add_field(
name="Parameters",
value="\n".join(params_text),
inline=False,
)
# Add category info
if command.cog:
cog_name = command.cog.qualified_name
embed.set_footer(text=f"Category: {self.get_cog_display_name(cog_name)}")
channel = self.get_destination()
await channel.send(embed=embed)
async def send_error_message(self, error: str) -> None:
"""Send an error message."""
embed = discord.Embed(
title="Help Error",
description=error,
color=discord.Color.red(),
)
embed.set_footer(text=f"Use {self.context.clean_prefix}help for available commands")
channel = self.get_destination()
await channel.send(embed=embed)
async def command_not_found(self, string: str) -> str:
"""Handle command not found error."""
return f"No command or category called `{string}` found."
async def subcommand_not_found(self, command: commands.Command, string: str) -> str:
"""Handle subcommand not found error."""
if isinstance(command, commands.Group) and len(command.all_commands) > 0:
return f"Command `{command.qualified_name}` has no subcommand named `{string}`."
return f"Command `{command.qualified_name}` has no subcommands."
async def setup(bot: GuardDen) -> None:
"""Set up the help command."""
bot.help_command = GuardDenHelpCommand()
logger.info("Custom help command loaded")

View File

@@ -1,513 +0,0 @@
"""Moderation commands and automod features."""
import logging
from datetime import datetime, timedelta, timezone
import discord
from discord.ext import commands
from sqlalchemy import func, select
from guardden.bot import GuardDen
from guardden.models import ModerationLog, Strike
from guardden.utils import parse_duration
from guardden.utils.notifications import send_moderation_notification
from guardden.utils.ratelimit import RateLimitExceeded
logger = logging.getLogger(__name__)
class Moderation(commands.Cog):
"""Moderation commands for server management."""
def __init__(self, bot: GuardDen) -> None:
self.bot = bot
def cog_check(self, ctx: commands.Context) -> bool:
if not ctx.guild:
return False
if not self.bot.is_owner_allowed(ctx.author.id):
return False
return True
async def cog_before_invoke(self, ctx: commands.Context) -> None:
if not ctx.command:
return
result = self.bot.rate_limiter.acquire_command(
ctx.command.qualified_name,
user_id=ctx.author.id,
guild_id=ctx.guild.id if ctx.guild else None,
channel_id=ctx.channel.id,
)
if result.is_limited:
raise RateLimitExceeded(result.reset_after)
async def cog_command_error(self, ctx: commands.Context, error: Exception) -> None:
if isinstance(error, RateLimitExceeded):
await ctx.send(
f"You're being rate limited. Try again in {error.retry_after:.1f} seconds."
)
async def _log_action(
self,
guild: discord.Guild,
target: discord.Member | discord.User,
moderator: discord.Member | discord.User,
action: str,
reason: str | None = None,
duration: int | None = None,
channel: discord.TextChannel | None = None,
message: discord.Message | None = None,
is_automatic: bool = False,
) -> None:
"""Log a moderation action to the database."""
expires_at = None
if duration:
expires_at = datetime.now(timezone.utc) + timedelta(seconds=duration)
async with self.bot.database.session() as session:
log_entry = ModerationLog(
guild_id=guild.id,
target_id=target.id,
target_name=str(target),
moderator_id=moderator.id,
moderator_name=str(moderator),
action=action,
reason=reason,
duration=duration,
expires_at=expires_at,
channel_id=channel.id if channel else None,
message_id=message.id if message else None,
message_content=message.content if message else None,
is_automatic=is_automatic,
)
session.add(log_entry)
async def _get_strike_count(self, guild_id: int, user_id: int) -> int:
"""Get the total active strike count for a user."""
async with self.bot.database.session() as session:
result = await session.execute(
select(func.sum(Strike.points)).where(
Strike.guild_id == guild_id,
Strike.user_id == user_id,
Strike.is_active == True,
)
)
total = result.scalar()
return total or 0
async def _add_strike(
self,
guild: discord.Guild,
user: discord.Member,
moderator: discord.Member | discord.User,
reason: str,
points: int = 1,
) -> int:
"""Add a strike to a user and return their new total."""
async with self.bot.database.session() as session:
strike = Strike(
guild_id=guild.id,
user_id=user.id,
user_name=str(user),
moderator_id=moderator.id,
reason=reason,
points=points,
)
session.add(strike)
return await self._get_strike_count(guild.id, user.id)
@commands.command(name="warn")
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def warn(
self, ctx: commands.Context, member: discord.Member, *, reason: str = "No reason provided"
) -> None:
"""Warn a member."""
if member.top_role >= ctx.author.top_role and ctx.author != ctx.guild.owner:
await ctx.send("You cannot warn someone with a higher or equal role.")
return
await self._log_action(ctx.guild, member, ctx.author, "warn", reason)
embed = discord.Embed(
title="Warning Issued",
description=f"{member.mention} has been warned.",
color=discord.Color.yellow(),
timestamp=datetime.now(timezone.utc),
)
embed.add_field(name="Reason", value=reason, inline=False)
embed.set_footer(text=f"Moderator: {ctx.author}")
await ctx.send(embed=embed)
# Notify the user
config = await self.bot.guild_config.get_config(ctx.guild.id)
dm_embed = discord.Embed(
title=f"Warning in {ctx.guild.name}",
description=f"You have been warned.",
color=discord.Color.yellow(),
)
dm_embed.add_field(name="Reason", value=reason)
# Use notification utility to send DM with in-channel fallback
if isinstance(ctx.channel, discord.TextChannel):
await send_moderation_notification(
user=member,
channel=ctx.channel,
embed=dm_embed,
send_in_channel=config.send_in_channel_warnings if config else False,
)
@commands.command(name="strike")
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def strike(
self,
ctx: commands.Context,
member: discord.Member,
points: int = 1,
*,
reason: str = "No reason provided",
) -> None:
"""Add a strike to a member."""
if member.top_role >= ctx.author.top_role and ctx.author != ctx.guild.owner:
await ctx.send("You cannot strike someone with a higher or equal role.")
return
total_strikes = await self._add_strike(ctx.guild, member, ctx.author, reason, points)
await self._log_action(ctx.guild, member, ctx.author, "strike", reason)
embed = discord.Embed(
title="Strike Added",
description=f"{member.mention} has received {points} strike(s).",
color=discord.Color.orange(),
timestamp=datetime.now(timezone.utc),
)
embed.add_field(name="Reason", value=reason, inline=False)
embed.add_field(name="Total Strikes", value=str(total_strikes))
embed.set_footer(text=f"Moderator: {ctx.author}")
await ctx.send(embed=embed)
# Check for automatic actions based on strike thresholds
config = await self.bot.guild_config.get_config(ctx.guild.id)
if config and config.strike_actions:
for threshold, action_config in sorted(
config.strike_actions.items(), key=lambda x: int(x[0]), reverse=True
):
if total_strikes >= int(threshold):
action = action_config.get("action")
if action == "ban":
await ctx.invoke(
self.ban, member=member, reason=f"Automatic: {total_strikes} strikes"
)
elif action == "kick":
await ctx.invoke(
self.kick, member=member, reason=f"Automatic: {total_strikes} strikes"
)
elif action == "timeout":
duration = action_config.get("duration", 3600)
await ctx.invoke(
self.timeout,
member=member,
duration=f"{duration}s",
reason=f"Automatic: {total_strikes} strikes",
)
break
@commands.command(name="strikes")
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def strikes(self, ctx: commands.Context, member: discord.Member) -> None:
"""View strikes for a member."""
async with self.bot.database.session() as session:
result = await session.execute(
select(Strike)
.where(
Strike.guild_id == ctx.guild.id,
Strike.user_id == member.id,
Strike.is_active == True,
)
.order_by(Strike.created_at.desc())
.limit(10)
)
user_strikes = result.scalars().all()
total = await self._get_strike_count(ctx.guild.id, member.id)
embed = discord.Embed(
title=f"Strikes for {member}",
description=f"Total active strikes: **{total}**",
color=discord.Color.orange(),
)
if user_strikes:
for strike in user_strikes:
embed.add_field(
name=f"Strike #{strike.id} ({strike.points} pts)",
value=f"{strike.reason}\n*{strike.created_at.strftime('%Y-%m-%d')}*",
inline=False,
)
else:
embed.description = f"{member.mention} has no active strikes."
await ctx.send(embed=embed)
@commands.command(name="timeout", aliases=["mute"])
@commands.has_permissions(moderate_members=True)
@commands.guild_only()
async def timeout(
self,
ctx: commands.Context,
member: discord.Member,
duration: str = "1h",
*,
reason: str = "No reason provided",
) -> None:
"""Timeout a member (e.g., !timeout @user 1h Spamming)."""
if member.top_role >= ctx.author.top_role and ctx.author != ctx.guild.owner:
await ctx.send("You cannot timeout someone with a higher or equal role.")
return
delta = parse_duration(duration)
if not delta:
await ctx.send("Invalid duration. Use format like: 30m, 1h, 7d")
return
if delta > timedelta(days=28):
await ctx.send("Timeout duration cannot exceed 28 days.")
return
try:
await member.timeout(delta, reason=f"{ctx.author}: {reason}")
except discord.Forbidden:
await ctx.send("I don't have permission to timeout this user.")
return
except discord.HTTPException as e:
await ctx.send(f"Failed to timeout user: {e}")
return
await self._log_action(
ctx.guild, member, ctx.author, "timeout", reason, int(delta.total_seconds())
)
embed = discord.Embed(
title="Member Timed Out",
description=f"{member.mention} has been timed out for {duration}.",
color=discord.Color.orange(),
timestamp=datetime.now(timezone.utc),
)
embed.add_field(name="Reason", value=reason, inline=False)
embed.set_footer(text=f"Moderator: {ctx.author}")
await ctx.send(embed=embed)
@commands.command(name="untimeout", aliases=["unmute"])
@commands.has_permissions(moderate_members=True)
@commands.guild_only()
async def untimeout(
self, ctx: commands.Context, member: discord.Member, *, reason: str = "No reason provided"
) -> None:
"""Remove timeout from a member."""
await member.timeout(None, reason=f"{ctx.author}: {reason}")
await self._log_action(ctx.guild, member, ctx.author, "unmute", reason)
embed = discord.Embed(
title="Timeout Removed",
description=f"{member.mention}'s timeout has been removed.",
color=discord.Color.green(),
timestamp=datetime.now(timezone.utc),
)
embed.add_field(name="Reason", value=reason, inline=False)
embed.set_footer(text=f"Moderator: {ctx.author}")
await ctx.send(embed=embed)
@commands.command(name="kick")
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def kick(
self, ctx: commands.Context, member: discord.Member, *, reason: str = "No reason provided"
) -> None:
"""Kick a member from the server."""
if member.top_role >= ctx.author.top_role and ctx.author != ctx.guild.owner:
await ctx.send("You cannot kick someone with a higher or equal role.")
return
# Notify the user before kicking
config = await self.bot.guild_config.get_config(ctx.guild.id)
dm_embed = discord.Embed(
title=f"Kicked from {ctx.guild.name}",
description=f"You have been kicked from the server.",
color=discord.Color.red(),
)
dm_embed.add_field(name="Reason", value=reason)
# Use notification utility to send DM with in-channel fallback
if isinstance(ctx.channel, discord.TextChannel):
await send_moderation_notification(
user=member,
channel=ctx.channel,
embed=dm_embed,
send_in_channel=config.send_in_channel_warnings if config else False,
)
try:
await member.kick(reason=f"{ctx.author}: {reason}")
except discord.Forbidden:
await ctx.send("❌ I don't have permission to kick this member.")
return
except discord.HTTPException as e:
await ctx.send(f"❌ Failed to kick member: {e}")
return
await self._log_action(ctx.guild, member, ctx.author, "kick", reason)
embed = discord.Embed(
title="Member Kicked",
description=f"{member} has been kicked from the server.",
color=discord.Color.red(),
timestamp=datetime.now(timezone.utc),
)
embed.add_field(name="Reason", value=reason, inline=False)
embed.set_footer(text=f"Moderator: {ctx.author}")
try:
await ctx.send(embed=embed)
except discord.HTTPException:
await ctx.send(f"{member} has been kicked from the server.")
@commands.command(name="ban")
@commands.has_permissions(ban_members=True)
@commands.guild_only()
async def ban(
self,
ctx: commands.Context,
member: discord.Member | discord.User,
*,
reason: str = "No reason provided",
) -> None:
"""Ban a member from the server."""
if isinstance(member, discord.Member):
if member.top_role >= ctx.author.top_role and ctx.author != ctx.guild.owner:
await ctx.send("You cannot ban someone with a higher or equal role.")
return
# Notify the user before banning
config = await self.bot.guild_config.get_config(ctx.guild.id)
dm_embed = discord.Embed(
title=f"Banned from {ctx.guild.name}",
description=f"You have been banned from the server.",
color=discord.Color.dark_red(),
)
dm_embed.add_field(name="Reason", value=reason)
# Use notification utility to send DM with in-channel fallback
if isinstance(ctx.channel, discord.TextChannel):
await send_moderation_notification(
user=member,
channel=ctx.channel,
embed=dm_embed,
send_in_channel=config.send_in_channel_warnings if config else False,
)
try:
await ctx.guild.ban(member, reason=f"{ctx.author}: {reason}", delete_message_days=0)
except discord.Forbidden:
await ctx.send("❌ I don't have permission to ban this member.")
return
except discord.HTTPException as e:
await ctx.send(f"❌ Failed to ban member: {e}")
return
await self._log_action(ctx.guild, member, ctx.author, "ban", reason)
embed = discord.Embed(
title="Member Banned",
description=f"{member} has been banned from the server.",
color=discord.Color.dark_red(),
timestamp=datetime.now(timezone.utc),
)
embed.add_field(name="Reason", value=reason, inline=False)
embed.set_footer(text=f"Moderator: {ctx.author}")
try:
await ctx.send(embed=embed)
except discord.HTTPException:
await ctx.send(f"{member} has been banned from the server.")
@commands.command(name="unban")
@commands.has_permissions(ban_members=True)
@commands.guild_only()
async def unban(
self, ctx: commands.Context, user_id: int, *, reason: str = "No reason provided"
) -> None:
"""Unban a user by their ID."""
try:
user = await self.bot.fetch_user(user_id)
await ctx.guild.unban(user, reason=f"{ctx.author}: {reason}")
await self._log_action(ctx.guild, user, ctx.author, "unban", reason)
embed = discord.Embed(
title="User Unbanned",
description=f"{user} has been unbanned.",
color=discord.Color.green(),
timestamp=datetime.now(timezone.utc),
)
embed.add_field(name="Reason", value=reason, inline=False)
embed.set_footer(text=f"Moderator: {ctx.author}")
await ctx.send(embed=embed)
except discord.NotFound:
await ctx.send("User not found or not banned.")
except discord.Forbidden:
await ctx.send("I don't have permission to unban this user.")
@commands.command(name="purge", aliases=["clear"])
@commands.has_permissions(manage_messages=True)
@commands.guild_only()
async def purge(self, ctx: commands.Context, amount: int) -> None:
"""Delete multiple messages at once (max 100)."""
if amount < 1 or amount > 100:
await ctx.send("Please specify a number between 1 and 100.")
return
deleted = await ctx.channel.purge(limit=amount + 1) # +1 to include the command message
msg = await ctx.send(f"Deleted {len(deleted) - 1} message(s).")
await msg.delete(delay=3)
@commands.command(name="modlogs", aliases=["history"])
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def modlogs(self, ctx: commands.Context, member: discord.Member | discord.User) -> None:
"""View moderation history for a user."""
async with self.bot.database.session() as session:
result = await session.execute(
select(ModerationLog)
.where(ModerationLog.guild_id == ctx.guild.id, ModerationLog.target_id == member.id)
.order_by(ModerationLog.created_at.desc())
.limit(10)
)
logs = result.scalars().all()
embed = discord.Embed(
title=f"Moderation History for {member}",
color=discord.Color.blue(),
)
if logs:
for log in logs:
value = f"**Reason:** {log.reason or 'None'}\n**By:** {log.moderator_name}\n*{log.created_at.strftime('%Y-%m-%d %H:%M')}*"
embed.add_field(name=f"{log.action.upper()} (#{log.id})", value=value, inline=False)
else:
embed.description = "No moderation history found."
await ctx.send(embed=embed)
async def setup(bot: GuardDen) -> None:
"""Load the Moderation cog."""
await bot.add_cog(Moderation(bot))

105
src/guardden/cogs/owner.py Normal file
View File

@@ -0,0 +1,105 @@
"""Owner-only commands for bot maintenance."""
import logging
from datetime import datetime, timezone
import discord
from discord.ext import commands
from guardden.bot import GuardDen
logger = logging.getLogger(__name__)
class Owner(commands.Cog):
"""Owner-only commands for debugging and maintenance."""
def __init__(self, bot: GuardDen) -> None:
self.bot = bot
self.start_time = datetime.now(timezone.utc)
@commands.command(name="status")
@commands.is_owner()
async def status_cmd(self, ctx: commands.Context) -> None:
"""Show bot status and AI usage statistics."""
uptime = datetime.now(timezone.utc) - self.start_time
hours, remainder = divmod(int(uptime.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
embed = discord.Embed(
title="GuardDen Status",
color=discord.Color.blue(),
timestamp=datetime.now(timezone.utc),
)
# Bot info
embed.add_field(
name="Bot Info",
value=f"**Uptime:** {hours}h {minutes}m {seconds}s\n"
f"**Guilds:** {len(self.bot.guilds)}\n"
f"**Users:** {sum(g.member_count or 0 for g in self.bot.guilds)}",
inline=False,
)
# AI provider info
ai_status = "None (Disabled)" if self.bot.settings.ai_provider == "none" else self.bot.settings.ai_provider.capitalize()
embed.add_field(
name="AI Provider",
value=ai_status,
inline=True,
)
# Config status
config_loaded = "Yes" if hasattr(self.bot, 'config_loader') and self.bot.config_loader.config else "No"
embed.add_field(
name="Config Loaded",
value=config_loaded,
inline=True,
)
# AI usage stats (if available)
if hasattr(self.bot, 'ai_rate_limiter'):
for guild in self.bot.guilds:
stats = self.bot.ai_rate_limiter.get_stats(guild.id)
if stats['guild_checks_this_hour'] > 0:
max_checks = self.bot.config_loader.get_setting('ai_moderation.max_checks_per_hour_per_guild', 25)
usage_pct = (stats['guild_checks_this_hour'] / max_checks) * 100
status_emoji = "🟢" if usage_pct < 50 else "🟡" if usage_pct < 80 else "🔴"
embed.add_field(
name=f"{status_emoji} {guild.name}",
value=f"**AI Checks (1h):** {stats['guild_checks_this_hour']}/{max_checks} ({usage_pct:.0f}%)\n"
f"**Today:** {stats['guild_checks_today']}",
inline=False,
)
await ctx.send(embed=embed)
@commands.command(name="reload")
@commands.is_owner()
async def reload_cmd(self, ctx: commands.Context) -> None:
"""Reload configuration from config.yml."""
if not hasattr(self.bot, 'config_loader'):
await ctx.send("❌ Config loader not initialized.")
return
try:
await self.bot.config_loader.reload()
await ctx.send("✅ Configuration reloaded successfully.")
logger.info("Configuration reloaded by owner command")
except Exception as e:
await ctx.send(f"❌ Failed to reload config: {e}")
logger.error(f"Failed to reload config: {e}", exc_info=True)
@commands.command(name="ping")
@commands.is_owner()
async def ping_cmd(self, ctx: commands.Context) -> None:
"""Check bot latency."""
latency_ms = round(self.bot.latency * 1000, 2)
await ctx.send(f"🏓 Pong! Latency: {latency_ms}ms")
async def setup(bot: GuardDen) -> None:
"""Load the Owner cog."""
await bot.add_cog(Owner(bot))

View File

@@ -1,449 +0,0 @@
"""Verification cog for new member verification."""
import logging
from datetime import datetime, timezone
import discord
from discord import ui
from discord.ext import commands, tasks
from guardden.bot import GuardDen
from guardden.services.verification import (
ChallengeType,
PendingVerification,
VerificationService,
)
from guardden.utils.ratelimit import RateLimitExceeded
logger = logging.getLogger(__name__)
class VerifyButton(ui.Button["VerificationView"]):
"""Button for simple verification."""
def __init__(self) -> None:
super().__init__(
style=discord.ButtonStyle.success,
label="Verify",
custom_id="verify_button",
)
async def callback(self, interaction: discord.Interaction) -> None:
if self.view is None:
return
success, message = await self.view.cog.complete_verification(
interaction.guild.id,
interaction.user.id,
"verified",
)
if success:
await interaction.response.send_message(message, ephemeral=True)
# Disable the button
self.disabled = True
self.label = "Verified"
await interaction.message.edit(view=self.view)
else:
await interaction.response.send_message(message, ephemeral=True)
class EmojiButton(ui.Button["EmojiVerificationView"]):
"""Button for emoji selection verification."""
def __init__(self, emoji: str, row: int = 0) -> None:
super().__init__(
style=discord.ButtonStyle.secondary,
label=emoji,
custom_id=f"emoji_{emoji}",
row=row,
)
self.emoji_value = emoji
async def callback(self, interaction: discord.Interaction) -> None:
if self.view is None:
return
success, message = await self.view.cog.complete_verification(
interaction.guild.id,
interaction.user.id,
self.emoji_value,
)
if success:
await interaction.response.send_message(message, ephemeral=True)
# Disable all buttons
for item in self.view.children:
if isinstance(item, ui.Button):
item.disabled = True
await interaction.message.edit(view=self.view)
else:
await interaction.response.send_message(message, ephemeral=True)
class VerificationView(ui.View):
"""View for button verification."""
def __init__(self, cog: "Verification", timeout: float = 600) -> None:
super().__init__(timeout=timeout)
self.cog = cog
self.add_item(VerifyButton())
class EmojiVerificationView(ui.View):
"""View for emoji selection verification."""
def __init__(self, cog: "Verification", options: list[str], timeout: float = 600) -> None:
super().__init__(timeout=timeout)
self.cog = cog
for i, emoji in enumerate(options):
self.add_item(EmojiButton(emoji, row=i // 4))
class CaptchaModal(ui.Modal):
"""Modal for captcha/math input."""
answer = ui.TextInput(
label="Your Answer",
placeholder="Enter the answer here...",
max_length=50,
)
def __init__(self, cog: "Verification", title: str = "Verification") -> None:
super().__init__(title=title)
self.cog = cog
async def on_submit(self, interaction: discord.Interaction) -> None:
success, message = await self.cog.complete_verification(
interaction.guild.id,
interaction.user.id,
self.answer.value,
)
await interaction.response.send_message(message, ephemeral=True)
class AnswerButton(ui.Button["AnswerView"]):
"""Button to open the answer modal."""
def __init__(self) -> None:
super().__init__(
style=discord.ButtonStyle.primary,
label="Submit Answer",
custom_id="submit_answer",
)
async def callback(self, interaction: discord.Interaction) -> None:
if self.view is None:
return
modal = CaptchaModal(self.view.cog)
await interaction.response.send_modal(modal)
class AnswerView(ui.View):
"""View with button to open answer modal."""
def __init__(self, cog: "Verification", timeout: float = 600) -> None:
super().__init__(timeout=timeout)
self.cog = cog
self.add_item(AnswerButton())
class Verification(commands.Cog):
"""Member verification system."""
def __init__(self, bot: GuardDen) -> None:
self.bot = bot
self.service = VerificationService()
self.cleanup_task.start()
def cog_check(self, ctx: commands.Context) -> bool:
if not ctx.guild:
return False
if not self.bot.is_owner_allowed(ctx.author.id):
return False
return True
async def cog_before_invoke(self, ctx: commands.Context) -> None:
if not ctx.command:
return
result = self.bot.rate_limiter.acquire_command(
ctx.command.qualified_name,
user_id=ctx.author.id,
guild_id=ctx.guild.id if ctx.guild else None,
channel_id=ctx.channel.id,
)
if result.is_limited:
raise RateLimitExceeded(result.reset_after)
async def cog_command_error(self, ctx: commands.Context, error: Exception) -> None:
if isinstance(error, RateLimitExceeded):
await ctx.send(
f"You're being rate limited. Try again in {error.retry_after:.1f} seconds."
)
def cog_unload(self) -> None:
self.cleanup_task.cancel()
@tasks.loop(minutes=5)
async def cleanup_task(self) -> None:
"""Periodically clean up expired verifications."""
count = self.service.cleanup_expired()
if count > 0:
logger.debug(f"Cleaned up {count} expired verifications")
@cleanup_task.before_loop
async def before_cleanup(self) -> None:
await self.bot.wait_until_ready()
async def complete_verification(
self, guild_id: int, user_id: int, response: str
) -> tuple[bool, str]:
"""Complete a verification and assign role if successful."""
success, message = self.service.verify(guild_id, user_id, response)
if success:
# Assign verified role
guild = self.bot.get_guild(guild_id)
if guild:
member = guild.get_member(user_id)
config = await self.bot.guild_config.get_config(guild_id)
if member and config and config.verified_role_id:
role = guild.get_role(config.verified_role_id)
if role:
try:
await member.add_roles(role, reason="Verification completed")
logger.info(f"Verified {member} in {guild.name}")
except discord.Forbidden:
logger.warning(f"Cannot assign verified role in {guild.name}")
return success, message
async def send_verification(
self,
member: discord.Member,
channel: discord.TextChannel,
challenge_type: ChallengeType,
) -> None:
"""Send a verification challenge to a member."""
pending = self.service.create_challenge(
user_id=member.id,
guild_id=member.guild.id,
challenge_type=challenge_type,
)
embed = discord.Embed(
title="Verification Required",
description=pending.challenge.question,
color=discord.Color.blue(),
timestamp=datetime.now(timezone.utc),
)
embed.set_footer(
text=f"Expires in 10 minutes • {pending.challenge.max_attempts} attempts allowed"
)
# Create appropriate view based on challenge type
if challenge_type == ChallengeType.BUTTON:
view = VerificationView(self)
elif challenge_type == ChallengeType.EMOJI:
view = EmojiVerificationView(self, pending.challenge.options)
else:
# Captcha or Math - use modal
view = AnswerView(self)
try:
# Try to DM the user first
dm_channel = await member.create_dm()
msg = await dm_channel.send(embed=embed, view=view)
pending.message_id = msg.id
pending.channel_id = dm_channel.id
except discord.Forbidden:
# Fall back to channel mention
msg = await channel.send(
content=member.mention,
embed=embed,
view=view,
)
pending.message_id = msg.id
pending.channel_id = channel.id
@commands.Cog.listener()
async def on_member_join(self, member: discord.Member) -> None:
"""Handle new member joins for verification."""
if member.bot:
return
config = await self.bot.guild_config.get_config(member.guild.id)
if not config or not config.verification_enabled:
return
# Determine verification channel
channel_id = config.welcome_channel_id or config.log_channel_id
if not channel_id:
return
channel = member.guild.get_channel(channel_id)
if not channel or not isinstance(channel, discord.TextChannel):
return
# Get challenge type from config
try:
challenge_type = ChallengeType(config.verification_type)
except ValueError:
challenge_type = ChallengeType.BUTTON
await self.send_verification(member, channel, challenge_type)
@commands.group(name="verify", invoke_without_command=True)
@commands.guild_only()
async def verify_cmd(self, ctx: commands.Context) -> None:
"""Request a verification challenge."""
config = await self.bot.guild_config.get_config(ctx.guild.id)
if not config or not config.verification_enabled:
await ctx.send("Verification is not enabled on this server.")
return
# Check if already verified
if config.verified_role_id:
role = ctx.guild.get_role(config.verified_role_id)
if role and role in ctx.author.roles:
await ctx.send("You are already verified!")
return
# Check for existing pending verification
pending = self.service.get_pending(ctx.guild.id, ctx.author.id)
if pending and not pending.challenge.is_expired:
await ctx.send("You already have a pending verification. Please complete it first.")
return
# Get challenge type
try:
challenge_type = ChallengeType(config.verification_type)
except ValueError:
challenge_type = ChallengeType.BUTTON
await self.send_verification(ctx.author, ctx.channel, challenge_type)
await ctx.message.delete(delay=1)
@verify_cmd.command(name="setup")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def verify_setup(self, ctx: commands.Context) -> None:
"""View verification setup status."""
config = await self.bot.guild_config.get_config(ctx.guild.id)
embed = discord.Embed(
title="Verification Setup",
color=discord.Color.blue(),
)
embed.add_field(
name="Enabled",
value="✅ Yes" if config and config.verification_enabled else "❌ No",
inline=True,
)
embed.add_field(
name="Type",
value=config.verification_type if config else "button",
inline=True,
)
if config and config.verified_role_id:
role = ctx.guild.get_role(config.verified_role_id)
embed.add_field(
name="Verified Role",
value=role.mention if role else "Not found",
inline=True,
)
else:
embed.add_field(name="Verified Role", value="Not set", inline=True)
pending_count = self.service.get_pending_count(ctx.guild.id)
embed.add_field(name="Pending Verifications", value=str(pending_count), inline=True)
await ctx.send(embed=embed)
@verify_cmd.command(name="enable")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def verify_enable(self, ctx: commands.Context) -> None:
"""Enable verification for new members."""
config = await self.bot.guild_config.get_config(ctx.guild.id)
if not config or not config.verified_role_id:
await ctx.send("Please set a verified role first with `!verify role @role`")
return
await self.bot.guild_config.update_settings(ctx.guild.id, verification_enabled=True)
await ctx.send("✅ Verification enabled for new members.")
@verify_cmd.command(name="disable")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def verify_disable(self, ctx: commands.Context) -> None:
"""Disable verification."""
await self.bot.guild_config.update_settings(ctx.guild.id, verification_enabled=False)
await ctx.send("❌ Verification disabled.")
@verify_cmd.command(name="role")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def verify_role(self, ctx: commands.Context, role: discord.Role) -> None:
"""Set the role given upon verification."""
await self.bot.guild_config.update_settings(ctx.guild.id, verified_role_id=role.id)
await ctx.send(f"Verified role set to {role.mention}")
@verify_cmd.command(name="type")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def verify_type(self, ctx: commands.Context, vtype: str) -> None:
"""Set verification type (button, captcha, math, emoji)."""
try:
challenge_type = ChallengeType(vtype.lower())
except ValueError:
valid = ", ".join(t.value for t in ChallengeType if t != ChallengeType.QUESTIONS)
await ctx.send(f"Invalid type. Valid options: {valid}")
return
await self.bot.guild_config.update_settings(
ctx.guild.id, verification_type=challenge_type.value
)
await ctx.send(f"Verification type set to **{challenge_type.value}**")
@verify_cmd.command(name="test")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def verify_test(self, ctx: commands.Context, vtype: str = "button") -> None:
"""Test verification (sends challenge to you)."""
try:
challenge_type = ChallengeType(vtype.lower())
except ValueError:
challenge_type = ChallengeType.BUTTON
await self.send_verification(ctx.author, ctx.channel, challenge_type)
@verify_cmd.command(name="reset")
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def verify_reset(self, ctx: commands.Context, member: discord.Member) -> None:
"""Reset verification for a member (remove role and cancel pending)."""
# Cancel any pending verification
self.service.cancel(ctx.guild.id, member.id)
# Remove verified role
config = await self.bot.guild_config.get_config(ctx.guild.id)
if config and config.verified_role_id:
role = ctx.guild.get_role(config.verified_role_id)
if role and role in member.roles:
try:
await member.remove_roles(role, reason=f"Verification reset by {ctx.author}")
except discord.Forbidden:
pass
await ctx.send(f"Reset verification for {member.mention}")
async def setup(bot: GuardDen) -> None:
"""Load the Verification cog."""
await bot.add_cog(Verification(bot))

View File

@@ -1,38 +0,0 @@
"""Background task for managed wordlist syncing."""
import logging
from discord.ext import commands, tasks
from guardden.services.wordlist import WordlistService
logger = logging.getLogger(__name__)
class WordlistSync(commands.Cog):
"""Periodic sync of managed wordlists into guild bans."""
def __init__(self, bot: commands.Bot, service: WordlistService) -> None:
self.bot = bot
self.service = service
self.sync_task.change_interval(hours=service.update_interval.total_seconds() / 3600)
self.sync_task.start()
def cog_unload(self) -> None:
self.sync_task.cancel()
@tasks.loop(hours=1)
async def sync_task(self) -> None:
await self.service.sync_all()
@sync_task.before_loop
async def before_sync_task(self) -> None:
await self.bot.wait_until_ready()
async def setup(bot: commands.Bot) -> None:
service = getattr(bot, "wordlist_service", None)
if not service:
logger.warning("Wordlist service not initialized; skipping sync task")
return
await bot.add_cog(WordlistSync(bot, service))

View File

@@ -1,11 +1,10 @@
"""Configuration management for GuardDen."""
"""Configuration management for GuardDen - Minimal Version."""
import json
import re
from pathlib import Path
from typing import Any, Literal
from pydantic import BaseModel, Field, SecretStr, ValidationError, field_validator
from pydantic import BaseModel, Field, SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic_settings.sources import EnvSettingsSource
@@ -69,62 +68,13 @@ class GuardDenEnvSettingsSource(EnvSettingsSource):
"""Environment settings source with safe list parsing."""
def decode_complex_value(self, field_name: str, field, value: Any):
if field_name in {"allowed_guilds", "owner_ids"} and isinstance(value, str):
if field_name in {"owner_ids"} and isinstance(value, str):
return value
return super().decode_complex_value(field_name, field, value)
class WordlistSourceConfig(BaseModel):
"""Configuration for a managed wordlist source."""
name: str
url: str
category: Literal["hard", "soft", "context"]
action: Literal["delete", "warn", "strike"]
reason: str
is_regex: bool = False
enabled: bool = True
class GuildDefaults(BaseModel):
"""Default values for new guild settings (configurable via env).
These values are used when creating a new guild configuration.
Override via environment variables with GUARDDEN_GUILD_DEFAULT_ prefix.
Example: GUARDDEN_GUILD_DEFAULT_PREFIX=? sets the default prefix to "?"
"""
prefix: str = Field(default="!", min_length=1, max_length=10)
locale: str = Field(default="en", min_length=2, max_length=10)
automod_enabled: bool = True
anti_spam_enabled: bool = True
link_filter_enabled: bool = False
message_rate_limit: int = Field(default=5, ge=1)
message_rate_window: int = Field(default=5, ge=1)
duplicate_threshold: int = Field(default=3, ge=1)
mention_limit: int = Field(default=5, ge=1)
mention_rate_limit: int = Field(default=10, ge=1)
mention_rate_window: int = Field(default=60, ge=1)
ai_moderation_enabled: bool = True
ai_sensitivity: int = Field(default=80, ge=0, le=100)
ai_confidence_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
ai_log_only: bool = False
nsfw_detection_enabled: bool = True
verification_enabled: bool = False
verification_type: Literal["button", "captcha", "math", "emoji"] = "button"
strike_actions: dict = Field(
default_factory=lambda: {
"1": {"action": "warn"},
"3": {"action": "timeout", "duration": 300},
"5": {"action": "kick"},
"7": {"action": "ban"},
}
)
scam_allowlist: list[str] = Field(default_factory=list)
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
"""Application settings loaded from environment variables - Minimal Version."""
model_config = SettingsConfigDict(
env_file=".env",
@@ -177,62 +127,23 @@ class Settings(BaseSettings):
log_json: bool = Field(default=False, description="Use JSON structured logging format")
log_file: str | None = Field(default=None, description="Log file path (optional)")
# Access control
allowed_guilds: list[int] = Field(
default_factory=list,
description="Guild IDs the bot is allowed to join (empty = allow all)",
)
# Access control (owner IDs for debug commands)
owner_ids: list[int] = Field(
default_factory=list,
description="Owner user IDs with elevated access (empty = allow admins)",
description="Owner user IDs for debug commands (empty = all admins)",
)
# Paths
data_dir: Path = Field(default=Path("data"), description="Data directory for persistent files")
# Wordlist sync
wordlist_enabled: bool = Field(
default=True, description="Enable automatic managed wordlist syncing"
)
wordlist_update_hours: int = Field(
default=168, description="Managed wordlist sync interval in hours"
)
wordlist_sources: list[WordlistSourceConfig] = Field(
default_factory=list,
description="Managed wordlist sources (JSON array via env overrides)",
# Config file path
config_file: Path = Field(
default=Path("config.yml"),
description="Path to config.yml file",
)
# Guild defaults (used when creating new guild configurations)
guild_default: GuildDefaults = Field(
default_factory=GuildDefaults,
description="Default values for new guild settings",
)
@field_validator("allowed_guilds", "owner_ids", mode="before")
@field_validator("owner_ids", mode="before")
@classmethod
def _validate_id_list(cls, value: Any) -> list[int]:
return _parse_id_list(value)
@field_validator("wordlist_sources", mode="before")
@classmethod
def _parse_wordlist_sources(cls, value: Any) -> list[WordlistSourceConfig]:
if value is None:
return []
if isinstance(value, list):
return [WordlistSourceConfig.model_validate(item) for item in value]
if isinstance(value, str):
text = value.strip()
if not text:
return []
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
raise ValueError("Invalid JSON for wordlist_sources") from exc
if not isinstance(data, list):
raise ValueError("wordlist_sources must be a JSON array")
return [WordlistSourceConfig.model_validate(item) for item in data]
return []
@field_validator("discord_token")
@classmethod
def _validate_discord_token(cls, value: SecretStr) -> SecretStr:
@@ -278,14 +189,6 @@ class Settings(BaseSettings):
if self.database_pool_min < 1:
raise ValueError("database_pool_min must be at least 1")
# Data directory validation
if not isinstance(self.data_dir, Path):
raise ValueError("data_dir must be a valid path")
# Wordlist validation
if self.wordlist_update_hours < 1:
raise ValueError("wordlist_update_hours must be at least 1")
def get_settings() -> Settings:
"""Get application settings instance."""

View File

@@ -1,19 +1,10 @@
"""Database models for GuardDen."""
"""Database models for GuardDen - Minimal Version."""
from guardden.models.analytics import AICheck, MessageActivity, UserActivity
from guardden.models.base import Base
from guardden.models.guild import BannedWord, Guild, GuildSettings
from guardden.models.moderation import ModerationLog, Strike, UserNote
from guardden.models.guild import Guild, GuildSettings
__all__ = [
"AICheck",
"Base",
"BannedWord",
"Guild",
"GuildSettings",
"MessageActivity",
"ModerationLog",
"Strike",
"UserActivity",
"UserNote",
]

View File

@@ -1,86 +0,0 @@
"""Analytics models for tracking bot usage and performance."""
from datetime import datetime
from sqlalchemy import BigInteger, Boolean, DateTime, Float, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from guardden.models.base import Base, SnowflakeID, TimestampMixin
class AICheck(Base, TimestampMixin):
"""Record of AI moderation checks."""
__tablename__ = "ai_checks"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
guild_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False, index=True)
user_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False, index=True)
channel_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False)
message_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False)
# Check result
flagged: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
confidence: Mapped[float] = mapped_column(Float, nullable=False, default=0.0)
category: Mapped[str | None] = mapped_column(String(50), nullable=True)
severity: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
# Performance metrics
response_time_ms: Mapped[float] = mapped_column(Float, nullable=False)
provider: Mapped[str] = mapped_column(String(20), nullable=False)
# False positive tracking (set by moderators)
is_false_positive: Mapped[bool] = mapped_column(
Boolean, nullable=False, default=False, index=True
)
reviewed_by: Mapped[int | None] = mapped_column(SnowflakeID, nullable=True)
reviewed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
class MessageActivity(Base):
"""Daily message activity statistics per guild."""
__tablename__ = "message_activity"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
guild_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False, index=True)
date: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True)
# Activity counts
total_messages: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
active_users: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
new_joins: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
# Moderation activity
automod_triggers: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
ai_checks: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
manual_actions: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
class UserActivity(Base, TimestampMixin):
"""Track user activity and first/last seen timestamps."""
__tablename__ = "user_activity"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
guild_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False, index=True)
user_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False, index=True)
# User information
username: Mapped[str] = mapped_column(String(100), nullable=False)
# Activity timestamps
first_seen: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
last_seen: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
last_message: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
# Activity counts
message_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
command_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
# Moderation stats
strike_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
warning_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
kick_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
ban_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
timeout_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

View File

@@ -1,101 +0,0 @@
"""Moderation-related database models."""
from datetime import datetime
from enum import Enum
from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from guardden.models.base import Base, SnowflakeID, TimestampMixin
from guardden.models.guild import Guild
class ModAction(str, Enum):
"""Types of moderation actions."""
WARN = "warn"
TIMEOUT = "timeout"
KICK = "kick"
BAN = "ban"
UNBAN = "unban"
UNMUTE = "unmute"
NOTE = "note"
STRIKE = "strike"
DELETE = "delete"
class ModerationLog(Base, TimestampMixin):
"""Log of all moderation actions taken."""
__tablename__ = "moderation_logs"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
guild_id: Mapped[int] = mapped_column(
SnowflakeID, ForeignKey("guilds.id", ondelete="CASCADE"), nullable=False
)
# Target and moderator
target_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False)
target_name: Mapped[str] = mapped_column(String(100), nullable=False)
moderator_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False)
moderator_name: Mapped[str] = mapped_column(String(100), nullable=False)
# Action details
action: Mapped[str] = mapped_column(String(20), nullable=False)
reason: Mapped[str | None] = mapped_column(Text, nullable=True)
duration: Mapped[int | None] = mapped_column(Integer, nullable=True) # Duration in seconds
expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
# Context
channel_id: Mapped[int | None] = mapped_column(SnowflakeID, nullable=True)
message_id: Mapped[int | None] = mapped_column(SnowflakeID, nullable=True)
message_content: Mapped[str | None] = mapped_column(Text, nullable=True)
# Was this an automatic action?
is_automatic: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# Relationship
guild: Mapped["Guild"] = relationship(back_populates="moderation_logs")
class Strike(Base, TimestampMixin):
"""User strikes/warnings tracking."""
__tablename__ = "strikes"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
guild_id: Mapped[int] = mapped_column(
SnowflakeID, ForeignKey("guilds.id", ondelete="CASCADE"), nullable=False
)
user_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False)
user_name: Mapped[str] = mapped_column(String(100), nullable=False)
moderator_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False)
reason: Mapped[str] = mapped_column(Text, nullable=False)
points: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
# Strikes can expire
expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
# Reference to the moderation log entry
mod_log_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("moderation_logs.id", ondelete="SET NULL"), nullable=True
)
# Relationship
guild: Mapped["Guild"] = relationship(back_populates="strikes")
class UserNote(Base, TimestampMixin):
"""Moderator notes on users."""
__tablename__ = "user_notes"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
guild_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False)
user_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False)
moderator_id: Mapped[int] = mapped_column(SnowflakeID, nullable=False)
content: Mapped[str] = mapped_column(Text, nullable=False)

View File

@@ -0,0 +1,159 @@
"""AI usage tracking and rate limiting for cost control."""
import logging
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone
from typing import TypedDict
logger = logging.getLogger(__name__)
class RateLimitResult(TypedDict):
"""Result of rate limit check."""
is_limited: bool
reason: str
guild_checks_this_hour: int
user_checks_this_hour: int
class UsageStats(TypedDict):
"""AI usage statistics."""
guild_checks_this_hour: int
guild_checks_today: int
user_checks_this_hour: int
class AIRateLimiter:
"""Track AI usage and enforce rate limits to control costs."""
def __init__(self):
"""Initialize rate limiter."""
# guild_id -> deque of timestamps
self._guild_checks: dict[int, deque] = defaultdict(lambda: deque())
# user_id -> deque of timestamps
self._user_checks: dict[int, deque] = defaultdict(lambda: deque())
def _clean_old_entries(
self,
guild_id: int,
user_id: int,
max_guild: int,
max_user: int,
) -> None:
"""Remove timestamps older than 1 hour."""
now = datetime.now(timezone.utc)
hour_ago = now - timedelta(hours=1)
# Clean guild entries
self._guild_checks[guild_id] = deque(
[ts for ts in self._guild_checks[guild_id] if ts > hour_ago],
maxlen=max_guild,
)
# Clean user entries
self._user_checks[user_id] = deque(
[ts for ts in self._user_checks[user_id] if ts > hour_ago],
maxlen=max_user,
)
def is_limited(
self,
guild_id: int,
user_id: int,
max_guild_per_hour: int,
max_user_per_hour: int,
) -> RateLimitResult:
"""Check if rate limited.
Args:
guild_id: Discord guild ID
user_id: Discord user ID
max_guild_per_hour: Maximum AI checks per hour for guild
max_user_per_hour: Maximum AI checks per hour for user
Returns:
RateLimitResult with is_limited and reason
"""
self._clean_old_entries(guild_id, user_id, max_guild_per_hour, max_user_per_hour)
guild_count = len(self._guild_checks[guild_id])
user_count = len(self._user_checks[user_id])
# Check guild limit
if guild_count >= max_guild_per_hour:
logger.warning(
f"Guild {guild_id} hit AI rate limit: {guild_count}/{max_guild_per_hour} checks this hour"
)
return RateLimitResult(
is_limited=True,
reason="guild_hourly_limit",
guild_checks_this_hour=guild_count,
user_checks_this_hour=user_count,
)
# Check user limit
if user_count >= max_user_per_hour:
logger.info(
f"User {user_id} in guild {guild_id} hit AI rate limit: {user_count}/{max_user_per_hour} checks this hour"
)
return RateLimitResult(
is_limited=True,
reason="user_hourly_limit",
guild_checks_this_hour=guild_count,
user_checks_this_hour=user_count,
)
return RateLimitResult(
is_limited=False,
reason="",
guild_checks_this_hour=guild_count,
user_checks_this_hour=user_count,
)
def track_usage(self, guild_id: int, user_id: int) -> None:
"""Track that an AI check was performed.
Args:
guild_id: Discord guild ID
user_id: Discord user ID
"""
now = datetime.now(timezone.utc)
self._guild_checks[guild_id].append(now)
self._user_checks[user_id].append(now)
logger.debug(
f"AI check tracked: guild={guild_id}, user={user_id}, "
f"guild_total_this_hour={len(self._guild_checks[guild_id])}, "
f"user_total_this_hour={len(self._user_checks[user_id])}"
)
def get_stats(self, guild_id: int, user_id: int | None = None) -> UsageStats:
"""Get usage statistics for status command.
Args:
guild_id: Discord guild ID
user_id: Optional Discord user ID for user-specific stats
Returns:
UsageStats dictionary with counts
"""
now = datetime.now(timezone.utc)
hour_ago = now - timedelta(hours=1)
day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Guild stats
guild_checks_this_hour = sum(1 for ts in self._guild_checks[guild_id] if ts > hour_ago)
guild_checks_today = sum(1 for ts in self._guild_checks[guild_id] if ts > day_start)
# User stats
user_checks_this_hour = 0
if user_id:
user_checks_this_hour = sum(1 for ts in self._user_checks[user_id] if ts > hour_ago)
return UsageStats(
guild_checks_this_hour=guild_checks_this_hour,
guild_checks_today=guild_checks_today,
user_checks_this_hour=user_checks_this_hour,
)

View File

@@ -0,0 +1,83 @@
"""Configuration loader from single YAML file."""
import logging
from pathlib import Path
from typing import Any
import yaml
logger = logging.getLogger(__name__)
class ConfigLoader:
"""Load and manage configuration from single YAML file."""
def __init__(self, config_path: Path):
"""Initialize config loader.
Args:
config_path: Path to config.yml file
"""
self.config_path = config_path
self.config: dict[str, Any] = {}
async def load(self) -> dict[str, Any]:
"""Load configuration from YAML file.
Returns:
Configuration dictionary
Raises:
FileNotFoundError: If config file doesn't exist
yaml.YAMLError: If config file is invalid YAML
"""
if not self.config_path.exists():
raise FileNotFoundError(f"Config file not found: {self.config_path}")
with open(self.config_path) as f:
self.config = yaml.safe_load(f)
logger.info(f"Configuration loaded from {self.config_path}")
return self.config
async def reload(self) -> dict[str, Any]:
"""Reload configuration from YAML file.
Returns:
Updated configuration dictionary
"""
logger.info("Reloading configuration...")
return await self.load()
def get_setting(self, key: str, default: Any = None) -> Any:
"""Get a nested setting using dot notation.
Examples:
get_setting("ai_moderation.sensitivity") -> 80
get_setting("automod.enabled") -> True
Args:
key: Dot-separated path to setting (e.g., "ai_moderation.sensitivity")
default: Default value if setting not found
Returns:
Setting value or default
"""
parts = key.split(".")
value = self.config
for part in parts:
if isinstance(value, dict) and part in value:
value = value[part]
else:
return default
return value
def get_all(self) -> dict[str, Any]:
"""Get entire configuration dictionary.
Returns:
Complete configuration
"""
return self.config

View File

@@ -1,457 +0,0 @@
"""Configuration migration system for GuardDen.
This module handles migration from database-based Discord command configuration
to file-based YAML configuration.
"""
import logging
import asyncio
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
import yaml
from guardden.services.database import Database
from guardden.services.guild_config import GuildConfigService
from guardden.services.file_config import FileConfigurationManager
from guardden.models.guild import Guild, GuildSettings, BannedWord
logger = logging.getLogger(__name__)
class ConfigurationMigrator:
"""Handles migration from database to file-based configuration."""
def __init__(
self,
database: Database,
guild_config_service: GuildConfigService,
file_config_manager: FileConfigurationManager
):
"""Initialize the migration system.
Args:
database: Database instance
guild_config_service: Current guild configuration service
file_config_manager: File configuration manager
"""
self.database = database
self.guild_config_service = guild_config_service
self.file_config_manager = file_config_manager
async def migrate_all_guilds(self, backup_existing: bool = True) -> Dict[str, Any]:
"""Migrate all guild configurations from database to files.
Args:
backup_existing: Whether to backup existing configuration files
Returns:
Dictionary with migration results
"""
logger.info("Starting migration of all guild configurations...")
results = {
"migrated_guilds": [],
"failed_guilds": [],
"skipped_guilds": [],
"total_guilds": 0,
"banned_words_migrated": 0,
"errors": []
}
try:
async with self.database.session() as session:
# Get all guilds from database
from sqlalchemy import select
stmt = select(Guild)
result = await session.execute(stmt)
guilds = result.scalars().all()
results["total_guilds"] = len(guilds)
logger.info(f"Found {len(guilds)} guilds to migrate")
for guild in guilds:
try:
await self._migrate_single_guild(guild, backup_existing, results)
except Exception as e:
error_msg = f"Failed to migrate guild {guild.id}: {str(e)}"
logger.error(error_msg)
results["failed_guilds"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"error": error_msg
})
results["errors"].append(error_msg)
# Migrate wordlists
await self._migrate_wordlists(results)
logger.info(f"Migration complete. Success: {len(results['migrated_guilds'])}, "
f"Failed: {len(results['failed_guilds'])}, "
f"Skipped: {len(results['skipped_guilds'])}")
except Exception as e:
error_msg = f"Migration failed with error: {str(e)}"
logger.error(error_msg)
results["errors"].append(error_msg)
return results
async def _migrate_single_guild(
self,
guild: Guild,
backup_existing: bool,
results: Dict[str, Any]
) -> None:
"""Migrate a single guild's configuration."""
# Check if file already exists
guild_file = self.file_config_manager.config_dir / "guilds" / f"guild-{guild.id}.yml"
if guild_file.exists():
if backup_existing:
backup_path = await self.file_config_manager.backup_config(guild.id)
logger.info(f"Backed up existing config for guild {guild.id}: {backup_path}")
else:
results["skipped_guilds"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"reason": "Configuration file already exists"
})
return
# Get guild settings from database
async with self.database.session() as session:
from sqlalchemy import select
from sqlalchemy.orm import selectinload
stmt = select(Guild).where(Guild.id == guild.id).options(
selectinload(Guild.settings),
selectinload(Guild.banned_words)
)
result = await session.execute(stmt)
guild_with_settings = result.scalar_one_or_none()
if not guild_with_settings:
raise Exception(f"Guild {guild.id} not found in database")
# Convert to file configuration format
file_config = await self._convert_guild_to_file_config(guild_with_settings)
# Write to file
with open(guild_file, 'w', encoding='utf-8') as f:
yaml.dump(file_config, f, default_flow_style=False, indent=2, sort_keys=False)
logger.info(f"Migrated guild {guild.id} ({guild.name}) to {guild_file}")
results["migrated_guilds"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"file_path": str(guild_file),
"banned_words_count": len(guild_with_settings.banned_words) if guild_with_settings.banned_words else 0
})
if guild_with_settings.banned_words:
results["banned_words_migrated"] += len(guild_with_settings.banned_words)
async def _convert_guild_to_file_config(self, guild: Guild) -> Dict[str, Any]:
"""Convert database guild model to file configuration format."""
settings = guild.settings if guild.settings else GuildSettings()
# Base guild information
config = {
"guild_id": guild.id,
"name": guild.name,
"owner_id": guild.owner_id,
"premium": guild.premium,
# Add migration metadata
"_migration_info": {
"migrated_at": datetime.now().isoformat(),
"migrated_from": "database",
"original_created_at": guild.created_at.isoformat() if guild.created_at else None,
"original_updated_at": guild.updated_at.isoformat() if guild.updated_at else None
},
"settings": {
"general": {
"prefix": settings.prefix,
"locale": settings.locale
},
"channels": {
"log_channel_id": settings.log_channel_id,
"mod_log_channel_id": settings.mod_log_channel_id,
"welcome_channel_id": settings.welcome_channel_id
},
"roles": {
"mute_role_id": settings.mute_role_id,
"verified_role_id": settings.verified_role_id,
"mod_role_ids": settings.mod_role_ids or []
},
"moderation": {
"automod_enabled": settings.automod_enabled,
"anti_spam_enabled": settings.anti_spam_enabled,
"link_filter_enabled": settings.link_filter_enabled,
"strike_actions": settings.strike_actions or {}
},
"automod": {
"message_rate_limit": settings.message_rate_limit,
"message_rate_window": settings.message_rate_window,
"duplicate_threshold": settings.duplicate_threshold,
"mention_limit": settings.mention_limit,
"mention_rate_limit": settings.mention_rate_limit,
"mention_rate_window": settings.mention_rate_window,
"scam_allowlist": settings.scam_allowlist or []
},
"ai_moderation": {
"enabled": settings.ai_moderation_enabled,
"sensitivity": settings.ai_sensitivity,
"confidence_threshold": settings.ai_confidence_threshold,
"log_only": settings.ai_log_only,
"nsfw_detection_enabled": settings.nsfw_detection_enabled,
"nsfw_only_filtering": getattr(settings, 'nsfw_only_filtering', False)
},
"verification": {
"enabled": settings.verification_enabled,
"type": settings.verification_type
}
}
}
# Add banned words if any exist
if guild.banned_words:
config["banned_words"] = []
for banned_word in guild.banned_words:
config["banned_words"].append({
"pattern": banned_word.pattern,
"action": banned_word.action,
"is_regex": banned_word.is_regex,
"reason": banned_word.reason,
"category": banned_word.category,
"source": banned_word.source,
"managed": banned_word.managed,
"added_by": banned_word.added_by,
"created_at": banned_word.created_at.isoformat() if banned_word.created_at else None
})
return config
async def _migrate_wordlists(self, results: Dict[str, Any]) -> None:
"""Migrate global banned words and allowlists to wordlist files."""
# Get all managed banned words (global wordlists)
async with self.database.session() as session:
from sqlalchemy import select
stmt = select(BannedWord).where(BannedWord.managed == True)
result = await session.execute(stmt)
managed_words = result.scalars().all()
if managed_words:
# Group by source and category
sources = {}
for word in managed_words:
source = word.source or "unknown"
if source not in sources:
sources[source] = []
sources[source].append(word)
# Update external sources configuration
external_config_path = self.file_config_manager.config_dir / "wordlists" / "external-sources.yml"
if external_config_path.exists():
with open(external_config_path, 'r', encoding='utf-8') as f:
external_config = yaml.safe_load(f)
else:
external_config = {"sources": []}
# Add migration info for discovered sources
for source_name, words in sources.items():
existing_source = next(
(s for s in external_config["sources"] if s["name"] == source_name),
None
)
if not existing_source:
# Add new source based on migrated words
category = words[0].category if words[0].category else "profanity"
action = words[0].action if words[0].action else "warn"
external_config["sources"].append({
"name": source_name,
"url": f"# MIGRATED: Originally from {source_name}",
"category": category,
"action": action,
"reason": f"Migrated from database source: {source_name}",
"enabled": False, # Disabled by default, needs manual URL
"update_interval_hours": 168,
"applies_to_guilds": [],
"_migration_info": {
"migrated_at": datetime.now().isoformat(),
"original_word_count": len(words),
"needs_url_configuration": True
}
})
# Write updated external sources
with open(external_config_path, 'w', encoding='utf-8') as f:
yaml.dump(external_config, f, default_flow_style=False, indent=2)
results["external_sources_updated"] = True
results["managed_words_found"] = len(managed_words)
logger.info(f"Updated external sources configuration with {len(sources)} discovered sources")
async def verify_migration(self, guild_ids: Optional[List[int]] = None) -> Dict[str, Any]:
"""Verify that migration was successful by comparing database and file configs.
Args:
guild_ids: Specific guild IDs to verify, or None for all
Returns:
Verification results
"""
logger.info("Verifying migration results...")
verification_results = {
"verified_guilds": [],
"mismatches": [],
"missing_files": [],
"errors": []
}
try:
async with self.database.session() as session:
from sqlalchemy import select
if guild_ids:
stmt = select(Guild).where(Guild.id.in_(guild_ids))
else:
stmt = select(Guild)
result = await session.execute(stmt)
guilds = result.scalars().all()
for guild in guilds:
try:
await self._verify_single_guild(guild, verification_results)
except Exception as e:
error_msg = f"Verification error for guild {guild.id}: {str(e)}"
logger.error(error_msg)
verification_results["errors"].append(error_msg)
logger.info(f"Verification complete. Verified: {len(verification_results['verified_guilds'])}, "
f"Mismatches: {len(verification_results['mismatches'])}, "
f"Missing: {len(verification_results['missing_files'])}")
except Exception as e:
error_msg = f"Verification failed: {str(e)}"
logger.error(error_msg)
verification_results["errors"].append(error_msg)
return verification_results
async def _verify_single_guild(self, guild: Guild, results: Dict[str, Any]) -> None:
"""Verify migration for a single guild."""
guild_file = self.file_config_manager.config_dir / "guilds" / f"guild-{guild.id}.yml"
if not guild_file.exists():
results["missing_files"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"expected_file": str(guild_file)
})
return
# Load file configuration
with open(guild_file, 'r', encoding='utf-8') as f:
file_config = yaml.safe_load(f)
# Get database configuration
db_config = await self.guild_config_service.get_config(guild.id)
# Compare key settings
mismatches = []
if file_config.get("guild_id") != guild.id:
mismatches.append("guild_id")
if file_config.get("name") != guild.name:
mismatches.append("name")
if db_config:
file_settings = file_config.get("settings", {})
# Compare AI moderation settings
ai_settings = file_settings.get("ai_moderation", {})
if ai_settings.get("enabled") != db_config.ai_moderation_enabled:
mismatches.append("ai_moderation.enabled")
if ai_settings.get("sensitivity") != db_config.ai_sensitivity:
mismatches.append("ai_moderation.sensitivity")
# Compare automod settings
automod_settings = file_settings.get("automod", {})
if automod_settings.get("message_rate_limit") != db_config.message_rate_limit:
mismatches.append("automod.message_rate_limit")
if mismatches:
results["mismatches"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"mismatched_fields": mismatches
})
else:
results["verified_guilds"].append({
"guild_id": guild.id,
"guild_name": guild.name
})
async def cleanup_database_configs(self, confirm: bool = False) -> Dict[str, Any]:
"""Clean up database configurations after successful migration.
WARNING: This will delete all guild settings and banned words from the database.
Only run after verifying migration is successful.
Args:
confirm: Must be True to actually perform cleanup
Returns:
Cleanup results
"""
if not confirm:
raise ValueError("cleanup_database_configs requires confirm=True to prevent accidental data loss")
logger.warning("STARTING DATABASE CLEANUP - This will delete all migrated configuration data!")
cleanup_results = {
"guild_settings_deleted": 0,
"banned_words_deleted": 0,
"errors": []
}
try:
async with self.database.session() as session:
# Delete all guild settings
from sqlalchemy import delete
# Delete banned words first (foreign key constraint)
banned_words_result = await session.execute(delete(BannedWord))
cleanup_results["banned_words_deleted"] = banned_words_result.rowcount
# Delete guild settings
guild_settings_result = await session.execute(delete(GuildSettings))
cleanup_results["guild_settings_deleted"] = guild_settings_result.rowcount
await session.commit()
logger.warning(f"Database cleanup complete. Deleted {cleanup_results['guild_settings_deleted']} "
f"guild settings and {cleanup_results['banned_words_deleted']} banned words.")
except Exception as e:
error_msg = f"Database cleanup failed: {str(e)}"
logger.error(error_msg)
cleanup_results["errors"].append(error_msg)
return cleanup_results

View File

@@ -1,502 +0,0 @@
"""File-based configuration system for GuardDen.
This module provides a complete file-based configuration system that replaces
Discord commands for bot configuration. Features include:
- YAML configuration files with schema validation
- Hot-reloading with file watching
- Migration from database settings
- Comprehensive error handling and rollback
"""
import logging
import asyncio
from pathlib import Path
from typing import Dict, Any, Optional, List, Callable
from datetime import datetime
from dataclasses import dataclass, field
import hashlib
try:
import yaml
import jsonschema
from watchfiles import watch, Change
except ImportError as e:
raise ImportError(f"Required dependencies missing: {e}. Install with 'pip install pyyaml jsonschema watchfiles'")
from guardden.models.guild import GuildSettings
from guardden.services.database import Database
logger = logging.getLogger(__name__)
@dataclass
class ConfigurationError(Exception):
"""Raised when configuration is invalid or cannot be loaded."""
file_path: str
error_message: str
validation_errors: List[str] = field(default_factory=list)
@dataclass
class FileConfig:
"""Represents a loaded configuration file."""
path: Path
content: Dict[str, Any]
last_modified: float
content_hash: str
is_valid: bool = True
validation_errors: List[str] = field(default_factory=list)
@dataclass
class GuildConfig:
"""Processed guild configuration."""
guild_id: int
name: str
owner_id: Optional[int]
premium: bool
settings: Dict[str, Any]
file_path: Path
last_updated: datetime
class FileConfigurationManager:
"""Manages file-based configuration with hot-reloading and validation."""
def __init__(self, config_dir: str = "config", database: Optional[Database] = None):
"""Initialize the configuration manager.
Args:
config_dir: Base directory for configuration files
database: Database instance for migration and fallback
"""
self.config_dir = Path(config_dir)
self.database = database
self.guild_configs: Dict[int, GuildConfig] = {}
self.wordlist_config: Optional[FileConfig] = None
self.allowlist_config: Optional[FileConfig] = None
self.external_sources_config: Optional[FileConfig] = None
# File watching
self._watch_task: Optional[asyncio.Task] = None
self._watch_enabled = True
self._callbacks: List[Callable[[int, GuildConfig], None]] = []
# Validation schemas
self._schemas: Dict[str, Dict[str, Any]] = {}
# Backup configurations (for rollback)
self._backup_configs: Dict[int, GuildConfig] = {}
# Ensure directories exist
self._ensure_directories()
def _ensure_directories(self) -> None:
"""Create configuration directories if they don't exist."""
dirs = [
self.config_dir / "guilds",
self.config_dir / "wordlists",
self.config_dir / "schemas",
self.config_dir / "templates",
self.config_dir / "backups"
]
for dir_path in dirs:
dir_path.mkdir(parents=True, exist_ok=True)
async def initialize(self) -> None:
"""Initialize the configuration system."""
logger.info("Initializing file-based configuration system...")
try:
# Load validation schemas
await self._load_schemas()
# Load all configuration files
await self._load_all_configs()
# Start file watching for hot-reload
if self._watch_enabled:
await self._start_file_watching()
logger.info(f"Configuration system initialized with {len(self.guild_configs)} guild configs")
except Exception as e:
logger.error(f"Failed to initialize configuration system: {e}")
raise
async def shutdown(self) -> None:
"""Shutdown the configuration system."""
logger.info("Shutting down configuration system...")
if self._watch_task and not self._watch_task.done():
self._watch_task.cancel()
try:
await self._watch_task
except asyncio.CancelledError:
pass
logger.info("Configuration system shutdown complete")
async def _load_schemas(self) -> None:
"""Load validation schemas from files."""
schema_dir = self.config_dir / "schemas"
schema_files = {
"guild": schema_dir / "guild-schema.yml",
"wordlists": schema_dir / "wordlists-schema.yml"
}
for schema_name, schema_path in schema_files.items():
if schema_path.exists():
try:
with open(schema_path, 'r', encoding='utf-8') as f:
self._schemas[schema_name] = yaml.safe_load(f)
logger.debug(f"Loaded schema: {schema_name}")
except Exception as e:
logger.error(f"Failed to load schema {schema_name}: {e}")
else:
logger.warning(f"Schema file not found: {schema_path}")
async def _load_all_configs(self) -> None:
"""Load all configuration files."""
# Load guild configurations
guild_dir = self.config_dir / "guilds"
if guild_dir.exists():
for config_file in guild_dir.glob("guild-*.yml"):
try:
await self._load_guild_config(config_file)
except Exception as e:
logger.error(f"Failed to load guild config {config_file}: {e}")
# Load wordlist configurations
await self._load_wordlist_configs()
async def _load_guild_config(self, file_path: Path) -> Optional[GuildConfig]:
"""Load a single guild configuration file."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = yaml.safe_load(f)
# Validate against schema
if 'guild' in self._schemas:
try:
jsonschema.validate(content, self._schemas['guild'])
except jsonschema.ValidationError as e:
logger.error(f"Schema validation failed for {file_path}: {e}")
return None
# Extract guild information
guild_id = content.get('guild_id')
if not guild_id:
logger.error(f"Guild config missing guild_id: {file_path}")
return None
guild_config = GuildConfig(
guild_id=guild_id,
name=content.get('name', f"Guild {guild_id}"),
owner_id=content.get('owner_id'),
premium=content.get('premium', False),
settings=content.get('settings', {}),
file_path=file_path,
last_updated=datetime.now()
)
# Backup current config before updating
if guild_id in self.guild_configs:
self._backup_configs[guild_id] = self.guild_configs[guild_id]
self.guild_configs[guild_id] = guild_config
logger.debug(f"Loaded guild config for {guild_id}: {guild_config.name}")
# Notify callbacks of config change
await self._notify_config_change(guild_id, guild_config)
return guild_config
except Exception as e:
logger.error(f"Error loading guild config {file_path}: {e}")
return None
async def _load_wordlist_configs(self) -> None:
"""Load wordlist configuration files."""
wordlist_dir = self.config_dir / "wordlists"
configs = {
"banned-words.yml": "wordlist_config",
"domain-allowlists.yml": "allowlist_config",
"external-sources.yml": "external_sources_config"
}
for filename, attr_name in configs.items():
file_path = wordlist_dir / filename
if file_path.exists():
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = yaml.safe_load(f)
# Calculate content hash
content_hash = hashlib.md5(str(content).encode()).hexdigest()
file_config = FileConfig(
path=file_path,
content=content,
last_modified=file_path.stat().st_mtime,
content_hash=content_hash
)
setattr(self, attr_name, file_config)
logger.debug(f"Loaded {filename}")
except Exception as e:
logger.error(f"Failed to load {filename}: {e}")
async def _start_file_watching(self) -> None:
"""Start watching configuration files for changes."""
if self._watch_task and not self._watch_task.done():
return
self._watch_task = asyncio.create_task(self._file_watcher())
logger.info("Started file watching for configuration hot-reload")
async def _file_watcher(self) -> None:
"""Watch for file changes and reload configurations."""
try:
async for changes in watch(self.config_dir, recursive=True):
for change_type, file_path in changes:
file_path = Path(file_path)
# Only process YAML files
if file_path.suffix != '.yml':
continue
if change_type in (Change.added, Change.modified):
await self._handle_file_change(file_path)
elif change_type == Change.deleted:
await self._handle_file_deletion(file_path)
except asyncio.CancelledError:
logger.debug("File watcher cancelled")
except Exception as e:
logger.error(f"File watcher error: {e}")
async def _handle_file_change(self, file_path: Path) -> None:
"""Handle a file change event."""
try:
# Determine file type and reload appropriately
if file_path.parent.name == "guilds" and file_path.name.startswith("guild-"):
await self._load_guild_config(file_path)
logger.info(f"Reloaded guild config: {file_path}")
elif file_path.parent.name == "wordlists":
await self._load_wordlist_configs()
logger.info(f"Reloaded wordlist config: {file_path}")
except Exception as e:
logger.error(f"Error handling file change {file_path}: {e}")
await self._rollback_config(file_path)
async def _handle_file_deletion(self, file_path: Path) -> None:
"""Handle a file deletion event."""
try:
if file_path.parent.name == "guilds" and file_path.name.startswith("guild-"):
# Extract guild ID from filename
guild_id_str = file_path.stem.replace("guild-", "")
try:
guild_id = int(guild_id_str)
if guild_id in self.guild_configs:
del self.guild_configs[guild_id]
logger.info(f"Removed guild config for deleted file: {file_path}")
except ValueError:
logger.warning(f"Could not parse guild ID from filename: {file_path}")
except Exception as e:
logger.error(f"Error handling file deletion {file_path}: {e}")
async def _rollback_config(self, file_path: Path) -> None:
"""Rollback to previous configuration on error."""
try:
if file_path.parent.name == "guilds" and file_path.name.startswith("guild-"):
guild_id_str = file_path.stem.replace("guild-", "")
guild_id = int(guild_id_str)
if guild_id in self._backup_configs:
self.guild_configs[guild_id] = self._backup_configs[guild_id]
logger.info(f"Rolled back guild config for {guild_id}")
except Exception as e:
logger.error(f"Error during rollback for {file_path}: {e}")
async def _notify_config_change(self, guild_id: int, config: GuildConfig) -> None:
"""Notify registered callbacks of configuration changes."""
for callback in self._callbacks:
try:
callback(guild_id, config)
except Exception as e:
logger.error(f"Error in config change callback: {e}")
def register_change_callback(self, callback: Callable[[int, GuildConfig], None]) -> None:
"""Register a callback for configuration changes."""
self._callbacks.append(callback)
def get_guild_config(self, guild_id: int) -> Optional[GuildConfig]:
"""Get configuration for a specific guild."""
return self.guild_configs.get(guild_id)
def get_all_guild_configs(self) -> Dict[int, GuildConfig]:
"""Get all guild configurations."""
return self.guild_configs.copy()
def get_wordlist_config(self) -> Optional[Dict[str, Any]]:
"""Get wordlist configuration."""
return self.wordlist_config.content if self.wordlist_config else None
def get_allowlist_config(self) -> Optional[Dict[str, Any]]:
"""Get domain allowlist configuration."""
return self.allowlist_config.content if self.allowlist_config else None
def get_external_sources_config(self) -> Optional[Dict[str, Any]]:
"""Get external sources configuration."""
return self.external_sources_config.content if self.external_sources_config else None
async def create_guild_config(self, guild_id: int, name: str, owner_id: Optional[int] = None) -> Path:
"""Create a new guild configuration file from template."""
guild_file = self.config_dir / "guilds" / f"guild-{guild_id}.yml"
template_file = self.config_dir / "templates" / "guild-default.yml"
if guild_file.exists():
raise ConfigurationError(
str(guild_file),
"Guild configuration already exists"
)
# Load template
if template_file.exists():
with open(template_file, 'r', encoding='utf-8') as f:
template_content = yaml.safe_load(f)
else:
# Create basic template if file doesn't exist
template_content = await self._create_basic_template()
# Customize template
template_content['guild_id'] = guild_id
template_content['name'] = name
if owner_id:
template_content['owner_id'] = owner_id
# Write configuration file
with open(guild_file, 'w', encoding='utf-8') as f:
yaml.dump(template_content, f, default_flow_style=False, indent=2)
logger.info(f"Created guild configuration: {guild_file}")
# Load the new configuration
await self._load_guild_config(guild_file)
return guild_file
async def _create_basic_template(self) -> Dict[str, Any]:
"""Create a basic configuration template."""
return {
"guild_id": 0,
"name": "",
"premium": False,
"settings": {
"general": {
"prefix": "!",
"locale": "en"
},
"channels": {
"log_channel_id": None,
"mod_log_channel_id": None,
"welcome_channel_id": None
},
"roles": {
"mute_role_id": None,
"verified_role_id": None,
"mod_role_ids": []
},
"moderation": {
"automod_enabled": True,
"anti_spam_enabled": True,
"link_filter_enabled": False,
"strike_actions": {
"1": {"action": "warn"},
"3": {"action": "timeout", "duration": 300},
"5": {"action": "kick"},
"7": {"action": "ban"}
}
},
"automod": {
"message_rate_limit": 5,
"message_rate_window": 5,
"duplicate_threshold": 3,
"mention_limit": 5,
"mention_rate_limit": 10,
"mention_rate_window": 60,
"scam_allowlist": []
},
"ai_moderation": {
"enabled": True,
"sensitivity": 80,
"confidence_threshold": 0.7,
"log_only": False,
"nsfw_detection_enabled": True,
"nsfw_only_filtering": False
},
"verification": {
"enabled": False,
"type": "button"
}
}
}
async def export_from_database(self, guild_id: int) -> Optional[Path]:
"""Export guild configuration from database to file."""
if not self.database:
raise ConfigurationError("", "Database not available for export")
try:
# Get guild settings from database
async with self.database.session() as session:
# This would need to be implemented based on your database service
# For now, return None to indicate not implemented
pass
logger.info(f"Exported guild {guild_id} configuration to file")
return None
except Exception as e:
logger.error(f"Failed to export guild {guild_id} from database: {e}")
raise ConfigurationError(
f"guild-{guild_id}.yml",
f"Database export failed: {str(e)}"
)
def validate_config(self, config_data: Dict[str, Any], schema_name: str = "guild") -> List[str]:
"""Validate configuration data against schema."""
errors = []
if schema_name in self._schemas:
try:
jsonschema.validate(config_data, self._schemas[schema_name])
except jsonschema.ValidationError as e:
errors.append(str(e))
else:
errors.append(f"Schema '{schema_name}' not found")
return errors
async def backup_config(self, guild_id: int) -> Path:
"""Create a backup of guild configuration."""
config = self.get_guild_config(guild_id)
if not config:
raise ConfigurationError(f"guild-{guild_id}.yml", "Guild configuration not found")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = self.config_dir / "backups" / f"guild-{guild_id}_{timestamp}.yml"
# Copy current configuration file
import shutil
shutil.copy2(config.file_path, backup_file)
logger.info(f"Created backup: {backup_file}")
return backup_file

View File

@@ -1,321 +0,0 @@
"""Verification service for new member challenges."""
import asyncio
import logging
import random
import string
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any
logger = logging.getLogger(__name__)
class ChallengeType(str, Enum):
"""Types of verification challenges."""
BUTTON = "button" # Simple button click
CAPTCHA = "captcha" # Text-based captcha
MATH = "math" # Simple math problem
EMOJI = "emoji" # Select correct emoji
QUESTIONS = "questions" # Custom questions
@dataclass
class Challenge:
"""Represents a verification challenge."""
challenge_type: ChallengeType
question: str
answer: str
options: list[str] = field(default_factory=list) # For multiple choice
expires_at: datetime = field(
default_factory=lambda: datetime.now(timezone.utc) + timedelta(minutes=10)
)
attempts: int = 0
max_attempts: int = 3
@property
def is_expired(self) -> bool:
return datetime.now(timezone.utc) > self.expires_at
def check_answer(self, response: str) -> bool:
"""Check if the response is correct."""
self.attempts += 1
return response.strip().lower() == self.answer.lower()
@dataclass
class PendingVerification:
"""Tracks a pending verification for a user."""
user_id: int
guild_id: int
challenge: Challenge
message_id: int | None = None
channel_id: int | None = None
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
class ChallengeGenerator(ABC):
"""Abstract base class for challenge generators."""
@abstractmethod
def generate(self) -> Challenge:
"""Generate a new challenge."""
pass
class ButtonChallengeGenerator(ChallengeGenerator):
"""Generates simple button click challenges."""
def generate(self) -> Challenge:
return Challenge(
challenge_type=ChallengeType.BUTTON,
question="Click the button below to verify you're human.",
answer="verified",
)
class CaptchaChallengeGenerator(ChallengeGenerator):
"""Generates text-based captcha challenges."""
def __init__(self, length: int = 6) -> None:
self.length = length
def generate(self) -> Challenge:
# Generate random alphanumeric code (avoiding confusing chars)
chars = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
code = "".join(random.choices(chars, k=self.length))
# Create visual representation with some obfuscation
visual = self._create_visual(code)
return Challenge(
challenge_type=ChallengeType.CAPTCHA,
question=f"Enter the code shown below:\n```\n{visual}\n```",
answer=code,
)
def _create_visual(self, code: str) -> str:
"""Create a simple text-based visual captcha."""
lines = []
# Add some noise characters
noise_chars = ".-*~^"
for _ in range(2):
lines.append("".join(random.choices(noise_chars, k=len(code) * 2)))
# Add the code with spacing
spaced = " ".join(code)
lines.append(spaced)
for _ in range(2):
lines.append("".join(random.choices(noise_chars, k=len(code) * 2)))
return "\n".join(lines)
class MathChallengeGenerator(ChallengeGenerator):
"""Generates simple math problem challenges."""
def generate(self) -> Challenge:
# Generate simple addition/subtraction/multiplication
operation = random.choice(["+", "-", "*"])
if operation == "*":
a = random.randint(2, 10)
b = random.randint(2, 10)
else:
a = random.randint(10, 50)
b = random.randint(1, 20)
if operation == "+":
answer = a + b
elif operation == "-":
# Ensure positive result
if b > a:
a, b = b, a
answer = a - b
else:
answer = a * b
return Challenge(
challenge_type=ChallengeType.MATH,
question=f"Solve this math problem: **{a} {operation} {b} = ?**",
answer=str(answer),
)
class EmojiChallengeGenerator(ChallengeGenerator):
"""Generates emoji selection challenges."""
EMOJI_SETS = [
("animals", ["🐶", "🐱", "🐭", "🐹", "🐰", "🦊", "🐻", "🐼"]),
("fruits", ["🍎", "🍐", "🍊", "🍋", "🍌", "🍉", "🍇", "🍓"]),
("weather", ["☀️", "🌙", "", "🌧️", "❄️", "🌈", "", "🌪️"]),
("sports", ["", "🏀", "🏈", "", "🎾", "🏐", "🏉", "🎱"]),
]
def generate(self) -> Challenge:
category, emojis = random.choice(self.EMOJI_SETS)
target = random.choice(emojis)
# Create options with the target and some others
options = [target]
other_emojis = [e for e in emojis if e != target]
options.extend(random.sample(other_emojis, min(3, len(other_emojis))))
random.shuffle(options)
return Challenge(
challenge_type=ChallengeType.EMOJI,
question=f"Select the {self._emoji_name(target)} emoji:",
answer=target,
options=options,
)
def _emoji_name(self, emoji: str) -> str:
"""Get a description of the emoji."""
names = {
"🐶": "dog",
"🐱": "cat",
"🐭": "mouse",
"🐹": "hamster",
"🐰": "rabbit",
"🦊": "fox",
"🐻": "bear",
"🐼": "panda",
"🍎": "apple",
"🍐": "pear",
"🍊": "orange",
"🍋": "lemon",
"🍌": "banana",
"🍉": "watermelon",
"🍇": "grapes",
"🍓": "strawberry",
"☀️": "sun",
"🌙": "moon",
"": "star",
"🌧️": "rain",
"❄️": "snowflake",
"🌈": "rainbow",
"": "lightning",
"🌪️": "tornado",
"": "soccer ball",
"🏀": "basketball",
"🏈": "football",
"": "baseball",
"🎾": "tennis",
"🏐": "volleyball",
"🏉": "rugby",
"🎱": "pool ball",
}
return names.get(emoji, "correct")
class QuestionsChallengeGenerator(ChallengeGenerator):
"""Generates custom question challenges."""
DEFAULT_QUESTIONS = [
("What color is the sky on a clear day?", "blue"),
("Type the word 'verified' to continue.", "verified"),
("What is 2 + 2?", "4"),
("What planet do we live on?", "earth"),
]
def __init__(self, questions: list[tuple[str, str]] | None = None) -> None:
self.questions = questions or self.DEFAULT_QUESTIONS
def generate(self) -> Challenge:
question, answer = random.choice(self.questions)
return Challenge(
challenge_type=ChallengeType.QUESTIONS,
question=question,
answer=answer,
)
class VerificationService:
"""Service for managing member verification."""
def __init__(self) -> None:
# Pending verifications: {(guild_id, user_id): PendingVerification}
self._pending: dict[tuple[int, int], PendingVerification] = {}
# Challenge generators
self._generators: dict[ChallengeType, ChallengeGenerator] = {
ChallengeType.BUTTON: ButtonChallengeGenerator(),
ChallengeType.CAPTCHA: CaptchaChallengeGenerator(),
ChallengeType.MATH: MathChallengeGenerator(),
ChallengeType.EMOJI: EmojiChallengeGenerator(),
ChallengeType.QUESTIONS: QuestionsChallengeGenerator(),
}
def create_challenge(
self,
user_id: int,
guild_id: int,
challenge_type: ChallengeType = ChallengeType.BUTTON,
) -> PendingVerification:
"""Create a new verification challenge for a user."""
generator = self._generators.get(challenge_type)
if not generator:
generator = self._generators[ChallengeType.BUTTON]
challenge = generator.generate()
pending = PendingVerification(
user_id=user_id,
guild_id=guild_id,
challenge=challenge,
)
self._pending[(guild_id, user_id)] = pending
return pending
def get_pending(self, guild_id: int, user_id: int) -> PendingVerification | None:
"""Get a pending verification for a user."""
return self._pending.get((guild_id, user_id))
def verify(self, guild_id: int, user_id: int, response: str) -> tuple[bool, str]:
"""
Attempt to verify a user's response.
Returns:
Tuple of (success, message)
"""
pending = self._pending.get((guild_id, user_id))
if not pending:
return False, "No pending verification found."
if pending.challenge.is_expired:
self._pending.pop((guild_id, user_id), None)
return False, "Verification expired. Please request a new one."
if pending.challenge.attempts >= pending.challenge.max_attempts:
self._pending.pop((guild_id, user_id), None)
return False, "Too many failed attempts. Please request a new verification."
if pending.challenge.check_answer(response):
self._pending.pop((guild_id, user_id), None)
return True, "Verification successful!"
remaining = pending.challenge.max_attempts - pending.challenge.attempts
return False, f"Incorrect. {remaining} attempt(s) remaining."
def cancel(self, guild_id: int, user_id: int) -> bool:
"""Cancel a pending verification."""
return self._pending.pop((guild_id, user_id), None) is not None
def cleanup_expired(self) -> int:
"""Remove expired verifications. Returns count of removed."""
expired = [key for key, pending in self._pending.items() if pending.challenge.is_expired]
for key in expired:
self._pending.pop(key, None)
return len(expired)
def get_pending_count(self, guild_id: int) -> int:
"""Get count of pending verifications for a guild."""
return sum(1 for (gid, _) in self._pending if gid == guild_id)

View File

@@ -1,180 +0,0 @@
"""Managed wordlist sync service."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable
import httpx
from sqlalchemy import delete, select
from guardden.config import Settings, WordlistSourceConfig
from guardden.models import BannedWord, Guild
from guardden.services.database import Database
logger = logging.getLogger(__name__)
MAX_WORDLIST_ENTRY_LENGTH = 128
REQUEST_TIMEOUT = 20.0
@dataclass(frozen=True)
class WordlistSource:
name: str
url: str
category: str
action: str
reason: str
is_regex: bool = False
DEFAULT_SOURCES: list[WordlistSource] = [
WordlistSource(
name="ldnoobw_en",
url="https://raw.githubusercontent.com/LDNOOBW/List-of-Dirty-Naughty-Obscene-and-Otherwise-Bad-Words/master/en",
category="soft",
action="warn",
reason="Auto list: profanity",
is_regex=False,
),
]
def _normalize_entry(line: str) -> str:
text = line.strip().lower()
if not text:
return ""
if len(text) > MAX_WORDLIST_ENTRY_LENGTH:
return ""
return text
def _parse_wordlist(text: str) -> list[str]:
entries: list[str] = []
seen: set[str] = set()
for raw in text.splitlines():
line = raw.strip()
if not line:
continue
if line.startswith("#") or line.startswith("//") or line.startswith(";"):
continue
normalized = _normalize_entry(line)
if not normalized or normalized in seen:
continue
entries.append(normalized)
seen.add(normalized)
return entries
class WordlistService:
"""Fetches and syncs managed wordlists into per-guild bans."""
def __init__(self, database: Database, settings: Settings) -> None:
self.database = database
self.settings = settings
self.sources = self._load_sources(settings)
self.update_interval = timedelta(hours=settings.wordlist_update_hours)
self.last_sync: datetime | None = None
@staticmethod
def _load_sources(settings: Settings) -> list[WordlistSource]:
if settings.wordlist_sources:
sources: list[WordlistSource] = []
for src in settings.wordlist_sources:
if not src.enabled:
continue
sources.append(
WordlistSource(
name=src.name,
url=src.url,
category=src.category,
action=src.action,
reason=src.reason,
is_regex=src.is_regex,
)
)
return sources
return list(DEFAULT_SOURCES)
async def _fetch_source(self, source: WordlistSource) -> list[str]:
async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT) as client:
response = await client.get(source.url)
response.raise_for_status()
return _parse_wordlist(response.text)
async def sync_all(self) -> None:
if not self.settings.wordlist_enabled:
logger.info("Managed wordlist sync disabled")
return
if not self.sources:
logger.warning("No wordlist sources configured")
return
logger.info("Starting managed wordlist sync (%d sources)", len(self.sources))
async with self.database.session() as session:
guild_ids = list((await session.execute(select(Guild.id))).scalars().all())
for source in self.sources:
try:
entries = await self._fetch_source(source)
except Exception as exc:
logger.error("Failed to fetch wordlist %s: %s", source.name, exc)
continue
if not entries:
logger.warning("Wordlist %s returned no entries", source.name)
continue
await self._sync_source_to_guilds(source, entries, guild_ids)
self.last_sync = datetime.now(timezone.utc)
logger.info("Managed wordlist sync completed")
async def _sync_source_to_guilds(
self, source: WordlistSource, entries: Iterable[str], guild_ids: list[int]
) -> None:
entry_set = set(entries)
async with self.database.session() as session:
for guild_id in guild_ids:
result = await session.execute(
select(BannedWord).where(
BannedWord.guild_id == guild_id,
BannedWord.managed.is_(True),
BannedWord.source == source.name,
)
)
existing = list(result.scalars().all())
existing_set = {word.pattern.lower() for word in existing}
to_add = entry_set - existing_set
to_remove = existing_set - entry_set
if to_remove:
await session.execute(
delete(BannedWord).where(
BannedWord.guild_id == guild_id,
BannedWord.managed.is_(True),
BannedWord.source == source.name,
BannedWord.pattern.in_(to_remove),
)
)
if to_add:
session.add_all(
[
BannedWord(
guild_id=guild_id,
pattern=pattern,
is_regex=source.is_regex,
action=source.action,
reason=source.reason,
source=source.name,
category=source.category,
managed=True,
added_by=0,
)
for pattern in to_add
]
)

View File

@@ -1,79 +0,0 @@
"""Utility functions for sending moderation notifications."""
import logging
import discord
logger = logging.getLogger(__name__)
async def send_moderation_notification(
user: discord.User | discord.Member,
channel: discord.TextChannel,
embed: discord.Embed,
send_in_channel: bool = False,
) -> bool:
"""
Send moderation notification to user.
Attempts to DM the user first. If DM fails and send_in_channel is True,
sends a temporary PUBLIC message in the channel that auto-deletes after 10 seconds.
WARNING: In-channel messages are PUBLIC and visible to all users in the channel.
They are NOT private or ephemeral due to Discord API limitations.
Args:
user: The user to notify
channel: The channel to send fallback message in
embed: The embed to send
send_in_channel: Whether to send PUBLIC in-channel message if DM fails (default: False)
Returns:
True if notification was delivered (via DM or channel), False otherwise
"""
# Try to DM the user first
try:
await user.send(embed=embed)
logger.debug(f"Sent moderation notification DM to {user}")
return True
except discord.Forbidden:
logger.debug(f"User {user} has DMs disabled, attempting in-channel notification")
pass
except discord.HTTPException as e:
logger.warning(f"Failed to DM user {user}: {e}")
pass
# DM failed, try in-channel notification if enabled
if not send_in_channel:
logger.debug(f"In-channel warnings disabled, notification to {user} not sent")
return False
try:
# Create a simplified message for in-channel notification
# Mention the user so they see it, but keep it brief
in_channel_embed = discord.Embed(
title="⚠️ Moderation Notice",
description=f"{user.mention}, your message was flagged by moderation.\n\n"
f"**Reason:** {embed.description or 'Violation detected'}\n\n"
f"_This message will be deleted in 10 seconds._",
color=embed.color or discord.Color.orange(),
)
# Add timeout info if present
for field in embed.fields:
if field.name in ("Timeout", "Action"):
in_channel_embed.add_field(
name=field.name,
value=field.value,
inline=False,
)
await channel.send(embed=in_channel_embed, delete_after=10)
logger.info(f"Sent in-channel moderation notification to {user} in {channel}")
return True
except discord.Forbidden:
logger.warning(f"Cannot send in-channel notification in {channel}: missing permissions")
return False
except discord.HTTPException as e:
logger.warning(f"Failed to send in-channel notification in {channel}: {e}")
return False