Files
GuardDen/src/guardden/cogs/moderation.py
2026-01-25 16:46:50 +01:00

514 lines
19 KiB
Python

"""Moderation commands and automod features."""
import logging
from datetime import datetime, timedelta, timezone
import discord
from discord.ext import commands
from sqlalchemy import func, select
from guardden.bot import GuardDen
from guardden.models import ModerationLog, Strike
from guardden.utils import parse_duration
from guardden.utils.notifications import send_moderation_notification
from guardden.utils.ratelimit import RateLimitExceeded
logger = logging.getLogger(__name__)
class Moderation(commands.Cog):
"""Moderation commands for server management."""
def __init__(self, bot: GuardDen) -> None:
    """Store the bot instance that the cog's commands and helpers use."""
    self.bot: GuardDen = bot
def cog_check(self, ctx: commands.Context) -> bool:
    """Decide whether a command of this cog may run for ``ctx``.

    Returns:
        True only when the context has a guild and the bot's
        ``is_owner_allowed`` accepts the invoking author's ID.
    """
    # Short-circuits: the owner check is skipped when there is no guild.
    return bool(ctx.guild) and bool(self.bot.is_owner_allowed(ctx.author.id))
async def cog_before_invoke(self, ctx: commands.Context) -> None:
    """Consult the bot's rate limiter before a command of this cog runs.

    Raises:
        RateLimitExceeded: When the limiter reports the invocation as limited;
            carries the limiter's ``reset_after`` value.
    """
    command = ctx.command
    if not command:
        return

    guild_id = ctx.guild.id if ctx.guild else None
    outcome = self.bot.rate_limiter.acquire_command(
        command.qualified_name,
        user_id=ctx.author.id,
        guild_id=guild_id,
        channel_id=ctx.channel.id,
    )
    if outcome.is_limited:
        raise RateLimitExceeded(outcome.reset_after)
async def cog_command_error(self, ctx: commands.Context, error: Exception) -> None:
    """Tell the invoker how long to wait after a rate-limit rejection.

    Errors of any other type are left untouched by this handler.
    """
    if not isinstance(error, RateLimitExceeded):
        return

    notice = f"You're being rate limited. Try again in {error.retry_after:.1f} seconds."
    await ctx.send(notice)
async def _log_action(
    self,
    guild: discord.Guild,
    target: discord.Member | discord.User,
    moderator: discord.Member | discord.User,
    action: str,
    reason: str | None = None,
    duration: int | None = None,
    channel: discord.TextChannel | None = None,
    message: discord.Message | None = None,
    is_automatic: bool = False,
) -> None:
    """Add one ``ModerationLog`` row describing a moderation action.

    Args:
        guild: Guild whose ID is stored on the row.
        target: User the action was taken against (ID and ``str()`` name stored).
        moderator: User who performed the action (ID and ``str()`` name stored).
        action: Action label; callers in this cog pass values such as
            "warn", "strike", "timeout", "kick", "ban".
        reason: Optional free-text reason.
        duration: Optional length in seconds. A truthy value also produces an
            ``expires_at`` timestamp that many seconds from now (UTC).
        channel: Optional channel; only its ID is stored.
        message: Optional message; its ID and content are stored.
        is_automatic: Stored unchanged on the row.
    """
    # A falsy duration (None or 0) leaves the entry without an expiry.
    expires_at = (
        datetime.now(timezone.utc) + timedelta(seconds=duration) if duration else None
    )
    channel_id = channel.id if channel else None
    message_id = message.id if message else None
    message_content = message.content if message else None

    async with self.bot.database.session() as db:
        db.add(
            ModerationLog(
                guild_id=guild.id,
                target_id=target.id,
                target_name=str(target),
                moderator_id=moderator.id,
                moderator_name=str(moderator),
                action=action,
                reason=reason,
                duration=duration,
                expires_at=expires_at,
                channel_id=channel_id,
                message_id=message_id,
                message_content=message_content,
                is_automatic=is_automatic,
            )
        )
