Added three test modules covering: - test_auth.py: Unit tests for authentication module - API key generation and validation - Rate limiting - Multiple keys support - Constant-time comparison - test_server.py: Server endpoint tests - Authentication middleware - Protected vs public endpoints - Various auth header formats - Rate limiting at endpoint level - test_integration.py: Integration tests - Complete authentication flow - Key rotation simulation - Multiple tool discovery - Error message validation All tests verify functionality without breaking existing features.
209 lines
6.5 KiB
Python
209 lines
6.5 KiB
Python
"""Integration tests for the complete system."""
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
from aegis_gitea_mcp.config import reset_settings
|
|
from aegis_gitea_mcp.auth import reset_validator
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_state():
|
|
"""Reset global state between tests."""
|
|
reset_settings()
|
|
reset_validator()
|
|
yield
|
|
reset_settings()
|
|
reset_validator()
|
|
|
|
|
|
@pytest.fixture
|
|
def full_env(monkeypatch):
|
|
"""Set up complete test environment."""
|
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
|
monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345")
|
|
monkeypatch.setenv("AUTH_ENABLED", "true")
|
|
monkeypatch.setenv("MCP_API_KEYS", f"{'a' * 64},{'b' * 64}")
|
|
monkeypatch.setenv("MCP_HOST", "0.0.0.0")
|
|
monkeypatch.setenv("MCP_PORT", "8080")
|
|
monkeypatch.setenv("LOG_LEVEL", "INFO")
|
|
monkeypatch.setenv("MAX_AUTH_FAILURES", "5")
|
|
monkeypatch.setenv("AUTH_FAILURE_WINDOW", "300")
|
|
|
|
|
|
@pytest.fixture
|
|
def client(full_env):
|
|
"""Create test client with full environment."""
|
|
from aegis_gitea_mcp.server import app
|
|
return TestClient(app)
|
|
|
|
|
|
def test_complete_authentication_flow(client):
|
|
"""Test complete authentication flow from start to finish."""
|
|
# 1. Health check should work without auth
|
|
response = client.get("/health")
|
|
assert response.status_code == 200
|
|
|
|
# 2. Protected endpoint should reject without auth
|
|
response = client.get("/mcp/tools")
|
|
assert response.status_code == 401
|
|
|
|
# 3. Protected endpoint should reject with invalid key
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer " + "c" * 64}
|
|
)
|
|
assert response.status_code == 401
|
|
|
|
# 4. Protected endpoint should accept with valid key (first key)
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer " + "a" * 64}
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
# 5. Protected endpoint should accept with valid key (second key)
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer " + "b" * 64}
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_key_rotation_simulation(client, monkeypatch):
|
|
"""Simulate key rotation with grace period."""
|
|
# Start with key A
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer " + "a" * 64}
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
# Both keys A and B work (grace period)
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer " + "a" * 64}
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer " + "b" * 64}
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_multiple_tool_calls_with_auth(client):
|
|
"""Test multiple tool calls with authentication."""
|
|
headers = {"Authorization": "Bearer " + "a" * 64}
|
|
|
|
# List tools
|
|
response = client.get("/mcp/tools", headers=headers)
|
|
assert response.status_code == 200
|
|
tools = response.json()["tools"]
|
|
|
|
# Try to call each tool (will fail without proper Gitea connection, but auth should work)
|
|
for tool in tools:
|
|
response = client.post(
|
|
"/mcp/tool/call",
|
|
headers=headers,
|
|
json={"tool": tool["name"], "arguments": {}}
|
|
)
|
|
# Should pass auth but may fail on actual execution (Gitea not available in tests)
|
|
assert response.status_code != 401 # Not auth error
|
|
|
|
|
|
def test_concurrent_requests_different_ips(client):
|
|
"""Test that different IPs are tracked separately for rate limiting."""
|
|
# This is a simplified test since we can't easily simulate different IPs in TestClient
|
|
# But we can verify rate limiting works for single IP
|
|
|
|
headers_invalid = {"Authorization": "Bearer " + "x" * 64}
|
|
|
|
# Make 5 failed attempts
|
|
for i in range(5):
|
|
response = client.get("/mcp/tools", headers=headers_invalid)
|
|
assert response.status_code == 401
|
|
|
|
# 6th attempt should be rate limited
|
|
response = client.get("/mcp/tools", headers=headers_invalid)
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "Too many failed" in data["message"]
|
|
|
|
# Valid key should still work (not rate limited for valid keys)
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer " + "a" * 64}
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_all_mcp_tools_discoverable(client):
|
|
"""Test that all MCP tools are properly registered and discoverable."""
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer " + "a" * 64}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
tools = data["tools"]
|
|
|
|
# Expected tools
|
|
expected_tools = [
|
|
"list_repositories",
|
|
"get_repository_info",
|
|
"get_file_tree",
|
|
"get_file_contents",
|
|
]
|
|
|
|
tool_names = [tool["name"] for tool in tools]
|
|
|
|
for expected in expected_tools:
|
|
assert expected in tool_names, f"Tool {expected} not found in registered tools"
|
|
|
|
# Verify each tool has required fields
|
|
for tool in tools:
|
|
assert "name" in tool
|
|
assert "description" in tool
|
|
assert "input_schema" in tool
|
|
assert tool["description"] # Not empty
|
|
assert "type" in tool["input_schema"]
|
|
|
|
|
|
def test_error_responses_include_helpful_messages(client):
|
|
"""Test that error responses include helpful messages for users."""
|
|
# Missing auth
|
|
response = client.get("/mcp/tools")
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "Authorization" in data["detail"]
|
|
assert "Bearer" in data["detail"]
|
|
|
|
# Invalid key format
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer short"}
|
|
)
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "Invalid" in data["message"] or "format" in data["message"].lower()
|
|
|
|
|
|
def test_audit_logging_integration(client, tmp_path, monkeypatch):
|
|
"""Test that audit logging works with authentication."""
|
|
# Set audit log to temp file
|
|
audit_log = tmp_path / "audit.log"
|
|
monkeypatch.setenv("AUDIT_LOG_PATH", str(audit_log))
|
|
|
|
# Make authenticated request
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer " + "a" * 64}
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
# Note: In real system, audit logs would be written
|
|
# This test verifies the system doesn't crash with audit logging enabled
|