"""Tests for OAuth2 per-user Gitea authentication."""

from __future__ import annotations

import asyncio
import os
import stat
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from fastapi.testclient import TestClient

from aegis_gitea_mcp.config import reset_settings
from aegis_gitea_mcp.oauth import GiteaOAuthValidator, get_oauth_validator, reset_oauth_validator
from aegis_gitea_mcp.oauth_flow import OAuthClientRegistry, OAuthRegistrationRequest
from aegis_gitea_mcp.request_context import (
    get_gitea_user_login,
    get_gitea_user_token,
    set_gitea_user_login,
    set_gitea_user_token,
)

# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture(autouse=True)
def reset_state():
    """Clear cached settings and the cached OAuth validator around every test."""

    def _wipe_globals():
        reset_settings()
        reset_oauth_validator()

    _wipe_globals()
    yield
    _wipe_globals()


@pytest.fixture
def mock_env_oauth(monkeypatch):
    """Populate the environment variables used by the OAuth-mode tests."""
    oauth_environment = {
        "GITEA_URL": "https://gitea.example.com",
        "ENVIRONMENT": "test",
        "OAUTH_MODE": "true",
        "GITEA_OAUTH_CLIENT_ID": "test-client-id",
        "GITEA_OAUTH_CLIENT_SECRET": "test-client-secret",
        "OAUTH_STATE_SECRET": "test-state-secret-0123456789abcdef",
        "STARTUP_VALIDATE_GITEA": "false",
    }
    for variable, value in oauth_environment.items():
        monkeypatch.setenv(variable, value)


@pytest.fixture
def oauth_validator(mock_env_oauth):
    """Build a GiteaOAuthValidator while the OAuth-mode environment is active."""
    validator = GiteaOAuthValidator()
    return validator


@pytest.fixture
def oauth_client(mock_env_oauth):
    """Build a FastAPI TestClient while the OAuth-mode environment is active."""
    # Imported here, after mock_env_oauth has set the environment.
    from aegis_gitea_mcp.server import app

    test_client = TestClient(app, raise_server_exceptions=False)
    return test_client


def _register_public_client(oauth_client: TestClient, redirect_uri: str) -> dict[str, str]:
    """Register a public OAuth client via POST /register and return the JSON reply.

    Asserts that registration answers 200 and that the reply carries a
    ``client_id`` before handing the payload back to the caller.
    """
    registration_request = {
        "client_name": "pytest-client",
        "redirect_uris": [redirect_uri],
        "token_endpoint_auth_method": "none",
        "grant_types": ["authorization_code", "refresh_token"],
        "response_types": ["code"],
    }
    reply = oauth_client.post("/register", json=registration_request)
    assert reply.status_code == 200

    registered = reply.json()
    assert "client_id" in registered
    return registered


# ---------------------------------------------------------------------------
# GiteaOAuthValidator unit tests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_validate_oauth_token_success(oauth_validator):
    """Valid Gitea OAuth token returns is_valid=True and user_data."""
    gitea_reply = MagicMock()
    gitea_reply.status_code = 200
    gitea_reply.json.return_value = {"login": "testuser", "id": 42}

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        stub_client = AsyncMock()
        stub_client.get = AsyncMock(return_value=gitea_reply)
        # Entering an instance of the patched class as an async context
        # manager yields the stub client.
        async_cm = client_factory.return_value
        async_cm.__aenter__ = AsyncMock(return_value=stub_client)
        async_cm.__aexit__ = AsyncMock(return_value=False)

        outcome = await oauth_validator.validate_oauth_token(
            "valid-gitea-token", "127.0.0.1", "TestAgent/1.0"
        )

    ok, message, user = outcome
    assert ok is True
    assert message is None
    assert user is not None
    assert user["login"] == "testuser"


@pytest.mark.asyncio
async def test_validate_oauth_token_invalid_401(oauth_validator):
    """Gitea returning 401 results in is_valid=False."""
    gitea_reply = MagicMock()
    gitea_reply.status_code = 401

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        stub_client = AsyncMock()
        stub_client.get = AsyncMock(return_value=gitea_reply)
        async_cm = client_factory.return_value
        async_cm.__aenter__ = AsyncMock(return_value=stub_client)
        async_cm.__aexit__ = AsyncMock(return_value=False)

        outcome = await oauth_validator.validate_oauth_token(
            "expired-token", "127.0.0.1", "TestAgent/1.0"
        )

    ok, message, user = outcome
    assert ok is False
    assert message is not None
    assert user is None


