feat(stdio): harden local MCP transport and add end-to-end tests

Reserve stdout for the JSON-RPC stream: _configure_stderr_logging() pins all
logging to stderr (and rewrites any stray stdout handler) so a log line can
never corrupt the stdio protocol. Extract a pure, testable build_server() from
_serve(). Add end-to-end tests over the mcp in-memory transport (initialize +
tools/list + tools/call), covering a successful round trip and a policy denial
surfaced as an MCP error.
This commit is contained in:
2026-06-27 15:19:42 +02:00
parent f660fa32c1
commit 5d4a98d06e
2 changed files with 117 additions and 13 deletions
+47 -13
View File
@@ -20,6 +20,7 @@ log all run exactly as they do on the server. The same tools (including
from __future__ import annotations
import asyncio
import logging
import os
import sys
from pathlib import Path
@@ -165,24 +166,36 @@ async def _dispatch(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]
return result
async def _serve() -> None:
"""Build the stdio MCP server from the shared registry and serve it."""
def _configure_stderr_logging() -> None:
"""Pin all logging to stderr so the stdout JSON-RPC channel stays clean.
The stdio MCP transport speaks JSON-RPC over stdout; a single stray log line
on stdout corrupts the stream and breaks the client. ``configure_logging``
already targets stderr, but we additionally rewrite any handler that points
at stdout (e.g. a library that called ``basicConfig``) so nothing can leak.
"""
from aegis_gitea_mcp.config import get_settings
from aegis_gitea_mcp.logging_utils import configure_logging
configure_logging(get_settings().log_level)
root = logging.getLogger()
for handler in root.handlers:
if isinstance(handler, logging.StreamHandler) and handler.stream is sys.stdout:
handler.setStream(sys.stderr)
def build_server() -> Any:
"""Build (but do not run) the stdio MCP ``Server`` from the shared registry.
Kept separate from :func:`_serve` so it can be driven in-process by tests
over an in-memory transport without opening real stdio streams.
"""
import mcp.types as mcp_types
from mcp.server import Server
from mcp.server.stdio import stdio_server
from aegis_gitea_mcp.config import get_settings
from aegis_gitea_mcp.policy import get_policy_engine
from aegis_gitea_mcp.registry import list_tool_definitions
# Fail fast on bad settings/policy before opening the transport.
get_settings()
get_policy_engine()
global _owner_login
_owner_login = await _resolve_owner_login()
server: Server = Server("aegis-gitea-mcp")
server: Any = Server("aegis-gitea-mcp")
@server.list_tools()
async def list_tools() -> list[mcp_types.Tool]:
@@ -200,6 +213,24 @@ async def _serve() -> None:
# Returning a dict yields structured content plus a JSON text block.
return await _dispatch(name, arguments)
return server
async def _serve() -> None:
"""Resolve identity and serve the stdio MCP server over real stdin/stdout."""
from mcp.server.stdio import stdio_server
from aegis_gitea_mcp.config import get_settings
from aegis_gitea_mcp.policy import get_policy_engine
# Fail fast on bad settings/policy before opening the transport.
get_settings()
get_policy_engine()
global _owner_login
_owner_login = await _resolve_owner_login()
server = build_server()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
@@ -221,6 +252,9 @@ def main() -> None:
print(f"aegis-gitea-mcp: invalid configuration: {exc}", file=sys.stderr)
raise SystemExit(2) from exc
# Keep stdout reserved for the JSON-RPC stream; all logs go to stderr.
_configure_stderr_logging()
try:
asyncio.run(_serve())
except StdioConfigError as exc:
+70
View File
@@ -73,6 +73,19 @@ def test_default_audit_log_path_is_user_scoped() -> None:
assert "aegis-gitea-mcp" in str(path)
def test_configure_stderr_logging_keeps_stdout_clean(stdio_env: None) -> None:
"""No log handler may target stdout (it is the JSON-RPC channel)."""
import logging
import sys
# Simulate a library that attached a stdout handler.
root = logging.getLogger()
root.addHandler(logging.StreamHandler(sys.stdout))
stdio_app._configure_stderr_logging()
for handler in logging.getLogger().handlers:
assert getattr(handler, "stream", sys.stderr) is not sys.stdout
def test_main_exits_when_env_missing(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("GITEA_URL", raising=False)
monkeypatch.delenv("GITEA_TOKEN", raising=False)
@@ -138,3 +151,60 @@ async def test_dispatch_pins_owner_login_and_returns(
assert result["name"] == "app"
# The dispatch pinned the trusted PAT owner onto the request context.
assert get_gitea_user_login() == "alice"
# --- End-to-end over the MCP protocol (in-memory transport) -----------------
async def test_build_server_exposes_registry_tools(stdio_env: None) -> None:
"""build_server() wires every registry tool, including gitea_request."""
from mcp.shared.memory import create_connected_server_and_client_session
server = stdio_app.build_server()
async with create_connected_server_and_client_session(server) as client:
await client.initialize()
listed = await client.list_tools()
names = {t.name for t in listed.tools}
assert "get_repository_info" in names
assert "gitea_request" in names
assert len(names) >= 40
async def test_stdio_tool_call_round_trip(stdio_env: None, monkeypatch: pytest.MonkeyPatch) -> None:
"""A tools/call over the MCP protocol dispatches through the shared core."""
from mcp.shared.memory import create_connected_server_and_client_session
monkeypatch.setattr(stdio_app, "_owner_login", "alice")
patcher = _patch_gitea_client(
get_repository={"owner": {"login": "acme"}, "name": "app", "full_name": "acme/app"}
)
try:
server = stdio_app.build_server()
async with create_connected_server_and_client_session(server) as client:
await client.initialize()
result = await client.call_tool("get_repository_info", {"owner": "acme", "repo": "app"})
finally:
patcher.stop()
assert result.isError is False
text = "".join(getattr(block, "text", "") for block in result.content)
assert "app" in text
async def test_stdio_tool_call_policy_denial_is_reported(
stdio_env: None, monkeypatch: pytest.MonkeyPatch
) -> None:
"""A write tool denied by WRITE_MODE surfaces as an MCP error, not a crash."""
from mcp.shared.memory import create_connected_server_and_client_session
monkeypatch.setattr(stdio_app, "_owner_login", "alice")
server = stdio_app.build_server()
async with create_connected_server_and_client_session(server) as client:
await client.initialize()
result = await client.call_tool(
"create_issue", {"owner": "acme", "repo": "app", "title": "x"}
)
assert result.isError is True
text = "".join(getattr(block, "text", "") for block in result.content)
assert "write mode is disabled" in text