async def _get_strike_count(self, guild_id: int, user_id: int) -> int:
    """Return the summed points of a user's active strikes in a guild.

    Returns:
        The sum of ``Strike.points`` over matching active rows, or 0 when the
        query yields no value.
    """
    active_points = select(func.sum(Strike.points)).where(
        Strike.guild_id == guild_id,
        Strike.user_id == user_id,
        Strike.is_active == True,  # noqa: E712 - builds a SQL comparison, not a Python test
    )
    async with self.bot.database.session() as db:
        points = (await db.execute(active_points)).scalar()

    # SUM over zero rows comes back as None; report that as 0.
    return points if points else 0
async def _add_strike(
    self,
    guild: discord.Guild,
    user: discord.Member,
    moderator: discord.Member | discord.User,
    reason: str,
    points: int = 1,
) -> int:
    """Record a new strike for ``user`` and return their resulting strike total.

    Args:
        guild: Guild the strike belongs to.
        user: Member receiving the strike (ID and ``str()`` name stored).
        moderator: User issuing the strike (ID stored).
        reason: Reason text stored with the strike.
        points: Number of points the strike is worth.

    Returns:
        The value of ``_get_strike_count`` for this guild and user, queried
        after the strike was added.
    """
    async with self.bot.database.session() as db:
        db.add(
            Strike(
                guild_id=guild.id,
                user_id=user.id,
                user_name=str(user),
                moderator_id=moderator.id,
                reason=reason,
                points=points,
            )
        )

    # Counted only after the session block has exited.
    # NOTE(review): presumably the session commits on exit so the new strike
    # is included in the total - confirm against the database helper.
    return await self._get_strike_count(guild.id, user.id)
@commands.command(name="warn")
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def warn(
    self, ctx: commands.Context, member: discord.Member, *, reason: str = "No reason provided"
) -> None:
    """Warn a member."""
    # Flow: role-hierarchy guard -> log the warning -> confirm in the channel
    # -> notify the warned member (only when invoked from a text channel).
    moderator = ctx.author
    guild = ctx.guild

    # Only the guild owner may act on members at or above their own top role.
    if member.top_role >= moderator.top_role and moderator != guild.owner:
        await ctx.send("You cannot warn someone with a higher or equal role.")
        return

    await self._log_action(guild, member, moderator, "warn", reason)

    confirmation = discord.Embed(
        title="Warning Issued",
        description=f"{member.mention} has been warned.",
        color=discord.Color.yellow(),
        timestamp=datetime.now(timezone.utc),
    )
    confirmation.add_field(name="Reason", value=reason, inline=False)
    confirmation.set_footer(text=f"Moderator: {moderator}")
    await ctx.send(embed=confirmation)

    config = await self.bot.guild_config.get_config(guild.id)

    # The notification helper is only used from regular text channels.
    if not isinstance(ctx.channel, discord.TextChannel):
        return

    notice = discord.Embed(
        title=f"Warning in {guild.name}",
        description="You have been warned.",
        color=discord.Color.yellow(),
    )
    notice.add_field(name="Reason", value=reason)

    # Without a guild config the in-channel option is off.
    show_in_channel = config.send_in_channel_warnings if config else False
    await send_moderation_notification(
        user=member,
        channel=ctx.channel,
        embed=notice,
        send_in_channel=show_in_channel,
    )