@pytest.mark.asyncio
async def test_validate_oauth_token_missing_token(oauth_validator):
    """Missing token results in is_valid=False."""
    outcome = await oauth_validator.validate_oauth_token(None, "127.0.0.1", "TestAgent/1.0")

    ok, message, user = outcome
    assert ok is False
    assert message is not None
    assert user is None


@pytest.mark.asyncio
async def test_validate_oauth_token_network_error(oauth_validator):
    """Network error results in is_valid=False with informative message."""
    import httpx

    transport_failure = httpx.RequestError("Connection refused", request=MagicMock())

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        stub_client = AsyncMock()
        # Every GET on the stub raises the transport error.
        stub_client.get = AsyncMock(side_effect=transport_failure)
        async_cm = client_factory.return_value
        async_cm.__aenter__ = AsyncMock(return_value=stub_client)
        async_cm.__aexit__ = AsyncMock(return_value=False)

        outcome = await oauth_validator.validate_oauth_token(
            "some-token", "127.0.0.1", "TestAgent/1.0"
        )

    ok, message, user = outcome
    assert ok is False
    assert message is not None
    assert "unable to validate oauth token" in message.lower()
    assert user is None


@pytest.mark.asyncio
async def test_validate_oauth_token_rate_limit(oauth_validator):
    """Exceeding failure threshold triggers rate limiting."""
    gitea_reply = MagicMock()
    gitea_reply.status_code = 401

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        stub_client = AsyncMock()
        stub_client.get = AsyncMock(return_value=gitea_reply)
        async_cm = client_factory.return_value
        async_cm.__aenter__ = AsyncMock(return_value=stub_client)
        async_cm.__aexit__ = AsyncMock(return_value=False)

        # Exhaust failures (default MAX_AUTH_FAILURES=5)
        failed_attempts = 0
        while failed_attempts < 5:
            await oauth_validator.validate_oauth_token("bad-token", "10.0.0.1", "Agent")
            failed_attempts += 1

        # Next attempt should be rate-limited
        outcome = await oauth_validator.validate_oauth_token("bad-token", "10.0.0.1", "Agent")

    ok, message, _user = outcome
    assert ok is False
    assert message is not None
    assert "too many" in message.lower()


#
# ---------------------------------------------------------------------------
# Singleton tests
# ---------------------------------------------------------------------------


def test_get_oauth_validator_singleton(mock_env_oauth):
    """get_oauth_validator returns the same instance on repeated calls."""
    v1 = get_oauth_validator()
    v2 = get_oauth_validator()
    assert v1 is v2


def test_reset_oauth_validator(mock_env_oauth):
    """reset_oauth_validator creates a fresh instance after reset."""
    v1 = get_oauth_validator()
    reset_oauth_validator()
    v2 = get_oauth_validator()
    assert v1 is not v2


# ---------------------------------------------------------------------------
# ContextVar isolation tests
# ---------------------------------------------------------------------------


def test_context_var_token_isolation():
    """ContextVar values do not leak between coroutines."""
    results = {}

    async def task_a():
        set_gitea_user_token("token-for-a")
        # Yield to the event loop so task_b gets a chance to run in between.
        await asyncio.sleep(0)
        results["a"] = get_gitea_user_token()

    async def task_b():
        # task_b never sets the token; should see None (default)
        await asyncio.sleep(0)
        results["b"] = get_gitea_user_token()

    async def run():
        # gather() schedules each coroutine as its own task.
        await asyncio.gather(task_a(), task_b())

    asyncio.run(run())

    assert results["a"] == "token-for-a"
    assert results["b"] is None  # ContextVar isolation: task_b sees default


def test_context_var_login_set_and_get():
    """set_gitea_user_login / get_gitea_user_login work correctly.

    The set/get pair runs inside a copy of the current context so the value
    written here is discarded afterwards instead of leaking into whichever
    test runs next on the same thread.
    """
    import contextvars

    def set_then_get():
        set_gitea_user_login("alice")
        return get_gitea_user_login()

    assert contextvars.copy_context().run(set_then_get) == "alice"


