Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 6m9s
CI/CD Pipeline / Security Scanning (push) Successful in 26s
CI/CD Pipeline / Tests (3.11) (push) Failing after 5m24s
CI/CD Pipeline / Tests (3.12) (push) Failing after 5m23s
CI/CD Pipeline / Build Docker Image (push) Has been skipped
CI/CD Pipeline / Deploy to Staging (push) Has been skipped
CI/CD Pipeline / Deploy to Production (push) Has been skipped
CI/CD Pipeline / Notification (push) Successful in 1s
143 lines
5.0 KiB
Python
143 lines
5.0 KiB
Python
"""Tests for rate limiting service."""
|
|
|
|
import pytest
|
|
|
|
from guardden.services.ratelimit import (
|
|
RateLimitBucket,
|
|
RateLimitConfig,
|
|
RateLimiter,
|
|
RateLimitScope,
|
|
)
|
|
|
|
|
|
class TestRateLimitBucket:
|
|
"""Tests for RateLimitBucket."""
|
|
|
|
def test_not_limited_initially(self) -> None:
|
|
"""Test bucket is not limited when empty."""
|
|
bucket = RateLimitBucket(max_requests=3, window_seconds=60)
|
|
assert bucket.is_limited() is False
|
|
assert bucket.remaining() == 3
|
|
|
|
def test_limited_after_max_requests(self) -> None:
|
|
"""Test bucket is limited after max requests."""
|
|
bucket = RateLimitBucket(max_requests=3, window_seconds=60)
|
|
|
|
for _ in range(3):
|
|
bucket.record()
|
|
|
|
assert bucket.is_limited() is True
|
|
assert bucket.remaining() == 0
|
|
|
|
def test_remaining_decreases(self) -> None:
|
|
"""Test remaining count decreases."""
|
|
bucket = RateLimitBucket(max_requests=5, window_seconds=60)
|
|
|
|
bucket.record()
|
|
assert bucket.remaining() == 4
|
|
|
|
bucket.record()
|
|
assert bucket.remaining() == 3
|
|
|
|
|
|
class TestRateLimiter:
|
|
"""Tests for RateLimiter."""
|
|
|
|
@pytest.fixture
|
|
def limiter(self) -> RateLimiter:
|
|
return RateLimiter()
|
|
|
|
def test_check_not_limited(self, limiter: RateLimiter) -> None:
|
|
"""Test check returns not limited for new bucket."""
|
|
result = limiter.check("command", user_id=123, guild_id=456)
|
|
assert result.is_limited is False
|
|
|
|
def test_acquire_records_request(self, limiter: RateLimiter) -> None:
|
|
"""Test acquire records the request."""
|
|
# Configure a simple limit
|
|
limiter.configure(
|
|
"test",
|
|
RateLimitConfig(max_requests=2, window_seconds=60, scope=RateLimitScope.USER),
|
|
)
|
|
|
|
result1 = limiter.acquire("test", user_id=123)
|
|
assert result1.is_limited is False
|
|
assert result1.remaining == 1
|
|
|
|
result2 = limiter.acquire("test", user_id=123)
|
|
assert result2.is_limited is False
|
|
assert result2.remaining == 0
|
|
|
|
result3 = limiter.acquire("test", user_id=123)
|
|
assert result3.is_limited is True
|
|
|
|
def test_different_scopes(self, limiter: RateLimiter) -> None:
|
|
"""Test different scopes create different buckets."""
|
|
# User scope - shared across guilds
|
|
limiter.configure(
|
|
"user_action",
|
|
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.USER),
|
|
)
|
|
|
|
limiter.acquire("user_action", user_id=123, guild_id=1)
|
|
result = limiter.acquire("user_action", user_id=123, guild_id=2)
|
|
assert result.is_limited is True # Same user, different guild
|
|
|
|
# Member scope - per guild
|
|
limiter.configure(
|
|
"member_action",
|
|
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.MEMBER),
|
|
)
|
|
|
|
limiter.acquire("member_action", user_id=456, guild_id=1)
|
|
result = limiter.acquire("member_action", user_id=456, guild_id=2)
|
|
assert result.is_limited is False # Same user, different guild = different bucket
|
|
|
|
def test_reset(self, limiter: RateLimiter) -> None:
|
|
"""Test resetting a bucket."""
|
|
limiter.configure(
|
|
"test",
|
|
RateLimitConfig(max_requests=1, window_seconds=60, scope=RateLimitScope.USER),
|
|
)
|
|
|
|
limiter.acquire("test", user_id=123)
|
|
assert limiter.acquire("test", user_id=123).is_limited is True
|
|
|
|
limiter.reset("test", user_id=123)
|
|
assert limiter.acquire("test", user_id=123).is_limited is False
|
|
|
|
def test_unknown_action(self, limiter: RateLimiter) -> None:
|
|
"""Test unknown action returns not limited."""
|
|
result = limiter.acquire("unknown_action", user_id=123)
|
|
assert result.is_limited is False
|
|
assert result.remaining == 999
|
|
|
|
def test_acquire_command_scopes_per_command(self, limiter: RateLimiter) -> None:
|
|
"""Test per-command rate limits are independent."""
|
|
for _ in range(5):
|
|
result = limiter.acquire_command("config", user_id=1, guild_id=1)
|
|
assert result.is_limited is False
|
|
|
|
limited = limiter.acquire_command("config", user_id=1, guild_id=1)
|
|
assert limited.is_limited is True
|
|
|
|
other = limiter.acquire_command("other", user_id=1, guild_id=1)
|
|
assert other.is_limited is False
|
|
|
|
def test_guild_scope(self, limiter: RateLimiter) -> None:
|
|
"""Test guild-scoped rate limiting."""
|
|
limiter.configure(
|
|
"guild_action",
|
|
RateLimitConfig(max_requests=2, window_seconds=60, scope=RateLimitScope.GUILD),
|
|
)
|
|
|
|
# Different users in same guild share the limit
|
|
limiter.acquire("guild_action", user_id=1, guild_id=100)
|
|
limiter.acquire("guild_action", user_id=2, guild_id=100)
|
|
result = limiter.acquire("guild_action", user_id=3, guild_id=100)
|
|
assert result.is_limited is True
|
|
|
|
# Different guild is not limited
|
|
result = limiter.acquire("guild_action", user_id=1, guild_id=200)
|
|
assert result.is_limited is False
|