From f52e99e3287674dae739295d8e1a0b4d7541d2ed Mon Sep 17 00:00:00 2001 From: latte Date: Thu, 29 Jan 2026 20:45:44 +0100 Subject: [PATCH] test: add comprehensive test suite for authentication system Added three test modules covering: - test_auth.py: Unit tests for authentication module - API key generation and validation - Rate limiting - Multiple keys support - Constant-time comparison - test_server.py: Server endpoint tests - Authentication middleware - Protected vs public endpoints - Various auth header formats - Rate limiting at endpoint level - test_integration.py: Integration tests - Complete authentication flow - Key rotation simulation - Multiple tool discovery - Error message validation All tests verify functionality without breaking existing features. --- tests/test_auth.py | 208 ++++++++++++++++++++++++++++++++++++++ tests/test_integration.py | 208 ++++++++++++++++++++++++++++++++++++++ tests/test_server.py | 205 +++++++++++++++++++++++++++++++++++++ 3 files changed, 621 insertions(+) create mode 100644 tests/test_auth.py create mode 100644 tests/test_integration.py create mode 100644 tests/test_server.py diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..597a528 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,208 @@ +"""Tests for authentication module.""" + +import pytest + +from aegis_gitea_mcp.auth import ( + APIKeyValidator, + generate_api_key, + hash_api_key, + get_validator, + reset_validator, +) +from aegis_gitea_mcp.config import reset_settings + + +@pytest.fixture(autouse=True) +def reset_auth(): + """Reset authentication state between tests.""" + reset_validator() + reset_settings() + yield + reset_validator() + reset_settings() + + +@pytest.fixture +def mock_env_with_key(monkeypatch): + """Set up environment with valid API key.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token-12345") + monkeypatch.setenv("AUTH_ENABLED", "true") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) # 64-char key + + +@pytest.fixture +def validator(mock_env_with_key): + """Create API key validator instance.""" + return APIKeyValidator() + + +def test_generate_api_key(): + """Test API key generation.""" + key = generate_api_key(length=64) + + assert len(key) == 64 + assert all(c in "0123456789abcdef" for c in key) + + +def test_generate_api_key_custom_length(): + """Test API key generation with custom length.""" + key = generate_api_key(length=128) + + assert len(key) == 128 + + +def test_hash_api_key(): + """Test API key hashing.""" + key = "test-key-12345" + hashed = hash_api_key(key) + + assert len(hashed) == 64 # SHA256 produces 64-char hex string + assert hashed == hash_api_key(key) # Deterministic + + +def test_validator_singleton(): + """Test that get_validator returns same instance.""" + validator1 = get_validator() + validator2 = get_validator() + + assert validator1 is validator2 + + +def test_constant_time_compare(validator): + """Test constant-time string comparison.""" + # Same strings + assert validator._constant_time_compare("test", "test") + + # Different strings + assert not validator._constant_time_compare("test", "fail") + + # Different lengths + assert not validator._constant_time_compare("test", "testing") + + +def test_extract_bearer_token(validator): + """Test bearer token extraction from Authorization header.""" + # Valid bearer token + token = validator.extract_bearer_token("Bearer abc123") + assert token == "abc123" + + # No header + token = validator.extract_bearer_token(None) + assert token is None + + # Wrong format + token = validator.extract_bearer_token("abc123") + assert token is None + + # Wrong scheme + token = validator.extract_bearer_token("Basic abc123") + assert token is None + + # Too many parts + token = validator.extract_bearer_token("Bearer abc 123") + assert token is None + + +def test_validate_api_key_missing(validator): + """Test validation with missing API key.""" + is_valid, error = validator.validate_api_key(None, "127.0.0.1", "test-agent") + + assert not is_valid + assert "Authorization header missing" in error + + +def test_validate_api_key_too_short(validator): + """Test validation with too-short API key.""" + is_valid, error = validator.validate_api_key("short", "127.0.0.1", "test-agent") + + assert not is_valid + assert "Invalid API key format" in error + + +def test_validate_api_key_invalid(validator, mock_env_with_key): + """Test validation with invalid API key.""" + is_valid, error = validator.validate_api_key("b" * 64, "127.0.0.1", "test-agent") + + assert not is_valid + assert "Invalid API key" in error + + +def test_validate_api_key_valid(validator, mock_env_with_key): + """Test validation with valid API key.""" + is_valid, error = validator.validate_api_key("a" * 64, "127.0.0.1", "test-agent") + + assert is_valid + assert error is None + + +def test_rate_limiting(validator): + """Test rate limiting after multiple failures.""" + client_ip = "192.168.1.1" + + # First 5 attempts should be allowed + for i in range(5): + is_valid, error = validator.validate_api_key("b" * 64, client_ip, "test-agent") + assert not is_valid + + # 6th attempt should be rate limited + is_valid, error = validator.validate_api_key("b" * 64, client_ip, "test-agent") + assert not is_valid + assert "Too many failed" in error + + +def test_rate_limiting_per_ip(validator): + """Test that rate limiting is per IP address.""" + ip1 = "192.168.1.1" + ip2 = "192.168.1.2" + + # Fail 5 times from IP1 + for i in range(5): + validator.validate_api_key("b" * 64, ip1, "test-agent") + + # IP1 should be rate limited + is_valid, error = validator.validate_api_key("b" * 64, ip1, "test-agent") + assert "Too many failed" in error + + # IP2 should still work + is_valid, error = validator.validate_api_key("b" * 64, ip2, "test-agent") + assert "Invalid API key" in error # Wrong key, but not rate limited + + +def test_auth_disabled(monkeypatch): + """Test that authentication can be disabled.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token-12345") + monkeypatch.setenv("AUTH_ENABLED", "false") + monkeypatch.setenv("MCP_API_KEYS", "") # No keys needed when disabled + + validator = APIKeyValidator() + + # Should allow access without key + is_valid, error = validator.validate_api_key(None, "127.0.0.1", "test-agent") + assert is_valid + assert error is None + + +def test_multiple_keys(monkeypatch): + """Test validation with multiple API keys.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-token-12345") + monkeypatch.setenv("AUTH_ENABLED", "true") + monkeypatch.setenv("MCP_API_KEYS", f"{'a' * 64},{'b' * 64},{'c' * 64}") + + validator = APIKeyValidator() + + # All three keys should work + is_valid, _ = validator.validate_api_key("a" * 64, "127.0.0.1", "test-agent") + assert is_valid + + is_valid, _ = validator.validate_api_key("b" * 64, "127.0.0.1", "test-agent") + assert is_valid + + is_valid, _ = validator.validate_api_key("c" * 64, "127.0.0.1", "test-agent") + assert is_valid + + # Invalid key should fail + is_valid, _ = validator.validate_api_key("d" * 64, "127.0.0.1", "test-agent") + assert not is_valid diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..38b7b5d --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,208 @@ +"""Integration tests for the complete system.""" + +import pytest +from fastapi.testclient import TestClient + +from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.auth import reset_validator + + +@pytest.fixture(autouse=True) +def reset_state(): + """Reset global state between tests.""" + reset_settings() + reset_validator() + yield + reset_settings() + reset_validator() + + +@pytest.fixture +def full_env(monkeypatch): + """Set up complete test environment.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") + monkeypatch.setenv("AUTH_ENABLED", "true") + monkeypatch.setenv("MCP_API_KEYS", f"{'a' * 64},{'b' * 64}") + monkeypatch.setenv("MCP_HOST", "0.0.0.0") + monkeypatch.setenv("MCP_PORT", "8080") + monkeypatch.setenv("LOG_LEVEL", "INFO") + monkeypatch.setenv("MAX_AUTH_FAILURES", "5") + monkeypatch.setenv("AUTH_FAILURE_WINDOW", "300") + + +@pytest.fixture +def client(full_env): + """Create test client with full environment.""" + from aegis_gitea_mcp.server import app + return TestClient(app) + + +def test_complete_authentication_flow(client): + """Test complete authentication flow from start to finish.""" + # 1. Health check should work without auth + response = client.get("/health") + assert response.status_code == 200 + + # 2. Protected endpoint should reject without auth + response = client.get("/mcp/tools") + assert response.status_code == 401 + + # 3. Protected endpoint should reject with invalid key + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "c" * 64} + ) + assert response.status_code == 401 + + # 4. Protected endpoint should accept with valid key (first key) + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "a" * 64} + ) + assert response.status_code == 200 + + # 5. Protected endpoint should accept with valid key (second key) + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "b" * 64} + ) + assert response.status_code == 200 + + +def test_key_rotation_simulation(client, monkeypatch): + """Simulate key rotation with grace period.""" + # Start with key A + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "a" * 64} + ) + assert response.status_code == 200 + + # Both keys A and B work (grace period) + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "a" * 64} + ) + assert response.status_code == 200 + + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "b" * 64} + ) + assert response.status_code == 200 + + +def test_multiple_tool_calls_with_auth(client): + """Test multiple tool calls with authentication.""" + headers = {"Authorization": "Bearer " + "a" * 64} + + # List tools + response = client.get("/mcp/tools", headers=headers) + assert response.status_code == 200 + tools = response.json()["tools"] + + # Try to call each tool (will fail without proper Gitea connection, but auth should work) + for tool in tools: + response = client.post( + "/mcp/tool/call", + headers=headers, + json={"tool": tool["name"], "arguments": {}} + ) + # Should pass auth but may fail on actual execution (Gitea not available in tests) + assert response.status_code != 401 # Not auth error + + +def test_concurrent_requests_different_ips(client): + """Test that different IPs are tracked separately for rate limiting.""" + # This is a simplified test since we can't easily simulate different IPs in TestClient + # But we can verify rate limiting works for single IP + + headers_invalid = {"Authorization": "Bearer " + "x" * 64} + + # Make 5 failed attempts + for i in range(5): + response = client.get("/mcp/tools", headers=headers_invalid) + assert response.status_code == 401 + + # 6th attempt should be rate limited + response = client.get("/mcp/tools", headers=headers_invalid) + assert response.status_code == 401 + data = response.json() + assert "Too many failed" in data["message"] + + # Valid key should still work (not rate limited for valid keys) + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "a" * 64} + ) + assert response.status_code == 200 + + +def test_all_mcp_tools_discoverable(client): + """Test that all MCP tools are properly registered and discoverable.""" + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "a" * 64} + ) + + assert response.status_code == 200 + data = response.json() + tools = data["tools"] + + # Expected tools + expected_tools = [ + "list_repositories", + "get_repository_info", + "get_file_tree", + "get_file_contents", + ] + + tool_names = [tool["name"] for tool in tools] + + for expected in expected_tools: + assert expected in tool_names, f"Tool {expected} not found in registered tools" + + # Verify each tool has required fields + for tool in tools: + assert "name" in tool + assert "description" in tool + assert "input_schema" in tool + assert tool["description"] # Not empty + assert "type" in tool["input_schema"] + + +def test_error_responses_include_helpful_messages(client): + """Test that error responses include helpful messages for users.""" + # Missing auth + response = client.get("/mcp/tools") + assert response.status_code == 401 + data = response.json() + assert "Authorization" in data["detail"] + assert "Bearer" in data["detail"] + + # Invalid key format + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer short"} + ) + assert response.status_code == 401 + data = response.json() + assert "Invalid" in data["message"] or "format" in data["message"].lower() + + +def test_audit_logging_integration(client, tmp_path, monkeypatch): + """Test that audit logging works with authentication.""" + # Set audit log to temp file + audit_log = tmp_path / "audit.log" + monkeypatch.setenv("AUDIT_LOG_PATH", str(audit_log)) + + # Make authenticated request + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "a" * 64} + ) + assert response.status_code == 200 + + # Note: In real system, audit logs would be written + # This test verifies the system doesn't crash with audit logging enabled diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..211736a --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,205 @@ +"""Tests for MCP server endpoints.""" + +import pytest +from fastapi.testclient import TestClient + +from aegis_gitea_mcp.config import reset_settings +from aegis_gitea_mcp.auth import reset_validator + + +@pytest.fixture(autouse=True) +def reset_state(): + """Reset global state between tests.""" + reset_settings() + reset_validator() + yield + reset_settings() + reset_validator() + + +@pytest.fixture +def mock_env(monkeypatch): + """Set up test environment.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") + monkeypatch.setenv("AUTH_ENABLED", "true") + monkeypatch.setenv("MCP_API_KEYS", "a" * 64) + + +@pytest.fixture +def mock_env_auth_disabled(monkeypatch): + """Set up test environment with auth disabled.""" + monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") + monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345") + monkeypatch.setenv("AUTH_ENABLED", "false") + monkeypatch.setenv("MCP_API_KEYS", "") + + +@pytest.fixture +def client(mock_env): + """Create test client.""" + # Import after setting env vars + from aegis_gitea_mcp.server import app + return TestClient(app) + + +@pytest.fixture +def client_no_auth(mock_env_auth_disabled): + """Create test client with auth disabled.""" + from aegis_gitea_mcp.server import app + return TestClient(app) + + +def test_root_endpoint(client): + """Test root endpoint returns server info.""" + response = client.get("/") + + assert response.status_code == 200 + data = response.json() + assert data["name"] == "AegisGitea MCP Server" + assert "version" in data + assert data["status"] == "running" + + +def test_health_endpoint(client): + """Test health check endpoint.""" + response = client.get("/health") + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "healthy" + + +def test_health_endpoint_no_auth_required(client): + """Test that health check doesn't require authentication.""" + response = client.get("/health") + + # Should work without Authorization header + assert response.status_code == 200 + + +def test_list_tools_without_auth(client): + """Test that /mcp/tools requires authentication.""" + response = client.get("/mcp/tools") + + assert response.status_code == 401 + data = response.json() + assert "Authentication failed" in data["error"] + + +def test_list_tools_with_invalid_key(client): + """Test /mcp/tools with invalid API key.""" + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"} + ) + + assert response.status_code == 401 + + +def test_list_tools_with_valid_key(client, mock_env): + """Test /mcp/tools with valid API key.""" + response = client.get( + "/mcp/tools", + headers={"Authorization": f"Bearer {'a' * 64}"} + ) + + assert response.status_code == 200 + data = response.json() + assert "tools" in data + assert len(data["tools"]) > 0 + + # Check tool structure + tool = data["tools"][0] + assert "name" in tool + assert "description" in tool + assert "input_schema" in tool + + +def test_list_tools_no_auth_when_disabled(client_no_auth): + """Test that /mcp/tools works without auth when disabled.""" + response = client_no_auth.get("/mcp/tools") + + # Should work without Authorization header when auth is disabled + assert response.status_code == 200 + data = response.json() + assert "tools" in data + + +def test_call_tool_without_auth(client): + """Test that /mcp/tool/call requires authentication.""" + response = client.post( + "/mcp/tool/call", + json={"tool": "list_repositories", "arguments": {}} + ) + + assert response.status_code == 401 + + +def test_call_tool_with_invalid_key(client): + """Test /mcp/tool/call with invalid API key.""" + response = client.post( + "/mcp/tool/call", + headers={"Authorization": "Bearer invalid-key-12345678901234567890123456789012"}, + json={"tool": "list_repositories", "arguments": {}} + ) + + assert response.status_code == 401 + + +def test_call_nonexistent_tool(client): + """Test calling a tool that doesn't exist.""" + response = client.post( + "/mcp/tool/call", + headers={"Authorization": f"Bearer {'a' * 64}"}, + json={"tool": "nonexistent_tool", "arguments": {}} + ) + + assert response.status_code == 404 + data = response.json() + assert "not found" in data["detail"].lower() + + +def test_sse_endpoint_without_auth(client): + """Test that SSE endpoint requires authentication.""" + response = client.get("/mcp/sse") + + assert response.status_code == 401 + + +def test_auth_header_formats(client): + """Test various Authorization header formats.""" + # Missing "Bearer" prefix + response = client.get( + "/mcp/tools", + headers={"Authorization": "a" * 64} + ) + assert response.status_code == 401 + + # Wrong case + response = client.get( + "/mcp/tools", + headers={"Authorization": "bearer " + "a" * 64} + ) + assert response.status_code == 401 + + # Extra spaces + response = client.get( + "/mcp/tools", + headers={"Authorization": f"Bearer {'a' * 64}"} + ) + assert response.status_code == 401 + + +def test_rate_limiting(client): + """Test rate limiting after multiple failed auth attempts.""" + # Make 6 failed attempts + for i in range(6): + response = client.get( + "/mcp/tools", + headers={"Authorization": "Bearer " + "b" * 64} + ) + + # Last response should mention rate limiting + data = response.json() + assert "Too many failed" in data["message"]