commit, am too tired to add docs here

This commit is contained in:
2026-01-25 09:09:07 +01:00
parent 32d63284c7
commit 1e3acf05d0
15 changed files with 3336 additions and 61 deletions

View File

@@ -0,0 +1 @@
"""GuardDen CLI tools for configuration management."""

559
src/guardden/cli/config.py Normal file
View File

@@ -0,0 +1,559 @@
#!/usr/bin/env python3
"""GuardDen Configuration CLI Tool.
This CLI tool allows you to manage GuardDen bot configurations without
using Discord commands. You can create, edit, validate, and migrate
configurations using this command-line interface.
Usage:
python -m guardden.cli.config --help
python -m guardden.cli.config guild create 123456789 "My Server"
python -m guardden.cli.config guild edit 123456789 ai_moderation.sensitivity 75
python -m guardden.cli.config migrate from-database
python -m guardden.cli.config validate all
"""
import asyncio
import sys
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List
import argparse
import yaml
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from guardden.services.file_config import FileConfigurationManager, ConfigurationError
from guardden.services.config_migration import ConfigurationMigrator
from guardden.services.database import Database
from guardden.services.guild_config import GuildConfigService
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ConfigurationCLI:
"""Command-line interface for GuardDen configuration management."""
def __init__(self, config_dir: str = "config"):
"""Initialize the CLI with configuration directory."""
self.config_dir = Path(config_dir)
self.file_manager: Optional[FileConfigurationManager] = None
self.database: Optional[Database] = None
self.migrator: Optional[ConfigurationMigrator] = None
async def initialize(self):
"""Initialize the configuration system."""
self.file_manager = FileConfigurationManager(str(self.config_dir))
await self.file_manager.initialize()
# Initialize database connection if available
try:
import os
database_url = os.getenv('GUARDDEN_DATABASE_URL', 'postgresql://guardden:guardden@localhost:5432/guardden')
self.database = Database(database_url)
guild_config_service = GuildConfigService(self.database)
self.migrator = ConfigurationMigrator(self.database, guild_config_service, self.file_manager)
logger.info("Database connection established")
except Exception as e:
logger.warning(f"Database not available: {e}")
async def cleanup(self):
"""Clean up resources."""
if self.file_manager:
await self.file_manager.shutdown()
if self.database:
await self.database.close()
# Guild management commands
async def guild_create(self, guild_id: int, name: str, owner_id: Optional[int] = None):
"""Create a new guild configuration."""
try:
file_path = await self.file_manager.create_guild_config(guild_id, name, owner_id)
print(f"✅ Created guild configuration: {file_path}")
print(f"📝 Edit the file to customize settings for {name}")
return True
except ConfigurationError as e:
print(f"❌ Failed to create guild configuration: {e.error_message}")
return False
except Exception as e:
print(f"❌ Unexpected error: {str(e)}")
return False
async def guild_list(self):
"""List all configured guilds."""
configs = self.file_manager.get_all_guild_configs()
if not configs:
print("📄 No guild configurations found")
print("💡 Use 'guild create <guild_id> <name>' to create a new configuration")
return
print(f"📋 Found {len(configs)} guild configuration(s):")
print()
for guild_id, config in configs.items():
status_icon = "" if config else ""
premium_icon = "" if config.premium else ""
print(f"{status_icon} {premium_icon} {guild_id}: {config.name}")
print(f" 📁 File: {config.file_path}")
print(f" 🕐 Updated: {config.last_updated.strftime('%Y-%m-%d %H:%M:%S')}")
# Show key settings
settings = config.settings
ai_enabled = settings.get("ai_moderation", {}).get("enabled", False)
nsfw_only = settings.get("ai_moderation", {}).get("nsfw_only_filtering", False)
automod_enabled = settings.get("moderation", {}).get("automod_enabled", False)
print(f" 🤖 AI: {'' if ai_enabled else ''} | "
f"🔞 NSFW-Only: {'' if nsfw_only else ''} | "
f"⚡ AutoMod: {'' if automod_enabled else ''}")
print()
async def guild_edit(self, guild_id: int, setting_path: str, value: Any):
"""Edit a guild configuration setting."""
config = self.file_manager.get_guild_config(guild_id)
if not config:
print(f"❌ Guild {guild_id} configuration not found")
return False
try:
# Load current configuration
with open(config.file_path, 'r', encoding='utf-8') as f:
file_config = yaml.safe_load(f)
# Parse setting path (e.g., "ai_moderation.sensitivity")
path_parts = setting_path.split('.')
current = file_config
# Navigate to the parent of the target setting
for part in path_parts[:-1]:
if part not in current:
print(f"❌ Setting path not found: {setting_path}")
return False
current = current[part]
# Set the value
final_key = path_parts[-1]
old_value = current.get(final_key, "Not set")
# Convert value to appropriate type
if isinstance(old_value, bool):
value = str(value).lower() in ('true', '1', 'yes', 'on')
elif isinstance(old_value, int):
value = int(value)
elif isinstance(old_value, float):
value = float(value)
elif isinstance(old_value, list):
value = value.split(',') if isinstance(value, str) else value
current[final_key] = value
# Write back to file
with open(config.file_path, 'w', encoding='utf-8') as f:
yaml.dump(file_config, f, default_flow_style=False, indent=2)
print(f"✅ Updated {setting_path} for guild {guild_id}")
print(f" 📝 Changed from: {old_value}")
print(f" 📝 Changed to: {value}")
print(f"🔄 Configuration will be hot-reloaded automatically")
return True
except Exception as e:
print(f"❌ Failed to edit configuration: {str(e)}")
return False
async def guild_validate(self, guild_id: Optional[int] = None):
"""Validate guild configuration(s)."""
if guild_id:
configs = {guild_id: self.file_manager.get_guild_config(guild_id)}
if not configs[guild_id]:
print(f"❌ Guild {guild_id} configuration not found")
return False
else:
configs = self.file_manager.get_all_guild_configs()
if not configs:
print("📄 No configurations to validate")
return True
all_valid = True
print(f"🔍 Validating {len(configs)} configuration(s)...")
print()
for guild_id, config in configs.items():
if not config:
continue
try:
# Load and validate configuration
with open(config.file_path, 'r', encoding='utf-8') as f:
file_config = yaml.safe_load(f)
errors = self.file_manager.validate_config(file_config)
if errors:
all_valid = False
print(f"❌ Guild {guild_id} ({config.name}) - INVALID")
for error in errors:
print(f" 🔸 {error}")
else:
print(f"✅ Guild {guild_id} ({config.name}) - VALID")
except Exception as e:
all_valid = False
print(f"❌ Guild {guild_id} - ERROR: {str(e)}")
print()
if all_valid:
print("🎉 All configurations are valid!")
else:
print("⚠️ Some configurations have errors. Please fix them before running the bot.")
return all_valid
async def guild_backup(self, guild_id: int):
"""Create a backup of guild configuration."""
try:
backup_path = await self.file_manager.backup_config(guild_id)
print(f"✅ Created backup: {backup_path}")
return True
except Exception as e:
print(f"❌ Failed to create backup: {str(e)}")
return False
# Migration commands
async def migrate_from_database(self, backup_existing: bool = True):
"""Migrate all configurations from database to files."""
if not self.migrator:
print("❌ Database not available for migration")
return False
print("🔄 Starting migration from database to files...")
print("⚠️ This will convert Discord command configurations to YAML files")
if backup_existing:
print("📦 Existing files will be backed up")
try:
results = await self.migrator.migrate_all_guilds(backup_existing)
print("\n📊 Migration Results:")
print(f" ✅ Migrated: {len(results['migrated_guilds'])} guilds")
print(f" ❌ Failed: {len(results['failed_guilds'])} guilds")
print(f" ⏭️ Skipped: {len(results['skipped_guilds'])} guilds")
print(f" 📝 Banned words migrated: {results['banned_words_migrated']}")
if results['migrated_guilds']:
print("\n✅ Successfully migrated guilds:")
for guild in results['migrated_guilds']:
print(f"{guild['guild_id']}: {guild['guild_name']} "
f"({guild['banned_words_count']} banned words)")
if results['failed_guilds']:
print("\n❌ Failed migrations:")
for guild in results['failed_guilds']:
print(f"{guild['guild_id']}: {guild['guild_name']} - {guild['error']}")
if results['skipped_guilds']:
print("\n⏭️ Skipped guilds:")
for guild in results['skipped_guilds']:
print(f"{guild['guild_id']}: {guild['guild_name']} - {guild['reason']}")
if results['errors']:
print("\n⚠️ Errors encountered:")
for error in results['errors']:
print(f"{error}")
return len(results['failed_guilds']) == 0
except Exception as e:
print(f"❌ Migration failed: {str(e)}")
return False
async def migrate_verify(self, guild_ids: Optional[List[int]] = None):
"""Verify migration by comparing database and file configurations."""
if not self.migrator:
print("❌ Database not available for verification")
return False
print("🔍 Verifying migration results...")
try:
results = await self.migrator.verify_migration(guild_ids)
print("\n📊 Verification Results:")
print(f" ✅ Verified: {len(results['verified_guilds'])} guilds")
print(f" ⚠️ Mismatches: {len(results['mismatches'])} guilds")
print(f" 📄 Missing files: {len(results['missing_files'])} guilds")
if results['verified_guilds']:
print("\n✅ Verified guilds:")
for guild in results['verified_guilds']:
print(f"{guild['guild_id']}: {guild['guild_name']}")
if results['mismatches']:
print("\n⚠️ Configuration mismatches:")
for guild in results['mismatches']:
print(f"{guild['guild_id']}: {guild['guild_name']}")
print(f" Mismatched fields: {', '.join(guild['mismatched_fields'])}")
if results['missing_files']:
print("\n📄 Missing configuration files:")
for guild in results['missing_files']:
print(f"{guild['guild_id']}: {guild['guild_name']}")
print(f" Expected: {guild['expected_file']}")
return len(results['mismatches']) == 0 and len(results['missing_files']) == 0
except Exception as e:
print(f"❌ Verification failed: {str(e)}")
return False
# Wordlist management
async def wordlist_info(self):
"""Show information about wordlist configurations."""
banned_words = self.file_manager.get_wordlist_config()
allowlists = self.file_manager.get_allowlist_config()
external_sources = self.file_manager.get_external_sources_config()
print("📝 Wordlist Configuration Status:")
print()
if banned_words:
global_patterns = len(banned_words.get('global_patterns', []))
guild_patterns = sum(
len(patterns) for patterns in banned_words.get('guild_patterns', {}).values()
)
print(f"🚫 Banned Words: {global_patterns} global, {guild_patterns} guild-specific")
else:
print("🚫 Banned Words: Not configured")
if allowlists:
global_allowlist = len(allowlists.get('global_allowlist', []))
guild_allowlists = sum(
len(domains) for domains in allowlists.get('guild_allowlists', {}).values()
)
print(f"✅ Domain Allowlists: {global_allowlist} global, {guild_allowlists} guild-specific")
else:
print("✅ Domain Allowlists: Not configured")
if external_sources:
sources = external_sources.get('sources', [])
enabled_sources = len([s for s in sources if s.get('enabled', False)])
print(f"🌐 External Sources: {len(sources)} total, {enabled_sources} enabled")
else:
print("🌐 External Sources: Not configured")
print()
print("📁 Configuration files:")
print(f"{self.config_dir / 'wordlists' / 'banned-words.yml'}")
print(f"{self.config_dir / 'wordlists' / 'domain-allowlists.yml'}")
print(f"{self.config_dir / 'wordlists' / 'external-sources.yml'}")
# Template management
async def template_create(self, guild_id: int, name: str):
"""Create a new guild configuration from template."""
return await self.guild_create(guild_id, name)
async def template_info(self):
"""Show available configuration templates."""
template_dir = self.config_dir / "templates"
templates = list(template_dir.glob("*.yml"))
if not templates:
print("📄 No configuration templates found")
return
print(f"📋 Available Templates ({len(templates)}):")
print()
for template in templates:
try:
with open(template, 'r', encoding='utf-8') as f:
content = yaml.safe_load(f)
description = "Default guild configuration template"
if '_description' in content:
description = content['_description']
print(f"📄 {template.name}")
print(f" {description}")
print(f" 📁 {template}")
print()
except Exception as e:
print(f"❌ Error reading template {template.name}: {str(e)}")
async def main():
"""Main CLI entry point."""
parser = argparse.ArgumentParser(
description="GuardDen Configuration CLI Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Create a new guild configuration
python -m guardden.cli.config guild create 123456789 "My Server"
# List all guild configurations
python -m guardden.cli.config guild list
# Edit a configuration setting
python -m guardden.cli.config guild edit 123456789 ai_moderation.sensitivity 75
python -m guardden.cli.config guild edit 123456789 ai_moderation.nsfw_only_filtering true
# Validate configurations
python -m guardden.cli.config guild validate
python -m guardden.cli.config guild validate 123456789
# Migration from database
python -m guardden.cli.config migrate from-database
python -m guardden.cli.config migrate verify
# Wordlist management
python -m guardden.cli.config wordlist info
# Template management
python -m guardden.cli.config template info
"""
)
parser.add_argument(
'--config-dir', '-c',
default='config',
help='Configuration directory (default: config)'
)
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Guild management
guild_parser = subparsers.add_parser('guild', help='Guild configuration management')
guild_subparsers = guild_parser.add_subparsers(dest='guild_command')
# Guild create
create_parser = guild_subparsers.add_parser('create', help='Create new guild configuration')
create_parser.add_argument('guild_id', type=int, help='Discord guild ID')
create_parser.add_argument('name', help='Guild name')
create_parser.add_argument('--owner-id', type=int, help='Guild owner Discord user ID')
# Guild list
guild_subparsers.add_parser('list', help='List all guild configurations')
# Guild edit
edit_parser = guild_subparsers.add_parser('edit', help='Edit guild configuration setting')
edit_parser.add_argument('guild_id', type=int, help='Discord guild ID')
edit_parser.add_argument('setting', help='Setting path (e.g., ai_moderation.sensitivity)')
edit_parser.add_argument('value', help='New value')
# Guild validate
validate_parser = guild_subparsers.add_parser('validate', help='Validate guild configurations')
validate_parser.add_argument('guild_id', type=int, nargs='?', help='Specific guild ID (optional)')
# Guild backup
backup_parser = guild_subparsers.add_parser('backup', help='Backup guild configuration')
backup_parser.add_argument('guild_id', type=int, help='Discord guild ID')
# Migration
migrate_parser = subparsers.add_parser('migrate', help='Configuration migration')
migrate_subparsers = migrate_parser.add_subparsers(dest='migrate_command')
# Migrate from database
from_db_parser = migrate_subparsers.add_parser('from-database', help='Migrate from database to files')
from_db_parser.add_argument('--no-backup', action='store_true', help='Skip backing up existing files')
# Migrate verify
verify_parser = migrate_subparsers.add_parser('verify', help='Verify migration results')
verify_parser.add_argument('guild_ids', type=int, nargs='*', help='Specific guild IDs to verify')
# Wordlist management
wordlist_parser = subparsers.add_parser('wordlist', help='Wordlist management')
wordlist_subparsers = wordlist_parser.add_subparsers(dest='wordlist_command')
wordlist_subparsers.add_parser('info', help='Show wordlist information')
# Template management
template_parser = subparsers.add_parser('template', help='Template management')
template_subparsers = template_parser.add_subparsers(dest='template_command')
template_subparsers.add_parser('info', help='Show available templates')
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
# Initialize CLI
cli = ConfigurationCLI(args.config_dir)
try:
await cli.initialize()
success = True
# Execute command
if args.command == 'guild':
if args.guild_command == 'create':
success = await cli.guild_create(args.guild_id, args.name, args.owner_id)
elif args.guild_command == 'list':
await cli.guild_list()
elif args.guild_command == 'edit':
success = await cli.guild_edit(args.guild_id, args.setting, args.value)
elif args.guild_command == 'validate':
success = await cli.guild_validate(args.guild_id)
elif args.guild_command == 'backup':
success = await cli.guild_backup(args.guild_id)
else:
print("❌ Unknown guild command. Use --help for available commands.")
success = False
elif args.command == 'migrate':
if args.migrate_command == 'from-database':
success = await cli.migrate_from_database(not args.no_backup)
elif args.migrate_command == 'verify':
guild_ids = args.guild_ids if args.guild_ids else None
success = await cli.migrate_verify(guild_ids)
else:
print("❌ Unknown migrate command. Use --help for available commands.")
success = False
elif args.command == 'wordlist':
if args.wordlist_command == 'info':
await cli.wordlist_info()
else:
print("❌ Unknown wordlist command. Use --help for available commands.")
success = False
elif args.command == 'template':
if args.template_command == 'info':
await cli.template_info()
else:
print("❌ Unknown template command. Use --help for available commands.")
success = False
return 0 if success else 1
except KeyboardInterrupt:
print("\n⚠️ Interrupted by user")
return 1
except Exception as e:
print(f"❌ Unexpected error: {str(e)}")
logger.exception("CLI error")
return 1
finally:
await cli.cleanup()
if __name__ == '__main__':
sys.exit(asyncio.run(main()))

View File

@@ -0,0 +1,457 @@
"""Configuration migration system for GuardDen.
This module handles migration from database-based Discord command configuration
to file-based YAML configuration.
"""
import logging
import asyncio
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
import yaml
from guardden.services.database import Database
from guardden.services.guild_config import GuildConfigService
from guardden.services.file_config import FileConfigurationManager
from guardden.models.guild import Guild, GuildSettings, BannedWord
logger = logging.getLogger(__name__)
class ConfigurationMigrator:
"""Handles migration from database to file-based configuration."""
def __init__(
self,
database: Database,
guild_config_service: GuildConfigService,
file_config_manager: FileConfigurationManager
):
"""Initialize the migration system.
Args:
database: Database instance
guild_config_service: Current guild configuration service
file_config_manager: File configuration manager
"""
self.database = database
self.guild_config_service = guild_config_service
self.file_config_manager = file_config_manager
async def migrate_all_guilds(self, backup_existing: bool = True) -> Dict[str, Any]:
"""Migrate all guild configurations from database to files.
Args:
backup_existing: Whether to backup existing configuration files
Returns:
Dictionary with migration results
"""
logger.info("Starting migration of all guild configurations...")
results = {
"migrated_guilds": [],
"failed_guilds": [],
"skipped_guilds": [],
"total_guilds": 0,
"banned_words_migrated": 0,
"errors": []
}
try:
async with self.database.session() as session:
# Get all guilds from database
from sqlalchemy import select
stmt = select(Guild)
result = await session.execute(stmt)
guilds = result.scalars().all()
results["total_guilds"] = len(guilds)
logger.info(f"Found {len(guilds)} guilds to migrate")
for guild in guilds:
try:
await self._migrate_single_guild(guild, backup_existing, results)
except Exception as e:
error_msg = f"Failed to migrate guild {guild.id}: {str(e)}"
logger.error(error_msg)
results["failed_guilds"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"error": error_msg
})
results["errors"].append(error_msg)
# Migrate wordlists
await self._migrate_wordlists(results)
logger.info(f"Migration complete. Success: {len(results['migrated_guilds'])}, "
f"Failed: {len(results['failed_guilds'])}, "
f"Skipped: {len(results['skipped_guilds'])}")
except Exception as e:
error_msg = f"Migration failed with error: {str(e)}"
logger.error(error_msg)
results["errors"].append(error_msg)
return results
async def _migrate_single_guild(
self,
guild: Guild,
backup_existing: bool,
results: Dict[str, Any]
) -> None:
"""Migrate a single guild's configuration."""
# Check if file already exists
guild_file = self.file_config_manager.config_dir / "guilds" / f"guild-{guild.id}.yml"
if guild_file.exists():
if backup_existing:
backup_path = await self.file_config_manager.backup_config(guild.id)
logger.info(f"Backed up existing config for guild {guild.id}: {backup_path}")
else:
results["skipped_guilds"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"reason": "Configuration file already exists"
})
return
# Get guild settings from database
async with self.database.session() as session:
from sqlalchemy import select
from sqlalchemy.orm import selectinload
stmt = select(Guild).where(Guild.id == guild.id).options(
selectinload(Guild.settings),
selectinload(Guild.banned_words)
)
result = await session.execute(stmt)
guild_with_settings = result.scalar_one_or_none()
if not guild_with_settings:
raise Exception(f"Guild {guild.id} not found in database")
# Convert to file configuration format
file_config = await self._convert_guild_to_file_config(guild_with_settings)
# Write to file
with open(guild_file, 'w', encoding='utf-8') as f:
yaml.dump(file_config, f, default_flow_style=False, indent=2, sort_keys=False)
logger.info(f"Migrated guild {guild.id} ({guild.name}) to {guild_file}")
results["migrated_guilds"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"file_path": str(guild_file),
"banned_words_count": len(guild_with_settings.banned_words) if guild_with_settings.banned_words else 0
})
if guild_with_settings.banned_words:
results["banned_words_migrated"] += len(guild_with_settings.banned_words)
async def _convert_guild_to_file_config(self, guild: Guild) -> Dict[str, Any]:
"""Convert database guild model to file configuration format."""
settings = guild.settings if guild.settings else GuildSettings()
# Base guild information
config = {
"guild_id": guild.id,
"name": guild.name,
"owner_id": guild.owner_id,
"premium": guild.premium,
# Add migration metadata
"_migration_info": {
"migrated_at": datetime.now().isoformat(),
"migrated_from": "database",
"original_created_at": guild.created_at.isoformat() if guild.created_at else None,
"original_updated_at": guild.updated_at.isoformat() if guild.updated_at else None
},
"settings": {
"general": {
"prefix": settings.prefix,
"locale": settings.locale
},
"channels": {
"log_channel_id": settings.log_channel_id,
"mod_log_channel_id": settings.mod_log_channel_id,
"welcome_channel_id": settings.welcome_channel_id
},
"roles": {
"mute_role_id": settings.mute_role_id,
"verified_role_id": settings.verified_role_id,
"mod_role_ids": settings.mod_role_ids or []
},
"moderation": {
"automod_enabled": settings.automod_enabled,
"anti_spam_enabled": settings.anti_spam_enabled,
"link_filter_enabled": settings.link_filter_enabled,
"strike_actions": settings.strike_actions or {}
},
"automod": {
"message_rate_limit": settings.message_rate_limit,
"message_rate_window": settings.message_rate_window,
"duplicate_threshold": settings.duplicate_threshold,
"mention_limit": settings.mention_limit,
"mention_rate_limit": settings.mention_rate_limit,
"mention_rate_window": settings.mention_rate_window,
"scam_allowlist": settings.scam_allowlist or []
},
"ai_moderation": {
"enabled": settings.ai_moderation_enabled,
"sensitivity": settings.ai_sensitivity,
"confidence_threshold": settings.ai_confidence_threshold,
"log_only": settings.ai_log_only,
"nsfw_detection_enabled": settings.nsfw_detection_enabled,
"nsfw_only_filtering": getattr(settings, 'nsfw_only_filtering', False)
},
"verification": {
"enabled": settings.verification_enabled,
"type": settings.verification_type
}
}
}
# Add banned words if any exist
if guild.banned_words:
config["banned_words"] = []
for banned_word in guild.banned_words:
config["banned_words"].append({
"pattern": banned_word.pattern,
"action": banned_word.action,
"is_regex": banned_word.is_regex,
"reason": banned_word.reason,
"category": banned_word.category,
"source": banned_word.source,
"managed": banned_word.managed,
"added_by": banned_word.added_by,
"created_at": banned_word.created_at.isoformat() if banned_word.created_at else None
})
return config
async def _migrate_wordlists(self, results: Dict[str, Any]) -> None:
"""Migrate global banned words and allowlists to wordlist files."""
# Get all managed banned words (global wordlists)
async with self.database.session() as session:
from sqlalchemy import select
stmt = select(BannedWord).where(BannedWord.managed == True)
result = await session.execute(stmt)
managed_words = result.scalars().all()
if managed_words:
# Group by source and category
sources = {}
for word in managed_words:
source = word.source or "unknown"
if source not in sources:
sources[source] = []
sources[source].append(word)
# Update external sources configuration
external_config_path = self.file_config_manager.config_dir / "wordlists" / "external-sources.yml"
if external_config_path.exists():
with open(external_config_path, 'r', encoding='utf-8') as f:
external_config = yaml.safe_load(f)
else:
external_config = {"sources": []}
# Add migration info for discovered sources
for source_name, words in sources.items():
existing_source = next(
(s for s in external_config["sources"] if s["name"] == source_name),
None
)
if not existing_source:
# Add new source based on migrated words
category = words[0].category if words[0].category else "profanity"
action = words[0].action if words[0].action else "warn"
external_config["sources"].append({
"name": source_name,
"url": f"# MIGRATED: Originally from {source_name}",
"category": category,
"action": action,
"reason": f"Migrated from database source: {source_name}",
"enabled": False, # Disabled by default, needs manual URL
"update_interval_hours": 168,
"applies_to_guilds": [],
"_migration_info": {
"migrated_at": datetime.now().isoformat(),
"original_word_count": len(words),
"needs_url_configuration": True
}
})
# Write updated external sources
with open(external_config_path, 'w', encoding='utf-8') as f:
yaml.dump(external_config, f, default_flow_style=False, indent=2)
results["external_sources_updated"] = True
results["managed_words_found"] = len(managed_words)
logger.info(f"Updated external sources configuration with {len(sources)} discovered sources")
async def verify_migration(self, guild_ids: Optional[List[int]] = None) -> Dict[str, Any]:
"""Verify that migration was successful by comparing database and file configs.
Args:
guild_ids: Specific guild IDs to verify, or None for all
Returns:
Verification results
"""
logger.info("Verifying migration results...")
verification_results = {
"verified_guilds": [],
"mismatches": [],
"missing_files": [],
"errors": []
}
try:
async with self.database.session() as session:
from sqlalchemy import select
if guild_ids:
stmt = select(Guild).where(Guild.id.in_(guild_ids))
else:
stmt = select(Guild)
result = await session.execute(stmt)
guilds = result.scalars().all()
for guild in guilds:
try:
await self._verify_single_guild(guild, verification_results)
except Exception as e:
error_msg = f"Verification error for guild {guild.id}: {str(e)}"
logger.error(error_msg)
verification_results["errors"].append(error_msg)
logger.info(f"Verification complete. Verified: {len(verification_results['verified_guilds'])}, "
f"Mismatches: {len(verification_results['mismatches'])}, "
f"Missing: {len(verification_results['missing_files'])}")
except Exception as e:
error_msg = f"Verification failed: {str(e)}"
logger.error(error_msg)
verification_results["errors"].append(error_msg)
return verification_results
async def _verify_single_guild(self, guild: Guild, results: Dict[str, Any]) -> None:
"""Verify migration for a single guild."""
guild_file = self.file_config_manager.config_dir / "guilds" / f"guild-{guild.id}.yml"
if not guild_file.exists():
results["missing_files"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"expected_file": str(guild_file)
})
return
# Load file configuration
with open(guild_file, 'r', encoding='utf-8') as f:
file_config = yaml.safe_load(f)
# Get database configuration
db_config = await self.guild_config_service.get_config(guild.id)
# Compare key settings
mismatches = []
if file_config.get("guild_id") != guild.id:
mismatches.append("guild_id")
if file_config.get("name") != guild.name:
mismatches.append("name")
if db_config:
file_settings = file_config.get("settings", {})
# Compare AI moderation settings
ai_settings = file_settings.get("ai_moderation", {})
if ai_settings.get("enabled") != db_config.ai_moderation_enabled:
mismatches.append("ai_moderation.enabled")
if ai_settings.get("sensitivity") != db_config.ai_sensitivity:
mismatches.append("ai_moderation.sensitivity")
# Compare automod settings
automod_settings = file_settings.get("automod", {})
if automod_settings.get("message_rate_limit") != db_config.message_rate_limit:
mismatches.append("automod.message_rate_limit")
if mismatches:
results["mismatches"].append({
"guild_id": guild.id,
"guild_name": guild.name,
"mismatched_fields": mismatches
})
else:
results["verified_guilds"].append({
"guild_id": guild.id,
"guild_name": guild.name
})
async def cleanup_database_configs(self, confirm: bool = False) -> Dict[str, Any]:
"""Clean up database configurations after successful migration.
WARNING: This will delete all guild settings and banned words from the database.
Only run after verifying migration is successful.
Args:
confirm: Must be True to actually perform cleanup
Returns:
Cleanup results
"""
if not confirm:
raise ValueError("cleanup_database_configs requires confirm=True to prevent accidental data loss")
logger.warning("STARTING DATABASE CLEANUP - This will delete all migrated configuration data!")
cleanup_results = {
"guild_settings_deleted": 0,
"banned_words_deleted": 0,
"errors": []
}
try:
async with self.database.session() as session:
# Delete all guild settings
from sqlalchemy import delete
# Delete banned words first (foreign key constraint)
banned_words_result = await session.execute(delete(BannedWord))
cleanup_results["banned_words_deleted"] = banned_words_result.rowcount
# Delete guild settings
guild_settings_result = await session.execute(delete(GuildSettings))
cleanup_results["guild_settings_deleted"] = guild_settings_result.rowcount
await session.commit()
logger.warning(f"Database cleanup complete. Deleted {cleanup_results['guild_settings_deleted']} "
f"guild settings and {cleanup_results['banned_words_deleted']} banned words.")
except Exception as e:
error_msg = f"Database cleanup failed: {str(e)}"
logger.error(error_msg)
cleanup_results["errors"].append(error_msg)
return cleanup_results

