diff --git a/README.md b/README.md index a7f41a6..2ea7380 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # AegisGitea-MCP -Security-first MCP server for self-hosted Gitea with per-user OAuth2/OIDC authentication. +Security-first MCP server for self-hosted Gitea with per-user OAuth2/OIDC authentication for Claude, Claude Code, and Cowork. -AegisGitea-MCP exposes MCP tools over HTTP/SSE and validates each user token against Gitea so tool access follows each user's actual repository permissions. +AegisGitea-MCP exposes MCP tools over Streamable HTTP and a legacy SSE alias. Each user authenticates with Gitea through OAuth2/OIDC; repository authorization is checked per user before any service PAT call is allowed. ## Securing MCP with Gitea OAuth @@ -10,7 +10,7 @@ AegisGitea-MCP exposes MCP tools over HTTP/SSE and validates each user token aga 1. Open `https://git.hiddenden.cafe/user/settings/applications` (or admin application settings). 2. Create an OAuth2 app. -3. Set redirect URI to the ChatGPT callback URL shown after creating a New App. +3. Set the redirect URI to this MCP server's callback: `https:///oauth/callback`. 4. Save the app and keep: - `Client ID` - `Client Secret` @@ -32,19 +32,39 @@ GITEA_URL=https://git.hiddenden.cafe OAUTH_MODE=true GITEA_OAUTH_CLIENT_ID= GITEA_OAUTH_CLIENT_SECRET= -OAUTH_EXPECTED_AUDIENCE= +PUBLIC_BASE_URL=https:// +OAUTH_STATE_SECRET= ``` -### 3) Configure ChatGPT New App +`GITEA_TOKEN` is optional in OAuth mode. Without it, Gitea REST calls use the user's OAuth access token directly, so Gitea enforces permissions on every API call. With it, the token acts as a service PAT for API execution, but the MCP server first checks the requesting user's permission on the target repository through Gitea and denies the call if the user lacks the required read/write permission. -In ChatGPT New App: +### 3) Configure Claude, Claude Code, or Cowork -- MCP server URL: `https:///mcp/sse` +Claude's hosted, desktop, mobile, Claude Code, and Cowork surfaces share the same remote MCP connector infrastructure. There is no Claude-specific server code path. + +In claude.ai: + +1. Open **Settings > Connectors**. +2. Choose **Add custom connector**. +3. Paste `https:///mcp`. +4. Complete the OAuth consent flow. Dynamic Client Registration (`/register`) handles Claude client registration. + +In Claude Code: + +```bash +claude mcp add --transport http aegis-gitea https:///mcp +``` + +Cowork uses the same connector model and MCP URL. + +Manual OAuth client configuration remains available for clients that do not use DCR: + +- MCP server URL: `https:///mcp` - Authentication: OAuth -- OAuth client ID: Gitea OAuth app client ID -- OAuth client secret: Gitea OAuth app client secret +- OAuth client ID: the client id returned by `/register` or your preconfigured client id +- OAuth client secret: only for confidential clients -After creation, copy the ChatGPT callback URL and add it to the Gitea OAuth app redirect URIs. +Hosted Claude callbacks are allowed by default: `https://claude.ai/api/mcp/auth_callback` and `https://claude.com/api/mcp/auth_callback`. Loopback redirects for Claude Code local development are allowed for `http://127.0.0.1:*` and `http://localhost:*`. ### 4) OAuth-protected MCP behavior @@ -56,7 +76,7 @@ Example response: ```json { - "resource": "https://git.hiddenden.cafe", + "resource": "https://gitea-mcp.hiddenden.cafe", "authorization_servers": [ "https://gitea-mcp.hiddenden.cafe", "https://git.hiddenden.cafe" @@ -76,14 +96,16 @@ WWW-Authenticate: Bearer resource_metadata="https:///.well-known/oauth ## Architecture ```text -ChatGPT App +Claude / Claude Code / Cowork -> Authorization Code Flow -> Gitea OAuth2/OIDC (issuer: https://git.hiddenden.cafe) -> Access token - -> MCP Server (/mcp/sse, /mcp/tool/call) + -> MCP Server (/mcp, /mcp/sse, /mcp/tool/call) -> OIDC discovery + JWKS cache -> Scope enforcement (read:repository / write:repository) - -> Per-request Gitea API calls with Authorization: Bearer + -> Policy allow/deny + -> If GITEA_TOKEN is set: check Gitea collaborator permission for + -> Gitea API call with either the user token or the service PAT after authz ``` ## Example curl @@ -108,7 +130,7 @@ Authenticated tool call: curl -s https:///mcp/tool/call \ -H "Authorization: Bearer " \ -H "Content-Type: application/json" \ - -d '{"tool":"list_repositories","arguments":{}}' + -d '{"tool":"get_repository_info","arguments":{"owner":"acme","repo":"demo"}}' ``` ## Threat model @@ -120,8 +142,9 @@ curl -s https:///mcp/tool/call \ - URLs leak via logs, proxies, browser history, and referers. - bearer tokens must be sent in `Authorization` headers only. - Per-user OAuth reduces lateral access: - - each call runs as the signed-in user. - - users only see repositories they already have permission for in Gitea. + - identity comes from Gitea OIDC/JWKS or userinfo validation. + - without `GITEA_TOKEN`, API calls use the user's token and Gitea enforces permissions. + - with `GITEA_TOKEN`, every repository-targeted call first checks the user's Gitea permission and fails closed if the check cannot be made. ## CI/CD diff --git a/docs/api-reference.md b/docs/api-reference.md index 2cdacd7..6bebf89 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -12,14 +12,17 @@ - Returns OAuth protected resource metadata used by MCP clients. - `GET /.well-known/oauth-authorization-server` - Returns OAuth authorization server metadata. +- `POST /register` + - Registers an OAuth client and persists the client metadata. - `POST /oauth/token` - Proxies OAuth authorization-code token exchange to Gitea. ## MCP Endpoints - `GET /mcp/tools`: list tool definitions. -- `POST /mcp/tool/call`: execute a tool. -- `GET /mcp/sse` and `POST /mcp/sse`: MCP SSE transport. +- `GET /mcp` and `POST /mcp`: streamable HTTP transport. +- `GET /mcp/sse` and `POST /mcp/sse`: MCP SSE transport alias. +- `POST /mcp/tool/call`: direct tool-call endpoint. Authentication requirements: diff --git a/docs/architecture.md b/docs/architecture.md index f69fdfd..09b05b8 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -2,20 +2,22 @@ ## Overview -AegisGitea MCP is a Python 3.10+ application built on **FastAPI**. It acts as a bridge between an AI client (such as ChatGPT) and a self-hosted Gitea instance, implementing the [Model Context Protocol (MCP)](https://modelcontextprotocol.io). +AegisGitea MCP is a Python 3.10+ application built on **FastAPI**. It acts as a bridge between an AI client (such as Claude, Claude Code, or Cowork) and a self-hosted Gitea instance, implementing the [Model Context Protocol (MCP)](https://modelcontextprotocol.io). ``` -AI Client (ChatGPT) +AI Client (Claude / Claude Code / Cowork) │ │ HTTP (Authorization: Bearer ) ▼ ┌────────────────────────────────────────────┐ │ FastAPI Server │ │ server.py │ +│ - Route: GET/POST /mcp │ │ - Route: POST /mcp/tool/call │ │ - Route: GET /mcp/tools │ │ - Route: GET /health │ -│ - SSE support (GET/POST /mcp/sse) │ +│ - Streamable HTTP transport │ +│ - Legacy SSE alias (GET/POST /mcp/sse) │ └───────┬───────────────────┬────────────────┘ │ │ ┌────▼────┐ ┌────▼──────────────┐ @@ -91,7 +93,7 @@ Key methods: | Method | Gitea endpoint | |---|---| | `get_current_user()` | `GET /api/v1/user` | -| `list_repositories()` | `GET /api/v1/repos/search` | +| `list_repositories()` | `GET /api/v1/user/repos` | | `get_repository()` | `GET /api/v1/repos/{owner}/{repo}` | | `get_file_contents()` | `GET /api/v1/repos/{owner}/{repo}/contents/{path}` | | `get_tree()` | `GET /api/v1/repos/{owner}/{repo}/git/trees/{ref}` | @@ -134,7 +136,7 @@ All handlers return a plain string. `server.py` wraps this in an `MCPToolCallRes │ 2. FastAPI routes the request to the tool-call handler in server.py │ -3. auth.validate_api_key() checks the Authorization header +3. OAuth middleware validates the Bearer token via Gitea OIDC/JWKS or userinfo ├── Fail → AuditLogger.log_access_denied() → HTTP 401 / 429 └── Pass → continue │ @@ -142,27 +144,32 @@ All handlers return a plain string. `server.py` wraps this in an `MCPToolCallRes │ 5. Tool dispatcher looks up the tool by name (mcp_protocol.get_tool_by_name) │ -6. Tool handler function (tools/repository.py) is called +6. Policy engine checks read/write mode and repository/path policy │ -7. GiteaClient makes an async HTTP call to the Gitea API +7. If GITEA_TOKEN is configured, service-PAT authz checks + GET /repos/{owner}/{repo}/collaborators/{user}/permission │ -8. Result (or error) is returned to server.py +8. Tool handler function (tools/repository.py) is called │ -9. AuditLogger.log_tool_invocation(status="success" | "error") +9. GiteaClient makes an async HTTP call to the Gitea API │ -10. MCPToolCallResponse is returned to the client +10. Result (or error) is returned to server.py + │ +11. AuditLogger.log_tool_invocation(status="success" | "error") + │ +12. MCPToolCallResponse is returned to the client ``` --- ## Key Design Decisions -**Read-only by design.** The MCP tools only read data from Gitea. No write operations are implemented. +**Read by default, writes opt-in.** Read tools are available by default. Write-capable tools require `WRITE_MODE=true`, repository write policy/whitelist approval, and `write:repository` authorization. -**Gitea controls access.** The server does not maintain its own repository ACL. The Gitea bot user's permissions are the source of truth. If the bot cannot access a repo, the server cannot either. +**Gitea controls repository access.** Without `GITEA_TOKEN`, Gitea enforces repository permissions on API calls made with the user's token. With `GITEA_TOKEN`, the service PAT can only execute after the server verifies the requesting user's actual repository permission through Gitea and writes an audit denial if the check fails. -**Public tool discovery.** `GET /mcp/tools` requires no authentication so that ChatGPT's plugin system can discover the available tools without credentials. All other endpoints require authentication. +**Public tool discovery.** `GET /mcp/tools` requires no authentication so that MCP clients can discover the available tools without credentials. All other endpoints require authentication. -**Stateless server.** No database or persistent state beyond the audit log file. Rate limit counters are in-memory and reset on restart. +**Minimal persisted state.** The audit log is persisted for tamper-evident review. Dynamic OAuth client registrations are persisted when DCR is enabled. Rate limit counters and short-lived authz caches are in-memory and reset on restart. **Async throughout.** FastAPI + `httpx.AsyncClient` means all Gitea API calls are non-blocking, allowing the server to handle concurrent requests efficiently. diff --git a/docs/configuration.md b/docs/configuration.md index 7300ce1..bc89759 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -14,8 +14,10 @@ cp .env.example .env | `OAUTH_MODE` | No | `false` | Enables OAuth-oriented validation settings | | `GITEA_OAUTH_CLIENT_ID` | Yes when `OAUTH_MODE=true` | - | OAuth client id | | `GITEA_OAUTH_CLIENT_SECRET` | Yes when `OAUTH_MODE=true` | - | OAuth client secret | -| `OAUTH_EXPECTED_AUDIENCE` | No | empty | Expected JWT audience; defaults to client id | +| `OAUTH_EXPECTED_AUDIENCE` | No | empty | Additional accepted JWT audience beyond the MCP resource and Gitea client id | | `OAUTH_CACHE_TTL_SECONDS` | No | `300` | OIDC discovery/JWKS cache TTL | +| `OAUTH_STATE_SECRET` | Yes when `OAUTH_MODE=true` | - | HMAC secret for signed OAuth state wrappers | +| `OAUTH_REDIRECT_ALLOWLIST` | No | empty | Additional allowed redirect URIs for OAuth clients | ## MCP Server Settings @@ -27,6 +29,8 @@ cp .env.example .env | `ALLOW_INSECURE_BIND` | No | `false` | Explicit opt-in required for `0.0.0.0` bind | | `LOG_LEVEL` | No | `INFO` | `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` | | `STARTUP_VALIDATE_GITEA` | No | `true` | Validate OIDC discovery endpoint at startup | +| `DCR_ENABLED` | No | `true` | Enable dynamic client registration at `/register` | +| `DCR_STORAGE_PATH` | No | `/var/lib/aegis-mcp/dcr_clients.json` | Persisted OAuth client registry path | ## Security and Limits @@ -41,6 +45,7 @@ cp .env.example .env | `MAX_TOOL_RESPONSE_CHARS` | No | `20000` | Max chars in text fields | | `REQUEST_TIMEOUT_SECONDS` | No | `30` | Upstream timeout for Gitea calls | | `SECRET_DETECTION_MODE` | No | `mask` | `off`, `mask`, `block` | +| `REPO_AUTHZ_CACHE_TTL_SECONDS` | No | `60` | TTL for cached per-user repository permission checks | ## Write Mode diff --git a/docs/getting-started.md b/docs/getting-started.md index 974b61e..eb60947 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -4,7 +4,7 @@ - Python 3.10 or higher - A running Gitea instance -- A Gitea bot user with access to the repositories you want to expose +- A Gitea OAuth2 application for this MCP server - `make` (optional but recommended) ## 1. Install @@ -31,14 +31,12 @@ pip install -e . # dev: pip install -e ".[dev]" ``` -## 2. Create a Gitea Bot User +## 2. Create a Gitea OAuth2 Application -1. In your Gitea instance, create a dedicated user (e.g. `ai-bot`). -2. Grant that user **read access** to any repositories the AI should be able to see. -3. Generate an API token for the bot user: - - Go to **User Settings** > **Applications** > **Generate Token** - - Give it a descriptive name (e.g. `aegis-mcp-token`) - - Copy the token — you will not be able to view it again. +1. In Gitea, open **User Settings > Applications**. +2. Create an OAuth2 application for AegisGitea-MCP. +3. Set the redirect URI to `https:///oauth/callback`. +4. Copy the client ID and client secret. ## 3. Configure @@ -48,27 +46,31 @@ Copy the example environment file and fill in your values: cp .env.example .env ``` -Minimum required settings in `.env`: +Minimum OAuth settings in `.env`: ```env GITEA_URL=https://gitea.example.com -GITEA_TOKEN= -AUTH_ENABLED=true -MCP_API_KEYS= +OAUTH_MODE=true +GITEA_OAUTH_CLIENT_ID= +GITEA_OAUTH_CLIENT_SECRET= +PUBLIC_BASE_URL=https:// +OAUTH_STATE_SECRET= ``` +`GITEA_TOKEN` is optional. If it is set, use a narrowly scoped service PAT and only grant it repository access you are prepared to expose after per-user authorization checks. If it is not set, Gitea REST calls use the authenticated user's OAuth token directly. + See [Configuration](configuration.md) for the full list of settings. -## 4. Generate an API Key +## 4. Optional Standard API Key Mode -The MCP server requires clients to authenticate with a bearer token. Generate one: +For non-OAuth deployments, configure `GITEA_TOKEN` and `MCP_API_KEYS`. Generate an API key with: ```bash make generate-key # or: python scripts/generate_api_key.py ``` -Copy the printed key into `MCP_API_KEYS` in your `.env` file. +Copy the printed key into `MCP_API_KEYS` in your `.env` file and set `OAUTH_MODE=false`. ## 5. Run @@ -88,24 +90,35 @@ curl http://localhost:8080/health ## 6. Connect an AI Client -### ChatGPT +### Claude -Use this single URL in the ChatGPT MCP connector: +In claude.ai, open **Settings > Connectors > Add custom connector** and paste: ``` -http://:8080/mcp/sse?api_key= +https:///mcp ``` -ChatGPT uses the SSE transport: it opens a persistent GET stream on this URL and sends tool call messages back via POST to the same URL. The `api_key` query parameter is the recommended method because the ChatGPT interface does not support setting custom request headers. +Claude discovers OAuth metadata, registers through `/register`, and uses PKCE S256 automatically. -### Other MCP clients +### Claude Code -Clients that support custom headers can use: +```bash +claude mcp add --transport http aegis-gitea https:///mcp +``` -- **SSE URL:** `http://:8080/mcp/sse` -- **Tool discovery URL:** `http://:8080/mcp/tools` (no auth required) -- **Tool call URL:** `http://:8080/mcp/tool/call` -- **Authentication:** `Authorization: Bearer ` +Claude Code uses the same remote MCP and OAuth metadata. Local development loopback callbacks are allowed by default. + +### Cowork + +Cowork uses the same connector infrastructure and MCP URL as Claude. + +### SSE compatibility + +If your client still expects SSE transport, use: + +- **SSE URL:** `https:///mcp/sse` +- **Tool discovery URL:** `https:///mcp/tools` (no auth required) +- **Tool call URL:** `https:///mcp/tool/call` For a production deployment behind a reverse proxy, see [Deployment](deployment.md). diff --git a/docs/index.md b/docs/index.md index fa14436..0ad7f8b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,7 +4,7 @@ AegisGitea MCP is a security-first [Model Context Protocol (MCP)](https://modelc ## Overview -AegisGitea MCP acts as a secure bridge between AI assistants (such as ChatGPT) and your Gitea instance. It exposes a limited set of read-only tools that allow an AI to browse repositories and read file contents, while enforcing strict authentication, rate limiting, and comprehensive audit logging. +AegisGitea MCP acts as a secure bridge between AI assistants (such as Claude, Claude Code, or Cowork) and your Gitea instance. It exposes read tools and opt-in write tools while enforcing per-user OAuth, repository authorization, policy checks, rate limiting, and tamper-evident audit logging. **Version:** 0.1.0 (Alpha) **License:** MIT diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index 45e6a33..ebde992 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -1,15 +1,15 @@ # Troubleshooting -## "Internal server error (-32603)" from ChatGPT +## "Internal server error (-32603)" from Claude -**Symptom:** ChatGPT shows `Internal server error` with JSON-RPC error code `-32603` when trying to use Gitea tools. +**Symptom:** Claude shows `Internal server error` with JSON-RPC error code `-32603` when trying to use Gitea tools. -**Cause:** The OAuth token stored by ChatGPT was issued without Gitea API scopes (e.g. `read:repository`). This happens when the initial authorization request didn't include the correct `scope` parameter. The token passes OIDC validation (openid/profile/email) but gets **403 Forbidden** from Gitea's REST API. +**Cause:** In user-token mode, the OAuth token stored by the client may have been issued without Gitea API scopes (e.g. `read:repository`). In service-PAT mode, the call may fail because the authenticated user does not have the required repository permission or the permission probe cannot be completed. **Fix:** 1. In Gitea: Go to **Settings > Applications > Authorized OAuth2 Applications** and revoke the MCP application. -2. In ChatGPT: Go to **Settings > Connected apps** and disconnect the Gitea integration. -3. Re-authorize: Use the ChatGPT integration again. It will trigger a fresh OAuth flow with the correct scopes (`read:repository`). +2. In Claude: disconnect the MCP server and authenticate again. +3. Re-authorize: Use the MCP connector again. It will trigger a fresh OAuth flow. For repository-targeted calls in service-PAT mode, also verify the signed-in Gitea user has read/write access to the target repository. **Verification:** Check the server logs for `oauth_auth_summary`. A working token shows: ``` @@ -24,19 +24,19 @@ oauth_token_lacks_api_scope: status=403 login=alice **Symptom:** Tool calls return 403 with a message about re-authorizing. -**Cause:** Same root cause as above — the OAuth token doesn't have the required Gitea API scopes. The middleware's API scope probe detected this and returned a clear error instead of letting it fail deep in the tool handler. +**Cause:** The OAuth token does not have the required API scope in user-token mode, or the per-user repository permission check denied the request in service-PAT mode. -**Fix:** Same as above — revoke and re-authorize. +**Fix:** Revoke and re-authorize if the token lacks API scope. If the error mentions repository permission, grant the signed-in Gitea user the required repository access or use a repository they can access. -## ChatGPT caches stale tokens +## Claude caches stale tokens -**Symptom:** After fixing the OAuth configuration, ChatGPT still sends the old token. +**Symptom:** After fixing the OAuth configuration, Claude still sends the old token. -**Cause:** ChatGPT caches access tokens and doesn't automatically re-authenticate when the server configuration changes. +**Cause:** The client caches access tokens and doesn't automatically re-authenticate when the server configuration changes. **Fix:** -1. In ChatGPT: **Settings > Connected apps** > disconnect the integration. -2. Start a new conversation and use the integration again — this forces a fresh OAuth flow. +1. Disconnect the server in the client. +2. Start a new conversation and use the integration again - this forces a fresh OAuth flow. ## How OAuth scopes work with Gitea @@ -48,9 +48,9 @@ Gitea's OAuth2/OIDC implementation uses **granular scopes** for API access: | `write:repository` | Create/edit issues, PRs, comments, files | | `openid` | OIDC identity (login, email) | -When an OAuth application requests authorization, the `scope` parameter in the authorize URL determines what permissions the resulting token has. If only OIDC scopes are requested (e.g. `openid profile email`), the token will validate via the userinfo endpoint but will be rejected by Gitea's REST API with 403. +When an OAuth application requests authorization, the `scope` parameter in the authorize URL determines what permissions the resulting token has. If only OIDC scopes are requested (e.g. `openid profile email`), the token can establish identity but may not be usable for direct Gitea REST calls. When `GITEA_TOKEN` is configured, the server uses OIDC for identity and checks the user's repository permission before using the service PAT. -The MCP server's `openapi-gpt.yaml` file controls which scopes ChatGPT requests. Ensure it includes: +The MCP server's OAuth metadata controls which scopes the client requests. Ensure it includes: ```yaml scopes: read:repository: "Read access to Gitea repositories" @@ -73,3 +73,4 @@ Every authenticated request emits a structured log line: - `api_probe=fail:403` — token lacks API scopes, request rejected with re-auth guidance - `api_probe=skip:cached` — previous probe passed, cached result used - `api_probe=skip:error` — network error during probe, request allowed to proceed +- `repository_permission_denied` in the audit log — the user lacks required read/write permission for a service-PAT call diff --git a/openapi-gpt.yaml b/openapi-gpt.yaml deleted file mode 100644 index 6384cda..0000000 --- a/openapi-gpt.yaml +++ /dev/null @@ -1,110 +0,0 @@ -openapi: "3.1.0" -info: - title: AegisGitea MCP - description: > - AI access to your self-hosted Gitea instance via the AegisGitea MCP server. - Each user authenticates with their own Gitea account via OAuth2. - version: "0.2.0" - -servers: - - url: "https://YOUR_MCP_SERVER_DOMAIN" - description: > - Replace YOUR_MCP_SERVER_DOMAIN with the public hostname of your AegisGitea-MCP instance. - -components: - securitySchemes: - gitea_oauth: - type: oauth2 - flows: - authorizationCode: - # Replace YOUR_GITEA_DOMAIN with your self-hosted Gitea instance hostname. - authorizationUrl: "https://YOUR_GITEA_DOMAIN/login/oauth/authorize" - # The token URL must point to the MCP server's OAuth proxy endpoint. - tokenUrl: "https://YOUR_MCP_SERVER_DOMAIN/oauth/token" - scopes: - read:repository: "Read access to Gitea repositories" - write:repository: "Write access to Gitea repositories" - -security: - - gitea_oauth: - - read:repository - -paths: - /mcp/tools: - get: - operationId: listTools - summary: List available MCP tools - description: Returns all tools available on this MCP server. Public endpoint, no authentication required. - security: [] - responses: - "200": - description: List of available MCP tools - content: - application/json: - schema: - type: object - properties: - tools: - type: array - items: - type: object - properties: - name: - type: string - description: - type: string - - /mcp/tool/call: - post: - operationId: callTool - summary: Execute an MCP tool - description: > - Execute a named MCP tool with the provided arguments. - The authenticated user's Gitea token is used for all Gitea API calls, - so only repositories and data accessible to the user will be returned. - security: - - gitea_oauth: - - read:repository - requestBody: - required: true - content: - application/json: - schema: - type: object - required: - - tool - - arguments - properties: - tool: - type: string - description: Name of the MCP tool to execute - example: list_repositories - arguments: - type: object - description: Tool-specific arguments - example: {} - correlation_id: - type: string - description: Optional correlation ID for request tracing - responses: - "200": - description: Tool execution result - content: - application/json: - schema: - type: object - properties: - success: - type: boolean - result: - type: object - correlation_id: - type: string - "401": - description: Authentication required or token invalid - "403": - description: Policy denied the request - "404": - description: Tool not found - "429": - description: Rate limit exceeded diff --git a/scripts/generate_api_key.py b/scripts/generate_api_key.py index c91c320..0d032a4 100755 --- a/scripts/generate_api_key.py +++ b/scripts/generate_api_key.py @@ -19,7 +19,7 @@ def main() -> None: print() # Get optional description - description = input("Enter description for this key (e.g., 'ChatGPT Business'): ").strip() + description = input("Enter description for this key (e.g., 'Claude Code'): ").strip() if not description: description = "Generated key" @@ -56,15 +56,15 @@ def main() -> None: print() print(" docker-compose restart aegis-mcp") print() - print("3. Configure ChatGPT Business:") + print("3. Configure your MCP client:") print() - print(" - Go to ChatGPT Settings > MCP Servers") + print(" - Add the server in your MCP client settings") print(" - Add custom header:") print(f" Authorization: Bearer {api_key}") print() print("4. Test the connection:") print() - print(" Ask ChatGPT: 'List my Gitea repositories'") + print(" Ask the client: 'List my Gitea repositories'") print() print("-" * 70) print() diff --git a/scripts/rotate_api_key.py b/scripts/rotate_api_key.py index 5bab43d..8167530 100755 --- a/scripts/rotate_api_key.py +++ b/scripts/rotate_api_key.py @@ -92,7 +92,7 @@ def main() -> None: key_list.append(new_key) new_keys_str = ",".join(key_list) print("\n✓ New key will be added (total: {} keys)".format(len(key_list))) - print("\n⚠️ IMPORTANT: Remove old keys manually after updating ChatGPT config") + print("\n⚠️ IMPORTANT: Remove old keys manually after updating the client config") elif choice == "1": # Replace with only new key new_keys_str = new_key @@ -129,19 +129,19 @@ def main() -> None: print() print(" docker-compose restart aegis-mcp") print() - print("2. Update ChatGPT Business configuration:") + print("2. Update your MCP client configuration:") print() - print(" - Go to ChatGPT Settings > MCP Servers") + print(" - Update the MCP server entry in your client") print(" - Update Authorization header:") print(f" Authorization: Bearer {new_key}") print() print("3. Test the connection:") print() - print(" Ask ChatGPT: 'List my Gitea repositories'") + print(" Ask the client: 'List my Gitea repositories'") print() print("4. If using grace period (option 2):") print() - print(" - After confirming ChatGPT works with new key") + print(" - After confirming the client works with the new key") print(" - Manually remove old keys from .env") print(" - Restart server again") print() diff --git a/src/aegis_gitea_mcp/cache.py b/src/aegis_gitea_mcp/cache.py new file mode 100644 index 0000000..53540bc --- /dev/null +++ b/src/aegis_gitea_mcp/cache.py @@ -0,0 +1,71 @@ +"""Bounded, TTL-based in-memory caches with size eviction. + +Provides a small dependency-free cache used by the auth middleware and the +per-user authorization layer. Entries expire after a TTL and the cache is +bounded by a maximum size to prevent unbounded memory growth from untrusted +key cardinality (e.g. one entry per distinct token or per (user, repo) pair). +""" + +from __future__ import annotations + +import time +from collections import OrderedDict +from typing import Generic, TypeVar + +K = TypeVar("K") +V = TypeVar("V") + + +class BoundedTTLCache(Generic[K, V]): + """A size-bounded cache whose entries expire after a fixed TTL. + + Eviction is least-recently-inserted (FIFO) once ``max_size`` is reached. + Expired entries are removed lazily on access and proactively when the + cache is full, so the cache never exceeds ``max_size`` live entries. + """ + + def __init__(self, *, ttl_seconds: float, max_size: int = 1024) -> None: + """Initialize the cache with a TTL and maximum entry count.""" + if ttl_seconds <= 0: + raise ValueError("ttl_seconds must be positive") + if max_size <= 0: + raise ValueError("max_size must be positive") + self._ttl = float(ttl_seconds) + self._max_size = int(max_size) + self._store: OrderedDict[K, tuple[V, float]] = OrderedDict() + + def get(self, key: K) -> V | None: + """Return the cached value for ``key`` or ``None`` if absent/expired.""" + entry = self._store.get(key) + if entry is None: + return None + value, expiry = entry + if time.monotonic() >= expiry: + # Lazily evict expired entry. + self._store.pop(key, None) + return None + return value + + def set(self, key: K, value: V) -> None: + """Store ``value`` under ``key`` with the configured TTL.""" + now = time.monotonic() + # Drop the existing entry so reinsertion refreshes ordering. + self._store.pop(key, None) + self._store[key] = (value, now + self._ttl) + self._evict(now) + + def _evict(self, now: float) -> None: + """Remove expired entries, then enforce the size bound (FIFO).""" + expired = [key for key, (_, expiry) in self._store.items() if now >= expiry] + for key in expired: + self._store.pop(key, None) + while len(self._store) > self._max_size: + self._store.popitem(last=False) + + def clear(self) -> None: + """Remove all entries (primarily for tests).""" + self._store.clear() + + def __len__(self) -> int: + """Return the number of stored (not necessarily live) entries.""" + return len(self._store) diff --git a/src/aegis_gitea_mcp/config.py b/src/aegis_gitea_mcp/config.py index cfee191..9154198 100644 --- a/src/aegis_gitea_mcp/config.py +++ b/src/aegis_gitea_mcp/config.py @@ -106,12 +106,12 @@ class Settings(BaseSettings): description="Secret detection mode: off, mask, or block", ) - # OAuth2 configuration (for ChatGPT per-user Gitea authentication) + # OAuth2 configuration (for per-client Gitea authentication) oauth_mode: bool = Field( default=False, description=( "Enable per-user OAuth2 authentication mode. " - "When true, each ChatGPT user authenticates with their own Gitea account. " + "When true, each client user authenticates with their own Gitea account. " "GITEA_TOKEN and MCP_API_KEYS are not required in this mode." ), ) @@ -126,8 +126,9 @@ class Settings(BaseSettings): oauth_expected_audience: str = Field( default="", description=( - "Expected OIDC audience for access tokens. " - "Defaults to GITEA_OAUTH_CLIENT_ID when unset." + "Additional expected OIDC audience for access tokens. The canonical MCP " + "resource URL and the Gitea OAuth client id are always accepted; set this " + "to require an extra audience value." ), ) oauth_cache_ttl_seconds: int = Field( @@ -139,6 +140,37 @@ class Settings(BaseSettings): default="https://hiddenden.cafe/docs/mcp-gitea", description="Public documentation URL for OAuth-protected MCP resource behavior", ) + oauth_state_secret: str = Field( + default="", + description=( + "Server secret used to HMAC-sign the OAuth proxy state parameter. " + "Required when OAUTH_MODE=true so callback state is tamper-evident." + ), + ) + oauth_redirect_allowlist_raw: str = Field( + default="", + description=( + "Comma-separated additional allowed client redirect URIs for the OAuth " + "callback proxy. Claude's callback URLs and loopback URIs are always allowed." + ), + alias="OAUTH_REDIRECT_ALLOWLIST", + ) + dcr_enabled: bool = Field( + default=True, + description=( + "Enable RFC 7591 Dynamic Client Registration at /register. Claude's " + "connectors register dynamically; disable to require manual client_id/secret." + ), + ) + dcr_storage_path: Path = Field( + default=Path("/var/lib/aegis-mcp/dcr_clients.json"), + description="Path to the JSON file that persists dynamically registered clients", + ) + repo_authz_cache_ttl_seconds: int = Field( + default=60, + description="TTL (seconds) for cached per-user repository permission decisions", + ge=1, + ) # Authentication configuration auth_enabled: bool = Field( @@ -269,12 +301,28 @@ class Settings(BaseSettings): "Set ALLOW_INSECURE_BIND=true to explicitly permit this." ) + extra_redirect_uris: list[str] = [] + if self.oauth_redirect_allowlist_raw.strip(): + extra_redirect_uris = [ + value.strip() + for value in self.oauth_redirect_allowlist_raw.split(",") + if value.strip() + ] + object.__setattr__(self, "_oauth_redirect_allowlist", extra_redirect_uris) + if self.oauth_mode: # In OAuth mode, per-user Gitea tokens are used; no shared bot token or API keys needed. if not self.gitea_oauth_client_id.strip(): raise ValueError("GITEA_OAUTH_CLIENT_ID is required when OAUTH_MODE=true.") if not self.gitea_oauth_client_secret.strip(): raise ValueError("GITEA_OAUTH_CLIENT_SECRET is required when OAUTH_MODE=true.") + # The proxy state parameter carries the client's redirect_uri across the Gitea + # round-trip; it must be HMAC-signed, which requires a server-held secret. + if not self.oauth_state_secret.strip(): + raise ValueError( + "OAUTH_STATE_SECRET is required when OAUTH_MODE=true so the OAuth " + "proxy state parameter can be HMAC-signed and verified." + ) else: # Standard API key mode: require bot token and at least one API key. if not self.gitea_token.strip(): @@ -308,6 +356,11 @@ class Settings(BaseSettings): """Get parsed list of repositories allowed for write-mode operations.""" return list(getattr(self, "_write_repository_whitelist", [])) + @property + def oauth_redirect_allowlist(self) -> list[str]: + """Get parsed list of additional allowed client redirect URIs.""" + return list(getattr(self, "_oauth_redirect_allowlist", [])) + @property def gitea_base_url(self) -> str: """Get Gitea base URL as normalized string.""" diff --git a/src/aegis_gitea_mcp/mcp_protocol.py b/src/aegis_gitea_mcp/mcp_protocol.py index 5fa0b63..e249af6 100644 --- a/src/aegis_gitea_mcp/mcp_protocol.py +++ b/src/aegis_gitea_mcp/mcp_protocol.py @@ -63,7 +63,7 @@ def _tool( AVAILABLE_TOOLS: list[MCPTool] = [ _tool( "list_repositories", - "List repositories visible to the configured bot account.", + "List repositories visible to the authenticated Gitea API token.", {"type": "object", "properties": {}, "required": []}, ), _tool( diff --git a/src/aegis_gitea_mcp/oauth.py b/src/aegis_gitea_mcp/oauth.py index 58b2893..62faba5 100644 --- a/src/aegis_gitea_mcp/oauth.py +++ b/src/aegis_gitea_mcp/oauth.py @@ -177,6 +177,29 @@ class GiteaOAuthValidator: self._jwks_cache[jwks_uri] = (jwks, now + self.settings.oauth_cache_ttl_seconds) return jwks + def _acceptable_audiences(self) -> list[str]: + """Return the set of OIDC audiences this MCP server will accept. + + Per the MCP authorization spec (RFC 8707 / RFC 9728) tokens are bound to + the MCP server's canonical resource URL, so the configured public base is + the primary accepted audience. The upstream Gitea OAuth client id is also + accepted because Gitea — the actual token issuer behind this proxy — + stamps ``aud`` with the client id rather than the MCP resource URL. An + operator may add a further required audience via OAUTH_EXPECTED_AUDIENCE. + """ + audiences: list[str] = [] + canonical_resource = self.settings.public_base + if canonical_resource: + audiences.append(canonical_resource) + gitea_client_id = self.settings.gitea_oauth_client_id.strip() + if gitea_client_id: + audiences.append(gitea_client_id) + configured = self.settings.oauth_expected_audience.strip() + if configured: + audiences.append(configured) + # Preserve order while removing duplicates. + return list(dict.fromkeys(audiences)) + async def _validate_jwt(self, token: str) -> dict[str, Any]: """Validate JWT access token using OIDC discovery and JWKS.""" discovery = await self._get_discovery_document() @@ -216,19 +239,16 @@ class GiteaOAuthValidator: "oauth_jwt_invalid_jwk", ) from exc - expected_audience = ( - self.settings.oauth_expected_audience.strip() - or self.settings.gitea_oauth_client_id.strip() - ) + accepted_audiences = self._acceptable_audiences() - decode_options = cast(Any, {"verify_aud": bool(expected_audience)}) + decode_options = cast(Any, {"verify_aud": bool(accepted_audiences)}) try: claims = jwt.decode( token, key=cast(Any, public_key), algorithms=["RS256"], issuer=issuer, - audience=expected_audience or None, + audience=accepted_audiences or None, options=decode_options, ) except InvalidTokenError as exc: diff --git a/src/aegis_gitea_mcp/oauth_flow.py b/src/aegis_gitea_mcp/oauth_flow.py new file mode 100644 index 0000000..3601a1f --- /dev/null +++ b/src/aegis_gitea_mcp/oauth_flow.py @@ -0,0 +1,380 @@ +"""OAuth proxy helpers for signed state, redirect validation, and DCR storage.""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import secrets +import time +from fnmatch import fnmatchcase +from pathlib import Path +from typing import Any +from urllib.parse import ParseResult, urlparse, urlunparse + +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator + +_CLAUDE_CALLBACK_URIS = { + "https://claude.ai/api/mcp/auth_callback", + "https://claude.com/api/mcp/auth_callback", +} +_LOOPBACK_HOSTS = {"localhost", "127.0.0.1", "::1"} +_SUPPORTED_TOKEN_ENDPOINT_AUTH_METHODS = {"none", "client_secret_post"} +_SUPPORTED_GRANT_TYPES = {"authorization_code", "refresh_token"} +_SUPPORTED_RESPONSE_TYPES = {"code"} + + +class OAuthRegistrationRequest(BaseModel): + """Incoming RFC 7591 client registration request.""" + + client_name: str | None = Field(default=None, max_length=200) + redirect_uris: list[str] = Field(..., min_length=1) + grant_types: list[str] = Field(default_factory=lambda: ["authorization_code", "refresh_token"]) + response_types: list[str] = Field(default_factory=lambda: ["code"]) + token_endpoint_auth_method: str = Field(default="none", max_length=64) + scope: str | None = Field(default=None, max_length=512) + + model_config = ConfigDict(extra="forbid") + + @field_validator("redirect_uris") + @classmethod + def validate_redirect_uris(cls, value: list[str]) -> list[str]: + """Normalize and validate redirect URIs.""" + uris = [uri.strip() for uri in value if isinstance(uri, str) and uri.strip()] + if not uris: + raise ValueError("redirect_uris must contain at least one non-empty URI") + return uris + + @field_validator("grant_types") + @classmethod + def validate_grant_types(cls, value: list[str]) -> list[str]: + """Restrict supported grant types to authorization code and refresh token.""" + normalized = [item.strip() for item in value if item.strip()] + if not normalized: + raise ValueError("grant_types must not be empty") + if any(item not in _SUPPORTED_GRANT_TYPES for item in normalized): + raise ValueError("Unsupported grant_types requested") + return normalized + + @field_validator("response_types") + @classmethod + def validate_response_types(cls, value: list[str]) -> list[str]: + """Restrict supported response types to authorization code.""" + normalized = [item.strip() for item in value if item.strip()] + if not normalized: + raise ValueError("response_types must not be empty") + if any(item not in _SUPPORTED_RESPONSE_TYPES for item in normalized): + raise ValueError("Unsupported response_types requested") + return normalized + + @field_validator("token_endpoint_auth_method") + @classmethod + def validate_token_endpoint_auth_method(cls, value: str) -> str: + """Restrict token endpoint auth methods to the supported subset.""" + normalized = value.strip().lower() + if normalized not in _SUPPORTED_TOKEN_ENDPOINT_AUTH_METHODS: + raise ValueError("Unsupported token_endpoint_auth_method requested") + return normalized + + @model_validator(mode="after") + def validate_pkce_ready(self) -> OAuthRegistrationRequest: + """Ensure the request is usable for PKCE-based authorization code flow.""" + if "authorization_code" not in self.grant_types: + raise ValueError("authorization_code grant is required") + if "code" not in self.response_types: + raise ValueError("code response type is required") + return self + + +class OAuthClientRecord(BaseModel): + """Persisted OAuth client registration record.""" + + client_id: str + client_name: str | None = None + redirect_uris: list[str] + grant_types: list[str] + response_types: list[str] + token_endpoint_auth_method: str + client_id_issued_at: int + client_secret_expires_at: int = 0 + client_secret_hash: str | None = None + scope: str | None = None + + model_config = ConfigDict(extra="forbid") + + +def _canonicalize_url(value: str) -> str: + """Normalize a URL for comparison.""" + parsed = urlparse(value.strip()) + if not parsed.scheme or not parsed.netloc: + return "" + + normalized = ParseResult( + scheme=parsed.scheme.lower(), + netloc=parsed.netloc.lower(), + path=parsed.path or "/", + params=parsed.params, + query=parsed.query, + fragment="", + ) + return urlunparse(normalized).rstrip("/") + + +def is_loopback_redirect_uri(redirect_uri: str) -> bool: + """Return whether a redirect URI uses a loopback host.""" + parsed = urlparse(redirect_uri.strip()) + if parsed.scheme != "http": + return False + host = (parsed.hostname or "").lower() + return host in _LOOPBACK_HOSTS + + +def is_claude_redirect_uri(redirect_uri: str) -> bool: + """Return whether a redirect URI is a built-in Claude callback URL.""" + return _canonicalize_url(redirect_uri) in _CLAUDE_CALLBACK_URIS + + +def is_redirect_uri_allowed(redirect_uri: str, allowlist: list[str]) -> bool: + """Return whether a redirect URI is allowed by policy.""" + normalized = _canonicalize_url(redirect_uri) + if not normalized: + return False + + if is_loopback_redirect_uri(redirect_uri) or is_claude_redirect_uri(redirect_uri): + return True + + for pattern in allowlist: + candidate = pattern.strip() + if not candidate: + continue + if fnmatchcase(normalized, _canonicalize_url(candidate) or candidate): + return True + if fnmatchcase(redirect_uri.strip(), candidate): + return True + return False + + +def is_origin_allowed(origin: str, request_base: str, public_base: str | None) -> bool: + """Return whether a browser Origin is allowed for MCP transport requests.""" + normalized_origin = _canonicalize_url(origin) + if not normalized_origin: + return False + + expected_bases = [request_base.rstrip("/")] + if public_base: + expected_bases.append(public_base.rstrip("/")) + return normalized_origin in expected_bases + + +def encode_proxy_state( + secret: str, + redirect_uri: str, + original_state: str, + *, + ttl_seconds: int = 600, +) -> str: + """Create a signed OAuth state wrapper for the proxy callback round-trip.""" + payload = { + "redirect_uri": redirect_uri, + "state": original_state, + "issued_at": int(time.time()), + "nonce": secrets.token_urlsafe(16), + "ttl_seconds": ttl_seconds, + } + canonical_payload = json.dumps(payload, sort_keys=True, separators=(",", ":")) + signature = hmac.new(secret.encode("utf-8"), canonical_payload.encode("utf-8"), hashlib.sha256) + envelope = { + "payload": payload, + "signature": signature.hexdigest(), + } + return base64.urlsafe_b64encode( + json.dumps(envelope, sort_keys=True, separators=(",", ":")).encode("utf-8") + ).decode("ascii") + + +def decode_proxy_state(secret: str, encoded_state: str) -> dict[str, str]: + """Verify and unpack a signed OAuth state wrapper.""" + try: + raw = base64.urlsafe_b64decode(encoded_state.encode("ascii")) + envelope = json.loads(raw) + except Exception as exc: # pragma: no cover - guarded by tests + raise ValueError("Invalid or missing state parameter") from exc + + if not isinstance(envelope, dict): + raise ValueError("Invalid or missing state parameter") + + payload = envelope.get("payload") + signature = envelope.get("signature") + if not isinstance(payload, dict) or not isinstance(signature, str): + raise ValueError("Invalid or missing state parameter") + + canonical_payload = json.dumps(payload, sort_keys=True, separators=(",", ":")) + expected_signature = hmac.new( + secret.encode("utf-8"), canonical_payload.encode("utf-8"), hashlib.sha256 + ).hexdigest() + if not hmac.compare_digest(signature, expected_signature): + raise ValueError("Invalid or missing state parameter") + + issued_at = payload.get("issued_at") + ttl_seconds = payload.get("ttl_seconds") + now = int(time.time()) + if not isinstance(issued_at, int) or not isinstance(ttl_seconds, int): + raise ValueError("Invalid or missing state parameter") + if issued_at > now or now - issued_at > max(ttl_seconds, 1): + raise ValueError("Invalid or missing state parameter") + + redirect_uri = payload.get("redirect_uri") + if not isinstance(redirect_uri, str) or not redirect_uri.strip(): + raise ValueError("Invalid or missing state parameter") + + original_state = payload.get("state") + if not isinstance(original_state, str): + raise ValueError("Invalid or missing state parameter") + + return {"redirect_uri": redirect_uri, "state": original_state} + + +class OAuthClientRegistry: + """Persisted OAuth client registry for dynamic client registration.""" + + def __init__(self, storage_path: Path) -> None: + """Initialize registry storage.""" + self.storage_path = storage_path + self.storage_path.parent.mkdir(parents=True, exist_ok=True) + self._clients: dict[str, OAuthClientRecord] = {} + self._loaded = False + + @staticmethod + def _hash_secret(secret: str) -> str: + """Hash client secrets before persistence.""" + return hashlib.sha256(secret.encode("utf-8")).hexdigest() + + def _load(self) -> None: + """Load persisted registrations from disk once.""" + if self._loaded: + return + self._loaded = True + if not self.storage_path.exists(): + self._clients = {} + return + + raw = json.loads(self.storage_path.read_text(encoding="utf-8")) + if not isinstance(raw, dict): + raise ValueError("Persisted DCR storage must be a JSON object") + + clients: dict[str, OAuthClientRecord] = {} + for client_id, payload in raw.items(): + if not isinstance(client_id, str): + raise ValueError("Persisted client id must be a string") + if not isinstance(payload, dict): + raise ValueError(f"Persisted client record for {client_id} must be a mapping") + record = OAuthClientRecord.model_validate({"client_id": client_id, **payload}) + clients[client_id] = record + + self._clients = clients + + def _persist(self) -> None: + """Write registrations atomically.""" + payload = { + client_id: record.model_dump(mode="json", exclude={"client_id"}) + for client_id, record in self._clients.items() + } + tmp_path = self.storage_path.with_suffix(self.storage_path.suffix + ".tmp") + tmp_path.write_text(json.dumps(payload, sort_keys=True, indent=2), encoding="utf-8") + tmp_path.replace(self.storage_path) + + def get(self, client_id: str) -> OAuthClientRecord | None: + """Look up a registered client by identifier.""" + self._load() + return self._clients.get(client_id) + + def is_known_client( + self, + client_id: str, + *, + fallback_client_id: str = "", + fallback_client_secret: str = "", + ) -> bool: + """Return whether a client is recognized by the registry or environment.""" + if not client_id.strip(): + return False + if fallback_client_id.strip() and client_id == fallback_client_id.strip(): + return True + return self.get(client_id) is not None + + def validate_client_secret( + self, + client_id: str, + client_secret: str | None, + *, + fallback_client_id: str = "", + fallback_client_secret: str = "", + ) -> bool: + """Validate a client identifier and optional secret.""" + if fallback_client_id.strip() and client_id == fallback_client_id.strip(): + if not fallback_client_secret.strip(): + return True + if not client_secret: + return False + return hmac.compare_digest( + self._hash_secret(client_secret), self._hash_secret(fallback_client_secret.strip()) + ) + + record = self.get(client_id) + if record is None: + return False + + if record.client_secret_hash is None: + return True + if not client_secret: + return False + return hmac.compare_digest(self._hash_secret(client_secret), record.client_secret_hash) + + def register(self, request: OAuthRegistrationRequest) -> dict[str, Any]: + """Persist a new OAuth client registration and return its public metadata.""" + self._load() + client_id = secrets.token_urlsafe(24) + client_secret: str | None = None + client_secret_hash: str | None = None + + if request.token_endpoint_auth_method != "none": + client_secret = secrets.token_urlsafe(32) + client_secret_hash = self._hash_secret(client_secret) + + record = OAuthClientRecord( + client_id=client_id, + client_name=request.client_name, + redirect_uris=list(request.redirect_uris), + grant_types=list(request.grant_types), + response_types=list(request.response_types), + token_endpoint_auth_method=request.token_endpoint_auth_method, + client_id_issued_at=int(time.time()), + client_secret_hash=client_secret_hash, + scope=request.scope, + ) + self._clients[client_id] = record + self._persist() + + response: dict[str, Any] = record.model_dump(exclude={"client_secret_hash"}) + if client_secret is not None: + response["client_secret"] = client_secret + response["client_secret_expires_at"] = 0 + return response + + +_oauth_client_registry: OAuthClientRegistry | None = None + + +def get_oauth_client_registry(storage_path: Path) -> OAuthClientRegistry: + """Get or create the global OAuth client registry.""" + global _oauth_client_registry + if _oauth_client_registry is None or _oauth_client_registry.storage_path != storage_path: + _oauth_client_registry = OAuthClientRegistry(storage_path) + return _oauth_client_registry + + +def reset_oauth_client_registry() -> None: + """Reset the global OAuth client registry (primarily for tests).""" + global _oauth_client_registry + _oauth_client_registry = None diff --git a/src/aegis_gitea_mcp/server.py b/src/aegis_gitea_mcp/server.py index 29bc139..0d26659 100644 --- a/src/aegis_gitea_mcp/server.py +++ b/src/aegis_gitea_mcp/server.py @@ -3,12 +3,14 @@ from __future__ import annotations import asyncio -import base64 +import hashlib import json import logging +import time import urllib.parse import uuid from collections.abc import AsyncGenerator, Awaitable, Callable +from contextlib import asynccontextmanager from typing import Any import httpx @@ -18,6 +20,7 @@ from pydantic import BaseModel, Field, ValidationError from aegis_gitea_mcp.audit import get_audit_logger from aegis_gitea_mcp.automation import AutomationError, AutomationManager +from aegis_gitea_mcp.cache import BoundedTTLCache from aegis_gitea_mcp.config import get_settings from aegis_gitea_mcp.gitea_client import ( GiteaAuthenticationError, @@ -33,11 +36,20 @@ from aegis_gitea_mcp.mcp_protocol import ( get_tool_by_name, ) from aegis_gitea_mcp.oauth import get_oauth_validator +from aegis_gitea_mcp.oauth_flow import ( + OAuthRegistrationRequest, + decode_proxy_state, + encode_proxy_state, + get_oauth_client_registry, + is_origin_allowed, + is_redirect_uri_allowed, +) from aegis_gitea_mcp.observability import get_metrics_registry, monotonic_seconds from aegis_gitea_mcp.policy import PolicyError, get_policy_engine from aegis_gitea_mcp.rate_limit import get_rate_limiter from aegis_gitea_mcp.request_context import ( clear_gitea_auth_context, + get_gitea_user_login, get_gitea_user_scopes, get_gitea_user_token, set_gitea_user_login, @@ -81,16 +93,50 @@ READ_SCOPE = "read:repository" WRITE_SCOPE = "write:repository" # Cache of tokens verified to have Gitea API scope. -# Key: hash of token prefix, Value: monotonic expiry time. -_api_scope_cache: dict[str, float] = {} +# Key: hash of token prefix, Value: sentinel marking the token as probe-verified. +# Bounded by size and TTL so untrusted token cardinality cannot grow it without limit. _API_SCOPE_CACHE_TTL = 60 # seconds +_api_scope_cache: BoundedTTLCache[str, bool] = BoundedTTLCache( + ttl_seconds=_API_SCOPE_CACHE_TTL, max_size=4096 +) _REAUTH_GUIDANCE = ( "Your OAuth token lacks Gitea API scopes (e.g. read:repository). " "Revoke the authorization in Gitea (Settings > Applications > Authorized OAuth2 Applications) " - "and in ChatGPT (Settings > Connected apps), then re-authorize." + "and in your client, then re-authorize." ) +_repo_authz_cache: BoundedTTLCache[str, bool] | None = None + + +def _get_repo_authz_cache() -> BoundedTTLCache[str, bool]: + """Get the bounded cache for per-user repository permission checks.""" + global _repo_authz_cache + settings = get_settings() + if _repo_authz_cache is None: + _repo_authz_cache = BoundedTTLCache( + ttl_seconds=settings.repo_authz_cache_ttl_seconds, + max_size=2048, + ) + return _repo_authz_cache + + +def reset_repo_authz_cache() -> None: + """Reset the repository authorization cache (primarily for tests).""" + global _repo_authz_cache + _repo_authz_cache = None + + +def _repo_authz_cache_key(login: str, repository: str, required_scope: str) -> str: + """Build a bounded cache key for a user/repository permission check.""" + normalized_login = login.strip().lower() + return f"{normalized_login}:{repository.lower()}:{required_scope}" + + +def _is_mcp_transport_path(path: str) -> bool: + """Return whether a request targets the MCP transport surface.""" + return path in {"/mcp", "/mcp/sse"} or path.startswith("/mcp/") + def _has_required_scope(required_scope: str, granted_scopes: set[str]) -> bool: """Return whether granted scopes satisfy the required MCP tool scope.""" @@ -110,10 +156,144 @@ def _has_required_scope(required_scope: str, granted_scopes: set[str]) -> bool: return required_scope in expanded +def _repo_permission_satisfied(permission: dict[str, Any], required_scope: str) -> bool: + """Return whether a repository permission payload satisfies the requested scope.""" + permission_name = str(permission.get("permission", "")).lower().strip() + if permission_name in {"admin", "owner"}: + return True + if required_scope == WRITE_SCOPE and permission_name == "write": + return True + if required_scope == READ_SCOPE and permission_name in {"read", "write"}: + return True + + nested_permissions = permission.get("permissions") + if isinstance(nested_permissions, dict): + return _repo_permission_satisfied(nested_permissions, required_scope) + + if required_scope == WRITE_SCOPE: + return bool(permission.get("push") or permission.get("admin")) + return bool(permission.get("pull") or permission.get("push") or permission.get("admin")) + + +async def _verify_user_repository_access( + *, + repository: str, + required_scope: str, + user_login: str, + correlation_id: str, + tool_name: str, +) -> None: + """Verify the authenticated user can access the target repository before PAT fallback.""" + settings = get_settings() + audit = get_audit_logger() + + service_token = settings.gitea_token.strip() + if not service_token: + raise HTTPException(status_code=500, detail="Repository authorization misconfigured") + + if not user_login.strip() or user_login == "unknown": + audit.log_access_denied( + tool_name=tool_name, + repository=repository, + reason="repository_permission_missing_user", + correlation_id=correlation_id, + ) + raise HTTPException( + status_code=403, + detail="Unable to verify repository permission for this user.", + ) + + cache_key = _repo_authz_cache_key(user_login, repository, required_scope) + cached = _get_repo_authz_cache().get(cache_key) + if cached is True: + return + + owner, repo = repository.split("/", 1) + encoded_owner = urllib.parse.quote(owner, safe="") + encoded_repo = urllib.parse.quote(repo, safe="") + encoded_user = urllib.parse.quote(user_login, safe="") + permission_url = ( + f"{settings.gitea_base_url}/api/v1/repos/{encoded_owner}/{encoded_repo}" + f"/collaborators/{encoded_user}/permission" + ) + + try: + async with httpx.AsyncClient(timeout=settings.request_timeout_seconds) as client: + response = await client.get( + permission_url, + headers={"Authorization": f"token {service_token}", "Accept": "application/json"}, + ) + except httpx.RequestError as exc: + audit.log_access_denied( + tool_name=tool_name, + repository=repository, + reason="repository_permission_probe_failed", + correlation_id=correlation_id, + ) + raise HTTPException( + status_code=403, + detail="Unable to verify repository permission for this user.", + ) from exc + + if response.status_code != 200: + audit.log_access_denied( + tool_name=tool_name, + repository=repository, + reason=f"repository_permission_probe:{response.status_code}", + correlation_id=correlation_id, + ) + raise HTTPException( + status_code=403, + detail="User does not have permission for the requested repository.", + ) + + try: + permission_payload = response.json() + except ValueError as exc: + audit.log_access_denied( + tool_name=tool_name, + repository=repository, + reason="repository_permission_invalid_json", + correlation_id=correlation_id, + ) + raise HTTPException( + status_code=403, + detail="Unable to verify repository permission for this user.", + ) from exc + + if isinstance(permission_payload, dict) and _repo_permission_satisfied( + permission_payload, required_scope + ): + _get_repo_authz_cache().set(cache_key, True) + return + + audit.log_access_denied( + tool_name=tool_name, + repository=repository, + reason="repository_permission_denied", + correlation_id=correlation_id, + ) + raise HTTPException( + status_code=403, + detail="User does not have permission for the requested repository.", + ) + + +@asynccontextmanager +async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: + """Run startup and shutdown hooks via the FastAPI lifespan protocol.""" + await startup_event() + try: + yield + finally: + await shutdown_event() + + app = FastAPI( title="AegisGitea MCP Server", description="Security-first MCP server for controlled AI access to self-hosted Gitea", version="0.2.0", + lifespan=lifespan, ) @@ -226,6 +406,67 @@ async def request_context_middleware( metrics.record_http_request(request.method, request.url.path, status_code) +def _cors_headers(origin: str) -> dict[str, str]: + """Build strict CORS headers for a validated browser origin.""" + return { + "Access-Control-Allow-Origin": origin, + "Access-Control-Allow-Credentials": "true", + "Access-Control-Allow-Methods": "GET,POST,OPTIONS", + "Access-Control-Allow-Headers": "Authorization,Content-Type,MCP-Protocol-Version,X-Request-ID", + "Access-Control-Expose-Headers": "X-Request-ID,WWW-Authenticate", + "Vary": "Origin", + } + + +@app.middleware("http") +async def strict_origin_and_cors_middleware( + request: Request, + call_next: Callable[[Request], Awaitable[Response]], +) -> Response: + """Enforce strict browser origins for MCP transport requests.""" + if request.url.path not in {"/mcp", "/mcp/sse"}: + return await call_next(request) + + settings = get_settings() + origin = request.headers.get("origin") + expected_base = settings.public_base or str(request.base_url).rstrip("/") + + if origin and not is_origin_allowed(origin, expected_base, settings.public_base): + return JSONResponse( + status_code=403, + content={ + "error": "Origin not allowed", + "message": "The request origin is not allowed for this MCP transport.", + "request_id": getattr(request.state, "request_id", "-"), + }, + ) + + if request.method == "OPTIONS": + response = Response(status_code=204) + else: + response = await call_next(request) + + if origin and is_origin_allowed(origin, expected_base, settings.public_base): + for header, value in _cors_headers(origin).items(): + response.headers[header] = value + + return response + + +def _oauth_invalid_client_response() -> JSONResponse: + """Return an RFC 6749 invalid_client error for token endpoint failures.""" + response = JSONResponse(status_code=401, content={"error": "invalid_client"}) + response.headers["WWW-Authenticate"] = 'Basic realm="oauth"' + return response + + +def _jsonrpc_error(message_id: Any, code: int, message: str) -> JSONResponse: + """Build a JSON-RPC error response envelope.""" + return JSONResponse( + content={"jsonrpc": "2.0", "id": message_id, "error": {"code": code, "message": message}} + ) + + @app.middleware("http") async def authenticate_and_rate_limit( request: Request, @@ -238,11 +479,14 @@ async def authenticate_and_rate_limit( if request.url.path in {"/", "/health"}: return await call_next(request) + if request.method == "OPTIONS" and request.url.path in {"/mcp", "/mcp/sse"}: + return await call_next(request) + if request.url.path == "/metrics" and settings.metrics_enabled: # Metrics endpoint is intentionally left unauthenticated for pull-based scraping. return await call_next(request) - # OAuth discovery and token endpoints must be public so ChatGPT can complete the flow. + # OAuth discovery and token endpoints must be public so MCP clients can complete the flow. if request.url.path in { "/oauth/token", "/.well-known/oauth-protected-resource", @@ -251,7 +495,11 @@ async def authenticate_and_rate_limit( }: return await call_next(request) - if not (request.url.path.startswith("/mcp/") or request.url.path.startswith("/automation/")): + if not ( + request.url.path in {"/mcp/tools"} + or _is_mcp_transport_path(request.url.path) + or request.url.path.startswith("/automation/") + ): return await call_next(request) oauth_validator = get_oauth_validator() @@ -279,7 +527,7 @@ async def authenticate_and_rate_limit( return await call_next(request) if not access_token: - if request.url.path.startswith("/mcp/"): + if _is_mcp_transport_path(request.url.path): return _oauth_unauthorized_response( request, "Provide Authorization: Bearer .", @@ -298,7 +546,7 @@ async def authenticate_and_rate_limit( access_token, client_ip, user_agent ) if not is_valid: - if request.url.path.startswith("/mcp/"): + if _is_mcp_transport_path(request.url.path): return _oauth_unauthorized_response( request, error_message or "Invalid or expired OAuth token.", @@ -335,22 +583,18 @@ async def authenticate_and_rate_limit( # Probe: verify the token actually works for Gitea's repository API. # Try both "token" and "Bearer" header formats since Gitea may # accept OAuth tokens differently depending on version/config. - import hashlib - import time as _time - token_hash = hashlib.sha256(access_token.encode()).hexdigest()[:16] - now = _time.monotonic() probe_result = "skip:cached" token_type = "jwt" if access_token.count(".") == 2 else "opaque" - if token_hash not in _api_scope_cache or now >= _api_scope_cache[token_hash]: + if _api_scope_cache.get(token_hash) is None: # JWT tokens (OIDC) are already cryptographically validated via JWKS above. # Gitea's OIDC access_tokens cannot access the REST API without additional # Gitea-specific scope configuration, so we skip the probe for them and # rely on per-call API errors for actual permission enforcement. if token_type == "jwt": probe_result = "skip:jwt" - _api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL + _api_scope_cache.set(token_hash, True) else: try: probe_status = None @@ -387,7 +631,7 @@ async def authenticate_and_rate_limit( "OAuth token is valid but lacks required Gitea API access. " "Re-authorize this OAuth app in Gitea and try again." ) - if request.url.path.startswith("/mcp/"): + if _is_mcp_transport_path(request.url.path): return _oauth_unauthorized_response( request, message, @@ -403,7 +647,7 @@ async def authenticate_and_rate_limit( ) else: probe_result = "pass" - _api_scope_cache[token_hash] = now + _API_SCOPE_CACHE_TTL + _api_scope_cache.set(token_hash, True) except httpx.RequestError: probe_result = "skip:error" logger.debug("oauth_api_scope_probe_network_error") @@ -422,7 +666,6 @@ async def authenticate_and_rate_limit( return await call_next(request) -@app.on_event("startup") async def startup_event() -> None: """Initialize server state on startup.""" settings = get_settings() @@ -470,7 +713,6 @@ async def startup_event() -> None: logger.info("gitea_oidc_discovery_ready", extra={"issuer": settings.gitea_base_url}) -@app.on_event("shutdown") async def shutdown_event() -> None: """Log server shutdown event.""" logger.info("server_stopping") @@ -497,9 +739,14 @@ async def health() -> dict[str, str]: async def oauth_protected_resource_metadata(request: Request) -> JSONResponse: """OAuth 2.0 Protected Resource Metadata (RFC 9728). - Required by the MCP Authorization spec so that OAuth clients (e.g. ChatGPT) - can discover the authorization server that protects this resource. - ChatGPT fetches this endpoint when it first connects to the MCP server via SSE. + Required by the MCP Authorization spec so that OAuth clients (Claude's + connector infrastructure) can discover the authorization server that + protects this resource. Claude fetches this endpoint when it first connects. + + The ``resource`` value MUST be THIS server's own canonical public URL: the + MCP client verifies that the resource identifier matches the origin it + derived the MCP server URL from (RFC 9728 / RFC 8707). Returning the upstream + Gitea URL here would fail that check. """ settings = get_settings() gitea_base = settings.gitea_base_url @@ -510,7 +757,7 @@ async def oauth_protected_resource_metadata(request: Request) -> JSONResponse: return JSONResponse( content={ - "resource": gitea_base, + "resource": base_url, "authorization_servers": authorization_servers, "bearer_methods_supported": ["header"], "scopes_supported": [READ_SCOPE, WRITE_SCOPE], @@ -523,24 +770,52 @@ async def oauth_protected_resource_metadata(request: Request) -> JSONResponse: async def oauth_authorize_proxy(request: Request) -> RedirectResponse: """Proxy OAuth authorization to Gitea, replacing redirect_uri with our own callback. - Clients (ChatGPT, Claude, etc.) send their own redirect_uri which Gitea doesn't know + Clients (Claude, Claude Code, Cowork, etc.) send their own redirect_uri which Gitea doesn't know about. This endpoint intercepts the request, encodes the original redirect_uri and state into a new state parameter, and forwards the request to Gitea using the MCP server's own callback URI — the only URI that needs to be registered in Gitea. """ settings = get_settings() base_url = settings.public_base or str(request.base_url).rstrip("/") + registry = get_oauth_client_registry(settings.dcr_storage_path) params = dict(request.query_params) - client_redirect_uri = params.pop("redirect_uri", "") + client_redirect_uri = params.pop("redirect_uri", "").strip() + client_id = params.get("client_id", "").strip() or settings.gitea_oauth_client_id.strip() original_state = params.get("state", "") + params.pop("client_secret", None) - # Encode the client's redirect_uri + original state into a tamper-evident wrapper. - # We simply base64-encode a JSON blob; Gitea will echo it back on the callback. - proxy_state_data = {"redirect_uri": client_redirect_uri, "state": original_state} - proxy_state = base64.urlsafe_b64encode(json.dumps(proxy_state_data).encode()).decode() + if not client_id: + raise HTTPException(status_code=400, detail="Missing client_id") + if not registry.is_known_client( + client_id, + fallback_client_id=settings.gitea_oauth_client_id, + ): + raise HTTPException(status_code=401, detail="invalid_client") + if not client_redirect_uri: + raise HTTPException(status_code=400, detail="Missing redirect_uri") + if not is_redirect_uri_allowed(client_redirect_uri, settings.oauth_redirect_allowlist): + raise HTTPException(status_code=400, detail="redirect_uri is not allowed") + + code_challenge = params.get("code_challenge", "").strip() + code_challenge_method = params.get("code_challenge_method", "S256").strip().upper() + if not code_challenge: + raise HTTPException(status_code=400, detail="PKCE code_challenge is required") + if code_challenge_method != "S256": + raise HTTPException(status_code=400, detail="PKCE code_challenge_method must be S256") + + proxy_state = encode_proxy_state( + settings.oauth_state_secret, + client_redirect_uri, + original_state, + ttl_seconds=600, + ) + + params["client_id"] = settings.gitea_oauth_client_id params["state"] = proxy_state + params["code_challenge"] = code_challenge + params["code_challenge_method"] = "S256" params["redirect_uri"] = f"{base_url}/oauth/callback" gitea_authorize_url = f"{settings.gitea_base_url}/login/oauth/authorize" @@ -557,14 +832,17 @@ async def oauth_callback_proxy(request: Request) -> RedirectResponse: error_description = request.query_params.get("error_description", "") try: - state_data = json.loads(base64.urlsafe_b64decode(proxy_state.encode())) + state_data = decode_proxy_state(get_settings().oauth_state_secret, proxy_state) client_redirect_uri = state_data["redirect_uri"] original_state = state_data["state"] - except Exception as exc: + except ValueError as exc: raise HTTPException(status_code=400, detail="Invalid or missing state parameter") from exc + settings = get_settings() if not client_redirect_uri: raise HTTPException(status_code=400, detail="No client redirect_uri in state") + if not is_redirect_uri_allowed(client_redirect_uri, settings.oauth_redirect_allowlist): + raise HTTPException(status_code=400, detail="redirect_uri is not allowed") result_params: dict[str, str] = {} if error: @@ -584,26 +862,31 @@ async def oauth_callback_proxy(request: Request) -> RedirectResponse: async def oauth_authorization_server_metadata(request: Request) -> JSONResponse: """OAuth 2.0 Authorization Server Metadata (RFC 8414). - Proxies Gitea's OAuth authorization server metadata so that ChatGPT can - discover the authorize URL, token URL, and supported features directly - from this server without needing to know the Gitea URL upfront. + Advertises this server's OAuth proxy endpoints so that Claude's connector + infrastructure can discover the authorize URL, token URL, and dynamic client + registration endpoint directly from this server without knowing the Gitea URL + upfront. The authorize/token endpoints are this server's proxy routes because + Gitea does not know Claude's redirect_uri. """ settings = get_settings() base_url = settings.public_base or str(request.base_url).rstrip("/") gitea_base = settings.gitea_base_url - return JSONResponse( - content={ - "issuer": gitea_base, - "authorization_endpoint": f"{base_url}/oauth/authorize", - "token_endpoint": f"{base_url}/oauth/token", - "response_types_supported": ["code"], - "grant_types_supported": ["authorization_code"], - "code_challenge_methods_supported": ["S256"], - "scopes_supported": [READ_SCOPE, WRITE_SCOPE], - "token_endpoint_auth_methods_supported": ["client_secret_post", "none"], - } - ) + metadata: dict[str, Any] = { + "issuer": gitea_base, + "authorization_endpoint": f"{base_url}/oauth/authorize", + "token_endpoint": f"{base_url}/oauth/token", + "response_types_supported": ["code"], + "grant_types_supported": ["authorization_code", "refresh_token"], + "code_challenge_methods_supported": ["S256"], + "scopes_supported": [READ_SCOPE, WRITE_SCOPE], + "token_endpoint_auth_methods_supported": ["client_secret_post", "none"], + } + if settings.dcr_enabled: + # RFC 7591 dynamic client registration endpoint (Claude registers here). + metadata["registration_endpoint"] = f"{base_url}/register" + + return JSONResponse(content=metadata) @app.get("/.well-known/openid-configuration") @@ -638,61 +921,111 @@ async def openid_configuration(request: Request) -> JSONResponse: ) +@app.post("/register") +async def oauth_dynamic_client_registration(request: Request) -> JSONResponse: + """Persist a new OAuth client registration for Claude and similar MCP clients.""" + settings = get_settings() + if not settings.dcr_enabled: + raise HTTPException(status_code=404, detail="Dynamic client registration is disabled") + + content_type = request.headers.get("content-type", "").split(";", 1)[0].strip().lower() + if content_type != "application/json": + raise HTTPException(status_code=415, detail="Content-Type must be application/json") + + registry = get_oauth_client_registry(settings.dcr_storage_path) + + try: + payload = await request.json() + registration_request = OAuthRegistrationRequest.model_validate(payload) + except ValidationError as exc: + raise HTTPException(status_code=400, detail="Invalid registration payload") from exc + except Exception as exc: + raise HTTPException(status_code=400, detail="Invalid registration payload") from exc + + for redirect_uri in registration_request.redirect_uris: + if not is_redirect_uri_allowed(redirect_uri, settings.oauth_redirect_allowlist): + raise HTTPException(status_code=400, detail="redirect_uri is not allowed") + + response = registry.register(registration_request) + response["client_id_issued_at"] = int(time.time()) + response["client_secret_expires_at"] = 0 + return JSONResponse(content=response) + + @app.post("/oauth/token") async def oauth_token_proxy(request: Request) -> JSONResponse: """Proxy OAuth2 token exchange to Gitea. - ChatGPT sends the authorization code here after the user logs in to Gitea. + The client sends the authorization code here after the user logs in to Gitea. This endpoint forwards the code to Gitea's token endpoint and returns the - access_token to ChatGPT, completing the OAuth2 Authorization Code flow. + access_token to the client, completing the OAuth2 Authorization Code flow. """ settings = get_settings() + registry = get_oauth_client_registry(settings.dcr_storage_path) try: form_data = await request.form() except Exception as exc: raise HTTPException(status_code=400, detail="Invalid request body") from exc - grant_type = form_data.get("grant_type", "authorization_code") - code = form_data.get("code") - refresh_token = form_data.get("refresh_token") - code_verifier = form_data.get("code_verifier", "") - # ChatGPT sends the client_id and client_secret (that were configured in the GPT Action - # settings) in the POST body. Use those directly; fall back to env vars if not provided. - client_id = form_data.get("client_id") or settings.gitea_oauth_client_id - client_secret = form_data.get("client_secret") or settings.gitea_oauth_client_secret + def _field(name: str, default: str = "") -> str: + """Read a string form field, ignoring uploaded-file parts.""" + value = form_data.get(name, default) + return value if isinstance(value, str) else default + + grant_type = _field("grant_type", "authorization_code") + code = _field("code") + refresh_token = _field("refresh_token") + code_verifier = _field("code_verifier") + # The MCP client (Claude) sends client_id and, for confidential clients, client_secret + # in the POST body. Use those directly; fall back to env vars if not provided. + client_id = _field("client_id") or settings.gitea_oauth_client_id + client_secret = _field("client_secret") or settings.gitea_oauth_client_secret # Gitea validates that redirect_uri in the token exchange matches the one used during # authorization. Because our /oauth/authorize proxy always forwards our own callback # URI to Gitea, we must use the same URI here — not the client's original redirect_uri. base_url = settings.public_base or str(request.base_url).rstrip("/") + if not client_id: + return _oauth_invalid_client_response() + if not registry.validate_client_secret( + client_id, + client_secret or None, + fallback_client_id=settings.gitea_oauth_client_id, + fallback_client_secret=settings.gitea_oauth_client_secret, + ): + return _oauth_invalid_client_response() + gitea_token_url = f"{settings.gitea_base_url}/login/oauth/access_token" + upstream_client_id = settings.gitea_oauth_client_id + upstream_client_secret = settings.gitea_oauth_client_secret if grant_type == "refresh_token": if not refresh_token: raise HTTPException(status_code=400, detail="Missing refresh_token") payload: dict[str, str] = { - "client_id": client_id, - "client_secret": client_secret, + "client_id": upstream_client_id, + "client_secret": upstream_client_secret, "grant_type": "refresh_token", "refresh_token": refresh_token, } else: if not code: raise HTTPException(status_code=400, detail="Missing authorization code") + if not code_verifier: + raise HTTPException(status_code=400, detail="Missing code_verifier") payload = { - "client_id": client_id, - "client_secret": client_secret, + "client_id": upstream_client_id, + "client_secret": upstream_client_secret, "code": code, "grant_type": "authorization_code", "redirect_uri": f"{base_url}/oauth/callback", } - if code_verifier: - payload["code_verifier"] = code_verifier + payload["code_verifier"] = code_verifier try: - async with httpx.AsyncClient(timeout=30) as client: + async with httpx.AsyncClient(timeout=settings.request_timeout_seconds) as client: response = await client.post( gitea_token_url, data=payload, @@ -830,6 +1163,29 @@ async def _execute_tool_call( if not user_token: raise HTTPException(status_code=401, detail="Missing authenticated user token context") + if settings.gitea_token.strip(): + if not repository: + audit.log_access_denied( + tool_name=tool_name, + reason="service_pat_requires_repository_target", + correlation_id=correlation_id, + ) + raise HTTPException( + status_code=403, + detail=( + "Service PAT mode requires a repository target so per-user " + "permission can be verified." + ), + ) + user_login = get_gitea_user_login() + await _verify_user_repository_access( + repository=repository, + required_scope=required_scope, + user_login=user_login or "", + correlation_id=correlation_id, + tool_name=tool_name, + ) + # In OAuth mode, Gitea OIDC access_tokens can't call the Gitea REST API # (they only carry OIDC scopes). If a service PAT is configured via # GITEA_TOKEN, use that for API calls while OIDC handles identity/authz. @@ -954,6 +1310,7 @@ async def call_tool(request: MCPToolCallRequest) -> JSONResponse: ) +@app.get("/mcp") @app.get("/mcp/sse") async def sse_endpoint(request: Request) -> StreamingResponse: """Server-Sent Events endpoint for MCP transport.""" @@ -988,6 +1345,7 @@ async def sse_endpoint(request: Request) -> StreamingResponse: ) +@app.post("/mcp") @app.post("/mcp/sse") async def sse_message_handler(request: Request) -> JSONResponse: """Handle POST messages for MCP SSE transport.""" diff --git a/src/aegis_gitea_mcp/tools/repository.py b/src/aegis_gitea_mcp/tools/repository.py index 7d437e1..ff6ca1e 100644 --- a/src/aegis_gitea_mcp/tools/repository.py +++ b/src/aegis_gitea_mcp/tools/repository.py @@ -18,7 +18,7 @@ from aegis_gitea_mcp.tools.arguments import ( async def list_repositories_tool(gitea: GiteaClient, arguments: dict[str, Any]) -> dict[str, Any]: - """List repositories visible to the bot user. + """List repositories visible to the active Gitea API token. Args: gitea: Initialized Gitea client. diff --git a/tests/conftest.py b/tests/conftest.py index 2868c39..d6962d3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,9 +9,11 @@ from aegis_gitea_mcp.audit import reset_audit_logger from aegis_gitea_mcp.auth import reset_validator from aegis_gitea_mcp.config import reset_settings from aegis_gitea_mcp.oauth import reset_oauth_validator +from aegis_gitea_mcp.oauth_flow import reset_oauth_client_registry from aegis_gitea_mcp.observability import reset_metrics_registry from aegis_gitea_mcp.policy import reset_policy_engine from aegis_gitea_mcp.rate_limit import reset_rate_limiter +from aegis_gitea_mcp.server import reset_repo_authz_cache @pytest.fixture(autouse=True) @@ -22,6 +24,8 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[ reset_audit_logger() reset_validator() reset_oauth_validator() + reset_oauth_client_registry() + reset_repo_authz_cache() reset_policy_engine() reset_rate_limiter() reset_metrics_registry() @@ -37,6 +41,8 @@ def reset_globals(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[ reset_audit_logger() reset_validator() reset_oauth_validator() + reset_oauth_client_registry() + reset_repo_authz_cache() reset_policy_engine() reset_rate_limiter() reset_metrics_registry() @@ -66,4 +72,5 @@ def mock_env_oauth(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef") monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") diff --git a/tests/test_cache.py b/tests/test_cache.py new file mode 100644 index 0000000..2aaf139 --- /dev/null +++ b/tests/test_cache.py @@ -0,0 +1,72 @@ +"""Tests for the bounded TTL cache utility.""" + +from __future__ import annotations + +import time + +import pytest + +from aegis_gitea_mcp.cache import BoundedTTLCache + + +def test_set_and_get_returns_value() -> None: + """A stored value is returned before it expires.""" + cache: BoundedTTLCache[str, int] = BoundedTTLCache(ttl_seconds=60, max_size=8) + cache.set("a", 1) + assert cache.get("a") == 1 + + +def test_missing_key_returns_none() -> None: + """An unknown key returns None.""" + cache: BoundedTTLCache[str, int] = BoundedTTLCache(ttl_seconds=60) + assert cache.get("missing") is None + + +def test_entry_expires_after_ttl() -> None: + """An entry is evicted once its TTL elapses.""" + cache: BoundedTTLCache[str, int] = BoundedTTLCache(ttl_seconds=0.05, max_size=8) + cache.set("a", 1) + assert cache.get("a") == 1 + time.sleep(0.06) + assert cache.get("a") is None + + +def test_size_bound_evicts_oldest() -> None: + """The cache never exceeds max_size; oldest entries are evicted first.""" + cache: BoundedTTLCache[int, int] = BoundedTTLCache(ttl_seconds=60, max_size=3) + for i in range(5): + cache.set(i, i) + assert len(cache) == 3 + # 0 and 1 were evicted; 2, 3, 4 remain. + assert cache.get(0) is None + assert cache.get(1) is None + assert cache.get(4) == 4 + + +def test_reinsert_refreshes_recency() -> None: + """Re-setting a key refreshes its position so it is not evicted first.""" + cache: BoundedTTLCache[str, int] = BoundedTTLCache(ttl_seconds=60, max_size=2) + cache.set("a", 1) + cache.set("b", 2) + cache.set("a", 3) # refresh "a" + cache.set("c", 4) # should evict "b", the oldest + assert cache.get("b") is None + assert cache.get("a") == 3 + assert cache.get("c") == 4 + + +def test_clear_empties_cache() -> None: + """clear() removes all entries.""" + cache: BoundedTTLCache[str, int] = BoundedTTLCache(ttl_seconds=60) + cache.set("a", 1) + cache.clear() + assert cache.get("a") is None + assert len(cache) == 0 + + +def test_invalid_constructor_args() -> None: + """Non-positive TTL or size is rejected.""" + with pytest.raises(ValueError): + BoundedTTLCache(ttl_seconds=0) + with pytest.raises(ValueError): + BoundedTTLCache(ttl_seconds=60, max_size=0) diff --git a/tests/test_integration.py b/tests/test_integration.py index 986a19c..4938353 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -28,6 +28,7 @@ def full_env(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef") monkeypatch.setenv("ENVIRONMENT", "test") monkeypatch.setenv("MCP_HOST", "127.0.0.1") monkeypatch.setenv("MCP_PORT", "8080") diff --git a/tests/test_oauth.py b/tests/test_oauth.py index fae7bd8..f5e4739 100644 --- a/tests/test_oauth.py +++ b/tests/test_oauth.py @@ -10,6 +10,7 @@ from fastapi.testclient import TestClient from aegis_gitea_mcp.config import reset_settings from aegis_gitea_mcp.oauth import GiteaOAuthValidator, get_oauth_validator, reset_oauth_validator +from aegis_gitea_mcp.oauth_flow import OAuthClientRegistry, OAuthRegistrationRequest from aegis_gitea_mcp.request_context import ( get_gitea_user_login, get_gitea_user_token, @@ -40,6 +41,7 @@ def mock_env_oauth(monkeypatch): monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef") monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") @@ -57,6 +59,24 @@ def oauth_client(mock_env_oauth): return TestClient(app, raise_server_exceptions=False) +def _register_public_client(oauth_client: TestClient, redirect_uri: str) -> dict[str, str]: + """Register a public OAuth client for test flows.""" + response = oauth_client.post( + "/register", + json={ + "client_name": "pytest-client", + "redirect_uris": [redirect_uri], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code", "refresh_token"], + "response_types": ["code"], + }, + ) + assert response.status_code == 200 + payload = response.json() + assert "client_id" in payload + return payload + + # --------------------------------------------------------------------------- # GiteaOAuthValidator unit tests # --------------------------------------------------------------------------- @@ -248,19 +268,39 @@ def test_oauth_token_endpoint_available_when_oauth_mode_false(monkeypatch): mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) with TestClient(app, raise_server_exceptions=False) as client: - response = client.post("/oauth/token", data={"code": "abc123"}) + registration = client.post( + "/register", + json={ + "client_name": "pytest-client", + "redirect_uris": ["http://127.0.0.1:8080/callback"], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code"], + "response_types": ["code"], + }, + ) + assert registration.status_code == 200 + client_id = registration.json()["client_id"] + response = client.post( + "/oauth/token", + data={"client_id": client_id, "code": "abc123", "code_verifier": "pkce"}, + ) assert response.status_code == 200 def test_oauth_token_endpoint_missing_code(oauth_client): """POST /oauth/token without a code returns 400.""" - response = oauth_client.post("/oauth/token", data={}) + client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback") + response = oauth_client.post( + "/oauth/token", + data={"client_id": client_data["client_id"], "code_verifier": "pkce"}, + ) assert response.status_code == 400 def test_oauth_token_endpoint_proxy_success(oauth_client): """POST /oauth/token proxies successfully to Gitea and returns access_token.""" + client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback") mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { @@ -276,7 +316,11 @@ def test_oauth_token_endpoint_proxy_success(oauth_client): response = oauth_client.post( "/oauth/token", - data={"code": "auth-code-123", "redirect_uri": "https://chat.openai.com/callback"}, + data={ + "client_id": client_data["client_id"], + "code": "auth-code-123", + "code_verifier": "pkce-verifier", + }, ) assert response.status_code == 200 @@ -286,6 +330,7 @@ def test_oauth_token_endpoint_proxy_success(oauth_client): def test_oauth_token_endpoint_gitea_error(oauth_client): """POST /oauth/token propagates Gitea error status.""" + client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback") mock_response = MagicMock() mock_response.status_code = 400 mock_response.json.return_value = {"error": "invalid_grant"} @@ -296,11 +341,202 @@ def test_oauth_token_endpoint_gitea_error(oauth_client): mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) - response = oauth_client.post("/oauth/token", data={"code": "bad-code"}) + response = oauth_client.post( + "/oauth/token", + data={ + "client_id": client_data["client_id"], + "code": "bad-code", + "code_verifier": "pkce-verifier", + }, + ) assert response.status_code == 400 +def test_oauth_authorize_and_callback_round_trip(oauth_client): + """OAuth authorize/callback round-trip preserves the original redirect URI and state.""" + client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback") + + authorize_response = oauth_client.get( + "/oauth/authorize", + params={ + "client_id": client_data["client_id"], + "redirect_uri": "http://127.0.0.1:8080/callback", + "state": "original-state", + "code_challenge": "pkce-challenge", + "code_challenge_method": "S256", + }, + follow_redirects=False, + ) + + assert authorize_response.status_code == 302 + location = authorize_response.headers["location"] + assert "state=" in location + assert "redirect_uri=http%3A%2F%2F127.0.0.1%3A8080%2Fcallback" not in location + + from urllib.parse import parse_qs, urlparse + + parsed = urlparse(location) + query = parse_qs(parsed.query) + proxy_state = query["state"][0] + + callback_response = oauth_client.get( + "/oauth/callback", + params={"state": proxy_state, "code": "auth-code-123"}, + follow_redirects=False, + ) + + assert callback_response.status_code == 302 + callback_location = callback_response.headers["location"] + assert callback_location.startswith("http://127.0.0.1:8080/callback?") + assert "code=auth-code-123" in callback_location + assert "state=original-state" in callback_location + + +def test_oauth_callback_rejects_tampered_state(oauth_client): + """OAuth callback rejects modified signed proxy state.""" + client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback") + authorize_response = oauth_client.get( + "/oauth/authorize", + params={ + "client_id": client_data["client_id"], + "redirect_uri": "http://127.0.0.1:8080/callback", + "state": "original-state", + "code_challenge": "pkce-challenge", + "code_challenge_method": "S256", + }, + follow_redirects=False, + ) + from urllib.parse import parse_qs, urlparse + + proxy_state = parse_qs(urlparse(authorize_response.headers["location"]).query)["state"][0] + tampered_state = proxy_state[:-1] + ("A" if proxy_state[-1] != "A" else "B") + + callback_response = oauth_client.get( + "/oauth/callback", + params={"state": tampered_state, "code": "auth-code-123"}, + ) + + assert callback_response.status_code == 400 + + +@pytest.mark.parametrize( + "redirect_uri", + [ + "https://claude.ai/api/mcp/auth_callback", + "https://claude.com/api/mcp/auth_callback", + ], +) +def test_dcr_accepts_default_claude_callbacks(oauth_client, redirect_uri): + """Claude's hosted connector callback URLs are allowed by default.""" + response = oauth_client.post( + "/register", + json={ + "client_name": "claude-client", + "redirect_uris": [redirect_uri], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code", "refresh_token"], + "response_types": ["code"], + }, + ) + + assert response.status_code == 200 + + +def test_oauth_authorize_rejects_unknown_client(oauth_client): + """OAuth authorize returns invalid_client for unregistered client IDs.""" + response = oauth_client.get( + "/oauth/authorize", + params={ + "client_id": "unknown-client", + "redirect_uri": "http://127.0.0.1:8080/callback", + "state": "x", + "code_challenge": "pkce-challenge", + "code_challenge_method": "S256", + }, + ) + + assert response.status_code == 401 + assert response.json()["detail"] == "invalid_client" + + +def test_oauth_token_rejects_unknown_dcr_client(oauth_client): + """Unknown dynamic clients receive RFC 6749 invalid_client from token endpoint.""" + response = oauth_client.post( + "/oauth/token", + data={ + "client_id": "deleted-or-unknown-client", + "code": "auth-code-123", + "code_verifier": "pkce-verifier", + }, + ) + + assert response.status_code == 401 + assert response.json() == {"error": "invalid_client"} + + +def test_oauth_authorize_requires_pkce_s256(oauth_client): + """Authorization endpoint enforces PKCE S256 for public clients.""" + client_data = _register_public_client(oauth_client, "http://127.0.0.1:8080/callback") + missing_challenge = oauth_client.get( + "/oauth/authorize", + params={ + "client_id": client_data["client_id"], + "redirect_uri": "http://127.0.0.1:8080/callback", + "state": "x", + }, + ) + plain_method = oauth_client.get( + "/oauth/authorize", + params={ + "client_id": client_data["client_id"], + "redirect_uri": "http://127.0.0.1:8080/callback", + "state": "x", + "code_challenge": "pkce-challenge", + "code_challenge_method": "plain", + }, + ) + + assert missing_challenge.status_code == 400 + assert plain_method.status_code == 400 + + +def test_register_rejects_foreign_redirect_uri(oauth_client): + """DCR rejects redirect URIs outside the allowlist and loopback/Claude patterns.""" + response = oauth_client.post( + "/register", + json={ + "client_name": "pytest-client", + "redirect_uris": ["https://evil.example.com/callback"], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code"], + "response_types": ["code"], + }, + ) + + assert response.status_code == 400 + + +def test_dcr_registry_persists_registered_clients(tmp_path): + """Registered OAuth clients survive registry reloads.""" + storage_path = tmp_path / "dcr_clients.json" + registry = OAuthClientRegistry(storage_path) + request = OAuthRegistrationRequest.model_validate( + { + "client_name": "persisted-client", + "redirect_uris": ["http://127.0.0.1:8080/callback"], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code"], + "response_types": ["code"], + } + ) + + response = registry.register(request) + reloaded = OAuthClientRegistry(storage_path) + + assert reloaded.get(response["client_id"]) is not None + + # --------------------------------------------------------------------------- # Config validation tests # --------------------------------------------------------------------------- diff --git a/tests/test_oauth_oidc.py b/tests/test_oauth_oidc.py index b94e5c0..597c7b8 100644 --- a/tests/test_oauth_oidc.py +++ b/tests/test_oauth_oidc.py @@ -25,13 +25,14 @@ def reset_state(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef") monkeypatch.setenv("OAUTH_CACHE_TTL_SECONDS", "600") yield reset_settings() reset_oauth_validator() -def _build_jwt_fixture() -> tuple[str, dict[str, object]]: +def _build_jwt_fixture(aud: str = "test-client-id") -> tuple[str, dict[str, object]]: """Generate RS256 access token and matching JWKS payload.""" private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) public_key = private_key.public_key() @@ -44,7 +45,7 @@ def _build_jwt_fixture() -> tuple[str, dict[str, object]]: "sub": "user-1", "preferred_username": "alice", "scope": "read:repository write:repository", - "aud": "test-client-id", + "aud": aud, "iss": "https://gitea.example.com", "iat": now, "exp": now + 3600, @@ -56,6 +57,70 @@ def _build_jwt_fixture() -> tuple[str, dict[str, object]]: return token, {"keys": [jwk]} +async def _validate_with_jwks( + validator: GiteaOAuthValidator, token: str, jwks: dict[str, object] +) -> tuple[bool, str | None, dict[str, object] | None]: + """Drive a JWT validation with mocked discovery + JWKS responses.""" + discovery_response = MagicMock() + discovery_response.status_code = 200 + discovery_response.json.return_value = { + "issuer": "https://gitea.example.com", + "jwks_uri": "https://gitea.example.com/login/oauth/keys", + } + jwks_response = MagicMock() + jwks_response.status_code = 200 + jwks_response.json.return_value = jwks + + with patch("aegis_gitea_mcp.oauth.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(side_effect=[discovery_response, jwks_response]) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + return await validator.validate_oauth_token(token, "127.0.0.1", "TestAgent") + + +def test_acceptable_audiences_includes_resource_and_client_id( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The canonical MCP resource and the Gitea client id are accepted audiences.""" + monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com") + reset_settings() + reset_oauth_validator() + audiences = GiteaOAuthValidator()._acceptable_audiences() + assert "https://mcp.example.com" in audiences + assert "test-client-id" in audiences + + +@pytest.mark.asyncio +async def test_jwt_with_canonical_resource_audience_is_accepted( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A token whose aud is the canonical MCP resource URL validates (P4).""" + monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com") + reset_settings() + reset_oauth_validator() + token, jwks = _build_jwt_fixture(aud="https://mcp.example.com") + valid, error, principal = await _validate_with_jwks(GiteaOAuthValidator(), token, jwks) + assert valid is True + assert error is None + assert principal is not None + + +@pytest.mark.asyncio +async def test_jwt_with_foreign_audience_is_rejected() -> None: + """A token minted for a different audience is rejected (audience binding).""" + token, jwks = _build_jwt_fixture(aud="some-other-service") + # Foreign-audience JWT fails JWT validation, then falls back to userinfo, which + # is not mocked here and raises a network error -> overall failure. + with patch("aegis_gitea_mcp.oauth.GiteaOAuthValidator._validate_userinfo") as mock_userinfo: + from aegis_gitea_mcp.oauth import OAuthTokenValidationError + + mock_userinfo.side_effect = OAuthTokenValidationError("Invalid", "userinfo_denied") + valid, _error, principal = await _validate_with_jwks(GiteaOAuthValidator(), token, jwks) + assert valid is False + assert principal is None + + @pytest.mark.asyncio async def test_validate_oauth_token_with_oidc_jwt_and_cache() -> None: """JWT token validation uses discovery + JWKS and caches both documents.""" diff --git a/tests/test_server.py b/tests/test_server.py index ff7d9ad..9861289 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -29,6 +29,7 @@ def oauth_env(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef") monkeypatch.setenv("ENVIRONMENT", "test") monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") monkeypatch.setenv("WRITE_MODE", "false") @@ -84,12 +85,13 @@ def test_health_endpoint(client: TestClient) -> None: def test_oauth_protected_resource_metadata(client: TestClient) -> None: - """OAuth protected-resource metadata contains required OpenAI-compatible fields.""" + """PRM advertises THIS server's canonical URL as the protected resource.""" response = client.get("/.well-known/oauth-protected-resource") assert response.status_code == 200 data = response.json() - assert data["resource"] == "https://gitea.example.com" + # RFC 9728/8707: the resource identifier is the MCP server's own URL, not Gitea's. + assert data["resource"] == "http://testserver" assert data["authorization_servers"] == [ "http://testserver", "https://gitea.example.com", @@ -100,12 +102,15 @@ def test_oauth_protected_resource_metadata(client: TestClient) -> None: def test_oauth_authorization_server_metadata(client: TestClient) -> None: - """Auth server metadata includes expected OAuth endpoints and scopes.""" + """Auth server metadata advertises this server's proxy OAuth endpoints.""" response = client.get("/.well-known/oauth-authorization-server") assert response.status_code == 200 payload = response.json() - assert payload["authorization_endpoint"].endswith("/login/oauth/authorize") - assert payload["token_endpoint"].endswith("/oauth/token") + # Claude must be sent to our proxy authorize endpoint (Gitea does not know + # Claude's redirect_uri), so the endpoint lives on this server. + assert payload["authorization_endpoint"] == "http://testserver/oauth/authorize" + assert payload["token_endpoint"] == "http://testserver/oauth/token" + assert payload["registration_endpoint"] == "http://testserver/register" assert payload["scopes_supported"] == ["read:repository", "write:repository"] @@ -115,8 +120,8 @@ def test_openid_configuration_metadata(client: TestClient) -> None: assert response.status_code == 200 payload = response.json() assert payload["issuer"] == "https://gitea.example.com" - assert payload["authorization_endpoint"].endswith("/login/oauth/authorize") - assert payload["token_endpoint"].endswith("/oauth/token") + assert payload["authorization_endpoint"] == "http://testserver/oauth/authorize" + assert payload["token_endpoint"] == "http://testserver/oauth/token" assert payload["userinfo_endpoint"].endswith("/login/oauth/userinfo") assert payload["jwks_uri"].endswith("/login/oauth/keys") assert "read:repository" in payload["scopes_supported"] @@ -129,6 +134,7 @@ def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) -> monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef") monkeypatch.setenv("PUBLIC_BASE_URL", "https://mcp.example.com") monkeypatch.setenv("ENVIRONMENT", "test") monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "false") @@ -149,6 +155,8 @@ def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) -> protected_response = client.get("/.well-known/oauth-protected-resource") assert protected_response.status_code == 200 protected_payload = protected_response.json() + # P4: the protected resource identifier must equal this server's public base. + assert protected_payload["resource"] == "https://mcp.example.com" assert protected_payload["authorization_servers"] == [ "https://mcp.example.com", "https://gitea.example.com", @@ -166,6 +174,201 @@ def test_oauth_metadata_uses_public_base_url(monkeypatch: pytest.MonkeyPatch) -> ) +def test_mcp_streamable_http_path_works(client: TestClient) -> None: + """The spec path /mcp exposes the same transport behavior as the SSE alias.""" + response = client.post( + "/mcp", + headers={"Authorization": "Bearer valid-read"}, + json={"jsonrpc": "2.0", "id": "init-1", "method": "initialize"}, + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["result"]["protocolVersion"] == "2024-11-05" + + +def test_mcp_preflight_allows_same_origin(client: TestClient) -> None: + """Same-origin preflight requests to /mcp return strict CORS headers.""" + response = client.options( + "/mcp", + headers={ + "Origin": "http://testserver", + "Access-Control-Request-Method": "POST", + "Access-Control-Request-Headers": "authorization,content-type", + }, + ) + + assert response.status_code == 204 + assert response.headers["Access-Control-Allow-Origin"] == "http://testserver" + + +def test_mcp_preflight_rejects_cross_origin(client: TestClient) -> None: + """Cross-origin browser requests to /mcp are denied.""" + response = client.options( + "/mcp", + headers={ + "Origin": "https://evil.example.com", + "Access-Control-Request-Method": "POST", + }, + ) + + assert response.status_code == 403 + + +def test_service_pat_requests_verify_user_repo_access_before_execution( + oauth_env: None, mock_oauth_validation: None, monkeypatch: pytest.MonkeyPatch +) -> None: + """Service PAT fallback checks the user's repository permission before executing tools.""" + from aegis_gitea_mcp import server + + monkeypatch.setenv("GITEA_TOKEN", "service-pat-token") + server._api_scope_cache.clear() + server.reset_repo_authz_cache() + + probe_response = MagicMock() + probe_response.status_code = 200 + + repo_response = MagicMock() + repo_response.status_code = 403 + + with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(side_effect=[probe_response, repo_response]) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + from aegis_gitea_mcp.server import app + + client = TestClient(app, raise_server_exceptions=False) + response = client.post( + "/mcp/tool/call", + headers={"Authorization": "Bearer valid-read"}, + json={ + "tool": "get_repository_info", + "arguments": {"owner": "acme", "repo": "demo"}, + }, + ) + + assert response.status_code == 403 + assert "permission" in response.json()["detail"].lower() + + +@pytest.mark.asyncio +async def test_service_pat_repo_authz_allows_user_with_read_permission( + oauth_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + """Read-level collaborator permission allows service PAT execution to proceed.""" + from aegis_gitea_mcp import server + + monkeypatch.setenv("GITEA_TOKEN", "service-pat-token") + server.reset_repo_authz_cache() + + permission_response = MagicMock() + permission_response.status_code = 200 + permission_response.json.return_value = {"permission": "read"} + + with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=permission_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + await server._verify_user_repository_access( + repository="acme/demo", + required_scope=server.READ_SCOPE, + user_login="alice", + correlation_id="corr-1", + tool_name="get_repository_info", + ) + + mock_client.get.assert_awaited_once() + requested_url = mock_client.get.await_args.args[0] + requested_headers = mock_client.get.await_args.kwargs["headers"] + assert requested_url.endswith("/api/v1/repos/acme/demo/collaborators/alice/permission") + assert requested_headers["Authorization"] == "token service-pat-token" + + +@pytest.mark.asyncio +async def test_service_pat_repo_authz_denies_read_user_for_write_tool( + oauth_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + """Read permission is insufficient for write tools in service PAT mode.""" + from fastapi import HTTPException + + from aegis_gitea_mcp import server + + monkeypatch.setenv("GITEA_TOKEN", "service-pat-token") + server.reset_repo_authz_cache() + + permission_response = MagicMock() + permission_response.status_code = 200 + permission_response.json.return_value = {"permission": "read"} + + with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=permission_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + with pytest.raises(HTTPException) as exc_info: + await server._verify_user_repository_access( + repository="acme/demo", + required_scope=server.WRITE_SCOPE, + user_login="alice", + correlation_id="corr-1", + tool_name="create_issue", + ) + + assert exc_info.value.status_code == 403 + + +@pytest.mark.asyncio +async def test_service_pat_repo_authz_cache_hit_and_expiry( + oauth_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + """Repository permission decisions are cached briefly and rechecked after expiry.""" + from aegis_gitea_mcp import cache as cache_module + from aegis_gitea_mcp import server + + monkeypatch.setenv("GITEA_TOKEN", "service-pat-token") + monkeypatch.setenv("REPO_AUTHZ_CACHE_TTL_SECONDS", "1") + server.reset_repo_authz_cache() + + now = 1000.0 + monkeypatch.setattr(cache_module.time, "monotonic", lambda: now) + + permission_response = MagicMock() + permission_response.status_code = 200 + permission_response.json.return_value = {"permission": "read"} + + with patch("aegis_gitea_mcp.server.httpx.AsyncClient") as mock_client_cls: + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=permission_response) + mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + for _ in range(2): + await server._verify_user_repository_access( + repository="acme/demo", + required_scope=server.READ_SCOPE, + user_login="alice", + correlation_id="corr-1", + tool_name="get_repository_info", + ) + assert mock_client.get.await_count == 1 + + now = 1002.0 + await server._verify_user_repository_access( + repository="acme/demo", + required_scope=server.READ_SCOPE, + user_login="alice", + correlation_id="corr-1", + tool_name="get_repository_info", + ) + + assert mock_client.get.await_count == 2 + + def test_scope_compatibility_write_implies_read() -> None: """write:repository grants read-level access for read tools.""" from aegis_gitea_mcp.server import READ_SCOPE, _has_required_scope @@ -348,6 +551,7 @@ async def test_startup_event_fails_when_discovery_unreachable( monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef") monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "true") from aegis_gitea_mcp import server @@ -377,6 +581,7 @@ async def test_startup_event_succeeds_when_discovery_ready( monkeypatch.setenv("OAUTH_MODE", "true") monkeypatch.setenv("GITEA_OAUTH_CLIENT_ID", "test-client-id") monkeypatch.setenv("GITEA_OAUTH_CLIENT_SECRET", "test-client-secret") + monkeypatch.setenv("OAUTH_STATE_SECRET", "test-state-secret-0123456789abcdef") monkeypatch.setenv("STARTUP_VALIDATE_GITEA", "true") from aegis_gitea_mcp import server