@commands.command(name="strike")
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def strike(
    self,
    ctx: commands.Context,
    member: discord.Member,
    points: int = 1,
    *,
    reason: str = "No reason provided",
) -> None:
    """Add a strike to a member."""
    # Flow: validate points -> role-hierarchy guard -> store the strike and
    # log it -> confirm in the channel -> run the configured automatic action
    # for the highest threshold the member's new total has reached.

    # Zero or negative points would be stored as a strike and could lower the
    # member's total, so they are rejected up front.
    if points < 1:
        await ctx.send("Strike points must be a positive number.")
        return

    # Only the guild owner may act on members at or above their own top role.
    if member.top_role >= ctx.author.top_role and ctx.author != ctx.guild.owner:
        await ctx.send("You cannot strike someone with a higher or equal role.")
        return

    total_strikes = await self._add_strike(ctx.guild, member, ctx.author, reason, points)
    await self._log_action(ctx.guild, member, ctx.author, "strike", reason)

    embed = discord.Embed(
        title="Strike Added",
        description=f"{member.mention} has received {points} strike(s).",
        color=discord.Color.orange(),
        timestamp=datetime.now(timezone.utc),
    )
    embed.add_field(name="Reason", value=reason, inline=False)
    embed.add_field(name="Total Strikes", value=str(total_strikes))
    embed.set_footer(text=f"Moderator: {ctx.author}")
    await ctx.send(embed=embed)

    # Check for automatic actions based on strike thresholds
    config = await self.bot.guild_config.get_config(ctx.guild.id)
    if not (config and config.strike_actions):
        return

    auto_reason = f"Automatic: {total_strikes} strikes"

    # Thresholds are compared as integers and walked from highest to lowest,
    # so only the action of the highest reached threshold is applied.
    ranked_actions = sorted(
        config.strike_actions.items(), key=lambda item: int(item[0]), reverse=True
    )
    for threshold, action_config in ranked_actions:
        if total_strikes < int(threshold):
            continue

        action = action_config.get("action")
        if action == "ban":
            await ctx.invoke(self.ban, member=member, reason=auto_reason)
        elif action == "kick":
            await ctx.invoke(self.kick, member=member, reason=auto_reason)
        elif action == "timeout":
            # Duration is configured in seconds; one hour when not set.
            duration = action_config.get("duration", 3600)
            await ctx.invoke(
                self.timeout,
                member=member,
                duration=f"{duration}s",
                reason=auto_reason,
            )
        # Stop at the first reached threshold, even if its action is unknown.
        break
@commands.command(name="strikes")
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def strikes(self, ctx: commands.Context, member: discord.Member) -> None:
    """View strikes for a member."""
    # Shows the 10 most recent active strikes plus the member's active total.
    #
    # Discord rejects an embed when a field value exceeds 1024 characters or
    # the embed's combined text exceeds 6000 characters. Stored reasons can be
    # longer than that, and one oversized reason would make this command fail
    # for the member. Each reason is therefore capped so that 10 fields always
    # fit within both limits.
    max_reason_chars = 500

    async with self.bot.database.session() as session:
        result = await session.execute(
            select(Strike)
            .where(
                Strike.guild_id == ctx.guild.id,
                Strike.user_id == member.id,
                Strike.is_active == True,  # noqa: E712 - builds a SQL comparison
            )
            .order_by(Strike.created_at.desc())
            .limit(10)
        )
        user_strikes = result.scalars().all()

    total = await self._get_strike_count(ctx.guild.id, member.id)

    embed = discord.Embed(
        title=f"Strikes for {member}",
        description=f"Total active strikes: **{total}**",
        color=discord.Color.orange(),
    )

    if user_strikes:
        for entry in user_strikes:
            reason_text = str(entry.reason)
            if len(reason_text) > max_reason_chars:
                # Keep the prefix and mark the cut with an ellipsis.
                reason_text = reason_text[: max_reason_chars - 1] + "…"
            embed.add_field(
                name=f"Strike #{entry.id} ({entry.points} pts)",
                value=f"{reason_text}\n*{entry.created_at.strftime('%Y-%m-%d')}*",
                inline=False,
            )
    else:
        embed.description = f"{member.mention} has no active strikes."

    await ctx.send(embed=embed)
