Files
GuardDen/tests/test_ratelimit.py
latte 831eed8dbc
Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 6m9s
CI/CD Pipeline / Security Scanning (push) Successful in 26s
CI/CD Pipeline / Tests (3.11) (push) Failing after 5m24s
CI/CD Pipeline / Tests (3.12) (push) Failing after 5m23s
CI/CD Pipeline / Build Docker Image (push) Has been skipped
CI/CD Pipeline / Deploy to Staging (push) Has been skipped
CI/CD Pipeline / Deploy to Production (push) Has been skipped
CI/CD Pipeline / Notification (push) Successful in 1s
quick commit
2026-01-17 20:24:43 +01:00

143 lines
5.0 KiB
Python

"""Tests for rate limiting service."""
import pytest
from guardden.services.ratelimit import (
RateLimitBucket,
RateLimitConfig,
RateLimiter,
RateLimitScope,
)
class TestRateLimitBucket:
"""Tests for RateLimitBucket."""
def test_not_limited_initially(self) -> None:
"""Test bucket is not limited when empty."""
bucket = RateLimitBucket(max_requests=3, window_seconds=60)
assert bucket.is_limited() is False
assert bucket.remaining() == 3
def test_limited_after_max_requests(self) -> None:
"""Test bucket is limited after max requests."""
bucket = RateLimitBucket(max_requests=3, window_seconds=60)
for _ in range(3):
bucket.record()
assert bucket.is_limited() is True
assert bucket.remaining() == 0
def test_remaining_decreases(self) -> None:
"""Test remaining count decreases."""
bucket = RateLimitBucket(max_requests=5, window_seconds=60)
bucket.record()
assert bucket.remaining() == 4
bucket.record()
assert bucket.remaining() == 3
class TestRateLimiter:
"""Tests for RateLimiter."""
@pytest.fixture
def limiter(self) -> RateLimiter:
return RateLimiter()
def test_check_not_limited(self, limiter: RateLimiter) -> None:
"""Test check returns not limited for new bucket."""
result = limiter.check("command", user_id=123, guild_id=456)
assert result.is_limited is False
def test_acquire_records_request(self, limiter: RateLimiter) -> None:
"""Test acquire records the request."""
# Configure a simple limit
limiter.configure(
"test",
RateLimitConfig(max_requests=2, window_seconds=60, scope=RateLimitScope.USER),
)
result1 = limiter.acquire("test", user_id=123)
assert result1.is_limited is False
assert result1.remaining == 1
result2 = limiter.acquire("test", user_id=123)
assert result2.is_limited is False
assert result2.remaining == 0
result3 = limiter.acquire("test", user_id=123)
assert result3.is_limited is True
def test_different_scopes(self, limiter: RateLimiter) -> None:
"""Test different scopes create different buckets."""
# User scope - shared across guilds
limiter.configure(
"user_action",
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.USER),
)
limiter.acquire("user_action", user_id=123, guild_id=1)
result = limiter.acquire("user_action", user_id=123, guild_id=2)
assert result.is_limited is True # Same user, different guild
# Member scope - per guild
limiter.configure(
"member_action",
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.MEMBER),
)
limiter.acquire("member_action", user_id=456, guild_id=1)
result = limiter.acquire("member_action", user_id=456, guild_id=2)
assert result.is_limited is False # Same user, different guild = different bucket
def test_reset(self, limiter: RateLimiter) -> None:
"""Test resetting a bucket."""
limiter.configure(
"test",
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.USER),
)
limiter.acquire("test", user_id=123)
assert limiter.acquire("test", user_id=123).is_limited is True
limiter.reset("test", user_id=123)
assert limiter.acquire("test", user_id=123).is_limited is False
def test_unknown_action(self, limiter: RateLimiter) -> None:
"""Test unknown action returns not limited."""
result = limiter.acquire("unknown_action", user_id=123)
assert result.is_limited is False
assert result.remaining == 999
def test_acquire_command_scopes_per_command(self, limiter: RateLimiter) -> None:
"""Test per-command rate limits are independent."""
for _ in range(5):
result = limiter.acquire_command("config", user_id=1, guild_id=1)
assert result.is_limited is False
limited = limiter.acquire_command("config", user_id=1, guild_id=1)
assert limited.is_limited is True
other = limiter.acquire_command("other", user_id=1, guild_id=1)
assert other.is_limited is False
def test_guild_scope(self, limiter: RateLimiter) -> None:
"""Test guild-scoped rate limiting."""
limiter.configure(
"guild_action",
RateLimitConfig(max_requests=2, window_seconds=60, scope=RateLimitScope.GUILD),
)
# Different users in same guild share the limit
limiter.acquire("guild_action", user_id=1, guild_id=100)
limiter.acquire("guild_action", user_id=2, guild_id=100)
result = limiter.acquire("guild_action", user_id=3, guild_id=100)
assert result.is_limited is True
# Different guild is not limited
result = limiter.acquire("guild_action", user_id=1, guild_id=200)
assert result.is_limited is False