feat: harden Claude MCP OAuth transport

This commit is contained in:
2026-06-13 21:05:11 +02:00
parent ed3130ef74
commit 541124e92a
11 changed files with 1377 additions and 68 deletions
+7
View File
@@ -9,9 +9,11 @@ from aegis_gitea_mcp.audit import reset_audit_logger
from aegis_gitea_mcp.auth import reset_validator
from aegis_gitea_mcp.config import reset_settings
from aegis_gitea_mcp.oauth import reset_oauth_validator
from aegis_gitea_mcp.oauth_flow import reset_oauth_client_registry
from aegis_gitea_mcp.observability import reset_metrics_registry
from aegis_gitea_mcp.policy import reset_policy_engine
from aegis_gitea_mcp.rate_limit import reset_rate_limiter
from aegis_gitea_mcp.server import reset_repo_authz_cache
@pytest.fixture(autouse=True)
@@ -22,6 +24,8 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[
reset_audit_logger()
reset_validator()
reset_oauth_validator()
reset_oauth_client_registry()
reset_repo_authz_cache()
reset_policy_engine()
reset_rate_limiter()
reset_metrics_registry()
@@ -37,6 +41,8 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[
reset_audit_logger()
reset_validator()
reset_oauth_validator()
reset_oauth_client_registry()
reset_repo_authz_cache()
reset_policy_engine()
reset_rate_limiter()
reset_metrics_registry()
@@ -66,4 +72,5 @@ def mock_env_oauth(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("OAUTH_MODE", "true")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret")
monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef")
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false")
+1
View File
@@ -28,6 +28,7 @@ def full_env(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("OAUTH_MODE", "true")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret")
monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef")
monkeypatch.setenv("ENVIRONMENT", "test")
monkeypatch.setenv("MCP_HOST", "127.0.0.1")
monkeypatch.setenv("MCP_PORT", "8080")
+240 -4
View File
@@ -10,6 +10,7 @@ from fastapi.testclient import TestClient
from aegis_gitea_mcp.config import reset_settings
from aegis_gitea_mcp.oauth import GiteaOAuthValidator, get_oauth_validator, reset_oauth_validator
from aegis_gitea_mcp.oauth_flow import OAuthClientRegistry, OAuthRegistrationRequest
from aegis_gitea_mcp.request_context import (
get_gitea_user_login,
get_gitea_user_token,
@@ -40,6 +41,7 @@ def mock_env_oauth(monkeypatch):
monkeypatch.setenv("OAUTH_MODE", "true")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret")
monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef")
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false")
@@ -57,6 +59,24 @@ def oauth_client(mock_env_oauth):
return TestClient(app, raise_server_exceptions=False)
def _register_public_client(oauth_client: TestClient, redirect_uri: str) -> dict[str, str]:
"""Register a public OAuth client for test flows."""
response = oauth_client.post(
"/register",
json={
"client_name": "pytest-client",
"redirect_uris": [redirect_uri],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
},
)
assert response.status_code == 200
payload = response.json()
assert "client_id" in payload
return payload
# ---------------------------------------------------------------------------
# GiteaOAuthValidator unit tests
# ---------------------------------------------------------------------------
@@ -248,19 +268,39 @@ def test_oauth_token_endpoint_available_when_oauth_mode_false(monkeypatch):
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
with TestClient(app, raise_server_exceptions=False) as client:
response = client.post("/oauth/token", data={"code": "abc123"})
registration = client.post(
"/register",
json={
"client_name": "pytest-client",
"redirect_uris": ["http://127.0.0.1:8080/callback"],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code"],
"response_types": ["code"],
},
)
assert registration.status_code == 200
client_id = registration.json()["client_id"]
response = client.post(
"/oauth/token",
data={"client_id": client_id, "code": "abc123", "code_verifier": "pkce"},
)
assert response.status_code == 200
def test_oauth_token_endpoint_missing_code(oauth_client):
"""POST /oauth/token without a code returns 400."""
response = oauth_client.post("/oauth/token", data={})
client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
response = oauth_client.post(
"/oauth/token",
data={"client_id": client_data["client_id"], "code_verifier": "pkce"},
)
assert response.status_code == 400
def test_oauth_token_endpoint_proxy_success(oauth_client):
"""POST /oauth/token proxies successfully to Gitea and returns access_token."""
client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
@@ -276,7 +316,11 @@ def test_oauth_token_endpoint_proxy_success(oauth_client):
response = oauth_client.post(
"/oauth/token",
data={"code": "auth-code-123", "redirect_uri": "https://chat.openai.com/callback"},
data={
"client_id": client_data["client_id"],
"code": "auth-code-123",
"code_verifier": "pkce-verifier",
},
)
assert response.status_code == 200
@@ -286,6 +330,7 @@ def test_oauth_token_endpoint_proxy_success(oauth_client):
def test_oauth_token_endpoint_gitea_error(oauth_client):
"""POST /oauth/token propagates Gitea error status."""
client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.json.return_value = {"error": "invalid_grant"}
@@ -296,11 +341,202 @@ def test_oauth_token_endpoint_gitea_error(oauth_client):
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
response = oauth_client.post("/oauth/token", data={"code": "bad-code"})
response = oauth_client.post(
"/oauth/token",
data={
"client_id": client_data["client_id"],
"code": "bad-code",
"code_verifier": "pkce-verifier",
},
)
assert response.status_code == 400
def test_oauth_authorize_and_callback_round_trip(oauth_client):
"""OAuth authorize/callback round-trip preserves the original redirect URI and state."""
client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
authorize_response = oauth_client.get(
"/oauth/authorize",
params={
"client_id": client_data["client_id"],
"redirect_uri": "http://127.0.0.1:8080/callback",
"state": "original-state",
"code_challenge": "pkce-challenge",
"code_challenge_method": "S256",
},
follow_redirects=False,
)
assert authorize_response.status_code == 302
location = authorize_response.headers["location"]
assert "state=" in location
assert "redirect_uri=http%3A%2F%2F127.0.0.1%3A8080%2Fcallback" not in location
from urllib.parse import parse_qs, urlparse
parsed = urlparse(location)
query = parse_qs(parsed.query)
proxy_state = query["state"][0]
callback_response = oauth_client.get(
"/oauth/callback",
params={"state": proxy_state, "code": "auth-code-123"},
follow_redirects=False,
)
assert callback_response.status_code == 302
callback_location = callback_response.headers["location"]
assert callback_location.startswith("http://127.0.0.1:8080/callback?")
assert "code=auth-code-123" in callback_location
assert "state=original-state" in callback_location
def test_oauth_callback_rejects_tampered_state(oauth_client):
"""OAuth callback rejects modified signed proxy state."""
client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
authorize_response = oauth_client.get(
"/oauth/authorize",
params={
"client_id": client_data["client_id"],
"redirect_uri": "http://127.0.0.1:8080/callback",
"state": "original-state",
"code_challenge": "pkce-challenge",
"code_challenge_method": "S256",
},
follow_redirects=False,
)
from urllib.parse import parse_qs, urlparse
proxy_state = parse_qs(urlparse(authorize_response.headers["location"]).query)["state"][0]
tampered_state = proxy_state[:-1] + ("A" if proxy_state[-1] != "A" else "B")
callback_response = oauth_client.get(
"/oauth/callback",
params={"state": tampered_state, "code": "auth-code-123"},
)
assert callback_response.status_code == 400
@pytest.mark.parametrize(
"redirect_uri",
[
"https://claude.ai/api/mcp/auth_callback",
"https://claude.com/api/mcp/auth_callback",
],
)
def test_dcr_accepts_default_claude_callbacks(oauth_client, redirect_uri):
"""Claude's hosted connector callback URLs are allowed by default."""
response = oauth_client.post(
"/register",
json={
"client_name": "claude-client",
"redirect_uris": [redirect_uri],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
},
)
assert response.status_code == 200
def test_oauth_authorize_rejects_unknown_client(oauth_client):
"""OAuth authorize returns invalid_client for unregistered client IDs."""
response = oauth_client.get(
"/oauth/authorize",
params={
"client_id": "unknown-client",
"redirect_uri": "http://127.0.0.1:8080/callback",
"state": "x",
"code_challenge": "pkce-challenge",
"code_challenge_method": "S256",
},
)
assert response.status_code == 401
assert response.json()["detail"] == "invalid_client"
def test_oauth_token_rejects_unknown_dcr_client(oauth_client):
"""Unknown dynamic clients receive RFC 6749 invalid_client from token endpoint."""
response = oauth_client.post(
"/oauth/token",
data={
"client_id": "deleted-or-unknown-client",
"code": "auth-code-123",
"code_verifier": "pkce-verifier",
},
)
assert response.status_code == 401
assert response.json() == {"error": "invalid_client"}
def test_oauth_authorize_requires_pkce_s256(oauth_client):
"""Authorization endpoint enforces PKCE S256 for public clients."""
client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
missing_challenge = oauth_client.get(
"/oauth/authorize",
params={
"client_id": client_data["client_id"],
"redirect_uri": "http://127.0.0.1:8080/callback",
"state": "x",
},
)
plain_method = oauth_client.get(
"/oauth/authorize",
params={
"client_id": client_data["client_id"],
"redirect_uri": "http://127.0.0.1:8080/callback",
"state": "x",
"code_challenge": "pkce-challenge",
"code_challenge_method": "plain",
},
)
assert missing_challenge.status_code == 400
assert plain_method.status_code == 400
def test_register_rejects_foreign_redirect_uri(oauth_client):
"""DCR rejects redirect URIs outside the allowlist and loopback/Claude patterns."""
response = oauth_client.post(
"/register",
json={
"client_name": "pytest-client",
"redirect_uris": ["https://evil.example.com/callback"],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code"],
"response_types": ["code"],
},
)
assert response.status_code == 400
def test_dcr_registry_persists_registered_clients(tmp_path):
"""Registered OAuth clients survive registry reloads."""
storage_path = tmp_path / "dcr_clients.json"
registry = OAuthClientRegistry(storage_path)
request = OAuthRegistrationRequest.model_validate(
{
"client_name": "persisted-client",
"redirect_uris": ["http://127.0.0.1:8080/callback"],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code"],
"response_types": ["code"],
}
)
response = registry.register(request)
reloaded = OAuthClientRegistry(storage_path)
assert reloaded.get(response["client_id"]) is not None
# ---------------------------------------------------------------------------
# Config validation tests
# ---------------------------------------------------------------------------
+67 -2
View File
@@ -25,13 +25,14 @@ def reset_state(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("OAUTH_MODE", "true")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret")
monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef")
monkeypatch.setenv("OAUTH_CACHE_TTL_SECONDS", "600")
yield
reset_settings()
reset_oauth_validator()
def _build_jwt_fixture() -> tuple[str, dict[str, object]]:
def _build_jwt_fixture(aud: str = "test-client-id") -> tuple[str, dict[str, object]]:
"""Generate RS256 access token and matching JWKS payload."""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
@@ -44,7 +45,7 @@ def _build_jwt_fixture() -> tuple[str, dict[str, object]]:
"sub": "user-1",
"preferred_username": "alice",
"scope": "read:repository write:repository",
"aud": "test-client-id",
"aud": aud,
"iss": "https://gitea.example.com",
"iat": now,
"exp": now + 3600,
@@ -56,6 +57,70 @@ def _build_jwt_fixture() -> tuple[str, dict[str, object]]:
return token, {"keys": [jwk]}
async def _validate_with_jwks(
validator: GiteaOAuthValidator, token: str, jwks: dict[str, object]
) -> tuple[bool, str | None, dict[str, object] | None]:
"""Drive a JWT validation with mocked discovery + JWKS responses."""
discovery_response = MagicMock()
discovery_response.status_code = 200
discovery_response.json.return_value = {
"issuer": "https://gitea.example.com",
"jwks_uri": "https://gitea.example.com/login/oauth/keys",
}
jwks_response = MagicMock()
jwks_response.status_code = 200
jwks_response.json.return_value = jwks
with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client.get = AsyncMock(side_effect=[discovery_response, jwks_response])
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
return await validator.validate_oauth_token(token, "127.0.0.1", "TestAgent")
def test_acceptable_audiences_includes_resource_and_client_id(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""The canonical MCP resource and the Gitea client id are accepted audiences."""
monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com")
reset_settings()
reset_oauth_validator()
audiences = GiteaOAuthValidator()._acceptable_audiences()
assert "https://mcp.example.com" in audiences
assert "test-client-id" in audiences
@pytest.mark.asyncio
async def test_jwt_with_canonical_resource_audience_is_accepted(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""A token whose aud is the canonical MCP resource URL validates (P4)."""
monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com")
reset_settings()
reset_oauth_validator()
token, jwks = _build_jwt_fixture(aud="https://mcp.example.com")
valid, error, principal = await _validate_with_jwks(GiteaOAuthValidator(), token, jwks)
assert valid is True
assert error is None
assert principal is not None
@pytest.mark.asyncio
async def test_jwt_with_foreign_audience_is_rejected() -> None:
"""A token minted for a different audience is rejected (audience binding)."""
token, jwks = _build_jwt_fixture(aud="some-other-service")
# Foreign-audience JWT fails JWT validation, then falls back to userinfo, which
# is not mocked here and raises a network error -> overall failure.
with patch("aegis_gitea_mcp.oauth.GiteaOAuthValidator._validate_userinfo") as mock_userinfo:
from aegis_gitea_mcp.oauth import OAuthTokenValidationError
mock_userinfo.side_effect = OAuthTokenValidationError("Invalid", "userinfo_denied")
valid, _error, principal = await _validate_with_jwks(GiteaOAuthValidator(), token, jwks)
assert valid is False
assert principal is None
@pytest.mark.asyncio
async def test_validate_oauth_token_with_oidc_jwt_and_cache() -> None:
"""JWT token validation uses discovery + JWKS and caches both documents."""
+212 -7
View File
@@ -29,6 +29,7 @@ def oauth_env(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("OAUTH_MODE", "true")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret")
monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef")
monkeypatch.setenv("ENVIRONMENT", "test")
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false")
monkeypatch.setenv("WRITE_MODE", "false")
@@ -84,12 +85,13 @@ def test_health_endpoint(client: TestClient) -> None:
def test_oauth_protected_resource_metadata(client: TestClient) -> None:
"""OAuth protected-resource metadata contains required OpenAI-compatible fields."""
"""PRM advertises THIS server's canonical URL as the protected resource."""
response = client.get("/.well-known/oauth-protected-resource")
assert response.status_code == 200
data = response.json()
assert data["resource"] == "https://gitea.example.com"
# RFC 9728/8707: the resource identifier is the MCP server's own URL, not Gitea's.
assert data["resource"] == "http://testserver"
assert data["authorization_servers"] == [
"http://testserver",
"https://gitea.example.com",
@@ -100,12 +102,15 @@ def test_oauth_protected_resource_metadata(client: TestClient) -> None:
def test_oauth_authorization_server_metadata(client: TestClient) -> None:
"""Auth server metadata includes expected OAuth endpoints and scopes."""
"""Auth server metadata advertises this server's proxy OAuth endpoints."""
response = client.get("/.well-known/oauth-authorization-server")
assert response.status_code == 200
payload = response.json()
assert payload["authorization_endpoint"].endswith("/login/oauth/authorize")
assert payload["token_endpoint"].endswith("/oauth/token")
# Claude must be sent to our proxy authorize endpoint (Gitea does not know
# Claude's redirect_uri), so the endpoint lives on this server.
assert payload["authorization_endpoint"] == "http://testserver/oauth/authorize"
assert payload["token_endpoint"] == "http://testserver/oauth/token"
assert payload["registration_endpoint"] == "http://testserver/register"
assert payload["scopes_supported"] == ["read:repository", "write:repository"]
@@ -115,8 +120,8 @@ def test_openid_configuration_metadata(client: TestClient) -> None:
assert response.status_code == 200
payload = response.json()
assert payload["issuer"] == "https://gitea.example.com"
assert payload["authorization_endpoint"].endswith("/login/oauth/authorize")
assert payload["token_endpoint"].endswith("/oauth/token")
assert payload["authorization_endpoint"] == "http://testserver/oauth/authorize"
assert payload["token_endpoint"] == "http://testserver/oauth/token"
assert payload["userinfo_endpoint"].endswith("/login/oauth/userinfo")
assert payload["jwks_uri"].endswith("/login/oauth/keys")
assert "read:repository" in payload["scopes_supported"]
@@ -129,6 +134,7 @@ def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) ->
monkeypatch.setenv("OAUTH_MODE", "true")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret")
monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef")
monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com")
monkeypatch.setenv("ENVIRONMENT", "test")
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false")
@@ -149,6 +155,8 @@ def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) ->
protected_response = client.get("/.well-known/oauth-protected-resource")
assert protected_response.status_code == 200
protected_payload = protected_response.json()
# P4: the protected resource identifier must equal this server's public base.
assert protected_payload["resource"] == "https://mcp.example.com"
assert protected_payload["authorization_servers"] == [
"https://mcp.example.com",
"https://gitea.example.com",
@@ -166,6 +174,201 @@ def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) ->
)
def test_mcp_streamable_http_path_works(client: TestClient) -> None:
"""The spec path /mcp exposes the same transport behavior as the SSE alias."""
response = client.post(
"/mcp",
headers={"Authorization": "Bearer valid-read"},
json={"jsonrpc": "2.0", "id": "init-1", "method": "initialize"},
)
assert response.status_code == 200
payload = response.json()
assert payload["result"]["protocolVersion"] == "2024-11-05"
def test_mcp_preflight_allows_same_origin(client: TestClient) -> None:
"""Same-origin preflight requests to /mcp return strict CORS headers."""
response = client.options(
"/mcp",
headers={
"Origin": "http://testserver",
"Access-Control-Request-Method": "POST",
"Access-Control-Request-Headers": "authorization,content-type",
},
)
assert response.status_code == 204
assert response.headers["Access-Control-Allow-Origin"] == "http://testserver"
def test_mcp_preflight_rejects_cross_origin(client: TestClient) -> None:
"""Cross-origin browser requests to /mcp are denied."""
response = client.options(
"/mcp",
headers={
"Origin": "https://evil.example.com",
"Access-Control-Request-Method": "POST",
},
)
assert response.status_code == 403
def test_service_pat_requests_verify_user_repo_access_before_execution(
oauth_env: None, mock_oauth_validation: None, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Service PAT fallback checks the user's repository permission before executing tools."""
from aegis_gitea_mcp import server
monkeypatch.setenv("GITEA_TOKEN", "service-pat-token")
server._api_scope_cache.clear()
server.reset_repo_authz_cache()
probe_response = MagicMock()
probe_response.status_code = 200
repo_response = MagicMock()
repo_response.status_code = 403
with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client.get = AsyncMock(side_effect=[probe_response, repo_response])
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
from aegis_gitea_mcp.server import app
client = TestClient(app, raise_server_exceptions=False)
response = client.post(
"/mcp/tool/call",
headers={"Authorization": "Bearer valid-read"},
json={
"tool": "get_repository_info",
"arguments": {"owner": "acme", "repo": "demo"},
},
)
assert response.status_code == 403
assert "permission" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_service_pat_repo_authz_allows_user_with_read_permission(
oauth_env: None, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Read-level collaborator permission allows service PAT execution to proceed."""
from aegis_gitea_mcp import server
monkeypatch.setenv("GITEA_TOKEN", "service-pat-token")
server.reset_repo_authz_cache()
permission_response = MagicMock()
permission_response.status_code = 200
permission_response.json.return_value = {"permission": "read"}
with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=permission_response)
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
await server._verify_user_repository_access(
repository="acme/demo",
required_scope=server.READ_SCOPE,
user_login="alice",
correlation_id="corr-1",
tool_name="get_repository_info",
)
mock_client.get.assert_awaited_once()
requested_url = mock_client.get.await_args.args[0]
requested_headers = mock_client.get.await_args.kwargs["headers"]
assert requested_url.endswith("/api/v1/repos/acme/demo/collaborators/alice/permission")
assert requested_headers["Authorization"] == "token service-pat-token"
@pytest.mark.asyncio
async def test_service_pat_repo_authz_denies_read_user_for_write_tool(
oauth_env: None, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Read permission is insufficient for write tools in service PAT mode."""
from fastapi import HTTPException
from aegis_gitea_mcp import server
monkeypatch.setenv("GITEA_TOKEN", "service-pat-token")
server.reset_repo_authz_cache()
permission_response = MagicMock()
permission_response.status_code = 200
permission_response.json.return_value = {"permission": "read"}
with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=permission_response)
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
with pytest.raises(HTTPException) as exc_info:
await server._verify_user_repository_access(
repository="acme/demo",
required_scope=server.WRITE_SCOPE,
user_login="alice",
correlation_id="corr-1",
tool_name="create_issue",
)
assert exc_info.value.status_code == 403
@pytest.mark.asyncio
async def test_service_pat_repo_authz_cache_hit_and_expiry(
oauth_env: None, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Repository permission decisions are cached briefly and rechecked after expiry."""
from aegis_gitea_mcp import cache as cache_module
from aegis_gitea_mcp import server
monkeypatch.setenv("GITEA_TOKEN", "service-pat-token")
monkeypatch.setenv("REPO_AUTHZ_CACHE_TTL_SECONDS", "1")
server.reset_repo_authz_cache()
now = 1000.0
monkeypatch.setattr(cache_module.time, "monotonic", lambda: now)
permission_response = MagicMock()
permission_response.status_code = 200
permission_response.json.return_value = {"permission": "read"}
with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=permission_response)
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
for _ in range(2):
await server._verify_user_repository_access(
repository="acme/demo",
required_scope=server.READ_SCOPE,
user_login="alice",
correlation_id="corr-1",
tool_name="get_repository_info",
)
assert mock_client.get.await_count == 1
now = 1002.0
await server._verify_user_repository_access(
repository="acme/demo",
required_scope=server.READ_SCOPE,
user_login="alice",
correlation_id="corr-1",
tool_name="get_repository_info",
)
assert mock_client.get.await_count == 2
def test_scope_compatibility_write_implies_read() -> None:
"""write:repository grants read-level access for read tools."""
from aegis_gitea_mcp.server import READ_SCOPE, _has_required_scope
@@ -348,6 +551,7 @@ async def test_startup_event_fails_when_discovery_unreachable(
monkeypatch.setenv("OAUTH_MODE", "true")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret")
monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef")
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "true")
from aegis_gitea_mcp import server
@@ -377,6 +581,7 @@ async def test_startup_event_succeeds_when_discovery_ready(
monkeypatch.setenv("OAUTH_MODE", "true")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id")
monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret")
monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef")
monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "true")
from aegis_gitea_mcp import server