Files
AegisGitea-MCP/tests/test_server.py
2026-01-31 15:55:22 +00:00

214 lines
6.3 KiB
Python

"""Tests for MCP server endpoints."""
import pytest
from fastapi.testclient import TestClient
from aegis_gitea_mcp.auth import reset_validator
from aegis_gitea_mcp.config import reset_settings
@pytest.fixture(autouse=True)
def reset_state():
"""Reset global state between tests."""
reset_settings()
reset_validator()
yield
reset_settings()
reset_validator()
@pytest.fixture
def mock_env(monkeypatch):
"""Set up test environment."""
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345")
monkeypatch.setenv("AUTH_ENABLED", "true")
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
@pytest.fixture
def mock_env_auth_disabled(monkeypatch):
"""Set up test environment with auth disabled."""
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345")
monkeypatch.setenv("AUTH_ENABLED", "false")
monkeypatch.setenv("MCP_API_KEYS", "")
@pytest.fixture
def client(mock_env):
"""Create test client."""
# Import after setting env vars
from aegis_gitea_mcp.server import app
return TestClient(app)
@pytest.fixture
def client_no_auth(mock_env_auth_disabled):
"""Create test client with auth disabled."""
from aegis_gitea_mcp.server import app
return TestClient(app)
def test_root_endpoint(client):
"""Test root endpoint returns server info."""
response = client.get("/")
assert response.status_code == 200
data = response.json()
assert data["name"] == "AegisGitea MCP Server"
assert "version" in data
assert data["status"] == "running"
def test_health_endpoint(client):
"""Test health check endpoint."""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
def test_health_endpoint_no_auth_required(client):
"""Test that health check doesn't require authentication."""
response = client.get("/health")
# Should work without Authorization header
assert response.status_code == 200
def test_list_tools_without_auth(client):
"""Test that /mcp/tools is public (Mixed mode for ChatGPT)."""
response = client.get("/mcp/tools")
# Tool listing is public to support ChatGPT discovery
assert response.status_code == 200
data = response.json()
assert "tools" in data
def test_list_tools_with_invalid_key(client):
"""Test /mcp/tools works even with invalid key (public endpoint)."""
response = client.get(
"/mcp/tools",
headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"},
)
# Tool listing is public, so even invalid keys can list tools
assert response.status_code == 200
def test_list_tools_with_valid_key(client, mock_env):
"""Test /mcp/tools with valid API key."""
response = client.get("/mcp/tools", headers={"Authorization": f"Bearer {'a' * 64}"})
assert response.status_code == 200
data = response.json()
assert "tools" in data
assert len(data["tools"]) > 0
# Check tool structure
tool = data["tools"][0]
assert "name" in tool
assert "description" in tool
assert "input_schema" in tool
def test_list_tools_with_query_param(client):
"""Test /mcp/tools with API key in query parameter."""
response = client.get(f"/mcp/tools?api_key={'a' * 64}")
assert response.status_code == 200
data = response.json()
assert "tools" in data
assert len(data["tools"]) > 0
def test_list_tools_no_auth_when_disabled(client_no_auth):
"""Test that /mcp/tools works without auth when disabled."""
response = client_no_auth.get("/mcp/tools")
# Should work without Authorization header when auth is disabled
assert response.status_code == 200
data = response.json()
assert "tools" in data
def test_call_tool_without_auth(client):
"""Test that /mcp/tool/call requires authentication."""
response = client.post("/mcp/tool/call", json={"tool": "list_repositories", "arguments": {}})
assert response.status_code == 401
def test_call_tool_with_invalid_key(client):
"""Test /mcp/tool/call with invalid API key."""
response = client.post(
"/mcp/tool/call",
headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"},
json={"tool": "list_repositories", "arguments": {}},
)
assert response.status_code == 401
def test_call_nonexistent_tool(client):
"""Test calling a tool that doesn't exist."""
response = client.post(
"/mcp/tool/call",
headers={"Authorization": f"Bearer {'a' * 64}"},
json={"tool": "nonexistent_tool", "arguments": {}},
)
# Tool not found returns 404 (auth passes but tool missing)
assert response.status_code == 404
data = response.json()
assert "not found" in data["detail"].lower()
def test_sse_endpoint_without_auth(client):
"""Test that SSE endpoint requires authentication."""
response = client.get("/mcp/sse")
assert response.status_code == 401
def test_auth_header_formats(client):
"""Test various Authorization header formats on protected endpoint."""
# Test with /mcp/tool/call since /mcp/tools is now public
tool_data = {"tool": "list_repositories", "arguments": {}}
# Missing "Bearer" prefix
response = client.post("/mcp/tool/call", headers={"Authorization": "a" * 64}, json=tool_data)
assert response.status_code == 401
# Wrong case
response = client.post(
"/mcp/tool/call", headers={"Authorization": "bearer " + "a" * 64}, json=tool_data
)
assert response.status_code == 401
# Extra spaces
response = client.post(
"/mcp/tool/call", headers={"Authorization": f"Bearer {'a' * 64}"}, json=tool_data
)
assert response.status_code == 401
def test_rate_limiting(client):
"""Test rate limiting after multiple failed auth attempts."""
tool_data = {"tool": "list_repositories", "arguments": {}}
# Make 6 failed attempts on protected endpoint
for i in range(6):
response = client.post(
"/mcp/tool/call", headers={"Authorization": "Bearer " + "x" * 64}, json=tool_data
)
# Last response should mention rate limiting
data = response.json()
assert "Too many failed" in data["message"]