Features: - Core moderation: warn, kick, ban, timeout, strike system - Automod: banned words filter, scam detection, anti-spam, link filtering - AI moderation: Claude/OpenAI integration, NSFW detection, phishing analysis - Verification system: button, captcha, math, emoji challenges - Rate limiting system with configurable scopes - Event logging: joins, leaves, message edits/deletes, voice activity - Per-guild configuration with caching - Docker deployment support Bug fixes applied: - Fixed await on session.delete() in guild_config.py - Fixed memory leak in AI moderation message tracking (use deque) - Added error handling to bot shutdown - Added error handling to timeout command - Removed unused Literal import - Added prefix validation - Added image analysis limit (3 per message) - Fixed test mock for SQLAlchemy model
131 lines
4.5 KiB
Python
131 lines
4.5 KiB
Python
"""Tests for rate limiting service."""
|
|
|
|
import pytest
|
|
|
|
from guardden.services.ratelimit import (
|
|
RateLimitBucket,
|
|
RateLimitConfig,
|
|
RateLimiter,
|
|
RateLimitScope,
|
|
)
|
|
|
|
|
|
class TestRateLimitBucket:
|
|
"""Tests for RateLimitBucket."""
|
|
|
|
def test_not_limited_initially(self) -> None:
|
|
"""Test bucket is not limited when empty."""
|
|
bucket = RateLimitBucket(max_requests=3, window_seconds=60)
|
|
assert bucket.is_limited() is False
|
|
assert bucket.remaining() == 3
|
|
|
|
def test_limited_after_max_requests(self) -> None:
|
|
"""Test bucket is limited after max requests."""
|
|
bucket = RateLimitBucket(max_requests=3, window_seconds=60)
|
|
|
|
for _ in range(3):
|
|
bucket.record()
|
|
|
|
assert bucket.is_limited() is True
|
|
assert bucket.remaining() == 0
|
|
|
|
def test_remaining_decreases(self) -> None:
|
|
"""Test remaining count decreases."""
|
|
bucket = RateLimitBucket(max_requests=5, window_seconds=60)
|
|
|
|
bucket.record()
|
|
assert bucket.remaining() == 4
|
|
|
|
bucket.record()
|
|
assert bucket.remaining() == 3
|
|
|
|
|
|
class TestRateLimiter:
|
|
"""Tests for RateLimiter."""
|
|
|
|
@pytest.fixture
|
|
def limiter(self) -> RateLimiter:
|
|
return RateLimiter()
|
|
|
|
def test_check_not_limited(self, limiter: RateLimiter) -> None:
|
|
"""Test check returns not limited for new bucket."""
|
|
result = limiter.check("command", user_id=123, guild_id=456)
|
|
assert result.is_limited is False
|
|
|
|
def test_acquire_records_request(self, limiter: RateLimiter) -> None:
|
|
"""Test acquire records the request."""
|
|
# Configure a simple limit
|
|
limiter.configure(
|
|
"test",
|
|
RateLimitConfig(max_requests=2, window_seconds=60, scope=RateLimitScope.USER),
|
|
)
|
|
|
|
result1 = limiter.acquire("test", user_id=123)
|
|
assert result1.is_limited is False
|
|
assert result1.remaining == 1
|
|
|
|
result2 = limiter.acquire("test", user_id=123)
|
|
assert result2.is_limited is False
|
|
assert result2.remaining == 0
|
|
|
|
result3 = limiter.acquire("test", user_id=123)
|
|
assert result3.is_limited is True
|
|
|
|
def test_different_scopes(self, limiter: RateLimiter) -> None:
|
|
"""Test different scopes create different buckets."""
|
|
# User scope - shared across guilds
|
|
limiter.configure(
|
|
"user_action",
|
|
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.USER),
|
|
)
|
|
|
|
limiter.acquire("user_action", user_id=123, guild_id=1)
|
|
result = limiter.acquire("user_action", user_id=123, guild_id=2)
|
|
assert result.is_limited is True # Same user, different guild
|
|
|
|
# Member scope - per guild
|
|
limiter.configure(
|
|
"member_action",
|
|
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.MEMBER),
|
|
)
|
|
|
|
limiter.acquire("member_action", user_id=456, guild_id=1)
|
|
result = limiter.acquire("member_action", user_id=456, guild_id=2)
|
|
assert result.is_limited is False # Same user, different guild = different bucket
|
|
|
|
def test_reset(self, limiter: RateLimiter) -> None:
|
|
"""Test resetting a bucket."""
|
|
limiter.configure(
|
|
"test",
|
|
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.USER),
|
|
)
|
|
|
|
limiter.acquire("test", user_id=123)
|
|
assert limiter.acquire("test", user_id=123).is_limited is True
|
|
|
|
limiter.reset("test", user_id=123)
|
|
assert limiter.acquire("test", user_id=123).is_limited is False
|
|
|
|
def test_unknown_action(self, limiter: RateLimiter) -> None:
|
|
"""Test unknown action returns not limited."""
|
|
result = limiter.acquire("unknown_action", user_id=123)
|
|
assert result.is_limited is False
|
|
assert result.remaining == 999
|
|
|
|
def test_guild_scope(self, limiter: RateLimiter) -> None:
|
|
"""Test guild-scoped rate limiting."""
|
|
limiter.configure(
|
|
"guild_action",
|
|
RateLimitConfig(max_requests=2, window_seconds=60, scope=RateLimitScope.GUILD),
|
|
)
|
|
|
|
# Different users in same guild share the limit
|
|
limiter.acquire("guild_action", user_id=1, guild_id=100)
|
|
limiter.acquire("guild_action", user_id=2, guild_id=100)
|
|
result = limiter.acquire("guild_action", user_id=3, guild_id=100)
|
|
assert result.is_limited is True
|
|
|
|
# Different guild is not limited
|
|
result = limiter.acquire("guild_action", user_id=1, guild_id=200)
|
|
assert result.is_limited is False
|