Files
GuardDen/tests/test_ratelimit.py
latte 4e16777f25 Implement GuardDen Discord moderation bot
Features:
- Core moderation: warn, kick, ban, timeout, strike system
- Automod: banned words filter, scam detection, anti-spam, link filtering
- AI moderation: Claude/OpenAI integration, NSFW detection, phishing analysis
- Verification system: button, captcha, math, emoji challenges
- Rate limiting system with configurable scopes
- Event logging: joins, leaves, message edits/deletes, voice activity
- Per-guild configuration with caching
- Docker deployment support

Bug fixes applied:
- Fixed await on session.delete() in guild_config.py
- Fixed memory leak in AI moderation message tracking (use deque)
- Added error handling to bot shutdown
- Added error handling to timeout command
- Removed unused Literal import
- Added prefix validation
- Added image analysis limit (3 per message)
- Fixed test mock for SQLAlchemy model
2026-01-16 19:27:48 +01:00

131 lines
4.5 KiB
Python

"""Tests for rate limiting service."""
import pytest
from guardden.services.ratelimit import (
RateLimitBucket,
RateLimitConfig,
RateLimiter,
RateLimitScope,
)
class TestRateLimitBucket:
"""Tests for RateLimitBucket."""
def test_not_limited_initially(self) -> None:
"""Test bucket is not limited when empty."""
bucket = RateLimitBucket(max_requests=3, window_seconds=60)
assert bucket.is_limited() is False
assert bucket.remaining() == 3
def test_limited_after_max_requests(self) -> None:
"""Test bucket is limited after max requests."""
bucket = RateLimitBucket(max_requests=3, window_seconds=60)
for _ in range(3):
bucket.record()
assert bucket.is_limited() is True
assert bucket.remaining() == 0
def test_remaining_decreases(self) -> None:
"""Test remaining count decreases."""
bucket = RateLimitBucket(max_requests=5, window_seconds=60)
bucket.record()
assert bucket.remaining() == 4
bucket.record()
assert bucket.remaining() == 3
class TestRateLimiter:
"""Tests for RateLimiter."""
@pytest.fixture
def limiter(self) -> RateLimiter:
return RateLimiter()
def test_check_not_limited(self, limiter: RateLimiter) -> None:
"""Test check returns not limited for new bucket."""
result = limiter.check("command", user_id=123, guild_id=456)
assert result.is_limited is False
def test_acquire_records_request(self, limiter: RateLimiter) -> None:
"""Test acquire records the request."""
# Configure a simple limit
limiter.configure(
"test",
RateLimitConfig(max_requests=2, window_seconds=60, scope=RateLimitScope.USER),
)
result1 = limiter.acquire("test", user_id=123)
assert result1.is_limited is False
assert result1.remaining == 1
result2 = limiter.acquire("test", user_id=123)
assert result2.is_limited is False
assert result2.remaining == 0
result3 = limiter.acquire("test", user_id=123)
assert result3.is_limited is True
def test_different_scopes(self, limiter: RateLimiter) -> None:
"""Test different scopes create different buckets."""
# User scope - shared across guilds
limiter.configure(
"user_action",
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.USER),
)
limiter.acquire("user_action", user_id=123, guild_id=1)
result = limiter.acquire("user_action", user_id=123, guild_id=2)
assert result.is_limited is True # Same user, different guild
# Member scope - per guild
limiter.configure(
"member_action",
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.MEMBER),
)
limiter.acquire("member_action", user_id=456, guild_id=1)
result = limiter.acquire("member_action", user_id=456, guild_id=2)
assert result.is_limited is False # Same user, different guild = different bucket
def test_reset(self, limiter: RateLimiter) -> None:
"""Test resetting a bucket."""
limiter.configure(
"test",
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.USER),
)
limiter.acquire("test", user_id=123)
assert limiter.acquire("test", user_id=123).is_limited is True
limiter.reset("test", user_id=123)
assert limiter.acquire("test", user_id=123).is_limited is False
def test_unknown_action(self, limiter: RateLimiter) -> None:
"""Test unknown action returns not limited."""
result = limiter.acquire("unknown_action", user_id=123)
assert result.is_limited is False
assert result.remaining == 999
def test_guild_scope(self, limiter: RateLimiter) -> None:
"""Test guild-scoped rate limiting."""
limiter.configure(
"guild_action",
RateLimitConfig(max_requests=2, window_seconds=60, scope=RateLimitScope.GUILD),
)
# Different users in same guild share the limit
limiter.acquire("guild_action", user_id=1, guild_id=100)
limiter.acquire("guild_action", user_id=2, guild_id=100)
result = limiter.acquire("guild_action", user_id=3, guild_id=100)
assert result.is_limited is True
# Different guild is not limited
result = limiter.acquire("guild_action", user_id=1, guild_id=200)
assert result.is_limited is False