# Python source file: 216 lines, 8.1 KiB.
"""OIDC/JWKS-focused OAuth validator tests."""
|
|
|
|
from __future__ import annotations

import json
import time
from collections.abc import Iterator
from unittest.mock import AsyncMock, MagicMock, patch

import jwt
import pytest
from cryptography.hazmat.primitives.asymmetric import rsa
from jwt.algorithms import RSAAlgorithm

from aegis_gitea_mcp.config import reset_settings
from aegis_gitea_mcp.oauth import GiteaOAuthValidator, reset_oauth_validator
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_state(monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
    """Reset state and configure the OAuth validation environment.

    Applied automatically to every test in this module. ``reset_settings`` and
    ``reset_oauth_validator`` are called before the environment variables are
    set, and both are called again once the test has finished.

    Args:
        monkeypatch: pytest fixture used to set the environment variables.

    Yields:
        None. Control is handed to the test after the environment is set up.
    """
    reset_settings()
    reset_oauth_validator()

    oauth_env = {
        "GITEA_URL": "https://gitea.example.com",
        "ENVIRONMENT": "test",
        "OAUTH_MODE": "true",
        "GITEA_OAUTH_CLIENT_ID": "test-client-id",
        "GITEA_OAUTH_CLIENT_SECRET": "test-client-secret",
        "OAUTH_STATE_SECRET": "test-state-secret-0123456789abcdef",
        "OAUTH_CACHE_TTL_SECONDS": "600",
    }
    for name, value in oauth_env.items():
        monkeypatch.setenv(name, value)

    yield

    # Teardown: repeat both resets after the test has run.
    reset_settings()
    reset_oauth_validator()
|
|
|
|
|
|
def _build_jwt_fixture(aud: str = "test-client-id") -> tuple[str, dict[str, object]]:
    """Create a signed RS256 access token and the JWKS payload that matches it.

    A fresh 2048-bit RSA key pair is generated on every call.

    Args:
        aud: Value written to the token's ``aud`` claim.

    Returns:
        A ``(token, jwks)`` pair. ``jwks`` holds a single public key whose
        ``kid`` equals the ``kid`` header of the token.
    """
    key_id = "kid-123"
    signing_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    # Export the public half as a JWK dict and tag it with the key id.
    public_jwk = json.loads(RSAAlgorithm.to_jwk(signing_key.public_key()))
    public_jwk["kid"] = key_id

    issued_at = int(time.time())
    claims = {
        "sub": "user-1",
        "preferred_username": "alice",
        "scope": "read:repository write:repository",
        "aud": aud,
        "iss": "https://gitea.example.com",
        "iat": issued_at,
        "exp": issued_at + 3600,
    }
    token = jwt.encode(
        claims,
        signing_key,
        algorithm="RS256",
        headers={"kid": key_id},
    )
    return token, {"keys": [public_jwk]}
|
|
|
|
|
|
async def _validate_with_jwks(
    validator: GiteaOAuthValidator, token: str, jwks: dict[str, object]
) -> tuple[bool, str | None, dict[str, object] | None]:
    """Run one token validation with the discovery and JWKS requests mocked.

    ``httpx.AsyncClient`` (as referenced from ``aegis_gitea_mcp.oauth``) is
    patched so that the client's ``get`` returns a discovery document on the
    first call and ``jwks`` on the second.

    Args:
        validator: Validator instance under test.
        token: Bearer token passed to ``validate_oauth_token``.
        jwks: Payload returned by the mocked JWKS response's ``json()``.

    Returns:
        Whatever ``validator.validate_oauth_token`` returns for the token.
    """
    discovery_document = {
        "issuer": "https://gitea.example.com",
        "jwks_uri": "https://gitea.example.com/login/oauth/keys",
    }
    discovery_response = MagicMock(status_code=200)
    discovery_response.json.return_value = discovery_document

    jwks_response = MagicMock(status_code=200)
    jwks_response.json.return_value = jwks

    # GET responses are served in order: discovery first, then the JWKS.
    http_client = AsyncMock()
    http_client.get = AsyncMock(side_effect=[discovery_response, jwks_response])

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_cls:
        # Make ``async with AsyncClient(...)`` hand back the mocked client.
        client_context = client_cls.return_value
        client_context.__aenter__ = AsyncMock(return_value=http_client)
        client_context.__aexit__ = AsyncMock(return_value=False)
        return await validator.validate_oauth_token(token, "127.0.0.1", "TestAgent")
|
|
|
|
|
|
def test_acceptable_audiences_includes_resource_and_client_id(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Both the canonical MCP resource URL and the Gitea client id are accepted audiences."""
    monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com")
    # Reset after changing the environment and before building the validator.
    reset_settings()
    reset_oauth_validator()

    validator = GiteaOAuthValidator()
    accepted = validator._acceptable_audiences()

    for expected in ("https://mcp.example.com", "test-client-id"):
        assert expected in accepted
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_jwt_with_canonical_resource_audience_is_accepted(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A token whose ``aud`` is the canonical MCP resource URL validates (P4)."""
    resource_url = "https://mcp.example.com"
    monkeypatch.setenv("PUBLIC_BASE_URL", resource_url)
    # Reset after changing the environment and before building the validator.
    reset_settings()
    reset_oauth_validator()

    token, jwks = _build_jwt_fixture(aud=resource_url)
    outcome = await _validate_with_jwks(GiteaOAuthValidator(), token, jwks)

    valid, error, principal = outcome
    assert valid is True
    assert error is None
    assert principal is not None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_jwt_with_foreign_audience_is_rejected() -> None:
    """A token minted for a different audience is rejected (audience binding)."""
    from aegis_gitea_mcp.oauth import OAuthTokenValidationError

    token, jwks = _build_jwt_fixture(aud="some-other-service")

    # The foreign-audience JWT is expected to fail JWT validation and fall back
    # to the userinfo check. That check is patched here to raise a validation
    # error, so the overall result must be a failure.
    with patch("aegis_gitea_mcp.oauth.GiteaOAuthValidator._validate_userinfo") as mock_userinfo:
        mock_userinfo.side_effect = OAuthTokenValidationError("Invalid", "userinfo_denied")
        valid, _error, principal = await _validate_with_jwks(GiteaOAuthValidator(), token, jwks)

    assert valid is False
    assert principal is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_validate_oauth_token_with_oidc_jwt_and_cache() -> None:
    """JWT validation uses discovery + JWKS, and a repeat validation triggers no further GETs."""
    token, jwks = _build_jwt_fixture()
    validator = GiteaOAuthValidator()

    discovery_response = MagicMock(status_code=200)
    discovery_response.json.return_value = {
        "issuer": "https://gitea.example.com",
        "jwks_uri": "https://gitea.example.com/login/oauth/keys",
    }

    jwks_response = MagicMock(status_code=200)
    jwks_response.json.return_value = jwks

    # Only two GET responses are queued: one discovery document, one JWKS.
    http_client = AsyncMock()
    http_client.get = AsyncMock(side_effect=[discovery_response, jwks_response])

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_cls:
        client_context = client_cls.return_value
        client_context.__aenter__ = AsyncMock(return_value=http_client)
        client_context.__aexit__ = AsyncMock(return_value=False)

        # Validate the same token twice against the same validator instance.
        first = await validator.validate_oauth_token(token, "127.0.0.1", "TestAgent")
        second = await validator.validate_oauth_token(token, "127.0.0.1", "TestAgent")

    valid1, error1, principal1 = first
    assert valid1 is True
    assert error1 is None
    assert principal1 is not None
    assert principal1["login"] == "alice"
    assert "write:repository" in principal1["scopes"]

    valid2, error2, principal2 = second
    assert valid2 is True
    assert error2 is None
    assert principal2 is not None

    # Discovery + JWKS fetched once each because of cache.
    assert http_client.get.await_count == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invalid_jwt_falls_back_and_fails_userinfo() -> None:
    """An invalid JWT yields an auth failure when the userinfo fallback rejects it."""
    validator = GiteaOAuthValidator()

    # Three dot-separated segments make the token JWT-shaped, but its
    # signature/header are invalid.
    bad_token = "abc.def.ghi"

    discovery_response = MagicMock(status_code=200)
    discovery_response.json.return_value = {
        "issuer": "https://gitea.example.com",
        "jwks_uri": "https://gitea.example.com/login/oauth/keys",
    }

    jwks_response = MagicMock(status_code=200)
    jwks_response.json.return_value = {
        "keys": [{"kid": "missing", "kty": "RSA", "n": "x", "e": "AQAB"}]
    }

    # Third queued response: a 401 for the userinfo fallback request.
    userinfo_denied = MagicMock(status_code=401)

    http_client = AsyncMock()
    http_client.get = AsyncMock(
        side_effect=[discovery_response, jwks_response, userinfo_denied]
    )

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_cls:
        client_context = client_cls.return_value
        client_context.__aenter__ = AsyncMock(return_value=http_client)
        client_context.__aexit__ = AsyncMock(return_value=False)

        outcome = await validator.validate_oauth_token(
            bad_token,
            "127.0.0.1",
            "TestAgent",
        )

    is_valid, error, principal = outcome
    assert is_valid is False
    assert principal is None
    assert error is not None
|
|
|
|
|
|
def test_extract_bearer_token_strict_parsing() -> None:
    """Only the strict ``Bearer <token>`` header format yields a token."""
    extract = GiteaOAuthValidator.extract_bearer_token

    assert extract("Bearer abc123") == "abc123"

    # Lower-case scheme, empty token, and a non-Bearer scheme are all rejected.
    for malformed_header in ("bearer abc123", "Bearer ", "Basic abc"):
        assert extract(malformed_header) is None
|