Files
Latte b8217dce8a
docker / test (pull_request) Successful in 24s
lint / lint (pull_request) Successful in 37s
lint / lint (push) Successful in 1m26s
test / test (push) Successful in 1m40s
test / test (pull_request) Successful in 34s
docker / lint (pull_request) Successful in 1m59s
docker / docker-test (pull_request) Successful in 14s
docker / docker-publish (pull_request) Has been skipped
feat: harden OAuth state secret validation, DCR file permissions, and policy defaults
- Enforce 32-char minimum on OAUTH_STATE_SECRET at startup (config.py)
- Write DCR client registry with owner-only (0o600) permissions before atomic replace
- Flip policy.yaml default write action from allow → deny
- Add CLAUDE.md with architecture, commands, and AGENTS.md contract summary
- Add .pre-commit-config.yaml mirroring `make lint` checks
- Update .gitignore: add .venv, .claude, .mypy_cache, .ruff_cache, .coverage.*
- Extend docs: audit log rotation guidance, OAUTH_STATE_SECRET and DCR_STORAGE_PATH notes
- Tests: short-secret rejection, 32-char acceptance, POSIX permission check for DCR store

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 14:13:22 +02:00

641 lines
22 KiB
Python

"""Tests for OAuth2 per-user Gitea authentication."""
from __future__ import annotations
import asyncio
import os
import stat
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi.testclient import TestClient
from aegis_gitea_mcp.config import reset_settings
from aegis_gitea_mcp.oauth import GiteaOAuthValidator, get_oauth_validator, reset_oauth_validator
from aegis_gitea_mcp.oauth_flow import OAuthClientRegistry, OAuthRegistrationRequest
from aegis_gitea_mcp.request_context import (
get_gitea_user_login,
get_gitea_user_token,
set_gitea_user_login,
set_gitea_user_token,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_state():
    """Clear the cached settings and the OAuth validator before and after each test."""
    resetters = (reset_settings, reset_oauth_validator)
    for reset in resetters:
        reset()
    yield
    # Same resets again once the test body has finished.
    for reset in resetters:
        reset()
@pytest.fixture
def mock_env_oauth(monkeypatch):
    """Set the environment variables the OAuth-mode tests rely on."""
    oauth_env = {
        "GITEA_URL": "https://gitea.example.com",
        "ENVIRONMENT": "test",
        "OAUTH_MODE": "true",
        "GITEA_OAUTH_CLIENT_ID": "test-client-id",
        "GITEA_OAUTH_CLIENT_SECRET": "test-client-secret",
        "OAUTH_STATE_SECRET": "test-state-secret-0123456789abcdef",
        "STARTUP_VALIDATE_GITEA": "false",
    }
    for name, value in oauth_env.items():
        monkeypatch.setenv(name, value)
@pytest.fixture
def oauth_validator(mock_env_oauth):
    """Provide a fresh GiteaOAuthValidator built after the OAuth environment is set."""
    validator = GiteaOAuthValidator()
    return validator
@pytest.fixture
def oauth_client(mock_env_oauth):
    """Provide a FastAPI TestClient for the server app with the OAuth environment set.

    Server-side exceptions are not re-raised into the test
    (``raise_server_exceptions=False``).
    """
    # Imported inside the fixture rather than at module top — presumably so the
    # app module is first loaded with the OAuth environment in place; confirm.
    from aegis_gitea_mcp.server import app

    client = TestClient(app, raise_server_exceptions=False)
    return client
def _register_public_client(oauth_client: TestClient, redirect_uri: str) -> dict[str, str]:
    """POST a public-client registration to /register and return the JSON body.

    Asserts that the endpoint answers 200 and that the body contains ``client_id``.
    """
    registration = {
        "client_name": "pytest-client",
        "redirect_uris": [redirect_uri],
        "token_endpoint_auth_method": "none",
        "grant_types": ["authorization_code", "refresh_token"],
        "response_types": ["code"],
    }
    reply = oauth_client.post("/register", json=registration)
    assert reply.status_code == 200
    body = reply.json()
    assert "client_id" in body
    return body
# ---------------------------------------------------------------------------
# GiteaOAuthValidator unit tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_validate_oauth_token_success(oauth_validator):
    """A 200 reply with a user payload yields is_valid=True, no error, and that payload."""
    gitea_reply = MagicMock(status_code=200)
    gitea_reply.json.return_value = {"login": "testuser", "id": 42}
    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        http_stub = AsyncMock()
        http_stub.get = AsyncMock(return_value=gitea_reply)
        # Entering the patched client as an async context manager yields the stub.
        async_ctx = client_factory.return_value
        async_ctx.__aenter__ = AsyncMock(return_value=http_stub)
        async_ctx.__aexit__ = AsyncMock(return_value=False)
        outcome = await oauth_validator.validate_oauth_token(
            "valid-gitea-token", "127.0.0.1", "TestAgent/1.0"
        )
    is_valid, error, user_data = outcome
    assert is_valid is True
    assert error is None
    assert user_data is not None
    assert user_data["login"] == "testuser"
@pytest.mark.asyncio
async def test_validate_oauth_token_invalid_401(oauth_validator):
    """A 401 reply from the mocked HTTP client yields is_valid=False with an error."""
    gitea_reply = MagicMock(status_code=401)
    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        http_stub = AsyncMock()
        http_stub.get = AsyncMock(return_value=gitea_reply)
        # Entering the patched client as an async context manager yields the stub.
        async_ctx = client_factory.return_value
        async_ctx.__aenter__ = AsyncMock(return_value=http_stub)
        async_ctx.__aexit__ = AsyncMock(return_value=False)
        outcome = await oauth_validator.validate_oauth_token(
            "expired-token", "127.0.0.1", "TestAgent/1.0"
        )
    is_valid, error, user_data = outcome
    assert is_valid is False
    assert error is not None
    assert user_data is None
@pytest.mark.asyncio
async def test_validate_oauth_token_missing_token(oauth_validator):
    """Passing None as the token yields is_valid=False, an error, and no user data."""
    outcome = await oauth_validator.validate_oauth_token(None, "127.0.0.1", "TestAgent/1.0")
    is_valid, error, user_data = outcome
    assert is_valid is False
    assert error is not None
    assert user_data is None
@pytest.mark.asyncio
async def test_validate_oauth_token_network_error(oauth_validator):
    """An httpx.RequestError from the HTTP client yields is_valid=False and a descriptive error."""
    import httpx

    connection_failure = httpx.RequestError("Connection refused", request=MagicMock())
    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        http_stub = AsyncMock()
        # Every GET on the stub raises the simulated transport failure.
        http_stub.get = AsyncMock(side_effect=connection_failure)
        async_ctx = client_factory.return_value
        async_ctx.__aenter__ = AsyncMock(return_value=http_stub)
        async_ctx.__aexit__ = AsyncMock(return_value=False)
        outcome = await oauth_validator.validate_oauth_token(
            "some-token", "127.0.0.1", "TestAgent/1.0"
        )
    is_valid, error, user_data = outcome
    assert is_valid is False
    assert error is not None
    assert "unable to validate oauth token" in error.lower()
    assert user_data is None
@pytest.mark.asyncio
async def test_validate_oauth_token_rate_limit(oauth_validator):
    """After five failed validations from one address, the next attempt reports 'too many'."""
    gitea_reply = MagicMock(status_code=401)
    failures_before_limit = 5  # default MAX_AUTH_FAILURES=5
    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        http_stub = AsyncMock()
        http_stub.get = AsyncMock(return_value=gitea_reply)
        async_ctx = client_factory.return_value
        async_ctx.__aenter__ = AsyncMock(return_value=http_stub)
        async_ctx.__aexit__ = AsyncMock(return_value=False)
        # Use up the allowed failures with the same token, address and agent.
        for _attempt in range(failures_before_limit):
            await oauth_validator.validate_oauth_token("bad-token", "10.0.0.1", "Agent")
        # The following attempt is expected to be rate-limited.
        outcome = await oauth_validator.validate_oauth_token("bad-token", "10.0.0.1", "Agent")
    is_valid, error, user_data = outcome
    assert is_valid is False
    assert error is not None
    assert "too many" in error.lower()
# ---------------------------------------------------------------------------
# Singleton tests
# ---------------------------------------------------------------------------
def test_get_oauth_validator_singleton(mock_env_oauth):
    """Two consecutive get_oauth_validator() calls return the very same object."""
    first = get_oauth_validator()
    second = get_oauth_validator()
    assert second is first
def test_reset_oauth_validator(mock_env_oauth):
    """After reset_oauth_validator(), get_oauth_validator() returns a different object."""
    before_reset = get_oauth_validator()
    reset_oauth_validator()
    after_reset = get_oauth_validator()
    assert after_reset is not before_reset
# ---------------------------------------------------------------------------
# ContextVar isolation tests
# ---------------------------------------------------------------------------
def test_context_var_token_isolation():
    """A token set inside one gathered coroutine is not visible in a sibling coroutine."""
    observed = {}

    async def writer():
        set_gitea_user_token("token-for-a")
        await asyncio.sleep(0)
        observed["a"] = get_gitea_user_token()

    async def reader():
        # Never sets a token, so it should read the default (None).
        await asyncio.sleep(0)
        observed["b"] = get_gitea_user_token()

    async def run_both():
        await asyncio.gather(writer(), reader())

    asyncio.run(run_both())
    assert observed["a"] == "token-for-a"
    assert observed["b"] is None  # the writer's token did not leak into the reader
def test_context_var_login_set_and_get():
    """set_gitea_user_login / get_gitea_user_login round-trip a value.

    The set/get pair runs inside a copied context, so the login written here
    is confined to this test instead of staying set for whichever test runs
    next on the same thread.
    """
    import contextvars

    def _set_then_get() -> None:
        set_gitea_user_login("alice")
        assert get_gitea_user_login() == "alice"

    # Context.run executes the callable in the copy; an AssertionError raised
    # inside still propagates to pytest.
    contextvars.copy_context().run(_set_then_get)
# ---------------------------------------------------------------------------
# /oauth/token proxy endpoint tests
# ---------------------------------------------------------------------------
def test_oauth_token_endpoint_available_when_oauth_mode_false(monkeypatch):
    """With OAUTH_MODE=false, /register and POST /oauth/token still answer 200."""
    standard_env = {
        "GITEA_URL": "https://gitea.example.com",
        "GITEA_TOKEN": "test-token-12345",
        "ENVIRONMENT": "test",
        "MCP_API_KEYS": "a" * 64,
        "OAUTH_MODE": "false",
        "STARTUP_VALIDATE_GITEA": "false",
    }
    for name, value in standard_env.items():
        monkeypatch.setenv(name, value)

    upstream_reply = MagicMock(status_code=200)
    upstream_reply.json.return_value = {"access_token": "token"}

    # Imported only after the environment has been prepared.
    from aegis_gitea_mcp.server import app

    registration_body = {
        "client_name": "pytest-client",
        "redirect_uris": ["http://127.0.0.1:8080/callback"],
        "token_endpoint_auth_method": "none",
        "grant_types": ["authorization_code"],
        "response_types": ["code"],
    }
    with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as client_factory:
        http_stub = AsyncMock()
        http_stub.post = AsyncMock(return_value=upstream_reply)
        async_ctx = client_factory.return_value
        async_ctx.__aenter__ = AsyncMock(return_value=http_stub)
        async_ctx.__aexit__ = AsyncMock(return_value=False)
        with TestClient(app, raise_server_exceptions=False) as client:
            registration = client.post("/register", json=registration_body)
            assert registration.status_code == 200
            client_id = registration.json()["client_id"]
            token_form = {"client_id": client_id, "code": "abc123", "code_verifier": "pkce"}
            response = client.post("/oauth/token", data=token_form)
    assert response.status_code == 200
def test_oauth_token_endpoint_missing_code(oauth_client):
    """A token request that omits ``code`` is answered with 400."""
    registered = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
    form_without_code = {"client_id": registered["client_id"], "code_verifier": "pkce"}
    reply = oauth_client.post("/oauth/token", data=form_without_code)
    assert reply.status_code == 400
def test_oauth_token_endpoint_proxy_success(oauth_client):
    """A 200 upstream reply is passed through: the response carries its access_token."""
    registered = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
    upstream_reply = MagicMock(status_code=200)
    upstream_reply.json.return_value = {
        "access_token": "gitea-access-token-xyz",
        "token_type": "bearer",
    }
    token_form = {
        "client_id": registered["client_id"],
        "code": "auth-code-123",
        "code_verifier": "pkce-verifier",
    }
    with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as client_factory:
        http_stub = AsyncMock()
        http_stub.post = AsyncMock(return_value=upstream_reply)
        async_ctx = client_factory.return_value
        async_ctx.__aenter__ = AsyncMock(return_value=http_stub)
        async_ctx.__aexit__ = AsyncMock(return_value=False)
        reply = oauth_client.post("/oauth/token", data=token_form)
    assert reply.status_code == 200
    payload = reply.json()
    assert payload["access_token"] == "gitea-access-token-xyz"
def test_oauth_token_endpoint_gitea_error(oauth_client):
    """A 400 upstream reply results in a 400 from POST /oauth/token."""
    registered = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
    upstream_reply = MagicMock(status_code=400)
    upstream_reply.json.return_value = {"error": "invalid_grant"}
    token_form = {
        "client_id": registered["client_id"],
        "code": "bad-code",
        "code_verifier": "pkce-verifier",
    }
    with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as client_factory:
        http_stub = AsyncMock()
        http_stub.post = AsyncMock(return_value=upstream_reply)
        async_ctx = client_factory.return_value
        async_ctx.__aenter__ = AsyncMock(return_value=http_stub)
        async_ctx.__aexit__ = AsyncMock(return_value=False)
        reply = oauth_client.post("/oauth/token", data=token_form)
    assert reply.status_code == 400
def test_oauth_authorize_and_callback_round_trip(oauth_client):
    """Authorize then callback returns to the client's redirect URI with its own state.

    The authorize redirect must carry a ``state`` parameter and must not carry
    the client's redirect URI; feeding that state to the callback must redirect
    to the client's URI with the code and the original state.
    """
    from urllib.parse import parse_qs, urlparse

    client_redirect = "http://127.0.0.1:8080/callback"
    registered = _register_public_client(oauth_client, client_redirect)
    authorize_params = {
        "client_id": registered["client_id"],
        "redirect_uri": "http://127.0.0.1:8080/callback",
        "state": "original-state",
        "code_challenge": "pkce-challenge",
        "code_challenge_method": "S256",
    }
    authorize_reply = oauth_client.get(
        "/oauth/authorize", params=authorize_params, follow_redirects=False
    )
    assert authorize_reply.status_code == 302
    upstream_location = authorize_reply.headers["location"]
    assert "state=" in upstream_location
    assert "redirect_uri=http%3A%2F%2F127.0.0.1%3A8080%2Fcallback" not in upstream_location

    # Pull the proxy-issued state out of the redirect's query string.
    upstream_query = parse_qs(urlparse(upstream_location).query)
    proxy_state = upstream_query["state"][0]

    callback_reply = oauth_client.get(
        "/oauth/callback",
        params={"state": proxy_state, "code": "auth-code-123"},
        follow_redirects=False,
    )
    assert callback_reply.status_code == 302
    final_location = callback_reply.headers["location"]
    assert final_location.startswith("http://127.0.0.1:8080/callback?")
    assert "code=auth-code-123" in final_location
    assert "state=original-state" in final_location
def test_oauth_callback_rejects_tampered_state(oauth_client):
    """A proxy state whose final character was altered is answered with 400 by the callback."""
    from urllib.parse import parse_qs, urlparse

    registered = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
    authorize_params = {
        "client_id": registered["client_id"],
        "redirect_uri": "http://127.0.0.1:8080/callback",
        "state": "original-state",
        "code_challenge": "pkce-challenge",
        "code_challenge_method": "S256",
    }
    authorize_reply = oauth_client.get(
        "/oauth/authorize", params=authorize_params, follow_redirects=False
    )
    upstream_query = parse_qs(urlparse(authorize_reply.headers["location"]).query)
    proxy_state = upstream_query["state"][0]

    # Swap the last character for one guaranteed to differ from it.
    if proxy_state[-1] != "A":
        replacement = "A"
    else:
        replacement = "B"
    tampered_state = proxy_state[:-1] + replacement

    callback_reply = oauth_client.get(
        "/oauth/callback",
        params={"state": tampered_state, "code": "auth-code-123"},
    )
    assert callback_reply.status_code == 400
@pytest.mark.parametrize(
    "redirect_uri",
    ["https://claude.ai/api/mcp/auth_callback", "https://claude.com/api/mcp/auth_callback"],
)
def test_dcr_accepts_default_claude_callbacks(oauth_client, redirect_uri):
    """Registering a client with either Claude callback URL is answered with 200."""
    registration = {
        "client_name": "claude-client",
        "redirect_uris": [redirect_uri],
        "token_endpoint_auth_method": "none",
        "grant_types": ["authorization_code", "refresh_token"],
        "response_types": ["code"],
    }
    reply = oauth_client.post("/register", json=registration)
    assert reply.status_code == 200
def test_oauth_authorize_rejects_unknown_client(oauth_client):
    """Authorizing with an unregistered client_id gives 401 with detail ``invalid_client``."""
    authorize_params = {
        "client_id": "unknown-client",
        "redirect_uri": "http://127.0.0.1:8080/callback",
        "state": "x",
        "code_challenge": "pkce-challenge",
        "code_challenge_method": "S256",
    }
    reply = oauth_client.get("/oauth/authorize", params=authorize_params)
    assert reply.status_code == 401
    assert reply.json()["detail"] == "invalid_client"
def test_oauth_token_rejects_unknown_dcr_client(oauth_client):
    """A token request from an unregistered client_id gives 401 and an ``invalid_client`` body."""
    token_form = {
        "client_id": "deleted-or-unknown-client",
        "code": "auth-code-123",
        "code_verifier": "pkce-verifier",
    }
    reply = oauth_client.post("/oauth/token", data=token_form)
    assert reply.status_code == 401
    assert reply.json() == {"error": "invalid_client"}
def test_oauth_authorize_requires_pkce_s256(oauth_client):
    """Authorize answers 400 both without a code challenge and with the ``plain`` method."""
    registered = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback")
    base_params = {
        "client_id": registered["client_id"],
        "redirect_uri": "http://127.0.0.1:8080/callback",
        "state": "x",
    }
    # First request: no PKCE parameters at all.
    missing_challenge = oauth_client.get("/oauth/authorize", params=base_params)
    # Second request: a challenge is present but uses the "plain" method.
    plain_params = {
        **base_params,
        "code_challenge": "pkce-challenge",
        "code_challenge_method": "plain",
    }
    plain_method = oauth_client.get("/oauth/authorize", params=plain_params)
    assert missing_challenge.status_code == 400
    assert plain_method.status_code == 400
def test_register_rejects_foreign_redirect_uri(oauth_client):
    """Registering a client whose redirect URI is on evil.example.com is answered with 400."""
    registration = {
        "client_name": "pytest-client",
        "redirect_uris": ["https://evil.example.com/callback"],
        "token_endpoint_auth_method": "none",
        "grant_types": ["authorization_code"],
        "response_types": ["code"],
    }
    reply = oauth_client.post("/register", json=registration)
    assert reply.status_code == 400
def test_dcr_registry_persists_registered_clients(tmp_path):
    """A client registered through one registry is found by a second registry on the same file."""
    storage_path = tmp_path / "dcr_clients.json"
    registration_fields = {
        "client_name": "persisted-client",
        "redirect_uris": ["http://127.0.0.1:8080/callback"],
        "token_endpoint_auth_method": "none",
        "grant_types": ["authorization_code"],
        "response_types": ["code"],
    }
    first_registry = OAuthClientRegistry(storage_path)
    request = OAuthRegistrationRequest.model_validate(registration_fields)
    registered = first_registry.register(request)

    # A second instance pointed at the same path must know the client.
    second_registry = OAuthClientRegistry(storage_path)
    assert second_registry.get(registered["client_id"]) is not None
@pytest.mark.skipif(
    os.name != "posix", reason="POSIX permission bits are not enforced on this platform"
)
def test_dcr_storage_is_written_owner_only(tmp_path):
    """After a registration, the DCR store file has permission bits of exactly 0o600."""
    storage_path = tmp_path / "dcr_clients.json"
    registration_fields = {
        "client_name": "perm-client",
        "redirect_uris": ["http://127.0.0.1:8080/callback"],
        "token_endpoint_auth_method": "none",
        "grant_types": ["authorization_code"],
        "response_types": ["code"],
    }
    registry = OAuthClientRegistry(storage_path)
    registry.register(OAuthRegistrationRequest.model_validate(registration_fields))

    # Keep only the permission bits of the file's mode.
    permission_bits = stat.S_IMODE(storage_path.stat().st_mode)
    assert permission_bits == 0o600
# ---------------------------------------------------------------------------
# Config validation tests
# ---------------------------------------------------------------------------
def test_config_oauth_mode_requires_client_id(monkeypatch):
    """Settings() raises an error mentioning GITEA_OAUTH_CLIENT_ID when it is empty in OAuth mode."""
    env = (
        ("GITEA_URL", "https://gitea.example.com"),
        ("OAUTH_MODE", "true"),
        ("GITEA_OAUTH_CLIENT_ID", ""),
        ("GITEA_OAUTH_CLIENT_SECRET", "some-secret"),
    )
    for name, value in env:
        monkeypatch.setenv(name, value)

    from aegis_gitea_mcp.config import Settings

    with pytest.raises(Exception, match="GITEA_OAUTH_CLIENT_ID"):
        Settings()  # type: ignore[call-arg]
def test_config_oauth_mode_requires_client_secret(monkeypatch):
    """Settings() raises an error mentioning GITEA_OAUTH_CLIENT_SECRET when it is empty in OAuth mode."""
    env = (
        ("GITEA_URL", "https://gitea.example.com"),
        ("OAUTH_MODE", "true"),
        ("GITEA_OAUTH_CLIENT_ID", "some-id"),
        ("GITEA_OAUTH_CLIENT_SECRET", ""),
    )
    for name, value in env:
        monkeypatch.setenv(name, value)

    from aegis_gitea_mcp.config import Settings

    with pytest.raises(Exception, match="GITEA_OAUTH_CLIENT_SECRET"):
        Settings()  # type: ignore[call-arg]
def test_config_standard_mode_requires_gitea_token(monkeypatch):
    """Settings() raises an error mentioning GITEA_TOKEN when it is empty and OAUTH_MODE=false."""
    env = (
        ("GITEA_URL", "https://gitea.example.com"),
        ("OAUTH_MODE", "false"),
        ("GITEA_TOKEN", ""),
        ("MCP_API_KEYS", "a" * 64),
    )
    for name, value in env:
        monkeypatch.setenv(name, value)

    from aegis_gitea_mcp.config import Settings

    with pytest.raises(Exception, match="GITEA_TOKEN"):
        Settings()  # type: ignore[call-arg]
# ---------------------------------------------------------------------------
# Server middleware: OAuth mode authentication
# ---------------------------------------------------------------------------
def test_mcp_tool_call_requires_valid_gitea_token(oauth_client):
    """A tool call whose bearer token gets a 401 from the mocked HTTP client is answered with 401."""
    gitea_reply = MagicMock(status_code=401)
    call_body = {"tool": "list_repositories", "arguments": {}}
    auth_headers = {"Authorization": "Bearer invalid-token"}
    with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as client_factory:
        http_stub = AsyncMock()
        http_stub.get = AsyncMock(return_value=gitea_reply)
        async_ctx = client_factory.return_value
        async_ctx.__aenter__ = AsyncMock(return_value=http_stub)
        async_ctx.__aexit__ = AsyncMock(return_value=False)
        reply = oauth_client.post("/mcp/tool/call", json=call_body, headers=auth_headers)
    assert reply.status_code == 401
def test_mcp_tool_call_no_token_returns_401(oauth_client):
    """A tool call sent without an Authorization header is answered with 401."""
    call_body = {"tool": "list_repositories", "arguments": {}}
    reply = oauth_client.post("/mcp/tool/call", json=call_body)
    assert reply.status_code == 401