@commands.command(name="timeout", aliases=["mute"])
@commands.has_permissions(moderate_members=True)
@commands.guild_only()
async def timeout(
    self,
    ctx: commands.Context,
    member: discord.Member,
    duration: str = "1h",
    *,
    reason: str = "No reason provided",
) -> None:
    """Timeout a member (e.g., !timeout @user 1h Spamming)."""
    # Flow: role-hierarchy guard -> parse and bound the duration -> apply the
    # timeout -> log it -> confirm in the channel.
    moderator = ctx.author

    # Only the guild owner may act on members at or above their own top role.
    if member.top_role >= moderator.top_role and moderator != ctx.guild.owner:
        await ctx.send("You cannot timeout someone with a higher or equal role.")
        return

    length = parse_duration(duration)
    if not length:
        await ctx.send("Invalid duration. Use format like: 30m, 1h, 7d")
        return

    longest_allowed = timedelta(days=28)
    if length > longest_allowed:
        await ctx.send("Timeout duration cannot exceed 28 days.")
        return

    audit_reason = f"{moderator}: {reason}"
    try:
        await member.timeout(length, reason=audit_reason)
    except discord.Forbidden:
        await ctx.send("I don't have permission to timeout this user.")
        return
    except discord.HTTPException as exc:
        await ctx.send(f"Failed to timeout user: {exc}")
        return

    # The log stores the duration as whole seconds.
    seconds = int(length.total_seconds())
    await self._log_action(ctx.guild, member, moderator, "timeout", reason, seconds)

    confirmation = discord.Embed(
        title="Member Timed Out",
        description=f"{member.mention} has been timed out for {duration}.",
        color=discord.Color.orange(),
        timestamp=datetime.now(timezone.utc),
    )
    confirmation.add_field(name="Reason", value=reason, inline=False)
    confirmation.set_footer(text=f"Moderator: {moderator}")
    await ctx.send(embed=confirmation)
@commands.command(name="untimeout", aliases=["unmute"])
@commands.has_permissions(moderate_members=True)
@commands.guild_only()
async def untimeout(
    self, ctx: commands.Context, member: discord.Member, *, reason: str = "No reason provided"
) -> None:
    """Remove timeout from a member."""
    # Mirrors the guards used by the timeout command: the same role-hierarchy
    # rule applies, and API failures are reported instead of being raised.

    # Only the guild owner may act on members at or above their own top role.
    if member.top_role >= ctx.author.top_role and ctx.author != ctx.guild.owner:
        await ctx.send(
            "You cannot remove the timeout of someone with a higher or equal role."
        )
        return

    try:
        # Passing None clears the member's current timeout.
        await member.timeout(None, reason=f"{ctx.author}: {reason}")
    except discord.Forbidden:
        await ctx.send("I don't have permission to remove this user's timeout.")
        return
    except discord.HTTPException as e:
        await ctx.send(f"Failed to remove timeout: {e}")
        return

    # Logged under the "unmute" action label, as before.
    await self._log_action(ctx.guild, member, ctx.author, "unmute", reason)

    embed = discord.Embed(
        title="Timeout Removed",
        description=f"{member.mention}'s timeout has been removed.",
        color=discord.Color.green(),
        timestamp=datetime.now(timezone.utc),
    )
    embed.add_field(name="Reason", value=reason, inline=False)
    embed.set_footer(text=f"Moderator: {ctx.author}")
    await ctx.send(embed=embed)