View File

@@ -0,0 +1,502 @@
"""File-based configuration system for GuardDen.
This module provides a complete file-based configuration system that replaces
Discord commands for bot configuration. Features include:
- YAML configuration files with schema validation
- Hot-reloading with file watching
- Migration from database settings
- Comprehensive error handling and rollback
"""
import logging
import asyncio
from pathlib import Path
from typing import Dict, Any, Optional, List, Callable
from datetime import datetime
from dataclasses import dataclass, field
import hashlib
try:
import yaml
import jsonschema
from watchfiles import watch, Change
except ImportError as e:
raise ImportError(f"Required dependencies missing: {e}. Install with 'pip install pyyaml jsonschema watchfiles'")
from guardden.models.guild import GuildSettings
from guardden.services.database import Database
logger = logging.getLogger(__name__)
@dataclass
class ConfigurationError(Exception):
"""Raised when configuration is invalid or cannot be loaded."""
file_path: str
error_message: str
validation_errors: List[str] = field(default_factory=list)
@dataclass
class FileConfig:
"""Represents a loaded configuration file."""
path: Path
content: Dict[str, Any]
last_modified: float
content_hash: str
is_valid: bool = True
validation_errors: List[str] = field(default_factory=list)
@dataclass
class GuildConfig:
"""Processed guild configuration."""
guild_id: int
name: str
owner_id: Optional[int]
premium: bool
settings: Dict[str, Any]
file_path: Path
last_updated: datetime
class FileConfigurationManager:
"""Manages file-based configuration with hot-reloading and validation."""
def __init__(self, config_dir: str = "config", database: Optional[Database] = None):
"""Initialize the configuration manager.
Args:
config_dir: Base directory for configuration files
database: Database instance for migration and fallback
"""
self.config_dir = Path(config_dir)
self.database = database
self.guild_configs: Dict[int, GuildConfig] = {}
self.wordlist_config: Optional[FileConfig] = None
self.allowlist_config: Optional[FileConfig] = None
self.external_sources_config: Optional[FileConfig] = None
# File watching
self._watch_task: Optional[asyncio.Task] = None
self._watch_enabled = True
self._callbacks: List[Callable[[int, GuildConfig], None]] = []
# Validation schemas
self._schemas: Dict[str, Dict[str, Any]] = {}
# Backup configurations (for rollback)
self._backup_configs: Dict[int, GuildConfig] = {}
# Ensure directories exist
self._ensure_directories()
def _ensure_directories(self) -> None:
"""Create configuration directories if they don't exist."""
dirs = [
self.config_dir / "guilds",
self.config_dir / "wordlists",
self.config_dir / "schemas",
self.config_dir / "templates",
self.config_dir / "backups"
]
for dir_path in dirs:
dir_path.mkdir(parents=True, exist_ok=True)
async def initialize(self) -> None:
"""Initialize the configuration system."""
logger.info("Initializing file-based configuration system...")
try:
# Load validation schemas
await self._load_schemas()
# Load all configuration files
await self._load_all_configs()
# Start file watching for hot-reload
if self._watch_enabled:
await self._start_file_watching()
logger.info(f"Configuration system initialized with {len(self.guild_configs)} guild configs")
except Exception as e:
logger.error(f"Failed to initialize configuration system: {e}")
raise
async def shutdown(self) -> None:
"""Shutdown the configuration system."""
logger.info("Shutting down configuration system...")
if self._watch_task and not self._watch_task.done():
self._watch_task.cancel()
try:
await self._watch_task
except asyncio.CancelledError:
pass
logger.info("Configuration system shutdown complete")
async def _load_schemas(self) -> None:
"""Load validation schemas from files."""
schema_dir = self.config_dir / "schemas"
schema_files = {
"guild": schema_dir / "guild-schema.yml",
"wordlists": schema_dir / "wordlists-schema.yml"
}
for schema_name, schema_path in schema_files.items():
if schema_path.exists():
try:
with open(schema_path, 'r', encoding='utf-8') as f:
self._schemas[schema_name] = yaml.safe_load(f)
logger.debug(f"Loaded schema: {schema_name}")
except Exception as e:
logger.error(f"Failed to load schema {schema_name}: {e}")
else:
logger.warning(f"Schema file not found: {schema_path}")
async def _load_all_configs(self) -> None:
"""Load all configuration files."""
# Load guild configurations
guild_dir = self.config_dir / "guilds"
if guild_dir.exists():
for config_file in guild_dir.glob("guild-*.yml"):
try:
await self._load_guild_config(config_file)
except Exception as e:
logger.error(f"Failed to load guild config {config_file}: {e}")
# Load wordlist configurations
await self._load_wordlist_configs()
async def _load_guild_config(self, file_path: Path) -> Optional[GuildConfig]:
"""Load a single guild configuration file."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = yaml.safe_load(f)
# Validate against schema
if 'guild' in self._schemas:
try:
jsonschema.validate(content, self._schemas['guild'])
except jsonschema.ValidationError as e:
logger.error(f"Schema validation failed for {file_path}: {e}")
return None
# Extract guild information
guild_id = content.get('guild_id')
if not guild_id:
logger.error(f"Guild config missing guild_id: {file_path}")
return None
guild_config = GuildConfig(
guild_id=guild_id,
name=content.get('name', f"Guild {guild_id}"),
owner_id=content.get('owner_id'),
premium=content.get('premium', False),
settings=content.get('settings', {}),
file_path=file_path,
last_updated=datetime.now()
)
# Backup current config before updating
if guild_id in self.guild_configs:
self._backup_configs[guild_id] = self.guild_configs[guild_id]
self.guild_configs[guild_id] = guild_config
logger.debug(f"Loaded guild config for {guild_id}: {guild_config.name}")
# Notify callbacks of config change
await self._notify_config_change(guild_id, guild_config)
return guild_config
except Exception as e:
logger.error(f"Error loading guild config {file_path}: {e}")
return None
async def _load_wordlist_configs(self) -> None:
"""Load wordlist configuration files."""
wordlist_dir = self.config_dir / "wordlists"
configs = {
"banned-words.yml": "wordlist_config",
"domain-allowlists.yml": "allowlist_config",
"external-sources.yml": "external_sources_config"
}
for filename, attr_name in configs.items():
file_path = wordlist_dir / filename
if file_path.exists():
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = yaml.safe_load(f)
# Calculate content hash
content_hash = hashlib.md5(str(content).encode()).hexdigest()
file_config = FileConfig(
path=file_path,
content=content,
last_modified=file_path.stat().st_mtime,
content_hash=content_hash
)
setattr(self, attr_name, file_config)
logger.debug(f"Loaded {filename}")
except Exception as e:
logger.error(f"Failed to load {filename}: {e}")
async def _start_file_watching(self) -> None:
"""Start watching configuration files for changes."""
if self._watch_task and not self._watch_task.done():
return
self._watch_task = asyncio.create_task(self._file_watcher())
logger.info("Started file watching for configuration hot-reload")
async def _file_watcher(self) -> None:
"""Watch for file changes and reload configurations."""
try:
async for changes in watch(self.config_dir, recursive=True):
for change_type, file_path in changes:
file_path = Path(file_path)
# Only process YAML files
if file_path.suffix != '.yml':
continue
if change_type in (Change.added, Change.modified):
await self._handle_file_change(file_path)
elif change_type == Change.deleted:
await self._handle_file_deletion(file_path)
except asyncio.CancelledError:
logger.debug("File watcher cancelled")
except Exception as e:
logger.error(f"File watcher error: {e}")
async def _handle_file_change(self, file_path: Path) -> None:
"""Handle a file change event."""
try:
# Determine file type and reload appropriately
if file_path.parent.name == "guilds" and file_path.name.startswith("guild-"):
await self._load_guild_config(file_path)
logger.info(f"Reloaded guild config: {file_path}")
elif file_path.parent.name == "wordlists":
await self._load_wordlist_configs()
logger.info(f"Reloaded wordlist config: {file_path}")
except Exception as e:
logger.error(f"Error handling file change {file_path}: {e}")
await self._rollback_config(file_path)
async def _handle_file_deletion(self, file_path: Path) -> None:
"""Handle a file deletion event."""
try:
if file_path.parent.name == "guilds" and file_path.name.startswith("guild-"):
# Extract guild ID from filename
guild_id_str = file_path.stem.replace("guild-", "")
try:
guild_id = int(guild_id_str)
if guild_id in self.guild_configs:
del self.guild_configs[guild_id]
logger.info(f"Removed guild config for deleted file: {file_path}")
except ValueError:
logger.warning(f"Could not parse guild ID from filename: {file_path}")
except Exception as e:
logger.error(f"Error handling file deletion {file_path}: {e}")
async def _rollback_config(self, file_path: Path) -> None:
"""Rollback to previous configuration on error."""
try:
if file_path.parent.name == "guilds" and file_path.name.startswith("guild-"):
guild_id_str = file_path.stem.replace("guild-", "")
guild_id = int(guild_id_str)
if guild_id in self._backup_configs:
self.guild_configs[guild_id] = self._backup_configs[guild_id]
logger.info(f"Rolled back guild config for {guild_id}")
except Exception as e:
logger.error(f"Error during rollback for {file_path}: {e}")
async def _notify_config_change(self, guild_id: int, config: GuildConfig) -> None:
"""Notify registered callbacks of configuration changes."""
for callback in self._callbacks:
try:
callback(guild_id, config)
except Exception as e:
logger.error(f"Error in config change callback: {e}")
def register_change_callback(self, callback: Callable[[int, GuildConfig], None]) -> None:
"""Register a callback for configuration changes."""
self._callbacks.append(callback)
def get_guild_config(self, guild_id: int) -> Optional[GuildConfig]:
"""Get configuration for a specific guild."""
return self.guild_configs.get(guild_id)
def get_all_guild_configs(self) -> Dict[int, GuildConfig]:
"""Get all guild configurations."""
return self.guild_configs.copy()
def get_wordlist_config(self) -> Optional[Dict[str, Any]]:
"""Get wordlist configuration."""
return self.wordlist_config.content if self.wordlist_config else None
def get_allowlist_config(self) -> Optional[Dict[str, Any]]:
"""Get domain allowlist configuration."""
return self.allowlist_config.content if self.allowlist_config else None
def get_external_sources_config(self) -> Optional[Dict[str, Any]]:
"""Get external sources configuration."""
return self.external_sources_config.content if self.external_sources_config else None
async def create_guild_config(self, guild_id: int, name: str, owner_id: Optional[int] = None) -> Path:
"""Create a new guild configuration file from template."""
guild_file = self.config_dir / "guilds" / f"guild-{guild_id}.yml"
template_file = self.config_dir / "templates" / "guild-default.yml"
if guild_file.exists():
raise ConfigurationError(
str(guild_file),
"Guild configuration already exists"
)
# Load template
if template_file.exists():
with open(template_file, 'r', encoding='utf-8') as f:
template_content = yaml.safe_load(f)
else:
# Create basic template if file doesn't exist
template_content = await self._create_basic_template()
# Customize template
template_content['guild_id'] = guild_id
template_content['name'] = name
if owner_id:
template_content['owner_id'] = owner_id
# Write configuration file
with open(guild_file, 'w', encoding='utf-8') as f:
yaml.dump(template_content, f, default_flow_style=False, indent=2)
logger.info(f"Created guild configuration: {guild_file}")
# Load the new configuration
await self._load_guild_config(guild_file)
return guild_file
async def _create_basic_template(self) -> Dict[str, Any]:
"""Create a basic configuration template."""
return {
"guild_id": 0,
"name": "",
"premium": False,
"settings": {
"general": {
"prefix": "!",
"locale": "en"
},
"channels": {
"log_channel_id": None,
"mod_log_channel_id": None,
"welcome_channel_id": None
},
"roles": {
"mute_role_id": None,
"verified_role_id": None,
"mod_role_ids": []
},
"moderation": {
"automod_enabled": True,
"anti_spam_enabled": True,
"link_filter_enabled": False,
"strike_actions": {
"1": {"action": "warn"},
"3": {"action": "timeout", "duration": 3600},
"5": {"action": "kick"},
"7": {"action": "ban"}
}
},
"automod": {
"message_rate_limit": 5,
"message_rate_window": 5,
"duplicate_threshold": 3,
"mention_limit": 5,
"mention_rate_limit": 10,
"mention_rate_window": 60,
"scam_allowlist": []
},
"ai_moderation": {
"enabled": True,
"sensitivity": 80,
"confidence_threshold": 0.7,
"log_only": False,
"nsfw_detection_enabled": True,
"nsfw_only_filtering": False
},
"verification": {
"enabled": False,
"type": "button"
}
}
}
async def export_from_database(self, guild_id: int) -> Optional[Path]:
"""Export guild configuration from database to file."""
if not self.database:
raise ConfigurationError("", "Database not available for export")
try:
# Get guild settings from database
async with self.database.session() as session:
# This would need to be implemented based on your database service
# For now, return None to indicate not implemented
pass
logger.info(f"Exported guild {guild_id} configuration to file")
return None
except Exception as e:
logger.error(f"Failed to export guild {guild_id} from database: {e}")
raise ConfigurationError(
f"guild-{guild_id}.yml",
f"Database export failed: {str(e)}"
)
def validate_config(self, config_data: Dict[str, Any], schema_name: str = "guild") -> List[str]:
"""Validate configuration data against schema."""
errors = []
if schema_name in self._schemas:
try:
jsonschema.validate(config_data, self._schemas[schema_name])
except jsonschema.ValidationError as e:
errors.append(str(e))
else:
errors.append(f"Schema '{schema_name}' not found")
return errors
async def backup_config(self, guild_id: int) -> Path:
"""Create a backup of guild configuration."""
config = self.get_guild_config(guild_id)
if not config:
raise ConfigurationError(f"guild-{guild_id}.yml", "Guild configuration not found")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = self.config_dir / "backups" / f"guild-{guild_id}_{timestamp}.yml"
# Copy current configuration file
import shutil
shutil.copy2(config.file_path, backup_file)
logger.info(f"Created backup: {backup_file}")
return backup_file