|
|
|
|
@@ -3,15 +3,17 @@
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import base64
|
|
|
|
|
import json
|
|
|
|
|
import logging
|
|
|
|
|
import urllib.parse
|
|
|
|
|
import uuid
|
|
|
|
|
from collections.abc import AsyncGenerator, Awaitable, Callable
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
import httpx
|
|
|
|
|
from fastapi import FastAPI, HTTPException, Request, Response
|
|
|
|
|
from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse
|
|
|
|
|
from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse, StreamingResponse
|
|
|
|
|
from pydantic import BaseModel, Field, ValidationError
|
|
|
|
|
|
|
|
|
|
from aegis_gitea_mcp.audit import get_audit_logger
|
|
|
|
|
@@ -245,6 +247,7 @@ async def authenticate_and_rate_limit(
|
|
|
|
|
"/oauth/token",
|
|
|
|
|
"/.well-known/oauth-protected-resource",
|
|
|
|
|
"/.well-known/oauth-authorization-server",
|
|
|
|
|
"/.well-known/openid-configuration",
|
|
|
|
|
}:
|
|
|
|
|
return await call_next(request)
|
|
|
|
|
|
|
|
|
|
@@ -329,7 +332,7 @@ async def authenticate_and_rate_limit(
|
|
|
|
|
effective_scopes.add(WRITE_SCOPE)
|
|
|
|
|
set_gitea_user_scopes(effective_scopes)
|
|
|
|
|
|
|
|
|
|
# Probe: verify the token actually works for Gitea's REST API.
|
|
|
|
|
# Probe: verify the token actually works for Gitea's repository API.
|
|
|
|
|
# Try both "token" and "Bearer" header formats since Gitea may
|
|
|
|
|
# accept OAuth tokens differently depending on version/config.
|
|
|
|
|
import hashlib
|
|
|
|
|
@@ -341,60 +344,69 @@ async def authenticate_and_rate_limit(
|
|
|
|
|
token_type = "jwt" if access_token.count(".") == 2 else "opaque"
|
|
|
|
|
|
|
|
|
|
if token_hash not in _api_scope_cache or now >= _api_scope_cache[token_hash]:
|
|
|
|
|
try:
|
|
|
|
|
probe_status = None
|
|
|
|
|
async with httpx.AsyncClient(
|
|
|
|
|
timeout=settings.request_timeout_seconds
|
|
|
|
|
) as probe_client:
|
|
|
|
|
# Try "token" format first (Gitea PAT style)
|
|
|
|
|
probe_resp = await probe_client.get(
|
|
|
|
|
f"{settings.gitea_base_url}/api/v1/user",
|
|
|
|
|
headers={"Authorization": f"token {access_token}"},
|
|
|
|
|
)
|
|
|
|
|
probe_status = probe_resp.status_code
|
|
|
|
|
# If "token" format fails, try "Bearer" (OAuth2 standard)
|
|
|
|
|
if probe_status in (401, 403):
|
|
|
|
|
# JWT tokens (OIDC) are already cryptographically validated via JWKS above.
|
|
|
|
|
# Gitea's OIDC access_tokens cannot access the REST API without additional
|
|
|
|
|
# Gitea-specific scope configuration, so we skip the probe for them and
|
|
|
|
|
# rely on per-call API errors for actual permission enforcement.
|
|
|
|
|
if token_type == "jwt":
|
|
|
|
|
probe_result = "skip:jwt"
|
|
|
|
|
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
|
|
|
|
|
else:
|
|
|
|
|
try:
|
|
|
|
|
probe_status = None
|
|
|
|
|
async with httpx.AsyncClient(
|
|
|
|
|
timeout=settings.request_timeout_seconds
|
|
|
|
|
) as probe_client:
|
|
|
|
|
probe_url = f"{settings.gitea_base_url}/api/v1/user"
|
|
|
|
|
# Try "token" format first (Gitea PAT style)
|
|
|
|
|
probe_resp = await probe_client.get(
|
|
|
|
|
f"{settings.gitea_base_url}/api/v1/user",
|
|
|
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
|
|
|
probe_url,
|
|
|
|
|
headers={"Authorization": f"token {access_token}"},
|
|
|
|
|
)
|
|
|
|
|
probe_status = probe_resp.status_code
|
|
|
|
|
# If "token" format fails, try "Bearer" (OAuth2 standard)
|
|
|
|
|
if probe_status in (401, 403):
|
|
|
|
|
probe_resp = await probe_client.get(
|
|
|
|
|
probe_url,
|
|
|
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
|
|
|
)
|
|
|
|
|
probe_status = probe_resp.status_code
|
|
|
|
|
|
|
|
|
|
if probe_status in (401, 403):
|
|
|
|
|
probe_result = f"fail:{probe_status}"
|
|
|
|
|
logger.warning(
|
|
|
|
|
"oauth_token_lacks_api_scope",
|
|
|
|
|
extra={
|
|
|
|
|
"status": probe_status,
|
|
|
|
|
"login": login,
|
|
|
|
|
"token_type": token_type,
|
|
|
|
|
"scopes_observed": observed_scopes,
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
message = (
|
|
|
|
|
"OAuth token is valid but lacks required Gitea API access. "
|
|
|
|
|
"Re-authorize this OAuth app in Gitea and try again."
|
|
|
|
|
)
|
|
|
|
|
if request.url.path.startswith("/mcp/"):
|
|
|
|
|
return _oauth_unauthorized_response(
|
|
|
|
|
request,
|
|
|
|
|
message,
|
|
|
|
|
scope=READ_SCOPE,
|
|
|
|
|
if probe_status in (401, 403):
|
|
|
|
|
probe_result = f"fail:{probe_status}"
|
|
|
|
|
logger.warning(
|
|
|
|
|
"oauth_token_lacks_api_scope",
|
|
|
|
|
extra={
|
|
|
|
|
"status": probe_status,
|
|
|
|
|
"login": login,
|
|
|
|
|
"token_type": token_type,
|
|
|
|
|
"scopes_observed": observed_scopes,
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
return JSONResponse(
|
|
|
|
|
status_code=401,
|
|
|
|
|
content={
|
|
|
|
|
"error": "Authentication failed",
|
|
|
|
|
"message": message,
|
|
|
|
|
"request_id": getattr(request.state, "request_id", "-"),
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
probe_result = "pass"
|
|
|
|
|
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
|
|
|
|
|
except httpx.RequestError:
|
|
|
|
|
probe_result = "skip:error"
|
|
|
|
|
logger.debug("oauth_api_scope_probe_network_error")
|
|
|
|
|
message = (
|
|
|
|
|
"OAuth token is valid but lacks required Gitea API access. "
|
|
|
|
|
"Re-authorize this OAuth app in Gitea and try again."
|
|
|
|
|
)
|
|
|
|
|
if request.url.path.startswith("/mcp/"):
|
|
|
|
|
return _oauth_unauthorized_response(
|
|
|
|
|
request,
|
|
|
|
|
message,
|
|
|
|
|
scope=READ_SCOPE,
|
|
|
|
|
)
|
|
|
|
|
return JSONResponse(
|
|
|
|
|
status_code=401,
|
|
|
|
|
content={
|
|
|
|
|
"error": "Authentication failed",
|
|
|
|
|
"message": message,
|
|
|
|
|
"request_id": getattr(request.state, "request_id", "-"),
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
probe_result = "pass"
|
|
|
|
|
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
|
|
|
|
|
except httpx.RequestError:
|
|
|
|
|
probe_result = "skip:error"
|
|
|
|
|
logger.debug("oauth_api_scope_probe_network_error")
|
|
|
|
|
|
|
|
|
|
logger.info(
|
|
|
|
|
"oauth_auth_summary",
|
|
|
|
|
@@ -482,7 +494,7 @@ async def health() -> dict[str, str]:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/.well-known/oauth-protected-resource")
|
|
|
|
|
async def oauth_protected_resource_metadata() -> JSONResponse:
|
|
|
|
|
async def oauth_protected_resource_metadata(request: Request) -> JSONResponse:
|
|
|
|
|
"""OAuth 2.0 Protected Resource Metadata (RFC 9728).
|
|
|
|
|
|
|
|
|
|
Required by the MCP Authorization spec so that OAuth clients (e.g. ChatGPT)
|
|
|
|
|
@@ -491,11 +503,15 @@ async def oauth_protected_resource_metadata() -> JSONResponse:
|
|
|
|
|
"""
|
|
|
|
|
settings = get_settings()
|
|
|
|
|
gitea_base = settings.gitea_base_url
|
|
|
|
|
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
|
|
|
|
authorization_servers = [base_url]
|
|
|
|
|
if gitea_base not in authorization_servers:
|
|
|
|
|
authorization_servers.append(gitea_base)
|
|
|
|
|
|
|
|
|
|
return JSONResponse(
|
|
|
|
|
content={
|
|
|
|
|
"resource": gitea_base,
|
|
|
|
|
"authorization_servers": [gitea_base],
|
|
|
|
|
"authorization_servers": authorization_servers,
|
|
|
|
|
"bearer_methods_supported": ["header"],
|
|
|
|
|
"scopes_supported": [READ_SCOPE, WRITE_SCOPE],
|
|
|
|
|
"resource_documentation": str(settings.oauth_resource_documentation),
|
|
|
|
|
@@ -503,6 +519,67 @@ async def oauth_protected_resource_metadata() -> JSONResponse:
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/oauth/authorize")
|
|
|
|
|
async def oauth_authorize_proxy(request: Request) -> RedirectResponse:
|
|
|
|
|
"""Proxy OAuth authorization to Gitea, replacing redirect_uri with our own callback.
|
|
|
|
|
|
|
|
|
|
Clients (ChatGPT, Claude, etc.) send their own redirect_uri which Gitea doesn't know
|
|
|
|
|
about. This endpoint intercepts the request, encodes the original redirect_uri and
|
|
|
|
|
state into a new state parameter, and forwards the request to Gitea using the MCP
|
|
|
|
|
server's own callback URI — the only URI that needs to be registered in Gitea.
|
|
|
|
|
"""
|
|
|
|
|
settings = get_settings()
|
|
|
|
|
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
|
|
|
|
|
|
|
|
|
params = dict(request.query_params)
|
|
|
|
|
client_redirect_uri = params.pop("redirect_uri", "")
|
|
|
|
|
original_state = params.get("state", "")
|
|
|
|
|
|
|
|
|
|
# Encode the client's redirect_uri + original state into a tamper-evident wrapper.
|
|
|
|
|
# We simply base64-encode a JSON blob; Gitea will echo it back on the callback.
|
|
|
|
|
proxy_state_data = {"redirect_uri": client_redirect_uri, "state": original_state}
|
|
|
|
|
proxy_state = base64.urlsafe_b64encode(json.dumps(proxy_state_data).encode()).decode()
|
|
|
|
|
|
|
|
|
|
params["state"] = proxy_state
|
|
|
|
|
params["redirect_uri"] = f"{base_url}/oauth/callback"
|
|
|
|
|
|
|
|
|
|
gitea_authorize_url = f"{settings.gitea_base_url}/login/oauth/authorize"
|
|
|
|
|
redirect_url = f"{gitea_authorize_url}?{urllib.parse.urlencode(params)}"
|
|
|
|
|
return RedirectResponse(url=redirect_url, status_code=302)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/oauth/callback")
|
|
|
|
|
async def oauth_callback_proxy(request: Request) -> RedirectResponse:
|
|
|
|
|
"""Handle Gitea's OAuth callback and redirect to the original client redirect_uri."""
|
|
|
|
|
proxy_state = request.query_params.get("state", "")
|
|
|
|
|
code = request.query_params.get("code", "")
|
|
|
|
|
error = request.query_params.get("error", "")
|
|
|
|
|
error_description = request.query_params.get("error_description", "")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
state_data = json.loads(base64.urlsafe_b64decode(proxy_state.encode()))
|
|
|
|
|
client_redirect_uri = state_data["redirect_uri"]
|
|
|
|
|
original_state = state_data["state"]
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
raise HTTPException(status_code=400, detail="Invalid or missing state parameter") from exc
|
|
|
|
|
|
|
|
|
|
if not client_redirect_uri:
|
|
|
|
|
raise HTTPException(status_code=400, detail="No client redirect_uri in state")
|
|
|
|
|
|
|
|
|
|
result_params: dict[str, str] = {}
|
|
|
|
|
if error:
|
|
|
|
|
result_params["error"] = error
|
|
|
|
|
if error_description:
|
|
|
|
|
result_params["error_description"] = error_description
|
|
|
|
|
else:
|
|
|
|
|
result_params["code"] = code
|
|
|
|
|
if original_state:
|
|
|
|
|
result_params["state"] = original_state
|
|
|
|
|
|
|
|
|
|
redirect_url = f"{client_redirect_uri}?{urllib.parse.urlencode(result_params)}"
|
|
|
|
|
return RedirectResponse(url=redirect_url, status_code=302)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/.well-known/oauth-authorization-server")
|
|
|
|
|
async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
|
|
|
|
"""OAuth 2.0 Authorization Server Metadata (RFC 8414).
|
|
|
|
|
@@ -518,7 +595,7 @@ async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
|
|
|
|
return JSONResponse(
|
|
|
|
|
content={
|
|
|
|
|
"issuer": gitea_base,
|
|
|
|
|
"authorization_endpoint": f"{gitea_base}/login/oauth/authorize",
|
|
|
|
|
"authorization_endpoint": f"{base_url}/oauth/authorize",
|
|
|
|
|
"token_endpoint": f"{base_url}/oauth/token",
|
|
|
|
|
"response_types_supported": ["code"],
|
|
|
|
|
"grant_types_supported": ["authorization_code"],
|
|
|
|
|
@@ -529,6 +606,38 @@ async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/.well-known/openid-configuration")
|
|
|
|
|
async def openid_configuration(request: Request) -> JSONResponse:
|
|
|
|
|
"""OpenID Provider metadata compatible with OAuth proxy token exchange."""
|
|
|
|
|
settings = get_settings()
|
|
|
|
|
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
|
|
|
|
gitea_base = settings.gitea_base_url
|
|
|
|
|
|
|
|
|
|
return JSONResponse(
|
|
|
|
|
content={
|
|
|
|
|
"issuer": gitea_base,
|
|
|
|
|
"authorization_endpoint": f"{base_url}/oauth/authorize",
|
|
|
|
|
"token_endpoint": f"{base_url}/oauth/token",
|
|
|
|
|
"userinfo_endpoint": f"{gitea_base}/login/oauth/userinfo",
|
|
|
|
|
"jwks_uri": f"{gitea_base}/login/oauth/keys",
|
|
|
|
|
"response_types_supported": ["code"],
|
|
|
|
|
"grant_types_supported": ["authorization_code"],
|
|
|
|
|
"code_challenge_methods_supported": ["S256"],
|
|
|
|
|
"token_endpoint_auth_methods_supported": ["client_secret_post", "none"],
|
|
|
|
|
"scopes_supported": [
|
|
|
|
|
READ_SCOPE,
|
|
|
|
|
WRITE_SCOPE,
|
|
|
|
|
"openid",
|
|
|
|
|
"profile",
|
|
|
|
|
"email",
|
|
|
|
|
"groups",
|
|
|
|
|
],
|
|
|
|
|
"subject_types_supported": ["public"],
|
|
|
|
|
"id_token_signing_alg_values_supported": ["RS256"],
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/oauth/token")
|
|
|
|
|
async def oauth_token_proxy(request: Request) -> JSONResponse:
|
|
|
|
|
"""Proxy OAuth2 token exchange to Gitea.
|
|
|
|
|
@@ -544,27 +653,43 @@ async def oauth_token_proxy(request: Request) -> JSONResponse:
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
raise HTTPException(status_code=400, detail="Invalid request body") from exc
|
|
|
|
|
|
|
|
|
|
grant_type = form_data.get("grant_type", "authorization_code")
|
|
|
|
|
code = form_data.get("code")
|
|
|
|
|
redirect_uri = form_data.get("redirect_uri", "")
|
|
|
|
|
refresh_token = form_data.get("refresh_token")
|
|
|
|
|
code_verifier = form_data.get("code_verifier", "")
|
|
|
|
|
# ChatGPT sends the client_id and client_secret (that were configured in the GPT Action
|
|
|
|
|
# settings) in the POST body. Use those directly; fall back to env vars if not provided.
|
|
|
|
|
client_id = form_data.get("client_id") or settings.gitea_oauth_client_id
|
|
|
|
|
client_secret = form_data.get("client_secret") or settings.gitea_oauth_client_secret
|
|
|
|
|
|
|
|
|
|
if not code:
|
|
|
|
|
raise HTTPException(status_code=400, detail="Missing authorization code")
|
|
|
|
|
# Gitea validates that redirect_uri in the token exchange matches the one used during
|
|
|
|
|
# authorization. Because our /oauth/authorize proxy always forwards our own callback
|
|
|
|
|
# URI to Gitea, we must use the same URI here — not the client's original redirect_uri.
|
|
|
|
|
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
|
|
|
|
|
|
|
|
|
gitea_token_url = f"{settings.gitea_base_url}/login/oauth/access_token"
|
|
|
|
|
payload = {
|
|
|
|
|
"client_id": client_id,
|
|
|
|
|
"client_secret": client_secret,
|
|
|
|
|
"code": code,
|
|
|
|
|
"grant_type": "authorization_code",
|
|
|
|
|
"redirect_uri": redirect_uri,
|
|
|
|
|
}
|
|
|
|
|
if code_verifier:
|
|
|
|
|
payload["code_verifier"] = code_verifier
|
|
|
|
|
|
|
|
|
|
if grant_type == "refresh_token":
|
|
|
|
|
if not refresh_token:
|
|
|
|
|
raise HTTPException(status_code=400, detail="Missing refresh_token")
|
|
|
|
|
payload: dict[str, str] = {
|
|
|
|
|
"client_id": client_id,
|
|
|
|
|
"client_secret": client_secret,
|
|
|
|
|
"grant_type": "refresh_token",
|
|
|
|
|
"refresh_token": refresh_token,
|
|
|
|
|
}
|
|
|
|
|
else:
|
|
|
|
|
if not code:
|
|
|
|
|
raise HTTPException(status_code=400, detail="Missing authorization code")
|
|
|
|
|
payload = {
|
|
|
|
|
"client_id": client_id,
|
|
|
|
|
"client_secret": client_secret,
|
|
|
|
|
"code": code,
|
|
|
|
|
"grant_type": "authorization_code",
|
|
|
|
|
"redirect_uri": f"{base_url}/oauth/callback",
|
|
|
|
|
}
|
|
|
|
|
if code_verifier:
|
|
|
|
|
payload["code_verifier"] = code_verifier
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
async with httpx.AsyncClient(timeout=30) as client:
|
|
|
|
|
|