# ---------------------------------------------------------------------------
# /oauth/token proxy endpoint tests
# ---------------------------------------------------------------------------


def test_oauth_token_endpoint_available_when_oauth_mode_false(monkeypatch):
    """POST /oauth/token remains available regardless of OAUTH_MODE."""
    monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
    monkeypatch.setenv("GITEA_TOKEN", "test-token-12345")
    monkeypatch.setenv("ENVIRONMENT", "test")
    monkeypatch.setenv("MCP_API_KEYS", "a" * 64)
    monkeypatch.setenv("OAUTH_MODE", "false")
    monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false")

    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = {"access_token": "token"}

    # Imported only after the environment above has been set.
    from aegis_gitea_mcp.server import app

    with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls:
        mock_client = AsyncMock()
        mock_client.post = AsyncMock(return_value=mock_response)
        mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)

        with TestClient(app, raise_server_exceptions=False) as client:
            registration = client.post(
                "/register",
                json={
                    "client_name": "pytest-client",
                    "redirect_uris": ["http://127.0.0.1:8080/callback"],
                    "token_endpoint_auth_method": "none",
                    "grant_types": ["authorization_code"],
                    "response_types": ["code"],
                },
            )
            assert registration.status_code == 200
            client_id = registration.json()["client_id"]

            response = client.post(
                "/oauth/token",
                data={"client_id": client_id, "code": "abc123", "code_verifier": "pkce"},
            )

    assert response.status_code == 200


def test_oauth_token_endpoint_missing_code(oauth_client):
    """POST /oauth/token without a code returns 400."""
    client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")

    response = oauth_client.post(
        "/oauth/token",
        data={"client_id": client_data["client_id"], "code_verifier": "pkce"},
    )

    assert response.status_code == 400


def test_oauth_token_endpoint_proxy_success(oauth_client):
    """POST /oauth/token proxies successfully to Gitea and returns access_token."""
    client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")

    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = {
        "access_token": "gitea-access-token-xyz",
        "token_type": "bearer",
    }

    with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls:
        mock_client = AsyncMock()
        mock_client.post = AsyncMock(return_value=mock_response)
        mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)

        response = oauth_client.post(
            "/oauth/token",
            data={
                "client_id": client_data["client_id"],
                "code": "auth-code-123",
                "code_verifier": "pkce-verifier",
            },
        )

    assert response.status_code == 200
    body = response.json()
    assert body["access_token"] == "gitea-access-token-xyz"


def test_oauth_token_endpoint_gitea_error(oauth_client):
    """POST /oauth/token propagates Gitea error status."""
    client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")

    mock_response = MagicMock()
    mock_response.status_code = 400
    mock_response.json.return_value = {"error": "invalid_grant"}

    with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls:
        mock_client = AsyncMock()
        mock_client.post = AsyncMock(return_value=mock_response)
        mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)

        response = oauth_client.post(
            "/oauth/token",
            data={
                "client_id": client_data["client_id"],
                "code": "bad-code",
                "code_verifier": "pkce-verifier",
            },
        )

    assert response.status_code == 400


def test_oauth_authorize_and_callback_round_trip(oauth_client):
    """OAuth authorize/callback round-trip preserves the original redirect URI and state."""
    from urllib.parse import parse_qs, urlparse

    client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")

    authorize_response = oauth_client.get(
        "/oauth/authorize",
        params={
            "client_id": client_data["client_id"],
            "redirect_uri": "http://127.0.0.1:8080/callback",
            "state": "original-state",
            "code_challenge": "pkce-challenge",
            "code_challenge_method": "S256",
        },
        follow_redirects=False,
    )

    assert authorize_response.status_code == 302
    location = authorize_response.headers["location"]
    assert "state=" in location
    # The client's own redirect URI must not appear in the outgoing redirect.
    assert "redirect_uri=http%3A%2F%2F127.0.0.1%3A8080%2Fcallback" not in location

    parsed = urlparse(location)
    query = parse_qs(parsed.query)
    proxy_state = query["state"][0]

    callback_response = oauth_client.get(
        "/oauth/callback",
        params={"state": proxy_state, "code": "auth-code-123"},
        follow_redirects=False,
    )

    assert callback_response.status_code == 302
    callback_location = callback_response.headers["location"]
    assert callback_location.startswith("http://127.0.0.1:8080/callback?")
    assert "code=auth-code-123" in callback_location
    assert "state=original-state" in callback_location