@commands.command(name="kick")
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def kick(
    self, ctx: commands.Context, member: discord.Member, *, reason: str = "No reason provided"
) -> None:
    """Kick a member from the server."""
    # Flow: role-hierarchy guard -> notify the member -> kick -> log ->
    # confirm in the channel (plain text if the embed cannot be sent).
    moderator = ctx.author
    guild = ctx.guild

    # Only the guild owner may act on members at or above their own top role.
    if member.top_role >= moderator.top_role and moderator != guild.owner:
        await ctx.send("You cannot kick someone with a higher or equal role.")
        return

    # The member is notified before the kick is attempted.
    config = await self.bot.guild_config.get_config(guild.id)
    if isinstance(ctx.channel, discord.TextChannel):
        notice = discord.Embed(
            title=f"Kicked from {guild.name}",
            description="You have been kicked from the server.",
            color=discord.Color.red(),
        )
        notice.add_field(name="Reason", value=reason)
        # Without a guild config the in-channel option is off.
        show_in_channel = config.send_in_channel_warnings if config else False
        await send_moderation_notification(
            user=member,
            channel=ctx.channel,
            embed=notice,
            send_in_channel=show_in_channel,
        )

    audit_reason = f"{moderator}: {reason}"
    try:
        await member.kick(reason=audit_reason)
    except discord.Forbidden:
        await ctx.send("❌ I don't have permission to kick this member.")
        return
    except discord.HTTPException as exc:
        await ctx.send(f"❌ Failed to kick member: {exc}")
        return

    await self._log_action(guild, member, moderator, "kick", reason)

    confirmation = discord.Embed(
        title="Member Kicked",
        description=f"{member} has been kicked from the server.",
        color=discord.Color.red(),
        timestamp=datetime.now(timezone.utc),
    )
    confirmation.add_field(name="Reason", value=reason, inline=False)
    confirmation.set_footer(text=f"Moderator: {moderator}")

    try:
        await ctx.send(embed=confirmation)
    except discord.HTTPException:
        # Fall back to a plain message when the embed is rejected.
        await ctx.send(f"{member} has been kicked from the server.")
@commands.command(name="ban")
@commands.has_permissions(ban_members=True)
@commands.guild_only()
async def ban(
    self,
    ctx: commands.Context,
    member: discord.Member | discord.User,
    *,
    reason: str = "No reason provided",
) -> None:
    """Ban a member from the server."""
    # Flow: role-hierarchy guard (guild members only) -> notify the user ->
    # ban -> log -> confirm in the channel (plain text if the embed fails).
    moderator = ctx.author
    guild = ctx.guild

    # A plain User has no roles in this guild, so the hierarchy rule only
    # applies to Member targets.
    if (
        isinstance(member, discord.Member)
        and member.top_role >= moderator.top_role
        and moderator != guild.owner
    ):
        await ctx.send("You cannot ban someone with a higher or equal role.")
        return

    # The user is notified before the ban is attempted.
    config = await self.bot.guild_config.get_config(guild.id)
    if isinstance(ctx.channel, discord.TextChannel):
        notice = discord.Embed(
            title=f"Banned from {guild.name}",
            description="You have been banned from the server.",
            color=discord.Color.dark_red(),
        )
        notice.add_field(name="Reason", value=reason)
        # Without a guild config the in-channel option is off.
        show_in_channel = config.send_in_channel_warnings if config else False
        await send_moderation_notification(
            user=member,
            channel=ctx.channel,
            embed=notice,
            send_in_channel=show_in_channel,
        )

    audit_reason = f"{moderator}: {reason}"
    try:
        # delete_message_days=0 is passed so no message history is removed.
        await guild.ban(member, reason=audit_reason, delete_message_days=0)
    except discord.Forbidden:
        await ctx.send("❌ I don't have permission to ban this member.")
        return
    except discord.HTTPException as exc:
        await ctx.send(f"❌ Failed to ban member: {exc}")
        return

    await self._log_action(guild, member, moderator, "ban", reason)

    confirmation = discord.Embed(
        title="Member Banned",
        description=f"{member} has been banned from the server.",
        color=discord.Color.dark_red(),
        timestamp=datetime.now(timezone.utc),
    )
    confirmation.add_field(name="Reason", value=reason, inline=False)
    confirmation.set_footer(text=f"Moderator: {moderator}")

    try:
        await ctx.send(embed=confirmation)
    except discord.HTTPException:
        # Fall back to a plain message when the embed is rejected.
        await ctx.send(f"{member} has been banned from the server.")
