"""OIDC/JWKS-focused OAuth validator tests.""" from __future__ import annotations import json import time from unittest.mock import AsyncMock, MagicMock, patch import jwt import pytest from cryptography.hazmat.primitives.asymmetric import rsa from jwt.algorithms import RSAAlgorithm from aegis_gitea_mcp.config import reset_settings from aegis_gitea_mcp.oauth import GiteaOAuthValidator, reset_oauth_validator @pytest.fixture(autouse=True) def reset_state(monkeypatch: pytest.MonkeyPatch) -> None: """Reset state and configure OAuth validation environment.""" reset_settings() reset_oauth_validator() monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") monkeypatch.setenv("ENVIRONMENT", "test") monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef") monkeypatch.setenv("OAUTH_CACHE_TTL_SECONDS", "600") yield reset_settings() reset_oauth_validator() def _build_jwt_fixture(aud: str = "test-client-id") -> tuple[str, dict[str, object]]: """Generate RS256 access token and matching JWKS payload.""" private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) public_key = private_key.public_key() jwk = json.loads(RSAAlgorithm.to_jwk(public_key)) jwk["kid"] = "kid-123" now = int(time.time()) token = jwt.encode( { "sub": "user-1", "preferred_username": "alice", "scope": "read:repository write:repository", "aud": aud, "iss": "https://gitea.example.com", "iat": now, "exp": now + 3600, }, private_key, algorithm="RS256", headers={"kid": "kid-123"}, ) return token, {"keys": [jwk]} async def _validate_with_jwks( validator: GiteaOAuthValidator, token: str, jwks: dict[str, object] ) -> tuple[bool, str | None, dict[str, object] | None]: """Drive a JWT validation with mocked discovery + JWKS responses.""" discovery_response = MagicMock() discovery_response.status_code = 200 discovery_response.json.return_value = { "issuer": "https://gitea.example.com", "jwks_uri": "https://gitea.example.com/login/oauth/keys", } jwks_response = MagicMock() jwks_response.status_code = 200 jwks_response.json.return_value = jwks with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client.get = AsyncMock(side_effect=[discovery_response, jwks_response]) mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) return await validator.validate_oauth_token(token, "127.0.0.1", "TestAgent") def test_acceptable_audiences_includes_resource_and_client_id( monkeypatch: pytest.MonkeyPatch, ) -> None: """The canonical MCP resource and the Gitea client id are accepted audiences.""" monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com") reset_settings() reset_oauth_validator() audiences = GiteaOAuthValidator()._acceptable_audiences() assert "https://mcp.example.com" in audiences assert "test-client-id" in audiences @pytest.mark.asyncio async def test_jwt_with_canonical_resource_audience_is_accepted( monkeypatch: pytest.MonkeyPatch, ) -> None: """A token whose aud is the canonical MCP resource URL validates (P4).""" monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com") reset_settings() reset_oauth_validator() token, jwks = _build_jwt_fixture(aud="https://mcp.example.com") valid, error, principal = await _validate_with_jwks(GiteaOAuthValidator(), token, jwks) assert valid is True assert error is None assert principal is not None @pytest.mark.asyncio async def test_jwt_with_foreign_audience_is_rejected() -> None: """A token minted for a different audience is rejected (audience binding).""" token, jwks = _build_jwt_fixture(aud="some-other-service") # Foreign-audience JWT fails JWT validation, then falls back to userinfo, which # is not mocked here and raises a network error -> overall failure. with patch("aegis_gitea_mcp.oauth.GiteaOAuthValidator._validate_userinfo") as mock_userinfo: from aegis_gitea_mcp.oauth import OAuthTokenValidationError mock_userinfo.side_effect = OAuthTokenValidationError("Invalid", "userinfo_denied") valid, _error, principal = await _validate_with_jwks(GiteaOAuthValidator(), token, jwks) assert valid is False assert principal is None @pytest.mark.asyncio async def test_validate_oauth_token_with_oidc_jwt_and_cache() -> None: """JWT token validation uses discovery + JWKS and caches both documents.""" token, jwks = _build_jwt_fixture() validator = GiteaOAuthValidator() discovery_response = MagicMock() discovery_response.status_code = 200 discovery_response.json.return_value = { "issuer": "https://gitea.example.com", "jwks_uri": "https://gitea.example.com/login/oauth/keys", } jwks_response = MagicMock() jwks_response.status_code = 200 jwks_response.json.return_value = jwks with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client.get = AsyncMock(side_effect=[discovery_response, jwks_response]) mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) valid1, error1, principal1 = await validator.validate_oauth_token( token, "127.0.0.1", "TestAgent" ) valid2, error2, principal2 = await validator.validate_oauth_token( token, "127.0.0.1", "TestAgent" ) assert valid1 is True assert error1 is None assert principal1 is not None assert principal1["login"] == "alice" assert "write:repository" in principal1["scopes"] assert valid2 is True assert error2 is None assert principal2 is not None # Discovery + JWKS fetched once each because of cache. assert mock_client.get.await_count == 2 @pytest.mark.asyncio async def test_invalid_jwt_falls_back_and_fails_userinfo() -> None: """Invalid JWT returns auth failure when userinfo fallback rejects token.""" validator = GiteaOAuthValidator() # JWT-shaped token with invalid signature/header. bad_token = "abc.def.ghi" discovery_response = MagicMock() discovery_response.status_code = 200 discovery_response.json.return_value = { "issuer": "https://gitea.example.com", "jwks_uri": "https://gitea.example.com/login/oauth/keys", } jwks_response = MagicMock() jwks_response.status_code = 200 jwks_response.json.return_value = { "keys": [{"kid": "missing", "kty": "RSA", "n": "x", "e": "AQAB"}] } userinfo_denied = MagicMock() userinfo_denied.status_code = 401 with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client.get = AsyncMock( side_effect=[discovery_response, jwks_response, userinfo_denied] ) mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) is_valid, error, principal = await validator.validate_oauth_token( bad_token, "127.0.0.1", "TestAgent", ) assert is_valid is False assert principal is None assert error is not None def test_extract_bearer_token_strict_parsing() -> None: """Bearer extraction accepts only strict `Bearer ` format.""" assert GiteaOAuthValidator.extract_bearer_token("Bearer abc123") == "abc123" assert GiteaOAuthValidator.extract_bearer_token("bearer abc123") is None assert GiteaOAuthValidator.extract_bearer_token("Bearer ") is None assert GiteaOAuthValidator.extract_bearer_token("Basic abc") is None