"""Tests for MCP server endpoints.""" import pytest from fastapi.testclient import TestClient from aegis_gitea_mcp.config import reset_settings from aegis_gitea_mcp.auth import reset_validator @pytest.fixture(autouse=True) def reset_state(): """Reset global state between tests.""" reset_settings() reset_validator() yield reset_settings() reset_validator() @pytest.fixture def mock_env(monkeypatch): """Set up test environment.""" monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") monkeypatch.setenv("AUTH_ENABLED", "true") monkeypatch.setenv("MCP_API_KEYS", "a" * 64) @pytest.fixture def mock_env_auth_disabled(monkeypatch): """Set up test environment with auth disabled.""" monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") monkeypatch.setenv("AUTH_ENABLED", "false") monkeypatch.setenv("MCP_API_KEYS", "") @pytest.fixture def client(mock_env): """Create test client.""" # Import after setting env vars from aegis_gitea_mcp.server import app return TestClient(app) @pytest.fixture def client_no_auth(mock_env_auth_disabled): """Create test client with auth disabled.""" from aegis_gitea_mcp.server import app return TestClient(app) def test_root_endpoint(client): """Test root endpoint returns server info.""" response = client.get("/") assert response.status_code == 200 data = response.json() assert data["name"] == "AegisGitea MCP Server" assert "version" in data assert data["status"] == "running" def test_health_endpoint(client): """Test health check endpoint.""" response = client.get("/health") assert response.status_code == 200 data = response.json() assert data["status"] == "healthy" def test_health_endpoint_no_auth_required(client): """Test that health check doesn't require authentication.""" response = client.get("/health") # Should work without Authorization header assert response.status_code == 200 def test_list_tools_without_auth(client): """Test that /mcp/tools requires authentication.""" response = client.get("/mcp/tools") assert response.status_code == 401 data = response.json() assert "Authentication failed" in data["error"] def test_list_tools_with_invalid_key(client): """Test /mcp/tools with invalid API key.""" response = client.get( "/mcp/tools", headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"} ) assert response.status_code == 401 def test_list_tools_with_valid_key(client, mock_env): """Test /mcp/tools with valid API key.""" response = client.get( "/mcp/tools", headers={"Authorization": f"Bearer {'a' * 64}"} ) assert response.status_code == 200 data = response.json() assert "tools" in data assert len(data["tools"]) > 0 # Check tool structure tool = data["tools"][0] assert "name" in tool assert "description" in tool assert "input_schema" in tool def test_list_tools_no_auth_when_disabled(client_no_auth): """Test that /mcp/tools works without auth when disabled.""" response = client_no_auth.get("/mcp/tools") # Should work without Authorization header when auth is disabled assert response.status_code == 200 data = response.json() assert "tools" in data def test_call_tool_without_auth(client): """Test that /mcp/tool/call requires authentication.""" response = client.post( "/mcp/tool/call", json={"tool": "list_repositories", "arguments": {}} ) assert response.status_code == 401 def test_call_tool_with_invalid_key(client): """Test /mcp/tool/call with invalid API key.""" response = client.post( "/mcp/tool/call", headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"}, json={"tool": "list_repositories", "arguments": {}} ) assert response.status_code == 401 def test_call_nonexistent_tool(client): """Test calling a tool that doesn't exist.""" response = client.post( "/mcp/tool/call", headers={"Authorization": f"Bearer {'a' * 64}"}, json={"tool": "nonexistent_tool", "arguments": {}} ) assert response.status_code == 404 data = response.json() assert "not found" in data["detail"].lower() def test_sse_endpoint_without_auth(client): """Test that SSE endpoint requires authentication.""" response = client.get("/mcp/sse") assert response.status_code == 401 def test_auth_header_formats(client): """Test various Authorization header formats.""" # Missing "Bearer" prefix response = client.get( "/mcp/tools", headers={"Authorization": "a" * 64} ) assert response.status_code == 401 # Wrong case response = client.get( "/mcp/tools", headers={"Authorization": "bearer " + "a" * 64} ) assert response.status_code == 401 # Extra spaces response = client.get( "/mcp/tools", headers={"Authorization": f"Bearer {'a' * 64}"} ) assert response.status_code == 401 def test_rate_limiting(client): """Test rate limiting after multiple failed auth attempts.""" # Make 6 failed attempts for i in range(6): response = client.get( "/mcp/tools", headers={"Authorization": "Bearer " + "b" * 64} ) # Last response should mention rate limiting data = response.json() assert "Too many failed" in data["message"]