@commands.command(name="unban")
@commands.has_permissions(ban_members=True)
@commands.guild_only()
async def unban(
    self, ctx: commands.Context, user_id: int, *, reason: str = "No reason provided"
) -> None:
    """Unban a user by their ID."""
    # Only the two API calls that can signal "unknown user / not banned" or
    # "missing permission" are guarded. Errors from logging or from sending
    # the confirmation are not reported as an unban failure.
    try:
        user = await self.bot.fetch_user(user_id)
        await ctx.guild.unban(user, reason=f"{ctx.author}: {reason}")
    except discord.NotFound:
        await ctx.send("User not found or not banned.")
        return
    except discord.Forbidden:
        await ctx.send("I don't have permission to unban this user.")
        return
    except discord.HTTPException as e:
        # Must come last: NotFound and Forbidden are HTTPException subclasses.
        await ctx.send(f"Failed to unban user: {e}")
        return

    await self._log_action(ctx.guild, user, ctx.author, "unban", reason)

    embed = discord.Embed(
        title="User Unbanned",
        description=f"{user} has been unbanned.",
        color=discord.Color.green(),
        timestamp=datetime.now(timezone.utc),
    )
    embed.add_field(name="Reason", value=reason, inline=False)
    embed.set_footer(text=f"Moderator: {ctx.author}")
    await ctx.send(embed=embed)
@commands.command(name="purge", aliases=["clear"])
@commands.has_permissions(manage_messages=True)
@commands.guild_only()
async def purge(self, ctx: commands.Context, amount: int) -> None:
    """Delete multiple messages at once (max 100)."""
    if amount < 1 or amount > 100:
        await ctx.send("Please specify a number between 1 and 100.")
        return

    try:
        # +1 to include the command message
        deleted = await ctx.channel.purge(limit=amount + 1)
    except discord.Forbidden:
        await ctx.send("I don't have permission to delete messages in this channel.")
        return
    except discord.HTTPException as e:
        await ctx.send(f"Failed to delete messages: {e}")
        return

    # The command message is excluded from the reported count. The count is
    # clamped because nothing at all may have been deleted.
    removed = max(len(deleted) - 1, 0)
    msg = await ctx.send(f"Deleted {removed} message(s).")
    # The confirmation removes itself after 3 seconds.
    await msg.delete(delay=3)
@commands.command(name="modlogs", aliases=["history"])
@commands.has_permissions(kick_members=True)
@commands.guild_only()
async def modlogs(self, ctx: commands.Context, member: discord.Member | discord.User) -> None:
    """View moderation history for a user."""
    # Shows the 10 most recent log entries for the user in this guild.
    #
    # Discord rejects an embed when a field value exceeds 1024 characters or
    # the embed's combined text exceeds 6000 characters. Stored reasons can be
    # longer than that, and one oversized reason would make this command fail
    # for the user. Each reason is therefore capped so that 10 fields always
    # fit within both limits.
    max_reason_chars = 450

    async with self.bot.database.session() as session:
        result = await session.execute(
            select(ModerationLog)
            .where(ModerationLog.guild_id == ctx.guild.id, ModerationLog.target_id == member.id)
            .order_by(ModerationLog.created_at.desc())
            .limit(10)
        )
        logs = result.scalars().all()

    embed = discord.Embed(
        title=f"Moderation History for {member}",
        color=discord.Color.blue(),
    )

    if logs:
        for log in logs:
            reason_text = str(log.reason or "None")
            if len(reason_text) > max_reason_chars:
                # Keep the prefix and mark the cut with an ellipsis.
                reason_text = reason_text[: max_reason_chars - 1] + "…"
            value = (
                f"**Reason:** {reason_text}\n"
                f"**By:** {log.moderator_name}\n"
                f"*{log.created_at.strftime('%Y-%m-%d %H:%M')}*"
            )
            embed.add_field(name=f"{log.action.upper()} (#{log.id})", value=value, inline=False)
    else:
        embed.description = "No moderation history found."

    await ctx.send(embed=embed)
async def setup(bot: GuardDen) -> None:
    """Create a ``Moderation`` cog for ``bot`` and register it via ``add_cog``."""
    cog = Moderation(bot)
    await bot.add_cog(cog)