def test_oauth_callback_rejects_tampered_state(oauth_client):
    """OAuth callback rejects modified signed proxy state."""
    from urllib.parse import parse_qs, urlparse

    client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")

    authorize_response = oauth_client.get(
        "/oauth/authorize",
        params={
            "client_id": client_data["client_id"],
            "redirect_uri": "http://127.0.0.1:8080/callback",
            "state": "original-state",
            "code_challenge": "pkce-challenge",
            "code_challenge_method": "S256",
        },
        follow_redirects=False,
    )
    # Fail with a clear message if the authorize step itself broke, rather
    # than with a KeyError on the missing Location header below.
    assert authorize_response.status_code == 302

    proxy_state = parse_qs(urlparse(authorize_response.headers["location"]).query)["state"][0]
    # Swap the final character for a different one.
    # NOTE(review): if the state ends in unpadded base64, changing only the
    # last character might alter nothing but padding bits - confirm the
    # state encoding makes this tamper always detectable.
    tampered_state = proxy_state[:-1] + ("A" if proxy_state[-1] != "A" else "B")

    callback_response = oauth_client.get(
        "/oauth/callback",
        params={"state": tampered_state, "code": "auth-code-123"},
    )

    assert callback_response.status_code == 400


@pytest.mark.parametrize(
    "redirect_uri",
    [
        "https://claude.ai/api/mcp/auth_callback",
        "https://claude.com/api/mcp/auth_callback",
    ],
)
def test_dcr_accepts_default_claude_callbacks(oauth_client, redirect_uri):
    """Claude's hosted connector callback URLs are allowed by default."""
    response = oauth_client.post(
        "/register",
        json={
            "client_name": "claude-client",
            "redirect_uris": [redirect_uri],
            "token_endpoint_auth_method": "none",
            "grant_types": ["authorization_code", "refresh_token"],
            "response_types": ["code"],
        },
    )

    assert response.status_code == 200


def test_oauth_authorize_rejects_unknown_client(oauth_client):
    """OAuth authorize returns invalid_client for unregistered client IDs."""
    response = oauth_client.get(
        "/oauth/authorize",
        params={
            "client_id": "unknown-client",
            "redirect_uri": "http://127.0.0.1:8080/callback",
            "state": "x",
            "code_challenge": "pkce-challenge",
            "code_challenge_method": "S256",
        },
    )

    assert response.status_code == 401
    assert response.json()["detail"] == "invalid_client"


def test_oauth_token_rejects_unknown_dcr_client(oauth_client):
    """Unknown dynamic clients receive RFC 6749 invalid_client from token endpoint."""
    response = oauth_client.post(
        "/oauth/token",
        data={
            "client_id": "deleted-or-unknown-client",
            "code": "auth-code-123",
            "code_verifier": "pkce-verifier",
        },
    )

    assert response.status_code == 401
    assert response.json() == {"error": "invalid_client"}


def test_oauth_authorize_requires_pkce_s256(oauth_client):
    """Authorization endpoint enforces PKCE S256 for public clients."""
    client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")

    # No code_challenge at all.
    missing_challenge = oauth_client.get(
        "/oauth/authorize",
        params={
            "client_id": client_data["client_id"],
            "redirect_uri": "http://127.0.0.1:8080/callback",
            "state": "x",
        },
    )
    # A challenge is present, but with the "plain" method instead of S256.
    plain_method = oauth_client.get(
        "/oauth/authorize",
        params={
            "client_id": client_data["client_id"],
            "redirect_uri": "http://127.0.0.1:8080/callback",
            "state": "x",
            "code_challenge": "pkce-challenge",
            "code_challenge_method": "plain",
        },
    )

    assert missing_challenge.status_code == 400
    assert plain_method.status_code == 400


def test_register_rejects_foreign_redirect_uri(oauth_client):
    """DCR rejects redirect URIs outside the allowlist and loopback/Claude patterns."""
    response = oauth_client.post(
        "/register",
        json={
            "client_name": "pytest-client",
            "redirect_uris": ["https://evil.example.com/callback"],
            "token_endpoint_auth_method": "none",
            "grant_types": ["authorization_code"],
            "response_types": ["code"],
        },
    )

    assert response.status_code == 400


