"""OIDC/JWKS-focused OAuth validator tests.""" from __future__ import annotations import json import time from unittest.mock import AsyncMock, MagicMock, patch import jwt import pytest from cryptography.hazmat.primitives.asymmetric import rsa from jwt.algorithms import RSAAlgorithm from aegis_gitea_mcp.config import reset_settings from aegis_gitea_mcp.oauth import GiteaOAuthValidator, reset_oauth_validator @pytest.fixture(autouse=True) def reset_state(monkeypatch: pytest.MonkeyPatch) -> None: """Reset state and configure OAuth validation environment.""" reset_settings() reset_oauth_validator() monkeypatch.setenv("GITEA_URL", "https://gitea.example.com") monkeypatch.setenv("ENVIRONMENT", "test") monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") monkeypatch.setenv("OAUTH_CACHE_TTL_SECONDS", "600") yield reset_settings() reset_oauth_validator() def _build_jwt_fixture() -> tuple[str, dict[str, object]]: """Generate RS256 access token and matching JWKS payload.""" private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) public_key = private_key.public_key() jwk = json.loads(RSAAlgorithm.to_jwk(public_key)) jwk["kid"] = "kid-123" now = int(time.time()) token = jwt.encode( { "sub": "user-1", "preferred_username": "alice", "scope": "read:repository write:repository", "aud": "test-client-id", "iss": "https://gitea.example.com", "iat": now, "exp": now + 3600, }, private_key, algorithm="RS256", headers={"kid": "kid-123"}, ) return token, {"keys": [jwk]} @pytest.mark.asyncio async def test_validate_oauth_token_with_oidc_jwt_and_cache() -> None: """JWT token validation uses discovery + JWKS and caches both documents.""" token, jwks = _build_jwt_fixture() validator = GiteaOAuthValidator() discovery_response = MagicMock() discovery_response.status_code = 200 discovery_response.json.return_value = { "issuer": "https://gitea.example.com", "jwks_uri": "https://gitea.example.com/login/oauth/keys", } jwks_response = MagicMock() jwks_response.status_code = 200 jwks_response.json.return_value = jwks with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client.get = AsyncMock(side_effect=[discovery_response, jwks_response]) mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) valid1, error1, principal1 = await validator.validate_oauth_token( token, "127.0.0.1", "TestAgent" ) valid2, error2, principal2 = await validator.validate_oauth_token( token, "127.0.0.1", "TestAgent" ) assert valid1 is True assert error1 is None assert principal1 is not None assert principal1["login"] == "alice" assert "write:repository" in principal1["scopes"] assert valid2 is True assert error2 is None assert principal2 is not None # Discovery + JWKS fetched once each because of cache. assert mock_client.get.await_count == 2 @pytest.mark.asyncio async def test_invalid_jwt_falls_back_and_fails_userinfo() -> None: """Invalid JWT returns auth failure when userinfo fallback rejects token.""" validator = GiteaOAuthValidator() # JWT-shaped token with invalid signature/header. bad_token = "abc.def.ghi" discovery_response = MagicMock() discovery_response.status_code = 200 discovery_response.json.return_value = { "issuer": "https://gitea.example.com", "jwks_uri": "https://gitea.example.com/login/oauth/keys", } jwks_response = MagicMock() jwks_response.status_code = 200 jwks_response.json.return_value = { "keys": [{"kid": "missing", "kty": "RSA", "n": "x", "e": "AQAB"}] } userinfo_denied = MagicMock() userinfo_denied.status_code = 401 with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client.get = AsyncMock( side_effect=[discovery_response, jwks_response, userinfo_denied] ) mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) is_valid, error, principal = await validator.validate_oauth_token( bad_token, "127.0.0.1", "TestAgent", ) assert is_valid is False assert principal is None assert error is not None def test_extract_bearer_token_strict_parsing() -> None: """Bearer extraction accepts only strict `Bearer ` format.""" assert GiteaOAuthValidator.extract_bearer_token("Bearer abc123") == "abc123" assert GiteaOAuthValidator.extract_bearer_token("bearer abc123") is None assert GiteaOAuthValidator.extract_bearer_token("Bearer ") is None assert GiteaOAuthValidator.extract_bearer_token("Basic abc") is None