"""Tests for MCP server endpoints.""" import pytest from fastapi.testclient import TestClient from aegis_gitea_mcp.auth import reset_validator from aegis_gitea_mcp.config import reset_settings @pytest.fixture(autouse=True) def reset_state(): """Reset global state between tests.""" reset_settings() reset_validator() yield reset_settings() reset_validator() @pytest.fixture def mock_env(monkeypatch): """Set up test environment.""" monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") monkeypatch.setenv("AUTH_ENABLED", "true") monkeypatch.setenv("MCP_API_KEYS", "a" * 64) @pytest.fixture def mock_env_auth_disabled(monkeypatch): """Set up test environment with auth disabled.""" monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") monkeypatch.setenv("AUTH_ENABLED", "false") monkeypatch.setenv("MCP_API_KEYS", "") @pytest.fixture def client(mock_env): """Create test client.""" # Import after setting env vars from aegis_gitea_mcp.server import app return TestClient(app) @pytest.fixture def client_no_auth(mock_env_auth_disabled): """Create test client with auth disabled.""" from aegis_gitea_mcp.server import app return TestClient(app) def test_root_endpoint(client): """Test root endpoint returns server info.""" response = client.get("/") assert response.status_code == 200 data = response.json() assert data["name"] == "AegisGitea MCP Server" assert "version" in data assert data["status"] == "running" def test_health_endpoint(client): """Test health check endpoint.""" response = client.get("/health") assert response.status_code == 200 data = response.json() assert data["status"] == "healthy" def test_health_endpoint_no_auth_required(client): """Test that health check doesn't require authentication.""" response = client.get("/health") # Should work without Authorization header assert response.status_code == 200 def test_list_tools_without_auth(client): """Test that /mcp/tools is public (Mixed mode for ChatGPT).""" response = client.get("/mcp/tools") # Tool listing is public to support ChatGPT discovery assert response.status_code == 200 data = response.json() assert "tools" in data def test_list_tools_with_invalid_key(client): """Test /mcp/tools works even with invalid key (public endpoint).""" response = client.get( "/mcp/tools", headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"}, ) # Tool listing is public, so even invalid keys can list tools assert response.status_code == 200 def test_list_tools_with_valid_key(client, mock_env): """Test /mcp/tools with valid API key.""" response = client.get("/mcp/tools", headers={"Authorization": f"Bearer {'a' * 64}"}) assert response.status_code == 200 data = response.json() assert "tools" in data assert len(data["tools"]) > 0 # Check tool structure tool = data["tools"][0] assert "name" in tool assert "description" in tool assert "input_schema" in tool def test_list_tools_with_query_param(client): """Test /mcp/tools with API key in query parameter.""" response = client.get(f"/mcp/tools?api_key={'a' * 64}") assert response.status_code == 200 data = response.json() assert "tools" in data assert len(data["tools"]) > 0 def test_list_tools_no_auth_when_disabled(client_no_auth): """Test that /mcp/tools works without auth when disabled.""" response = client_no_auth.get("/mcp/tools") # Should work without Authorization header when auth is disabled assert response.status_code == 200 data = response.json() assert "tools" in data def test_call_tool_without_auth(client): """Test that /mcp/tool/call requires authentication.""" response = client.post("/mcp/tool/call", json={"tool": "list_repositories", "arguments": {}}) assert response.status_code == 401 def test_call_tool_with_invalid_key(client): """Test /mcp/tool/call with invalid API key.""" response = client.post( "/mcp/tool/call", headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"}, json={"tool": "list_repositories", "arguments": {}}, ) assert response.status_code == 401 def test_call_nonexistent_tool(client): """Test calling a tool that doesn't exist.""" response = client.post( "/mcp/tool/call", headers={"Authorization": f"Bearer {'a' * 64}"}, json={"tool": "nonexistent_tool", "arguments": {}}, ) # Tool not found returns 404 (auth passes but tool missing) assert response.status_code == 404 data = response.json() assert "not found" in data["detail"].lower() def test_sse_endpoint_without_auth(client): """Test that SSE endpoint requires authentication.""" response = client.get("/mcp/sse") assert response.status_code == 401 def test_auth_header_formats(client): """Test various Authorization header formats on protected endpoint.""" # Test with /mcp/tool/call since /mcp/tools is now public tool_data = {"tool": "list_repositories", "arguments": {}} # Missing "Bearer" prefix response = client.post("/mcp/tool/call", headers={"Authorization": "a" * 64}, json=tool_data) assert response.status_code == 401 # Wrong case response = client.post( "/mcp/tool/call", headers={"Authorization": "bearer " + "a" * 64}, json=tool_data ) assert response.status_code == 401 # Extra spaces response = client.post( "/mcp/tool/call", headers={"Authorization": f"Bearer {'a' * 64}"}, json=tool_data ) assert response.status_code == 401 def test_rate_limiting(client): """Test rate limiting after multiple failed auth attempts.""" tool_data = {"tool": "list_repositories", "arguments": {}} # Make 6 failed attempts on protected endpoint for _ in range(6): response = client.post( "/mcp/tool/call", headers={"Authorization": "Bearer " + "x" * 64}, json=tool_data ) # Last response should mention rate limiting data = response.json() assert "Too many failed" in data["message"]