Files
AegisGitea-MCP/src/aegis_gitea_mcp/server.py
latte b990c6c527 feat: allow api_key query parameter for ChatGPT UI
ChatGPT UI lacks custom header support for MCP servers. Added
query parameter fallback (?api_key=) alongside Authorization
header to authenticate requests.

Updated tests to cover query param authentication.
2026-01-29 21:03:05 +01:00

301 lines
8.9 KiB
Python

"""Main MCP server implementation with FastAPI and SSE support."""
import logging
from typing import Any, Dict
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import ValidationError
from aegis_gitea_mcp.audit import get_audit_logger
from aegis_gitea_mcp.auth import get_validator
from aegis_gitea_mcp.config import get_settings
from aegis_gitea_mcp.gitea_client import GiteaClient
from aegis_gitea_mcp.mcp_protocol import (
AVAILABLE_TOOLS,
MCPListToolsResponse,
MCPToolCallRequest,
MCPToolCallResponse,
get_tool_by_name,
)
from aegis_gitea_mcp.tools.repository import (
get_file_contents_tool,
get_file_tree_tool,
get_repository_info_tool,
list_repositories_tool,
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(
title="AegisGitea MCP Server",
description="Security-first MCP server for controlled AI access to self-hosted Gitea",
version="0.1.0",
)
# Global settings, audit logger, and auth validator
settings = get_settings()
audit = get_audit_logger()
auth_validator = get_validator()
# Tool dispatcher mapping
TOOL_HANDLERS = {
"list_repositories": list_repositories_tool,
"get_repository_info": get_repository_info_tool,
"get_file_tree": get_file_tree_tool,
"get_file_contents": get_file_contents_tool,
}
# Authentication middleware
@app.middleware("http")
async def authenticate_request(request: Request, call_next):
"""Authenticate all requests except health checks and root."""
# Skip authentication for health check and root endpoints
if request.url.path in ["/", "/health"]:
return await call_next(request)
# Only authenticate MCP endpoints
if not request.url.path.startswith("/mcp/"):
return await call_next(request)
# Extract client information
client_ip = request.client.host if request.client else "unknown"
user_agent = request.headers.get("user-agent", "unknown")
# Extract Authorization header
auth_header = request.headers.get("authorization")
api_key = auth_validator.extract_bearer_token(auth_header)
# Fallback: allow API key via query parameter (for ChatGPT UI without headers)
if not api_key:
api_key = request.query_params.get("api_key")
# Validate API key
is_valid, error_message = auth_validator.validate_api_key(api_key, client_ip, user_agent)
if not is_valid:
return JSONResponse(
status_code=401,
content={
"error": "Authentication failed",
"message": error_message,
"detail": (
"Provide a valid API key via Authorization header (Bearer <api-key>) "
"or ?api_key=<api-key> query parameter"
),
},
)
# Authentication successful - continue to endpoint
response = await call_next(request)
return response
@app.on_event("startup")
async def startup_event() -> None:
"""Initialize server on startup."""
logger.info(f"Starting AegisGitea MCP Server on {settings.mcp_host}:{settings.mcp_port}")
logger.info(f"Connected to Gitea instance: {settings.gitea_base_url}")
logger.info(f"Audit logging enabled: {settings.audit_log_path}")
# Log authentication status
if settings.auth_enabled:
key_count = len(settings.mcp_api_keys)
logger.info(f"API key authentication ENABLED ({key_count} key(s) configured)")
else:
logger.warning("API key authentication DISABLED - server is open to all requests!")
# Test Gitea connection
try:
async with GiteaClient() as gitea:
user = await gitea.get_current_user()
logger.info(f"Authenticated as bot user: {user.get('login', 'unknown')}")
except Exception as e:
logger.error(f"Failed to connect to Gitea: {e}")
raise
@app.on_event("shutdown")
async def shutdown_event() -> None:
"""Cleanup on server shutdown."""
logger.info("Shutting down AegisGitea MCP Server")
@app.get("/")
async def root() -> Dict[str, Any]:
"""Root endpoint with server information."""
return {
"name": "AegisGitea MCP Server",
"version": "0.1.0",
"status": "running",
"mcp_version": "1.0",
}
@app.get("/health")
async def health() -> Dict[str, str]:
"""Health check endpoint."""
return {"status": "healthy"}
@app.get("/mcp/tools")
async def list_tools() -> JSONResponse:
"""List all available MCP tools.
Returns:
JSON response with list of tool definitions
"""
response = MCPListToolsResponse(tools=AVAILABLE_TOOLS)
return JSONResponse(content=response.model_dump())
@app.post("/mcp/tool/call")
async def call_tool(request: MCPToolCallRequest) -> JSONResponse:
"""Execute an MCP tool call.
Args:
request: Tool call request with tool name and arguments
Returns:
JSON response with tool execution result
"""
correlation_id = request.correlation_id or audit.log_tool_invocation(
tool_name=request.tool,
params=request.arguments,
)
try:
# Validate tool exists
tool_def = get_tool_by_name(request.tool)
if not tool_def:
error_msg = f"Tool '{request.tool}' not found"
audit.log_tool_invocation(
tool_name=request.tool,
correlation_id=correlation_id,
result_status="error",
error=error_msg,
)
raise HTTPException(status_code=404, detail=error_msg)
# Get tool handler
handler = TOOL_HANDLERS.get(request.tool)
if not handler:
error_msg = f"Tool '{request.tool}' has no handler implementation"
audit.log_tool_invocation(
tool_name=request.tool,
correlation_id=correlation_id,
result_status="error",
error=error_msg,
)
raise HTTPException(status_code=500, detail=error_msg)
# Execute tool with Gitea client
async with GiteaClient() as gitea:
result = await handler(gitea, request.arguments)
audit.log_tool_invocation(
tool_name=request.tool,
correlation_id=correlation_id,
result_status="success",
)
response = MCPToolCallResponse(
success=True,
result=result,
correlation_id=correlation_id,
)
return JSONResponse(content=response.model_dump())
except ValidationError as e:
error_msg = f"Invalid arguments: {str(e)}"
audit.log_tool_invocation(
tool_name=request.tool,
correlation_id=correlation_id,
result_status="error",
error=error_msg,
)
raise HTTPException(status_code=400, detail=error_msg)
except Exception as e:
error_msg = str(e)
audit.log_tool_invocation(
tool_name=request.tool,
correlation_id=correlation_id,
result_status="error",
error=error_msg,
)
response = MCPToolCallResponse(
success=False,
error=error_msg,
correlation_id=correlation_id,
)
return JSONResponse(content=response.model_dump(), status_code=500)
@app.get("/mcp/sse")
async def sse_endpoint(request: Request) -> StreamingResponse:
"""Server-Sent Events endpoint for MCP protocol.
This enables real-time communication with ChatGPT using SSE.
Returns:
Streaming SSE response
"""
async def event_stream():
"""Generate SSE events."""
# Send initial connection event
yield f"data: {{'event': 'connected', 'server': 'AegisGitea MCP', 'version': '0.1.0'}}\n\n"
# Keep connection alive
try:
while True:
if await request.is_disconnected():
break
# Heartbeat every 30 seconds
yield f"data: {{'event': 'heartbeat'}}\n\n"
# Wait for next heartbeat (in production, this would handle actual events)
import asyncio
await asyncio.sleep(30)
except Exception as e:
logger.error(f"SSE stream error: {e}")
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
def main() -> None:
"""Run the MCP server."""
import uvicorn
settings = get_settings()
uvicorn.run(
"aegis_gitea_mcp.server:app",
host=settings.mcp_host,
port=settings.mcp_port,
log_level=settings.log_level.lower(),
reload=False,
)
if __name__ == "__main__":
main()