Add PUBLIC_BASE_URL and refine OAuth scopes
Some checks failed
docker / lint (push) Has been cancelled
docker / test (push) Has been cancelled
docker / docker-build (push) Has been cancelled
lint / lint (push) Has been cancelled
test / test (push) Has been cancelled

This commit is contained in:
2026-02-25 20:49:08 +01:00
parent 59e1ea53a8
commit c79cc1ab9e
9 changed files with 541 additions and 11 deletions

View File

@@ -17,7 +17,11 @@ from pydantic import BaseModel, Field, ValidationError
from aegis_gitea_mcp.audit import get_audit_logger
from aegis_gitea_mcp.automation import AutomationError, AutomationManager
from aegis_gitea_mcp.config import get_settings
from aegis_gitea_mcp.gitea_client import GiteaClient
from aegis_gitea_mcp.gitea_client import (
GiteaAuthenticationError,
GiteaAuthorizationError,
GiteaClient,
)
from aegis_gitea_mcp.logging_utils import configure_logging
from aegis_gitea_mcp.mcp_protocol import (
AVAILABLE_TOOLS,
@@ -74,6 +78,36 @@ logger = logging.getLogger(__name__)
READ_SCOPE = "read:repository"
WRITE_SCOPE = "write:repository"
# Cache of tokens verified to have Gitea API scope.
# Key: hash of token prefix, Value: monotonic expiry time.
_api_scope_cache: dict[str, float] = {}
_API_SCOPE_CACHE_TTL = 60 # seconds
_REAUTH_GUIDANCE = (
"Your OAuth token lacks Gitea API scopes (e.g. read:repository). "
"Revoke the authorization in Gitea (Settings > Applications > Authorized OAuth2 Applications) "
"and in ChatGPT (Settings > Connected apps), then re-authorize."
)
def _has_required_scope(required_scope: str, granted_scopes: set[str]) -> bool:
"""Return whether granted scopes satisfy the required MCP tool scope."""
normalized = {scope.strip().lower() for scope in granted_scopes if scope and scope.strip()}
expanded = set(normalized)
# Compatibility: broad repository scopes imply both read and write repository access.
if "repository" in normalized or "repo" in normalized:
expanded.update({READ_SCOPE, WRITE_SCOPE})
if "write:repo" in normalized:
expanded.add(WRITE_SCOPE)
if "read:repo" in normalized:
expanded.add(READ_SCOPE)
if WRITE_SCOPE in expanded:
expanded.add(READ_SCOPE)
return required_scope in expanded
app = FastAPI(
title="AegisGitea MCP Server",
description="Security-first MCP server for controlled AI access to self-hosted Gitea",
@@ -131,7 +165,9 @@ TOOL_HANDLERS: dict[str, ToolHandler] = {
def _oauth_metadata_url(request: Request) -> str:
"""Build absolute metadata URL for OAuth challenge responses."""
return f"{str(request.base_url).rstrip('/')}/.well-known/oauth-protected-resource"
settings = get_settings()
base_url = settings.public_base or str(request.base_url).rstrip("/")
return f"{base_url}/.well-known/oauth-protected-resource"
def _oauth_unauthorized_response(
@@ -276,8 +312,82 @@ async def authenticate_and_rate_limit(
if user_data:
set_gitea_user_token(access_token)
set_gitea_user_login(str(user_data.get("login", "unknown")))
set_gitea_user_scopes(user_data.get("scopes", []))
login = str(user_data.get("login", "unknown"))
set_gitea_user_login(login)
observed_scopes: list[str] = list(user_data.get("scopes", []))
# Gitea's OIDC tokens only carry standard scopes (openid, profile, email),
# not granular Gitea scopes like read:repository. When a token is
# successfully validated the user has already authorized this OAuth app,
# so we grant read:repository implicitly (and write:repository when
# write_mode is enabled). The Gitea API itself still enforces per-repo
# permissions on every call made with the user's token.
effective_scopes: set[str] = set(observed_scopes)
effective_scopes.add(READ_SCOPE)
if settings.write_mode:
effective_scopes.add(WRITE_SCOPE)
set_gitea_user_scopes(effective_scopes)
# Probe: verify the token actually works for Gitea's REST API.
# Try both "token" and "Bearer" header formats since Gitea may
# accept OAuth tokens differently depending on version/config.
import hashlib
import time as _time
token_hash = hashlib.sha256(access_token.encode()).hexdigest()[:16]
now = _time.monotonic()
probe_result = "skip:cached"
token_type = "jwt" if access_token.count(".") == 2 else "opaque"
if token_hash not in _api_scope_cache or now >= _api_scope_cache[token_hash]:
try:
probe_status = None
async with httpx.AsyncClient(
timeout=settings.request_timeout_seconds
) as probe_client:
# Try "token" format first (Gitea PAT style)
probe_resp = await probe_client.get(
f"{settings.gitea_base_url}/api/v1/user",
headers={"Authorization": f"token {access_token}"},
)
probe_status = probe_resp.status_code
# If "token" format fails, try "Bearer" (OAuth2 standard)
if probe_status in (401, 403):
probe_resp = await probe_client.get(
f"{settings.gitea_base_url}/api/v1/user",
headers={"Authorization": f"Bearer {access_token}"},
)
probe_status = probe_resp.status_code
if probe_status in (401, 403):
probe_result = f"fail:{probe_status}"
logger.warning(
"oauth_token_lacks_api_scope",
extra={
"status": probe_status,
"login": login,
"token_type": token_type,
"scopes_observed": observed_scopes,
},
)
else:
probe_result = "pass"
_api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL
except httpx.RequestError:
probe_result = "skip:error"
logger.debug("oauth_api_scope_probe_network_error")
logger.info(
"oauth_auth_summary",
extra={
"token_type": token_type,
"scopes_observed": observed_scopes,
"scopes_effective": sorted(effective_scopes),
"api_probe": probe_result,
"login": login,
},
)
return await call_next(request)
@@ -384,7 +494,7 @@ async def oauth_authorization_server_metadata(request: Request) -> JSONResponse:
from this server without needing to know the Gitea URL upfront.
"""
settings = get_settings()
base_url = str(request.base_url).rstrip("/")
base_url = settings.public_base or str(request.base_url).rstrip("/")
gitea_base = settings.gitea_base_url
return JSONResponse(
@@ -418,6 +528,7 @@ async def oauth_token_proxy(request: Request) -> JSONResponse:
code = form_data.get("code")
redirect_uri = form_data.get("redirect_uri", "")
code_verifier = form_data.get("code_verifier", "")
# ChatGPT sends the client_id and client_secret (that were configured in the GPT Action
# settings) in the POST body. Use those directly; fall back to env vars if not provided.
client_id = form_data.get("client_id") or settings.gitea_oauth_client_id
@@ -434,6 +545,8 @@ async def oauth_token_proxy(request: Request) -> JSONResponse:
"grant_type": "authorization_code",
"redirect_uri": redirect_uri,
}
if code_verifier:
payload["code_verifier"] = code_verifier
try:
async with httpx.AsyncClient(timeout=30) as client:
@@ -447,12 +560,25 @@ async def oauth_token_proxy(request: Request) -> JSONResponse:
raise HTTPException(status_code=502, detail="Failed to reach Gitea token endpoint") from exc
if response.status_code != 200:
logger.error(
"oauth_token_exchange_failed",
extra={"status": response.status_code, "body": response.text[:500]},
)
raise HTTPException(
status_code=response.status_code,
detail="Token exchange failed with Gitea",
)
return JSONResponse(content=response.json())
token_data = response.json()
logger.info(
"oauth_token_exchange_ok",
extra={
"token_type": token_data.get("token_type"),
"scope": token_data.get("scope", "<not returned>"),
"expires_in": token_data.get("expires_in"),
},
)
return JSONResponse(content=token_data)
@app.get("/metrics")
@@ -519,7 +645,7 @@ async def _execute_tool_call(
required_scope = WRITE_SCOPE if tool_def.write_operation else READ_SCOPE
granted_scopes = set(get_gitea_user_scopes())
if required_scope not in granted_scopes:
if not _has_required_scope(required_scope, granted_scopes):
audit.log_access_denied(
tool_name=tool_name,
reason=f"insufficient_scope:{required_scope}",
@@ -623,6 +749,40 @@ async def call_tool(request: MCPToolCallRequest) -> JSONResponse:
)
raise HTTPException(status_code=400, detail=error_message) from exc
except GiteaAuthorizationError as exc:
audit.log_tool_invocation(
tool_name=request.tool,
correlation_id=correlation_id,
result_status="error",
error="gitea_authorization_error",
)
logger.warning("gitea_authorization_error: %s", exc)
return JSONResponse(
status_code=403,
content=MCPToolCallResponse(
success=False,
error=_REAUTH_GUIDANCE,
correlation_id=correlation_id,
).model_dump(),
)
except GiteaAuthenticationError as exc:
audit.log_tool_invocation(
tool_name=request.tool,
correlation_id=correlation_id,
result_status="error",
error="gitea_authentication_error",
)
logger.warning("gitea_authentication_error: %s", exc)
return JSONResponse(
status_code=401,
content=MCPToolCallResponse(
success=False,
error="Gitea rejected the token. Please re-authenticate.",
correlation_id=correlation_id,
).model_dump(),
)
except Exception:
# Security decision: do not leak stack traces or raw exception messages.
error_message = "Internal server error"
@@ -747,6 +907,39 @@ async def sse_message_handler(request: Request) -> JSONResponse:
"error": {"code": -32000, "message": str(exc.detail)},
}
)
except GiteaAuthorizationError as exc:
audit.log_tool_invocation(
tool_name=str(tool_name),
correlation_id=correlation_id,
result_status="error",
error="gitea_authorization_error",
)
logger.warning("gitea_authorization_error: %s", exc)
return JSONResponse(
content={
"jsonrpc": "2.0",
"id": message_id,
"error": {"code": -32000, "message": _REAUTH_GUIDANCE},
}
)
except GiteaAuthenticationError as exc:
audit.log_tool_invocation(
tool_name=str(tool_name),
correlation_id=correlation_id,
result_status="error",
error="gitea_authentication_error",
)
logger.warning("gitea_authentication_error: %s", exc)
return JSONResponse(
content={
"jsonrpc": "2.0",
"id": message_id,
"error": {
"code": -32000,
"message": "Gitea rejected the token. Please re-authenticate.",
},
}
)
except Exception as exc:
audit.log_tool_invocation(
tool_name=str(tool_name),