dev #12

Merged
Latte merged 2 commits from dev into main 2026-03-04 16:55:32 +00:00
3 changed files with 226 additions and 68 deletions

View File

@@ -57,7 +57,10 @@ Example response:
```json
{
"resource": "https://git.hiddenden.cafe",
"authorization_servers": ["https://git.hiddenden.cafe"],
"authorization_servers": [
"https://gitea-mcp.hiddenden.cafe",
"https://git.hiddenden.cafe"
],
"bearer_methods_supported": ["header"],
"scopes_supported": ["read:repository", "write:repository"],
"resource_documentation": "https://hiddenden.cafe/docs/mcp-gitea"

View File

@@ -3,15 +3,17 @@
from __future__ import annotations
import asyncio
import base64
import json
import logging
import urllib.parse
import uuid
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import Any
import httpx
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse
from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse, StreamingResponse
from pydantic import BaseModel, Field, ValidationError
from aegis_gitea_mcp.audit import get_audit_logger
@@ -245,6 +247,7 @@ async def authenticate_and_rate_limit(
"/oauth/token",
"/.well-known/oauth-protected-resource",
"/.well-known/oauth-authorization-server",
"/.well-known/openid-configuration",
}:
return await call_next(request)
@@ -329,7 +332,7 @@ async def authenticate_and_rate_limit(
effective_scopes.add(WRITE_SCOPE)
set_gitea_user_scopes(effective_scopes)
# Probe: verify the token actually works for Gitea's REST API.
# Probe: verify the token actually works for Gitea's repository API.
# Try both "token" and "Bearer" header formats since Gitea may
# accept OAuth tokens differently depending on version/config.
import hashlib
@@ -341,60 +344,69 @@ async def authenticate_and_rate_limit(
token_type = "jwt" if access_token.count(".") == 2 else "opaque"
if token_hash not in _api_scope_cache or now >= _api_scope_cache[token_hash]:
try:
probe_status = None
async with httpx.AsyncClient(
timeout=settings.request_timeout_seconds
) as probe_client:
# Try "token" format first (Gitea PAT style)
probe_resp = await probe_client.get(
f"{settings.gitea_base_url}/api/v1/user",
headers={"Authorization": f"token {access_token}"},
)
probe_status = probe_resp.status_code
# If "token" format fails, try "Bearer" (OAuth2 standard)
if probe_status in (401, 403):
# JWT tokens (OIDC) are already cryptographically validated via JWKS above.
# Gitea's OIDC access_tokens cannot access the REST API without additional
# Gitea-specific scope configuration, so we skip the probe for them and
# rely on per-call API errors for actual permission enforcement.
if token_type == "jwt":
probe_result = "skip:jwt"
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
else:
try:
probe_status = None
async with httpx.AsyncClient(
timeout=settings.request_timeout_seconds
) as probe_client:
probe_url = f"{settings.gitea_base_url}/api/v1/user"
# Try "token" format first (Gitea PAT style)
probe_resp = await probe_client.get(
f"{settings.gitea_base_url}/api/v1/user",
headers={"Authorization": f"Bearer {access_token}"},
probe_url,
headers={"Authorization": f"token {access_token}"},
)
probe_status = probe_resp.status_code
# If "token" format fails, try "Bearer" (OAuth2 standard)
if probe_status in (401, 403):
probe_resp = await probe_client.get(
probe_url,
headers={"Authorization": f"Bearer {access_token}"},
)
probe_status = probe_resp.status_code
if probe_status in (401, 403):
probe_result = f"fail:{probe_status}"
logger.warning(
"oauth_token_lacks_api_scope",
extra={
"status": probe_status,
"login": login,
"token_type": token_type,
"scopes_observed": observed_scopes,
},
)
message = (
"OAuth token is valid but lacks required Gitea API access. "
"Re-authorize this OAuth app in Gitea and try again."
)
if request.url.path.startswith("/mcp/"):
return _oauth_unauthorized_response(
request,
message,
scope=READ_SCOPE,
if probe_status in (401, 403):
probe_result = f"fail:{probe_status}"
logger.warning(
"oauth_token_lacks_api_scope",
extra={
"status": probe_status,
"login": login,
"token_type": token_type,
"scopes_observed": observed_scopes,
},
)
return JSONResponse(
status_code=401,
content={
"error": "Authentication failed",
"message": message,
"request_id": getattr(request.state, "request_id", "-"),
},
)
else:
probe_result = "pass"
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
except httpx.RequestError:
probe_result = "skip:error"
logger.debug("oauth_api_scope_probe_network_error")
message = (
"OAuth token is valid but lacks required Gitea API access. "
"Re-authorize this OAuth app in Gitea and try again."
)
if request.url.path.startswith("/mcp/"):
return _oauth_unauthorized_response(
request,
message,
scope=READ_SCOPE,
)
return JSONResponse(
status_code=401,
content={
"error": "Authentication failed",
"message": message,
"request_id": getattr(request.state, "request_id", "-"),
},
)
else:
probe_result = "pass"
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
except httpx.RequestError:
probe_result = "skip:error"
logger.debug("oauth_api_scope_probe_network_error")
logger.info(
"oauth_auth_summary",
@@ -482,7 +494,7 @@ async def health() -> dict[str, str]:
@app.get("/.well-known/oauth-protected-resource")
async def oauth_protected_resource_metadata() -> JSONResponse:
async def oauth_protected_resource_metadata(request: Request) -> JSONResponse:
"""OAuth 2.0 Protected Resource Metadata (RFC 9728).
Required by the MCP Authorization spec so that OAuth clients (e.g. ChatGPT)
@@ -491,11 +503,15 @@ async def oauth_protected_resource_metadata() -> JSONResponse:
"""
settings = get_settings()
gitea_base = settings.gitea_base_url
base_url = settings.public_base or str(request.base_url).rstrip("/")
authorization_servers = [base_url]
if gitea_base not in authorization_servers:
authorization_servers.append(gitea_base)
return JSONResponse(
content={
"resource": gitea_base,
"authorization_servers": [gitea_base],
"authorization_servers": authorization_servers,
"bearer_methods_supported": ["header"],
"scopes_supported": [READ_SCOPE, WRITE_SCOPE],
"resource_documentation": str(settings.oauth_resource_documentation),
@@ -503,6 +519,67 @@ async def oauth_protected_resource_metadata() -> JSONResponse:
)
@app.get("/oauth/authorize")
async def oauth_authorize_proxy(request: Request) -> RedirectResponse:
"""Proxy OAuth authorization to Gitea, replacing redirect_uri with our own callback.
Clients (ChatGPT, Claude, etc.) send their own redirect_uri which Gitea doesn't know
about. This endpoint intercepts the request, encodes the original redirect_uri and
state into a new state parameter, and forwards the request to Gitea using the MCP
server's own callback URI — the only URI that needs to be registered in Gitea.
"""
settings = get_settings()
base_url = settings.public_base or str(request.base_url).rstrip("/")
params = dict(request.query_params)
client_redirect_uri = params.pop("redirect_uri", "")
original_state = params.get("state", "")
# Encode the client's redirect_uri + original state into a tamper-evident wrapper.
# We simply base64-encode a JSON blob; Gitea will echo it back on the callback.
proxy_state_data = {"redirect_uri": client_redirect_uri, "state": original_state}
proxy_state = base64.urlsafe_b64encode(json.dumps(proxy_state_data).encode()).decode()
params["state"] = proxy_state
params["redirect_uri"] = f"{base_url}/oauth/callback"
gitea_authorize_url = f"{settings.gitea_base_url}/login/oauth/authorize"
redirect_url = f"{gitea_authorize_url}?{urllib.parse.urlencode(params)}"
return RedirectResponse(url=redirect_url, status_code=302)
@app.get("/oauth/callback")
async def oauth_callback_proxy(request: Request) -> RedirectResponse:
"""Handle Gitea's OAuth callback and redirect to the original client redirect_uri."""
proxy_state = request.query_params.get("state", "")
code = request.query_params.get("code", "")
error = request.query_params.get("error", "")
error_description = request.query_params.get("error_description", "")
try:
state_data = json.loads(base64.urlsafe_b64decode(proxy_state.encode()))
client_redirect_uri = state_data["redirect_uri"]
original_state = state_data["state"]
except Exception as exc:
raise HTTPException(status_code=400, detail="Invalid or missing state parameter") from exc
if not client_redirect_uri:
raise HTTPException(status_code=400, detail="No client redirect_uri in state")
result_params: dict[str, str] = {}
if error:
result_params["error"] = error
if error_description:
result_params["error_description"] = error_description
else:
result_params["code"] = code
if original_state:
result_params["state"] = original_state
redirect_url = f"{client_redirect_uri}?{urllib.parse.urlencode(result_params)}"
return RedirectResponse(url=redirect_url, status_code=302)
@app.get("/.well-known/oauth-authorization-server")
async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
"""OAuth 2.0 Authorization Server Metadata (RFC 8414).
@@ -518,7 +595,7 @@ async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
return JSONResponse(
content={
"issuer": gitea_base,
"authorization_endpoint": f"{gitea_base}/login/oauth/authorize",
"authorization_endpoint": f"{base_url}/oauth/authorize",
"token_endpoint": f"{base_url}/oauth/token",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code"],
@@ -529,6 +606,38 @@ async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
)
@app.get("/.well-known/openid-configuration")
async def openid_configuration(request: Request) -> JSONResponse:
"""OpenID Provider metadata compatible with OAuth proxy token exchange."""
settings = get_settings()
base_url = settings.public_base or str(request.base_url).rstrip("/")
gitea_base = settings.gitea_base_url
return JSONResponse(
content={
"issuer": gitea_base,
"authorization_endpoint": f"{base_url}/oauth/authorize",
"token_endpoint": f"{base_url}/oauth/token",
"userinfo_endpoint": f"{gitea_base}/login/oauth/userinfo",
"jwks_uri": f"{gitea_base}/login/oauth/keys",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code"],
"code_challenge_methods_supported": ["S256"],
"token_endpoint_auth_methods_supported": ["client_secret_post", "none"],
"scopes_supported": [
READ_SCOPE,
WRITE_SCOPE,
"openid",
"profile",
"email",
"groups",
],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
}
)
@app.post("/oauth/token")
async def oauth_token_proxy(request: Request) -> JSONResponse:
"""Proxy OAuth2 token exchange to Gitea.
@@ -544,27 +653,43 @@ async def oauth_token_proxy(request: Request) -> JSONResponse:
except Exception as exc:
raise HTTPException(status_code=400, detail="Invalid request body") from exc
grant_type = form_data.get("grant_type", "authorization_code")
code = form_data.get("code")
redirect_uri = form_data.get("redirect_uri", "")
refresh_token = form_data.get("refresh_token")
code_verifier = form_data.get("code_verifier", "")
# ChatGPT sends the client_id and client_secret (that were configured in the GPT Action
# settings) in the POST body. Use those directly; fall back to env vars if not provided.
client_id = form_data.get("client_id") or settings.gitea_oauth_client_id
client_secret = form_data.get("client_secret") or settings.gitea_oauth_client_secret
if not code:
raise HTTPException(status_code=400, detail="Missing authorization code")
# Gitea validates that redirect_uri in the token exchange matches the one used during
# authorization. Because our /oauth/authorize proxy always forwards our own callback
# URI to Gitea, we must use the same URI here — not the client's original redirect_uri.
base_url = settings.public_base or str(request.base_url).rstrip("/")
gitea_token_url = f"{settings.gitea_base_url}/login/oauth/access_token"
payload = {
"client_id": client_id,
"client_secret": client_secret,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": redirect_uri,
}
if code_verifier:
payload["code_verifier"] = code_verifier
if grant_type == "refresh_token":
if not refresh_token:
raise HTTPException(status_code=400, detail="Missing refresh_token")
payload: dict[str, str] = {
"client_id": client_id,
"client_secret": client_secret,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}
else:
if not code:
raise HTTPException(status_code=400, detail="Missing authorization code")
payload = {
"client_id": client_id,
"client_secret": client_secret,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": f"{base_url}/oauth/callback",
}
if code_verifier:
payload["code_verifier"] = code_verifier
try:
async with httpx.AsyncClient(timeout=30) as client:

View File

@@ -90,7 +90,10 @@ def test_oauth_protected_resource_metadata(client: TestClient) -> None:
assert response.status_code == 200
data = response.json()
assert data["resource"] == "https://gitea.example.com"
assert data["authorization_servers"] == ["https://gitea.example.com"]
assert data["authorization_servers"] == [
"http://testserver",
"https://gitea.example.com",
]
assert data["bearer_methods_supported"] == ["header"]
assert data["scopes_supported"] == ["read:repository", "write:repository"]
assert "resource_documentation" in data
@@ -106,6 +109,20 @@ def test_oauth_authorization_server_metadata(client: TestClient) -> None:
assert payload["scopes_supported"] == ["read:repository", "write:repository"]
def test_openid_configuration_metadata(client: TestClient) -> None:
"""OpenID metadata is exposed for clients expecting OIDC discovery."""
response = client.get("/.well-known/openid-configuration")
assert response.status_code == 200
payload = response.json()
assert payload["issuer"] == "https://gitea.example.com"
assert payload["authorization_endpoint"].endswith("/login/oauth/authorize")
assert payload["token_endpoint"].endswith("/oauth/token")
assert payload["userinfo_endpoint"].endswith("/login/oauth/userinfo")
assert payload["jwks_uri"].endswith("/login/oauth/keys")
assert "read:repository" in payload["scopes_supported"]
assert "write:repository" in payload["scopes_supported"]
def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) -> None:
"""Public base URL is used for externally advertised OAuth metadata links."""
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
@@ -124,6 +141,19 @@ def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) ->
payload = metadata_response.json()
assert payload["token_endpoint"] == "https://mcp.example.com/oauth/token"
openid_response = client.get("/.well-known/openid-configuration")
assert openid_response.status_code == 200
openid_payload = openid_response.json()
assert openid_payload["token_endpoint"] == "https://mcp.example.com/oauth/token"
protected_response = client.get("/.well-known/oauth-protected-resource")
assert protected_response.status_code == 200
protected_payload = protected_response.json()
assert protected_payload["authorization_servers"] == [
"https://mcp.example.com",
"https://gitea.example.com",
]
challenge_response = client.post(
"/mcp/tool/call",
json={"tool": "list_repositories", "arguments": {}},