Enhance OAuth metadata endpoints and update authorization server URLs in responses
Some checks failed
test / test (push) Failing after 19s
docker / lint (pull_request) Failing after 21s
lint / lint (pull_request) Failing after 21s
lint / lint (push) Failing after 1m29s
docker / test (pull_request) Failing after 16s
test / test (pull_request) Failing after 18s
docker / docker-test (pull_request) Has been skipped
docker / docker-publish (pull_request) Has been skipped
Some checks failed
test / test (push) Failing after 19s
docker / lint (pull_request) Failing after 21s
lint / lint (pull_request) Failing after 21s
lint / lint (push) Failing after 1m29s
docker / test (pull_request) Failing after 16s
test / test (pull_request) Failing after 18s
docker / docker-test (pull_request) Has been skipped
docker / docker-publish (pull_request) Has been skipped
This commit is contained in:
@@ -57,7 +57,10 @@ Example response:
|
|||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"resource": "https://git.hiddenden.cafe",
|
"resource": "https://git.hiddenden.cafe",
|
||||||
"authorization_servers": ["https://git.hiddenden.cafe"],
|
"authorization_servers": [
|
||||||
|
"https://gitea-mcp.hiddenden.cafe",
|
||||||
|
"https://git.hiddenden.cafe"
|
||||||
|
],
|
||||||
"bearer_methods_supported": ["header"],
|
"bearer_methods_supported": ["header"],
|
||||||
"scopes_supported": ["read:repository", "write:repository"],
|
"scopes_supported": ["read:repository", "write:repository"],
|
||||||
"resource_documentation": "https://hiddenden.cafe/docs/mcp-gitea"
|
"resource_documentation": "https://hiddenden.cafe/docs/mcp-gitea"
|
||||||
|
|||||||
@@ -3,15 +3,17 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import base64
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import urllib.parse
|
||||||
import uuid
|
import uuid
|
||||||
from collections.abc import AsyncGenerator, Awaitable, Callable
|
from collections.abc import AsyncGenerator, Awaitable, Callable
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
from fastapi import FastAPI, HTTPException, Request, Response
|
from fastapi import FastAPI, HTTPException, Request, Response
|
||||||
from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse
|
from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse, StreamingResponse
|
||||||
from pydantic import BaseModel, Field, ValidationError
|
from pydantic import BaseModel, Field, ValidationError
|
||||||
|
|
||||||
from aegis_gitea_mcp.audit import get_audit_logger
|
from aegis_gitea_mcp.audit import get_audit_logger
|
||||||
@@ -245,6 +247,7 @@ async def authenticate_and_rate_limit(
|
|||||||
"/oauth/token",
|
"/oauth/token",
|
||||||
"/.well-known/oauth-protected-resource",
|
"/.well-known/oauth-protected-resource",
|
||||||
"/.well-known/oauth-authorization-server",
|
"/.well-known/oauth-authorization-server",
|
||||||
|
"/.well-known/openid-configuration",
|
||||||
}:
|
}:
|
||||||
return await call_next(request)
|
return await call_next(request)
|
||||||
|
|
||||||
@@ -329,7 +332,7 @@ async def authenticate_and_rate_limit(
|
|||||||
effective_scopes.add(WRITE_SCOPE)
|
effective_scopes.add(WRITE_SCOPE)
|
||||||
set_gitea_user_scopes(effective_scopes)
|
set_gitea_user_scopes(effective_scopes)
|
||||||
|
|
||||||
# Probe: verify the token actually works for Gitea's REST API.
|
# Probe: verify the token actually works for Gitea's repository API.
|
||||||
# Try both "token" and "Bearer" header formats since Gitea may
|
# Try both "token" and "Bearer" header formats since Gitea may
|
||||||
# accept OAuth tokens differently depending on version/config.
|
# accept OAuth tokens differently depending on version/config.
|
||||||
import hashlib
|
import hashlib
|
||||||
@@ -341,60 +344,69 @@ async def authenticate_and_rate_limit(
|
|||||||
token_type = "jwt" if access_token.count(".") == 2 else "opaque"
|
token_type = "jwt" if access_token.count(".") == 2 else "opaque"
|
||||||
|
|
||||||
if token_hash not in _api_scope_cache or now >= _api_scope_cache[token_hash]:
|
if token_hash not in _api_scope_cache or now >= _api_scope_cache[token_hash]:
|
||||||
try:
|
# JWT tokens (OIDC) are already cryptographically validated via JWKS above.
|
||||||
probe_status = None
|
# Gitea's OIDC access_tokens cannot access the REST API without additional
|
||||||
async with httpx.AsyncClient(
|
# Gitea-specific scope configuration, so we skip the probe for them and
|
||||||
timeout=settings.request_timeout_seconds
|
# rely on per-call API errors for actual permission enforcement.
|
||||||
) as probe_client:
|
if token_type == "jwt":
|
||||||
# Try "token" format first (Gitea PAT style)
|
probe_result = "skip:jwt"
|
||||||
probe_resp = await probe_client.get(
|
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
|
||||||
f"{settings.gitea_base_url}/api/v1/user",
|
else:
|
||||||
headers={"Authorization": f"token {access_token}"},
|
try:
|
||||||
)
|
probe_status = None
|
||||||
probe_status = probe_resp.status_code
|
async with httpx.AsyncClient(
|
||||||
# If "token" format fails, try "Bearer" (OAuth2 standard)
|
timeout=settings.request_timeout_seconds
|
||||||
if probe_status in (401, 403):
|
) as probe_client:
|
||||||
|
probe_url = f"{settings.gitea_base_url}/api/v1/user"
|
||||||
|
# Try "token" format first (Gitea PAT style)
|
||||||
probe_resp = await probe_client.get(
|
probe_resp = await probe_client.get(
|
||||||
f"{settings.gitea_base_url}/api/v1/user",
|
probe_url,
|
||||||
headers={"Authorization": f"Bearer {access_token}"},
|
headers={"Authorization": f"token {access_token}"},
|
||||||
)
|
)
|
||||||
probe_status = probe_resp.status_code
|
probe_status = probe_resp.status_code
|
||||||
|
# If "token" format fails, try "Bearer" (OAuth2 standard)
|
||||||
|
if probe_status in (401, 403):
|
||||||
|
probe_resp = await probe_client.get(
|
||||||
|
probe_url,
|
||||||
|
headers={"Authorization": f"Bearer {access_token}"},
|
||||||
|
)
|
||||||
|
probe_status = probe_resp.status_code
|
||||||
|
|
||||||
if probe_status in (401, 403):
|
if probe_status in (401, 403):
|
||||||
probe_result = f"fail:{probe_status}"
|
probe_result = f"fail:{probe_status}"
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"oauth_token_lacks_api_scope",
|
"oauth_token_lacks_api_scope",
|
||||||
extra={
|
extra={
|
||||||
"status": probe_status,
|
"status": probe_status,
|
||||||
"login": login,
|
"login": login,
|
||||||
"token_type": token_type,
|
"token_type": token_type,
|
||||||
"scopes_observed": observed_scopes,
|
"scopes_observed": observed_scopes,
|
||||||
},
|
},
|
||||||
)
|
|
||||||
message = (
|
|
||||||
"OAuth token is valid but lacks required Gitea API access. "
|
|
||||||
"Re-authorize this OAuth app in Gitea and try again."
|
|
||||||
)
|
|
||||||
if request.url.path.startswith("/mcp/"):
|
|
||||||
return _oauth_unauthorized_response(
|
|
||||||
request,
|
|
||||||
message,
|
|
||||||
scope=READ_SCOPE,
|
|
||||||
)
|
)
|
||||||
return JSONResponse(
|
message = (
|
||||||
status_code=401,
|
"OAuth token is valid but lacks required Gitea API access. "
|
||||||
content={
|
"Re-authorize this OAuth app in Gitea and try again."
|
||||||
"error": "Authentication failed",
|
)
|
||||||
"message": message,
|
if request.url.path.startswith("/mcp/"):
|
||||||
"request_id": getattr(request.state, "request_id", "-"),
|
return _oauth_unauthorized_response(
|
||||||
},
|
request,
|
||||||
)
|
message,
|
||||||
else:
|
scope=READ_SCOPE,
|
||||||
probe_result = "pass"
|
)
|
||||||
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
|
return JSONResponse(
|
||||||
except httpx.RequestError:
|
status_code=401,
|
||||||
probe_result = "skip:error"
|
content={
|
||||||
logger.debug("oauth_api_scope_probe_network_error")
|
"error": "Authentication failed",
|
||||||
|
"message": message,
|
||||||
|
"request_id": getattr(request.state, "request_id", "-"),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
probe_result = "pass"
|
||||||
|
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
|
||||||
|
except httpx.RequestError:
|
||||||
|
probe_result = "skip:error"
|
||||||
|
logger.debug("oauth_api_scope_probe_network_error")
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"oauth_auth_summary",
|
"oauth_auth_summary",
|
||||||
@@ -482,7 +494,7 @@ async def health() -> dict[str, str]:
|
|||||||
|
|
||||||
|
|
||||||
@app.get("/.well-known/oauth-protected-resource")
|
@app.get("/.well-known/oauth-protected-resource")
|
||||||
async def oauth_protected_resource_metadata() -> JSONResponse:
|
async def oauth_protected_resource_metadata(request: Request) -> JSONResponse:
|
||||||
"""OAuth 2.0 Protected Resource Metadata (RFC 9728).
|
"""OAuth 2.0 Protected Resource Metadata (RFC 9728).
|
||||||
|
|
||||||
Required by the MCP Authorization spec so that OAuth clients (e.g. ChatGPT)
|
Required by the MCP Authorization spec so that OAuth clients (e.g. ChatGPT)
|
||||||
@@ -491,11 +503,15 @@ async def oauth_protected_resource_metadata() -> JSONResponse:
|
|||||||
"""
|
"""
|
||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
gitea_base = settings.gitea_base_url
|
gitea_base = settings.gitea_base_url
|
||||||
|
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
||||||
|
authorization_servers = [base_url]
|
||||||
|
if gitea_base not in authorization_servers:
|
||||||
|
authorization_servers.append(gitea_base)
|
||||||
|
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
content={
|
content={
|
||||||
"resource": gitea_base,
|
"resource": gitea_base,
|
||||||
"authorization_servers": [gitea_base],
|
"authorization_servers": authorization_servers,
|
||||||
"bearer_methods_supported": ["header"],
|
"bearer_methods_supported": ["header"],
|
||||||
"scopes_supported": [READ_SCOPE, WRITE_SCOPE],
|
"scopes_supported": [READ_SCOPE, WRITE_SCOPE],
|
||||||
"resource_documentation": str(settings.oauth_resource_documentation),
|
"resource_documentation": str(settings.oauth_resource_documentation),
|
||||||
@@ -503,6 +519,67 @@ async def oauth_protected_resource_metadata() -> JSONResponse:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/oauth/authorize")
|
||||||
|
async def oauth_authorize_proxy(request: Request) -> RedirectResponse:
|
||||||
|
"""Proxy OAuth authorization to Gitea, replacing redirect_uri with our own callback.
|
||||||
|
|
||||||
|
Clients (ChatGPT, Claude, etc.) send their own redirect_uri which Gitea doesn't know
|
||||||
|
about. This endpoint intercepts the request, encodes the original redirect_uri and
|
||||||
|
state into a new state parameter, and forwards the request to Gitea using the MCP
|
||||||
|
server's own callback URI — the only URI that needs to be registered in Gitea.
|
||||||
|
"""
|
||||||
|
settings = get_settings()
|
||||||
|
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
||||||
|
|
||||||
|
params = dict(request.query_params)
|
||||||
|
client_redirect_uri = params.pop("redirect_uri", "")
|
||||||
|
original_state = params.get("state", "")
|
||||||
|
|
||||||
|
# Encode the client's redirect_uri + original state into a tamper-evident wrapper.
|
||||||
|
# We simply base64-encode a JSON blob; Gitea will echo it back on the callback.
|
||||||
|
proxy_state_data = {"redirect_uri": client_redirect_uri, "state": original_state}
|
||||||
|
proxy_state = base64.urlsafe_b64encode(json.dumps(proxy_state_data).encode()).decode()
|
||||||
|
|
||||||
|
params["state"] = proxy_state
|
||||||
|
params["redirect_uri"] = f"{base_url}/oauth/callback"
|
||||||
|
|
||||||
|
gitea_authorize_url = f"{settings.gitea_base_url}/login/oauth/authorize"
|
||||||
|
redirect_url = f"{gitea_authorize_url}?{urllib.parse.urlencode(params)}"
|
||||||
|
return RedirectResponse(url=redirect_url, status_code=302)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/oauth/callback")
|
||||||
|
async def oauth_callback_proxy(request: Request) -> RedirectResponse:
|
||||||
|
"""Handle Gitea's OAuth callback and redirect to the original client redirect_uri."""
|
||||||
|
proxy_state = request.query_params.get("state", "")
|
||||||
|
code = request.query_params.get("code", "")
|
||||||
|
error = request.query_params.get("error", "")
|
||||||
|
error_description = request.query_params.get("error_description", "")
|
||||||
|
|
||||||
|
try:
|
||||||
|
state_data = json.loads(base64.urlsafe_b64decode(proxy_state.encode()))
|
||||||
|
client_redirect_uri = state_data["redirect_uri"]
|
||||||
|
original_state = state_data["state"]
|
||||||
|
except Exception as exc:
|
||||||
|
raise HTTPException(status_code=400, detail="Invalid or missing state parameter") from exc
|
||||||
|
|
||||||
|
if not client_redirect_uri:
|
||||||
|
raise HTTPException(status_code=400, detail="No client redirect_uri in state")
|
||||||
|
|
||||||
|
result_params: dict[str, str] = {}
|
||||||
|
if error:
|
||||||
|
result_params["error"] = error
|
||||||
|
if error_description:
|
||||||
|
result_params["error_description"] = error_description
|
||||||
|
else:
|
||||||
|
result_params["code"] = code
|
||||||
|
if original_state:
|
||||||
|
result_params["state"] = original_state
|
||||||
|
|
||||||
|
redirect_url = f"{client_redirect_uri}?{urllib.parse.urlencode(result_params)}"
|
||||||
|
return RedirectResponse(url=redirect_url, status_code=302)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/.well-known/oauth-authorization-server")
|
@app.get("/.well-known/oauth-authorization-server")
|
||||||
async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
||||||
"""OAuth 2.0 Authorization Server Metadata (RFC 8414).
|
"""OAuth 2.0 Authorization Server Metadata (RFC 8414).
|
||||||
@@ -518,7 +595,7 @@ async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
|||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
content={
|
content={
|
||||||
"issuer": gitea_base,
|
"issuer": gitea_base,
|
||||||
"authorization_endpoint": f"{gitea_base}/login/oauth/authorize",
|
"authorization_endpoint": f"{base_url}/oauth/authorize",
|
||||||
"token_endpoint": f"{base_url}/oauth/token",
|
"token_endpoint": f"{base_url}/oauth/token",
|
||||||
"response_types_supported": ["code"],
|
"response_types_supported": ["code"],
|
||||||
"grant_types_supported": ["authorization_code"],
|
"grant_types_supported": ["authorization_code"],
|
||||||
@@ -529,6 +606,38 @@ async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/.well-known/openid-configuration")
|
||||||
|
async def openid_configuration(request: Request) -> JSONResponse:
|
||||||
|
"""OpenID Provider metadata compatible with OAuth proxy token exchange."""
|
||||||
|
settings = get_settings()
|
||||||
|
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
||||||
|
gitea_base = settings.gitea_base_url
|
||||||
|
|
||||||
|
return JSONResponse(
|
||||||
|
content={
|
||||||
|
"issuer": gitea_base,
|
||||||
|
"authorization_endpoint": f"{base_url}/oauth/authorize",
|
||||||
|
"token_endpoint": f"{base_url}/oauth/token",
|
||||||
|
"userinfo_endpoint": f"{gitea_base}/login/oauth/userinfo",
|
||||||
|
"jwks_uri": f"{gitea_base}/login/oauth/keys",
|
||||||
|
"response_types_supported": ["code"],
|
||||||
|
"grant_types_supported": ["authorization_code"],
|
||||||
|
"code_challenge_methods_supported": ["S256"],
|
||||||
|
"token_endpoint_auth_methods_supported": ["client_secret_post", "none"],
|
||||||
|
"scopes_supported": [
|
||||||
|
READ_SCOPE,
|
||||||
|
WRITE_SCOPE,
|
||||||
|
"openid",
|
||||||
|
"profile",
|
||||||
|
"email",
|
||||||
|
"groups",
|
||||||
|
],
|
||||||
|
"subject_types_supported": ["public"],
|
||||||
|
"id_token_signing_alg_values_supported": ["RS256"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.post("/oauth/token")
|
@app.post("/oauth/token")
|
||||||
async def oauth_token_proxy(request: Request) -> JSONResponse:
|
async def oauth_token_proxy(request: Request) -> JSONResponse:
|
||||||
"""Proxy OAuth2 token exchange to Gitea.
|
"""Proxy OAuth2 token exchange to Gitea.
|
||||||
@@ -544,27 +653,43 @@ async def oauth_token_proxy(request: Request) -> JSONResponse:
|
|||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
raise HTTPException(status_code=400, detail="Invalid request body") from exc
|
raise HTTPException(status_code=400, detail="Invalid request body") from exc
|
||||||
|
|
||||||
|
grant_type = form_data.get("grant_type", "authorization_code")
|
||||||
code = form_data.get("code")
|
code = form_data.get("code")
|
||||||
redirect_uri = form_data.get("redirect_uri", "")
|
refresh_token = form_data.get("refresh_token")
|
||||||
code_verifier = form_data.get("code_verifier", "")
|
code_verifier = form_data.get("code_verifier", "")
|
||||||
# ChatGPT sends the client_id and client_secret (that were configured in the GPT Action
|
# ChatGPT sends the client_id and client_secret (that were configured in the GPT Action
|
||||||
# settings) in the POST body. Use those directly; fall back to env vars if not provided.
|
# settings) in the POST body. Use those directly; fall back to env vars if not provided.
|
||||||
client_id = form_data.get("client_id") or settings.gitea_oauth_client_id
|
client_id = form_data.get("client_id") or settings.gitea_oauth_client_id
|
||||||
client_secret = form_data.get("client_secret") or settings.gitea_oauth_client_secret
|
client_secret = form_data.get("client_secret") or settings.gitea_oauth_client_secret
|
||||||
|
|
||||||
if not code:
|
# Gitea validates that redirect_uri in the token exchange matches the one used during
|
||||||
raise HTTPException(status_code=400, detail="Missing authorization code")
|
# authorization. Because our /oauth/authorize proxy always forwards our own callback
|
||||||
|
# URI to Gitea, we must use the same URI here — not the client's original redirect_uri.
|
||||||
|
base_url = settings.public_base or str(request.base_url).rstrip("/")
|
||||||
|
|
||||||
gitea_token_url = f"{settings.gitea_base_url}/login/oauth/access_token"
|
gitea_token_url = f"{settings.gitea_base_url}/login/oauth/access_token"
|
||||||
payload = {
|
|
||||||
"client_id": client_id,
|
if grant_type == "refresh_token":
|
||||||
"client_secret": client_secret,
|
if not refresh_token:
|
||||||
"code": code,
|
raise HTTPException(status_code=400, detail="Missing refresh_token")
|
||||||
"grant_type": "authorization_code",
|
payload: dict[str, str] = {
|
||||||
"redirect_uri": redirect_uri,
|
"client_id": client_id,
|
||||||
}
|
"client_secret": client_secret,
|
||||||
if code_verifier:
|
"grant_type": "refresh_token",
|
||||||
payload["code_verifier"] = code_verifier
|
"refresh_token": refresh_token,
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
if not code:
|
||||||
|
raise HTTPException(status_code=400, detail="Missing authorization code")
|
||||||
|
payload = {
|
||||||
|
"client_id": client_id,
|
||||||
|
"client_secret": client_secret,
|
||||||
|
"code": code,
|
||||||
|
"grant_type": "authorization_code",
|
||||||
|
"redirect_uri": f"{base_url}/oauth/callback",
|
||||||
|
}
|
||||||
|
if code_verifier:
|
||||||
|
payload["code_verifier"] = code_verifier
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient(timeout=30) as client:
|
async with httpx.AsyncClient(timeout=30) as client:
|
||||||
|
|||||||
@@ -90,7 +90,10 @@ def test_oauth_protected_resource_metadata(client: TestClient) -> None:
|
|||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
data = response.json()
|
data = response.json()
|
||||||
assert data["resource"] == "https://gitea.example.com"
|
assert data["resource"] == "https://gitea.example.com"
|
||||||
assert data["authorization_servers"] == ["https://gitea.example.com"]
|
assert data["authorization_servers"] == [
|
||||||
|
"http://testserver",
|
||||||
|
"https://gitea.example.com",
|
||||||
|
]
|
||||||
assert data["bearer_methods_supported"] == ["header"]
|
assert data["bearer_methods_supported"] == ["header"]
|
||||||
assert data["scopes_supported"] == ["read:repository", "write:repository"]
|
assert data["scopes_supported"] == ["read:repository", "write:repository"]
|
||||||
assert "resource_documentation" in data
|
assert "resource_documentation" in data
|
||||||
@@ -106,6 +109,20 @@ def test_oauth_authorization_server_metadata(client: TestClient) -> None:
|
|||||||
assert payload["scopes_supported"] == ["read:repository", "write:repository"]
|
assert payload["scopes_supported"] == ["read:repository", "write:repository"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_openid_configuration_metadata(client: TestClient) -> None:
|
||||||
|
"""OpenID metadata is exposed for clients expecting OIDC discovery."""
|
||||||
|
response = client.get("/.well-known/openid-configuration")
|
||||||
|
assert response.status_code == 200
|
||||||
|
payload = response.json()
|
||||||
|
assert payload["issuer"] == "https://gitea.example.com"
|
||||||
|
assert payload["authorization_endpoint"].endswith("/login/oauth/authorize")
|
||||||
|
assert payload["token_endpoint"].endswith("/oauth/token")
|
||||||
|
assert payload["userinfo_endpoint"].endswith("/login/oauth/userinfo")
|
||||||
|
assert payload["jwks_uri"].endswith("/login/oauth/keys")
|
||||||
|
assert "read:repository" in payload["scopes_supported"]
|
||||||
|
assert "write:repository" in payload["scopes_supported"]
|
||||||
|
|
||||||
|
|
||||||
def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) -> None:
|
def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
"""Public base URL is used for externally advertised OAuth metadata links."""
|
"""Public base URL is used for externally advertised OAuth metadata links."""
|
||||||
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
monkeypatch.setenv("GITEA_URL", "https://gitea.example.com")
|
||||||
@@ -124,6 +141,19 @@ def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) ->
|
|||||||
payload = metadata_response.json()
|
payload = metadata_response.json()
|
||||||
assert payload["token_endpoint"] == "https://mcp.example.com/oauth/token"
|
assert payload["token_endpoint"] == "https://mcp.example.com/oauth/token"
|
||||||
|
|
||||||
|
openid_response = client.get("/.well-known/openid-configuration")
|
||||||
|
assert openid_response.status_code == 200
|
||||||
|
openid_payload = openid_response.json()
|
||||||
|
assert openid_payload["token_endpoint"] == "https://mcp.example.com/oauth/token"
|
||||||
|
|
||||||
|
protected_response = client.get("/.well-known/oauth-protected-resource")
|
||||||
|
assert protected_response.status_code == 200
|
||||||
|
protected_payload = protected_response.json()
|
||||||
|
assert protected_payload["authorization_servers"] == [
|
||||||
|
"https://mcp.example.com",
|
||||||
|
"https://gitea.example.com",
|
||||||
|
]
|
||||||
|
|
||||||
challenge_response = client.post(
|
challenge_response = client.post(
|
||||||
"/mcp/tool/call",
|
"/mcp/tool/call",
|
||||||
json={"tool": "list_repositories", "arguments": {}},
|
json={"tool": "list_repositories", "arguments": {}},
|
||||||
|
|||||||
Reference in New Issue
Block a user