def test_dcr_registry_persists_registered_clients(tmp_path):
    """Registered OAuth clients survive registry reloads."""
    storage_path = tmp_path / "dcr_clients.json"
    registry = OAuthClientRegistry(storage_path)
    request = OAuthRegistrationRequest.model_validate(
        {
            "client_name": "persisted-client",
            "redirect_uris": ["http://127.0.0.1:8080/callback"],
            "token_endpoint_auth_method": "none",
            "grant_types": ["authorization_code"],
            "response_types": ["code"],
        }
    )
    response = registry.register(request)

    # A second registry built on the same path must find the client.
    reloaded = OAuthClientRegistry(storage_path)
    assert reloaded.get(response["client_id"]) is not None


@pytest.mark.skipif(
    os.name != "posix", reason="POSIX permission bits are not enforced on this platform"
)
def test_dcr_storage_is_written_owner_only(tmp_path):
    """The persisted DCR store must not be readable beyond its owner (0o600)."""
    storage_path = tmp_path / "dcr_clients.json"
    registry = OAuthClientRegistry(storage_path)
    request = OAuthRegistrationRequest.model_validate(
        {
            "client_name": "perm-client",
            "redirect_uris": ["http://127.0.0.1:8080/callback"],
            "token_endpoint_auth_method": "none",
            "grant_types": ["authorization_code"],
            "response_types": ["code"],
        }
    )
    registry.register(request)

    # S_IMODE keeps only the permission bits of st_mode.
    mode = stat.S_IMODE(os.stat(storage_path).st_mode)
    assert mode == 0o600


# ---------------------------------------------------------------------------
# Config validation tests
# ---------------------------------------------------------------------------


def test_config_oauth_mode_requires_client_id(monkeypatch):
    """OAUTH_MODE=true with an empty client ID fails with an error naming GITEA_OAUTH_CLIENT_ID."""
    monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
    monkeypatch.setenv("OAUTH_MODE", "true")
    monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "")
    monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "some-secret")

    from aegis_gitea_mcp.config import Settings

    with pytest.raises(Exception, match="GITEA_OAUTH_CLIENT_ID"):
        Settings()  # type: ignore[call-arg]


def test_config_oauth_mode_requires_client_secret(monkeypatch):
    """OAUTH_MODE=true with an empty secret fails with an error naming GITEA_OAUTH_CLIENT_SECRET."""
    monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
    monkeypatch.setenv("OAUTH_MODE", "true")
    monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "some-id")
    monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "")

    from aegis_gitea_mcp.config import Settings

    with pytest.raises(Exception, match="GITEA_OAUTH_CLIENT_SECRET"):
        Settings()  # type: ignore[call-arg]


def test_config_standard_mode_requires_gitea_token(monkeypatch):
    """Standard mode with an empty GITEA_TOKEN fails with an error naming GITEA_TOKEN."""
    monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
    monkeypatch.setenv("OAUTH_MODE", "false")
    monkeypatch.setenv("GITEA_TOKEN", "")
    monkeypatch.setenv("MCP_API_KEYS", "a" * 64)

    from aegis_gitea_mcp.config import Settings

    with pytest.raises(Exception, match="GITEA_TOKEN"):
        Settings()  # type: ignore[call-arg]


# ---------------------------------------------------------------------------
# Server middleware: OAuth mode authentication
# ---------------------------------------------------------------------------


def test_mcp_tool_call_requires_valid_gitea_token(oauth_client):
    """POST /mcp/tool/call with an invalid Gitea token returns 401."""
    mock_response = MagicMock()
    mock_response.status_code = 401

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls:
        mock_client = AsyncMock()
        mock_client.get = AsyncMock(return_value=mock_response)
        mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)

        response = oauth_client.post(
            "/mcp/tool/call",
            json={"tool": "list_repositories", "arguments": {}},
            headers={"Authorization": "Bearer invalid-token"},
        )

    assert response.status_code == 401


def test_mcp_tool_call_no_token_returns_401(oauth_client):
    """POST /mcp/tool/call without Authorization header returns 401."""
    response = oauth_client.post(
        "/mcp/tool/call",
        json={"tool": "list_repositories", "arguments": {}},
    )

    assert response.status_code == 401