Files
AegisGitea-MCP/tests/test_integration.py
2026-01-31 15:55:22 +00:00

207 lines
7.3 KiB
Python

"""Integration tests for the complete system."""
import pytest
from fastapi.testclient import TestClient
from aegis_gitea_mcp.auth import reset_validator
from aegis_gitea_mcp.config import reset_settings
@pytest.fixture(autouse=True)
def reset_state():
"""Reset global state between tests."""
reset_settings()
reset_validator()
yield
reset_settings()
reset_validator()
@pytest.fixture
def full_env(monkeypatch):
"""Set up complete test environment."""
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
monkeypatch.setenv("GITEA_TOKEN", "test-gitea-token-12345")
monkeypatch.setenv("AUTH_ENABLED", "true")
monkeypatch.setenv("MCP_API_KEYS", f"{'a' * 64},{'b' * 64}")
monkeypatch.setenv("MCP_HOST", "0.0.0.0")
monkeypatch.setenv("MCP_PORT", "8080")
monkeypatch.setenv("LOG_LEVEL", "INFO")
monkeypatch.setenv("MAX_AUTH_FAILURES", "5")
monkeypatch.setenv("AUTH_FAILURE_WINDOW", "300")
@pytest.fixture
def client(full_env):
"""Create test client with full environment."""
from aegis_gitea_mcp.server import app
return TestClient(app)
def test_complete_authentication_flow(client):
"""Test complete authentication flow from start to finish."""
# 1. Health check should work without auth
response = client.get("/health")
assert response.status_code == 200
# 2. Tool listing should work without auth (Mixed mode for ChatGPT)
response = client.get("/mcp/tools")
assert response.status_code == 200
# 3. Protected endpoint (tool execution) should reject without auth
response = client.post("/mcp/tool/call", json={"tool": "list_repositories", "arguments": {}})
assert response.status_code == 401
# 4. Protected endpoint should reject with invalid key
response = client.post(
"/mcp/tool/call",
headers={"Authorization": "Bearer " + "c" * 64},
json={"tool": "list_repositories", "arguments": {}},
)
assert response.status_code == 401
# 5. Protected endpoint should pass auth with valid key (first key)
# Note: May fail with 500 due to missing Gitea connection, but auth passes
response = client.post(
"/mcp/tool/call",
headers={"Authorization": "Bearer " + "a" * 64},
json={"tool": "list_repositories", "arguments": {}},
)
assert response.status_code != 401
# 6. Protected endpoint should pass auth with valid key (second key)
response = client.post(
"/mcp/tool/call",
headers={"Authorization": "Bearer " + "b" * 64},
json={"tool": "list_repositories", "arguments": {}},
)
assert response.status_code != 401
def test_key_rotation_simulation(client, monkeypatch):
"""Simulate key rotation with grace period."""
# Start with key A
response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "a" * 64})
assert response.status_code == 200
# Both keys A and B work (grace period)
response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "a" * 64})
assert response.status_code == 200
response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "b" * 64})
assert response.status_code == 200
def test_multiple_tool_calls_with_auth(client):
"""Test multiple tool calls with authentication."""
headers = {"Authorization": "Bearer " + "a" * 64}
# List tools
response = client.get("/mcp/tools", headers=headers)
assert response.status_code == 200
tools = response.json()["tools"]
# Try to call each tool (will fail without proper Gitea connection, but auth should work)
for tool in tools:
response = client.post(
"/mcp/tool/call", headers=headers, json={"tool": tool["name"], "arguments": {}}
)
# Should pass auth but may fail on actual execution (Gitea not available in tests)
assert response.status_code != 401 # Not auth error
def test_concurrent_requests_different_ips(client):
"""Test that different IPs are tracked separately for rate limiting."""
# This is a simplified test since we can't easily simulate different IPs in TestClient
# But we can verify rate limiting works for single IP
headers_invalid = {"Authorization": "Bearer " + "x" * 64}
tool_call_data = {"tool": "list_repositories", "arguments": {}}
# Make 5 failed attempts on protected endpoint
for i in range(5):
response = client.post("/mcp/tool/call", headers=headers_invalid, json=tool_call_data)
assert response.status_code == 401
# 6th attempt should be rate limited
response = client.post("/mcp/tool/call", headers=headers_invalid, json=tool_call_data)
assert response.status_code == 401
data = response.json()
assert "Too many failed" in data["message"]
# Note: Rate limiting is IP-based, so even valid keys from the same IP are blocked
# This is a security feature to prevent brute force attacks
response = client.post(
"/mcp/tool/call", headers={"Authorization": "Bearer " + "a" * 64}, json=tool_call_data
)
# After rate limit is triggered, all requests from that IP are blocked
assert response.status_code == 401
def test_all_mcp_tools_discoverable(client):
"""Test that all MCP tools are properly registered and discoverable."""
response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "a" * 64})
assert response.status_code == 200
data = response.json()
tools = data["tools"]
# Expected tools
expected_tools = [
"list_repositories",
"get_repository_info",
"get_file_tree",
"get_file_contents",
]
tool_names = [tool["name"] for tool in tools]
for expected in expected_tools:
assert expected in tool_names, f"Tool {expected} not found in registered tools"
# Verify each tool has required fields
for tool in tools:
assert "name" in tool
assert "description" in tool
assert "input_schema" in tool
assert tool["description"] # Not empty
assert "type" in tool["input_schema"]
def test_error_responses_include_helpful_messages(client):
"""Test that error responses include helpful messages for users."""
tool_data = {"tool": "list_repositories", "arguments": {}}
# Missing auth on protected endpoint
response = client.post("/mcp/tool/call", json=tool_data)
assert response.status_code == 401
data = response.json()
assert "Authorization" in data["detail"] or "Authentication" in data["error"]
# Invalid key format
response = client.post(
"/mcp/tool/call", headers={"Authorization": "Bearer short"}, json=tool_data
)
assert response.status_code == 401
data = response.json()
assert (
"Invalid" in data.get("message", "")
or "format" in data.get("message", "").lower()
or "Authentication" in data.get("error", "")
)
def test_audit_logging_integration(client, tmp_path, monkeypatch):
"""Test that audit logging works with authentication."""
# Set audit log to temp file
audit_log = tmp_path / "audit.log"
monkeypatch.setenv("AUDIT_LOG_PATH", str(audit_log))
# Make authenticated request
response = client.get("/mcp/tools", headers={"Authorization": "Bearer " + "a" * 64})
assert response.status_code == 200
# Note: In real system, audit logs would be written
# This test verifies the system doesn't crash with audit logging enabled