Merge pull request 'fix' (#11) from fix into dev
Some checks failed
docker / lint (push) Failing after 21s
lint / lint (pull_request) Failing after 22s
test / test (pull_request) Failing after 18s
docker / test (push) Failing after 16s
lint / lint (push) Failing after 22s
test / test (push) Failing after 18s
docker / lint (pull_request) Failing after 21s
docker / test (pull_request) Failing after 16s
docker / docker-test (push) Has been skipped
docker / docker-test (pull_request) Has been skipped
docker / docker-publish (push) Has been skipped
docker / docker-publish (pull_request) Has been skipped
Some checks failed
docker / lint (push) Failing after 21s
lint / lint (pull_request) Failing after 22s
test / test (pull_request) Failing after 18s
docker / test (push) Failing after 16s
lint / lint (push) Failing after 22s
test / test (push) Failing after 18s
docker / lint (pull_request) Failing after 21s
docker / test (pull_request) Failing after 16s
docker / docker-test (push) Has been skipped
docker / docker-test (pull_request) Has been skipped
docker / docker-publish (push) Has been skipped
docker / docker-publish (pull_request) Has been skipped
Reviewed-on: #11
This commit was merged in pull request #11.
This commit is contained in:
@@ -57,7 +57,10 @@ Example response:
|
||||
```json
|
||||
{
|
||||
"resource": "https://git.hiddenden.cafe",
|
||||
"authorization_servers": ["https://git.hiddenden.cafe"],
|
||||
"authorization_servers": [
|
||||
"https://gitea-mcp.hiddenden.cafe",
|
||||
"https://git.hiddenden.cafe"
|
||||
],
|
||||
"bearer_methods_supported": ["header"],
|
||||
"scopes_supported": ["read:repository", "write:repository"],
|
||||
"resource_documentation": "https://hiddenden.cafe/docs/mcp-gitea"
|
||||
|
||||
@@ -3,15 +3,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import urllib.parse
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator, Awaitable, Callable
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
from fastapi import FastAPI, HTTPException, Request, Response
|
||||
from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse
|
||||
from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse, StreamingResponse
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
|
||||
from aegis_gitea_mcp.audit import get_audit_logger
|
||||
@@ -245,6 +247,7 @@ async def authenticate_and_rate_limit(
|
||||
"/oauth/token",
|
||||
"/.well-known/oauth-protected-resource",
|
||||
"/.well-known/oauth-authorization-server",
|
||||
"/.well-known/openid-configuration",
|
||||
}:
|
||||
return await call_next(request)
|
||||
|
||||
@@ -329,7 +332,7 @@ async def authenticate_and_rate_limit(
|
||||
effective_scopes.add(WRITE_SCOPE)
|
||||
set_gitea_user_scopes(effective_scopes)
|
||||
|
||||
# Probe: verify the token actually works for Gitea's REST API.
|
||||
# Probe: verify the token actually works for Gitea's repository API.
|
||||
# Try both "token" and "Bearer" header formats since Gitea may
|
||||
# accept OAuth tokens differently depending on version/config.
|
||||
import hashlib
|
||||
@@ -341,21 +344,30 @@ async def authenticate_and_rate_limit(
|
||||
token_type = "jwt" if access_token.count(".") == 2 else "opaque"
|
||||
|
||||
if token_hash not in _api_scope_cache or now >= _api_scope_cache[token_hash]:
|
||||
# JWT tokens (OIDC) are already cryptographically validated via JWKS above.
|
||||
# Gitea's OIDC access_tokens cannot access the REST API without additional
|
||||
# Gitea-specific scope configuration, so we skip the probe for them and
|
||||
# rely on per-call API errors for actual permission enforcement.
|
||||
if token_type == "jwt":
|
||||
probe_result = "skip:jwt"
|
||||
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
|
||||
else:
|
||||
try:
|
||||
probe_status = None
|
||||
async with httpx.AsyncClient(
|
||||
timeout=settings.request_timeout_seconds
|
||||
) as probe_client:
|
||||
probe_url = f"{settings.gitea_base_url}/api/v1/user"
|
||||
# Try "token" format first (Gitea PAT style)
|
||||
probe_resp = await probe_client.get(
|
||||
f"{settings.gitea_base_url}/api/v1/user",
|
||||
probe_url,
|
||||
headers={"Authorization": f"token {access_token}"},
|
||||
)
|
||||
probe_status = probe_resp.status_code
|
||||
# If "token" format fails, try "Bearer" (OAuth2 standard)
|
||||
if probe_status in (401, 403):
|
||||
probe_resp = await probe_client.get(
|
||||
f"{settings.gitea_base_url}/api/v1/user",
|
||||
probe_url,
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
)
|
||||
probe_status = probe_resp.status_code
|
||||
@@ -482,7 +494,7 @@ async def health() -> dict[str, str]:
|
||||
|
||||
|
||||
@app.get("/.well-known/oauth-protected-resource")
|
||||
async def oauth_protected_resource_metadata() -> JSONResponse:
|
||||
async def oauth_protected_resource_metadata(request: Request) -> JSONResponse:
|
||||
"""OAuth 2.0 Protected Resource Metadata (RFC 9728).
|
||||
|
||||
Required by the MCP Authorization spec so that OAuth clients (e.g. ChatGPT)
|
||||
@@ -491,11 +503,15 @@ async def oauth_protected_resource_metadata() -> JSONResponse:
|
||||
"""
|
||||
settings = get_settings()
|
||||
gitea_base = settings.gitea_base_url
|
||||
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
||||
authorization_servers = [base_url]
|
||||
if gitea_base not in authorization_servers:
|
||||
authorization_servers.append(gitea_base)
|
||||
|
||||
return JSONResponse(
|
||||
content={
|
||||
"resource": gitea_base,
|
||||
"authorization_servers": [gitea_base],
|
||||
"authorization_servers": authorization_servers,
|
||||
"bearer_methods_supported": ["header"],
|
||||
"scopes_supported": [READ_SCOPE, WRITE_SCOPE],
|
||||
"resource_documentation": str(settings.oauth_resource_documentation),
|
||||
@@ -503,6 +519,67 @@ async def oauth_protected_resource_metadata() -> JSONResponse:
|
||||
)
|
||||
|
||||
|
||||
@app.get("/oauth/authorize")
|
||||
async def oauth_authorize_proxy(request: Request) -> RedirectResponse:
|
||||
"""Proxy OAuth authorization to Gitea, replacing redirect_uri with our own callback.
|
||||
|
||||
Clients (ChatGPT, Claude, etc.) send their own redirect_uri which Gitea doesn't know
|
||||
about. This endpoint intercepts the request, encodes the original redirect_uri and
|
||||
state into a new state parameter, and forwards the request to Gitea using the MCP
|
||||
server's own callback URI — the only URI that needs to be registered in Gitea.
|
||||
"""
|
||||
settings = get_settings()
|
||||
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
||||
|
||||
params = dict(request.query_params)
|
||||
client_redirect_uri = params.pop("redirect_uri", "")
|
||||
original_state = params.get("state", "")
|
||||
|
||||
# Encode the client's redirect_uri + original state into a tamper-evident wrapper.
|
||||
# We simply base64-encode a JSON blob; Gitea will echo it back on the callback.
|
||||
proxy_state_data = {"redirect_uri": client_redirect_uri, "state": original_state}
|
||||
proxy_state = base64.urlsafe_b64encode(json.dumps(proxy_state_data).encode()).decode()
|
||||
|
||||
params["state"] = proxy_state
|
||||
params["redirect_uri"] = f"{base_url}/oauth/callback"
|
||||
|
||||
gitea_authorize_url = f"{settings.gitea_base_url}/login/oauth/authorize"
|
||||
redirect_url = f"{gitea_authorize_url}?{urllib.parse.urlencode(params)}"
|
||||
return RedirectResponse(url=redirect_url, status_code=302)
|
||||
|
||||
|
||||
@app.get("/oauth/callback")
|
||||
async def oauth_callback_proxy(request: Request) -> RedirectResponse:
|
||||
"""Handle Gitea's OAuth callback and redirect to the original client redirect_uri."""
|
||||
proxy_state = request.query_params.get("state", "")
|
||||
code = request.query_params.get("code", "")
|
||||
error = request.query_params.get("error", "")
|
||||
error_description = request.query_params.get("error_description", "")
|
||||
|
||||
try:
|
||||
state_data = json.loads(base64.urlsafe_b64decode(proxy_state.encode()))
|
||||
client_redirect_uri = state_data["redirect_uri"]
|
||||
original_state = state_data["state"]
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=400, detail="Invalid or missing state parameter") from exc
|
||||
|
||||
if not client_redirect_uri:
|
||||
raise HTTPException(status_code=400, detail="No client redirect_uri in state")
|
||||
|
||||
result_params: dict[str, str] = {}
|
||||
if error:
|
||||
result_params["error"] = error
|
||||
if error_description:
|
||||
result_params["error_description"] = error_description
|
||||
else:
|
||||
result_params["code"] = code
|
||||
if original_state:
|
||||
result_params["state"] = original_state
|
||||
|
||||
redirect_url = f"{client_redirect_uri}?{urllib.parse.urlencode(result_params)}"
|
||||
return RedirectResponse(url=redirect_url, status_code=302)
|
||||
|
||||
|
||||
@app.get("/.well-known/oauth-authorization-server")
|
||||
async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
||||
"""OAuth 2.0 Authorization Server Metadata (RFC 8414).
|
||||
@@ -518,7 +595,7 @@ async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"issuer": gitea_base,
|
||||
"authorization_endpoint": f"{gitea_base}/login/oauth/authorize",
|
||||
"authorization_endpoint": f"{base_url}/oauth/authorize",
|
||||
"token_endpoint": f"{base_url}/oauth/token",
|
||||
"response_types_supported": ["code"],
|
||||
"grant_types_supported": ["authorization_code"],
|
||||
@@ -529,6 +606,38 @@ async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
||||
)
|
||||
|
||||
|
||||
@app.get("/.well-known/openid-configuration")
|
||||
async def openid_configuration(request: Request) -> JSONResponse:
|
||||
"""OpenID Provider metadata compatible with OAuth proxy token exchange."""
|
||||
settings = get_settings()
|
||||
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
||||
gitea_base = settings.gitea_base_url
|
||||
|
||||
return JSONResponse(
|
||||
content={
|
||||
"issuer": gitea_base,
|
||||
"authorization_endpoint": f"{base_url}/oauth/authorize",
|
||||
"token_endpoint": f"{base_url}/oauth/token",
|
||||
"userinfo_endpoint": f"{gitea_base}/login/oauth/userinfo",
|
||||
"jwks_uri": f"{gitea_base}/login/oauth/keys",
|
||||
"response_types_supported": ["code"],
|
||||
"grant_types_supported": ["authorization_code"],
|
||||
"code_challenge_methods_supported": ["S256"],
|
||||
"token_endpoint_auth_methods_supported": ["client_secret_post", "none"],
|
||||
"scopes_supported": [
|
||||
READ_SCOPE,
|
||||
WRITE_SCOPE,
|
||||
"openid",
|
||||
"profile",
|
||||
"email",
|
||||
"groups",
|
||||
],
|
||||
"subject_types_supported": ["public"],
|
||||
"id_token_signing_alg_values_supported": ["RS256"],
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@app.post("/oauth/token")
|
||||
async def oauth_token_proxy(request: Request) -> JSONResponse:
|
||||
"""Proxy OAuth2 token exchange to Gitea.
|
||||
@@ -544,24 +653,40 @@ async def oauth_token_proxy(request: Request) -> JSONResponse:
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=400, detail="Invalid request body") from exc
|
||||
|
||||
grant_type = form_data.get("grant_type", "authorization_code")
|
||||
code = form_data.get("code")
|
||||
redirect_uri = form_data.get("redirect_uri", "")
|
||||
refresh_token = form_data.get("refresh_token")
|
||||
code_verifier = form_data.get("code_verifier", "")
|
||||
# ChatGPT sends the client_id and client_secret (that were configured in the GPT Action
|
||||
# settings) in the POST body. Use those directly; fall back to env vars if not provided.
|
||||
client_id = form_data.get("client_id") or settings.gitea_oauth_client_id
|
||||
client_secret = form_data.get("client_secret") or settings.gitea_oauth_client_secret
|
||||
|
||||
if not code:
|
||||
raise HTTPException(status_code=400, detail="Missing authorization code")
|
||||
# Gitea validates that redirect_uri in the token exchange matches the one used during
|
||||
# authorization. Because our /oauth/authorize proxy always forwards our own callback
|
||||
# URI to Gitea, we must use the same URI here — not the client's original redirect_uri.
|
||||
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
||||
|
||||
gitea_token_url = f"{settings.gitea_base_url}/login/oauth/access_token"
|
||||
|
||||
if grant_type == "refresh_token":
|
||||
if not refresh_token:
|
||||
raise HTTPException(status_code=400, detail="Missing refresh_token")
|
||||
payload: dict[str, str] = {
|
||||
"client_id": client_id,
|
||||
"client_secret": client_secret,
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": refresh_token,
|
||||
}
|
||||
else:
|
||||
if not code:
|
||||
raise HTTPException(status_code=400, detail="Missing authorization code")
|
||||
payload = {
|
||||
"client_id": client_id,
|
||||
"client_secret": client_secret,
|
||||
"code": code,
|
||||
"grant_type": "authorization_code",
|
||||
"redirect_uri": redirect_uri,
|
||||
"redirect_uri": f"{base_url}/oauth/callback",
|
||||
}
|
||||
if code_verifier:
|
||||
payload["code_verifier"] = code_verifier
|
||||
|
||||
@@ -90,7 +90,10 @@ def test_oauth_protected_resource_metadata(client: TestClient) -> None:
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["resource"] == "https://gitea.example.com"
|
||||
assert data["authorization_servers"] == ["https://gitea.example.com"]
|
||||
assert data["authorization_servers"] == [
|
||||
"http://testserver",
|
||||
"https://gitea.example.com",
|
||||
]
|
||||
assert data["bearer_methods_supported"] == ["header"]
|
||||
assert data["scopes_supported"] == ["read:repository", "write:repository"]
|
||||
assert "resource_documentation" in data
|
||||
@@ -106,6 +109,20 @@ def test_oauth_authorization_server_metadata(client: TestClient) -> None:
|
||||
assert payload["scopes_supported"] == ["read:repository", "write:repository"]
|
||||
|
||||
|
||||
def test_openid_configuration_metadata(client: TestClient) -> None:
|
||||
"""OpenID metadata is exposed for clients expecting OIDC discovery."""
|
||||
response = client.get("/.well-known/openid-configuration")
|
||||
assert response.status_code == 200
|
||||
payload = response.json()
|
||||
assert payload["issuer"] == "https://gitea.example.com"
|
||||
assert payload["authorization_endpoint"].endswith("/login/oauth/authorize")
|
||||
assert payload["token_endpoint"].endswith("/oauth/token")
|
||||
assert payload["userinfo_endpoint"].endswith("/login/oauth/userinfo")
|
||||
assert payload["jwks_uri"].endswith("/login/oauth/keys")
|
||||
assert "read:repository" in payload["scopes_supported"]
|
||||
assert "write:repository" in payload["scopes_supported"]
|
||||
|
||||
|
||||
def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""Public base URL is used for externally advertised OAuth metadata links."""
|
||||
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||
@@ -124,6 +141,19 @@ def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) ->
|
||||
payload = metadata_response.json()
|
||||
assert payload["token_endpoint"] == "https://mcp.example.com/oauth/token"
|
||||
|
||||
openid_response = client.get("/.well-known/openid-configuration")
|
||||
assert openid_response.status_code == 200
|
||||
openid_payload = openid_response.json()
|
||||
assert openid_payload["token_endpoint"] == "https://mcp.example.com/oauth/token"
|
||||
|
||||
protected_response = client.get("/.well-known/oauth-protected-resource")
|
||||
assert protected_response.status_code == 200
|
||||
protected_payload = protected_response.json()
|
||||
assert protected_payload["authorization_servers"] == [
|
||||
"https://mcp.example.com",
|
||||
"https://gitea.example.com",
|
||||
]
|
||||
|
||||
challenge_response = client.post(
|
||||
"/mcp/tool/call",
|
||||
json={"tool": "list_repositories", "arguments": {}},
|
||||
|
||||
Reference in New Issue
Block a user