289 lines
9.1 KiB
Python
289 lines
9.1 KiB
Python
"""Tests for MCP server endpoints."""
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
from aegis_gitea_mcp.auth import reset_validator
|
|
from aegis_gitea_mcp.config import reset_settings
|
|
from aegis_gitea_mcp.gitea_client import GiteaAuthenticationError, GiteaAuthorizationError
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_state():
|
|
"""Reset global state between tests."""
|
|
reset_settings()
|
|
reset_validator()
|
|
yield
|
|
reset_settings()
|
|
reset_validator()
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_env(monkeypatch):
|
|
"""Set up test environment."""
|
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
|
monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345")
|
|
monkeypatch.setenv("ENVIRONMENT", "test")
|
|
monkeypatch.setenv("AUTH_ENABLED", "true")
|
|
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
|
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false")
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_env_auth_disabled(monkeypatch):
|
|
"""Set up test environment with auth disabled."""
|
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
|
monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345")
|
|
monkeypatch.setenv("ENVIRONMENT", "test")
|
|
monkeypatch.setenv("AUTH_ENABLED", "false")
|
|
monkeypatch.setenv("MCP_API_KEYS", "")
|
|
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false")
|
|
|
|
|
|
@pytest.fixture
|
|
def client(mock_env):
|
|
"""Create test client."""
|
|
# Import after setting env vars
|
|
from aegis_gitea_mcp.server import app
|
|
|
|
return TestClient(app)
|
|
|
|
|
|
@pytest.fixture
|
|
def client_no_auth(mock_env_auth_disabled):
|
|
"""Create test client with auth disabled."""
|
|
from aegis_gitea_mcp.server import app
|
|
|
|
return TestClient(app)
|
|
|
|
|
|
def test_root_endpoint(client):
|
|
"""Test root endpoint returns server info."""
|
|
response = client.get("/")
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["name"] == "AegisGitea MCP Server"
|
|
assert "version" in data
|
|
assert data["status"] == "running"
|
|
|
|
|
|
def test_health_endpoint(client):
|
|
"""Test health check endpoint."""
|
|
response = client.get("/health")
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "healthy"
|
|
|
|
|
|
def test_metrics_endpoint(client):
|
|
"""Metrics endpoint should be available for observability."""
|
|
response = client.get("/metrics")
|
|
assert response.status_code == 200
|
|
assert "aegis_http_requests_total" in response.text
|
|
|
|
|
|
def test_health_endpoint_no_auth_required(client):
|
|
"""Test that health check doesn't require authentication."""
|
|
response = client.get("/health")
|
|
|
|
# Should work without Authorization header
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_list_tools_without_auth(client):
|
|
"""Test that /mcp/tools is public (Mixed mode for ChatGPT)."""
|
|
response = client.get("/mcp/tools")
|
|
|
|
# Tool listing is public to support ChatGPT discovery
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "tools" in data
|
|
|
|
|
|
def test_list_tools_with_invalid_key(client):
|
|
"""Test /mcp/tools works even with invalid key (public endpoint)."""
|
|
response = client.get(
|
|
"/mcp/tools",
|
|
headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"},
|
|
)
|
|
|
|
# Tool listing is public, so even invalid keys can list tools
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_list_tools_with_valid_key(client, mock_env):
|
|
"""Test /mcp/tools with valid API key."""
|
|
response = client.get("/mcp/tools", headers={"Authorization": f"Bearer {'a' * 64}"})
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "tools" in data
|
|
assert len(data["tools"]) > 0
|
|
|
|
# Check tool structure
|
|
tool = data["tools"][0]
|
|
assert "name" in tool
|
|
assert "description" in tool
|
|
assert "input_schema" in tool
|
|
|
|
|
|
def test_list_tools_with_query_param(client):
|
|
"""Test /mcp/tools with API key in query parameter."""
|
|
response = client.get(f"/mcp/tools?api_key={'a' * 64}")
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "tools" in data
|
|
assert len(data["tools"]) > 0
|
|
|
|
|
|
def test_list_tools_no_auth_when_disabled(client_no_auth):
|
|
"""Test that /mcp/tools works without auth when disabled."""
|
|
response = client_no_auth.get("/mcp/tools")
|
|
|
|
# Should work without Authorization header when auth is disabled
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "tools" in data
|
|
|
|
|
|
def test_call_tool_without_auth(client):
|
|
"""Test that /mcp/tool/call requires authentication."""
|
|
response = client.post("/mcp/tool/call", json={"tool": "list_repositories", "arguments": {}})
|
|
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_call_tool_with_invalid_key(client):
|
|
"""Test /mcp/tool/call with invalid API key."""
|
|
response = client.post(
|
|
"/mcp/tool/call",
|
|
headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"},
|
|
json={"tool": "list_repositories", "arguments": {}},
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_call_nonexistent_tool(client):
|
|
"""Test calling a tool that doesn't exist."""
|
|
response = client.post(
|
|
"/mcp/tool/call",
|
|
headers={"Authorization": f"Bearer {'a' * 64}"},
|
|
json={"tool": "nonexistent_tool", "arguments": {}},
|
|
)
|
|
|
|
# Tool not found returns 404 (auth passes but tool missing)
|
|
assert response.status_code == 404
|
|
data = response.json()
|
|
assert "not found" in data["detail"].lower()
|
|
|
|
|
|
def test_write_tool_denied_by_default_policy(client):
|
|
"""Write tools must be denied when write mode is disabled."""
|
|
response = client.post(
|
|
"/mcp/tool/call",
|
|
headers={"Authorization": f"Bearer {'a' * 64}"},
|
|
json={
|
|
"tool": "create_issue",
|
|
"arguments": {"owner": "acme", "repo": "demo", "title": "test"},
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 403
|
|
data = response.json()
|
|
assert "policy denied" in data["detail"].lower()
|
|
|
|
|
|
def test_sse_endpoint_without_auth(client):
|
|
"""Test that SSE endpoint requires authentication."""
|
|
response = client.get("/mcp/sse")
|
|
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_auth_header_formats(client):
|
|
"""Test various Authorization header formats on protected endpoint."""
|
|
# Test with /mcp/tool/call since /mcp/tools is now public
|
|
tool_data = {"tool": "list_repositories", "arguments": {}}
|
|
|
|
# Missing "Bearer" prefix
|
|
response = client.post("/mcp/tool/call", headers={"Authorization": "a" * 64}, json=tool_data)
|
|
assert response.status_code == 401
|
|
|
|
# Wrong case
|
|
response = client.post(
|
|
"/mcp/tool/call", headers={"Authorization": "bearer " + "a" * 64}, json=tool_data
|
|
)
|
|
assert response.status_code == 401
|
|
|
|
# Extra spaces
|
|
response = client.post(
|
|
"/mcp/tool/call", headers={"Authorization": f"Bearer {'a' * 64}"}, json=tool_data
|
|
)
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_rate_limiting(client):
|
|
"""Test rate limiting after multiple failed auth attempts."""
|
|
tool_data = {"tool": "list_repositories", "arguments": {}}
|
|
|
|
# Make 6 failed attempts on protected endpoint
|
|
for _ in range(6):
|
|
response = client.post(
|
|
"/mcp/tool/call", headers={"Authorization": "Bearer " + "x" * 64}, json=tool_data
|
|
)
|
|
|
|
# Last response should mention rate limiting
|
|
data = response.json()
|
|
assert "Too many failed" in data["message"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_startup_event_fails_with_authentication_guidance(monkeypatch):
|
|
"""Startup validation should fail with explicit auth guidance on 401."""
|
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
|
monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345")
|
|
monkeypatch.setenv("ENVIRONMENT", "production")
|
|
monkeypatch.setenv("AUTH_ENABLED", "true")
|
|
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
|
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "true")
|
|
|
|
from aegis_gitea_mcp import server
|
|
|
|
async def raise_auth_error(*_args, **_kwargs):
|
|
raise GiteaAuthenticationError("Authentication failed - check bot token")
|
|
|
|
monkeypatch.setattr(server.GiteaClient, "get_current_user", raise_auth_error)
|
|
|
|
with pytest.raises(
|
|
RuntimeError, match=r"Startup validation failed: Gitea authentication was rejected"
|
|
):
|
|
await server.startup_event()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_startup_event_fails_with_authorization_guidance(monkeypatch):
|
|
"""Startup validation should fail with explicit permission guidance on 403."""
|
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
|
monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345")
|
|
monkeypatch.setenv("ENVIRONMENT", "production")
|
|
monkeypatch.setenv("AUTH_ENABLED", "true")
|
|
monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
|
|
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "true")
|
|
|
|
from aegis_gitea_mcp import server
|
|
|
|
async def raise_authorization_error(*_args, **_kwargs):
|
|
raise GiteaAuthorizationError("Bot user lacks permission for this operation")
|
|
|
|
monkeypatch.setattr(server.GiteaClient, "get_current_user", raise_authorization_error)
|
|
|
|
with pytest.raises(
|
|
RuntimeError,
|
|
match=r"Startup validation failed: Gitea token lacks permission for /api/v1/user",
|
|
):
|
|
await server.startup_event()
|