Files
AegisGitea-MCP/tests/test_oauth_oidc.py
latte 59e1ea53a8
Some checks failed
docker / lint (push) Has been cancelled
docker / test (push) Has been cancelled
docker / docker-build (push) Has been cancelled
lint / lint (push) Has been cancelled
test / test (push) Has been cancelled
Add OAuth2/OIDC per-user Gitea authentication
Introduce a GiteaOAuthValidator for JWT and userinfo validation with
fallbacks, add an /oauth/token proxy, and thread per-user tokens through
the request context and automation paths. Update config and .env.example
for OAuth-first mode; add OpenAPI, extensive unit/integration tests,
GitHub/Gitea CI workflows, docs, and lint/test enforcement (>=80% coverage).
2026-02-25 16:54:01 +01:00

151 lines
5.2 KiB
Python

"""OIDC/JWKS-focused OAuth validator tests."""
from __future__ import annotations

import json
import time
from collections.abc import Iterator
from unittest.mock import AsyncMock, MagicMock, patch

import jwt
import pytest
from cryptography.hazmat.primitives.asymmetric import rsa
from jwt.algorithms import RSAAlgorithm

from aegis_gitea_mcp.config import reset_settings
from aegis_gitea_mcp.oauth import GiteaOAuthValidator, reset_oauth_validator
@pytest.fixture(autouse=True)
def reset_state(monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
    """Reset state and configure the OAuth validation environment.

    Runs automatically around every test in this module: it calls
    ``reset_settings`` and ``reset_oauth_validator``, sets the environment
    variables the OAuth tests rely on, hands control to the test, and then
    calls both reset functions again.

    Args:
        monkeypatch: pytest's monkeypatch fixture, used to set the
            environment variables for the duration of the test.

    Yields:
        None. The ``yield`` only separates setup from teardown, which is why
        the return annotation is ``Iterator[None]`` rather than ``None``.
    """
    reset_settings()
    reset_oauth_validator()

    monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
    monkeypatch.setenv("ENVIRONMENT", "test")
    monkeypatch.setenv("OAUTH_MODE", "true")
    monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id")
    monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret")
    monkeypatch.setenv("OAUTH_CACHE_TTL_SECONDS", "600")

    yield

    # Teardown: reset again so nothing built during this test is carried
    # into the next one.
    reset_settings()
    reset_oauth_validator()
def _build_jwt_fixture() -> tuple[str, dict[str, object]]:
    """Create a signed RS256 access token and the JWKS document matching it.

    Returns:
        A ``(token, jwks)`` pair: the encoded JWT, and a ``{"keys": [...]}``
        payload holding the public key tagged with the same ``kid`` that the
        token header carries.
    """
    signing_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    # Export the public half as a JWK dict and tag it with the key id that
    # is also placed in the token header below.
    public_jwk = json.loads(RSAAlgorithm.to_jwk(signing_key.public_key()))
    public_jwk["kid"] = "kid-123"

    issued_at = int(time.time())
    claims = {
        "sub": "user-1",
        "preferred_username": "alice",
        "scope": "read:repository write:repository",
        "aud": "test-client-id",
        "iss": "https://gitea.example.com",
        "iat": issued_at,
        "exp": issued_at + 3600,
    }
    encoded_token = jwt.encode(
        claims,
        signing_key,
        algorithm="RS256",
        headers={"kid": "kid-123"},
    )
    return encoded_token, {"keys": [public_jwk]}
@pytest.mark.asyncio
async def test_validate_oauth_token_with_oidc_jwt_and_cache() -> None:
    """Validating one JWT twice fetches discovery and JWKS only once each."""
    token, jwks = _build_jwt_fixture()
    validator = GiteaOAuthValidator()

    # Canned HTTP replies, in the order the mocked client hands them out.
    discovery_reply = MagicMock(status_code=200)
    discovery_reply.json.return_value = {
        "issuer": "https://gitea.example.com",
        "jwks_uri": "https://gitea.example.com/login/oauth/keys",
    }
    jwks_reply = MagicMock(status_code=200)
    jwks_reply.json.return_value = jwks

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        http_client = AsyncMock()
        http_client.get = AsyncMock(side_effect=[discovery_reply, jwks_reply])

        # Make ``async with AsyncClient(...) as client`` yield the mock.
        client_context = client_factory.return_value
        client_context.__aenter__ = AsyncMock(return_value=http_client)
        client_context.__aexit__ = AsyncMock(return_value=False)

        first_ok, first_error, first_principal = await validator.validate_oauth_token(
            token, "127.0.0.1", "TestAgent"
        )
        second_ok, second_error, second_principal = await validator.validate_oauth_token(
            token, "127.0.0.1", "TestAgent"
        )

    assert first_ok is True
    assert first_error is None
    assert first_principal is not None
    assert first_principal["login"] == "alice"
    assert "write:repository" in first_principal["scopes"]

    assert second_ok is True
    assert second_error is None
    assert second_principal is not None

    # Only two GETs in total across both validations: one for discovery and
    # one for JWKS. Only two replies are queued, so a third GET would fail.
    assert http_client.get.await_count == 2
@pytest.mark.asyncio
async def test_invalid_jwt_falls_back_and_fails_userinfo() -> None:
    """A malformed JWT is rejected when the userinfo reply is a 401."""
    validator = GiteaOAuthValidator()

    # Three dot-separated segments make this look like a JWT, but it carries
    # no valid header or signature.
    malformed_token = "abc.def.ghi"

    discovery_reply = MagicMock(status_code=200)
    discovery_reply.json.return_value = {
        "issuer": "https://gitea.example.com",
        "jwks_uri": "https://gitea.example.com/login/oauth/keys",
    }
    jwks_reply = MagicMock(status_code=200)
    jwks_reply.json.return_value = {
        "keys": [{"kid": "missing", "kty": "RSA", "n": "x", "e": "AQAB"}]
    }
    # Final queued reply: a 401 from the userinfo fallback.
    userinfo_rejection = MagicMock(status_code=401)

    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        http_client = AsyncMock()
        http_client.get = AsyncMock(
            side_effect=[discovery_reply, jwks_reply, userinfo_rejection]
        )

        # Make ``async with AsyncClient(...) as client`` yield the mock.
        client_context = client_factory.return_value
        client_context.__aenter__ = AsyncMock(return_value=http_client)
        client_context.__aexit__ = AsyncMock(return_value=False)

        is_valid, error, principal = await validator.validate_oauth_token(
            malformed_token,
            "127.0.0.1",
            "TestAgent",
        )

    assert is_valid is False
    assert principal is None
    assert error is not None
def test_extract_bearer_token_strict_parsing() -> None:
    """Only a header of the exact form `Bearer <token>` yields a token."""
    extract = GiteaOAuthValidator.extract_bearer_token

    assert extract("Bearer abc123") == "abc123"

    # Lower-case scheme, empty token, and a non-Bearer scheme must all be
    # rejected with ``None``.
    for rejected_header in ("bearer abc123", "Bearer ", "Basic abc"):
        assert extract(rejected_header) is None